make asyncSendMsg and asyncRecvMsg generic

This commit is contained in:
Christian Ulrich 2020-06-15 22:30:30 +02:00
parent ccd0a9983e
commit aca6888011
No known key found for this signature in database
GPG Key ID: 8241BE099775A097
1 changed files with 48 additions and 44 deletions

View File

@ -1,7 +1,12 @@
import asyncdispatch, threadpool, osproc, options import asyncdispatch, threadpool, osproc
from net import
BufferSize
from os import from os import
osLastError, osLastError,
newOsError newOsError
from sequtils import
map,
foldl
from posix import from posix import
SOL_SOCKET, SOL_SOCKET,
SCM_RIGHTS, SCM_RIGHTS,
@ -14,6 +19,7 @@ from posix import
SocketHandle, SocketHandle,
CMSG_SPACE, CMSG_SPACE,
CMSG_FIRSTHDR, CMSG_FIRSTHDR,
CMSG_NXTHDR,
CMSG_LEN, CMSG_LEN,
CMSG_DATA, CMSG_DATA,
sendmsg, sendmsg,
@ -46,35 +52,39 @@ proc asyncExecCmd*(command: string): Future[int] =
addEvent(event, callback) addEvent(event, callback)
return future return future
type ControlMessage = object
level: int
msgType: int
data: string
proc asyncSendMsg*(fd: AsyncFD, data: string, proc asyncSendMsg*(fd: AsyncFD, data: string,
ancillaryData: Option[AsyncFD] = none(AsyncFD)): Future[void] = cmsgs: seq[ControlMessage] = @[]): Future[void] =
var retFuture = newFuture[void]("asyncSendMsg") var retFuture = newFuture[void]("asyncSendMsg")
proc cb(sock: AsyncFD): bool = proc cb(sock: AsyncFD): bool =
# FIXME: if file descriptor is given in ancillaryData, check if sock is a
# unix socket
result = true result = true
# sendmsg needs an array of iovec structs as described in the writev(2) man # sendmsg needs an array of iovec structs as described in the writev(2) man
# page. The message is passed as a msghdr struct which may contain ancillary # page. The message is passed as a msghdr struct which may contain ancillary
# data, see sendmsg(2) man page. We use ancillary data exclusively for # data, see sendmsg(2) man page.
# passing a file descriptor if one is given in ancillaryData.
var iovec = IOVec(iov_base: data.cstring, var iovec = IOVec(iov_base: data.cstring,
iov_len: data.len.csize_t) iov_len: data.len.csize_t)
var msg = Tmsghdr(msg_iov: addr iovec, var msg = Tmsghdr(msg_iov: addr iovec,
msg_iovlen: 1) msg_iovlen: 1)
if ancillaryData.isSome: var cmsgBuf: string
# assemble ancillary data, see cmsg(3) man page if cmsgs.len > 0:
let cmsgBufferLen = CMSG_SPACE(sizeof(AsyncFD).csize_t) proc space(c: ControlMessage): csize_t = CMSG_SPACE(c.data.len.csize_t)
let cmsgBuffer = newString(cmsgBufferLen) let cmsgBufLen = cmsgs.map(space).foldl(a + b, 0.csize_t)
msg.msg_control = cmsgBuffer.cstring cmsgBuf = newString(cmsgBufLen)
msg.msg_controllen = cmsgBufferLen msg.msg_control = cmsgBuf.cstring
let cmsg: ptr Tcmsghdr = CMSG_FIRSTHDR(addr msg) msg.msg_controllen = cmsgBufLen
cmsg.cmsg_len = CMSG_LEN(sizeof(AsyncFD).csize_t) var cmsgHeader = CMSG_FIRSTHDR(addr msg)
cmsg.cmsg_level = SOL_SOCKET for cmsg in cmsgs:
cmsg.cmsg_type = SCM_RIGHTS cmsgHeader.cmsg_len = CMSG_LEN(cmsg.data.len.csize_t)
var ancillaryFd = ancillaryData.get cmsgHeader.cmsg_level = cmsg.level.int32
copyMem(CMSG_DATA(cmsg), addr ancillaryFd, sizeof(AsyncFD)) cmsgHeader.cmsg_type = cmsg.msgType.int32
copyMem(CMSG_DATA(cmsgHeader), cmsg.data.cstring, cmsg.data.len)
cmsgHeader = CMSG_NXTHDR(addr msg, cmsgHeader)
let res = sendmsg(sock.SocketHandle, addr msg, 0) let res = sendmsg(sock.SocketHandle, addr msg, 0)
if res < 0: if res < 0:
@ -91,12 +101,11 @@ proc asyncSendMsg*(fd: AsyncFD, data: string,
addWrite(fd, cb) addWrite(fd, cb)
return retFuture return retFuture
proc asyncRecvMsg*(fd: AsyncFD, proc asyncRecvMsg*(fd: AsyncFD, size: int = BufferSize,
size: int): Future[tuple[data: string, cmsgSize: int = BufferSize):
ancillaryData: Option[AsyncFD]]] = Future[tuple[data: string, cmsgs: seq[ControlMessage]]] =
var retFuture = var retFuture = newFuture[tuple[data: string,
newFuture[tuple[data: string, cmsgs: seq[ControlMessage]]]("asyncRecvMsg")
ancillaryData: Option[AsyncFD]]]("asyncRecvMsg")
proc cb(sock: AsyncFD): bool = proc cb(sock: AsyncFD): bool =
result = true result = true
@ -104,16 +113,12 @@ proc asyncRecvMsg*(fd: AsyncFD,
var dataBuffer = newString(size) var dataBuffer = newString(size)
var iovec = IOVec(iov_base: dataBuffer.cstring, var iovec = IOVec(iov_base: dataBuffer.cstring,
iov_len: dataBuffer.len.csize_t) iov_len: dataBuffer.len.csize_t)
let cmsgBufferLen = CMSG_SPACE(sizeof(AsyncFD).csize_t) var cmsgBuffer = newString(cmsgSize)
let cmsgBuffer = newString(cmsgBufferLen) zeroMem(cmsgBuffer.cstring, cmsgBuffer.len)
var msg = Tmsghdr(msg_iov: addr iovec, var msg = Tmsghdr(msg_iov: addr iovec,
msg_iovlen: 1, msg_iovlen: 1,
msg_control: cmsgBuffer.cstring, msg_control: addr cmsgBuffer[0],
msg_controllen: cmsgBufferLen) msg_controllen: cmsgSize.csize_t)
let cmsg: ptr Tcmsghdr = CMSG_FIRSTHDR(addr msg)
cmsg.cmsg_len = 0
cmsg.cmsg_level = 0
cmsg.cmsg_type = 0
let res = recvmsg(sock.SocketHandle, addr msg, 0) let res = recvmsg(sock.SocketHandle, addr msg, 0)
if res < 0: if res < 0:
@ -126,20 +131,19 @@ proc asyncRecvMsg*(fd: AsyncFD,
result = false result = false
return return
var ancillaryData = none(AsyncFD) var cmsgs = newSeq[ControlMessage]()
if cmsg.cmsg_len > 0: var cmsgHeader = CMSG_FIRSTHDR(addr msg)
if cmsg.cmsg_len == CMSG_LEN(sizeof(AsyncFD).csize_t) and while cmsgHeader != nil:
cmsg.cmsg_level == SOL_SOCKET and let dataLen = cmsgHeader.cmsg_len - sizeof(Tcmsghdr).csize_t
cmsg.cmsg_type == SCM_RIGHTS: var cmsg = ControlMessage(level: cmsgHeader.cmsg_level,
var ancillaryFd: AsyncFD msgType: cmsgHeader.cmsg_type,
copyMem(addr ancillaryFd, CMSG_DATA(cmsg), sizeof(AsyncFD)) data: newString(dataLen))
ancillaryData = some(ancillaryFd) copyMem(cmsg.data.cstring, CMSG_DATA(cmsgHeader), cmsgHeader.cmsg_len)
else: cmsgs.add(cmsg)
retFuture.fail(newException(ValueError, "unexpected ancillary data")) cmsgHeader = CMSG_NXTHDR(addr msg, cmsgHeader)
return
dataBuffer.setLen(res) dataBuffer.setLen(res)
retFuture.complete((dataBuffer, ancillaryData)) retFuture.complete((dataBuffer, cmsgs))
addRead(fd, cb) addRead(fd, cb)
return retFuture return retFuture