{.passL: "-l crypto -l quicly -l picotls-core -l picotls-openssl".} import asyncdispatch import asyncnet import base32 import certificate import message import net import os import openssl_additional import picotls/picotls import picotls/openssl as ptls_openssl import puncher import quicly/quicly import quicly/cid import quicly/constants import quicly/defaults import quicly/recvstate import quicly/sendstate import quicly/streambuf import server_connection import strformat import strutils from nativesockets import SockAddr, Sockaddr_storage, SockLen, getHostByName from posix import IOVec from sequtils import filter from strutils import parseUInt from openssl import DLLSSLName, EVP_PKEY, SslPtr, PX509, PX509_STORE, X509_STORE_new, X509_STORE_free, X509_STORE_add_cert, PSTACK, d2i_X509 type Connection = ref object conn: ptr quicly_conn_t sock: AsyncSocket certs: seq[Certificate] expectedPeerId: string QuicP2PContext = ref object streamOpen: quicly_stream_open_t nextCid: quicly_cid_plaintext_t signCertCb: ptls_openssl_sign_certificate_t verifyCertsCb: ptls_verify_certificate_t tlsCtx: ptls_context_t quiclyCtx: quicly_context_t connections: seq[Connection] const serverCertChainPath = "./certs/server-certchain.pem" const serverKeyPath = "./certs/server-cert.key" const clientCertChainPath = "./certs/client-certchain.pem" const clientKeyPath = "./certs/client-cert.key" const rendezvousServers: seq[tuple[hostname: string, port: Port]] = @[ ("strangeplace.net", Port(5320)), ("ulrich.earth", Port(5320)) ] proc relativeTimeout(ctx: QuicP2PContext): int32 = ## Obtain the absolute int64 timeout from quicly and convert it to the ## relative int32 timeout expected by poll. result = 0 var nextTimeout = int64.high var now = ctx.quiclyCtx.now.cb(ctx.quiclyCtx.now) for c in ctx.connections: let connTimeout = quicly_get_first_timeout(c.conn) if connTimeout < nextTimeout: nextTimeout = connTimeout if now < nextTimeout: let delta = nextTimeout - now result = min(delta, int32.high).int32 proc peerId(ctx: QuicP2PContext): string = assert(ctx.tlsCtx.certificates.count == 2) let firstCertAddr = cast[ByteAddress](ctx.tlsCtx.certificates.list) let secondCertIovec = cast[ptr ptls_iovec_t](firstCertAddr + sizeof(ptls_iovec_t)) var caCert = newString(secondCertIovec.len) copyMem(caCert.cstring, secondCertIovec.base, secondCertIovec.len) result = caCert.getPublicKey().encode(pad = false) proc peerId(conn: Connection): string = assert(conn.certs.len() == 2) result = conn.certs[1].getPublicKey().encode(pad = false) proc onStopSending(stream: ptr quicly_stream_t, err: cint) {.cdecl.} = echo "onStopSending" discard quicly_close(stream.conn, 0x30000, "") proc onReceiveReset(stream: ptr quicly_stream_t, err: cint) {.cdecl.} = echo "onReceiveReset" discard quicly_close(stream.conn, 0x30000, "") proc onServerReceive(stream: ptr quicly_stream_t, offset: csize_t, src: pointer, len: csize_t) {.cdecl.} = if quicly_streambuf_ingress_receive(stream, offset, src, len) != 0: return let input = quicly_streambuf_ingress_get(stream) var msg = newString(input.len) copyMem(addr msg[0], input.base, input.len) let conn = cast[Connection](quicly_get_data(stream.conn)[]) echo &"client {conn.peerId} sends \"{msg}\"" if quicly_sendstate_is_open(addr stream.sendstate) != 0 and input.len > 0: discard quicly_streambuf_egress_write(stream, input.base, input.len) if quicly_recvstate_transfer_complete(addr stream.recvstate) != 0: discard quicly_streambuf_egress_shutdown(stream) quicly_streambuf_ingress_shift(stream, input.len) proc onClientReceive(stream: ptr quicly_stream_t, offset: csize_t, src: pointer, len: csize_t) {.cdecl.} = if quicly_streambuf_ingress_receive(stream, offset, src, len) != 0: return let input = quicly_streambuf_ingress_get(stream) let msg = newString(input.len) copyMem(msg.cstring, input.base, input.len) let conn = cast[Connection](quicly_get_data(stream.conn)[]) echo &"server {conn.peerId} replies \"{msg}\"" if quicly_recvstate_transfer_complete(addr stream.recvstate) != 0: discard quicly_close(stream.conn, 0, "") quicly_streambuf_ingress_shift(stream, input.len) var streamCallbacksServer = quicly_stream_callbacks_t( on_destroy: quicly_streambuf_destroy, on_send_shift: quicly_streambuf_egress_shift, on_send_emit: quicly_streambuf_egress_emit, on_send_stop: onStopSending, on_receive: onServerReceive, on_receive_reset: onReceiveReset) var streamCallbacksClient = quicly_stream_callbacks_t( on_destroy: quicly_streambuf_destroy, on_send_shift: quicly_streambuf_egress_shift, on_send_emit: quicly_streambuf_egress_emit, on_send_stop: onStopSending, on_receive: onClientReceive, on_receive_reset: onReceiveReset) proc usage() = echo &"usage in server mode: {paramStr(0)} LISTEN_PORT" echo &"usage in client mode: {paramStr(0)} SERVER_HOSTNAME SERVER_PORT" proc onServerStreamOpen(self: ptr quicly_stream_open_t, stream: ptr quicly_stream_t): cint {.cdecl.} = result = quicly_streambuf_create(stream, sizeof(quicly_streambuf_t).csize_t) stream.callbacks = addr streamCallbacksServer proc onClientStreamOpen(self: ptr quicly_stream_open_t, stream: ptr quicly_stream_t): cint {.cdecl.} = result = quicly_streambuf_create(stream, sizeof(quicly_streambuf_t).csize_t) stream.callbacks = addr streamCallbacksClient let msg = "hello server" discard quicly_streambuf_egress_write(stream, msg.cstring, msg.len().csize_t) proc verifyCACertSignature(cert: PX509): bool = let ctx = X509_STORE_CTX_new() let store = X509_STORE_new() discard X509_STORE_add_cert(store, cert) discard X509_STORE_CTX_init(ctx, store, cert, nil) let verifyParams = X509_STORE_CTX_get0_param(ctx) let flags = X509_VERIFY_PARAM_get_flags(verifyParams) discard X509_VERIFY_PARAM_set_flags(verifyParams, flags or X509_V_FLAG_CHECK_SS_SIGNATURE) let verifyResult = X509_verify_cert(ctx) result = verifyResult == 1 X509_STORE_CTX_free(ctx) X509_STORE_free(store) proc verifyCerts(self: ptr ptls_verify_certificate_t, tls: ptr ptls_t, verify_sign: ptr VerifySignCb, verify_data: ptr pointer, certs: ptr ptls_iovec_t, num_certs: csize_t): cint {.cdecl.} = if num_certs != 2: return PTLS_ALERT_UNKNOWN_CA # parse the highest certificate and use it as CA certificate var iovec = cast[ptr ptls_iovec_t](cast[ByteAddress](certs) + sizeof(ptls_iovec_t)) var iovecBase = iovec.base let caCert = d2i_X509(nil, cast[ptr ptr cuchar](addr iovecBase), iovec.len.cint) if caCert.isNil: return PTLS_ALERT_BAD_CERTIFICATE if not verifyCACertSignature(caCert): return PTLS_ALERT_BAD_CERTIFICATE let store = X509_STORE_new() discard X509_STORE_add_cert(store, caCert) # empty server name makes picotls skip server name verification #FIXME: should we use the peer ID as server name? discard ptls_set_server_name(tls, nil, 0) var opensslVerifier: ptls_openssl_verify_certificate_t discard ptls_openssl_init_verify_certificate(addr opensslVerifier, store) result = opensslVerifier.super.cb(addr opensslVerifier.super, tls, verify_sign, verify_data, certs, num_certs) if result == 0: let quiclyConn = cast[ptr quicly_conn_t](ptls_get_data_ptr(tls)[]) let conn = cast[Connection](quicly_get_data(quiclyConn)[]) for i in 0 .. num_certs - 1: let iovec = cast[ptr ptls_iovec_t](cast[ByteAddress](certs) + i.int * sizeof(ptls_iovec_t)) var cert = newString(iovec.len) copyMem(cert.cstring, iovec.base, iovec.len) conn.certs.add(cert) ptls_openssl_dispose_verify_certificate(addr opensslVerifier) X509_STORE_free(store) X509_free(caCert) proc initContext(certChainPath: string, keyPath: string, streamOpenCb: typeof(quicly_stream_open_t.cb)): QuicP2PContext = var tlsCtx = ptls_context_t(randomBytes: ptls_openssl_random_bytes, getTime: addr ptls_get_time, keyExchanges: ptls_openssl_key_exchanges, cipherSuites: ptls_openssl_cipher_suites) quicly_amend_ptls_context(addr tlsCtx) result = QuicP2PContext(streamOpen: quicly_stream_open_t(cb: streamOpenCb), verifyCertsCb: ptls_verify_certificate_t(cb: verifyCerts), tlsCtx: tlsCtx, quiclyCtx: quicly_spec_context) result.quiclyCtx.tls = addr result.tlsCtx result.quiclyCtx.stream_open = addr result.streamOpen result.tlsCtx.verify_certificate = addr result.verifyCertsCb if ptls_load_certificates(addr result.tlsCtx, certChainPath.cstring) != 0: raise newException(ValueError, &"cannot load certificate chain {certChainPath}") let pKeyFile = open(keyPath) let privateKey = PEM_read_PrivateKey(pkeyFile, nil, nil, nil) pkeyFile.close() if privateKey == nil: raise newException(ValueError, &"cannot load private key {keyPath}") discard ptls_openssl_init_sign_certificate(addr result.signCertCb, privateKey) EVP_PKEY_free(privateKey) result.tlsCtx.sign_certificate = addr result.signCertCb.super proc addConnection(ctx: QuicP2PContext, connPtr: ptr quicly_conn_t, sock: AsyncSocket, expectedPeerId: string) = assert(not connPtr.isNil) let data = quicly_get_data(connPtr) var conn = Connection(conn: connPtr, sock: sock, expectedPeerId: expectedPeerId) data[] = addr conn[] ctx.connections.add(conn) proc delConnection(ctx: QuicP2PContext, index: int) = assert(index >= 0 and index < ctx.connections.len) let c = ctx.connections[index] ctx.connections.del(index) quicly_free(c.conn) proc sendPackets(ctx: QuicP2PContext) = if ctx.connections.len == 0: return let conns = ctx.connections.deepCopy() for i in 0 .. conns.len - 1: var srcAddr, dstAddr: quicly_address_t var dgrams: array[10, IOVec] var dgramCount = dgrams.len().csize_t var dgramsBuf = newString(dgramCount * ctx.quiclyCtx.transport_params.max_udp_payload_size) let sendResult = quicly_send(conns[i].conn, addr dstAddr, addr srcAddr, addr dgrams[0], addr dgramCount, addr dgramsBuf[0], dgramsBuf.len().csize_t) case sendResult: of 0: if dgramCount > 0: for j in 0 .. dgramCount - 1: var sockLen = quicly_get_socklen(addr dstAddr.sa) # TODO: replace asyncdispatch.sendTo with asyncnet.sendTo (Nim 1.4 required) asyncCheck sendTo(conns[i].sock.getFd().AsyncFD, dgrams[j].iov_base, dgrams[j].iov_len.int, addr dstAddr.sa, sockLen) of QUICLY_ERROR_FREE_CONNECTION: echo "deleting connection" ctx.delConnection(i) else: raise newException(ValueError, &"quicly_send returned {sendResult}") proc initiateQuicConnection(ctx: QuicP2PContext, peerId: string, sock: AsyncSocket, peerIp: IpAddress, peerPort: Port) = var conn: ptr quicly_conn_t var peerAddr: SockAddr_storage var peerSockLen: SockLen toSockAddr(peerIp, peerPort, peerAddr, peerSockLen) let addressToken = ptls_iovec_init(nil, 0) let connectResult = quicly_connect(addr conn, addr ctx.quiclyCtx, peerId.cstring, addr peerAddr, nil, addr ctx.nextCid, addressToken, nil, nil) if connectResult != 0: echo "quicly_connect failed: ", connectResult return var stream: ptr quicly_stream_t discard quicly_open_stream(conn, addr stream, 0) ctx.addConnection(conn, sock, peerId) proc handleMsg(ctx: QuicP2PContext, msg: string, peerId: string, puncher: Puncher, sock: AsyncSocket, peerAddr: ptr Sockaddr_storage, peerSockLen: SockLen) = var offset: csize_t = 0 while offset < msg.len().csize_t: var decoded: quicly_decoded_packet_t let decodeResult = quicly_decode_packet(addr ctx.quiclyCtx, addr decoded, cast[ptr uint8](msg.cstring), msg.len().csize_t, addr offset) if decode_result == csize_t.high: echo "unable to decode packet" return var conn: ptr quicly_conn_t = nil for c in ctx.connections.filter(proc(c: Connection): bool = c.sock == sock): if quicly_is_destination(c.conn, nil, peerAddr, addr decoded) != 0: conn = c.conn break if conn != nil: discard quicly_receive(conn, nil, peerAddr, addr decoded) else: # The puncher needs to be informed about this message because it may # be the peer's response to our respond call. puncher.handleMsg(msg, sock, peerAddr[], peerSockLen) if peerId.len == 0: let acceptResult = quicly_accept(addr conn, addr ctx.quiclyCtx, nil, peerAddr, addr decoded, nil, addr ctx.nextCid, nil) if acceptResult == 0: ctx.addConnection(conn, sock, peerId) proc receive(ctx: QuicP2PContext, puncher: Puncher, sock: AsyncSocket, peerId: string) {.async.} = while true: # TODO: replace asyncdispatch.recvFromInto with asyncnet.recvFrom (Nim 1.4 required) var msg = newString(BufferSize) var peerAddr: Sockaddr_storage var peerAddrLen = SockLen(sizeof(peerAddr)) let msgLen = await recvFromInto(sock.getFd().AsyncFD, msg.cstring, msg.len, cast[ptr SockAddr](addr peerAddr), addr peerAddrLen) msg.setLen(msgLen) if msg.len > 0: handleMsg(ctx, msg, peerId, puncher, sock, addr peerAddr, peerAddrLen) proc handleNotification(puncher: Puncher, notification: NotifyPeer) {.async.} = let attempt = await puncher.respond(notification.srcIp, notification.srcPort, notification.probedsrcPorts) discard await attempt.finalize() proc runApp(ctx: QuicP2PContext, peerId: string) {.async.} = let primarySock = newAsyncSocket(sockType = SOCK_DGRAM, protocol = IPPROTO_UDP, buffered = false) primarySock.bindAddr(Port(0)) let serverConn = await initServerConnection(primarySock, rendezvousServers[0].hostname, rendezvousServers[0].port, rendezvousServers) let puncher = initPuncher(primarySock, serverConn.probedSrcPorts) asyncCheck handleServerMessages(serverConn) for sock in puncher.socks: asyncCheck receive(ctx, puncher, sock, peerId) if peerId.len == 0: # We are the responder let probedPorts = serverConn.probedSrcPorts.join(",") let req = &"{ctx.peerId}|{serverConn.probedIp}|{puncher.primarySrcPort}|{probedPorts}" discard await serverConn.sendRequest("register", req) while true: let (hasData, data) = await serverConn.peerNotifications.read() if not hasData: break try: let msg = parseMessage[NotifyPeer](data) # FIXME: check if we want to receive messages from the sender asyncCheck handleNotification(puncher, msg) except ValueError as e: echo e.msg discard else: # We are the initiator let serverResponse = await serverConn.sendRequest("get-peerinfo", peerId) let peerInfo = parseMessage[OkGetPeerInfo](serverResponse) let myProbedPorts = serverConn.probedSrcPorts.join(",") let peerProbedPorts = peerInfo.probedPorts.join(",") let req = &"{ctx.peerId}|{peerId}|{serverConn.probedIp}|{puncher.primarySrcPort}|{myProbedPorts}|{peerInfo.ip}|{peerInfo.localPort}|{peerProbedPorts}" let attempt = await puncher.initiate(peerInfo.ip, peerInfo.localPort, peerInfo.probedPorts) discard await serverConn.sendRequest("notify-peer", req) let (sock, peerPort) = await attempt.finalize() initiateQuicConnection(ctx, peerId, sock, peerInfo.ip, peerPort) proc main() = var ctx: QuicP2PContext case paramCount(): of 0: ctx = initContext(serverCertChainPath, serverKeyPath, onServerStreamOpen) ctx.tlsCtx.require_client_authentication = 1 asyncCheck runApp(ctx, "") of 1: let peerId = paramStr(1) ctx = initContext(clientCertChainPath, clientKeyPath, onClientStreamOpen) asyncCheck runApp(ctx, peerId) else: usage() quit(1) echo "My peer ID is ", ctx.peerId while true: let nextTimeout = ctx.relativeTimeout poll(nextTimeout) ctx.sendPackets() when isMainModule: main()