punchd/udp.nim

119 lines
4.3 KiB
Nim

from nativesockets import Protocol, SockType
from net import IpAddress, Port, `$`, `==`
import asyncdispatch
import asyncnet
import message
import options
import port_prediction
import puncher
import strformat
import utils
export puncher
type
UdpPuncher* = ref object of Puncher
InitiateRequest = object
srcIp: IpAddress
srcPort: Port
probedSrcPorts: seq[Port]
dstIp: IpAddress
dstPort: Port
probedDstPorts: seq[Port]
RespondRequest = object
dstIp: IpAddress
dstPort: Port
probedDstPorts: seq[Port]
srcIp: IpAddress
srcPort: Port
probedSrcPorts: seq[Port]
extraArgs: string
proc doInitiate(srcIp: IpAddress, srcPort: Port, dstIp: IpAddress,
dstPort: Port, initiateFuture: Future[AsyncSocket]) {.async.} =
let sock = newAsyncSocket(sockType = SOCK_DGRAM, protocol = IPPROTO_UDP)
sock.bindAddr(srcPort, $srcIp)
await sock.connect($dstIp, dstPort)
await sock.send("SYN")
let recvFuture = sock.recv(6) # "SYNACK"
await recvFuture or sleepAsync(Timeout)
if recvFuture.finished():
let msg = recvFuture.read()
echo &"connection {srcIp}:{srcPort.int} -> {dstIp}:{dstPort.int} succeeded: ", msg
await sock.send("ACK")
initiateFuture.complete(sock)
else:
sock.close()
echo &"connection {srcIp}:{srcPort.int} -> {dstIp}:{dstPort.int} timed out"
proc doRespond(srcIp: IpAddress, srcPort: Port, dstIp: IpAddress,
dstPort: Port, respondFuture: Future[AsyncSocket]) {.async.} =
let sock = newAsyncSocket(sockType = SOCK_DGRAM, protocol = IPPROTO_UDP)
sock.bindAddr(srcPort, $srcIp)
await sock.connect($dstIp, dstPort)
await sock.send("SYNACK")
let recvFuture = sock.recv(3) # "ACK"
await recvFuture or sleepAsync(Timeout)
if recvFuture.finished():
let msg = recvFuture.read()
echo &"connection {srcIp}:{srcPort.int} -> {dstIp}:{dstPort.int} succeeded: ", msg
respondFuture.complete(sock)
else:
echo &"connection {srcIp}:{srcPort.int} -> {dstIp}:{dstPort.int} timed out"
proc initUdpPuncher*(): UdpPuncher =
UdpPuncher()
method parseInitiateRequest*(puncher: UdpPuncher, args: string): Attempt =
let parsed = parseMessage[InitiateRequest](args)
let localIp = getPrimaryIPAddr(parsed.dstIp)
let predictedDstPorts = predictPortRange(parsed.dstPort,
parsed.probedDstPorts)
Attempt(protocol: IPPROTO_UDP, srcIp: localIp, srcPort: parsed.srcPort,
dstIp: parsed.dstIp, dstPorts: predictedDstPorts,
acceptFuture: none(Future[AsyncSocket]))
method parseRespondRequest*(puncher: UdpPuncher, args: string): Attempt =
let parsed = parseMessage[RespondRequest](args)
let localIp = getPrimaryIPAddr(parsed.dstIp)
let predictedDstPorts = predictPortRange(parsed.dstPort,
parsed.probedDstPorts)
Attempt(protocol: IPPROTO_UDP, srcIp: localIp, srcPort: parsed.srcPort,
dstIp: parsed.dstIp, dstPorts: predictedDstPorts,
acceptFuture: none(Future[AsyncSocket]))
method initiate*(puncher: UdpPuncher, attempt: Attempt,
progress: PunchProgressCb): Future[AsyncSocket] {.async.} =
assert(attempt.acceptFuture.isNone(), "expected attempt without acceptFuture")
try:
let initiateFuture = newFuture[AsyncSocket]("initiate")
for dstPort in attempt.dstPorts:
asyncCheck doInitiate(attempt.srcIp, attempt.srcPort, attempt.dstIp,
dstPort, initiateFuture)
await progress("")
await initiateFuture or sleepAsync(Timeout)
if initiateFuture.finished():
result = initiateFuture.read()
else:
raise newException(PunchHoleError, "timeout")
except OSError as e:
raise newException(PunchHoleError, e.msg)
method respond*(puncher: UdpPuncher, attempt: Attempt):
Future[AsyncSocket] {.async.} =
assert(attempt.acceptFuture.isNone(), "expected attempt without acceptFuture")
try:
let respondFuture = newFuture[AsyncSocket]("respond")
for dstPort in attempt.dstPorts:
asyncCheck doRespond(attempt.srcIp, attempt.srcPort, attempt.dstIp,
dstPort, respondFuture)
await respondFuture or sleepAsync(Timeout)
if respondFuture.finished():
result = respondFuture.read()
else:
raise newException(PunchHoleError, "timeout")
except OSError as e:
raise newException(PunchHoleError, e.msg)