414 lines
16 KiB
Nim
414 lines
16 KiB
Nim
{.passL: "-l crypto -l quicly -l picotls-core -l picotls-openssl".}
|
|
|
|
import asyncdispatch
|
|
import asyncnet
|
|
import base32
|
|
import certificate
|
|
import message
|
|
import net
|
|
import os
|
|
import openssl_additional
|
|
import picotls/picotls
|
|
import picotls/openssl as ptls_openssl
|
|
import puncher
|
|
import quicly/quicly
|
|
import quicly/cid
|
|
import quicly/constants
|
|
import quicly/defaults
|
|
import quicly/recvstate
|
|
import quicly/sendstate
|
|
import quicly/streambuf
|
|
import random
|
|
import server_connection
|
|
import strformat
|
|
import strutils
|
|
|
|
from nativesockets import SockAddr, Sockaddr_storage, SockLen, getHostByName
|
|
from posix import IOVec
|
|
from strutils import parseUInt
|
|
|
|
from openssl import
|
|
DLLSSLName,
|
|
EVP_PKEY,
|
|
SslPtr,
|
|
PX509,
|
|
PX509_STORE,
|
|
X509_STORE_new,
|
|
X509_STORE_free,
|
|
X509_STORE_add_cert,
|
|
PSTACK,
|
|
d2i_X509
|
|
|
|
type
|
|
Connection = ref object
|
|
conn: ptr quicly_conn_t
|
|
certs: seq[Certificate]
|
|
peerId: string
|
|
|
|
QuicP2PContext = ref object
|
|
sock: AsyncSocket
|
|
puncher: Puncher
|
|
streamOpen: quicly_stream_open_t
|
|
nextCid: quicly_cid_plaintext_t
|
|
signCertCb: ptls_openssl_sign_certificate_t
|
|
verifyCertsCb: ptls_verify_certificate_t
|
|
tlsCtx: ptls_context_t
|
|
quiclyCtx: quicly_context_t
|
|
connections: seq[Connection]
|
|
|
|
const serverCertChainPath = "./certs/server-certchain.pem"
|
|
const serverKeyPath = "./certs/server-cert.key"
|
|
const clientCertChainPath = "./certs/client-certchain.pem"
|
|
const clientKeyPath = "./certs/client-cert.key"
|
|
|
|
const rendezvousServers: seq[tuple[hostname: string, port: Port]] = @[
|
|
("strangeplace.net", Port(5320)),
|
|
("ulrich.earth", Port(5320))
|
|
]
|
|
|
|
proc getRelativeTimeout(ctx: QuicP2PContext): int32 =
|
|
## Obtain the absolute int64 timeout from quicly and convert it to the
|
|
## relative int32 timeout expected by poll.
|
|
result = 0
|
|
var nextTimeout = int64.high
|
|
var now = ctx.quiclyCtx.now.cb(ctx.quiclyCtx.now)
|
|
for c in ctx.connections:
|
|
let connTimeout = quicly_get_first_timeout(c.conn)
|
|
if connTimeout < nextTimeout:
|
|
nextTimeout = connTimeout
|
|
if now < nextTimeout:
|
|
let delta = nextTimeout - now
|
|
result = min(delta, int32.high).int32
|
|
|
|
proc getPeerId(ctx: QuicP2PContext): string =
|
|
assert(ctx.tlsCtx.certificates.count == 2)
|
|
let firstCertAddr = cast[ByteAddress](ctx.tlsCtx.certificates.list)
|
|
let secondCertIovec = cast[ptr ptls_iovec_t](firstCertAddr + sizeof(ptls_iovec_t))
|
|
var caCert = newString(secondCertIovec.len)
|
|
copyMem(caCert.cstring, secondCertIovec.base, secondCertIovec.len)
|
|
result = caCert.getPublicKey().encode(pad = false)
|
|
|
|
proc getPeerId(conn: Connection): string =
|
|
assert(conn.certs.len() == 2)
|
|
result = conn.certs[1].getPublicKey().encode(pad = false)
|
|
|
|
proc onStopSending(stream: ptr quicly_stream_t, err: cint) {.cdecl.} =
|
|
echo "onStopSending"
|
|
discard quicly_close(stream.conn, 0x30000, "")
|
|
|
|
proc onReceiveReset(stream: ptr quicly_stream_t, err: cint) {.cdecl.} =
|
|
echo "onReceiveReset"
|
|
discard quicly_close(stream.conn, 0x30000, "")
|
|
|
|
proc onServerReceive(stream: ptr quicly_stream_t, offset: csize_t, src: pointer,
|
|
len: csize_t) {.cdecl.} =
|
|
if quicly_streambuf_ingress_receive(stream, offset, src, len) != 0:
|
|
return
|
|
let input = quicly_streambuf_ingress_get(stream)
|
|
var msg = newString(input.len)
|
|
copyMem(addr msg[0], input.base, input.len)
|
|
let conn = cast[Connection](quicly_get_data(stream.conn)[])
|
|
echo &"client {conn.getPeerId()} sends \"{msg}\""
|
|
if quicly_sendstate_is_open(addr stream.sendstate) != 0 and input.len > 0:
|
|
discard quicly_streambuf_egress_write(stream, input.base, input.len)
|
|
if quicly_recvstate_transfer_complete(addr stream.recvstate) != 0:
|
|
discard quicly_streambuf_egress_shutdown(stream)
|
|
quicly_streambuf_ingress_shift(stream, input.len)
|
|
|
|
proc onClientReceive(stream: ptr quicly_stream_t, offset: csize_t,
|
|
src: pointer, len: csize_t) {.cdecl.} =
|
|
if quicly_streambuf_ingress_receive(stream, offset, src, len) != 0:
|
|
return
|
|
let input = quicly_streambuf_ingress_get(stream)
|
|
let msg = newString(input.len)
|
|
copyMem(msg.cstring, input.base, input.len)
|
|
let conn = cast[Connection](quicly_get_data(stream.conn)[])
|
|
echo &"server {conn.getPeerId()} replies \"{msg}\""
|
|
if quicly_recvstate_transfer_complete(addr stream.recvstate) != 0:
|
|
discard quicly_close(stream.conn, 0, "")
|
|
quicly_streambuf_ingress_shift(stream, input.len)
|
|
|
|
var streamCallbacksServer = quicly_stream_callbacks_t(
|
|
on_destroy: quicly_streambuf_destroy,
|
|
on_send_shift: quicly_streambuf_egress_shift,
|
|
on_send_emit: quicly_streambuf_egress_emit,
|
|
on_send_stop: onStopSending,
|
|
on_receive: onServerReceive,
|
|
on_receive_reset: onReceiveReset)
|
|
|
|
var streamCallbacksClient = quicly_stream_callbacks_t(
|
|
on_destroy: quicly_streambuf_destroy,
|
|
on_send_shift: quicly_streambuf_egress_shift,
|
|
on_send_emit: quicly_streambuf_egress_emit,
|
|
on_send_stop: onStopSending,
|
|
on_receive: onClientReceive,
|
|
on_receive_reset: onReceiveReset)
|
|
|
|
proc usage() =
|
|
echo &"usage in server mode: {paramStr(0)} LISTEN_PORT"
|
|
echo &"usage in client mode: {paramStr(0)} SERVER_HOSTNAME SERVER_PORT"
|
|
|
|
proc onServerStreamOpen(self: ptr quicly_stream_open_t,
|
|
stream: ptr quicly_stream_t): cint {.cdecl.} =
|
|
result = quicly_streambuf_create(stream, sizeof(quicly_streambuf_t).csize_t)
|
|
stream.callbacks = addr streamCallbacksServer
|
|
|
|
proc onClientStreamOpen(self: ptr quicly_stream_open_t,
|
|
stream: ptr quicly_stream_t): cint {.cdecl.} =
|
|
result = quicly_streambuf_create(stream, sizeof(quicly_streambuf_t).csize_t)
|
|
stream.callbacks = addr streamCallbacksClient
|
|
let msg = "hello server"
|
|
discard quicly_streambuf_egress_write(stream, msg.cstring, msg.len().csize_t)
|
|
|
|
proc verifyCACertSignature(cert: PX509): bool =
|
|
let ctx = X509_STORE_CTX_new()
|
|
let store = X509_STORE_new()
|
|
discard X509_STORE_add_cert(store, cert)
|
|
discard X509_STORE_CTX_init(ctx, store, cert, nil)
|
|
let verifyParams = X509_STORE_CTX_get0_param(ctx)
|
|
let flags = X509_VERIFY_PARAM_get_flags(verifyParams)
|
|
discard X509_VERIFY_PARAM_set_flags(verifyParams, flags or X509_V_FLAG_CHECK_SS_SIGNATURE)
|
|
let verifyResult = X509_verify_cert(ctx)
|
|
result = verifyResult == 1
|
|
X509_STORE_CTX_free(ctx)
|
|
X509_STORE_free(store)
|
|
|
|
proc verifyCerts(self: ptr ptls_verify_certificate_t, tls: ptr ptls_t,
|
|
verify_sign: ptr VerifySignCb, verify_data: ptr pointer,
|
|
certs: ptr ptls_iovec_t, num_certs: csize_t): cint {.cdecl.} =
|
|
if num_certs != 2:
|
|
return PTLS_ALERT_UNKNOWN_CA
|
|
# parse the highest certificate and use it as CA certificate
|
|
var iovec = cast[ptr ptls_iovec_t](cast[ByteAddress](certs) + sizeof(ptls_iovec_t))
|
|
var iovecBase = iovec.base
|
|
let caCert = d2i_X509(nil, cast[ptr ptr cuchar](addr iovecBase),
|
|
iovec.len.cint)
|
|
if caCert.isNil:
|
|
return PTLS_ALERT_BAD_CERTIFICATE
|
|
if not verifyCACertSignature(caCert):
|
|
return PTLS_ALERT_BAD_CERTIFICATE
|
|
let store = X509_STORE_new()
|
|
discard X509_STORE_add_cert(store, caCert)
|
|
# empty server name makes picotls skip server name verification
|
|
#FIXME: should we use the peer ID as server name?
|
|
discard ptls_set_server_name(tls, nil, 0)
|
|
var opensslVerifier: ptls_openssl_verify_certificate_t
|
|
discard ptls_openssl_init_verify_certificate(addr opensslVerifier, store)
|
|
result = opensslVerifier.super.cb(addr opensslVerifier.super, tls,
|
|
verify_sign, verify_data, certs, num_certs)
|
|
if result == 0:
|
|
let quiclyConn = cast[ptr quicly_conn_t](ptls_get_data_ptr(tls)[])
|
|
let conn = cast[Connection](quicly_get_data(quiclyConn)[])
|
|
for i in 0 .. num_certs - 1:
|
|
let iovec = cast[ptr ptls_iovec_t](cast[ByteAddress](certs) + i.int * sizeof(ptls_iovec_t))
|
|
var cert = newString(iovec.len)
|
|
copyMem(cert.cstring, iovec.base, iovec.len)
|
|
conn.certs.add(cert)
|
|
ptls_openssl_dispose_verify_certificate(addr opensslVerifier)
|
|
X509_STORE_free(store)
|
|
X509_free(caCert)
|
|
|
|
proc initContext(sock: AsyncSocket, puncher: Puncher, certChainPath: string,
|
|
keyPath: string,
|
|
streamOpenCb: typeof(quicly_stream_open_t.cb)):
|
|
QuicP2PContext =
|
|
var tlsCtx = ptls_context_t(randomBytes: ptls_openssl_random_bytes,
|
|
getTime: addr ptls_get_time,
|
|
keyExchanges: ptls_openssl_key_exchanges,
|
|
cipherSuites: ptls_openssl_cipher_suites)
|
|
quicly_amend_ptls_context(addr tlsCtx)
|
|
result = QuicP2PContext(sock: sock, puncher: puncher,
|
|
streamOpen: quicly_stream_open_t(cb: streamOpenCb),
|
|
verifyCertsCb: ptls_verify_certificate_t(cb: verifyCerts),
|
|
tlsCtx: tlsCtx, quiclyCtx: quicly_spec_context)
|
|
result.quiclyCtx.tls = addr result.tlsCtx
|
|
result.quiclyCtx.stream_open = addr result.streamOpen
|
|
result.tlsCtx.verify_certificate = addr result.verifyCertsCb
|
|
if ptls_load_certificates(addr result.tlsCtx, certChainPath.cstring) != 0:
|
|
raise newException(ValueError, &"cannot load certificate chain {certChainPath}")
|
|
let pKeyFile = open(keyPath)
|
|
let privateKey = PEM_read_PrivateKey(pkeyFile, nil, nil, nil)
|
|
pkeyFile.close()
|
|
if privateKey == nil:
|
|
raise newException(ValueError, &"cannot load private key {keyPath}")
|
|
discard ptls_openssl_init_sign_certificate(addr result.signCertCb, privateKey)
|
|
EVP_PKEY_free(privateKey)
|
|
result.tlsCtx.sign_certificate = addr result.signCertCb.super
|
|
|
|
proc addConnection(ctx: QuicP2PContext, connPtr: ptr quicly_conn_t,
|
|
peerId: string) =
|
|
assert(not connPtr.isNil)
|
|
let data = quicly_get_data(connPtr)
|
|
var conn = Connection(conn: connPtr, peerId: peerId)
|
|
data[] = addr conn[]
|
|
ctx.connections.add(conn)
|
|
|
|
proc delConnection(ctx: QuicP2PContext, index: int) =
|
|
assert(index >= 0 and index < ctx.connections.len)
|
|
let c = ctx.connections[index]
|
|
ctx.connections.del(index)
|
|
quicly_free(c.conn)
|
|
|
|
proc sendPackets(ctx: QuicP2PContext) =
|
|
if ctx.connections.len == 0:
|
|
return
|
|
let conns = ctx.connections.deepCopy()
|
|
for i in 0 .. conns.len - 1:
|
|
var srcAddr, dstAddr: quicly_address_t
|
|
var dgrams: array[10, IOVec]
|
|
var dgramCount = dgrams.len().csize_t
|
|
var dgramsBuf = newString(dgramCount * ctx.quiclyCtx.transport_params.max_udp_payload_size)
|
|
let sendResult = quicly_send(conns[i].conn, addr dstAddr, addr srcAddr,
|
|
addr dgrams[0], addr dgramCount,
|
|
addr dgramsBuf[0], dgramsBuf.len().csize_t)
|
|
case sendResult:
|
|
of 0:
|
|
if dgramCount > 0:
|
|
for j in 0 .. dgramCount - 1:
|
|
var sockLen = quicly_get_socklen(addr dstAddr.sa)
|
|
# TODO: replace asyncdispatch.sendTo with asyncnet.sendTo (Nim 1.4 required)
|
|
asyncCheck sendTo(ctx.sock.getFd().AsyncFD, dgrams[j].iov_base,
|
|
dgrams[j].iov_len.int, addr dstAddr.sa, sockLen)
|
|
of QUICLY_ERROR_FREE_CONNECTION:
|
|
echo "deleting connection"
|
|
ctx.delConnection(i)
|
|
else:
|
|
raise newException(ValueError, &"quicly_send returned {sendResult}")
|
|
|
|
proc initiateQuicConnection(ctx: QuicP2PContext, peerId: string,
|
|
peerIp: IpAddress, peerPort: Port) =
|
|
var conn: ptr quicly_conn_t
|
|
var peerAddr: SockAddr_storage
|
|
var peerSockLen: SockLen
|
|
toSockAddr(peerIp, peerPort, peerAddr, peerSockLen)
|
|
let addressToken = ptls_iovec_init(nil, 0)
|
|
let connectResult = quicly_connect(addr conn, addr ctx.quiclyCtx,
|
|
peerId.cstring, addr peerAddr, nil,
|
|
addr ctx.nextCid, addressToken, nil, nil)
|
|
if connectResult != 0:
|
|
echo "quicly_connect failed: ", connectResult
|
|
return
|
|
var stream: ptr quicly_stream_t
|
|
discard quicly_open_stream(conn, addr stream, 0)
|
|
ctx.addConnection(conn, peerId)
|
|
|
|
proc handleMsg(ctx: QuicP2PContext, msg: string, peerId: string,
|
|
peerAddr: ptr Sockaddr_storage, peerSockLen: SockLen) =
|
|
var offset: csize_t = 0
|
|
while offset < msg.len().csize_t:
|
|
var decoded: quicly_decoded_packet_t
|
|
let decodeResult = quicly_decode_packet(addr ctx.quiclyCtx, addr decoded,
|
|
cast[ptr uint8](msg.cstring),
|
|
msg.len().csize_t, addr offset)
|
|
if decode_result == csize_t.high:
|
|
echo "unable to decode packet"
|
|
return
|
|
var conn: ptr quicly_conn_t = nil
|
|
for c in ctx.connections:
|
|
if quicly_is_destination(c.conn, nil, peerAddr, addr decoded) != 0:
|
|
conn = c.conn
|
|
break
|
|
if conn != nil:
|
|
discard quicly_receive(conn, nil, peerAddr, addr decoded)
|
|
else:
|
|
# The puncher needs to be informed about this message because it may
|
|
# be the peer's response to our respond call.
|
|
ctx.puncher.handleMsg(msg, peerAddr[], peerSockLen)
|
|
if peerId.len == 0:
|
|
let acceptResult = quicly_accept(addr conn, addr ctx.quiclyCtx, nil,
|
|
peerAddr, addr decoded, nil,
|
|
addr ctx.nextCid, nil)
|
|
if acceptResult == 0:
|
|
ctx.addConnection(conn, peerId)
|
|
|
|
proc receive(ctx: QuicP2PContext, peerId: string) {.async.} =
|
|
while true:
|
|
# TODO: replace asyncdispatch.recvFromInto with asyncnet.recvFrom (Nim 1.4 required)
|
|
var msg = newString(BufferSize)
|
|
var peerAddr: Sockaddr_storage
|
|
var peerAddrLen = SockLen(sizeof(peerAddr))
|
|
let msgLen = await recvFromInto(ctx.sock.getFd().AsyncFD, msg.cstring,
|
|
msg.len, cast[ptr SockAddr](addr peerAddr),
|
|
addr peerAddrLen)
|
|
msg.setLen(msgLen)
|
|
if msg.len > 0:
|
|
handleMsg(ctx, msg, peerId, addr peerAddr, peerAddrLen)
|
|
|
|
proc handleNotification(ctx: QuicP2PContext, notification: NotifyPeer)
|
|
{.async.} =
|
|
let attempt = await ctx.puncher.respond(notification.srcIp, notification.srcPort,
|
|
notification.probedsrcPorts)
|
|
discard await attempt.finalize()
|
|
|
|
proc runApp(ctx: QuicP2PContext, srcPort: Port, peerId: string) {.async.} =
|
|
let serverConn = await initServerConnection(rendezvousServers[0].hostname,
|
|
rendezvousServers[0].port,
|
|
srcPort, rendezvousServers)
|
|
asyncCheck handleServerMessages(serverConn)
|
|
asyncCheck receive(ctx, peerId)
|
|
|
|
if peerId.len == 0:
|
|
# We are the responder
|
|
let probedPorts = serverConn.probedSrcPorts.join(",")
|
|
let req = &"{ctx.getPeerId()}|{serverConn.probedIp}|{srcPort}|{probedPorts}"
|
|
discard await serverConn.sendRequest("register", req)
|
|
while true:
|
|
let (hasData, data) = await serverConn.peerNotifications.read()
|
|
if not hasData:
|
|
break
|
|
try:
|
|
let msg = parseMessage[NotifyPeer](data)
|
|
# FIXME: check if we want to receive messages from the sender
|
|
asyncCheck handleNotification(ctx, msg)
|
|
except ValueError as e:
|
|
echo e.msg
|
|
discard
|
|
|
|
else:
|
|
# We are the initiator
|
|
let serverResponse = await serverConn.sendRequest("get-peerinfo", peerId)
|
|
let peerInfo = parseMessage[OkGetPeerInfo](serverResponse)
|
|
let myProbedPorts = serverConn.probedSrcPorts.join(",")
|
|
let peerProbedPorts = peerInfo.probedPorts.join(",")
|
|
let req = &"{ctx.getPeerId()}|{peerId}|{serverConn.probedIp}|{srcPort}|{myProbedPorts}|{peerInfo.ip}|{peerInfo.localPort}|{peerProbedPorts}"
|
|
let attempt = await ctx.puncher.initiate(peerInfo.ip, peerInfo.localPort,
|
|
peerInfo.probedPorts)
|
|
discard await serverConn.sendRequest("notify-peer", req)
|
|
let peerPort = await attempt.finalize()
|
|
initiateQuicConnection(ctx, peerId, peerInfo.ip, peerPort)
|
|
|
|
proc main() =
|
|
var ctx: QuicP2PContext
|
|
let sock = newAsyncSocket(sockType = SOCK_DGRAM, protocol = IPPROTO_UDP,
|
|
buffered = false)
|
|
randomize()
|
|
let srcPort = rand(Port(1024) .. Port.high)
|
|
sock.bindAddr(srcPort)
|
|
let puncher = initPuncher(sock)
|
|
|
|
case paramCount():
|
|
of 0:
|
|
ctx = initContext(sock, puncher, serverCertChainPath, serverKeyPath,
|
|
onServerStreamOpen)
|
|
ctx.tlsCtx.require_client_authentication = 1
|
|
asyncCheck runApp(ctx, srcPort, "")
|
|
|
|
of 1:
|
|
let peerId = paramStr(1)
|
|
ctx = initContext(sock, puncher, clientCertChainPath, clientKeyPath,
|
|
onClientStreamOpen)
|
|
asyncCheck runApp(ctx, srcPort, peerId)
|
|
|
|
else:
|
|
usage()
|
|
quit(1)
|
|
|
|
echo "My peer ID is ", ctx.getPeerId()
|
|
while true:
|
|
let nextTimeout = ctx.getRelativeTimeout()
|
|
poll(nextTimeout)
|
|
ctx.sendPackets()
|
|
|
|
when isMainModule:
|
|
main()
|