import asyncdispatch, asyncnet, os, strformat, strutils, tables
from nativeSockets import Domain, SockType, Protocol
from net import IpAddress, Port, isIpAddress, `$`
from posix import CMSG_SPACE
import asyncutils
import ../../message
import random

# Rendezvous servers. Every entry is asked for our public endpoint; the
# long-lived server connection is opened to the first entry only.
const rendezvousServers: seq[tuple[hostname: string, port: Port]] = @[
  ("strangeplace.net", Port(5320)),
  ("ulrich.earth", Port(5320))
]

type
  # Result of a punchd request: the content of the reply and the socket that
  # was received together with it.
  PunchdResponse = Future[tuple[msgContent: string, sock: AsyncSocket]]
  # Invoked for every "progress" message that belongs to a pending request.
  PunchdProgressCb = proc (future: PunchdResponse, msgContent: string) {.async.}

  # A punchd request that has not received its final "ok"/"error" reply yet.
  OutgoingPunchdMessage = object
    future: PunchdResponse
    progressCb: PunchdProgressCb # may be nil

  PunchdConnection = object
    sock: AsyncSocket
    outMessages: TableRef[string, OutgoingPunchdMessage] # keyed by request id
    inConnections: FutureStream[AsyncSocket]

  ServerConnection = object
    sock: AsyncSocket
    outMessages: TableRef[string, Future[string]] # keyed by request id
    peerNotifications: FutureStream[string]
    probedIp: IpAddress
    srcPort: Port
    probedSrcPorts: seq[Port]

  # Punchd messages
  Progress* = object
    command: string
    args: string

  # Server messages
  OkGetPeerinfo* = object
    ip: string
    localPort: Port
    probedPorts: seq[Port]

  OkGetEndpoint* = object
    ip: IpAddress
    port: Port

  NotifyPeer* = object
    sender: string
    recipient: string
    technique: string
    srcIp: IpAddress
    srcPort: Port
    probedSrcPorts: seq[Port]
    dstIp: IpAddress
    dstPort: Port
    probedDstPorts: seq[Port]
    extraArgs: string

  # Exceptions
  PunchdError = object of ValueError # punchd replied "error" or sent no socket
  ServerError = object of ValueError # server replied "error" or unexpectedly

proc usage() =
  ## Prints the command line synopsis.
  echo &"usage: {paramStr(0)} PEER_ID [OTHER_PEER_ID]"

proc handleServerMessages(conn: ServerConnection) {.async.} =
  ## Reads lines from the server socket forever and dispatches them:
  ## "ok" completes and "error" fails (with `ServerError`) the pending request
  ## whose id is the second argument, "notify-peer" forwards the rest of the
  ## line to `conn.peerNotifications`. Any other command raises `ValueError`.
  while true:
    let line = await conn.sock.recvLine(maxLength = 400)
    let args = line.parseArgs(3, 1)
    case args[0]:
    of "ok":
      let future = conn.outMessages[args[1]]
      conn.outMessages.del(args[1])
      future.complete(args[2])
    of "error":
      let future = conn.outMessages[args[1]]
      conn.outMessages.del(args[1])
      future.fail(newException(ServerError, args[2]))
    of "notify-peer":
      # Forward everything after "notify-peer" and its separator.
      asyncCheck conn.peerNotifications.write(line.substr(args[0].len + 1))
    else:
      raise newException(ValueError, "invalid server message")

proc handlePunchdMessages(conn: PunchdConnection) {.async.} =
  ## Receives messages (plus control messages) from the punchd socket forever
  ## and dispatches them: "ok" completes the pending request with the reply
  ## content and a socket built from the first control message, "error" fails
  ## it with `PunchdError`, "progress" invokes the request's progress callback.
  ## Any other command raises `ValueError`.
  let fd = conn.sock.getFd.AsyncFD
  let cmsgSize = CMSG_SPACE(sizeof(AsyncFD).csize_t)
  while true:
    let resp = await fd.asyncRecvMsg(400, cmsgSize)
    let line = resp.data.strip(leading = false, trailing = true, chars = {'\n'})
    echo "received punchd message: ", line
    let args = line.parseArgs(3, 1)
    case args[0]:
    of "ok":
      let outMsg = conn.outMessages[args[1]]
      conn.outMessages.del(args[1])
      if resp.cmsgs.len < 1:
        echo "no cmsg"
        # The request was already removed from the table, so fail its future
        # here; otherwise whoever awaits it would wait forever.
        outMsg.future.fail(newException(PunchdError,
                                        "punchd reply carried no socket"))
        raise newException(ValueError, "invalid punchd message")
      let sock = newAsyncSocket(resp.cmsgs[0].getFd)
      register(sock.getFd.AsyncFD)
      outMsg.future.complete((args[2], sock))
    of "error":
      let outMsg = conn.outMessages[args[1]]
      conn.outMessages.del(args[1])
      outMsg.future.fail(newException(PunchdError, args[2]))
    of "progress":
      let outMsg = conn.outMessages[args[1]]
      # Requests may be registered without a callback (it defaults to nil).
      if not outMsg.progressCb.isNil:
        asyncCheck outMsg.progressCb(outMsg.future, args[2])
    else:
      raise newException(ValueError, "invalid punchd message")

proc sendRequest(connection: PunchdConnection, command: string, content: string,
                 progressCb: PunchdProgressCb = nil): PunchdResponse =
  ## Sends `command|id|content` to punchd under a random id and registers the
  ## returned future (and the optional `progressCb`) under that id so that
  ## `handlePunchdMessages` can resolve it.
  result = newFuture[PunchdResponse.T]("sendRequest")
  let id = $rand(uint32)
  asyncCheck connection.sock.send(&"{command}|{id}|{content}\n")
  let outMsg = OutgoingPunchdMessage(future: result, progressCb: progressCb)
  echo "id = ", id
  connection.outMessages[id] = outMsg

proc sendRequest(connection: ServerConnection, command: string,
                 content: string): Future[string] =
  ## Sends `command|id|content` (or `command|id` when `content` is empty) to
  ## the server under a random id and registers the returned future under that
  ## id so that `handleServerMessages` can resolve it.
  result = newFuture[string]("sendRequest")
  let id = $rand(uint32)
  let request =
    if content.len != 0: &"{command}|{id}|{content}\n"
    else: &"{command}|{id}\n"
  asyncCheck connection.sock.send(request)
  connection.outMessages[id] = result

proc acceptConnection(punchdConn: PunchdConnection, command: string,
                      msgContent: string) {.async.} =
  ## Sends `command` with `msgContent` to punchd and writes the socket from
  ## the reply to `punchdConn.inConnections`.
  echo "accepting connection ", msgContent
  let resp = await punchdConn.sendRequest(command, msgContent)
  asyncCheck punchdConn.inConnections.write(resp.sock)

proc handlePeerNotifications(serverConn: ServerConnection,
                             punchdConn: PunchdConnection,
                             peerId: string) {.async.} =
  ## Reads peer notifications from `serverConn.peerNotifications` until the
  ## stream ends and starts a "respond" request on punchd for each of them.
  ## A notification that fails with `ValueError` is logged and skipped.
  while true:
    let (hasData, data) = await serverConn.peerNotifications.read
    if not hasData:
      break
    try:
      let msg = parseMessage[NotifyPeer](data)
      # FIXME: check if we want to receive messages from the sender
      echo "received message from ", msg.sender
      let probedSrcPorts = msg.probedSrcPorts.join(",")
      let probedDstPorts = msg.probedDstPorts.join(",")
      let req = &"{msg.technique}|{msg.srcIp}|{msg.srcPort}|{probedSrcPorts}|{msg.dstIp}|{msg.dstPort}|{probedDstPorts}|{msg.extraArgs}"
      asyncCheck acceptConnection(punchdConn, "respond", req)
    except ValueError as e:
      echo e.msg

proc punchHole(punchdConn: PunchdConnection, serverConn: ServerConnection,
               peerId: string, otherPeerId: string,
               technique: string): Future[AsyncSocket] {.async.} =
  ## Asks the server for the peer info of `otherPeerId`, sends an "initiate"
  ## request for `technique` to punchd and returns the socket from punchd's
  ## reply. Progress messages from punchd are relayed to the other peer with
  ## a "notify-peer" server request.
  let sResp = await serverConn.sendRequest("get-peerinfo", otherPeerId)
  let peerInfo = parseMessage[OkGetPeerinfo](sResp)

  proc progressCb(future: PunchdResponse, msgContent: string) {.async.} =
    try:
      let parsedResp = parseMessage[Progress](msgContent)
      let req = &"{peerId}|{otherPeerId}|{parsedResp.command}|{parsedResp.args}"
      discard await serverConn.sendRequest("notify-peer", req)
    except ValueError as e:
      # ServerError is a ValueError, so this branch covers both. The request
      # may have been resolved while we were waiting for the server; a future
      # must not be finished twice.
      if not future.finished:
        future.fail(e)

  let myProbedPorts = serverConn.probedSrcPorts.join(",")
  let probedPeerPorts = peerInfo.probedPorts.join(",")
  let req = &"{technique}|{serverConn.probedIp}|{serverConn.srcPort}|{myProbedPorts}|{peerInfo.ip}|{peerInfo.localPort}|{probedPeerPorts}"
  let pResp = await punchdConn.sendRequest("initiate", req, progressCb)
  result = pResp.sock

proc bindWithRetry(sock: AsyncSocket, srcPort: Port) {.async.} =
  ## Binds `sock` to `srcPort`. After a failed attempt it waits 100 ms and
  ## tries again; the `OSError` of the fourth failed attempt is re-raised.
  var failCount = 0
  while true:
    try:
      sock.bindAddr(srcPort)
      break
    except OSError as e:
      if failCount == 3:
        raise e
      failCount.inc
    await sleepAsync(100)

proc getEndpoint(srcPort: Port, serverHostname: string,
                 serverPort: Port): Future[OkGetEndpoint] {.async.} =
  ## Connects from local port `srcPort` to the given server, sends a
  ## "get-endpoint" request and returns the parsed reply.
  ##
  ## Raises `OSError` when binding keeps failing and `ServerError` when the
  ## reply is not the expected "ok" for our request id or when further data
  ## follows it. The socket is closed on every path.
  let sock = newAsyncSocket()
  try:
    await bindWithRetry(sock, srcPort)
    await sock.connect(serverHostname, serverPort)
    let id = rand(uint32)
    await sock.send(&"get-endpoint|{id}\n")
    let line = await sock.recvLine(maxLength = 400)
    let args = line.parseArgs(3)
    # Validate with exceptions rather than `assert`: this is untrusted network
    # input and `assert` is compiled out with -d:danger.
    if args[0] != "ok" or args[1] != $id:
      raise newException(ServerError,
                         "unexpected get-endpoint response: " & line)
    result = parseMessage[OkGetEndpoint](args[2])
    let emptyLine = await sock.recvLine(maxLength = 400)
    if emptyLine.len != 0:
      raise newException(ServerError,
                         "unexpected data after get-endpoint response")
  finally:
    sock.close()

proc initServerConnection(srcPort: Port): Future[ServerConnection] {.async.} =
  ## Queries every rendezvous server for our endpoint as seen from `srcPort`,
  ## then dials the first rendezvous server and returns the initialised
  ## connection state.
  result.srcPort = srcPort
  for r in rendezvousServers:
    let endpoint = await getEndpoint(srcPort, r.hostname, r.port)
    # FIXME: what if we get different IPs from different servers
    result.probedIp = endpoint.ip
    result.probedSrcPorts.add(endpoint.port)
  result.sock = await asyncnet.dial(rendezvousServers[0].hostname,
                                    rendezvousServers[0].port)
  result.outMessages = newTable[string, Future[string]]()
  result.peerNotifications = newFutureStream[string]("initServerConnection")

proc runApp(peerId: string, otherPeerId: string = "") {.async.} =
  ## Connects to punchd and to the rendezvous server. Without `otherPeerId`
  ## it registers `peerId` and answers every incoming connection with "pong";
  ## with `otherPeerId` it punches a hole to that peer and sends "ping".
  randomize() # initialize random number generator
  let punchdConn = PunchdConnection(
    sock: newAsyncSocket(AF_UNIX, SOCK_STREAM, IPPROTO_IP),
    outMessages: newTable[string, OutgoingPunchdMessage](),
    inConnections: newFutureStream[AsyncSocket]("runApp"))
  await punchdConn.sock.connectUnix("/tmp/punchd.socket")
  asyncCheck handlePunchdMessages(punchdConn)

  let srcPort = rand(Port(1024) .. Port.high)
  let listening = otherPeerId.len == 0
  if listening:
    echo &"init server connection, source port: {srcPort}"
  let serverConn = await initServerConnection(srcPort)
  asyncCheck handleServerMessages(serverConn)

  if listening:
    # register and wait for connections
    asyncCheck handlePeerNotifications(serverConn, punchdConn, peerId)
    let probedPorts = serverConn.probedSrcPorts.join(",")
    let req = &"{peerId}|{serverConn.probedIp}|{serverConn.srcPort}|{probedPorts}"
    echo "registering: ", req
    discard await serverConn.sendRequest("register", req)
    while true:
      let (hasSock, sock) = await punchdConn.inConnections.read
      if not hasSock:
        break
      echo "accepted!"
      let msg = await sock.recv(4)
      echo "received message: ", msg
      await sock.send("pong")
      sock.close()
  else:
    # initiate a new connection
    let sock = await punchHole(punchdConn, serverConn, peerId, otherPeerId,
                               "udp")
    echo "connected!"
    await sock.send("ping")
    let msg = await sock.recv(4)
    echo "received message: ", msg
    sock.close()

proc main() =
  ## Validates the argument count and runs the application.
  if paramCount() < 1 or paramCount() > 2:
    usage()
    quit(1)
  if paramCount() == 2:
    waitFor runApp(paramStr(1), paramStr(2))
  else:
    waitFor runApp(paramStr(1))

when isMainModule:
  main()