import asyncdispatch, threadpool, osproc, options
from os import osLastError, newOsError
from posix import SOL_SOCKET, SCM_RIGHTS, EINTR, EWOULDBLOCK, EAGAIN, IOVec,
  Tmsghdr, Tcmsghdr, SocketHandle, CMSG_SPACE, CMSG_FIRSTHDR, CMSG_LEN,
  CMSG_DATA, sendmsg, recvmsg

proc isRetryable(code: int32): bool =
  ## Returns true when `code` is one of the errno values after which the
  ## socket callbacks below ask the dispatcher to call them again.
  code == EINTR or code == EWOULDBLOCK or code == EAGAIN

proc asyncReadline*(): Future[string] =
  ## Reads one line from stdin on a thread pool thread and returns a future
  ## that is completed with that line.
  ##
  ## Approach as discussed at https://github.com/nim-lang/Nim/issues/11564
  let event = newAsyncEvent()
  let future = newFuture[string]("asyncReadline")

  proc readlineBackground(event: AsyncEvent): string =
    result = stdin.readLine()
    # wake up the dispatcher, which then runs `callback` below
    event.trigger()

  let flowVar = spawn readlineBackground(event)

  proc callback(fd: AsyncFD): bool =
    # Fetch the result before touching the event: `^` blocks until the
    # worker's value is available, so the worker no longer uses `event`.
    let line = ^flowVar
    # The event is single-use; release it so its descriptor is not leaked.
    event.unregister()
    event.close()
    future.complete(line)
    true

  addEvent(event, callback)
  return future

proc asyncExecCmd*(command: string): Future[int] =
  ## Runs `command` with `execCmd` on a thread pool thread and returns a
  ## future that is completed with the value `execCmd` returned.
  let event = newAsyncEvent()
  let future = newFuture[int]("asyncExecCmd")

  proc execCmdBackground(event: AsyncEvent, command: string): int =
    result = execCmd(command)
    # wake up the dispatcher, which then runs `callback` below
    event.trigger()

  let flowVar = spawn execCmdBackground(event, command)

  proc callback(fd: AsyncFD): bool =
    # Fetch the result before touching the event: `^` blocks until the
    # worker's value is available, so the worker no longer uses `event`.
    let exitCode = ^flowVar
    # The event is single-use; release it so its descriptor is not leaked.
    event.unregister()
    event.close()
    future.complete(exitCode)
    true

  addEvent(event, callback)
  return future

proc asyncSendMsg*(fd: AsyncFD, data: string,
                   ancillaryData: Option[AsyncFD] = none(AsyncFD)
                  ): Future[void] =
  ## Sends `data` over `fd` with `sendmsg` once `fd` is writable. If
  ## `ancillaryData` holds a file descriptor, it is attached to the message
  ## as an `SCM_RIGHTS` control message.
  ##
  ## The returned future completes when `sendmsg` succeeds and fails with an
  ## `OSError` for any error other than `EINTR`, `EWOULDBLOCK` or `EAGAIN`;
  ## after one of those three the send is retried.
  let retFuture = newFuture[void]("asyncSendMsg")

  proc cb(sock: AsyncFD): bool =
    # FIXME: if file descriptor is given in ancillaryData, check if sock is a
    # unix socket
    result = true

    # sendmsg needs an array of iovec structs as described in the writev(2)
    # man page. The message is passed as a msghdr struct which may contain
    # ancillary data, see sendmsg(2) man page. We use ancillary data
    # exclusively for passing a file descriptor if one is given in
    # ancillaryData.
    var iovec = IOVec(iov_base: data.cstring, iov_len: data.len.csize_t)
    var msg = Tmsghdr(msg_iov: addr iovec, msg_iovlen: 1)

    # `msg.msg_control` is a raw pointer into this buffer, so the buffer is
    # declared in the callback scope to keep it alive until sendmsg returns.
    var cmsgBuffer: string
    if ancillaryData.isSome:
      # assemble ancillary data, see cmsg(3) man page
      # SCM_RIGHTS carries C ints, so the payload is a cint, not an AsyncFD
      # (AsyncFD is a distinct int and may be wider than a C int).
      let cmsgBufferLen = CMSG_SPACE(sizeof(cint).csize_t)
      cmsgBuffer = newString(int(cmsgBufferLen))
      msg.msg_control = cmsgBuffer.cstring
      msg.msg_controllen = cmsgBufferLen
      let cmsg: ptr Tcmsghdr = CMSG_FIRSTHDR(addr msg)
      cmsg.cmsg_len = CMSG_LEN(sizeof(cint).csize_t)
      cmsg.cmsg_level = SOL_SOCKET
      cmsg.cmsg_type = SCM_RIGHTS
      var rawFd = cint(ancillaryData.get)
      copyMem(CMSG_DATA(cmsg), addr rawFd, sizeof(cint))

    let res = sendmsg(sock.SocketHandle, addr msg, 0)
    if res < 0:
      let lastError = osLastError()
      if isRetryable(lastError.int32):
        # not done yet: keep the callback registered and try again later
        result = false
      else:
        retFuture.fail(newOSError(lastError))
    else:
      retFuture.complete()

  addWrite(fd, cb)
  return retFuture

proc asyncRecvMsg*(fd: AsyncFD, size: int): Future[tuple[data: string, ancillaryData: Option[AsyncFD]]] =
  ## Receives one message of at most `size` bytes from `fd` with `recvmsg`
  ## once `fd` is readable.
  ##
  ## The future completes with the received bytes and, if the message carried
  ## a single `SCM_RIGHTS` file descriptor, that descriptor. It fails with a
  ## `ValueError` for any other ancillary data and with an `OSError` for any
  ## receive error other than `EINTR`, `EWOULDBLOCK` or `EAGAIN`; after one
  ## of those three the receive is retried.
  let retFuture = newFuture[tuple[data: string, ancillaryData: Option[AsyncFD]]]("asyncRecvMsg")

  proc cb(sock: AsyncFD): bool =
    result = true

    var dataBuffer = newString(size)
    var iovec = IOVec(iov_base: dataBuffer.cstring,
                      iov_len: dataBuffer.len.csize_t)

    # Room for exactly one control message holding one C int, which is the
    # unit SCM_RIGHTS uses for file descriptors.
    let cmsgBufferLen = CMSG_SPACE(sizeof(cint).csize_t)
    var cmsgBuffer = newString(int(cmsgBufferLen))
    var msg = Tmsghdr(msg_iov: addr iovec, msg_iovlen: 1,
                      msg_control: cmsgBuffer.cstring,
                      msg_controllen: cmsgBufferLen)

    # Clear the header so a non-zero cmsg_len after recvmsg means that
    # ancillary data was actually received.
    let cmsg: ptr Tcmsghdr = CMSG_FIRSTHDR(addr msg)
    cmsg.cmsg_len = 0
    cmsg.cmsg_level = 0
    cmsg.cmsg_type = 0

    let res = recvmsg(sock.SocketHandle, addr msg, 0)
    if res < 0:
      let lastError = osLastError()
      if isRetryable(lastError.int32):
        # not done yet: keep the callback registered and try again later
        result = false
      else:
        retFuture.fail(newOSError(lastError))
      return

    var ancillaryData = none(AsyncFD)
    if cmsg.cmsg_len > 0:
      if cmsg.cmsg_len == CMSG_LEN(sizeof(cint).csize_t) and
          cmsg.cmsg_level == SOL_SOCKET and cmsg.cmsg_type == SCM_RIGHTS:
        var rawFd: cint
        copyMem(addr rawFd, CMSG_DATA(cmsg), sizeof(cint))
        ancillaryData = some(AsyncFD(int(rawFd)))
      else:
        retFuture.fail(newException(ValueError, "unexpected ancillary data"))
        return

    # shrink the buffer to the number of bytes recvmsg reported
    dataBuffer.setLen(res)
    retFuture.complete((dataBuffer, ancillaryData))

  addRead(fd, cb)
  return retFuture