punchd/asyncutils.nim

91 lines
3.0 KiB
Nim
Raw Normal View History

import asyncdispatch, threadpool, osproc
2020-06-10 21:22:08 +02:00
from os import
osLastError,
newOsError
from posix import
SOL_SOCKET,
SCM_RIGHTS,
EINTR,
EWOULDBLOCK,
EAGAIN,
IOVec,
Tmsghdr,
Tcmsghdr,
SocketHandle,
CMSG_SPACE,
CMSG_FIRSTHDR,
CMSG_LEN,
CMSG_DATA,
sendmsg
2020-05-08 19:34:37 +02:00
## asyncReadline as discussed at https://github.com/nim-lang/Nim/issues/11564
2020-05-08 19:34:37 +02:00
proc asyncReadline*(): Future[string] =
let event = newAsyncEvent()
let future = newFuture[string]("asyncReadline")
proc readlineBackground(event: AsyncEvent): string =
result = stdin.readline()
event.trigger()
let flowVar = spawn readlineBackground(event)
proc callback(fd: AsyncFD): bool =
future.complete(^flowVar)
true
addEvent(event, callback)
return future
proc asyncExecCmd*(command: string): Future[int] =
let event = newAsyncEvent()
let future = newFuture[int]("asyncExecCmd")
proc execCmdBackground(event: AsyncEvent, command: string): int =
result = execCmd(command)
event.trigger()
let flowVar = spawn execCmdBackground(event, command)
proc callback(fd: AsyncFD): bool =
future.complete(^flowVar)
true
addEvent(event, callback)
return future
2020-06-10 21:22:08 +02:00
proc asyncSendMsg*(fd: AsyncFD,
data: string,
fds: openArray[AsyncFD] = []): Future[void] =
var retFuture = newFuture[void]("asyncSendMsg")
proc cb(sock: AsyncFD): bool =
# FIXME: if file descriptors are given in fds, check if sock is a unix socket
result = true
# sendmsg needs an array of iovec structs as described in the writev(2) man
# page. The message is passed as a msghdr struct which may contain ancillary
# data, see sendmsg(2) man page. We use ancillary data exclusively for
# passing file descriptors if any are given in fds.
var iovec = IOVec(iov_base: data.cstring,
iov_len: data.len.csize_t)
var msg = Tmsghdr(msg_iov: addr iovec,
msg_iovlen: 1)
if fds.len > 0:
# assemble ancillary data, see cmsg(3) man page
let cmsgBufferLen = CMSG_SPACE(sizeof(AsyncFD).csize_t * fds.len.csize_t)
let cmsgBuffer = newString(cmsgBufferLen)
msg.msg_control = cmsgBuffer.cstring
msg.msg_controllen = cmsgBufferLen
let cmsg: ptr Tcmsghdr = CMSG_FIRSTHDR(addr msg)
cmsg.cmsg_len = CMSG_LEN(sizeof(AsyncFD).csize_t * fds.len.csize_t)
cmsg.cmsg_level = SOL_SOCKET
cmsg.cmsg_type = SCM_RIGHTS
copyMem(CMSG_DATA(cmsg), unsafeAddr fds[0], sizeof(AsyncFD) * fds.len)
let res = sendmsg(sock.SocketHandle, addr msg, 0)
if res < 0:
let lastError = osLastError()
if lastError.int32 != EINTR and
lastError.int32 != EWOULDBLOCK and
lastError.int32 != EAGAIN:
retFuture.fail(newOSError(lastError))
else:
result = false
else:
retFuture.complete()
addWrite(fd, cb)
return retFuture