From aca688801146436c6993293919be2fe19d0cae07 Mon Sep 17 00:00:00 2001 From: Christian Ulrich Date: Mon, 15 Jun 2020 22:30:30 +0200 Subject: [PATCH] make asyncSendMsg and asyncRecvMsg generic --- asyncutils.nim | 92 ++++++++++++++++++++++++++------------------------ 1 file changed, 48 insertions(+), 44 deletions(-) diff --git a/asyncutils.nim b/asyncutils.nim index 5f61ca1..1016367 100644 --- a/asyncutils.nim +++ b/asyncutils.nim @@ -1,7 +1,12 @@ -import asyncdispatch, threadpool, osproc, options +import asyncdispatch, threadpool, osproc +from net import + BufferSize from os import osLastError, newOsError +from sequtils import + map, + foldl from posix import SOL_SOCKET, SCM_RIGHTS, @@ -14,6 +19,7 @@ from posix import SocketHandle, CMSG_SPACE, CMSG_FIRSTHDR, + CMSG_NXTHDR, CMSG_LEN, CMSG_DATA, sendmsg, @@ -46,35 +52,39 @@ proc asyncExecCmd*(command: string): Future[int] = addEvent(event, callback) return future +type ControlMessage = object + level: int + msgType: int + data: string + proc asyncSendMsg*(fd: AsyncFD, data: string, - ancillaryData: Option[AsyncFD] = none(AsyncFD)): Future[void] = + cmsgs: seq[ControlMessage] = @[]): Future[void] = var retFuture = newFuture[void]("asyncSendMsg") proc cb(sock: AsyncFD): bool = - # FIXME: if file descriptor is given in ancillaryData, check if sock is a - # unix socket result = true # sendmsg needs an array of iovec structs as described in the writev(2) man # page. The message is passed as a msghdr struct which may contain ancillary - # data, see sendmsg(2) man page. We use ancillary data exclusively for - # passing a file descriptor if one is given in ancillaryData. + # data, see sendmsg(2) man page. var iovec = IOVec(iov_base: data.cstring, iov_len: data.len.csize_t) var msg = Tmsghdr(msg_iov: addr iovec, msg_iovlen: 1) - if ancillaryData.isSome: - # assemble ancillary data, see cmsg(3) man page - let cmsgBufferLen = CMSG_SPACE(sizeof(AsyncFD).csize_t) - let cmsgBuffer = newString(cmsgBufferLen) - msg.msg_control = cmsgBuffer.cstring - msg.msg_controllen = cmsgBufferLen - let cmsg: ptr Tcmsghdr = CMSG_FIRSTHDR(addr msg) - cmsg.cmsg_len = CMSG_LEN(sizeof(AsyncFD).csize_t) - cmsg.cmsg_level = SOL_SOCKET - cmsg.cmsg_type = SCM_RIGHTS - var ancillaryFd = ancillaryData.get - copyMem(CMSG_DATA(cmsg), addr ancillaryFd, sizeof(AsyncFD)) + var cmsgBuf: string + if cmsgs.len > 0: + proc space(c: ControlMessage): csize_t = CMSG_SPACE(c.data.len.csize_t) + let cmsgBufLen = cmsgs.map(space).foldl(a + b, 0.csize_t) + cmsgBuf = newString(cmsgBufLen) + msg.msg_control = cmsgBuf.cstring + msg.msg_controllen = cmsgBufLen + var cmsgHeader = CMSG_FIRSTHDR(addr msg) + for cmsg in cmsgs: + cmsgHeader.cmsg_len = CMSG_LEN(cmsg.data.len.csize_t) + cmsgHeader.cmsg_level = cmsg.level.int32 + cmsgHeader.cmsg_type = cmsg.msgType.int32 + copyMem(CMSG_DATA(cmsgHeader), cmsg.data.cstring, cmsg.data.len) + cmsgHeader = CMSG_NXTHDR(addr msg, cmsgHeader) let res = sendmsg(sock.SocketHandle, addr msg, 0) if res < 0: @@ -91,12 +101,11 @@ proc asyncSendMsg*(fd: AsyncFD, data: string, addWrite(fd, cb) return retFuture -proc asyncRecvMsg*(fd: AsyncFD, - size: int): Future[tuple[data: string, - ancillaryData: Option[AsyncFD]]] = - var retFuture = - newFuture[tuple[data: string, - ancillaryData: Option[AsyncFD]]]("asyncRecvMsg") +proc asyncRecvMsg*(fd: AsyncFD, size: int = BufferSize, + cmsgSize: int = BufferSize): + Future[tuple[data: string, cmsgs: seq[ControlMessage]]] = + var retFuture = newFuture[tuple[data: string, + cmsgs: seq[ControlMessage]]]("asyncRecvMsg") proc cb(sock: AsyncFD): bool = result = true @@ -104,16 +113,12 @@ proc asyncRecvMsg*(fd: AsyncFD, var dataBuffer = newString(size) var iovec = IOVec(iov_base: dataBuffer.cstring, iov_len: dataBuffer.len.csize_t) - let cmsgBufferLen = CMSG_SPACE(sizeof(AsyncFD).csize_t) - let cmsgBuffer = newString(cmsgBufferLen) + var cmsgBuffer = newString(cmsgSize) + zeroMem(cmsgBuffer.cstring, cmsgBuffer.len) var msg = Tmsghdr(msg_iov: addr iovec, msg_iovlen: 1, - msg_control: cmsgBuffer.cstring, - msg_controllen: cmsgBufferLen) - let cmsg: ptr Tcmsghdr = CMSG_FIRSTHDR(addr msg) - cmsg.cmsg_len = 0 - cmsg.cmsg_level = 0 - cmsg.cmsg_type = 0 + msg_control: addr cmsgBuffer[0], + msg_controllen: cmsgSize.csize_t) let res = recvmsg(sock.SocketHandle, addr msg, 0) if res < 0: @@ -126,20 +131,19 @@ proc asyncRecvMsg*(fd: AsyncFD, result = false return - var ancillaryData = none(AsyncFD) - if cmsg.cmsg_len > 0: - if cmsg.cmsg_len == CMSG_LEN(sizeof(AsyncFD).csize_t) and - cmsg.cmsg_level == SOL_SOCKET and - cmsg.cmsg_type == SCM_RIGHTS: - var ancillaryFd: AsyncFD - copyMem(addr ancillaryFd, CMSG_DATA(cmsg), sizeof(AsyncFD)) - ancillaryData = some(ancillaryFd) - else: - retFuture.fail(newException(ValueError, "unexpected ancillary data")) - return + var cmsgs = newSeq[ControlMessage]() + var cmsgHeader = CMSG_FIRSTHDR(addr msg) + while cmsgHeader != nil: + let dataLen = cmsgHeader.cmsg_len - sizeof(Tcmsghdr).csize_t + var cmsg = ControlMessage(level: cmsgHeader.cmsg_level, + msgType: cmsgHeader.cmsg_type, + data: newString(dataLen)) + copyMem(cmsg.data.cstring, CMSG_DATA(cmsgHeader), cmsgHeader.cmsg_len) + cmsgs.add(cmsg) + cmsgHeader = CMSG_NXTHDR(addr msg, cmsgHeader) dataBuffer.setLen(res) - retFuture.complete((dataBuffer, ancillaryData)) + retFuture.complete((dataBuffer, cmsgs)) addRead(fd, cb) return retFuture