148 lines
4.9 KiB
Nim
148 lines
4.9 KiB
Nim
import asyncdispatch, threadpool, osproc, options
|
|
from os import
|
|
osLastError,
|
|
newOsError
|
|
from posix import
|
|
SOL_SOCKET,
|
|
SCM_RIGHTS,
|
|
EINTR,
|
|
EWOULDBLOCK,
|
|
EAGAIN,
|
|
IOVec,
|
|
Tmsghdr,
|
|
Tcmsghdr,
|
|
SocketHandle,
|
|
CMSG_SPACE,
|
|
CMSG_FIRSTHDR,
|
|
CMSG_LEN,
|
|
CMSG_DATA,
|
|
sendmsg,
|
|
recvmsg
|
|
|
|
## asyncReadline as discussed at https://github.com/nim-lang/Nim/issues/11564
|
|
proc asyncReadline*(): Future[string] =
|
|
let event = newAsyncEvent()
|
|
let future = newFuture[string]("asyncReadline")
|
|
proc readlineBackground(event: AsyncEvent): string =
|
|
result = stdin.readline()
|
|
event.trigger()
|
|
let flowVar = spawn readlineBackground(event)
|
|
proc callback(fd: AsyncFD): bool =
|
|
future.complete(^flowVar)
|
|
true
|
|
addEvent(event, callback)
|
|
return future
|
|
|
|
proc asyncExecCmd*(command: string): Future[int] =
|
|
let event = newAsyncEvent()
|
|
let future = newFuture[int]("asyncExecCmd")
|
|
proc execCmdBackground(event: AsyncEvent, command: string): int =
|
|
result = execCmd(command)
|
|
event.trigger()
|
|
let flowVar = spawn execCmdBackground(event, command)
|
|
proc callback(fd: AsyncFD): bool =
|
|
future.complete(^flowVar)
|
|
true
|
|
addEvent(event, callback)
|
|
return future
|
|
|
|
proc asyncSendMsg*(fd: AsyncFD,
|
|
data: string,
|
|
ancillaryData: Option[AsyncFD] = none(AsyncFD)): Future[void] =
|
|
var retFuture = newFuture[void]("asyncSendMsg")
|
|
|
|
proc cb(sock: AsyncFD): bool =
|
|
# FIXME: if file descriptor is given in ancillaryData, check if sock is a
|
|
# unix socket
|
|
result = true
|
|
|
|
# sendmsg needs an array of iovec structs as described in the writev(2) man
|
|
# page. The message is passed as a msghdr struct which may contain ancillary
|
|
# data, see sendmsg(2) man page. We use ancillary data exclusively for
|
|
# passing a file descriptor if one is given in ancillaryData.
|
|
var iovec = IOVec(iov_base: data.cstring,
|
|
iov_len: data.len.csize_t)
|
|
var msg = Tmsghdr(msg_iov: addr iovec,
|
|
msg_iovlen: 1)
|
|
if ancillaryData.isSome:
|
|
# assemble ancillary data, see cmsg(3) man page
|
|
let cmsgBufferLen = CMSG_SPACE(sizeof(AsyncFD).csize_t)
|
|
let cmsgBuffer = newString(cmsgBufferLen)
|
|
msg.msg_control = cmsgBuffer.cstring
|
|
msg.msg_controllen = cmsgBufferLen
|
|
let cmsg: ptr Tcmsghdr = CMSG_FIRSTHDR(addr msg)
|
|
cmsg.cmsg_len = CMSG_LEN(sizeof(AsyncFD).csize_t)
|
|
cmsg.cmsg_level = SOL_SOCKET
|
|
cmsg.cmsg_type = SCM_RIGHTS
|
|
var ancillaryFd = ancillaryData.get
|
|
copyMem(CMSG_DATA(cmsg), addr ancillaryFd, sizeof(AsyncFD))
|
|
|
|
let res = sendmsg(sock.SocketHandle, addr msg, 0)
|
|
if res < 0:
|
|
let lastError = osLastError()
|
|
if lastError.int32 != EINTR and
|
|
lastError.int32 != EWOULDBLOCK and
|
|
lastError.int32 != EAGAIN:
|
|
retFuture.fail(newOSError(lastError))
|
|
else:
|
|
result = false
|
|
else:
|
|
retFuture.complete()
|
|
|
|
addWrite(fd, cb)
|
|
return retFuture
|
|
|
|
proc asyncRecvMsg*(fd: AsyncFD,
|
|
size: int): Future[tuple[data: string, ancillaryFd: Option[AsyncFD]]] =
|
|
var retFuture = newFuture[
|
|
tuple[data: string, ancillaryFd: Option[AsyncFD]]
|
|
]("asyncRecvMsg")
|
|
|
|
proc cb(sock: AsyncFD): bool =
|
|
result = true
|
|
|
|
var dataBuffer = newString(size)
|
|
var iovec = IOVec(iov_base: dataBuffer.cstring,
|
|
iov_len: dataBuffer.len.csize_t)
|
|
let cmsgBufferLen = CMSG_SPACE(sizeof(AsyncFD).csize_t)
|
|
let cmsgBuffer = newString(cmsgBufferLen)
|
|
var msg = Tmsghdr(msg_iov: addr iovec,
|
|
msg_iovlen: 1,
|
|
msg_control: cmsgBuffer.cstring,
|
|
msg_controllen: cmsgBufferLen)
|
|
let cmsg: ptr Tcmsghdr = CMSG_FIRSTHDR(addr msg)
|
|
cmsg.cmsg_len = 0
|
|
cmsg.cmsg_level = 0
|
|
cmsg.cmsg_type = 0
|
|
|
|
let res = recvmsg(sock.SocketHandle, addr msg, 0)
|
|
if res < 0:
|
|
let lastError = osLastError()
|
|
if lastError.int32 != EINTR and
|
|
lastError.int32 != EWOULDBLOCK and
|
|
lastError.int32 != EAGAIN:
|
|
retFuture.fail(newOSError(lastError))
|
|
else:
|
|
result = false
|
|
return
|
|
|
|
var ancillaryData = none(AsyncFD)
|
|
if cmsg.cmsg_len > 0:
|
|
if cmsg.cmsg_len == CMSG_LEN(sizeof(AsyncFD).csize_t) and
|
|
cmsg.cmsg_level == SOL_SOCKET and
|
|
cmsg.cmsg_type == SCM_RIGHTS:
|
|
var ancillaryFd: AsyncFD
|
|
copyMem(addr ancillaryFd, CMSG_DATA(cmsg), sizeof(AsyncFD))
|
|
ancillaryData = some(ancillaryFd)
|
|
else:
|
|
retFuture.fail(newException(ValueError, "unexpected ancillary data"))
|
|
return
|
|
|
|
# FIXME: This will not work on FreeBSD as recvmsg(2) says:
|
|
# recvmmsg() returns the number of messages received
|
|
dataBuffer.setLen(res)
|
|
retFuture.complete((dataBuffer, ancillaryData))
|
|
|
|
addRead(fd, cb)
|
|
return retFuture
|