use multiple sockets for punching multiple holes if behind a SymmetricRandom NAT
This commit is contained in:
parent
45cc3e0e34
commit
18ee5e4919
64
puncher.nim
64
puncher.nim
|
@ -11,13 +11,15 @@ from sequtils import any
|
||||||
type
|
type
|
||||||
Attempt* = object
|
Attempt* = object
|
||||||
## A hole punching attempt.
|
## A hole punching attempt.
|
||||||
srcPort*: Port
|
srcPorts*: seq[Port]
|
||||||
dstIp*: IpAddress
|
dstIp*: IpAddress
|
||||||
dstPorts*: seq[Port]
|
dstPorts*: seq[Port]
|
||||||
future*: Future[Port]
|
future*: Future[(AsyncSocket, Port)]
|
||||||
|
|
||||||
Puncher* = ref object
|
Puncher* = ref object
|
||||||
sock: AsyncSocket
|
socks: seq[AsyncSocket]
|
||||||
|
srcPorts: seq[Port]
|
||||||
|
natProps: NatProperties
|
||||||
attempts: seq[Attempt]
|
attempts: seq[Attempt]
|
||||||
|
|
||||||
PunchHoleError* = object of ValueError
|
PunchHoleError* = object of ValueError
|
||||||
|
@ -30,23 +32,34 @@ const Timeout = 3000
|
||||||
proc `==`(a, b: Attempt): bool =
|
proc `==`(a, b: Attempt): bool =
|
||||||
## ``==`` for hole punching attempts.
|
## ``==`` for hole punching attempts.
|
||||||
##
|
##
|
||||||
## Two hole punching attempts are considered equal if their ``srcPort`` and
|
## Two hole punching attempts are considered equal if their ``dstIp`` is
|
||||||
## ``dstIp`` are equal and their ``dstPorts`` overlap.
|
## equal, their ``srcPorts`` overlap and their ``dstPorts`` overlap.
|
||||||
a.srcPort == b.srcPort and a.dstIp == b.dstIp and
|
a.dstIp == b.dstIp and
|
||||||
|
a.srcPorts.any(proc (p: Port): bool = p in b.srcPorts) and
|
||||||
a.dstPorts.any(proc (p: Port): bool = p in b.dstPorts)
|
a.dstPorts.any(proc (p: Port): bool = p in b.dstPorts)
|
||||||
|
|
||||||
proc initPuncher*(sock: AsyncSocket): Puncher =
|
proc initPuncher*(socks: seq[AsyncSocket], probedSrcPorts: seq[Port]): Puncher =
|
||||||
Puncher(sock: sock)
|
assert(socks.len > 0)
|
||||||
|
var srcPorts = newSeq[Port](socks.len)
|
||||||
|
for i in 0 .. socks.len - 1:
|
||||||
|
let (_, srcPort) = socks[i].getLocalAddr()
|
||||||
|
srcPorts[i] = srcPort
|
||||||
|
let natProps = getNatProperties(srcPorts[0], probedSrcPorts)
|
||||||
|
Puncher(socks: socks, srcPorts: srcPorts, natProps: natProps)
|
||||||
|
|
||||||
proc punch(puncher: Puncher, peerIp: IpAddress, peerPort: Port,
|
proc punch(puncher: Puncher, peerIp: IpAddress, peerPort: Port,
|
||||||
peerProbedPorts: seq[Port], lowTTL: bool, msg: string):
|
peerProbedPorts: seq[Port], lowTTL: bool, msg: string):
|
||||||
Future[Attempt] {.async.} =
|
Future[Attempt] {.async.} =
|
||||||
let punchFuture = newFuture[Port]("punch")
|
let punchFuture = newFuture[(AsyncSocket, Port)]("punch")
|
||||||
let natProps = getNatProperties(peerPort, peerProbedPorts)
|
let natProps = getNatProperties(peerPort, peerProbedPorts)
|
||||||
let predictedDstPorts = predictPortRange(natProps)
|
let predictedDstPorts = predictPortRange(natProps)
|
||||||
let (_, myPort) = puncher.sock.getLocalAddr()
|
result = Attempt(srcPorts: @[puncher.srcPorts[0]], dstIp: peerIp,
|
||||||
result = Attempt(srcPort: myPort, dstIp: peerIp, dstPorts: predictedDstPorts,
|
dstPorts: predictedDstPorts, future: punchFuture)
|
||||||
future: punchFuture)
|
if puncher.natProps.natType == SymmetricRandom and puncher.srcPorts.len > 1:
|
||||||
|
# our NAT is of the evil symmetric type with random port allocation. We are
|
||||||
|
# trying to help the other peer by punching more holes using all our
|
||||||
|
# sockets.
|
||||||
|
result.srcPorts.add(puncher.srcPorts[1 .. ^1])
|
||||||
if puncher.attempts.contains(result):
|
if puncher.attempts.contains(result):
|
||||||
raise newException(PunchHoleError,
|
raise newException(PunchHoleError,
|
||||||
"hole punching for given parameters already active")
|
"hole punching for given parameters already active")
|
||||||
|
@ -56,16 +69,19 @@ proc punch(puncher: Puncher, peerIp: IpAddress, peerPort: Port,
|
||||||
var peerSockLen: SockLen
|
var peerSockLen: SockLen
|
||||||
try:
|
try:
|
||||||
var defaultTTL: int
|
var defaultTTL: int
|
||||||
|
for i in 0 .. result.srcPorts.len - 1:
|
||||||
|
let sock = puncher.socks[i]
|
||||||
if lowTTL:
|
if lowTTL:
|
||||||
defaultTTL = puncher.sock.getFd.getSockOptInt(IPPROTO_IP, IP_TTL)
|
defaultTTL = sock.getFd.getSockOptInt(IPPROTO_IP, IP_TTL)
|
||||||
puncher.sock.getFd.setSockOptInt(IPPROTO_IP, IP_TTL, 2)
|
sock.getFd.setSockOptInt(IPPROTO_IP, IP_TTL, 2)
|
||||||
|
|
||||||
for dstPort in result.dstPorts:
|
for dstPort in result.dstPorts:
|
||||||
toSockAddr(result.dstIp, dstPort, peerAddr, peerSockLen)
|
toSockAddr(result.dstIp, dstPort, peerAddr, peerSockLen)
|
||||||
# TODO: replace asyncdispatch.sendTo with asyncnet.sendTo (Nim 1.4 required)
|
# TODO: replace asyncdispatch.sendTo with asyncnet.sendTo (Nim 1.4 required)
|
||||||
await sendTo(puncher.sock.getFd().AsyncFD, msg.cstring, msg.len,
|
await sendTo(sock.getFd.AsyncFD, msg.cstring, msg.len,
|
||||||
cast[ptr SockAddr](addr peerAddr), peerSockLen)
|
cast[ptr SockAddr](addr peerAddr), peerSockLen)
|
||||||
if lowTTL:
|
if lowTTL:
|
||||||
puncher.sock.getFd.setSockOptInt(IPPROTO_IP, IP_TTL, defaultTTL)
|
sock.getFd.setSockOptInt(IPPROTO_IP, IP_TTL, defaultTTL)
|
||||||
except OSError as e:
|
except OSError as e:
|
||||||
raise newException(PunchHoleError, e.msg)
|
raise newException(PunchHoleError, e.msg)
|
||||||
|
|
||||||
|
@ -77,37 +93,37 @@ proc respond*(puncher: Puncher, peerIp: IpAddress, peerPort: Port,
|
||||||
peerProbedPorts: seq[Port]): Future[Attempt] =
|
peerProbedPorts: seq[Port]): Future[Attempt] =
|
||||||
punch(puncher, peerIp, peerPort, peerProbedPorts, false, "ACK")
|
punch(puncher, peerIp, peerPort, peerProbedPorts, false, "ACK")
|
||||||
|
|
||||||
proc finalize*(attempt: Attempt): Future[Port] {.async.} =
|
proc finalize*(attempt: Attempt): Future[(AsyncSocket, Port)] {.async.} =
|
||||||
await attempt.future or sleepAsync(Timeout)
|
await attempt.future or sleepAsync(Timeout)
|
||||||
if attempt.future.finished:
|
if attempt.future.finished:
|
||||||
result = attempt.future.read()
|
result = attempt.future.read()
|
||||||
else:
|
else:
|
||||||
raise newException(PunchHoleError, "timeout")
|
raise newException(PunchHoleError, "timeout")
|
||||||
|
|
||||||
proc handleMsg*(puncher: Puncher, msg: string, peerIp: IpAddress,
|
proc handleMsg*(puncher: Puncher, msg: string, sock: AsyncSocket,
|
||||||
peerPort: Port) =
|
peerIp: IpAddress, peerPort: Port) =
|
||||||
## Handles an incoming UDP message which may complete the Futures returned by
|
## Handles an incoming UDP message which may complete the Futures returned by
|
||||||
## ``initiate`` and ``respond``.
|
## ``initiate`` and ``respond``.
|
||||||
if msg == "SYN":
|
if msg == "SYN":
|
||||||
# We received a SYN packet. We ignore it because we expected it to be
|
# We received a SYN packet. We ignore it because we expected it to be
|
||||||
# filtered by our NAT.
|
# filtered by our NAT.
|
||||||
return
|
return
|
||||||
let (_, myPort) = puncher.sock.getLocalAddr()
|
let query = Attempt(srcPorts: puncher.srcPorts, dstIp: peerIp,
|
||||||
let query = Attempt(srcPort: myPort, dstIp: peerIp, dstPorts: @[peerPort])
|
dstPorts: @[peerPort])
|
||||||
let i = puncher.attempts.find(query)
|
let i = puncher.attempts.find(query)
|
||||||
if i != -1:
|
if i != -1:
|
||||||
if msg == "ACK":
|
if msg == "ACK":
|
||||||
echo &"handling ACK message from {peerIp}:{peerPort}"
|
echo &"handling ACK message from {peerIp}:{peerPort}"
|
||||||
else:
|
else:
|
||||||
echo &"handling QUIC message from {peerIp}:{peerPort}"
|
echo &"handling QUIC message from {peerIp}:{peerPort}"
|
||||||
puncher.attempts[i].future.complete(peerPort)
|
puncher.attempts[i].future.complete((sock, peerPort))
|
||||||
puncher.attempts.del(i)
|
puncher.attempts.del(i)
|
||||||
else:
|
else:
|
||||||
echo &"received unexpected packet from {peerIp}:{peerPort}"
|
echo &"received unexpected packet from {peerIp}:{peerPort}"
|
||||||
|
|
||||||
proc handleMsg*(puncher: Puncher, msg: string,
|
proc handleMsg*(puncher: Puncher, msg: string, sock: AsyncSocket,
|
||||||
peerAddr: SockAddr | Sockaddr_storage, peerSockLen: SockLen) =
|
peerAddr: SockAddr | Sockaddr_storage, peerSockLen: SockLen) =
|
||||||
var peerIp: IpAddress
|
var peerIp: IpAddress
|
||||||
var peerPort: Port
|
var peerPort: Port
|
||||||
fromSockAddr(peerAddr, peerSockLen, peerIp, peerPort)
|
fromSockAddr(peerAddr, peerSockLen, peerIp, peerPort)
|
||||||
handleMsg(puncher, msg, peerIp, peerPort)
|
handleMsg(puncher, msg, sock, peerIp, peerPort)
|
||||||
|
|
101
quicp2p.nim
101
quicp2p.nim
|
@ -25,6 +25,7 @@ import strutils
|
||||||
|
|
||||||
from nativesockets import SockAddr, Sockaddr_storage, SockLen, getHostByName
|
from nativesockets import SockAddr, Sockaddr_storage, SockLen, getHostByName
|
||||||
from posix import IOVec
|
from posix import IOVec
|
||||||
|
from sequtils import filter
|
||||||
from strutils import parseUInt
|
from strutils import parseUInt
|
||||||
|
|
||||||
from openssl import
|
from openssl import
|
||||||
|
@ -42,11 +43,12 @@ from openssl import
|
||||||
type
|
type
|
||||||
Connection = ref object
|
Connection = ref object
|
||||||
conn: ptr quicly_conn_t
|
conn: ptr quicly_conn_t
|
||||||
|
sock: AsyncSocket
|
||||||
certs: seq[Certificate]
|
certs: seq[Certificate]
|
||||||
peerId: string
|
expectedPeerId: string
|
||||||
|
|
||||||
QuicP2PContext = ref object
|
QuicP2PContext = ref object
|
||||||
sock: AsyncSocket
|
socks: seq[AsyncSocket]
|
||||||
puncher: Puncher
|
puncher: Puncher
|
||||||
streamOpen: quicly_stream_open_t
|
streamOpen: quicly_stream_open_t
|
||||||
nextCid: quicly_cid_plaintext_t
|
nextCid: quicly_cid_plaintext_t
|
||||||
|
@ -66,7 +68,7 @@ const rendezvousServers: seq[tuple[hostname: string, port: Port]] = @[
|
||||||
("ulrich.earth", Port(5320))
|
("ulrich.earth", Port(5320))
|
||||||
]
|
]
|
||||||
|
|
||||||
proc getRelativeTimeout(ctx: QuicP2PContext): int32 =
|
proc relativeTimeout(ctx: QuicP2PContext): int32 =
|
||||||
## Obtain the absolute int64 timeout from quicly and convert it to the
|
## Obtain the absolute int64 timeout from quicly and convert it to the
|
||||||
## relative int32 timeout expected by poll.
|
## relative int32 timeout expected by poll.
|
||||||
result = 0
|
result = 0
|
||||||
|
@ -80,7 +82,11 @@ proc getRelativeTimeout(ctx: QuicP2PContext): int32 =
|
||||||
let delta = nextTimeout - now
|
let delta = nextTimeout - now
|
||||||
result = min(delta, int32.high).int32
|
result = min(delta, int32.high).int32
|
||||||
|
|
||||||
proc getPeerId(ctx: QuicP2PContext): string =
|
proc srcPort(ctx: QuicP2PContext): Port =
|
||||||
|
let (_, myPort) = ctx.socks[0].getLocalAddr()
|
||||||
|
myPort
|
||||||
|
|
||||||
|
proc peerId(ctx: QuicP2PContext): string =
|
||||||
assert(ctx.tlsCtx.certificates.count == 2)
|
assert(ctx.tlsCtx.certificates.count == 2)
|
||||||
let firstCertAddr = cast[ByteAddress](ctx.tlsCtx.certificates.list)
|
let firstCertAddr = cast[ByteAddress](ctx.tlsCtx.certificates.list)
|
||||||
let secondCertIovec = cast[ptr ptls_iovec_t](firstCertAddr + sizeof(ptls_iovec_t))
|
let secondCertIovec = cast[ptr ptls_iovec_t](firstCertAddr + sizeof(ptls_iovec_t))
|
||||||
|
@ -88,7 +94,7 @@ proc getPeerId(ctx: QuicP2PContext): string =
|
||||||
copyMem(caCert.cstring, secondCertIovec.base, secondCertIovec.len)
|
copyMem(caCert.cstring, secondCertIovec.base, secondCertIovec.len)
|
||||||
result = caCert.getPublicKey().encode(pad = false)
|
result = caCert.getPublicKey().encode(pad = false)
|
||||||
|
|
||||||
proc getPeerId(conn: Connection): string =
|
proc peerId(conn: Connection): string =
|
||||||
assert(conn.certs.len() == 2)
|
assert(conn.certs.len() == 2)
|
||||||
result = conn.certs[1].getPublicKey().encode(pad = false)
|
result = conn.certs[1].getPublicKey().encode(pad = false)
|
||||||
|
|
||||||
|
@ -108,7 +114,7 @@ proc onServerReceive(stream: ptr quicly_stream_t, offset: csize_t, src: pointer,
|
||||||
var msg = newString(input.len)
|
var msg = newString(input.len)
|
||||||
copyMem(addr msg[0], input.base, input.len)
|
copyMem(addr msg[0], input.base, input.len)
|
||||||
let conn = cast[Connection](quicly_get_data(stream.conn)[])
|
let conn = cast[Connection](quicly_get_data(stream.conn)[])
|
||||||
echo &"client {conn.getPeerId()} sends \"{msg}\""
|
echo &"client {conn.peerId} sends \"{msg}\""
|
||||||
if quicly_sendstate_is_open(addr stream.sendstate) != 0 and input.len > 0:
|
if quicly_sendstate_is_open(addr stream.sendstate) != 0 and input.len > 0:
|
||||||
discard quicly_streambuf_egress_write(stream, input.base, input.len)
|
discard quicly_streambuf_egress_write(stream, input.base, input.len)
|
||||||
if quicly_recvstate_transfer_complete(addr stream.recvstate) != 0:
|
if quicly_recvstate_transfer_complete(addr stream.recvstate) != 0:
|
||||||
|
@ -123,7 +129,7 @@ proc onClientReceive(stream: ptr quicly_stream_t, offset: csize_t,
|
||||||
let msg = newString(input.len)
|
let msg = newString(input.len)
|
||||||
copyMem(msg.cstring, input.base, input.len)
|
copyMem(msg.cstring, input.base, input.len)
|
||||||
let conn = cast[Connection](quicly_get_data(stream.conn)[])
|
let conn = cast[Connection](quicly_get_data(stream.conn)[])
|
||||||
echo &"server {conn.getPeerId()} replies \"{msg}\""
|
echo &"server {conn.peerId} replies \"{msg}\""
|
||||||
if quicly_recvstate_transfer_complete(addr stream.recvstate) != 0:
|
if quicly_recvstate_transfer_complete(addr stream.recvstate) != 0:
|
||||||
discard quicly_close(stream.conn, 0, "")
|
discard quicly_close(stream.conn, 0, "")
|
||||||
quicly_streambuf_ingress_shift(stream, input.len)
|
quicly_streambuf_ingress_shift(stream, input.len)
|
||||||
|
@ -208,7 +214,7 @@ proc verifyCerts(self: ptr ptls_verify_certificate_t, tls: ptr ptls_t,
|
||||||
X509_STORE_free(store)
|
X509_STORE_free(store)
|
||||||
X509_free(caCert)
|
X509_free(caCert)
|
||||||
|
|
||||||
proc initContext(sock: AsyncSocket, puncher: Puncher, certChainPath: string,
|
proc initContext(socks: seq[AsyncSocket], certChainPath: string,
|
||||||
keyPath: string,
|
keyPath: string,
|
||||||
streamOpenCb: typeof(quicly_stream_open_t.cb)):
|
streamOpenCb: typeof(quicly_stream_open_t.cb)):
|
||||||
QuicP2PContext =
|
QuicP2PContext =
|
||||||
|
@ -217,7 +223,7 @@ proc initContext(sock: AsyncSocket, puncher: Puncher, certChainPath: string,
|
||||||
keyExchanges: ptls_openssl_key_exchanges,
|
keyExchanges: ptls_openssl_key_exchanges,
|
||||||
cipherSuites: ptls_openssl_cipher_suites)
|
cipherSuites: ptls_openssl_cipher_suites)
|
||||||
quicly_amend_ptls_context(addr tlsCtx)
|
quicly_amend_ptls_context(addr tlsCtx)
|
||||||
result = QuicP2PContext(sock: sock, puncher: puncher,
|
result = QuicP2PContext(socks: socks,
|
||||||
streamOpen: quicly_stream_open_t(cb: streamOpenCb),
|
streamOpen: quicly_stream_open_t(cb: streamOpenCb),
|
||||||
verifyCertsCb: ptls_verify_certificate_t(cb: verifyCerts),
|
verifyCertsCb: ptls_verify_certificate_t(cb: verifyCerts),
|
||||||
tlsCtx: tlsCtx, quiclyCtx: quicly_spec_context)
|
tlsCtx: tlsCtx, quiclyCtx: quicly_spec_context)
|
||||||
|
@ -236,10 +242,11 @@ proc initContext(sock: AsyncSocket, puncher: Puncher, certChainPath: string,
|
||||||
result.tlsCtx.sign_certificate = addr result.signCertCb.super
|
result.tlsCtx.sign_certificate = addr result.signCertCb.super
|
||||||
|
|
||||||
proc addConnection(ctx: QuicP2PContext, connPtr: ptr quicly_conn_t,
|
proc addConnection(ctx: QuicP2PContext, connPtr: ptr quicly_conn_t,
|
||||||
peerId: string) =
|
sock: AsyncSocket, expectedPeerId: string) =
|
||||||
assert(not connPtr.isNil)
|
assert(not connPtr.isNil)
|
||||||
let data = quicly_get_data(connPtr)
|
let data = quicly_get_data(connPtr)
|
||||||
var conn = Connection(conn: connPtr, peerId: peerId)
|
var conn = Connection(conn: connPtr, sock: sock,
|
||||||
|
expectedPeerId: expectedPeerId)
|
||||||
data[] = addr conn[]
|
data[] = addr conn[]
|
||||||
ctx.connections.add(conn)
|
ctx.connections.add(conn)
|
||||||
|
|
||||||
|
@ -267,7 +274,7 @@ proc sendPackets(ctx: QuicP2PContext) =
|
||||||
for j in 0 .. dgramCount - 1:
|
for j in 0 .. dgramCount - 1:
|
||||||
var sockLen = quicly_get_socklen(addr dstAddr.sa)
|
var sockLen = quicly_get_socklen(addr dstAddr.sa)
|
||||||
# TODO: replace asyncdispatch.sendTo with asyncnet.sendTo (Nim 1.4 required)
|
# TODO: replace asyncdispatch.sendTo with asyncnet.sendTo (Nim 1.4 required)
|
||||||
asyncCheck sendTo(ctx.sock.getFd().AsyncFD, dgrams[j].iov_base,
|
asyncCheck sendTo(conns[i].sock.getFd().AsyncFD, dgrams[j].iov_base,
|
||||||
dgrams[j].iov_len.int, addr dstAddr.sa, sockLen)
|
dgrams[j].iov_len.int, addr dstAddr.sa, sockLen)
|
||||||
of QUICLY_ERROR_FREE_CONNECTION:
|
of QUICLY_ERROR_FREE_CONNECTION:
|
||||||
echo "deleting connection"
|
echo "deleting connection"
|
||||||
|
@ -276,7 +283,8 @@ proc sendPackets(ctx: QuicP2PContext) =
|
||||||
raise newException(ValueError, &"quicly_send returned {sendResult}")
|
raise newException(ValueError, &"quicly_send returned {sendResult}")
|
||||||
|
|
||||||
proc initiateQuicConnection(ctx: QuicP2PContext, peerId: string,
|
proc initiateQuicConnection(ctx: QuicP2PContext, peerId: string,
|
||||||
peerIp: IpAddress, peerPort: Port) =
|
sock: AsyncSocket, peerIp: IpAddress,
|
||||||
|
peerPort: Port) =
|
||||||
var conn: ptr quicly_conn_t
|
var conn: ptr quicly_conn_t
|
||||||
var peerAddr: SockAddr_storage
|
var peerAddr: SockAddr_storage
|
||||||
var peerSockLen: SockLen
|
var peerSockLen: SockLen
|
||||||
|
@ -290,9 +298,10 @@ proc initiateQuicConnection(ctx: QuicP2PContext, peerId: string,
|
||||||
return
|
return
|
||||||
var stream: ptr quicly_stream_t
|
var stream: ptr quicly_stream_t
|
||||||
discard quicly_open_stream(conn, addr stream, 0)
|
discard quicly_open_stream(conn, addr stream, 0)
|
||||||
ctx.addConnection(conn, peerId)
|
ctx.addConnection(conn, sock, peerId)
|
||||||
|
|
||||||
proc handleMsg(ctx: QuicP2PContext, msg: string, peerId: string,
|
proc handleMsg(ctx: QuicP2PContext, msg: string, peerId: string,
|
||||||
|
puncher: Puncher, sock: AsyncSocket,
|
||||||
peerAddr: ptr Sockaddr_storage, peerSockLen: SockLen) =
|
peerAddr: ptr Sockaddr_storage, peerSockLen: SockLen) =
|
||||||
var offset: csize_t = 0
|
var offset: csize_t = 0
|
||||||
while offset < msg.len().csize_t:
|
while offset < msg.len().csize_t:
|
||||||
|
@ -304,7 +313,7 @@ proc handleMsg(ctx: QuicP2PContext, msg: string, peerId: string,
|
||||||
echo "unable to decode packet"
|
echo "unable to decode packet"
|
||||||
return
|
return
|
||||||
var conn: ptr quicly_conn_t = nil
|
var conn: ptr quicly_conn_t = nil
|
||||||
for c in ctx.connections:
|
for c in ctx.connections.filter(proc(c: Connection): bool = c.sock == sock):
|
||||||
if quicly_is_destination(c.conn, nil, peerAddr, addr decoded) != 0:
|
if quicly_is_destination(c.conn, nil, peerAddr, addr decoded) != 0:
|
||||||
conn = c.conn
|
conn = c.conn
|
||||||
break
|
break
|
||||||
|
@ -313,44 +322,47 @@ proc handleMsg(ctx: QuicP2PContext, msg: string, peerId: string,
|
||||||
else:
|
else:
|
||||||
# The puncher needs to be informed about this message because it may
|
# The puncher needs to be informed about this message because it may
|
||||||
# be the peer's response to our respond call.
|
# be the peer's response to our respond call.
|
||||||
ctx.puncher.handleMsg(msg, peerAddr[], peerSockLen)
|
puncher.handleMsg(msg, sock, peerAddr[], peerSockLen)
|
||||||
if peerId.len == 0:
|
if peerId.len == 0:
|
||||||
let acceptResult = quicly_accept(addr conn, addr ctx.quiclyCtx, nil,
|
let acceptResult = quicly_accept(addr conn, addr ctx.quiclyCtx, nil,
|
||||||
peerAddr, addr decoded, nil,
|
peerAddr, addr decoded, nil,
|
||||||
addr ctx.nextCid, nil)
|
addr ctx.nextCid, nil)
|
||||||
if acceptResult == 0:
|
if acceptResult == 0:
|
||||||
ctx.addConnection(conn, peerId)
|
ctx.addConnection(conn, sock, peerId)
|
||||||
|
|
||||||
proc receive(ctx: QuicP2PContext, peerId: string) {.async.} =
|
proc receive(ctx: QuicP2PContext, puncher: Puncher, sock: AsyncSocket,
|
||||||
|
peerId: string) {.async.} =
|
||||||
while true:
|
while true:
|
||||||
# TODO: replace asyncdispatch.recvFromInto with asyncnet.recvFrom (Nim 1.4 required)
|
# TODO: replace asyncdispatch.recvFromInto with asyncnet.recvFrom (Nim 1.4 required)
|
||||||
var msg = newString(BufferSize)
|
var msg = newString(BufferSize)
|
||||||
var peerAddr: Sockaddr_storage
|
var peerAddr: Sockaddr_storage
|
||||||
var peerAddrLen = SockLen(sizeof(peerAddr))
|
var peerAddrLen = SockLen(sizeof(peerAddr))
|
||||||
let msgLen = await recvFromInto(ctx.sock.getFd().AsyncFD, msg.cstring,
|
let msgLen = await recvFromInto(sock.getFd().AsyncFD, msg.cstring, msg.len,
|
||||||
msg.len, cast[ptr SockAddr](addr peerAddr),
|
cast[ptr SockAddr](addr peerAddr),
|
||||||
addr peerAddrLen)
|
addr peerAddrLen)
|
||||||
msg.setLen(msgLen)
|
msg.setLen(msgLen)
|
||||||
if msg.len > 0:
|
if msg.len > 0:
|
||||||
handleMsg(ctx, msg, peerId, addr peerAddr, peerAddrLen)
|
handleMsg(ctx, msg, peerId, puncher, sock, addr peerAddr, peerAddrLen)
|
||||||
|
|
||||||
proc handleNotification(ctx: QuicP2PContext, notification: NotifyPeer)
|
proc handleNotification(puncher: Puncher, notification: NotifyPeer) {.async.} =
|
||||||
{.async.} =
|
let attempt = await puncher.respond(notification.srcIp, notification.srcPort,
|
||||||
let attempt = await ctx.puncher.respond(notification.srcIp, notification.srcPort,
|
|
||||||
notification.probedsrcPorts)
|
notification.probedsrcPorts)
|
||||||
discard await attempt.finalize()
|
discard await attempt.finalize()
|
||||||
|
|
||||||
proc runApp(ctx: QuicP2PContext, srcPort: Port, peerId: string) {.async.} =
|
proc runApp(ctx: QuicP2PContext, peerId: string) {.async.} =
|
||||||
let serverConn = await initServerConnection(rendezvousServers[0].hostname,
|
let serverConn = await initServerConnection(rendezvousServers[0].hostname,
|
||||||
rendezvousServers[0].port,
|
rendezvousServers[0].port,
|
||||||
srcPort, rendezvousServers)
|
ctx.srcPort, rendezvousServers)
|
||||||
|
let puncher = initPuncher(ctx.socks, serverConn.probedSrcPorts)
|
||||||
|
|
||||||
asyncCheck handleServerMessages(serverConn)
|
asyncCheck handleServerMessages(serverConn)
|
||||||
asyncCheck receive(ctx, peerId)
|
for sock in ctx.socks:
|
||||||
|
asyncCheck receive(ctx, puncher, sock, peerId)
|
||||||
|
|
||||||
if peerId.len == 0:
|
if peerId.len == 0:
|
||||||
# We are the responder
|
# We are the responder
|
||||||
let probedPorts = serverConn.probedSrcPorts.join(",")
|
let probedPorts = serverConn.probedSrcPorts.join(",")
|
||||||
let req = &"{ctx.getPeerId()}|{serverConn.probedIp}|{srcPort}|{probedPorts}"
|
let req = &"{ctx.peerId}|{serverConn.probedIp}|{ctx.srcPort}|{probedPorts}"
|
||||||
discard await serverConn.sendRequest("register", req)
|
discard await serverConn.sendRequest("register", req)
|
||||||
while true:
|
while true:
|
||||||
let (hasData, data) = await serverConn.peerNotifications.read()
|
let (hasData, data) = await serverConn.peerNotifications.read()
|
||||||
|
@ -359,7 +371,7 @@ proc runApp(ctx: QuicP2PContext, srcPort: Port, peerId: string) {.async.} =
|
||||||
try:
|
try:
|
||||||
let msg = parseMessage[NotifyPeer](data)
|
let msg = parseMessage[NotifyPeer](data)
|
||||||
# FIXME: check if we want to receive messages from the sender
|
# FIXME: check if we want to receive messages from the sender
|
||||||
asyncCheck handleNotification(ctx, msg)
|
asyncCheck handleNotification(puncher, msg)
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
echo e.msg
|
echo e.msg
|
||||||
discard
|
discard
|
||||||
|
@ -370,42 +382,49 @@ proc runApp(ctx: QuicP2PContext, srcPort: Port, peerId: string) {.async.} =
|
||||||
let peerInfo = parseMessage[OkGetPeerInfo](serverResponse)
|
let peerInfo = parseMessage[OkGetPeerInfo](serverResponse)
|
||||||
let myProbedPorts = serverConn.probedSrcPorts.join(",")
|
let myProbedPorts = serverConn.probedSrcPorts.join(",")
|
||||||
let peerProbedPorts = peerInfo.probedPorts.join(",")
|
let peerProbedPorts = peerInfo.probedPorts.join(",")
|
||||||
let req = &"{ctx.getPeerId()}|{peerId}|{serverConn.probedIp}|{srcPort}|{myProbedPorts}|{peerInfo.ip}|{peerInfo.localPort}|{peerProbedPorts}"
|
let req = &"{ctx.peerId}|{peerId}|{serverConn.probedIp}|{ctx.srcPort}|{myProbedPorts}|{peerInfo.ip}|{peerInfo.localPort}|{peerProbedPorts}"
|
||||||
let attempt = await ctx.puncher.initiate(peerInfo.ip, peerInfo.localPort,
|
let attempt = await puncher.initiate(peerInfo.ip, peerInfo.localPort,
|
||||||
peerInfo.probedPorts)
|
peerInfo.probedPorts)
|
||||||
discard await serverConn.sendRequest("notify-peer", req)
|
discard await serverConn.sendRequest("notify-peer", req)
|
||||||
let peerPort = await attempt.finalize()
|
let (sock, peerPort) = await attempt.finalize()
|
||||||
initiateQuicConnection(ctx, peerId, peerInfo.ip, peerPort)
|
initiateQuicConnection(ctx, peerId, sock, peerInfo.ip, peerPort)
|
||||||
|
|
||||||
proc main() =
|
proc main() =
|
||||||
var ctx: QuicP2PContext
|
var ctx: QuicP2PContext
|
||||||
|
var socks = newSeq[AsyncSocket]()
|
||||||
|
randomize()
|
||||||
|
for i in 0 .. 4:
|
||||||
|
# FIXME: close socks
|
||||||
let sock = newAsyncSocket(sockType = SOCK_DGRAM, protocol = IPPROTO_UDP,
|
let sock = newAsyncSocket(sockType = SOCK_DGRAM, protocol = IPPROTO_UDP,
|
||||||
buffered = false)
|
buffered = false)
|
||||||
randomize()
|
|
||||||
let srcPort = rand(Port(1024) .. Port.high)
|
let srcPort = rand(Port(1024) .. Port.high)
|
||||||
|
# FIXME: Once we start using UDP for endpoint probing (currently done in
|
||||||
|
# initServerConnction) we either have to
|
||||||
|
# - finish the probing before we bind the srcPort here
|
||||||
|
# - pass the primary socket to initServerConnection
|
||||||
sock.bindAddr(srcPort)
|
sock.bindAddr(srcPort)
|
||||||
let puncher = initPuncher(sock)
|
socks.add(sock)
|
||||||
|
|
||||||
case paramCount():
|
case paramCount():
|
||||||
of 0:
|
of 0:
|
||||||
ctx = initContext(sock, puncher, serverCertChainPath, serverKeyPath,
|
ctx = initContext(socks, serverCertChainPath, serverKeyPath,
|
||||||
onServerStreamOpen)
|
onServerStreamOpen)
|
||||||
ctx.tlsCtx.require_client_authentication = 1
|
ctx.tlsCtx.require_client_authentication = 1
|
||||||
asyncCheck runApp(ctx, srcPort, "")
|
asyncCheck runApp(ctx, "")
|
||||||
|
|
||||||
of 1:
|
of 1:
|
||||||
let peerId = paramStr(1)
|
let peerId = paramStr(1)
|
||||||
ctx = initContext(sock, puncher, clientCertChainPath, clientKeyPath,
|
ctx = initContext(socks, clientCertChainPath, clientKeyPath,
|
||||||
onClientStreamOpen)
|
onClientStreamOpen)
|
||||||
asyncCheck runApp(ctx, srcPort, peerId)
|
asyncCheck runApp(ctx, peerId)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
usage()
|
usage()
|
||||||
quit(1)
|
quit(1)
|
||||||
|
|
||||||
echo "My peer ID is ", ctx.getPeerId()
|
echo "My peer ID is ", ctx.peerId
|
||||||
while true:
|
while true:
|
||||||
let nextTimeout = ctx.getRelativeTimeout()
|
let nextTimeout = ctx.relativeTimeout
|
||||||
poll(nextTimeout)
|
poll(nextTimeout)
|
||||||
ctx.sendPackets()
|
ctx.sendPackets()
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue