punchd/asyncutils.nim

148 lines
4.7 KiB
Nim

import asyncdispatch, threadpool, osproc
from net import
BufferSize
from os import
osLastError,
newOsError
from sequtils import
map,
foldl
from posix import
EINTR,
EWOULDBLOCK,
EAGAIN,
IOVec,
Tmsghdr,
Tcmsghdr,
SocketHandle,
CMSG_SPACE,
CMSG_FIRSTHDR,
CMSG_NXTHDR,
CMSG_LEN,
CMSG_DATA,
sendmsg,
recvmsg
## asyncReadline as discussed at https://github.com/nim-lang/Nim/issues/11564
proc asyncReadline*(): Future[string] =
  ## Reads one line from `stdin` without blocking the async dispatcher.
  ##
  ## The blocking read is handed to a threadpool worker via `spawn`; the
  ## returned future is completed with the line once the worker has signalled
  ## an `AsyncEvent` registered with the dispatcher.
  # NOTE(review): the event created here is never closed in this proc —
  # confirm whether that leaks a descriptor per call.

  # Runs on the worker thread: perform the blocking read, then wake the
  # dispatcher through `signal`.
  proc readWorker(signal: AsyncEvent): string =
    result = stdin.readLine()
    signal.trigger()

  let done = newAsyncEvent()
  let lineFuture = newFuture[string]("asyncReadline")
  let pending = spawn readWorker(done)

  # Invoked by the dispatcher once `done` has been triggered.
  proc onLineRead(fd: AsyncFD): bool =
    # `^` waits for the worker's result, which may lag slightly behind the
    # trigger because the event fires before the worker proc returns.
    lineFuture.complete(^pending)
    result = true

  addEvent(done, onLineRead)
  result = lineFuture
proc asyncExecCmd*(command: string): Future[int] =
  ## Runs `command` through `osproc.execCmd` on a threadpool worker.
  ##
  ## The returned future is completed with the value `execCmd` returned once
  ## the worker has signalled an `AsyncEvent` registered with the dispatcher.
  # NOTE(review): the event created here is never closed in this proc —
  # confirm whether that leaks a descriptor per call.

  # Runs on the worker thread: execute the command (blocking), then wake the
  # dispatcher through `signal`.
  proc execWorker(signal: AsyncEvent, cmd: string): int =
    result = execCmd(cmd)
    signal.trigger()

  let done = newAsyncEvent()
  let exitFuture = newFuture[int]("asyncExecCmd")
  let pending = spawn execWorker(done, command)

  # Invoked by the dispatcher once `done` has been triggered.
  proc onFinished(fd: AsyncFD): bool =
    # `^` waits for the worker's result, which may lag slightly behind the
    # trigger because the event fires before the worker proc returns.
    exitFuture.complete(^pending)
    result = true

  addEvent(done, onFinished)
  result = exitFuture
type
  ControlMessage* = object
    ## One ancillary (control) message as exchanged by `asyncSendMsg` and
    ## `asyncRecvMsg`.
    level*: int    ## Stored in / read from the header's `cmsg_level`.
    msgType*: int  ## Stored in / read from the header's `cmsg_type`.
    data*: string  ## Raw payload bytes located at `CMSG_DATA`.
proc asyncSendMsg*(fd: AsyncFD, data: string,
                   cmsgs: seq[ControlMessage] = @[]): Future[void] =
  ## Sends `data` on `fd` with a single `sendmsg` call, optionally accompanied
  ## by the ancillary (control) messages in `cmsgs`.
  ##
  ## The returned future completes once `sendmsg` succeeds. It fails with an
  ## `OSError` for any error other than `EINTR`, `EWOULDBLOCK` or `EAGAIN`;
  ## on those three the write callback stays registered and is tried again.
  let sendFuture = newFuture[void]("asyncSendMsg")

  proc onWritable(sock: AsyncFD): bool =
    # sendmsg(2) takes the payload as an array of iovec structs (see
    # writev(2)) referenced from a msghdr; a single iovec covers all of
    # `data`.
    var payload = IOVec(iov_base: data.cstring,
                        iov_len: data.len.csize_t)
    var header = Tmsghdr(msg_iov: addr payload,
                         msg_iovlen: 1)

    # Declared outside the `if` so the buffer is still alive when sendmsg
    # reads it through `header.msg_control`.
    var control: string
    if cmsgs.len > 0:
      # Total buffer size: the aligned space of every control message.
      var controlLen = 0.csize_t
      for entry in cmsgs:
        controlLen += CMSG_SPACE(entry.data.len.csize_t)
      control = newString(controlLen)
      header.msg_control = control.cstring
      header.msg_controllen = controlLen

      # Fill in one cmsghdr (plus payload) per entry, walking the buffer with
      # the CMSG_* helpers.
      var slot = CMSG_FIRSTHDR(addr header)
      for entry in cmsgs:
        slot.cmsg_len = CMSG_LEN(entry.data.len.csize_t)
        slot.cmsg_level = entry.level.int32
        slot.cmsg_type = entry.msgType.int32
        copyMem(CMSG_DATA(slot), entry.data.cstring, entry.data.len)
        slot = CMSG_NXTHDR(addr header, slot)

    if sendmsg(sock.SocketHandle, addr header, 0) >= 0:
      sendFuture.complete()
      return true

    let err = osLastError()
    let retryable = err.int32 == EINTR or
                    err.int32 == EWOULDBLOCK or
                    err.int32 == EAGAIN
    if not retryable:
      sendFuture.fail(newOSError(err))
    # `false` keeps the callback registered so the send is retried.
    result = not retryable

  addWrite(fd, onWritable)
  result = sendFuture
proc asyncRecvMsg*(fd: AsyncFD, size: int = BufferSize,
                   cmsgSize: int = BufferSize):
                  Future[tuple[data: string, cmsgs: seq[ControlMessage]]] =
  ## Receives one message from `fd` with a single `recvmsg` call.
  ##
  ## `size` is the capacity of the payload buffer and `cmsgSize` the capacity
  ## of the ancillary-data buffer. The future completes with the payload
  ## (shortened to the byte count `recvmsg` reported) and every control
  ## message found in the ancillary buffer.
  ##
  ## It fails with an `OSError` for any error other than `EINTR`,
  ## `EWOULDBLOCK` or `EAGAIN`; on those three the read callback stays
  ## registered and is tried again.
  var retFuture = newFuture[tuple[data: string,
                                  cmsgs: seq[ControlMessage]]]("asyncRecvMsg")

  proc cb(sock: AsyncFD): bool =
    result = true
    var dataBuffer = newString(size)
    var iovec = IOVec(iov_base: dataBuffer.cstring,
                      iov_len: dataBuffer.len.csize_t)
    var cmsgBuffer = newString(cmsgSize)
    var msg = Tmsghdr(msg_iov: addr iovec,
                      msg_iovlen: 1)
    # Attach the control buffer only when it is non-empty: indexing element 0
    # of an empty string would raise instead of simply receiving no
    # ancillary data.
    if cmsgBuffer.len > 0:
      zeroMem(addr cmsgBuffer[0], cmsgBuffer.len)
      msg.msg_control = addr cmsgBuffer[0]
      msg.msg_controllen = cmsgBuffer.len.csize_t

    let res = recvmsg(sock.SocketHandle, addr msg, 0)
    if res < 0:
      let lastError = osLastError()
      if lastError.int32 != EINTR and
         lastError.int32 != EWOULDBLOCK and
         lastError.int32 != EAGAIN:
        retFuture.fail(newOSError(lastError))
      else:
        # Transient condition: keep the callback registered and retry.
        result = false
      return

    # `cmsg_len` counts the (aligned) header plus the payload, and
    # `CMSG_LEN(0)` is exactly that header portion.
    let headerLen = CMSG_LEN(0.csize_t)
    var cmsgs = newSeq[ControlMessage]()
    var cmsgHeader = CMSG_FIRSTHDR(addr msg)
    while cmsgHeader != nil:
      # A length shorter than the header would underflow the unsigned
      # subtraction below; treat it as malformed and stop parsing.
      if cmsgHeader.cmsg_len < headerLen:
        break
      let dataLen = int(cmsgHeader.cmsg_len - headerLen)
      var cmsg = ControlMessage(level: cmsgHeader.cmsg_level,
                                msgType: cmsgHeader.cmsg_type,
                                data: newString(dataLen))
      # Copy only the payload bytes: copying `cmsg_len` bytes would write
      # past the end of `cmsg.data` by the size of the header.
      if dataLen > 0:
        copyMem(cmsg.data.cstring, CMSG_DATA(cmsgHeader), dataLen)
      cmsgs.add(cmsg)
      cmsgHeader = CMSG_NXTHDR(addr msg, cmsgHeader)

    dataBuffer.setLen(res)
    retFuture.complete((dataBuffer, cmsgs))

  addRead(fd, cb)
  return retFuture