prepare to use multiple servers for probing endpoints; use hardcoded list of rendezvous servers for now

This commit is contained in:
Christian Ulrich 2020-11-14 14:18:55 +01:00
parent 5725e1ff82
commit 7831d2a6ed
No known key found for this signature in database
GPG Key ID: 8241BE099775A097
5 changed files with 99 additions and 83 deletions

View File

@ -6,6 +6,10 @@ import asyncutils
import ../../message
import random
const rendezvousServers: seq[tuple[hostname: string, port: Port]] = @[
("strangeplace.net", Port(5320))
]
type
PunchdResponse = Future[tuple[msgContent: string, sock: AsyncSocket]]
PunchdProgressCb = proc (future: PunchdResponse, msgContent: string) {.async.}
@ -23,8 +27,9 @@ type
sock: AsyncSocket
outMessages: TableRef[string, Future[string]]
peerNotifications: FutureStream[string]
publicIp: IpAddress
publicPorts: seq[Port]
probedIp: IpAddress
srcPort: Port
probedSrcPorts: seq[Port]
# Punchd messages
Progress* = object
@ -34,7 +39,8 @@ type
# Server messages
OkGetPeerinfo* = object
ip: string
ports: seq[uint16]
localPort: Port
probedPorts: seq[Port]
OkGetEndpoint* = object
ip: IpAddress
port: Port
@ -43,9 +49,11 @@ type
recipient: string
technique: string
srcIp: IpAddress
srcPorts: seq[Port]
srcPort: Port
probedSrcPorts: seq[Port]
dstIp: IpAddress
dstPorts: seq[Port]
dstPort: Port
probedDstPorts: seq[Port]
extraArgs: string
# Exceptions
@ -53,7 +61,7 @@ type
ServerError = object of ValueError
proc usage() =
echo &"usage: {paramStr(0)} SERVER_HOSTNAME SERVER_PORT PEER_ID [OTHER_PEER_ID]"
echo &"usage: {paramStr(0)} PEER_ID [OTHER_PEER_ID]"
proc handleServerMessages(conn: ServerConnection) {.async.} =
while true:
@ -139,9 +147,9 @@ proc handlePeerNotifications(serverConn: ServerConnection,
let msg = parseMessage[NotifyPeer](data)
# FIXME: check if we want to receive messages from the sender
echo "received message from ", msg.sender
let srcPorts = msg.srcPorts.join(",")
let dstPorts = msg.dstPorts.join(",")
let req = &"{msg.technique}|{msg.srcIp}|{srcPorts}|{msg.dstIp}|{dstPorts}|{msg.extraArgs}"
let probedSrcPorts = msg.probedSrcPorts.join(",")
let probedDstPorts = msg.probedDstPorts.join(",")
let req = &"{msg.technique}|{msg.srcIp}|{msg.srcPort}|{probedSrcPorts}|{msg.dstIp}|{msg.dstPort}|{probedDstPorts}|{msg.extraArgs}"
asyncCheck acceptConnection(punchdConn, "respond", req)
except ValueError as e:
echo e.msg
@ -161,24 +169,21 @@ proc punchHole(punchdConn: PunchdConnection, serverConn: ServerConnection,
future.fail(e)
except ValueError as e:
future.fail(e)
let myPorts = serverConn.publicPorts.join(",")
let peerPorts = peerInfo.ports.join(",")
let req = &"{technique}|{serverConn.publicIp}|{myPorts}|{peerInfo.ip}|{peerPorts}"
let myProbedPorts = serverConn.probedSrcPorts.join(",")
let probedPeerPorts = peerInfo.probedPorts.join(",")
let req = &"{technique}|{serverConn.probedIp}|{serverConn.srcPort}|{myProbedPorts}|{peerInfo.ip}|{peerInfo.localPort}|{probedPeerPorts}"
let pResp = await punchdConn.sendRequest("initiate", req, progressCb)
result = pResp.sock
proc initServerConnection(serverHostname: string, serverPort: Port,
probingPort: Port): Future[ServerConnection] {.async.} =
result.publicPorts.add(probingPort)
var failCount = 0
while result.publicPorts.len < 3:
# FIXME: error handling
proc getEndpoint(srcPort: Port, serverHostname: string, serverPort: Port):
Future[OkGetEndpoint] {.async.} =
let sock = newAsyncSocket()
var failCount = 0
while true:
try:
sock.bindAddr(probingPort)
sock.bindAddr(srcPort)
except OSError as e:
if failCount == 3:
echo "raising error"
raise e
failCount.inc
await sleepAsync(100)
@ -190,20 +195,24 @@ proc initServerConnection(serverHostname: string, serverPort: Port,
let args = line.parseArgs(3)
assert(args[0] == "ok")
assert(args[1] == $id)
let endpoint = parseMessage[OkGetEndpoint](args[2])
echo "endpoint: ", endpoint
result.publicIp = endpoint.ip
result.publicPorts.add(endpoint.port)
result = parseMessage[OkGetEndpoint](args[2])
let emptyLine = await sock.recvLine(maxLength = 400)
assert(emptyLine.len == 0)
sock.close()
result.sock = await asyncnet.dial(serverHostname, serverPort)
proc initServerConnection(srcPort: Port): Future[ServerConnection] {.async.} =
result.srcPort = srcPort
for r in rendezvousServers:
let endpoint = await getEndpoint(srcPort, r.hostname, r.port)
# FIXME: what if we get get different IPs from different servers
result.probedIp = endpoint.ip
result.probedSrcPorts.add(endpoint.port)
result.sock = await asyncnet.dial(rendezvousServers[0].hostname,
rendezvousServers[0].port)
result.outMessages = newTable[string, Future[string]]()
result.peerNotifications = newFutureStream[string]("initServerConnection")
proc runApp(serverHostname: string, serverPort: Port, peerId: string,
otherPeerId: string = "") {.async.} =
proc runApp(peerId: string, otherPeerId: string = "") {.async.} =
randomize() # initialize random number generator
var punchdConn = PunchdConnection()
punchdConn.sock = newAsyncSocket(AF_UNIX, SOCK_STREAM, IPPROTO_IP)
@ -214,13 +223,12 @@ proc runApp(serverHostname: string, serverPort: Port, peerId: string,
let srcPort = rand(Port(1024) .. Port.high)
if otherPeerId.len == 0:
# register and wait for connections
echo &"init server connection, probing port: {srcPort}"
let serverConn = await initServerConnection(serverHostname, serverPort,
srcPort)
echo &"init server connection, source port: {srcPort}"
let serverConn = await initServerConnection(srcPort)
asyncCheck handleServerMessages(serverConn)
asyncCheck handlePeerNotifications(serverConn, punchdConn, peerId)
let myPorts = serverConn.publicPorts.join(",")
let req = &"{peerId}|{serverConn.publicIp}|{myPorts}"
let probedPorts = serverConn.probedSrcPorts.join(",")
let req = &"{peerId}|{serverConn.probedIp}|{serverConn.srcPort}|{probedPorts}"
echo "registering: ", req
discard await serverConn.sendRequest("register", req)
while true:
@ -235,8 +243,7 @@ proc runApp(serverHostname: string, serverPort: Port, peerId: string,
else:
# initiate a new connection
var serverConn = await initServerConnection(serverHostname, serverPort,
srcPort)
var serverConn = await initServerConnection(srcPort)
asyncCheck handleServerMessages(serverConn)
let sock = await punchHole(punchdConn, serverConn, peerId, otherPeerId,
"udp")
@ -247,17 +254,13 @@ proc runApp(serverHostname: string, serverPort: Port, peerId: string,
sock.close()
proc main() =
if paramCount() < 3 or paramCount() > 4:
if paramCount() < 1 or paramCount() > 2:
usage()
quit(1)
let portNumber = paramStr(2).parseUInt
if portNumber > uint16.high:
usage()
quit(1)
if paramCount() == 4:
waitFor runApp(paramStr(1), Port(portNumber), paramStr(3), paramStr(4))
if paramCount() == 2:
waitFor runApp(paramStr(1), paramStr(2))
else:
waitFor runApp(paramStr(1), Port(portNumber), paramStr(3))
waitFor runApp(paramStr(1))
when isMainModule:
main()

View File

@ -1,11 +1,6 @@
from net import Port
proc predictPortRange*(dstPorts: seq[Port]): seq[Port] =
proc predictPortRange*(dstPort: Port, probedDstPorts: seq[Port]): seq[Port] =
# TODO: do real port prediction
result = newSeq[Port](1)
let basePort = min(dstPorts[1].uint16,
uint16.high - (result.len - 1).uint16)
for i in 0 .. result.len - 1:
result[i] = Port(basePort + i.uint16)
result = @[dstPort]

View File

@ -17,15 +17,19 @@ type
InitiateRequest = object
srcIp: IpAddress
srcPorts: seq[Port]
srcPort: Port
probedSrcPorts: seq[Port]
dstIp: IpAddress
dstPorts: seq[Port]
dstPort: Port
probedDstPorts: seq[Port]
RespondRequest = object
dstIp: IpAddress
dstPorts: seq[Port]
dstPort: Port
probedDstPorts: seq[Port]
srcIp: IpAddress
srcPorts: seq[Port]
srcPort: Port
probedSrcPorts: seq[Port]
extraArgs: string
proc injectSynPackets(attempt: Attempt) {.async.} =
@ -59,17 +63,19 @@ proc initTcpNutssPuncher*(): TcpNutssPuncher =
method parseInitiateRequest*(puncher: TcpNutssPuncher, args: string): Attempt =
let parsed = parseMessage[InitiateRequest](args)
let localIp = getPrimaryIPAddr(parsed.dstIp)
let predictedDstPorts = predictPortRange(parsed.dstPorts)
let predictedDstPorts = predictPortRange(parsed.dstPort,
parsed.probedDstPorts)
let acceptFuture = newFuture[AsyncSocket]("parseInitiateRequest")
Attempt(protocol: IPPROTO_TCP, srcIp: localIp, srcPort: parsed.srcPorts[0],
Attempt(protocol: IPPROTO_TCP, srcIp: localIp, srcPort: parsed.srcPort,
dstIp: parsed.dstIp, dstPorts: predictedDstPorts,
acceptFuture: some(acceptFuture))
method parseRespondRequest*(puncher: TcpNutssPuncher, args: string): Attempt =
let parsed = parseMessage[RespondRequest](args)
let localIp = getPrimaryIPAddr(parsed.dstIp)
let predictedDstPorts = predictPortRange(parsed.dstPorts)
Attempt(protocol: IPPROTO_TCP, srcIp: localIp, srcPort: parsed.srcPorts[0],
let predictedDstPorts = predictPortRange(parsed.dstPort,
parsed.probedDstPorts)
Attempt(protocol: IPPROTO_TCP, srcIp: localIp, srcPort: parsed.srcPort,
dstIp: parsed.dstIp, dstPorts: predictedDstPorts,
acceptFuture: none(Future[AsyncSocket]))

View File

@ -19,15 +19,19 @@ type
InitiateRequest = object
srcIp: IpAddress
srcPorts: seq[Port]
srcPort: Port
probedSrcPorts: seq[Port]
dstIp: IpAddress
dstPorts: seq[Port]
dstPort: Port
probedDstPorts: seq[Port]
RespondRequest = object
dstIp: IpAddress
dstPorts: seq[Port]
dstPort: Port
probedDstPorts: seq[Port]
srcIp: IpAddress
srcPorts: seq[Port]
srcPort: Port
probedSrcPorts: seq[Port]
seqNums: seq[uint32]
TcpSyniInitiateAttempt = ref object of Attempt
@ -136,19 +140,21 @@ method getProtocol*(puncher: TcpSyniPuncher): Protocol =
method parseInitiateRequest*(puncher: TcpSyniPuncher, args: string): Attempt =
let parsed = parseMessage[InitiateRequest](args)
let localIp = getPrimaryIPAddr(parsed.dstIp)
let predictedDstPorts = predictPortRange(parsed.dstPorts)
let predictedDstPorts = predictPortRange(parsed.dstPort,
parsed.probedDstPorts)
TcpSyniInitiateAttempt(protocol: IPPROTO_TCP, srcIp: localIp,
srcPort: parsed.srcPorts[0], dstIp: parsed.dstIp,
srcPort: parsed.srcPort, dstIp: parsed.dstIp,
dstPorts: predictedDstPorts,
acceptFuture: none(Future[AsyncSocket]))
method parseRespondRequest*(puncher: TcpSyniPuncher, args: string): Attempt =
let parsed = parseMessage[RespondRequest](args)
let localIp = getPrimaryIPAddr(parsed.dstIp)
let predictedDstPorts = predictPortRange(parsed.dstPorts)
let predictedDstPorts = predictPortRange(parsed.dstPort,
parsed.probedDstPorts)
let acceptFuture = newFuture[AsyncSocket]("parseRespondRequest")
TcpSyniRespondAttempt(protocol: IPPROTO_TCP, srcIp: localIp,
srcPort: parsed.srcPorts[0], dstIp: parsed.dstIp,
srcPort: parsed.srcPort, dstIp: parsed.dstIp,
dstPorts: predictedDstPorts,
acceptFuture: some(acceptFuture),
seqNums: parsed.seqNums)

22
udp.nim
View File

@ -16,15 +16,19 @@ type
InitiateRequest = object
srcIp: IpAddress
srcPorts: seq[Port]
srcPort: Port
probedSrcPorts: seq[Port]
dstIp: IpAddress
dstPorts: seq[Port]
dstPort: Port
probedDstPorts: seq[Port]
RespondRequest = object
dstIp: IpAddress
dstPorts: seq[Port]
dstPort: Port
probedDstPorts: seq[Port]
srcIp: IpAddress
srcPorts: seq[Port]
srcPort: Port
probedSrcPorts: seq[Port]
extraArgs: string
proc doInitiate(srcIp: IpAddress, srcPort: Port, dstIp: IpAddress,
@ -65,16 +69,18 @@ proc initUdpPuncher*(): UdpPuncher =
method parseInitiateRequest*(puncher: UdpPuncher, args: string): Attempt =
let parsed = parseMessage[InitiateRequest](args)
let localIp = getPrimaryIPAddr(parsed.dstIp)
let predictedDstPorts = predictPortRange(parsed.dstPorts)
Attempt(protocol: IPPROTO_UDP, srcIp: localIp, srcPort: parsed.srcPorts[0],
let predictedDstPorts = predictPortRange(parsed.dstPort,
parsed.probedDstPorts)
Attempt(protocol: IPPROTO_UDP, srcIp: localIp, srcPort: parsed.srcPort,
dstIp: parsed.dstIp, dstPorts: predictedDstPorts,
acceptFuture: none(Future[AsyncSocket]))
method parseRespondRequest*(puncher: UdpPuncher, args: string): Attempt =
let parsed = parseMessage[RespondRequest](args)
let localIp = getPrimaryIPAddr(parsed.dstIp)
let predictedDstPorts = predictPortRange(parsed.dstPorts)
Attempt(protocol: IPPROTO_UDP, srcIp: localIp, srcPort: parsed.srcPorts[0],
let predictedDstPorts = predictPortRange(parsed.dstPort,
parsed.probedDstPorts)
Attempt(protocol: IPPROTO_UDP, srcIp: localIp, srcPort: parsed.srcPort,
dstIp: parsed.dstIp, dstPorts: predictedDstPorts,
acceptFuture: none(Future[AsyncSocket]))