introduce new Puncher interface; implement puncher interface for tcp-syni and tcp-nutss

This commit is contained in:
Christian Ulrich 2020-10-24 18:44:37 +02:00
parent ff8fa732dc
commit 0750af1a13
No known key found for this signature in database
GPG Key ID: 8241BE099775A097
8 changed files with 442 additions and 426 deletions

View File

@ -1,12 +1,13 @@
import asyncdispatch, asyncnet, os, strformat, strutils
from nativesockets import Domain, SockType, Protocol from nativesockets import Domain, SockType, Protocol
from net import IpAddress, Port, `$`, `==`, parseIpAddress
from sequtils import filter
import asyncdispatch, asyncnet, os, strformat, strutils
import asyncutils import asyncutils
import message import message
import options
import tables import tables
import tcp_syni_initiator import tcp_syni
import tcp_syni_responder import tcp_nutss
import tcp_nutss_initiator
import tcp_nutss_responder
from strutils import format, join from strutils import format, join
from nativesockets import setSockOptInt from nativesockets import setSockOptInt
@ -14,8 +15,8 @@ from nativesockets import setSockOptInt
type type
Punchd = ref object Punchd = ref object
unixSocket: AsyncSocket unixSocket: AsyncSocket
initiators: Table[string, Initiator] punchers: Table[string, Puncher]
responders: Table[string, Responder] attempts: seq[Attempt]
Sigint = object of CatchableError Sigint = object of CatchableError
@ -30,23 +31,87 @@ proc sendToClient(unixSock: AsyncSocket, msg: string,
let unixFd = unixSock.getFd.AsyncFD let unixFd = unixSock.getFd.AsyncFD
await unixFd.asyncSendMsg(msg, cmsgs) await unixFd.asyncSendMsg(msg, cmsgs)
proc findAttemptsByLocalAddr(punchd: Punchd, srcIp: IpAddress,
srcPort: Port): seq[Attempt] =
proc matchesLocalAddr(a: Attempt): bool =
a.srcIp == srcIp and a.srcPort == srcPort
punchd.attempts.filter(matchesLocalAddr)
proc acceptConnections(punchd: Punchd, ip: IpAddress, port: Port,
protocol: Protocol) {.async.} =
var sockType: SockType
case protocol:
of IPPROTO_TCP:
sockType = SOCK_STREAM
of IPPROTO_UDP:
sockType = SOCK_DGRAM
else:
assert(false, "can only accept TCP or UDP connections")
let sock = newAsyncSocket(sockType = sockType, protocol = protocol)
sock.bindAddr(port, $ip)
sock.listen()
while true:
let acceptFuture = sock.accept()
await acceptFuture or sleepAsync(Timeout)
if acceptFuture.finished():
let peer = acceptFuture.read()
let (peerAddr, peerPort) = peer.getPeerAddr()
let peerIp = parseIpAddress(peerAddr)
let query = Attempt(srcIp: ip, srcPort: port, dstIp: peerIp,
dstPorts: @[peerPort])
let i = punchd.attempts.find(query)
if i == -1:
echo "Accepted connection, but no attempt found. Discarding."
peer.close()
continue
else:
let acceptFuture = punchd.attempts[i].acceptFuture.get()
acceptFuture.complete(peer)
let localAddrMatches = punchd.findAttemptsByLocalAddr(ip, port)
if localAddrMatches.len() <= 1:
break
sock.close()
proc addAttempt(punchd: Punchd, attempt: Attempt, puncher: Puncher) =
let localAddrMatches = punchd.findAttemptsByLocalAddr(attempt.srcIp,
attempt.srcPort)
punchd.attempts.add(attempt)
if localAddrMatches.len() == 0:
if attempt.acceptFuture.isSome():
asyncCheck punchd.acceptConnections(attempt.srcIp, attempt.srcPort,
puncher.getProtocol())
elif localAddrMatches.contains(attempt):
raise newException(PunchHoleError,
"hole punching for given parameters already active")
proc removeAttempt(punchd: Punchd, attempt: Attempt) =
punchd.attempts.del(punchd.attempts.find(attempt))
proc handleRequest(punchd: Punchd, line: string, proc handleRequest(punchd: Punchd, line: string,
unixSock: AsyncSocket) {.async.} = unixSock: AsyncSocket) {.async.} =
var id: string var id: string
var sock: AsyncSocket var sock: AsyncSocket
var attempt: Attempt
try: try:
let args = line.parseArgs(4) let args = line.parseArgs(4)
id = args[1] id = args[1]
let puncher = punchd.punchers[args[2]]
case args[0]: case args[0]:
of "initiate": of "initiate":
attempt = puncher.parseInitiateRequest(args[3])
punchd.addAttempt(attempt, puncher)
proc progress(extraArgs: string) {.async.} = proc progress(extraArgs: string) {.async.} =
let msg = &"progress|{id}|{args[2]}|{args[3]}|{extraArgs}\n" let msg = &"progress|{id}|{args[2]}|{args[3]}|{extraArgs}\n"
await sendToClient(unixSock, msg) await sendToClient(unixSock, msg)
sock = await punchd.initiators[args[2]].initiate(args[3], progress) sock = await puncher.initiate(attempt, progress)
punchd.removeAttempt(attempt)
of "respond": of "respond":
sock = await punchd.responders[args[2]].respond(args[3]) attempt = puncher.parseRespondRequest(args[3])
punchd.addAttempt(attempt, puncher)
sock = await puncher.respond(attempt)
punchd.removeAttempt(attempt)
else: else:
raise newException(ValueError, "invalid request") raise newException(ValueError, "invalid request")
@ -55,6 +120,7 @@ proc handleRequest(punchd: Punchd, line: string,
sock.close() sock.close()
except PunchHoleError as e: except PunchHoleError as e:
punchd.removeAttempt(attempt)
await sendToClient(unixSock, &"error|{id}|{e.msg}\n") await sendToClient(unixSock, &"error|{id}|{e.msg}\n")
except KeyError, ValueError: except KeyError, ValueError:
unixSock.close unixSock.close
@ -84,19 +150,15 @@ proc main() =
{fpUserRead, fpUserWrite, fpGroupRead, fpGroupWrite, {fpUserRead, fpUserWrite, fpGroupRead, fpGroupWrite,
fpOthersRead, fpOthersWrite}) fpOthersRead, fpOthersWrite})
let punchd = Punchd(unixSocket: unixSocket) let punchd = Punchd(unixSocket: unixSocket)
punchd.initiators["tcp-syni"] = initTcpSyniInitiator() punchd.punchers["tcp-syni"] = initTcpSyniPuncher()
punchd.initiators["tcp-nutss"] = initTcpNutssInitiator() punchd.punchers["tcp-nutss"] = initTcpNutssPuncher()
punchd.responders["tcp-syni"] = initTcpSyniResponder()
punchd.responders["tcp-nutss"] = initTcpNutssResponder()
asyncCheck handleUsers(punchd) asyncCheck handleUsers(punchd)
try: try:
runForever() runForever()
except Sigint: except Sigint:
for i in punchd.initiators.values: while punchd.attempts.len() != 0:
waitFor i.cleanup() waitFor punchd.attempts.pop().cleanup()
for r in punchd.responders.values:
waitFor r.cleanup()
punchd.unixSocket.close() punchd.unixSocket.close()
removeFile(PunchdSocket) removeFile(PunchdSocket)

View File

@ -1,54 +1,97 @@
import asyncdispatch, asyncnet, strformat import asyncdispatch, asyncnet, strformat, options
from nativesockets import Protocol
from net import IpAddress, Port, `$`, `==` from net import IpAddress, Port, `$`, `==`
from sequtils import any from sequtils import any
import asyncutils import asyncutils
type type
Attempt* = ref object of RootObj Attempt* = ref object of RootObj
## A hole punching attempt.
##
## It is created by parsing the arguments of a request in either
## ``parseInitiateRequest`` or ``parseRespondRequest`` and must not be
## modified afterwards. By inclduing ``some(acceptFuture)``, the puncher
## tells the caller to accept connections at the local IP address and
## port ``srcPort`` before calling ``initiate``. The desired transport
## protocol can be obtained by calling ``getProcotol``. The puncher expects
## the caller to complete the future when a connections from
## ``dstIp``:``dstPort`` has been accepted.
srcIp*: IpAddress srcIp*: IpAddress
srcPort*: Port srcPort*: Port
dstIp*: IpAddress dstIp*: IpAddress
dstPorts*: seq[Port] dstPorts*: seq[Port]
acceptFuture*: Option[Future[AsyncSocket]]
Puncher* = ref object of RootObj Puncher* = ref object of RootObj
attempts*: seq[Attempt] ## A hole puncher.
Initiator* = ref object of Puncher
Responder* = ref object of Puncher
PunchHoleError* = object of ValueError PunchHoleError* = object of ValueError
attempt: Attempt
## An exception indicating that the contained ``attempt`` has failed.
PunchProgressCb* = proc(extraArgs: string) {.async.} PunchProgressCb* = proc(extraArgs: string) {.async.}
## A callback to allow a puncher report progress.
##
## When called a status message of type ``progress``, including the given
## ``extraArgs`` has to be sent to the application.
const Timeout* = 3000 const Timeout* = 3000
proc findAttempt*(puncher: Puncher, srcIp: IpAddress, srcPort: Port, proc `==`*(a, b: Attempt): bool =
dstIp: IpAddress, dstPorts: seq[Port]): int = ## ``==`` for hole punching attempts.
for (index, attempt) in puncher.attempts.pairs(): ##
if attempt.srcIp == srcIp and attempt.srcPort == srcPort and ## Two hole punching attempts are considered equal if their ``srcIp``,
attempt.dstIp == dstIp and ## ``srcPort`` and ``dstIp`` are equal and their ``dstPorts`` overlap.
attempt.dstPorts.any(proc (p: Port): bool = p in dstPorts): a.srcIp == b.srcIp and a.srcPort == b.srcPort and a.dstIp == b.dstIp and
return index a.dstPorts.any(proc (p: Port): bool = p in b.dstPorts)
return -1
proc findAttemptsByLocalAddr*(puncher: Puncher, address: IpAddress, method cleanup*(attempt: Attempt): Future[void] {.base, async.} =
port: Port): seq[Attempt] = ## Cleans up when an attempt finished (either successful or not).
for attempt in puncher.attempts: ##
if attempt.srcIp == address and attempt.srcPort == port: ## Does nothing. Override for custom attempt types.
result.add(attempt) discard
method cleanup*(puncher: Puncher): Future[void] {.base, async.} = method getProtocol*(puncher: Puncher): Protocol {.base.} =
## Returns the transport protocol the puncher employs.
raise newException(CatchableError, "Method without implementation override")
method parseInitiateRequest*(puncher: Puncher, args: string): Attempt {.base.} =
## Creates a new hole punching attempt by parsing arguments of an ``initiate``
## request.
##
## ``args`` has to be the arguments after the ``TECHNIQUE`` field, i.e.
## ``IP_FROM|PORTS_FROM|IP_TO|PORTS_TO``. My throw a ``ValueError`` if
## ``args`` is invalid.
raise newException(CatchableError, "Method without implementation override")
method parseRespondRequest*(puncher: Puncher, args: string): Attempt {.base.} =
## Creates a new hole punching attempt by parsing arguments of a ``respond``
## request.
##
## ``args`` has to be the arguments after the ``TECHNIQUE`` field, i.e.
## ``IP_FROM|PORTS_FROM|IP_TO|PORTS_TO|EXTRA_ARGS``. May throw a
## ``ValueError`` if ``args`` is invalid.
raise newException(CatchableError, "Method without implementation override")
method initiate*(puncher: Puncher, attempt: Attempt, progress: PunchProgressCb):
Future[AsyncSocket] {.base, async.} =
## Initiate a hole punching attempt.
##
## ``attempt`` has to be obtained by calling ``parseInitiateRequest``.
## ``progress`` will be called when the attempt has been initiated and a
## status message of type ``progress`` has to be sent to the application. The
## returned future contains a connected socket and will fail with a
## ``PunchHoleError`` if the hole punching fails.
block: # workaround for https://github.com/nim-lang/Nim/issues/12530 block: # workaround for https://github.com/nim-lang/Nim/issues/12530
raise newException(CatchableError, "Method without implementation override") raise newException(CatchableError, "Method without implementation override")
method initiate*(puncher: Initiator, args: string, progress: PunchProgressCb): method respond*(puncher: Puncher, attempt: Attempt):
Future[AsyncSocket] {.base, async.} =
block: # workaround for https://github.com/nim-lang/Nim/issues/12530
raise newException(CatchableError, "Method without implementation override")
method respond*(puncher: Responder, args: string):
Future[AsyncSocket] {.base, async.} = Future[AsyncSocket] {.base, async.} =
## Respond to a hole punching attempt initiated by another peer.
##
## ``attempt has to be obtained by calling ``parseRespondRequest``. The
## returned future contains a connected socket and will fail with as
## ``PunchHoleError`` if the hole punching fails.
block: # workaround for https://github.com/nim-lang/Nim/issues/12530 block: # workaround for https://github.com/nim-lang/Nim/issues/12530
raise newException(CatchableError, "Method without implementation override") raise newException(CatchableError, "Method without implementation override")

106
tcp_nutss.nim Normal file
View File

@ -0,0 +1,106 @@
from nativesockets import Protocol
from net import IpAddress, Port, `$`, `==`
from random import randomize, rand
import asyncdispatch, asyncnet, strformat
import ip_packet
import message
import options
import port_prediction
import puncher
import raw_socket
import utils
export puncher
type
TcpNutssPuncher* = ref object of Puncher
InitiateRequest = object
srcIp: IpAddress
srcPorts: seq[Port]
dstIp: IpAddress
dstPorts: seq[Port]
RespondRequest = object
dstIp: IpAddress
dstPorts: seq[Port]
srcIp: IpAddress
srcPorts: seq[Port]
extraArgs: string
proc injectSynPackets(attempt: Attempt) {.async.} =
let injectFd = setupTcpInjectingSocket()
for dstPort in attempt.dstPorts:
let synOut = IpPacket(protocol: tcp, ipAddrSrc: attempt.srcIp,
ipAddrDst: attempt.dstIp, ipTTL: 2,
tcpPortSrc: attempt.srcPort, tcpPortDst: dstPort,
tcpSeqNumber: rand(uint32), tcpAckNumber: 0,
tcpFlags: {SYN}, tcpWindowSize: 1452 * 10)
echo &"[{synOut.ipAddrSrc}:{synOut.tcpPortSrc} -> {synOut.ipAddrDst}:{synOut.tcpPortDst}, SEQ {synOut.tcpSeqNumber}] injecting outgoing SYN"
await injectFd.injectTcpPacket(synOut)
proc connect(srcIp: IpAddress, srcPort: Port, dstIp: IpAddress, dstPort: Port,
future: Future[AsyncSocket]) {.async.} =
let sock = newAsyncSocket()
sock.setSockOpt(OptReuseAddr, true)
echo &"connect {srcIp}:{srcPort} -> {dstIp}:{dstPort}"
sock.bindAddr(srcPort, $srcIp)
try:
await sock.connect($dstIp, dstPort)
future.complete(sock)
except OSError as e:
echo &"connection {srcIP}:{srcPort.int} -> {dstIp}:{dstPort.int} failed: ", e.msg
sock.close()
proc initTcpNutssPuncher*(): TcpNutssPuncher =
randomize()
TcpNutssPuncher()
method getProtocol*(puncher: TcpNutssPuncher): Protocol =
IPPROTO_TCP
method parseInitiateRequest*(puncher: TcpNutssPuncher, args: string): Attempt =
let parsed = parseMessage[InitiateRequest](args)
let localIp = getPrimaryIPAddr(parsed.dstIp)
let predictedDstPorts = predictPortRange(parsed.dstPorts)
let acceptFuture = newFuture[AsyncSocket]("parseInitiateRequest")
Attempt(srcIp: localIp, srcPort: parsed.srcPorts[0], dstIp: parsed.dstIp,
dstPorts: predictedDstPorts, acceptFuture: some(acceptFuture))
method parseRespondRequest*(puncher: TcpNutssPuncher, args: string): Attempt =
let parsed = parseMessage[RespondRequest](args)
let localIp = getPrimaryIPAddr(parsed.dstIp)
let predictedDstPorts = predictPortRange(parsed.dstPorts)
Attempt(srcIp: localIp, srcPort: parsed.srcPorts[0], dstIp: parsed.dstIp,
dstPorts: predictedDstPorts, acceptFuture: none(Future[AsyncSocket]))
method initiate*(puncher: TcpNutssPuncher, attempt: Attempt,
progress: PunchProgressCb): Future[AsyncSocket] {.async.} =
assert(attempt.acceptFuture.isSome(), "expected attempt with acceptFuture")
try:
let acceptFuture = attempt.acceptFuture.get()
await injectSynPackets(attempt)
await progress("")
await acceptFuture or sleepAsync(Timeout)
if acceptFuture.finished():
result = acceptFuture.read()
else:
raise newException(PunchHoleError, "timeout")
except OSError as e:
raise newException(PunchHoleError, e.msg)
method respond*(puncher: TcpNutssPuncher, attempt: Attempt):
Future[AsyncSocket] {.async.} =
assert(attempt.acceptFuture.isNone(), "expected attempt without acceptFuture")
try:
let connectFuture = newFuture[AsyncSocket]("respond")
for dstPort in attempt.dstPorts:
asyncCheck connect(attempt.srcIp, attempt.srcPort, attempt.dstIp, dstPort,
connectFuture)
await connectFuture or sleepAsync(Timeout)
if connectFuture.finished():
result = connectFuture.read()
else:
raise newException(PunchHoleError, "timeout")
except OSError as e:
raise newException(PunchHoleError, e.msg)

View File

@ -1,97 +0,0 @@
import asyncdispatch, asyncnet, strformat
from net import IpAddress, Port, `$`, `==`, parseIpAddress
from random import randomize, rand
from sequtils import any
import ip_packet
import message
import port_prediction
import puncher
import raw_socket
import utils
export Puncher, Initiator, PunchProgressCb, PunchHoleError, cleanup, initiate
type
TNIAttempt = ref object of Attempt
future: Future[AsyncSocket]
TcpNutssInitiator* = ref object of Initiator
Request = object
srcIp: IpAddress
srcPorts: seq[Port]
dstIp: IpAddress
dstPorts: seq[Port]
method cleanup*(puncher: TcpNutssInitiator) {.async.} =
discard
proc initTcpNutssInitiator*(): TcpNutssInitiator =
randomize()
TcpNutssInitiator()
proc injectSynPackets(attempt: TNIAttempt) {.async.} =
let injectFd = setupTcpInjectingSocket()
for dstPort in attempt.dstPorts:
let synOut = IpPacket(protocol: tcp, ipAddrSrc: attempt.srcIp,
ipAddrDst: attempt.dstIp, ipTTL: 2,
tcpPortSrc: attempt.srcPort, tcpPortDst: dstPort,
tcpSeqNumber: rand(uint32), tcpAckNumber: 0,
tcpFlags: {SYN}, tcpWindowSize: 1452 * 10)
echo &"[{synOut.ipAddrSrc}:{synOut.tcpPortSrc} -> {synOut.ipAddrDst}:{synOut.tcpPortDst}, SEQ {synOut.tcpSeqNumber}] injecting outgoing SYN"
await injectFd.injectTcpPacket(synOut)
proc accept(puncher: TcpNutssInitiator, ip: IpAddress, port: Port) {.async.} =
let sock = newAsyncSocket()
#sock.setSockOpt(OptReuseAddr, true)
sock.bindAddr(port, $ip)
sock.listen()
while true:
let acceptFuture = sock.accept()
await acceptFuture or sleepAsync(Timeout)
if acceptFuture.finished():
let peer = acceptFuture.read()
let (peerAddr, peerPort) = peer.getPeerAddr()
let peerIp = parseIpAddress(peerAddr)
let i = puncher.findAttempt(ip, port, peerIp, @[peerPort])
if i == -1:
echo "Accepted connection, but no attempt found. Discarding."
peer.close()
continue
else:
let attempt = TNIAttempt(puncher.attempts[i])
attempt.future.complete(peer)
let attempts = puncher.findAttemptsByLocalAddr(ip, port)
if attempts.len() <= 1:
break
sock.close()
method initiate*(puncher: TcpNutssInitiator, args: string,
progressCb: PunchProgressCb): Future[AsyncSocket] {.async.} =
let req = parseMessage[Request](args)
let localIp = getPrimaryIPAddr(req.dstIp)
let existingAttempts = puncher.findAttemptsByLocalAddr(localIp, req.srcPorts[0])
if existingAttempts.len() == 0:
echo &"initiating connection to {req.dstIp}:{req.dstPorts[0].int}"
asyncCheck puncher.accept(localIp, req.srcPorts[0])
else:
for a in existingAttempts:
if a.dstIp == req.dstIp and
a.dstPorts.any(proc (p: Port): bool = p in req.dstPorts):
raise newException(PunchHoleError, "hole punching for given parameters already active")
try:
let attempt = TNIAttempt(srcIp: localIp, srcPort: req.srcPorts[0],
dstIp: req.dstIp,
dstPorts: predictPortRange(req.dstPorts),
future: newFuture[AsyncSocket]("initiate"))
puncher.attempts.add(attempt)
await attempt.injectSynPackets()
await progressCb("")
await attempt.future or sleepAsync(Timeout)
puncher.attempts.del(puncher.attempts.find(attempt))
if attempt.future.finished():
result = attempt.future.read()
else:
raise newException(PunchHoleError, "timeout")
except OSError as e:
raise newException(PunchHoleError, e.msg)

View File

@ -1,54 +0,0 @@
import asyncdispatch, asyncnet, strformat
from net import IpAddress, Port, `$`, `==`
import message
import port_prediction
import puncher
import utils
export Puncher, Responder, PunchHoleError, cleanup, respond
type
TcpNutssResponder* = ref object of Responder
Request = object
dstIp: IpAddress
dstPorts: seq[Port]
srcIp: IpAddress
srcPorts: seq[Port]
extraData: string
method cleanup*(puncher: TcpNutssResponder) {.async.} =
discard
proc initTcpNutssResponder*(): TcpNutssResponder =
TcpNutssResponder()
proc connect(srcIp: IpAddress, srcPort: Port, dstIp: IpAddress, dstPort: Port,
future: Future[AsyncSocket]) {.async.} =
let sock = newAsyncSocket()
sock.setSockOpt(OptReuseAddr, true)
echo &"connect {srcIp}:{srcPort} -> {dstIp}:{dstPort}"
sock.bindAddr(srcPort, $srcIp)
try:
await sock.connect($dstIp, dstPort)
future.complete(sock)
except OSError as e:
echo &"connection {srcIP}:{srcPort.int} -> {dstIp}:{dstPort.int} failed: ", e.msg
sock.close()
method respond*(puncher: TcpNutssResponder, args: string): Future[AsyncSocket] {.async.} =
let req = parseMessage[Request](args)
let localIp = getPrimaryIPAddr(req.dstIp)
try:
let connectFuture = newFuture[AsyncSocket]("respond")
let portRange = predictPortRange(req.dstPorts)
for dstPort in portRange:
asyncCheck connect(localIp, req.srcPorts[0], req.dstIp, req.dstPorts[0],
connectFuture)
await connectFuture or sleepAsync(Timeout)
if connectFuture.finished():
result = connectFuture.read()
else:
raise newException(PunchHoleError, "timeout")
except OSError as e:
raise newException(PunchHoleError, e.msg)

190
tcp_syni.nim Normal file
View File

@ -0,0 +1,190 @@
from nativesockets import Protocol, setSockOptInt
from net import IpAddress, Port, `$`, `==`
from random import randomize, rand
from strutils import join
import asyncdispatch, asyncnet, strformat
import ip_packet
import message
import network_interface
import options
import port_prediction
import puncher
import raw_socket
import utils
export puncher
type
TcpSyniPuncher* = ref object of Puncher
InitiateRequest = object
srcIp: IpAddress
srcPorts: seq[Port]
dstIp: IpAddress
dstPorts: seq[Port]
RespondRequest = object
dstIp: IpAddress
dstPorts: seq[Port]
srcIp: IpAddress
srcPorts: seq[Port]
seqNums: seq[uint32]
TcpSyniInitiateAttempt = ref object of Attempt
TcpSyniRespondAttempt = ref object of Attempt
seqNums: seq[uint32]
var IPPROTO_IP {.importc: "IPPROTO_IP", header: "<netinet/in.h>".}: cint
var IP_TTL {.importc: "IP_TTL", header: "<netinet/in.h>".}: cint
proc captureSeqNumbers(attempt: Attempt, cb: PunchProgressCb) {.async.} =
# FIXME: timeout?
let iface = getNetworkInterface(attempt.srcIp)
let captureFd = setupEthernetCapturingSocket(iface)
var seqNums = newSeq[uint32]()
while seqNums.len < attempt.dstPorts.len:
let packet = await captureFd.recv(4000)
if packet == "":
break
let parsed = parseEthernetPacket(packet)
if parsed.protocol == tcp and
parsed.ipAddrSrc == attempt.srcIp and
parsed.tcpPortSrc.int == attempt.srcPort.int and
parsed.ipAddrDst == attempt.dstIp and
parsed.tcpFlags == {SYN}:
for port in attempt.dstPorts:
if parsed.tcpPortDst.int == port.int:
seqNums.add(parsed.tcpSeqNumber)
break
closeSocket(captureFd)
await cb(seqNums.join(","))
proc captureAndResendAck(attempt: Attempt) {.async.} =
let iface = getNetworkInterface(attempt.srcIp)
let captureFd = setupEthernetCapturingSocket(iface)
let injectFd = setupTcpInjectingSocket()
block loops:
while true:
let packet = await captureFd.recv(4000)
if packet == "":
break
var parsed = parseEthernetPacket(packet)
if parsed.protocol == tcp and
parsed.ipAddrSrc == attempt.srcIp and
parsed.tcpPortSrc.int == attempt.srcPort.int and
parsed.ipAddrDst == attempt.dstIp and
parsed.tcpFlags == {ACK}:
for port in attempt.dstPorts:
if parsed.tcpPortDst.int == port.int:
parsed.ipTTL = 64
echo &"[{parsed.ipAddrSrc}:{parsed.tcpPortSrc.int} -> {parsed.ipAddrDst}:{parsed.tcpPortDst}, SEQ {parsed.tcpSeqNumber}] resending ACK with TTL {parsed.ipTTL}"
await injectFd.injectTcpPacket(parsed)
break loops
closeSocket(captureFd)
closeSocket(injectFd)
proc connect(srcIp: IpAddress, srcPort: Port, dstIp: IpAddress, dstPort: Port,
future: Future[AsyncSocket]) {.async.} =
let sock = newAsyncSocket()
sock.setSockOpt(OptReuseAddr, true)
sock.getFd.setSockOptInt(IPPROTO_IP, IP_TTL, 2)
echo &"connect {srcIp}:{srcPort} -> {dstIp}:{dstPort}"
sock.bindAddr(srcPort, $srcIp)
try:
await sock.connect($dstIp, dstPort)
sock.getFd.setSockOptInt(IPPROTO_IP, IP_TTL, 64)
future.complete(sock)
except OSError as e:
echo &"connection {srcIP}:{srcPort.int} -> {dstIp}:{dstPort.int} failed: ", e.msg
sock.close()
proc injectSynPackets(attempt: TcpSyniRespondAttempt) {.async.} =
let injectFd = setupTcpInjectingSocket()
for dstPort in attempt.dstPorts:
let synOut = IpPacket(protocol: tcp, ipAddrSrc: attempt.srcIp,
ipAddrDst: attempt.dstIp, ipTTL: 2,
tcpPortSrc: attempt.srcPort, tcpPortDst: dstPort,
tcpSeqNumber: rand(uint32), tcpAckNumber: 0,
tcpFlags: {SYN}, tcpWindowSize: 1452 * 10)
echo &"[{synOut.ipAddrSrc}:{synOut.tcpPortSrc} -> {synOut.ipAddrDst}:{synOut.tcpPortDst}, SEQ {synOut.tcpSeqNumber}] injecting outgoing SYN"
await injectFd.injectTcpPacket(synOut)
for seqNum in attempt.seqNums:
let synIn = IpPacket(protocol: tcp, ipAddrSrc: attempt.dstIp,
ipAddrDst: attempt.srcIp, ipTTL: 64,
tcpPortSrc: dstPort,
tcpPortDst: attempt.srcPort,
tcpSeqNumber: seqNum, tcpAckNumber: 0,
tcpFlags: {SYN}, tcpWindowSize: 1452 * 10)
echo &"[{synIn.ipAddrSrc}:{synIn.tcpPortSrc} -> {synIn.ipAddrDst}:{synIn.tcpPortDst}, SEQ {synIn.tcpSeqNumber}] injecting incoming SYN"
await injectFd.injectTcpPacket(synIn)
closeSocket(injectFd)
proc initTcpSyniPuncher*(): TcpSyniPuncher =
randomize()
TcpSyniPuncher()
method cleanup*(attempt: TcpSyniInitiateAttempt) {.async.} =
await deleteFirewallRules(attempt)
method cleanup*(attempt: TcpSyniRespondAttempt) {.async.} =
await deleteFirewallRules(attempt)
method getProtocol*(puncher: TcpSyniPuncher): Protocol =
IPPROTO_TCP
method parseInitiateRequest*(puncher: TcpSyniPuncher, args: string): Attempt =
let parsed = parseMessage[InitiateRequest](args)
let localIp = getPrimaryIPAddr(parsed.dstIp)
let predictedDstPorts = predictPortRange(parsed.dstPorts)
TcpSyniInitiateAttempt(srcIp: localIp, srcPort: parsed.srcPorts[0],
dstIp: parsed.dstIp, dstPorts: predictedDstPorts,
acceptFuture: none(Future[AsyncSocket]))
method parseRespondRequest*(puncher: TcpSyniPuncher, args: string): Attempt =
let parsed = parseMessage[RespondRequest](args)
let localIp = getPrimaryIPAddr(parsed.dstIp)
let predictedDstPorts = predictPortRange(parsed.dstPorts)
let acceptFuture = newFuture[AsyncSocket]("parseRespondRequest")
TcpSyniRespondAttempt(srcIp: localIp, srcPort: parsed.srcPorts[0],
dstIp: parsed.dstIp, dstPorts: predictedDstPorts,
acceptFuture: some(acceptFuture),
seqNums: parsed.seqNums)
method initiate*(puncher: TcpSyniPuncher, attempt: Attempt,
progress: PunchProgressCb): Future[AsyncSocket] {.async.} =
assert(attempt of TcpSyniInitiateAttempt, "unexpected attempt type")
assert(attempt.acceptFuture.isNone(), "expected attempt without acceptFuture")
await addFirewallRules(attempt)
asyncCheck captureSeqNumbers(attempt, progress)
asyncCheck captureAndResendAck(attempt)
try:
let connectFuture = newFuture[AsyncSocket]("connect")
for dstPort in attempt.dstPorts:
asyncCheck connect(attempt.srcIp, attempt.srcPort, attempt.dstIp,
dstPort, connectFuture)
await connectFuture or sleepAsync(Timeout)
await deleteFirewallRules(attempt)
if connectFuture.finished():
result = connectFuture.read()
else:
raise newException(PunchHoleError, "timeout")
except OSError as e:
raise newException(PunchHoleError, e.msg)
method respond*(puncher: TcpSyniPuncher, attempt: Attempt):
Future[AsyncSocket] {.async.} =
assert(attempt of TcpSyniRespondAttempt, "unexpected attempt type")
assert(attempt.acceptFuture.isSome(), "expected attempt with acceptFuture")
try:
let acceptFuture = attempt.acceptFuture.get()
await addFirewallRules(attempt) # FIXME: needed?
await injectSynPackets(TcpSyniRespondAttempt(attempt))
await acceptFuture or sleepAsync(Timeout)
await deleteFirewallRules(attempt) # FIXME: needed?
if acceptFuture.finished():
result = acceptFuture.read()
else:
raise newException(PunchHoleError, "timeout")
except OSError as e:
raise newException(PunchHoleError, e.msg)

View File

@ -1,120 +0,0 @@
import asyncdispatch, asyncnet, strformat
from net import IpAddress, Port, `$`, `==`
from nativesockets import setSockOptInt
from strutils import join
import ip_packet
import message
import network_interface
import port_prediction
import puncher
import raw_socket
import utils
export Puncher, Initiator, PunchProgressCb, PunchHoleError, cleanup, initiate
type
TcpSyniInitiator* = ref object of Initiator
Request = object
srcIp: IpAddress
srcPorts: seq[Port]
dstIp: IpAddress
dstPorts: seq[Port]
var IPPROTO_IP {.importc: "IPPROTO_IP", header: "<netinet/in.h>".}: cint
var IP_TTL {.importc: "IP_TTL", header: "<netinet/in.h>".}: cint
method cleanup*(puncher: TcpSyniInitiator): Future[void] {.async.} =
while puncher.attempts.len() != 0:
await puncher.attempts.pop().deleteFirewallRules()
proc initTcpSyniInitiator*(): TcpSyniInitiator =
TcpSyniInitiator()
proc captureSeqNumbers(attempt: Attempt, cb: PunchProgressCb) {.async.} =
# FIXME: timeout?
let iface = getNetworkInterface(attempt.srcIp)
let captureFd = setupEthernetCapturingSocket(iface)
var seqNums = newSeq[uint32]()
while seqNums.len < attempt.dstPorts.len:
let packet = await captureFd.recv(4000)
if packet == "":
break
let parsed = parseEthernetPacket(packet)
if parsed.protocol == tcp and
parsed.ipAddrSrc == attempt.srcIp and
parsed.tcpPortSrc.int == attempt.srcPort.int and
parsed.ipAddrDst == attempt.dstIp and
parsed.tcpFlags == {SYN}:
for port in attempt.dstPorts:
if parsed.tcpPortDst.int == port.int:
seqNums.add(parsed.tcpSeqNumber)
break
closeSocket(captureFd)
await cb(seqNums.join(","))
proc captureAndResendAck(attempt: Attempt) {.async.} =
let iface = getNetworkInterface(attempt.srcIp)
let captureFd = setupEthernetCapturingSocket(iface)
let injectFd = setupTcpInjectingSocket()
block loops:
while true:
let packet = await captureFd.recv(4000)
if packet == "":
break
var parsed = parseEthernetPacket(packet)
if parsed.protocol == tcp and
parsed.ipAddrSrc == attempt.srcIp and
parsed.tcpPortSrc.int == attempt.srcPort.int and
parsed.ipAddrDst == attempt.dstIp and
parsed.tcpFlags == {ACK}:
for port in attempt.dstPorts:
if parsed.tcpPortDst.int == port.int:
parsed.ipTTL = 64
echo &"[{parsed.ipAddrSrc}:{parsed.tcpPortSrc.int} -> {parsed.ipAddrDst}:{parsed.tcpPortDst}, SEQ {parsed.tcpSeqNumber}] resending ACK with TTL {parsed.ipTTL}"
await injectFd.injectTcpPacket(parsed)
break loops
closeSocket(captureFd)
closeSocket(injectFd)
proc connect(srcIp: IpAddress, srcPort: Port, dstIp: IpAddress, dstPort: Port,
future: Future[AsyncSocket]) {.async.} =
let sock = newAsyncSocket()
sock.setSockOpt(OptReuseAddr, true)
sock.getFd.setSockOptInt(IPPROTO_IP, IP_TTL, 2)
echo &"connect {srcIp}:{srcPort} -> {dstIp}:{dstPort}"
sock.bindAddr(srcPort, $srcIp)
try:
await sock.connect($dstIp, dstPort)
sock.getFd.setSockOptInt(IPPROTO_IP, IP_TTL, 64)
future.complete(sock)
except OSError as e:
echo &"connection {srcIP}:{srcPort.int} -> {dstIp}:{dstPort.int} failed: ", e.msg
sock.close()
method initiate*(puncher: TcpSyniInitiator, args: string,
progressCb: PunchProgressCb): Future[AsyncSocket] {.async.} =
let req = parseMessage[Request](args)
let localIp = getPrimaryIPAddr(req.dstIp)
if puncher.findAttempt(localIp, req.srcPorts[0], req.dstIp, req.dstPorts) != -1:
raise newException(PunchHoleError, "hole punching for given parameters already active")
let attempt = Attempt(srcIp: localIp, srcPort: req.srcPorts[0], dstIp: req.dstIp,
dstPorts: predictPortRange(req.dstPorts))
puncher.attempts.add(attempt)
await attempt.addFirewallRules()
asyncCheck attempt.captureSeqNumbers(progressCb)
asyncCheck attempt.captureAndResendAck()
try:
let connectFuture = newFuture[AsyncSocket]("connect")
for dstPort in attempt.dstPorts:
asyncCheck connect(attempt.srcIp, attempt.srcPort, attempt.dstIp,
dstPort, connectFuture)
await connectFuture or sleepAsync(Timeout)
await attempt.deleteFirewallRules()
puncher.attempts.del(puncher.attempts.find(attempt))
if connectFuture.finished():
result = connectFuture.read()
else:
raise newException(PunchHoleError, "timeout")
except OSError as e:
raise newException(PunchHoleError, e.msg)

View File

@ -1,114 +0,0 @@
import asyncdispatch, asyncnet, strformat
from net import IpAddress, Port, `$`, `==`, parseIpAddress
from random import randomize, rand
from sequtils import any
import ip_packet
import message
import port_prediction
import puncher
import raw_socket
import utils
export Puncher, Responder, PunchHoleError, cleanup, respond
type
TSRAttempt = ref object of Attempt
seqNums: seq[uint32]
future: Future[AsyncSocket]
TcpSyniResponder* = ref object of Responder
Request = object
dstIp: IpAddress
dstPorts: seq[Port]
srcIp: IpAddress
srcPorts: seq[Port]
seqNums: seq[uint32]
method cleanup*(puncher: TcpSyniResponder) {.async.} =
while puncher.attempts.len() != 0:
await puncher.attempts.pop().deleteFirewallRules()
proc initTcpSyniResponder*(): TcpSyniResponder =
randomize()
TcpSyniResponder()
proc injectSynPackets(attempt: TSRAttempt) {.async.} =
let injectFd = setupTcpInjectingSocket()
for dstPort in attempt.dstPorts:
let synOut = IpPacket(protocol: tcp, ipAddrSrc: attempt.srcIp,
ipAddrDst: attempt.dstIp, ipTTL: 2,
tcpPortSrc: attempt.srcPort, tcpPortDst: dstPort,
tcpSeqNumber: rand(uint32), tcpAckNumber: 0,
tcpFlags: {SYN}, tcpWindowSize: 1452 * 10)
echo &"[{synOut.ipAddrSrc}:{synOut.tcpPortSrc} -> {synOut.ipAddrDst}:{synOut.tcpPortDst}, SEQ {synOut.tcpSeqNumber}] injecting outgoing SYN"
await injectFd.injectTcpPacket(synOut)
for seqNum in attempt.seqNums:
let synIn = IpPacket(protocol: tcp, ipAddrSrc: attempt.dstIp,
ipAddrDst: attempt.srcIp, ipTTL: 64,
tcpPortSrc: dstPort,
tcpPortDst: attempt.srcPort,
tcpSeqNumber: seqNum, tcpAckNumber: 0,
tcpFlags: {SYN}, tcpWindowSize: 1452 * 10)
echo &"[{synIn.ipAddrSrc}:{synIn.tcpPortSrc} -> {synIn.ipAddrDst}:{synIn.tcpPortDst}, SEQ {synIn.tcpSeqNumber}] injecting incoming SYN"
await injectFd.injectTcpPacket(synIn)
closeSocket(injectFd)
proc accept(puncher: TcpSyniResponder, srcIp: IpAddress,
srcPort: Port) {.async.} =
let sock = newAsyncSocket()
sock.setSockOpt(OptReuseAddr, true)
sock.bindAddr(srcPort, $srcIp)
sock.listen()
while true:
let acceptFuture = sock.accept()
await acceptFuture or sleepAsync(Timeout)
if acceptFuture.finished():
let peer = acceptFuture.read()
let (peerAddr, peerPort) = peer.getPeerAddr()
let peerIp = parseIpAddress(peerAddr)
let i = puncher.findAttempt(srcIp, srcPort, peerIp, @[peerPort])
if i == -1:
echo "Accepted connection, but no attempt found. Discarding."
peer.close()
continue
else:
let attempt = TSRAttempt(puncher.attempts[i])
attempt.future.complete(peer)
let attempts = puncher.findAttemptsByLocalAddr(srcIp, srcPort)
# FIXME: should attempts have timestamps, so we can decide here which ones to delete?
if attempts.len() <= 1:
break
sock.close()
method respond*(puncher: TcpSyniResponder, args: string):
Future[AsyncSocket] {.async.} =
let req = parseMessage[Request](args)
let localIp = getPrimaryIPAddr(req.dstIp)
let existingAttempts = puncher.findAttemptsByLocalAddr(localIp, req.srcPorts[0])
if existingAttempts.len() == 0:
echo &"accepting connections from {req.dstIp}:{req.dstPorts[0].int}"
asyncCheck puncher.accept(localIp, req.srcPorts[0])
else:
for a in existingAttempts:
if a.dstIp == req.dstIp and
a.dstPorts.any(proc (p: Port): bool = p in req.dstPorts):
raise newException(PunchHoleError, "hole punching for given parameters already active")
try:
let attempt = TSRAttempt(srcIp: localIp, srcPort: req.srcPorts[0],
dstIp: req.dstIp,
dstPorts: predictPortRange(req.dstPorts),
seqNums: req.seqNums,
future: newFuture[AsyncSocket]("respond"))
puncher.attempts.add(attempt)
await attempt.addFirewallRules() # FIXME: needed?
await attempt.injectSynPackets()
await attempt.future or sleepAsync(Timeout)
await attempt.deleteFirewallRules() # FIXME: needed?
puncher.attempts.del(puncher.attempts.find(attempt))
if attempt.future.finished():
result = attempt.future.read()
else:
raise newException(PunchHoleError, "timeout")
except OSError as e:
raise newException(PunchHoleError, e.msg)