diff --git a/examples/app/app.nim b/examples/app/app.nim new file mode 100644 index 0000000..794fb03 --- /dev/null +++ b/examples/app/app.nim @@ -0,0 +1,215 @@ +import asyncdispatch, asyncnet, os, strformat, strutils, tables +from nativeSockets import Domain, SockType, Protocol +from net import IpAddress, Port, isIpAddress, `$` +import asyncutils +import ../../message +import random + +type + PunchdResponse = Future[tuple[msgContent: string, sock: AsyncSocket]] + PunchdProgressCb = proc (future: PunchdResponse, msgContent: string) {.async.} + + OutgoingPunchdMessage = object + future: PunchdResponse + progressCb: PunchdProgressCb + + PunchdConnection = object + sock: AsyncSocket + outMessages: TableRef[string, OutgoingPunchdMessage] + inConnections: FutureStream[AsyncSocket] + + ServerConnection = object + sock: AsyncSocket + outMessages: TableRef[string, Future[string]] + peerNotifications: FutureStream[string] + + # Punchd messages + ProgressTcpSyniConnect* = object + command: string + args: string + + # Server messages + OkGetInfo* = object + ip: string + ports: array[3, uint16] + NotifyEndpoint* = object + command: string + ip: IpAddress + port: Port + NotifyPeer* = object + sender: string + recipient: string + command: string + srcIp: IpAddress + srcPorts: array[3, Port] + dstIp: IpAddress + dstPorts: array[3, Port] + seqNumbers: array[10, uint32] + + # Exceptions + PunchdError = object of ValueError # FIXME: not used yet + ServerError = object of ValueError + +proc usage() = + echo &"usage: {paramStr(0)} SERVER_HOSTNAME SERVER_PORT PEER_ID [OTHER_PEER_ID]" + +proc handleServerMessages(connection: ServerConnection) {.async.} = + while true: + let line = await connection.sock.recvLine(maxLength = 400) + let args = line.parseArgs(3, 1) + case args[0]: + of "ok": + let future = connection.outMessages[args[1]] + connection.outMessages.del(args[1]) + future.complete(args[2]) + of "error": + let future = connection.outMessages[args[1]] + connection.outMessages.del(args[1]) + future.fail(newException(ServerError, args[2])) + of "notify-peer": + asyncCheck connection.peerNotifications.write(&"{args[1]}|{args[2]}") + else: + raise newException(ValueError, "invalid server message") + +proc handlePunchdMessages(connection: PunchdConnection) {.async.} = + while true: + let fd = connection.sock.getFd.AsyncFD + let resp = await fd.asyncRecvMsg(size = 400, cmsgSize = sizeof(AsyncFD)) + let args = resp.data.parseArgs(3, 1) + case args[0]: + of "ok": + let outMsg = connection.outMessages[args[1]] + connection.outMessages.del(args[1]) + if resp.cmsgs.len != 1: + raise newException(ValueError, "invalid punchd message") + let sock = newAsyncSocket(resp.cmsgs[0].getFd) + register(sock.getFd.AsyncFD) + outMsg.future.complete((args[2], sock)) + of "error": + let outMsg = connection.outMessages[args[1]] + connection.outMessages.del(args[1]) + outMsg.future.fail(newException(ServerError, args[2])) + of "progress": + let outMsg = connection.outMessages[args[1]] + asyncCheck outMsg.progressCb(outMsg.future, args[2]) + else: + raise newException(ValueError, "invalid punchd message") + +proc sendRequest(connection: PunchdConnection, command: string, content: string, + progressCb: PunchdProgressCb = nil): PunchdResponse = + result = newFuture[PunchdResponse.T]("sendRequest") + let id = $rand(uint32) + asyncCheck connection.sock.send(&"{command}|{id}|{content}\n") + let outMsg = OutgoingPunchdMessage(future: result, progressCb: progressCb) + echo "id = ", id + connection.outMessages[id] = outMsg + +proc sendRequest(connection: ServerConnection, command: string, + content: string): Future[string] = + result = newFuture[string]("sendRequest") + let id = $rand(uint32) + asyncCheck connection.sock.send(&"{command}|{id}|{content}\n") + connection.outMessages[id] = result + +proc acceptConnection(punchdConn: PunchdConnection, command: string, + msgContent: string) {.async.} = + echo "accepting connection ", msgContent + let resp = await punchdConn.sendRequest(command, msgContent) + asyncCheck punchdConn.inConnections.write(resp.sock) + +proc handlePeerNotifications(serverConn: ServerConnection, + punchdConn: PunchdConnection, + peerId: string) {.async.} = + while true: + let (hasData, data) = await serverConn.peerNotifications.read + if not hasData: + break + try: + let msg = parseMessage[NotifyPeer](data) + # FIXME: check if we want to receive messages from the sender + echo "received message from ", msg.sender + let srcPorts = msg.srcPorts.join(",") + let dstPorts = msg.dstPorts.join(",") + let seqNumbers = msg.seqNumbers.join(",") + let req = &"{msg.srcIp}|{srcPorts}|{msg.dstIp}|{dstPorts}|{seqNumbers}" + asyncCheck acceptConnection(punchdConn, msg.command, req) + except ValueError as e: + echo e.msg + discard + +proc punchHole(punchdConn: PunchdConnection, serverConn: ServerConnection, + ipAddress: IpAddress, peerId: string, + otherPeerId: string): Future[AsyncSocket] {.async.} = + let sResp = await serverConn.sendRequest("get-info", otherPeerId) + let peerInfo = parseMessage[OkGetInfo](sResp) + proc progressCb(future: PunchdResponse, msgContent: string) {.async.} = + try: + let parsedResp = parseMessage[ProgressTcpSyniConnect](msgContent) + let req = &"{peerId}|{otherPeerId}|{parsedResp.command}|{parsedResp.args}" + discard await serverConn.sendRequest("notify-peer", req) + except ServerError as e: + future.fail(e) + except ValueError as e: + future.fail(e) + let myPorts = &"{1234},{1234},{1234}" + let peerPorts = peerInfo.ports.join(",") + let req = &"{ipAddress}|{myPorts}|{peerInfo.ip}|{peerPorts}" + let pResp = await punchdConn.sendRequest("tcp-syni-connect", req, progressCb) + result = pResp.sock + +proc runApp(serverHostname: string, serverPort: Port, peerId: string, + otherPeerId: string = "") {.async.} = + # TODO: determine endpoint in another proc + randomize() # initialize random number generator + var punchdConn = PunchdConnection() + punchdConn.sock = newAsyncSocket(AF_UNIX, SOCK_STREAM, IPPROTO_IP) + punchdConn.outMessages = newTable[string, OutgoingPunchdMessage]() + punchdConn.inConnections = newFutureStream[AsyncSocket]("runApp") + await punchdConn.sock.connectUnix("/tmp/punchd.socket") + var serverConn = ServerConnection() + serverConn.sock = await asyncnet.dial(serverHostname, serverPort, IPPROTO_TCP) + serverConn.outMessages = newTable[string, Future[string]]() + serverConn.peerNotifications = newFutureStream[string]("runApp") + let resp = await serverConn.sock.recvLine(maxLength = 400) + let endpoint = parseMessage[NotifyEndpoint](resp) + echo &"rendezvous server says I am {endpoint.ip}:{endpoint.port.int}" + asyncCheck handlePunchdMessages(punchdConn) + asyncCheck handleServerMessages(serverConn) + asyncCheck handlePeerNotifications(serverConn, punchdConn, peerId) + if otherPeerId.len == 0: + # register and wait for connections + let req = &"{peerId}|{endpoint.ip}|{endpoint.port.int},{endpoint.port.int},{endpoint.port.int}" + discard await serverConn.sendRequest("register", req) + echo "registered" + while true: + let (hasSock, sock) = await punchdConn.inConnections.read + if not hasSock: + break + echo "got connection" + let msg = await sock.recv(1000) + echo "received message: ", msg + await sock.send("pong") + + else: + # initiate a new connection + let sock = await punchHole(punchdConn, serverConn, endpoint.ip, peerId, + otherPeerId) + await sock.send("ping") + let msg = await sock.recv(1000) + echo "received message: ", msg + +proc main() = + if paramCount() < 3 or paramCount() > 4: + usage() + quit(1) + let portNumber = paramStr(2).parseUInt + if portNumber > uint16.high: + usage() + quit(1) + if paramCount() == 4: + waitFor runApp(paramStr(1), Port(portNumber), paramStr(3), paramStr(4)) + else: + waitFor runApp(paramStr(1), Port(portNumber), paramStr(3)) + +when isMainModule: + main() diff --git a/examples/app/asyncutils.nim b/examples/app/asyncutils.nim new file mode 100644 index 0000000..202243d --- /dev/null +++ b/examples/app/asyncutils.nim @@ -0,0 +1,80 @@ +import asyncdispatch +from net import + BufferSize +from os import + osLastError, + newOsError +from posix import + EINTR, + EWOULDBLOCK, + EAGAIN, + IOVec, + Tmsghdr, + Tcmsghdr, + SocketHandle, + CMSG_FIRSTHDR, + CMSG_NXTHDR, + CMSG_DATA, + SOL_SOCKET, + SCM_RIGHTS, + recvmsg + +type ControlMessage* = object + level*: int + msgType*: int + data*: string + +proc getFd*(cmsg: ControlMessage): AsyncFD = + echo "cmsg.data.len: ", cmsg.data.len + if cmsg.level != SOL_SOCKET or + cmsg.msgType != SCM_RIGHTS or + cmsg.data.len != sizeof(AsyncFD): + raise(newException(ValueError, "unexpected ancillary data")) + result = cast[ptr AsyncFD](cmsg.data.cstring)[] + +proc asyncRecvMsg*(fd: AsyncFD, size: int = BufferSize, + cmsgSize: int = BufferSize): + Future[tuple[data: string, cmsgs: seq[ControlMessage]]] = + var retFuture = newFuture[tuple[data: string, + cmsgs: seq[ControlMessage]]]("asyncRecvMsg") + + proc cb(sock: AsyncFD): bool = + result = true + + var dataBuffer = newString(size) + var iovec = IOVec(iov_base: dataBuffer.cstring, + iov_len: dataBuffer.len.csize_t) + var cmsgBuffer = newString(cmsgSize) + zeroMem(cmsgBuffer.cstring, cmsgBuffer.len) + var msg = Tmsghdr(msg_iov: addr iovec, + msg_iovlen: 1, + msg_control: addr cmsgBuffer[0], + msg_controllen: cmsgSize.csize_t) + + let res = recvmsg(sock.SocketHandle, addr msg, 0) + if res < 0: + let lastError = osLastError() + if lastError.int32 != EINTR and + lastError.int32 != EWOULDBLOCK and + lastError.int32 != EAGAIN: + retFuture.fail(newOSError(lastError)) + else: + result = false + return + + var cmsgs = newSeq[ControlMessage]() + var cmsgHeader = CMSG_FIRSTHDR(addr msg) + while cmsgHeader != nil: + let dataLen = cmsgHeader.cmsg_len - sizeof(Tcmsghdr).csize_t + var cmsg = ControlMessage(level: cmsgHeader.cmsg_level, + msgType: cmsgHeader.cmsg_type, + data: newString(dataLen)) + copyMem(cmsg.data.cstring, CMSG_DATA(cmsgHeader), cmsgHeader.cmsg_len) + cmsgs.add(cmsg) + cmsgHeader = CMSG_NXTHDR(addr msg, cmsgHeader) + + dataBuffer.setLen(res) + retFuture.complete((dataBuffer, cmsgs)) + + addRead(fd, cb) + return retFuture diff --git a/examples/rendezvous_server/rendezvous_server.nim b/examples/rendezvous_server/rendezvous_server.nim new file mode 100644 index 0000000..9bb71a3 --- /dev/null +++ b/examples/rendezvous_server/rendezvous_server.nim @@ -0,0 +1,100 @@ +import asyncdispatch, asyncnet, os, strformat, strutils, tables +from net import IpAddress, Port, `$` +import ../../message + +# TODO: we need to find out our own external IP and return it to clients who +# connect from our LAN + +type + Register = object + peerId: string + ip: IpAddress + ports: array[3, Port] + + GetInfo = object + peerId: string + + NotifyPeer = object + sender: string + recipient: string + data: string + + Client = object + sock: AsyncSocket + ip: IpAddress + ports: array[3, Port] + +proc removeClient(clients: TableRef[string, Client], peerId: string) = + if peerId.len > 0: clients.del(peerId) + +proc processClient(client: AsyncSocket, clients: TableRef[string, Client]) {.async.} = + let (address, port) = client.getPeerAddr + await client.send(&"notify-endpoint|{address}|{port.int}\n") + var id = "" + var peerId = "" + while true: + let line = await client.recvLine(maxLength = 400) + if line.len == 0: + removeClient(clients, peerId) + break + try: + let args = line.parseArgs(3) + id = args[1] + case args[0]: + of "register": + let req = parseMessage[Register](args[2]) + peerId = req.peerId + clients[peerId] = Client(sock: client, ip: req.ip, ports: req.ports) + asyncCheck client.send(&"ok|{id}\n") + of "get-info": + let req = parseMessage[GetInfo](args[2]) + let peer = clients[req.peerId] + let peerPorts = peer.ports.join(",") + asyncCheck client.send(&"ok|{id}|{peer.ip}|{peerPorts}\n") + of "notify-peer": + let req = parseMessage[NotifyPeer](args[2]) + let recipient = clients[req.recipient] + asyncCheck recipient.sock.send(&"notify-peer|{req.sender}|{req.recipient}|{req.data}\n") + asyncCheck client.send(&"ok|{id}\n") + else: + client.close() + removeClient(clients, peerId) + break + + except KeyError: + asyncCheck client.send(&"error|{id}|peer not registered\n") + + except ValueError: + client.close + removeClient(clients, peerId) + break + +proc serve(port: Port) {.async.} = + var clients = newTable[string, Client]() + var server = newAsyncSocket() + server.setSockOpt(OptReuseAddr, true) + server.bindAddr(port) + server.listen() + + while true: + let client = await server.accept() + asyncCheck processClient(client, clients) + +proc main() = + if paramCount() != 1: + echo(fmt"usage: {paramStr(0)} PORT") + quit(1) + + try: + let portNumber = paramStr(1).parseUInt + if portNumber > uint16.high: + raise newException(ValueError, "port out of range") + let port = Port(portNumber) + asyncCheck serve(port) + runForever() + + except ValueError as e: + echo e.msg + +when isMainModule: + main()