import asyncdispatch, threadpool, osproc from net import BufferSize from os import osLastError, newOsError from sequtils import map, foldl from posix import EINTR, EWOULDBLOCK, EAGAIN, IOVec, Tmsghdr, Tcmsghdr, SocketHandle, CMSG_SPACE, CMSG_FIRSTHDR, CMSG_NXTHDR, CMSG_LEN, CMSG_DATA, sendmsg, recvmsg ## asyncReadline as discussed at https://github.com/nim-lang/Nim/issues/11564 proc asyncReadline*(): Future[string] = let event = newAsyncEvent() let future = newFuture[string]("asyncReadline") proc readlineBackground(event: AsyncEvent): string = result = stdin.readline() event.trigger() let flowVar = spawn readlineBackground(event) proc callback(fd: AsyncFD): bool = future.complete(^flowVar) true addEvent(event, callback) return future proc asyncExecCmd*(command: string): Future[int] = let event = newAsyncEvent() let future = newFuture[int]("asyncExecCmd") proc execCmdBackground(event: AsyncEvent, command: string): int = result = execCmd(command) event.trigger() let flowVar = spawn execCmdBackground(event, command) proc callback(fd: AsyncFD): bool = future.complete(^flowVar) true addEvent(event, callback) return future type ControlMessage* = object level*: int msgType*: int data*: string proc asyncSendMsg*(fd: AsyncFD, data: string, cmsgs: seq[ControlMessage] = @[]): Future[void] = var retFuture = newFuture[void]("asyncSendMsg") proc cb(sock: AsyncFD): bool = result = true # sendmsg needs an array of iovec structs as described in the writev(2) man # page. The message is passed as a msghdr struct which may contain ancillary # data, see sendmsg(2) man page. var iovec = IOVec(iov_base: data.cstring, iov_len: data.len.csize_t) var msg = Tmsghdr(msg_iov: addr iovec, msg_iovlen: 1) var cmsgBuf: string if cmsgs.len > 0: proc space(c: ControlMessage): csize_t = CMSG_SPACE(c.data.len.csize_t) let cmsgBufLen = cmsgs.map(space).foldl(a + b, 0.csize_t) cmsgBuf = newString(cmsgBufLen) msg.msg_control = cmsgBuf.cstring msg.msg_controllen = cmsgBufLen var cmsgHeader = CMSG_FIRSTHDR(addr msg) for cmsg in cmsgs: cmsgHeader.cmsg_len = CMSG_LEN(cmsg.data.len.csize_t) cmsgHeader.cmsg_level = cmsg.level.int32 cmsgHeader.cmsg_type = cmsg.msgType.int32 copyMem(CMSG_DATA(cmsgHeader), cmsg.data.cstring, cmsg.data.len) cmsgHeader = CMSG_NXTHDR(addr msg, cmsgHeader) let res = sendmsg(sock.SocketHandle, addr msg, 0) if res < 0: let lastError = osLastError() if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and lastError.int32 != EAGAIN: retFuture.fail(newOSError(lastError)) else: result = false else: retFuture.complete() addWrite(fd, cb) return retFuture proc asyncRecvMsg*(fd: AsyncFD, size: int = BufferSize, cmsgSize: int = BufferSize): Future[tuple[data: string, cmsgs: seq[ControlMessage]]] = var retFuture = newFuture[tuple[data: string, cmsgs: seq[ControlMessage]]]("asyncRecvMsg") proc cb(sock: AsyncFD): bool = result = true var dataBuffer = newString(size) var iovec = IOVec(iov_base: dataBuffer.cstring, iov_len: dataBuffer.len.csize_t) var cmsgBuffer = newString(cmsgSize) zeroMem(cmsgBuffer.cstring, cmsgBuffer.len) var msg = Tmsghdr(msg_iov: addr iovec, msg_iovlen: 1, msg_control: addr cmsgBuffer[0], msg_controllen: cmsgSize.csize_t) let res = recvmsg(sock.SocketHandle, addr msg, 0) if res < 0: let lastError = osLastError() if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and lastError.int32 != EAGAIN: retFuture.fail(newOSError(lastError)) else: result = false return var cmsgs = newSeq[ControlMessage]() var cmsgHeader = CMSG_FIRSTHDR(addr msg) while cmsgHeader != nil: let dataLen = cmsgHeader.cmsg_len - sizeof(Tcmsghdr).csize_t var cmsg = ControlMessage(level: cmsgHeader.cmsg_level, msgType: cmsgHeader.cmsg_type, data: newString(dataLen)) copyMem(cmsg.data.cstring, CMSG_DATA(cmsgHeader), cmsgHeader.cmsg_len) cmsgs.add(cmsg) cmsgHeader = CMSG_NXTHDR(addr msg, cmsgHeader) dataBuffer.setLen(res) retFuture.complete((dataBuffer, cmsgs)) addRead(fd, cb) return retFuture