punchd/asyncutils.nim

105 lines
3.2 KiB
Nim

import asyncdispatch, threadpool, osproc
from os import
osLastError,
newOsError
from sequtils import
map,
foldl
from posix import
EINTR,
EWOULDBLOCK,
EAGAIN,
IOVec,
Tmsghdr,
Tcmsghdr,
SocketHandle,
CMSG_SPACE,
CMSG_FIRSTHDR,
CMSG_NXTHDR,
CMSG_LEN,
CMSG_DATA,
SOL_SOCKET,
SCM_RIGHTS,
sendmsg,
recvmsg
proc asyncExecCmd*(command: string): Future[string] =
  ## Runs `command` through `execCmdEx` on a threadpool thread and returns a
  ## future for the captured output.
  ##
  ## The future completes with the output when the exit code is zero;
  ## otherwise it fails with an `OSError` whose message is the output.
  ##
  ## Two events are used to tell the dispatcher which outcome occurred. The
  ## worker thread only ever *triggers* one of them; unregistering and closing
  ## is done exclusively on the dispatcher thread so that an event is never
  ## freed while it is (or is about to be) registered with the selector.
  let successEvent = newAsyncEvent()
  let failureEvent = newAsyncEvent()
  let future = newFuture[string]("asyncExecCmd")

  proc execCmdBackground(successEvent: AsyncEvent, failureEvent: AsyncEvent,
                         command: string): string =
    # Runs on the worker thread. Takes the events as parameters (instead of
    # capturing them) so that it is not a closure and can be spawned.
    var exitCode: int
    (result, exitCode) = execCmdEx(command)
    if exitCode != 0:
      failureEvent.trigger()
    else:
      successEvent.trigger()

  let flowVar = spawn execCmdBackground(successEvent, failureEvent, command)

  proc releaseEvents(fired, idle: AsyncEvent) =
    # The event that never fired is still registered: remove it from the
    # dispatcher before closing it.
    unregister(idle)
    close(idle)
    # The event that fired is unregistered by the dispatcher once its callback
    # returns true, so closing it has to wait until after that point.
    callSoon(proc () = close(fired))

  proc successCallback(fd: AsyncFD): bool =
    # `^flowVar` blocks until the worker proc has returned its output.
    future.complete(^flowVar)
    releaseEvents(successEvent, failureEvent)
    true

  proc failureCallback(fd: AsyncFD): bool =
    future.fail(newException(OSError, ^flowVar))
    releaseEvents(failureEvent, successEvent)
    true

  addEvent(successEvent, successCallback)
  addEvent(failureEvent, failureCallback)
  return future
type ControlMessage* = object
  ## One ancillary (control) message to be attached to a `sendmsg` call.
  level*: int    ## Copied into the control header's `cmsg_level`.
  msgType*: int  ## Copied into the control header's `cmsg_type`.
  data*: string  ## Raw payload bytes placed after the control header.

proc fromFd*(fd: AsyncFD): ControlMessage =
  ## Builds an `SCM_RIGHTS` control message that carries the descriptor `fd`.
  ##
  ## The payload of an `SCM_RIGHTS` message is an array of C `int`s, so the
  ## descriptor is encoded as exactly one `cint`. Encoding it with the size of
  ## `AsyncFD` (a pointer-sized integer) would make the payload look like two
  ## descriptors on 64-bit targets, the second one being 0.
  var rawFd = fd.int.cint
  result = ControlMessage(level: SOL_SOCKET,
                          msgType: SCM_RIGHTS,
                          data: newString(sizeof(rawFd)))
  copyMem(addr result.data[0], addr rawFd, sizeof(rawFd))
proc asyncSendMsg*(fd: AsyncFD, data: string,
                   cmsgs: seq[ControlMessage] = @[]): Future[void] =
  ## Sends `data` on `fd` with a single `sendmsg` call once the descriptor is
  ## writable, attaching every entry of `cmsgs` as ancillary data.
  ##
  ## The returned future completes when `sendmsg` reports success and fails
  ## with an `OSError` for any error other than `EINTR`, `EWOULDBLOCK` or
  ## `EAGAIN`; for those three the send is retried on the next writable
  ## notification.
  ##
  ## NOTE(review): any non-negative `sendmsg` result is treated as a complete
  ## send, so a short write is not continued -- confirm callers rely on
  ## message-oriented sockets only.
  let sendFuture = newFuture[void]("asyncSendMsg")

  proc trySend(sock: AsyncFD): bool =
    # The payload is described by one iovec entry (see writev(2)); the entry
    # and the optional ancillary buffer are tied together by a msghdr struct
    # (see sendmsg(2)).
    var payload = IOVec(iov_base: data.cstring,
                        iov_len: data.len.csize_t)
    var header = Tmsghdr(msg_iov: addr payload,
                         msg_iovlen: 1)

    # Declared outside the `if` so the buffer stays alive until after the
    # sendmsg call below.
    var ancillary: string
    if cmsgs.len > 0:
      # Total room needed: one aligned slot per control message.
      var ancillaryLen = 0.csize_t
      for cmsg in cmsgs:
        ancillaryLen += CMSG_SPACE(cmsg.data.len.csize_t)
      ancillary = newString(ancillaryLen)
      header.msg_control = ancillary.cstring
      header.msg_controllen = ancillaryLen

      # Fill the slots one after another, advancing with CMSG_NXTHDR.
      var slot = CMSG_FIRSTHDR(addr header)
      for cmsg in cmsgs:
        slot.cmsg_len = CMSG_LEN(cmsg.data.len.csize_t)
        slot.cmsg_level = cmsg.level.int32
        slot.cmsg_type = cmsg.msgType.int32
        copyMem(CMSG_DATA(slot), cmsg.data.cstring, cmsg.data.len)
        slot = CMSG_NXTHDR(addr header, slot)

    if sendmsg(sock.SocketHandle, addr header, 0) >= 0:
      sendFuture.complete()
      return true

    let errCode = osLastError()
    if errCode.int32 == EINTR or
       errCode.int32 == EWOULDBLOCK or
       errCode.int32 == EAGAIN:
      # Transient condition: keep the callback registered and try again.
      return false

    sendFuture.fail(newOSError(errCode))
    return true

  addWrite(fd, trySend)
  return sendFuture