diff --git a/punchd.nim b/punchd.nim index cd716b7..a3578e7 100644 --- a/punchd.nim +++ b/punchd.nim @@ -1,12 +1,13 @@ -import asyncdispatch, asyncnet, os, strformat, strutils from nativesockets import Domain, SockType, Protocol +from net import IpAddress, Port, `$`, `==`, parseIpAddress +from sequtils import filter +import asyncdispatch, asyncnet, os, strformat, strutils import asyncutils import message +import options import tables -import tcp_syni_initiator -import tcp_syni_responder -import tcp_nutss_initiator -import tcp_nutss_responder +import tcp_syni +import tcp_nutss from strutils import format, join from nativesockets import setSockOptInt @@ -14,8 +15,8 @@ from nativesockets import setSockOptInt type Punchd = ref object unixSocket: AsyncSocket - initiators: Table[string, Initiator] - responders: Table[string, Responder] + punchers: Table[string, Puncher] + attempts: seq[Attempt] Sigint = object of CatchableError @@ -30,23 +31,87 @@ proc sendToClient(unixSock: AsyncSocket, msg: string, let unixFd = unixSock.getFd.AsyncFD await unixFd.asyncSendMsg(msg, cmsgs) +proc findAttemptsByLocalAddr(punchd: Punchd, srcIp: IpAddress, + srcPort: Port): seq[Attempt] = + proc matchesLocalAddr(a: Attempt): bool = + a.srcIp == srcIp and a.srcPort == srcPort + punchd.attempts.filter(matchesLocalAddr) + +proc acceptConnections(punchd: Punchd, ip: IpAddress, port: Port, + protocol: Protocol) {.async.} = + var sockType: SockType + case protocol: + of IPPROTO_TCP: + sockType = SOCK_STREAM + of IPPROTO_UDP: + sockType = SOCK_DGRAM + else: + assert(false, "can only accept TCP or UDP connections") + let sock = newAsyncSocket(sockType = sockType, protocol = protocol) + sock.bindAddr(port, $ip) + sock.listen() + while true: + let acceptFuture = sock.accept() + await acceptFuture or sleepAsync(Timeout) + if acceptFuture.finished(): + let peer = acceptFuture.read() + let (peerAddr, peerPort) = peer.getPeerAddr() + let peerIp = parseIpAddress(peerAddr) + let query = Attempt(srcIp: ip, srcPort: port, dstIp: peerIp, + dstPorts: @[peerPort]) + let i = punchd.attempts.find(query) + if i == -1: + echo "Accepted connection, but no attempt found. Discarding." + peer.close() + continue + else: + let acceptFuture = punchd.attempts[i].acceptFuture.get() + acceptFuture.complete(peer) + let localAddrMatches = punchd.findAttemptsByLocalAddr(ip, port) + if localAddrMatches.len() <= 1: + break + sock.close() + +proc addAttempt(punchd: Punchd, attempt: Attempt, puncher: Puncher) = + let localAddrMatches = punchd.findAttemptsByLocalAddr(attempt.srcIp, + attempt.srcPort) + punchd.attempts.add(attempt) + if localAddrMatches.len() == 0: + if attempt.acceptFuture.isSome(): + asyncCheck punchd.acceptConnections(attempt.srcIp, attempt.srcPort, + puncher.getProtocol()) + elif localAddrMatches.contains(attempt): + raise newException(PunchHoleError, + "hole punching for given parameters already active") + +proc removeAttempt(punchd: Punchd, attempt: Attempt) = + punchd.attempts.del(punchd.attempts.find(attempt)) + proc handleRequest(punchd: Punchd, line: string, unixSock: AsyncSocket) {.async.} = var id: string var sock: AsyncSocket + var attempt: Attempt try: let args = line.parseArgs(4) id = args[1] + let puncher = punchd.punchers[args[2]] case args[0]: of "initiate": + attempt = puncher.parseInitiateRequest(args[3]) + punchd.addAttempt(attempt, puncher) proc progress(extraArgs: string) {.async.} = let msg = &"progress|{id}|{args[2]}|{args[3]}|{extraArgs}\n" await sendToClient(unixSock, msg) - sock = await punchd.initiators[args[2]].initiate(args[3], progress) + sock = await puncher.initiate(attempt, progress) + punchd.removeAttempt(attempt) of "respond": - sock = await punchd.responders[args[2]].respond(args[3]) + attempt = puncher.parseRespondRequest(args[3]) + punchd.addAttempt(attempt, puncher) + sock = await puncher.respond(attempt) + punchd.removeAttempt(attempt) else: raise newException(ValueError, "invalid request") @@ -55,6 +120,7 @@ proc handleRequest(punchd: Punchd, line: string, sock.close() except PunchHoleError as e: + punchd.removeAttempt(attempt) await sendToClient(unixSock, &"error|{id}|{e.msg}\n") except KeyError, ValueError: unixSock.close @@ -84,19 +150,15 @@ proc main() = {fpUserRead, fpUserWrite, fpGroupRead, fpGroupWrite, fpOthersRead, fpOthersWrite}) let punchd = Punchd(unixSocket: unixSocket) - punchd.initiators["tcp-syni"] = initTcpSyniInitiator() - punchd.initiators["tcp-nutss"] = initTcpNutssInitiator() - punchd.responders["tcp-syni"] = initTcpSyniResponder() - punchd.responders["tcp-nutss"] = initTcpNutssResponder() + punchd.punchers["tcp-syni"] = initTcpSyniPuncher() + punchd.punchers["tcp-nutss"] = initTcpNutssPuncher() asyncCheck handleUsers(punchd) try: runForever() except Sigint: - for i in punchd.initiators.values: - waitFor i.cleanup() - for r in punchd.responders.values: - waitFor r.cleanup() + while punchd.attempts.len() != 0: + waitFor punchd.attempts.pop().cleanup() punchd.unixSocket.close() removeFile(PunchdSocket) diff --git a/puncher.nim b/puncher.nim index 4a41201..a16534e 100644 --- a/puncher.nim +++ b/puncher.nim @@ -1,54 +1,97 @@ -import asyncdispatch, asyncnet, strformat +import asyncdispatch, asyncnet, strformat, options +from nativesockets import Protocol from net import IpAddress, Port, `$`, `==` from sequtils import any import asyncutils type Attempt* = ref object of RootObj + ## A hole punching attempt. + ## + ## It is created by parsing the arguments of a request in either + ## ``parseInitiateRequest`` or ``parseRespondRequest`` and must not be + ## modified afterwards. By inclduing ``some(acceptFuture)``, the puncher + ## tells the caller to accept connections at the local IP address and + ## port ``srcPort`` before calling ``initiate``. The desired transport + ## protocol can be obtained by calling ``getProcotol``. The puncher expects + ## the caller to complete the future when a connections from + ## ``dstIp``:``dstPort`` has been accepted. srcIp*: IpAddress srcPort*: Port dstIp*: IpAddress dstPorts*: seq[Port] + acceptFuture*: Option[Future[AsyncSocket]] Puncher* = ref object of RootObj - attempts*: seq[Attempt] - - Initiator* = ref object of Puncher - - Responder* = ref object of Puncher + ## A hole puncher. PunchHoleError* = object of ValueError + attempt: Attempt + ## An exception indicating that the contained ``attempt`` has failed. PunchProgressCb* = proc(extraArgs: string) {.async.} + ## A callback to allow a puncher report progress. + ## + ## When called a status message of type ``progress``, including the given + ## ``extraArgs`` has to be sent to the application. const Timeout* = 3000 -proc findAttempt*(puncher: Puncher, srcIp: IpAddress, srcPort: Port, - dstIp: IpAddress, dstPorts: seq[Port]): int = - for (index, attempt) in puncher.attempts.pairs(): - if attempt.srcIp == srcIp and attempt.srcPort == srcPort and - attempt.dstIp == dstIp and - attempt.dstPorts.any(proc (p: Port): bool = p in dstPorts): - return index - return -1 +proc `==`*(a, b: Attempt): bool = + ## ``==`` for hole punching attempts. + ## + ## Two hole punching attempts are considered equal if their ``srcIp``, + ## ``srcPort`` and ``dstIp`` are equal and their ``dstPorts`` overlap. + a.srcIp == b.srcIp and a.srcPort == b.srcPort and a.dstIp == b.dstIp and + a.dstPorts.any(proc (p: Port): bool = p in b.dstPorts) -proc findAttemptsByLocalAddr*(puncher: Puncher, address: IpAddress, - port: Port): seq[Attempt] = - for attempt in puncher.attempts: - if attempt.srcIp == address and attempt.srcPort == port: - result.add(attempt) +method cleanup*(attempt: Attempt): Future[void] {.base, async.} = + ## Cleans up when an attempt finished (either successful or not). + ## + ## Does nothing. Override for custom attempt types. + discard -method cleanup*(puncher: Puncher): Future[void] {.base, async.} = - block: # workaround for https://github.com/nim-lang/Nim/issues/12530 - raise newException(CatchableError, "Method without implementation override") +method getProtocol*(puncher: Puncher): Protocol {.base.} = + ## Returns the transport protocol the puncher employs. + raise newException(CatchableError, "Method without implementation override") -method initiate*(puncher: Initiator, args: string, progress: PunchProgressCb): +method parseInitiateRequest*(puncher: Puncher, args: string): Attempt {.base.} = + ## Creates a new hole punching attempt by parsing arguments of an ``initiate`` + ## request. + ## + ## ``args`` has to be the arguments after the ``TECHNIQUE`` field, i.e. + ## ``IP_FROM|PORTS_FROM|IP_TO|PORTS_TO``. My throw a ``ValueError`` if + ## ``args`` is invalid. + raise newException(CatchableError, "Method without implementation override") + +method parseRespondRequest*(puncher: Puncher, args: string): Attempt {.base.} = + ## Creates a new hole punching attempt by parsing arguments of a ``respond`` + ## request. + ## + ## ``args`` has to be the arguments after the ``TECHNIQUE`` field, i.e. + ## ``IP_FROM|PORTS_FROM|IP_TO|PORTS_TO|EXTRA_ARGS``. May throw a + ## ``ValueError`` if ``args`` is invalid. + raise newException(CatchableError, "Method without implementation override") + +method initiate*(puncher: Puncher, attempt: Attempt, progress: PunchProgressCb): Future[AsyncSocket] {.base, async.} = + ## Initiate a hole punching attempt. + ## + ## ``attempt`` has to be obtained by calling ``parseInitiateRequest``. + ## ``progress`` will be called when the attempt has been initiated and a + ## status message of type ``progress`` has to be sent to the application. The + ## returned future contains a connected socket and will fail with a + ## ``PunchHoleError`` if the hole punching fails. block: # workaround for https://github.com/nim-lang/Nim/issues/12530 raise newException(CatchableError, "Method without implementation override") -method respond*(puncher: Responder, args: string): +method respond*(puncher: Puncher, attempt: Attempt): Future[AsyncSocket] {.base, async.} = + ## Respond to a hole punching attempt initiated by another peer. + ## + ## ``attempt has to be obtained by calling ``parseRespondRequest``. The + ## returned future contains a connected socket and will fail with as + ## ``PunchHoleError`` if the hole punching fails. block: # workaround for https://github.com/nim-lang/Nim/issues/12530 raise newException(CatchableError, "Method without implementation override") diff --git a/tcp_nutss.nim b/tcp_nutss.nim new file mode 100644 index 0000000..01c1d2a --- /dev/null +++ b/tcp_nutss.nim @@ -0,0 +1,106 @@ +from nativesockets import Protocol +from net import IpAddress, Port, `$`, `==` +from random import randomize, rand +import asyncdispatch, asyncnet, strformat +import ip_packet +import message +import options +import port_prediction +import puncher +import raw_socket +import utils + +export puncher + +type + TcpNutssPuncher* = ref object of Puncher + + InitiateRequest = object + srcIp: IpAddress + srcPorts: seq[Port] + dstIp: IpAddress + dstPorts: seq[Port] + + RespondRequest = object + dstIp: IpAddress + dstPorts: seq[Port] + srcIp: IpAddress + srcPorts: seq[Port] + extraArgs: string + +proc injectSynPackets(attempt: Attempt) {.async.} = + let injectFd = setupTcpInjectingSocket() + for dstPort in attempt.dstPorts: + let synOut = IpPacket(protocol: tcp, ipAddrSrc: attempt.srcIp, + ipAddrDst: attempt.dstIp, ipTTL: 2, + tcpPortSrc: attempt.srcPort, tcpPortDst: dstPort, + tcpSeqNumber: rand(uint32), tcpAckNumber: 0, + tcpFlags: {SYN}, tcpWindowSize: 1452 * 10) + echo &"[{synOut.ipAddrSrc}:{synOut.tcpPortSrc} -> {synOut.ipAddrDst}:{synOut.tcpPortDst}, SEQ {synOut.tcpSeqNumber}] injecting outgoing SYN" + await injectFd.injectTcpPacket(synOut) + +proc connect(srcIp: IpAddress, srcPort: Port, dstIp: IpAddress, dstPort: Port, + future: Future[AsyncSocket]) {.async.} = + let sock = newAsyncSocket() + sock.setSockOpt(OptReuseAddr, true) + echo &"connect {srcIp}:{srcPort} -> {dstIp}:{dstPort}" + sock.bindAddr(srcPort, $srcIp) + try: + await sock.connect($dstIp, dstPort) + future.complete(sock) + except OSError as e: + echo &"connection {srcIP}:{srcPort.int} -> {dstIp}:{dstPort.int} failed: ", e.msg + sock.close() + +proc initTcpNutssPuncher*(): TcpNutssPuncher = + randomize() + TcpNutssPuncher() + +method getProtocol*(puncher: TcpNutssPuncher): Protocol = + IPPROTO_TCP + +method parseInitiateRequest*(puncher: TcpNutssPuncher, args: string): Attempt = + let parsed = parseMessage[InitiateRequest](args) + let localIp = getPrimaryIPAddr(parsed.dstIp) + let predictedDstPorts = predictPortRange(parsed.dstPorts) + let acceptFuture = newFuture[AsyncSocket]("parseInitiateRequest") + Attempt(srcIp: localIp, srcPort: parsed.srcPorts[0], dstIp: parsed.dstIp, + dstPorts: predictedDstPorts, acceptFuture: some(acceptFuture)) + +method parseRespondRequest*(puncher: TcpNutssPuncher, args: string): Attempt = + let parsed = parseMessage[RespondRequest](args) + let localIp = getPrimaryIPAddr(parsed.dstIp) + let predictedDstPorts = predictPortRange(parsed.dstPorts) + Attempt(srcIp: localIp, srcPort: parsed.srcPorts[0], dstIp: parsed.dstIp, + dstPorts: predictedDstPorts, acceptFuture: none(Future[AsyncSocket])) + +method initiate*(puncher: TcpNutssPuncher, attempt: Attempt, + progress: PunchProgressCb): Future[AsyncSocket] {.async.} = + assert(attempt.acceptFuture.isSome(), "expected attempt with acceptFuture") + try: + let acceptFuture = attempt.acceptFuture.get() + await injectSynPackets(attempt) + await progress("") + await acceptFuture or sleepAsync(Timeout) + if acceptFuture.finished(): + result = acceptFuture.read() + else: + raise newException(PunchHoleError, "timeout") + except OSError as e: + raise newException(PunchHoleError, e.msg) + +method respond*(puncher: TcpNutssPuncher, attempt: Attempt): + Future[AsyncSocket] {.async.} = + assert(attempt.acceptFuture.isNone(), "expected attempt without acceptFuture") + try: + let connectFuture = newFuture[AsyncSocket]("respond") + for dstPort in attempt.dstPorts: + asyncCheck connect(attempt.srcIp, attempt.srcPort, attempt.dstIp, dstPort, + connectFuture) + await connectFuture or sleepAsync(Timeout) + if connectFuture.finished(): + result = connectFuture.read() + else: + raise newException(PunchHoleError, "timeout") + except OSError as e: + raise newException(PunchHoleError, e.msg) diff --git a/tcp_nutss_initiator.nim b/tcp_nutss_initiator.nim deleted file mode 100644 index 50dd5e2..0000000 --- a/tcp_nutss_initiator.nim +++ /dev/null @@ -1,97 +0,0 @@ -import asyncdispatch, asyncnet, strformat -from net import IpAddress, Port, `$`, `==`, parseIpAddress -from random import randomize, rand -from sequtils import any -import ip_packet -import message -import port_prediction -import puncher -import raw_socket -import utils - -export Puncher, Initiator, PunchProgressCb, PunchHoleError, cleanup, initiate - -type - TNIAttempt = ref object of Attempt - future: Future[AsyncSocket] - - TcpNutssInitiator* = ref object of Initiator - - Request = object - srcIp: IpAddress - srcPorts: seq[Port] - dstIp: IpAddress - dstPorts: seq[Port] - -method cleanup*(puncher: TcpNutssInitiator) {.async.} = - discard - -proc initTcpNutssInitiator*(): TcpNutssInitiator = - randomize() - TcpNutssInitiator() - -proc injectSynPackets(attempt: TNIAttempt) {.async.} = - let injectFd = setupTcpInjectingSocket() - for dstPort in attempt.dstPorts: - let synOut = IpPacket(protocol: tcp, ipAddrSrc: attempt.srcIp, - ipAddrDst: attempt.dstIp, ipTTL: 2, - tcpPortSrc: attempt.srcPort, tcpPortDst: dstPort, - tcpSeqNumber: rand(uint32), tcpAckNumber: 0, - tcpFlags: {SYN}, tcpWindowSize: 1452 * 10) - echo &"[{synOut.ipAddrSrc}:{synOut.tcpPortSrc} -> {synOut.ipAddrDst}:{synOut.tcpPortDst}, SEQ {synOut.tcpSeqNumber}] injecting outgoing SYN" - await injectFd.injectTcpPacket(synOut) - -proc accept(puncher: TcpNutssInitiator, ip: IpAddress, port: Port) {.async.} = - let sock = newAsyncSocket() - #sock.setSockOpt(OptReuseAddr, true) - sock.bindAddr(port, $ip) - sock.listen() - while true: - let acceptFuture = sock.accept() - await acceptFuture or sleepAsync(Timeout) - if acceptFuture.finished(): - let peer = acceptFuture.read() - let (peerAddr, peerPort) = peer.getPeerAddr() - let peerIp = parseIpAddress(peerAddr) - let i = puncher.findAttempt(ip, port, peerIp, @[peerPort]) - if i == -1: - echo "Accepted connection, but no attempt found. Discarding." - peer.close() - continue - else: - let attempt = TNIAttempt(puncher.attempts[i]) - attempt.future.complete(peer) - let attempts = puncher.findAttemptsByLocalAddr(ip, port) - if attempts.len() <= 1: - break - sock.close() - -method initiate*(puncher: TcpNutssInitiator, args: string, - progressCb: PunchProgressCb): Future[AsyncSocket] {.async.} = - let req = parseMessage[Request](args) - let localIp = getPrimaryIPAddr(req.dstIp) - let existingAttempts = puncher.findAttemptsByLocalAddr(localIp, req.srcPorts[0]) - if existingAttempts.len() == 0: - echo &"initiating connection to {req.dstIp}:{req.dstPorts[0].int}" - asyncCheck puncher.accept(localIp, req.srcPorts[0]) - else: - for a in existingAttempts: - if a.dstIp == req.dstIp and - a.dstPorts.any(proc (p: Port): bool = p in req.dstPorts): - raise newException(PunchHoleError, "hole punching for given parameters already active") - try: - let attempt = TNIAttempt(srcIp: localIp, srcPort: req.srcPorts[0], - dstIp: req.dstIp, - dstPorts: predictPortRange(req.dstPorts), - future: newFuture[AsyncSocket]("initiate")) - puncher.attempts.add(attempt) - await attempt.injectSynPackets() - await progressCb("") - await attempt.future or sleepAsync(Timeout) - puncher.attempts.del(puncher.attempts.find(attempt)) - if attempt.future.finished(): - result = attempt.future.read() - else: - raise newException(PunchHoleError, "timeout") - except OSError as e: - raise newException(PunchHoleError, e.msg) diff --git a/tcp_nutss_responder.nim b/tcp_nutss_responder.nim deleted file mode 100644 index aa73023..0000000 --- a/tcp_nutss_responder.nim +++ /dev/null @@ -1,54 +0,0 @@ -import asyncdispatch, asyncnet, strformat -from net import IpAddress, Port, `$`, `==` -import message -import port_prediction -import puncher -import utils - -export Puncher, Responder, PunchHoleError, cleanup, respond - -type - TcpNutssResponder* = ref object of Responder - - Request = object - dstIp: IpAddress - dstPorts: seq[Port] - srcIp: IpAddress - srcPorts: seq[Port] - extraData: string - -method cleanup*(puncher: TcpNutssResponder) {.async.} = - discard - -proc initTcpNutssResponder*(): TcpNutssResponder = - TcpNutssResponder() - -proc connect(srcIp: IpAddress, srcPort: Port, dstIp: IpAddress, dstPort: Port, - future: Future[AsyncSocket]) {.async.} = - let sock = newAsyncSocket() - sock.setSockOpt(OptReuseAddr, true) - echo &"connect {srcIp}:{srcPort} -> {dstIp}:{dstPort}" - sock.bindAddr(srcPort, $srcIp) - try: - await sock.connect($dstIp, dstPort) - future.complete(sock) - except OSError as e: - echo &"connection {srcIP}:{srcPort.int} -> {dstIp}:{dstPort.int} failed: ", e.msg - sock.close() - -method respond*(puncher: TcpNutssResponder, args: string): Future[AsyncSocket] {.async.} = - let req = parseMessage[Request](args) - let localIp = getPrimaryIPAddr(req.dstIp) - try: - let connectFuture = newFuture[AsyncSocket]("respond") - let portRange = predictPortRange(req.dstPorts) - for dstPort in portRange: - asyncCheck connect(localIp, req.srcPorts[0], req.dstIp, req.dstPorts[0], - connectFuture) - await connectFuture or sleepAsync(Timeout) - if connectFuture.finished(): - result = connectFuture.read() - else: - raise newException(PunchHoleError, "timeout") - except OSError as e: - raise newException(PunchHoleError, e.msg) diff --git a/tcp_syni.nim b/tcp_syni.nim new file mode 100644 index 0000000..c278f81 --- /dev/null +++ b/tcp_syni.nim @@ -0,0 +1,190 @@ +from nativesockets import Protocol, setSockOptInt +from net import IpAddress, Port, `$`, `==` +from random import randomize, rand +from strutils import join +import asyncdispatch, asyncnet, strformat +import ip_packet +import message +import network_interface +import options +import port_prediction +import puncher +import raw_socket +import utils + +export puncher + +type + TcpSyniPuncher* = ref object of Puncher + + InitiateRequest = object + srcIp: IpAddress + srcPorts: seq[Port] + dstIp: IpAddress + dstPorts: seq[Port] + + RespondRequest = object + dstIp: IpAddress + dstPorts: seq[Port] + srcIp: IpAddress + srcPorts: seq[Port] + seqNums: seq[uint32] + + TcpSyniInitiateAttempt = ref object of Attempt + + TcpSyniRespondAttempt = ref object of Attempt + seqNums: seq[uint32] + +var IPPROTO_IP {.importc: "IPPROTO_IP", header: "".}: cint +var IP_TTL {.importc: "IP_TTL", header: "".}: cint + +proc captureSeqNumbers(attempt: Attempt, cb: PunchProgressCb) {.async.} = + # FIXME: timeout? + let iface = getNetworkInterface(attempt.srcIp) + let captureFd = setupEthernetCapturingSocket(iface) + var seqNums = newSeq[uint32]() + while seqNums.len < attempt.dstPorts.len: + let packet = await captureFd.recv(4000) + if packet == "": + break + let parsed = parseEthernetPacket(packet) + if parsed.protocol == tcp and + parsed.ipAddrSrc == attempt.srcIp and + parsed.tcpPortSrc.int == attempt.srcPort.int and + parsed.ipAddrDst == attempt.dstIp and + parsed.tcpFlags == {SYN}: + for port in attempt.dstPorts: + if parsed.tcpPortDst.int == port.int: + seqNums.add(parsed.tcpSeqNumber) + break + closeSocket(captureFd) + await cb(seqNums.join(",")) + +proc captureAndResendAck(attempt: Attempt) {.async.} = + let iface = getNetworkInterface(attempt.srcIp) + let captureFd = setupEthernetCapturingSocket(iface) + let injectFd = setupTcpInjectingSocket() + block loops: + while true: + let packet = await captureFd.recv(4000) + if packet == "": + break + var parsed = parseEthernetPacket(packet) + if parsed.protocol == tcp and + parsed.ipAddrSrc == attempt.srcIp and + parsed.tcpPortSrc.int == attempt.srcPort.int and + parsed.ipAddrDst == attempt.dstIp and + parsed.tcpFlags == {ACK}: + for port in attempt.dstPorts: + if parsed.tcpPortDst.int == port.int: + parsed.ipTTL = 64 + echo &"[{parsed.ipAddrSrc}:{parsed.tcpPortSrc.int} -> {parsed.ipAddrDst}:{parsed.tcpPortDst}, SEQ {parsed.tcpSeqNumber}] resending ACK with TTL {parsed.ipTTL}" + await injectFd.injectTcpPacket(parsed) + break loops + closeSocket(captureFd) + closeSocket(injectFd) + +proc connect(srcIp: IpAddress, srcPort: Port, dstIp: IpAddress, dstPort: Port, + future: Future[AsyncSocket]) {.async.} = + let sock = newAsyncSocket() + sock.setSockOpt(OptReuseAddr, true) + sock.getFd.setSockOptInt(IPPROTO_IP, IP_TTL, 2) + echo &"connect {srcIp}:{srcPort} -> {dstIp}:{dstPort}" + sock.bindAddr(srcPort, $srcIp) + try: + await sock.connect($dstIp, dstPort) + sock.getFd.setSockOptInt(IPPROTO_IP, IP_TTL, 64) + future.complete(sock) + except OSError as e: + echo &"connection {srcIP}:{srcPort.int} -> {dstIp}:{dstPort.int} failed: ", e.msg + sock.close() + +proc injectSynPackets(attempt: TcpSyniRespondAttempt) {.async.} = + let injectFd = setupTcpInjectingSocket() + for dstPort in attempt.dstPorts: + let synOut = IpPacket(protocol: tcp, ipAddrSrc: attempt.srcIp, + ipAddrDst: attempt.dstIp, ipTTL: 2, + tcpPortSrc: attempt.srcPort, tcpPortDst: dstPort, + tcpSeqNumber: rand(uint32), tcpAckNumber: 0, + tcpFlags: {SYN}, tcpWindowSize: 1452 * 10) + echo &"[{synOut.ipAddrSrc}:{synOut.tcpPortSrc} -> {synOut.ipAddrDst}:{synOut.tcpPortDst}, SEQ {synOut.tcpSeqNumber}] injecting outgoing SYN" + await injectFd.injectTcpPacket(synOut) + for seqNum in attempt.seqNums: + let synIn = IpPacket(protocol: tcp, ipAddrSrc: attempt.dstIp, + ipAddrDst: attempt.srcIp, ipTTL: 64, + tcpPortSrc: dstPort, + tcpPortDst: attempt.srcPort, + tcpSeqNumber: seqNum, tcpAckNumber: 0, + tcpFlags: {SYN}, tcpWindowSize: 1452 * 10) + echo &"[{synIn.ipAddrSrc}:{synIn.tcpPortSrc} -> {synIn.ipAddrDst}:{synIn.tcpPortDst}, SEQ {synIn.tcpSeqNumber}] injecting incoming SYN" + await injectFd.injectTcpPacket(synIn) + closeSocket(injectFd) + +proc initTcpSyniPuncher*(): TcpSyniPuncher = + randomize() + TcpSyniPuncher() + +method cleanup*(attempt: TcpSyniInitiateAttempt) {.async.} = + await deleteFirewallRules(attempt) + +method cleanup*(attempt: TcpSyniRespondAttempt) {.async.} = + await deleteFirewallRules(attempt) + +method getProtocol*(puncher: TcpSyniPuncher): Protocol = + IPPROTO_TCP + +method parseInitiateRequest*(puncher: TcpSyniPuncher, args: string): Attempt = + let parsed = parseMessage[InitiateRequest](args) + let localIp = getPrimaryIPAddr(parsed.dstIp) + let predictedDstPorts = predictPortRange(parsed.dstPorts) + TcpSyniInitiateAttempt(srcIp: localIp, srcPort: parsed.srcPorts[0], + dstIp: parsed.dstIp, dstPorts: predictedDstPorts, + acceptFuture: none(Future[AsyncSocket])) + +method parseRespondRequest*(puncher: TcpSyniPuncher, args: string): Attempt = + let parsed = parseMessage[RespondRequest](args) + let localIp = getPrimaryIPAddr(parsed.dstIp) + let predictedDstPorts = predictPortRange(parsed.dstPorts) + let acceptFuture = newFuture[AsyncSocket]("parseRespondRequest") + TcpSyniRespondAttempt(srcIp: localIp, srcPort: parsed.srcPorts[0], + dstIp: parsed.dstIp, dstPorts: predictedDstPorts, + acceptFuture: some(acceptFuture), + seqNums: parsed.seqNums) + +method initiate*(puncher: TcpSyniPuncher, attempt: Attempt, + progress: PunchProgressCb): Future[AsyncSocket] {.async.} = + assert(attempt of TcpSyniInitiateAttempt, "unexpected attempt type") + assert(attempt.acceptFuture.isNone(), "expected attempt without acceptFuture") + await addFirewallRules(attempt) + asyncCheck captureSeqNumbers(attempt, progress) + asyncCheck captureAndResendAck(attempt) + try: + let connectFuture = newFuture[AsyncSocket]("connect") + for dstPort in attempt.dstPorts: + asyncCheck connect(attempt.srcIp, attempt.srcPort, attempt.dstIp, + dstPort, connectFuture) + await connectFuture or sleepAsync(Timeout) + await deleteFirewallRules(attempt) + if connectFuture.finished(): + result = connectFuture.read() + else: + raise newException(PunchHoleError, "timeout") + except OSError as e: + raise newException(PunchHoleError, e.msg) + +method respond*(puncher: TcpSyniPuncher, attempt: Attempt): + Future[AsyncSocket] {.async.} = + assert(attempt of TcpSyniRespondAttempt, "unexpected attempt type") + assert(attempt.acceptFuture.isSome(), "expected attempt with acceptFuture") + try: + let acceptFuture = attempt.acceptFuture.get() + await addFirewallRules(attempt) # FIXME: needed? + await injectSynPackets(TcpSyniRespondAttempt(attempt)) + await acceptFuture or sleepAsync(Timeout) + await deleteFirewallRules(attempt) # FIXME: needed? + if acceptFuture.finished(): + result = acceptFuture.read() + else: + raise newException(PunchHoleError, "timeout") + except OSError as e: + raise newException(PunchHoleError, e.msg) diff --git a/tcp_syni_initiator.nim b/tcp_syni_initiator.nim deleted file mode 100644 index d44e334..0000000 --- a/tcp_syni_initiator.nim +++ /dev/null @@ -1,120 +0,0 @@ -import asyncdispatch, asyncnet, strformat -from net import IpAddress, Port, `$`, `==` -from nativesockets import setSockOptInt -from strutils import join -import ip_packet -import message -import network_interface -import port_prediction -import puncher -import raw_socket -import utils - -export Puncher, Initiator, PunchProgressCb, PunchHoleError, cleanup, initiate - -type - TcpSyniInitiator* = ref object of Initiator - - Request = object - srcIp: IpAddress - srcPorts: seq[Port] - dstIp: IpAddress - dstPorts: seq[Port] - -var IPPROTO_IP {.importc: "IPPROTO_IP", header: "".}: cint -var IP_TTL {.importc: "IP_TTL", header: "".}: cint - -method cleanup*(puncher: TcpSyniInitiator): Future[void] {.async.} = - while puncher.attempts.len() != 0: - await puncher.attempts.pop().deleteFirewallRules() - -proc initTcpSyniInitiator*(): TcpSyniInitiator = - TcpSyniInitiator() - -proc captureSeqNumbers(attempt: Attempt, cb: PunchProgressCb) {.async.} = - # FIXME: timeout? - let iface = getNetworkInterface(attempt.srcIp) - let captureFd = setupEthernetCapturingSocket(iface) - var seqNums = newSeq[uint32]() - while seqNums.len < attempt.dstPorts.len: - let packet = await captureFd.recv(4000) - if packet == "": - break - let parsed = parseEthernetPacket(packet) - if parsed.protocol == tcp and - parsed.ipAddrSrc == attempt.srcIp and - parsed.tcpPortSrc.int == attempt.srcPort.int and - parsed.ipAddrDst == attempt.dstIp and - parsed.tcpFlags == {SYN}: - for port in attempt.dstPorts: - if parsed.tcpPortDst.int == port.int: - seqNums.add(parsed.tcpSeqNumber) - break - closeSocket(captureFd) - await cb(seqNums.join(",")) - -proc captureAndResendAck(attempt: Attempt) {.async.} = - let iface = getNetworkInterface(attempt.srcIp) - let captureFd = setupEthernetCapturingSocket(iface) - let injectFd = setupTcpInjectingSocket() - block loops: - while true: - let packet = await captureFd.recv(4000) - if packet == "": - break - var parsed = parseEthernetPacket(packet) - if parsed.protocol == tcp and - parsed.ipAddrSrc == attempt.srcIp and - parsed.tcpPortSrc.int == attempt.srcPort.int and - parsed.ipAddrDst == attempt.dstIp and - parsed.tcpFlags == {ACK}: - for port in attempt.dstPorts: - if parsed.tcpPortDst.int == port.int: - parsed.ipTTL = 64 - echo &"[{parsed.ipAddrSrc}:{parsed.tcpPortSrc.int} -> {parsed.ipAddrDst}:{parsed.tcpPortDst}, SEQ {parsed.tcpSeqNumber}] resending ACK with TTL {parsed.ipTTL}" - await injectFd.injectTcpPacket(parsed) - break loops - closeSocket(captureFd) - closeSocket(injectFd) - -proc connect(srcIp: IpAddress, srcPort: Port, dstIp: IpAddress, dstPort: Port, - future: Future[AsyncSocket]) {.async.} = - let sock = newAsyncSocket() - sock.setSockOpt(OptReuseAddr, true) - sock.getFd.setSockOptInt(IPPROTO_IP, IP_TTL, 2) - echo &"connect {srcIp}:{srcPort} -> {dstIp}:{dstPort}" - sock.bindAddr(srcPort, $srcIp) - try: - await sock.connect($dstIp, dstPort) - sock.getFd.setSockOptInt(IPPROTO_IP, IP_TTL, 64) - future.complete(sock) - except OSError as e: - echo &"connection {srcIP}:{srcPort.int} -> {dstIp}:{dstPort.int} failed: ", e.msg - sock.close() - -method initiate*(puncher: TcpSyniInitiator, args: string, - progressCb: PunchProgressCb): Future[AsyncSocket] {.async.} = - let req = parseMessage[Request](args) - let localIp = getPrimaryIPAddr(req.dstIp) - if puncher.findAttempt(localIp, req.srcPorts[0], req.dstIp, req.dstPorts) != -1: - raise newException(PunchHoleError, "hole punching for given parameters already active") - let attempt = Attempt(srcIp: localIp, srcPort: req.srcPorts[0], dstIp: req.dstIp, - dstPorts: predictPortRange(req.dstPorts)) - puncher.attempts.add(attempt) - await attempt.addFirewallRules() - asyncCheck attempt.captureSeqNumbers(progressCb) - asyncCheck attempt.captureAndResendAck() - try: - let connectFuture = newFuture[AsyncSocket]("connect") - for dstPort in attempt.dstPorts: - asyncCheck connect(attempt.srcIp, attempt.srcPort, attempt.dstIp, - dstPort, connectFuture) - await connectFuture or sleepAsync(Timeout) - await attempt.deleteFirewallRules() - puncher.attempts.del(puncher.attempts.find(attempt)) - if connectFuture.finished(): - result = connectFuture.read() - else: - raise newException(PunchHoleError, "timeout") - except OSError as e: - raise newException(PunchHoleError, e.msg) diff --git a/tcp_syni_responder.nim b/tcp_syni_responder.nim deleted file mode 100644 index 37facfa..0000000 --- a/tcp_syni_responder.nim +++ /dev/null @@ -1,114 +0,0 @@ -import asyncdispatch, asyncnet, strformat -from net import IpAddress, Port, `$`, `==`, parseIpAddress -from random import randomize, rand -from sequtils import any -import ip_packet -import message -import port_prediction -import puncher -import raw_socket -import utils - -export Puncher, Responder, PunchHoleError, cleanup, respond - -type - TSRAttempt = ref object of Attempt - seqNums: seq[uint32] - future: Future[AsyncSocket] - - TcpSyniResponder* = ref object of Responder - - Request = object - dstIp: IpAddress - dstPorts: seq[Port] - srcIp: IpAddress - srcPorts: seq[Port] - seqNums: seq[uint32] - -method cleanup*(puncher: TcpSyniResponder) {.async.} = - while puncher.attempts.len() != 0: - await puncher.attempts.pop().deleteFirewallRules() - -proc initTcpSyniResponder*(): TcpSyniResponder = - randomize() - TcpSyniResponder() - -proc injectSynPackets(attempt: TSRAttempt) {.async.} = - let injectFd = setupTcpInjectingSocket() - for dstPort in attempt.dstPorts: - let synOut = IpPacket(protocol: tcp, ipAddrSrc: attempt.srcIp, - ipAddrDst: attempt.dstIp, ipTTL: 2, - tcpPortSrc: attempt.srcPort, tcpPortDst: dstPort, - tcpSeqNumber: rand(uint32), tcpAckNumber: 0, - tcpFlags: {SYN}, tcpWindowSize: 1452 * 10) - echo &"[{synOut.ipAddrSrc}:{synOut.tcpPortSrc} -> {synOut.ipAddrDst}:{synOut.tcpPortDst}, SEQ {synOut.tcpSeqNumber}] injecting outgoing SYN" - await injectFd.injectTcpPacket(synOut) - for seqNum in attempt.seqNums: - let synIn = IpPacket(protocol: tcp, ipAddrSrc: attempt.dstIp, - ipAddrDst: attempt.srcIp, ipTTL: 64, - tcpPortSrc: dstPort, - tcpPortDst: attempt.srcPort, - tcpSeqNumber: seqNum, tcpAckNumber: 0, - tcpFlags: {SYN}, tcpWindowSize: 1452 * 10) - echo &"[{synIn.ipAddrSrc}:{synIn.tcpPortSrc} -> {synIn.ipAddrDst}:{synIn.tcpPortDst}, SEQ {synIn.tcpSeqNumber}] injecting incoming SYN" - await injectFd.injectTcpPacket(synIn) - closeSocket(injectFd) - -proc accept(puncher: TcpSyniResponder, srcIp: IpAddress, - srcPort: Port) {.async.} = - let sock = newAsyncSocket() - sock.setSockOpt(OptReuseAddr, true) - sock.bindAddr(srcPort, $srcIp) - sock.listen() - while true: - let acceptFuture = sock.accept() - await acceptFuture or sleepAsync(Timeout) - if acceptFuture.finished(): - let peer = acceptFuture.read() - let (peerAddr, peerPort) = peer.getPeerAddr() - let peerIp = parseIpAddress(peerAddr) - let i = puncher.findAttempt(srcIp, srcPort, peerIp, @[peerPort]) - if i == -1: - echo "Accepted connection, but no attempt found. Discarding." - peer.close() - continue - else: - let attempt = TSRAttempt(puncher.attempts[i]) - attempt.future.complete(peer) - let attempts = puncher.findAttemptsByLocalAddr(srcIp, srcPort) - # FIXME: should attempts have timestamps, so we can decide here which ones to delete? - if attempts.len() <= 1: - break - sock.close() - -method respond*(puncher: TcpSyniResponder, args: string): - Future[AsyncSocket] {.async.} = - let req = parseMessage[Request](args) - let localIp = getPrimaryIPAddr(req.dstIp) - let existingAttempts = puncher.findAttemptsByLocalAddr(localIp, req.srcPorts[0]) - if existingAttempts.len() == 0: - echo &"accepting connections from {req.dstIp}:{req.dstPorts[0].int}" - asyncCheck puncher.accept(localIp, req.srcPorts[0]) - else: - for a in existingAttempts: - if a.dstIp == req.dstIp and - a.dstPorts.any(proc (p: Port): bool = p in req.dstPorts): - raise newException(PunchHoleError, "hole punching for given parameters already active") - try: - let attempt = TSRAttempt(srcIp: localIp, srcPort: req.srcPorts[0], - dstIp: req.dstIp, - dstPorts: predictPortRange(req.dstPorts), - seqNums: req.seqNums, - future: newFuture[AsyncSocket]("respond")) - puncher.attempts.add(attempt) - await attempt.addFirewallRules() # FIXME: needed? - await attempt.injectSynPackets() - await attempt.future or sleepAsync(Timeout) - await attempt.deleteFirewallRules() # FIXME: needed? - puncher.attempts.del(puncher.attempts.find(attempt)) - if attempt.future.finished(): - result = attempt.future.read() - else: - raise newException(PunchHoleError, "timeout") - except OSError as e: - raise newException(PunchHoleError, e.msg)