punchd/asyncutils.nim

117 lines
3.6 KiB
Nim

import asyncdispatch, threadpool, osproc
from os import
osLastError,
newOsError
from sequtils import
map,
foldl
from posix import
EINTR,
EWOULDBLOCK,
EAGAIN,
IOVec,
Tmsghdr,
Tcmsghdr,
SocketHandle,
SockLen,
CMSG_SPACE,
CMSG_FIRSTHDR,
CMSG_NXTHDR,
CMSG_LEN,
CMSG_DATA,
SOL_SOCKET,
SCM_RIGHTS,
sendmsg,
recvmsg
proc asyncExecCmd*(command: string): Future[string] =
let successEvent = newAsyncEvent()
let failureEvent = newAsyncEvent()
let future = newFuture[result.T]("asyncExecCmd")
proc execCmdBackground(successEvent: AsyncEvent, failureEvent: AsyncEvent,
command: string): string =
var exitCode: int
(result, exitCode) = execCmdEx(command)
if exitCode != 0:
failureEvent.trigger()
else:
successEvent.trigger()
let flowVar = spawn execCmdBackground(successEvent, failureEvent, command)
proc successCallback(fd: AsyncFD): bool =
future.complete(^flowVar)
successEvent.unregister()
successEvent.close()
failureEvent.unregister()
failureEvent.close()
true
proc failureCallback(fd: AsyncFD): bool =
future.fail(newException(OSError, ^flowVar))
successEvent.unregister()
successEvent.close()
failureEvent.unregister()
failureEvent.close()
true
addEvent(successEvent, successCallback)
addEvent(failureEvent, failureCallback)
return future
type ControlMessage* = object
level*: int
msgType*: int
data*: string
proc fromFd*(fd: AsyncFD): ControlMessage =
result = ControlMessage(level: SOL_SOCKET,
msgType: SCM_RIGHTS,
data: newString(sizeof(AsyncFD)))
cast[ptr AsyncFD](result.data.cstring)[] = fd
proc asyncSendMsg*(fd: AsyncFD, data: string,
cmsgs: seq[ControlMessage] = @[]): Future[void] =
var retFuture = newFuture[void]("asyncSendMsg")
proc cb(sock: AsyncFD): bool =
result = true
# sendmsg needs an array of iovec structs as described in the writev(2) man
# page. The message is passed as a msghdr struct which may contain ancillary
# data, see sendmsg(2) man page.
# TODO: remove the when clause once issue https://github.com/nim-lang/Nim/issues/15197 is solved.
when (defined(linux) and not defined(android)) and defined(amd64):
var iovec = IOVec(iov_base: data.cstring,
iov_len: data.len.csize_t)
else:
var iovec = IOVec(iov_base: data.cstring,
iov_len: data.len)
var msg = Tmsghdr(msg_iov: addr iovec,
msg_iovlen: 1)
var cmsgBuf: string
if cmsgs.len > 0:
proc space(c: ControlMessage): csize_t = CMSG_SPACE(c.data.len.csize_t)
let cmsgBufLen = cmsgs.map(space).foldl(a + b, 0.csize_t)
cmsgBuf = newString(cmsgBufLen)
msg.msg_control = cmsgBuf.cstring
msg.msg_controllen = cmsgBufLen.SockLen
var cmsgHeader = CMSG_FIRSTHDR(addr msg)
for cmsg in cmsgs:
cmsgHeader.cmsg_len = CMSG_LEN(cmsg.data.len.csize_t).SockLen
cmsgHeader.cmsg_level = cmsg.level.int32
cmsgHeader.cmsg_type = cmsg.msgType.int32
copyMem(CMSG_DATA(cmsgHeader), cmsg.data.cstring, cmsg.data.len)
cmsgHeader = CMSG_NXTHDR(addr msg, cmsgHeader)
let res = sendmsg(sock.SocketHandle, addr msg, 0)
if res < 0:
let lastError = osLastError()
if lastError.int32 != EINTR and
lastError.int32 != EWOULDBLOCK and
lastError.int32 != EAGAIN:
retFuture.fail(newOSError(lastError))
else:
result = false
else:
retFuture.complete()
addWrite(fd, cb)
return retFuture