86 lines
3.2 KiB
Nim
86 lines
3.2 KiB
Nim
import asyncdispatch, asyncnet, net, port_prediction, strformat
|
|
|
|
from nativesockets import SockAddr, SockAddr_storage, SockLen
|
|
from sequtils import any
|
|
|
|
type
  Attempt = object
    ## A hole punching attempt.
    srcPort: Port
      ## Port of the local socket the attempt is sent from.
    dstIp: IpAddress
      ## IP address of the peer.
    dstPorts: seq[Port]
      ## Candidate peer ports the punch message is sent to.
    future: Future[Port]
      ## Completed by ``handleMsg`` with the port the peer's message actually
      ## arrived from.

  Puncher* = ref object
    ## Keeps track of the hole punching attempts running on one socket.
    sock: AsyncSocket
      ## Socket used for sending punch messages and for looking up the local
      ## port.
    attempts: seq[Attempt]
      ## Attempts that are currently registered.

  PunchHoleError* = object of ValueError
    ## Raised when a hole punching attempt cannot be started or fails.
|
|
|
|
# How long ``punch`` waits for a reply before giving up. The value is handed
# to ``sleepAsync``, so it is interpreted as milliseconds.
const Timeout = 3000
|
|
|
|
proc `==`(a, b: Attempt): bool =
  ## Equality for hole punching attempts.
  ##
  ## Two attempts compare equal when they share the same ``srcPort`` and
  ## ``dstIp`` and have at least one port of ``dstPorts`` in common. The
  ## ``future`` field does not take part in the comparison.
  if a.srcPort != b.srcPort or a.dstIp != b.dstIp:
    return false
  # Any shared destination port is enough to call the attempts equal.
  for port in a.dstPorts:
    if port in b.dstPorts:
      return true
  result = false
|
|
|
|
proc initPuncher*(sock: AsyncSocket): Puncher =
  ## Creates a ``Puncher`` that operates on ``sock``. The new puncher starts
  ## out without any registered attempts.
  new(result)
  result.sock = sock
|
|
|
|
proc removeAttempt(puncher: Puncher, future: Future[Port]) =
  ## Drops the attempt owning ``future`` from ``puncher.attempts`` if it is
  ## still registered. The lookup goes by future identity instead of the
  ## overlap-based ``==`` of ``Attempt`` so that only this exact attempt can
  ## be removed.
  for i in 0 .. puncher.attempts.high:
    if puncher.attempts[i].future == future:
      puncher.attempts.del(i)
      break

proc punch(puncher: Puncher, peerIp: IpAddress, peerPort: Port,
           peerProbedPorts: seq[Port], msg: string): Future[Port] {.async.} =
  ## Sends ``msg`` to every port predicted for the peer and waits until
  ## ``handleMsg`` completes the attempt or ``Timeout`` expires.
  ##
  ## Returns the port that ``handleMsg`` completed the attempt with.
  ##
  ## Fails with ``PunchHoleError`` if
  ## * an equal (overlapping) attempt is already registered,
  ## * no matching message was handled in time ("timeout"), or
  ## * sending raised an ``OSError``.
  ##
  ## On timeout and on send errors the attempt is unregistered again, so a
  ## later call with the same parameters is not rejected as "already active".
  let punchFuture = newFuture[Port]("punch")
  let predictedDstPorts = predictPortRange(peerPort, peerProbedPorts)
  let (_, myPort) = puncher.sock.getLocalAddr()
  let attempt = Attempt(srcPort: myPort, dstIp: peerIp,
                        dstPorts: predictedDstPorts, future: punchFuture)
  if puncher.attempts.contains(attempt):
    raise newException(PunchHoleError,
                       "hole punching for given parameters already active")
  puncher.attempts.add(attempt)
  echo &"sending msg {msg} to {peerIp}, predicted ports: {attempt.dstPorts}"
  var peerAddr: Sockaddr_storage
  var peerSockLen: SockLen
  try:
    for dstPort in attempt.dstPorts:
      toSockAddr(attempt.dstIp, dstPort, peerAddr, peerSockLen)
      # TODO: replace asyncdispatch.sendTo with asyncnet.sendTo (Nim 1.4 required)
      await sendTo(puncher.sock.getFd().AsyncFD, msg.cstring, msg.len,
                   cast[ptr SockAddr](addr peerAddr), peerSockLen)
    # Resumes as soon as either the attempt is completed or the timer fires.
    await punchFuture or sleepAsync(Timeout)
    if punchFuture.finished():
      # handleMsg already removed the attempt when it completed the future.
      result = punchFuture.read()
    else:
      # Nobody will complete this attempt anymore; unregister it so it neither
      # blocks retries nor swallows a later message.
      puncher.removeAttempt(punchFuture)
      raise newException(PunchHoleError, "timeout")
  except OSError as e:
    # Sending failed; the attempt must not stay registered.
    puncher.removeAttempt(punchFuture)
    raise newException(PunchHoleError, e.msg)
|
|
|
|
proc initiate*(puncher: Puncher, peerIp: IpAddress, peerPort: Port,
               peerProbedPorts: seq[Port]): Future[Port] =
  ## Runs a hole punching attempt towards the peer using the message "SYN".
  ##
  ## The returned future is the one produced by ``punch`` for these
  ## parameters.
  result = puncher.punch(peerIp, peerPort, peerProbedPorts, "SYN")
|
|
|
|
proc respond*(puncher: Puncher, peerIp: IpAddress, peerPort: Port,
              peerProbedPorts: seq[Port]): Future[Port] =
  ## Runs a hole punching attempt towards the peer using the message "ACK".
  ##
  ## The returned future is the one produced by ``punch`` for these
  ## parameters.
  result = puncher.punch(peerIp, peerPort, peerProbedPorts, "ACK")
|
|
|
|
proc handleMsg*(puncher: Puncher, msg: string, peerIp: IpAddress,
                peerPort: Port) =
  ## Handles an incoming UDP message which may complete the Futures returned
  ## by ``initiate`` and ``respond``.
  ##
  ## The registered attempt matching the local port, ``peerIp`` and
  ## ``peerPort`` (if any) is completed with ``peerPort`` and unregistered.
  ## The content of ``msg`` is not inspected here.
  let localPort = puncher.sock.getLocalAddr()[1]
  let probe = Attempt(srcPort: localPort, dstIp: peerIp,
                      dstPorts: @[peerPort])
  let idx = puncher.attempts.find(probe)
  if idx == -1:
    # No attempt is waiting for a message from this address.
    return
  puncher.attempts[idx].future.complete(peerPort)
  puncher.attempts.del(idx)
|
|
|
|
proc handleMsg*(puncher: Puncher, msg: string,
                peerAddr: SockAddr | Sockaddr_storage, peerSockLen: SockLen) =
  ## Overload of ``handleMsg`` taking the sender as a raw socket address.
  ##
  ## The address is converted to an ``IpAddress`` and a ``Port`` with
  ## ``fromSockAddr`` and then passed on to the ``IpAddress``/``Port``
  ## overload.
  var
    ip: IpAddress
    port: Port
  fromSockAddr(peerAddr, peerSockLen, ip, port)
  puncher.handleMsg(msg, ip, port)
|