change puncher API so we can notify the peer after sending out the SYN packets

This commit is contained in:
Christian Ulrich 2020-11-18 16:34:14 +01:00
parent 2b1cd9eeca
commit 7d31594054
No known key found for this signature in database
GPG Key ID: 8241BE099775A097
2 changed files with 29 additions and 25 deletions

View File

@ -9,12 +9,12 @@ from nativesockets import
from sequtils import any from sequtils import any
type type
Attempt = object Attempt* = object
## A hole punching attempt. ## A hole punching attempt.
srcPort: Port srcPort*: Port
dstIp: IpAddress dstIp*: IpAddress
dstPorts: seq[Port] dstPorts*: seq[Port]
future: Future[Port] future*: Future[Port]
Puncher* = ref object Puncher* = ref object
sock: AsyncSocket sock: AsyncSocket
@ -39,18 +39,18 @@ proc initPuncher*(sock: AsyncSocket): Puncher =
Puncher(sock: sock) Puncher(sock: sock)
proc punch(puncher: Puncher, peerIp: IpAddress, peerPort: Port, proc punch(puncher: Puncher, peerIp: IpAddress, peerPort: Port,
peerProbedPorts: seq[Port], lowTTL: bool, msg: string): Future[Port] peerProbedPorts: seq[Port], lowTTL: bool, msg: string):
{.async.} = Future[Attempt] {.async.} =
let punchFuture = newFuture[Port]("punch") let punchFuture = newFuture[Port]("punch")
let predictedDstPorts = predictPortRange(peerPort, peerProbedPorts) let predictedDstPorts = predictPortRange(peerPort, peerProbedPorts)
let (_, myPort) = puncher.sock.getLocalAddr() let (_, myPort) = puncher.sock.getLocalAddr()
let attempt = Attempt(srcPort: myPort, dstIp: peerIp, result = Attempt(srcPort: myPort, dstIp: peerIp, dstPorts: predictedDstPorts,
dstPorts: predictedDstPorts, future: punchFuture) future: punchFuture)
if puncher.attempts.contains(attempt): if puncher.attempts.contains(result):
raise newException(PunchHoleError, raise newException(PunchHoleError,
"hole punching for given parameters already active") "hole punching for given parameters already active")
puncher.attempts.add(attempt) puncher.attempts.add(result)
echo &"sending msg {msg} to {peerIp}, predicted ports: {attempt.dstPorts}" echo &"sending msg {msg} to {peerIp}, predicted ports: {result.dstPorts}"
var peerAddr: Sockaddr_storage var peerAddr: Sockaddr_storage
var peerSockLen: SockLen var peerSockLen: SockLen
try: try:
@ -58,29 +58,31 @@ proc punch(puncher: Puncher, peerIp: IpAddress, peerPort: Port,
if lowTTL: if lowTTL:
defaultTTL = puncher.sock.getFd.getSockOptInt(IPPROTO_IP, IP_TTL) defaultTTL = puncher.sock.getFd.getSockOptInt(IPPROTO_IP, IP_TTL)
puncher.sock.getFd.setSockOptInt(IPPROTO_IP, IP_TTL, 2) puncher.sock.getFd.setSockOptInt(IPPROTO_IP, IP_TTL, 2)
for dstPort in attempt.dstPorts: for dstPort in result.dstPorts:
toSockAddr(attempt.dstIp, dstPort, peerAddr, peerSockLen) toSockAddr(result.dstIp, dstPort, peerAddr, peerSockLen)
# TODO: replace asyncdispatch.sendTo with asyncnet.sendTo (Nim 1.4 required) # TODO: replace asyncdispatch.sendTo with asyncnet.sendTo (Nim 1.4 required)
await sendTo(puncher.sock.getFd().AsyncFD, msg.cstring, msg.len, await sendTo(puncher.sock.getFd().AsyncFD, msg.cstring, msg.len,
cast[ptr SockAddr](addr peerAddr), peerSockLen) cast[ptr SockAddr](addr peerAddr), peerSockLen)
if lowTTL: if lowTTL:
puncher.sock.getFd.setSockOptInt(IPPROTO_IP, IP_TTL, defaultTTL) puncher.sock.getFd.setSockOptInt(IPPROTO_IP, IP_TTL, defaultTTL)
await punchFuture or sleepAsync(Timeout)
if punchFuture.finished():
result = punchFuture.read()
else:
raise newException(PunchHoleError, "timeout")
except OSError as e: except OSError as e:
raise newException(PunchHoleError, e.msg) raise newException(PunchHoleError, e.msg)
proc initiate*(puncher: Puncher, peerIp: IpAddress, peerPort: Port, proc initiate*(puncher: Puncher, peerIp: IpAddress, peerPort: Port,
peerProbedPorts: seq[Port]): Future[Port] = peerProbedPorts: seq[Port]): Future[Attempt] =
punch(puncher, peerIp, peerPort, peerProbedPorts, true, "SYN") punch(puncher, peerIp, peerPort, peerProbedPorts, true, "SYN")
proc respond*(puncher: Puncher, peerIp: IpAddress, peerPort: Port, proc respond*(puncher: Puncher, peerIp: IpAddress, peerPort: Port,
peerProbedPorts: seq[Port]): Future[Port] = peerProbedPorts: seq[Port]): Future[Attempt] =
punch(puncher, peerIp, peerPort, peerProbedPorts, false, "ACK") punch(puncher, peerIp, peerPort, peerProbedPorts, false, "ACK")
proc finalize*(attempt: Attempt): Future[Port] {.async.} =
await attempt.future or sleepAsync(Timeout)
if attempt.future.finished:
result = attempt.future.read()
else:
raise newException(PunchHoleError, "timeout")
proc handleMsg*(puncher: Puncher, msg: string, peerIp: IpAddress, proc handleMsg*(puncher: Puncher, msg: string, peerIp: IpAddress,
peerPort: Port) = peerPort: Port) =
## Handles an incoming UDP message which may complete the Futures returned by ## Handles an incoming UDP message which may complete the Futures returned by

View File

@ -336,8 +336,9 @@ proc receive(ctx: QuicP2PContext, peerId: string) {.async.} =
proc handleNotification(ctx: QuicP2PContext, notification: NotifyPeer) proc handleNotification(ctx: QuicP2PContext, notification: NotifyPeer)
{.async.} = {.async.} =
let _ = await ctx.puncher.respond(notification.srcIp, notification.srcPort, let attempt = await ctx.puncher.respond(notification.srcIp, notification.srcPort,
notification.probedsrcPorts) notification.probedsrcPorts)
discard await attempt.finalize()
proc runApp(ctx: QuicP2PContext, srcPort: Port, peerId: string) {.async.} = proc runApp(ctx: QuicP2PContext, srcPort: Port, peerId: string) {.async.} =
let serverConn = await initServerConnection(rendezvousServers[0].hostname, let serverConn = await initServerConnection(rendezvousServers[0].hostname,
@ -370,9 +371,10 @@ proc runApp(ctx: QuicP2PContext, srcPort: Port, peerId: string) {.async.} =
let myProbedPorts = serverConn.probedSrcPorts.join(",") let myProbedPorts = serverConn.probedSrcPorts.join(",")
let peerProbedPorts = peerInfo.probedPorts.join(",") let peerProbedPorts = peerInfo.probedPorts.join(",")
let req = &"{ctx.getPeerId()}|{peerId}|{serverConn.probedIp}|{srcPort}|{myProbedPorts}|{peerInfo.ip}|{peerInfo.localPort}|{peerProbedPorts}" let req = &"{ctx.getPeerId()}|{peerId}|{serverConn.probedIp}|{srcPort}|{myProbedPorts}|{peerInfo.ip}|{peerInfo.localPort}|{peerProbedPorts}"
discard await serverConn.sendRequest("notify-peer", req) let attempt = await ctx.puncher.initiate(peerInfo.ip, peerInfo.localPort,
let peerPort = await ctx.puncher.initiate(peerInfo.ip, peerInfo.localPort,
peerInfo.probedPorts) peerInfo.probedPorts)
discard await serverConn.sendRequest("notify-peer", req)
let peerPort = await attempt.finalize()
initiateQuicConnection(ctx, peerId, peerInfo.ip, peerPort) initiateQuicConnection(ctx, peerId, peerInfo.ip, peerPort)
proc main() = proc main() =