From 3d40ddec85186f33ffa3864ff38fb297c17003f9 Mon Sep 17 00:00:00 2001 From: Christian Ulrich Date: Sun, 26 Jul 2020 17:46:17 +0200 Subject: [PATCH] introduce get-endpoint: server sends endpoint info and closes connection (allows subsequent connections to the same server) --- examples/app/app.nim | 86 ++++++++++--------- .../rendezvous_server/rendezvous_server.nim | 22 ++++- 2 files changed, 65 insertions(+), 43 deletions(-) diff --git a/examples/app/app.nim b/examples/app/app.nim index 6f0805c..2022066 100644 --- a/examples/app/app.nim +++ b/examples/app/app.nim @@ -34,8 +34,7 @@ type OkGetInfo* = object ip: string ports: array[3, uint16] - NotifyEndpoint* = object - command: string + OkGetEndpoint* = object ip: IpAddress port: Port NotifyPeer* = object @@ -55,45 +54,45 @@ type proc usage() = echo &"usage: {paramStr(0)} SERVER_HOSTNAME SERVER_PORT PEER_ID [OTHER_PEER_ID]" -proc handleServerMessages(connection: ServerConnection) {.async.} = +proc handleServerMessages(conn: ServerConnection) {.async.} = while true: - let line = await connection.sock.recvLine(maxLength = 400) + let line = await conn.sock.recvLine(maxLength = 400) let args = line.parseArgs(3, 1) case args[0]: of "ok": - let future = connection.outMessages[args[1]] - connection.outMessages.del(args[1]) + let future = conn.outMessages[args[1]] + conn.outMessages.del(args[1]) future.complete(args[2]) of "error": - let future = connection.outMessages[args[1]] - connection.outMessages.del(args[1]) + let future = conn.outMessages[args[1]] + conn.outMessages.del(args[1]) future.fail(newException(ServerError, args[2])) of "notify-peer": - asyncCheck connection.peerNotifications.write(line.substr(args[0].len + 1)) + asyncCheck conn.peerNotifications.write(line.substr(args[0].len + 1)) else: raise newException(ValueError, "invalid server message") -proc handlePunchdMessages(connection: PunchdConnection) {.async.} = +proc handlePunchdMessages(conn: PunchdConnection) {.async.} = while true: - let fd = connection.sock.getFd.AsyncFD + let fd = conn.sock.getFd.AsyncFD let resp = await fd.asyncRecvMsg(size = 400, cmsgSize = sizeof(AsyncFD)) let line = resp.data.strip(leading = false, trailing = true, chars = {'\n'}) let args = line.parseArgs(3, 1) case args[0]: of "ok": - let outMsg = connection.outMessages[args[1]] - connection.outMessages.del(args[1]) + let outMsg = conn.outMessages[args[1]] + conn.outMessages.del(args[1]) if resp.cmsgs.len != 1: raise newException(ValueError, "invalid punchd message") let sock = newAsyncSocket(resp.cmsgs[0].getFd) register(sock.getFd.AsyncFD) outMsg.future.complete((args[2], sock)) of "error": - let outMsg = connection.outMessages[args[1]] - connection.outMessages.del(args[1]) + let outMsg = conn.outMessages[args[1]] + conn.outMessages.del(args[1]) outMsg.future.fail(newException(ServerError, args[2])) of "progress": - let outMsg = connection.outMessages[args[1]] + let outMsg = conn.outMessages[args[1]] asyncCheck outMsg.progressCb(outMsg.future, args[2]) else: raise newException(ValueError, "invalid punchd message") @@ -111,7 +110,12 @@ proc sendRequest(connection: ServerConnection, command: string, content: string): Future[string] = result = newFuture[string]("sendRequest") let id = $rand(uint32) - asyncCheck connection.sock.send(&"{command}|{id}|{content}\n") + var request: string + if content.len != 0: + request = &"{command}|{id}|{content}\n" + else: + request = &"{command}|{id}\n" + asyncCheck connection.sock.send(request) connection.outMessages[id] = result proc acceptConnection(punchdConn: PunchdConnection, command: string, @@ -141,8 +145,8 @@ proc handlePeerNotifications(serverConn: ServerConnection, discard proc punchHole(punchdConn: PunchdConnection, serverConn: ServerConnection, - ipAddress: IpAddress, peerId: string, - otherPeerId: string): Future[AsyncSocket] {.async.} = + peerId: string, otherPeerId: string): + Future[AsyncSocket] {.async.} = let sResp = await serverConn.sendRequest("get-info", otherPeerId) let peerInfo = parseMessage[OkGetInfo](sResp) proc progressCb(future: PunchdResponse, msgContent: string) {.async.} = @@ -156,26 +160,31 @@ proc punchHole(punchdConn: PunchdConnection, serverConn: ServerConnection, future.fail(e) let myPorts = (@[Port(1234)] & serverConn.probedPorts).join(",") let peerPorts = peerInfo.ports.join(",") - let req = &"{ipAddress}|{myPorts}|{peerInfo.ip}|{peerPorts}" + let req = &"{serverConn.probedIp}|{myPorts}|{peerInfo.ip}|{peerPorts}" let pResp = await punchdConn.sendRequest("tcp-syni-connect", req, progressCb) result = pResp.sock proc initServerConnection(serverHostname: string, serverPort: Port, - myPort: Port, probePorts: bool): - Future[ServerConnection] {.async.} = - if probePorts: - for i in 0 .. 1: - let sock = newAsyncSocket() - sock.setSockOpt(OptReuseAddr, true) - sock.bindAddr(myPort) - await sock.connect(serverHostname, serverPort) - let line = await sock.recvLine(maxLength = 400) - let endpoint = parseMessage[NotifyEndpoint](line) - result.probedPorts.add(endpoint.port) - sock.close() + probingPort: Port): Future[ServerConnection] {.async.} = + for i in 0 .. 1: + # FIXME: error handling + let sock = newAsyncSocket() + sock.bindAddr(probingPort) + await sock.connect(serverHostname, serverPort) + let id = rand(uint32) + await sock.send(&"get-endpoint|{id}\n") + let line = await sock.recvLine(maxLength = 400) + let args = line.parseArgs(3) + assert(args[0] == "ok") + assert(args[1] == $id) + let endpoint = parseMessage[OkGetEndpoint](args[2]) + result.probedIp = endpoint.ip + result.probedPorts.add(endpoint.port) + let emptyLine = await sock.recvLine(maxLength = 400) + assert(emptyLine.len == 0) + sock.close() + result.sock = await asyncnet.dial(serverHostname, serverPort) - let line = await result.sock.recvLine(maxLength = 400) - result.probedIp = parseMessage[NotifyEndpoint](line).ip result.outMessages = newTable[string, Future[string]]() result.peerNotifications = newFutureStream[string]("initServerConnection") @@ -191,7 +200,7 @@ proc runApp(serverHostname: string, serverPort: Port, peerId: string, if otherPeerId.len == 0: # register and wait for connections let serverConn = await initServerConnection(serverHostname, serverPort, - Port(4321), true) + Port(4321)) asyncCheck handleServerMessages(serverConn) asyncCheck handlePeerNotifications(serverConn, punchdConn, peerId) let myPorts = (@[Port(4321)] & serverConn.probedPorts).join(",") @@ -209,11 +218,10 @@ proc runApp(serverHostname: string, serverPort: Port, peerId: string, else: # initiate a new connection - let serverConn = await initServerConnection(serverHostname, serverPort, - Port(1234), true) + var serverConn = await initServerConnection(serverHostname, serverPort, + Port(1234)) asyncCheck handleServerMessages(serverConn) - let sock = await punchHole(punchdConn, serverConn, serverConn.probedIp, - peerId, otherPeerId) + let sock = await punchHole(punchdConn, serverConn, peerId, otherPeerId) await sock.send("ping") let msg = await sock.recv(1000) echo "received message: ", msg diff --git a/examples/rendezvous_server/rendezvous_server.nim b/examples/rendezvous_server/rendezvous_server.nim index 175f676..29008ea 100644 --- a/examples/rendezvous_server/rendezvous_server.nim +++ b/examples/rendezvous_server/rendezvous_server.nim @@ -60,14 +60,17 @@ proc probePublicIp(): Future[IpAddress] {.async.} = proc removeClient(clients: TableRef[string, Client], peerId: string) = if peerId.len > 0: clients.del(peerId) -proc processClient(client: AsyncSocket, - clients: TableRef[string, Client]) {.async.} = +proc sendEndpoint(client: AsyncSocket, requestId: string) {.async.} = let (address, port) = client.getPeerAddr() var ipAddr = parseIpAddress(address) if ipAddr.isPrivateIp() and ipAddr.isInNetwork(fromIpAddress(getPrimaryIPAddr(ipAddr))): ipAddr = await probePublicIp() - await client.send(&"notify-endpoint|{ipAddr}|{port.int}\n") + await client.send(&"ok|{requestId}|{ipAddr}|{port.int}\n") + client.close + +proc processClient(client: AsyncSocket, + clients: TableRef[string, Client]) {.async.} = var id = "" var peerId = "" while true: @@ -77,25 +80,35 @@ proc processClient(client: AsyncSocket, removeClient(clients, peerId) break try: - let args = line.parseArgs(3) + let args = line.parseArgs(3, 1) id = args[1] case args[0]: of "register": let req = parseMessage[Register](args[2]) + echo "register: ", req peerId = req.peerId clients[peerId] = Client(sock: client, ip: req.ip, ports: req.ports) asyncCheck client.send(&"ok|{id}\n") + of "get-endpoint": + echo "get-endpoint" + asyncCheck client.sendEndpoint(id) + removeClient(clients, peerId) + break + # FIXME: get-peerinfo of "get-info": let req = parseMessage[GetInfo](args[2]) + echo "get-info: ", req let peer = clients[req.peerId] let peerPorts = peer.ports.join(",") asyncCheck client.send(&"ok|{id}|{peer.ip}|{peerPorts}\n") of "notify-peer": let req = parseMessage[NotifyPeer](args[2]) + echo "notify-peer: ", req let recipient = clients[req.recipient] asyncCheck recipient.sock.send(&"notify-peer|{req.sender}|{req.recipient}|{req.data}\n") asyncCheck client.send(&"ok|{id}\n") else: + echo "invalid request" client.close() removeClient(clients, peerId) break @@ -104,6 +117,7 @@ proc processClient(client: AsyncSocket, asyncCheck client.send(&"error|{id}|peer not registered\n") except ValueError: + echo "invalid message" client.close removeClient(clients, peerId) break