punchd/asyncutils.nim

105 lines
3.2 KiB
Nim
Raw Normal View History

import asyncdispatch, threadpool, osproc
2020-06-10 21:22:08 +02:00
from os import
osLastError,
newOsError
from sequtils import
map,
foldl
2020-06-10 21:22:08 +02:00
from posix import
EINTR,
EWOULDBLOCK,
EAGAIN,
IOVec,
Tmsghdr,
Tcmsghdr,
SocketHandle,
CMSG_SPACE,
CMSG_FIRSTHDR,
CMSG_NXTHDR,
2020-06-10 21:22:08 +02:00
CMSG_LEN,
CMSG_DATA,
2020-07-07 19:36:44 +02:00
SOL_SOCKET,
SCM_RIGHTS,
2020-06-11 16:18:56 +02:00
sendmsg,
recvmsg
2020-05-08 19:34:37 +02:00
proc asyncExecCmd*(command: string): Future[string] =
  ## Runs `command` through `execCmdEx` on a thread pool thread and returns a
  ## future for its captured output.
  ##
  ## The future completes with the output when the exit code is 0. Otherwise
  ## it fails with an `OSError` whose message is the captured output.
  let successEvent = newAsyncEvent()
  let failureEvent = newAsyncEvent()
  let future = newFuture[string]("asyncExecCmd")

  proc execCmdBackground(successEvent: AsyncEvent, failureEvent: AsyncEvent,
                         command: string): string =
    # Runs on a worker thread. It only signals the outcome: unregistering and
    # closing the events is left to the callbacks below, which run on the
    # thread whose dispatcher the events are registered with.
    var exitCode: int
    (result, exitCode) = execCmdEx(command)
    if exitCode != 0:
      failureEvent.trigger()
    else:
      successEvent.trigger()

  let flowVar = spawn execCmdBackground(successEvent, failureEvent, command)

  proc release(fired, unused: AsyncEvent) =
    # The event that was not triggered will never fire, so its registration
    # has to be removed explicitly before the descriptor is closed.
    unused.unregister()
    unused.close()
    # `fired` is the event whose callback is currently running. Closing it is
    # deferred to the pending-callback queue so that the dispatcher has
    # finished handling it first.
    callSoon(proc () = fired.close())

  proc successCallback(fd: AsyncFD): bool =
    # Reading the flow var blocks until the worker proc has returned, so the
    # worker no longer touches either event once cleanup starts.
    let output = ^flowVar
    release(successEvent, failureEvent)
    future.complete(output)
    true

  proc failureCallback(fd: AsyncFD): bool =
    let output = ^flowVar
    release(failureEvent, successEvent)
    future.fail(newException(OSError, output))
    true

  addEvent(successEvent, successCallback)
  addEvent(failureEvent, failureCallback)
  return future
2020-06-10 21:22:08 +02:00
type
  ControlMessage* = object
    ## One ancillary ("control") message that can be attached to a message
    ## sent with `asyncSendMsg`.
    level*: int    ## value written to the `cmsg_level` header field
    msgType*: int  ## value written to the `cmsg_type` header field
    data*: string  ## raw payload bytes copied behind the header
2020-07-07 19:36:44 +02:00
proc fromFd*(fd: AsyncFD): ControlMessage =
  ## Builds an `SOL_SOCKET` / `SCM_RIGHTS` control message whose payload is
  ## the single file descriptor `fd`.
  ##
  ## The payload of an `SCM_RIGHTS` message is an array of C `int`s, so the
  ## descriptor is stored as exactly one `cint`. Storing it with the size of
  ## `AsyncFD` (a distinct `int`) would make the payload twice as long on
  ## 64-bit platforms, and the extra zero bytes would be read as a second
  ## descriptor with the value 0.
  result = ControlMessage(level: SOL_SOCKET,
                          msgType: SCM_RIGHTS,
                          data: newString(sizeof(cint)))
  var rawFd = fd.cint
  copyMem(addr result.data[0], addr rawFd, sizeof(rawFd))
2020-06-14 22:40:33 +02:00
proc asyncSendMsg*(fd: AsyncFD, data: string,
                   cmsgs: seq[ControlMessage] = @[]): Future[void] =
  ## Sends `data` over the socket `fd` with `sendmsg`, attaching `cmsgs` as
  ## ancillary data, as soon as the dispatcher reports `fd` as writable.
  ##
  ## The returned future completes once all of `data` has been accepted by
  ## `sendmsg`. It fails with an `OSError` when `sendmsg` reports an error
  ## other than `EINTR`, `EWOULDBLOCK` or `EAGAIN`; after one of those three
  ## the send is retried the next time `fd` is writable.
  var retFuture = newFuture[void]("asyncSendMsg")
  # Number of bytes of `data` that earlier sendmsg calls already accepted.
  var sent = 0
  # True until a sendmsg call that carried the ancillary data has succeeded.
  var cmsgsPending = cmsgs.len > 0

  proc cb(sock: AsyncFD): bool =
    result = true
    # sendmsg needs an array of iovec structs as described in the writev(2)
    # man page. The message is passed as a msghdr struct which may contain
    # ancillary data, see sendmsg(2) man page.
    var chunk: pointer = data.cstring
    if sent > 0:
      # Resume behind the bytes that were already sent. `sent` is only
      # non-zero here while it is still smaller than `data.len`.
      chunk = unsafeAddr data[sent]
    var iovec = IOVec(iov_base: chunk,
                      iov_len: (data.len - sent).csize_t)
    var msg = Tmsghdr(msg_iov: addr iovec,
                      msg_iovlen: 1)

    var cmsgBuf: string
    if cmsgsPending:
      # Lay out one header plus payload per control message, see cmsg(3).
      var cmsgBufLen = 0.csize_t
      for cmsg in cmsgs:
        cmsgBufLen += CMSG_SPACE(cmsg.data.len.csize_t)
      cmsgBuf = newString(cmsgBufLen.int)
      msg.msg_control = cmsgBuf.cstring
      msg.msg_controllen = cmsgBufLen
      var cmsgHeader = CMSG_FIRSTHDR(addr msg)
      for cmsg in cmsgs:
        cmsgHeader.cmsg_len = CMSG_LEN(cmsg.data.len.csize_t)
        cmsgHeader.cmsg_level = cmsg.level.int32
        cmsgHeader.cmsg_type = cmsg.msgType.int32
        copyMem(CMSG_DATA(cmsgHeader), cmsg.data.cstring, cmsg.data.len)
        cmsgHeader = CMSG_NXTHDR(addr msg, cmsgHeader)

    let res = sendmsg(sock.SocketHandle, addr msg, 0)
    if res < 0:
      let lastError = osLastError()
      if lastError.int32 != EINTR and
         lastError.int32 != EWOULDBLOCK and
         lastError.int32 != EAGAIN:
        retFuture.fail(newOSError(lastError))
      else:
        result = false # Nothing was sent; try again when writable.
    else:
      # The ancillary data went out with this call; a follow-up call for the
      # rest of `data` must not attach it a second time.
      cmsgsPending = false
      inc(sent, res.int)
      if sent < data.len:
        # sendmsg accepted only part of the buffer. Stay registered and send
        # the remainder on the next writable notification.
        result = false
      else:
        retFuture.complete()

  addWrite(fd, cb)
  return retFuture