From 7d3159405479c7b31023f51cafc8950f38090f06 Mon Sep 17 00:00:00 2001 From: Christian Ulrich Date: Wed, 18 Nov 2020 16:34:14 +0100 Subject: [PATCH] change puncher API so we can notify the peer after sending out the SYN packets --- puncher.nim | 44 +++++++++++++++++++++++--------------------- quicp2p.nim | 10 ++++++---- 2 files changed, 29 insertions(+), 25 deletions(-) diff --git a/puncher.nim b/puncher.nim index a62406b..b1c530c 100644 --- a/puncher.nim +++ b/puncher.nim @@ -9,12 +9,12 @@ from nativesockets import from sequtils import any type - Attempt = object + Attempt* = object ## A hole punching attempt. - srcPort: Port - dstIp: IpAddress - dstPorts: seq[Port] - future: Future[Port] + srcPort*: Port + dstIp*: IpAddress + dstPorts*: seq[Port] + future*: Future[Port] Puncher* = ref object sock: AsyncSocket @@ -39,18 +39,18 @@ proc initPuncher*(sock: AsyncSocket): Puncher = Puncher(sock: sock) proc punch(puncher: Puncher, peerIp: IpAddress, peerPort: Port, - peerProbedPorts: seq[Port], lowTTL: bool, msg: string): Future[Port] - {.async.} = + peerProbedPorts: seq[Port], lowTTL: bool, msg: string): + Future[Attempt] {.async.} = let punchFuture = newFuture[Port]("punch") let predictedDstPorts = predictPortRange(peerPort, peerProbedPorts) let (_, myPort) = puncher.sock.getLocalAddr() - let attempt = Attempt(srcPort: myPort, dstIp: peerIp, - dstPorts: predictedDstPorts, future: punchFuture) - if puncher.attempts.contains(attempt): + result = Attempt(srcPort: myPort, dstIp: peerIp, dstPorts: predictedDstPorts, + future: punchFuture) + if puncher.attempts.contains(result): raise newException(PunchHoleError, "hole punching for given parameters already active") - puncher.attempts.add(attempt) - echo &"sending msg {msg} to {peerIp}, predicted ports: {attempt.dstPorts}" + puncher.attempts.add(result) + echo &"sending msg {msg} to {peerIp}, predicted ports: {result.dstPorts}" var peerAddr: Sockaddr_storage var peerSockLen: SockLen try: @@ -58,29 +58,31 @@ proc punch(puncher: Puncher, peerIp: IpAddress, peerPort: Port, if lowTTL: defaultTTL = puncher.sock.getFd.getSockOptInt(IPPROTO_IP, IP_TTL) puncher.sock.getFd.setSockOptInt(IPPROTO_IP, IP_TTL, 2) - for dstPort in attempt.dstPorts: - toSockAddr(attempt.dstIp, dstPort, peerAddr, peerSockLen) + for dstPort in result.dstPorts: + toSockAddr(result.dstIp, dstPort, peerAddr, peerSockLen) # TODO: replace asyncdispatch.sendTo with asyncnet.sendTo (Nim 1.4 required) await sendTo(puncher.sock.getFd().AsyncFD, msg.cstring, msg.len, cast[ptr SockAddr](addr peerAddr), peerSockLen) if lowTTL: puncher.sock.getFd.setSockOptInt(IPPROTO_IP, IP_TTL, defaultTTL) - await punchFuture or sleepAsync(Timeout) - if punchFuture.finished(): - result = punchFuture.read() - else: - raise newException(PunchHoleError, "timeout") except OSError as e: raise newException(PunchHoleError, e.msg) proc initiate*(puncher: Puncher, peerIp: IpAddress, peerPort: Port, - peerProbedPorts: seq[Port]): Future[Port] = + peerProbedPorts: seq[Port]): Future[Attempt] = punch(puncher, peerIp, peerPort, peerProbedPorts, true, "SYN") proc respond*(puncher: Puncher, peerIp: IpAddress, peerPort: Port, - peerProbedPorts: seq[Port]): Future[Port] = + peerProbedPorts: seq[Port]): Future[Attempt] = punch(puncher, peerIp, peerPort, peerProbedPorts, false, "ACK") +proc finalize*(attempt: Attempt): Future[Port] {.async.} = + await attempt.future or sleepAsync(Timeout) + if attempt.future.finished: + result = attempt.future.read() + else: + raise newException(PunchHoleError, "timeout") + proc handleMsg*(puncher: Puncher, msg: string, peerIp: IpAddress, peerPort: Port) = ## Handles an incoming UDP message which may complete the Futures returned by diff --git a/quicp2p.nim b/quicp2p.nim index 4e08eae..f7c0a0c 100644 --- a/quicp2p.nim +++ b/quicp2p.nim @@ -336,8 +336,9 @@ proc receive(ctx: QuicP2PContext, peerId: string) {.async.} = proc handleNotification(ctx: QuicP2PContext, notification: NotifyPeer) {.async.} = - let _ = await ctx.puncher.respond(notification.srcIp, notification.srcPort, - notification.probedsrcPorts) + let attempt = await ctx.puncher.respond(notification.srcIp, notification.srcPort, + notification.probedsrcPorts) + discard await attempt.finalize() proc runApp(ctx: QuicP2PContext, srcPort: Port, peerId: string) {.async.} = let serverConn = await initServerConnection(rendezvousServers[0].hostname, @@ -370,9 +371,10 @@ proc runApp(ctx: QuicP2PContext, srcPort: Port, peerId: string) {.async.} = let myProbedPorts = serverConn.probedSrcPorts.join(",") let peerProbedPorts = peerInfo.probedPorts.join(",") let req = &"{ctx.getPeerId()}|{peerId}|{serverConn.probedIp}|{srcPort}|{myProbedPorts}|{peerInfo.ip}|{peerInfo.localPort}|{peerProbedPorts}" + let attempt = await ctx.puncher.initiate(peerInfo.ip, peerInfo.localPort, + peerInfo.probedPorts) discard await serverConn.sendRequest("notify-peer", req) - let peerPort = await ctx.puncher.initiate(peerInfo.ip, peerInfo.localPort, - peerInfo.probedPorts) + let peerPort = await attempt.finalize() initiateQuicConnection(ctx, peerId, peerInfo.ip, peerPort) proc main() =