# quicp2p/quicp2p.nim
#
# 340 lines
# 13 KiB
# Nim

{.passL: "-l crypto -l quicly -l picotls-core -l picotls-openssl".}
import asyncdispatch
import asyncnet
import certificate
import net
import os
import openssl_additional
import picotls/picotls
import picotls/openssl as ptls_openssl
import quicly/quicly
import quicly/cid
import quicly/constants
import quicly/defaults
import quicly/recvstate
import quicly/sendstate
import quicly/streambuf
import strformat
import strutils
from nativesockets import SockAddr, Sockaddr_storage, SockLen, getHostByName
from posix import IOVec
from strutils import parseUInt
from openssl import
DLLSSLName,
EVP_PKEY,
SslPtr,
PX509,
PX509_STORE,
X509_STORE_new,
X509_STORE_free,
X509_STORE_add_cert,
PSTACK,
d2i_X509
const
  # Certificate chain and private key loaded when running in server mode.
  serverCertChainPath = "./certs/server-certchain.pem"
  serverKeyPath = "./certs/server-cert.key"
  # Certificate chain and private key loaded when running in client mode.
  clientCertChainPath = "./certs/client-certchain.pem"
  clientKeyPath = "./certs/client-cert.key"
type
  # One QUIC connection plus the peer certificates recorded for it.
  Connection = ref object
    # Underlying quicly connection handle.
    conn: ptr quicly_conn_t
    # Raw peer certificates, appended by verifyCerts after a successful
    # verification.
    certs: seq[Certificate]
  # Everything shared by all connections of this process. The callback
  # objects are stored here because initContext hands their addresses to
  # the picotls/quicly contexts.
  QuicP2PContext = ref object
    # UDP socket used for receiving and sending datagrams.
    sock: AsyncSocket
    # Stream-open callback object referenced by quiclyCtx.stream_open.
    streamOpen: quicly_stream_open_t
    # CID state passed to quicly_connect / quicly_accept.
    nextCid: quicly_cid_plaintext_t
    # Signing callback object referenced by tlsCtx.sign_certificate.
    signCertCb: ptls_openssl_sign_certificate_t
    # Verification callback object referenced by tlsCtx.verify_certificate.
    verifyCertsCb: ptls_verify_certificate_t
    # picotls context referenced by quiclyCtx.tls.
    tlsCtx: ptls_context_t
    # quicly context, initialised from quicly_spec_context.
    quiclyCtx: quicly_context_t
    # All currently tracked connections.
    connections: seq[Connection]
proc getRelativeTimeout(ctx: QuicP2PContext): int32 =
  ## Computes the relative int32 timeout to hand to `poll` from the absolute
  ## int64 timeouts reported by quicly.
  ##
  ## The earliest timeout over all connections is used. The result is 0 when
  ## that timeout is not in the future and is capped at `int32.high` (which
  ## is also the result when there are no connections).
  let currentTime = ctx.quiclyCtx.now.cb(ctx.quiclyCtx.now)
  var earliest = int64.high
  for connection in ctx.connections:
    let deadline = quicly_get_first_timeout(connection.conn)
    if deadline < earliest:
      earliest = deadline
  result =
    if currentTime < earliest:
      min(earliest - currentTime, int32.high).int32
    else:
      0'i32
proc onStopSending(stream: ptr quicly_stream_t, err: cint) {.cdecl.} =
  ## Stream callback registered as `on_send_stop`: logs the event and closes
  ## the stream's connection with code 0x30000. `err` is not used.
  echo "onStopSending"
  let closeResult = quicly_close(stream.conn, 0x30000, "")
  discard closeResult
proc onReceiveReset(stream: ptr quicly_stream_t, err: cint) {.cdecl.} =
  ## Stream callback registered as `on_receive_reset`: logs the event and
  ## closes the stream's connection with code 0x30000. `err` is not used.
  echo "onReceiveReset"
  let closeResult = quicly_close(stream.conn, 0x30000, "")
  discard closeResult
proc onServerReceive(stream: ptr quicly_stream_t, offset: csize_t, src: pointer,
                     len: csize_t) {.cdecl.} =
  ## Stream `on_receive` callback used in server mode: logs the data that is
  ## available on the stream and echoes it back to the sender.
  ##
  ## Once the receive side reports the transfer as complete, the send side is
  ## shut down as well. The consumed bytes are removed from the ingress
  ## buffer at the end.
  # Hand the fragment to the stream buffer; stop if that fails.
  if quicly_streambuf_ingress_receive(stream, offset, src, len) != 0:
    return
  let input = quicly_streambuf_ingress_get(stream)
  var msg = newString(input.len)
  # `msg[0]` is out of bounds for an empty string, so only copy when there is
  # data; the callback can run with nothing buffered (see the check below).
  if input.len > 0:
    copyMem(addr msg[0], input.base, input.len)
  # The connection's data slot holds the `Connection` stored by addConnection.
  let conn = cast[Connection](quicly_get_data(stream.conn)[])
  # certs[1] is the certificate that verifyCerts used as the CA.
  echo &"client {conn.certs[1].getPublicKey().toHex()} sends \"{msg}\""
  if quicly_sendstate_is_open(addr stream.sendstate) != 0 and input.len > 0:
    discard quicly_streambuf_egress_write(stream, input.base, input.len)
    if quicly_recvstate_transfer_complete(addr stream.recvstate) != 0:
      discard quicly_streambuf_egress_shutdown(stream)
  quicly_streambuf_ingress_shift(stream, input.len)
proc onClientReceive(stream: ptr quicly_stream_t, offset: csize_t,
                     src: pointer, len: csize_t) {.cdecl.} =
  ## Stream `on_receive` callback used in client mode: logs the data that is
  ## available on the stream and closes the connection (code 0) once the
  ## receive side reports the transfer as complete.
  ##
  ## The consumed bytes are removed from the ingress buffer at the end.
  # Hand the fragment to the stream buffer; stop if that fails.
  if quicly_streambuf_ingress_receive(stream, offset, src, len) != 0:
    return
  let input = quicly_streambuf_ingress_get(stream)
  # The buffer is written to below, so it must be a `var`; the copy is skipped
  # for empty input because `msg[0]` does not exist then.
  var msg = newString(input.len)
  if input.len > 0:
    copyMem(addr msg[0], input.base, input.len)
  # The connection's data slot holds the `Connection` stored by addConnection.
  let conn = cast[Connection](quicly_get_data(stream.conn)[])
  # certs[1] is the certificate that verifyCerts used as the CA.
  echo &"server {conn.certs[1].getPublicKey().toHex()} sends \"{msg}\""
  if quicly_recvstate_transfer_complete(addr stream.recvstate) != 0:
    discard quicly_close(stream.conn, 0, "")
  quicly_streambuf_ingress_shift(stream, input.len)
# Callback table that onServerStreamOpen installs on every stream opened in
# server mode. Buffer management is delegated to the quicly streambuf
# helpers; incoming data is handled by onServerReceive.
var streamCallbacksServer = quicly_stream_callbacks_t(
  on_destroy: quicly_streambuf_destroy,
  on_send_shift: quicly_streambuf_egress_shift,
  on_send_emit: quicly_streambuf_egress_emit,
  on_send_stop: onStopSending,
  on_receive: onServerReceive,
  on_receive_reset: onReceiveReset)
# Callback table that onClientStreamOpen installs on every stream opened in
# client mode. Buffer management is delegated to the quicly streambuf
# helpers; incoming data is handled by onClientReceive.
var streamCallbacksClient = quicly_stream_callbacks_t(
  on_destroy: quicly_streambuf_destroy,
  on_send_shift: quicly_streambuf_egress_shift,
  on_send_emit: quicly_streambuf_egress_emit,
  on_send_stop: onStopSending,
  on_receive: onClientReceive,
  on_receive_reset: onReceiveReset)
proc usage() =
  ## Prints the command-line synopsis for server mode and client mode.
  let program = paramStr(0)
  echo &"usage in server mode: {program} LISTEN_PORT"
  echo &"usage in client mode: {program} SERVER_HOSTNAME SERVER_PORT"
proc onServerStreamOpen(self: ptr quicly_stream_open_t,
                        stream: ptr quicly_stream_t): cint {.cdecl.} =
  ## Stream-open callback used in server mode: attaches a quicly stream
  ## buffer to `stream` and installs the server callback table.
  ##
  ## Returns the result of `quicly_streambuf_create`; the callbacks are only
  ## installed when it succeeded (returned 0), so that the streambuf-based
  ## callbacks are never attached to a stream without a stream buffer.
  result = quicly_streambuf_create(stream, sizeof(quicly_streambuf_t).csize_t)
  if result != 0:
    return
  stream.callbacks = addr streamCallbacksServer
proc onClientStreamOpen(self: ptr quicly_stream_open_t,
                        stream: ptr quicly_stream_t): cint {.cdecl.} =
  ## Stream-open callback used in client mode: attaches a quicly stream
  ## buffer to `stream`, installs the client callback table and queues the
  ## greeting "hello server" for sending.
  ##
  ## Returns the result of `quicly_streambuf_create`. When it failed
  ## (non-zero) nothing else is done, because the greeting would otherwise be
  ## written into a stream buffer that was never created.
  result = quicly_streambuf_create(stream, sizeof(quicly_streambuf_t).csize_t)
  if result != 0:
    return
  stream.callbacks = addr streamCallbacksClient
  let msg = "hello server"
  discard quicly_streambuf_egress_write(stream, msg.cstring, msg.len().csize_t)
proc verifyCACertSignature(cert: PX509): bool =
  ## Returns true if `cert` verifies against a trust store that contains only
  ## `cert` itself, with `X509_V_FLAG_CHECK_SS_SIGNATURE` enabled.
  ##
  ## Any failure to allocate or set up the OpenSSL objects is reported as
  ## `false` instead of continuing with a half-initialised verification.
  let store = X509_STORE_new()
  if store == nil:
    return false
  defer: X509_STORE_free(store)
  let ctx = X509_STORE_CTX_new()
  if ctx == nil:
    return false
  # Registered last, so the store context is released before the store.
  defer: X509_STORE_CTX_free(ctx)
  if X509_STORE_add_cert(store, cert) != 1:
    return false
  if X509_STORE_CTX_init(ctx, store, cert, nil) != 1:
    return false
  # Keep the existing verification flags and add the self-signature check.
  let verifyParams = X509_STORE_CTX_get0_param(ctx)
  let flags = X509_VERIFY_PARAM_get_flags(verifyParams)
  discard X509_VERIFY_PARAM_set_flags(
    verifyParams, flags or X509_V_FLAG_CHECK_SS_SIGNATURE)
  result = X509_verify_cert(ctx) == 1
proc verifyCerts(self: ptr ptls_verify_certificate_t, tls: ptr ptls_t,
                 verify_sign: ptr VerifySignCb, verify_data: ptr pointer,
                 certs: ptr ptls_iovec_t, num_certs: csize_t): cint {.cdecl.} =
  ## picotls certificate verification callback.
  ##
  ## Only chains of exactly two certificates are accepted. The second
  ## certificate is used as the CA: it must pass `verifyCACertSignature` and
  ## is then the only entry of the trust store handed to the picotls OpenSSL
  ## verifier, whose result is returned. On success (0) the raw bytes of both
  ## certificates are appended to the `Connection` of the handshake.
  ##
  ## Returns `PTLS_ALERT_UNKNOWN_CA` for a wrong chain length and
  ## `PTLS_ALERT_BAD_CERTIFICATE` when the CA certificate cannot be parsed or
  ## fails the self-signature check.
  if num_certs != 2:
    return PTLS_ALERT_UNKNOWN_CA
  # parse the highest certificate and use it as CA certificate
  let caIovec = cast[ptr ptls_iovec_t](
    cast[ByteAddress](certs) + sizeof(ptls_iovec_t))
  # d2i_X509 is given the address of a pointer, so pass a local copy and
  # leave the pointer stored in the iovec alone.
  var caBase = caIovec.base
  let caCert = d2i_X509(nil, cast[ptr ptr cuchar](addr caBase),
                        caIovec.len.cint)
  if caCert.isNil:
    return PTLS_ALERT_BAD_CERTIFICATE
  if not verifyCACertSignature(caCert):
    # The parsed certificate must be released on this path as well.
    X509_free(caCert)
    return PTLS_ALERT_BAD_CERTIFICATE
  let store = X509_STORE_new()
  discard X509_STORE_add_cert(store, caCert)
  var opensslVerifier: ptls_openssl_verify_certificate_t
  discard ptls_openssl_init_verify_certificate(addr opensslVerifier, store)
  result = opensslVerifier.super.cb(addr opensslVerifier.super, tls,
                                    verify_sign, verify_data, certs, num_certs)
  if result == 0:
    # Find the `Connection` through the data slots of the TLS object and of
    # the quicly connection, then record the verified certificates.
    let quiclyConn = cast[ptr quicly_conn_t](ptls_get_data_ptr(tls)[])
    let conn = cast[Connection](quicly_get_data(quiclyConn)[])
    for i in 0 ..< num_certs.int:
      let iovec = cast[ptr ptls_iovec_t](
        cast[ByteAddress](certs) + i * sizeof(ptls_iovec_t))
      var cert = newString(iovec.len)
      copyMem(cert.cstring, iovec.base, iovec.len)
      conn.certs.add(cert)
  ptls_openssl_dispose_verify_certificate(addr opensslVerifier)
  X509_STORE_free(store)
  X509_free(caCert)
proc initContext(sock: AsyncSocket, certChainPath: string, keyPath: string,
                 streamOpenCb: typeof(quicly_stream_open_t.cb)):
                 QuicP2PContext =
  ## Creates the shared context for `sock`: a picotls context using the
  ## OpenSSL primitives, a quicly context based on `quicly_spec_context`,
  ## `streamOpenCb` as stream-open callback and `verifyCerts` as certificate
  ## verifier. The certificate chain and private key are loaded from
  ## `certChainPath` and `keyPath`.
  ##
  ## Raises `ValueError` if the certificate chain or the private key cannot
  ## be loaded, or if the signing callback cannot be set up with the key.
  ## Opening `keyPath` uses `open`, which raises on failure.
  var tlsCtx = ptls_context_t(randomBytes: ptls_openssl_random_bytes,
                              getTime: addr ptls_get_time,
                              keyExchanges: ptls_openssl_key_exchanges,
                              cipherSuites: ptls_openssl_cipher_suites)
  quicly_amend_ptls_context(addr tlsCtx)
  result = QuicP2PContext(sock: sock,
                          streamOpen: quicly_stream_open_t(cb: streamOpenCb),
                          verifyCertsCb: ptls_verify_certificate_t(cb: verifyCerts),
                          tlsCtx: tlsCtx, quiclyCtx: quicly_spec_context)
  # The contexts reference each other and the callback objects by address, so
  # these pointers are taken from the fields of the heap-allocated result.
  result.quiclyCtx.tls = addr result.tlsCtx
  result.quiclyCtx.stream_open = addr result.streamOpen
  result.tlsCtx.verify_certificate = addr result.verifyCertsCb
  if ptls_load_certificates(addr result.tlsCtx, certChainPath.cstring) != 0:
    raise newException(ValueError, &"cannot load certificate chain {certChainPath}")
  let pKeyFile = open(keyPath)
  let privateKey = PEM_read_PrivateKey(pKeyFile, nil, nil, nil)
  pKeyFile.close()
  if privateKey == nil:
    raise newException(ValueError, &"cannot load private key {keyPath}")
  let signInitResult = ptls_openssl_init_sign_certificate(
    addr result.signCertCb, privateKey)
  # Our reference to the key is released whether or not the setup succeeded.
  EVP_PKEY_free(privateKey)
  if signInitResult != 0:
    # Do not install a signing callback that was not initialised.
    raise newException(ValueError, &"cannot use private key {keyPath}")
  result.tlsCtx.sign_certificate = addr result.signCertCb.super
proc addConnection(ctx: QuicP2PContext, connPtr: ptr quicly_conn_t) =
  ## Wraps `connPtr` in a new `Connection`, stores a pointer to that object
  ## in the data slot of the quicly connection and appends it to
  ## `ctx.connections`. `connPtr` must not be nil.
  assert(not connPtr.isNil)
  var wrapper = Connection(conn: connPtr)
  # The stream and certificate callbacks cast this slot back to `Connection`.
  quicly_get_data(connPtr)[] = addr wrapper[]
  ctx.connections.add(wrapper)
proc delConnection(ctx: QuicP2PContext, index: int) =
  ## Removes the connection at `index` from `ctx.connections` and frees the
  ## underlying quicly connection. `index` must be a valid position.
  assert(index in 0 ..< ctx.connections.len)
  # Hold on to the entry so its handle can be freed after the removal.
  let removed = ctx.connections[index]
  ctx.connections.del(index)
  quicly_free(removed.conn)
proc sendPackets(ctx: QuicP2PContext) =
  ## Asks quicly for pending datagrams of every connection (up to 10 per
  ## connection and call) and sends them over the context's socket.
  ##
  ## Connections for which `quicly_send` returns
  ## `QUICLY_ERROR_FREE_CONNECTION` are removed and freed. Any other non-zero
  ## result raises `ValueError`.
  # Iterate over the live list from the back. Removing an entry only affects
  # positions at or behind the current one, which have been handled already,
  # so every index still refers to the intended connection. (Indexing a
  # snapshot taken before the loop does not: `del` reorders the live list.)
  for i in countdown(ctx.connections.high, 0):
    let connPtr = ctx.connections[i].conn
    var srcAddr, dstAddr: quicly_address_t
    var dgrams: array[10, IOVec]
    var dgramCount = dgrams.len().csize_t
    var dgramsBuf = newString(dgramCount * ctx.quiclyCtx.transport_params.max_udp_payload_size)
    let sendResult = quicly_send(connPtr, addr dstAddr, addr srcAddr,
                                 addr dgrams[0], addr dgramCount,
                                 addr dgramsBuf[0], dgramsBuf.len().csize_t)
    case sendResult:
    of 0:
      if dgramCount > 0:
        # All datagrams of one call share the same destination address.
        let sockLen = quicly_get_socklen(addr dstAddr.sa)
        for j in 0 ..< dgramCount.int:
          # TODO: replace asyncdispatch.sendTo with asyncnet.sendTo (Nim 1.4 required)
          # NOTE(review): sendTo is asynchronous and only receives a raw
          # pointer into dgramsBuf - confirm the buffer is still alive when
          # the datagram is actually written.
          asyncCheck sendTo(ctx.sock.getFd().AsyncFD, dgrams[j].iov_base,
                            dgrams[j].iov_len.int, addr dstAddr.sa, sockLen)
    of QUICLY_ERROR_FREE_CONNECTION:
      echo "deleting connection"
      ctx.delConnection(i)
    else:
      raise newException(ValueError, &"quicly_send returned {sendResult}")
proc handleMsg(ctx: QuicP2PContext, msg: string, peerAddr: ptr SockAddr,
               isServer: bool) =
  ## Decodes every QUIC packet contained in the datagram `msg`, received from
  ## `peerAddr`, and passes each one to the connection it is destined for.
  ##
  ## A packet that matches no tracked connection is offered to
  ## `quicly_accept` when `isServer` is true and ignored otherwise. A new
  ## connection is only tracked when the accept succeeded.
  var offset: csize_t = 0
  while offset < msg.len().csize_t:
    var decoded: quicly_decoded_packet_t
    let decodeResult = quicly_decode_packet(addr ctx.quiclyCtx, addr decoded,
                                            cast[ptr uint8](msg.cstring),
                                            msg.len().csize_t, addr offset)
    # csize_t.high is treated as "could not decode": drop the rest.
    if decodeResult == csize_t.high:
      return
    var conn: ptr quicly_conn_t = nil
    for c in ctx.connections:
      if quicly_is_destination(c.conn, nil, peerAddr, addr decoded) != 0:
        conn = c.conn
        break
    if conn != nil:
      discard quicly_receive(conn, nil, peerAddr, addr decoded)
    elif isServer:
      let acceptResult = quicly_accept(addr conn, addr ctx.quiclyCtx, nil,
                                       peerAddr, addr decoded, nil,
                                       addr ctx.nextCid, nil)
      # A packet that is not accepted must not reach addConnection with a nil
      # handle; it is simply ignored.
      if acceptResult == 0 and conn != nil:
        ctx.addConnection(conn)
proc receive(ctx: QuicP2PContext, isServer: bool) {.async.} =
  ## Endless receive loop: waits for the next datagram on the context's
  ## socket and passes every non-empty one, together with the sender
  ## address, to `handleMsg`.
  while true:
    # TODO: replace asyncdispatch.recvFromInto with asyncnet.recvFrom (Nim 1.4 required)
    var datagram = newString(BufferSize)
    var sender: Sockaddr_storage
    var senderLen = SockLen(sizeof(sender))
    let received = await recvFromInto(ctx.sock.getFd().AsyncFD,
                                      datagram.cstring, datagram.len,
                                      cast[ptr SockAddr](addr sender),
                                      addr senderLen)
    # Shrink the buffer to the number of bytes actually received.
    datagram.setLen(received)
    if datagram.len > 0:
      handleMsg(ctx, datagram, cast[ptr SockAddr](addr sender), isServer)
proc parsePortArg(arg: string): Port =
  ## Converts a command-line port argument to a `Port`. Prints the usage text
  ## and exits with status 1 if `arg` is not a number or exceeds uint16.
  var portNumber: uint
  try:
    portNumber = arg.parseUInt()
  except ValueError:
    usage()
    quit(1)
  if portNumber > uint16.high:
    usage()
    quit(1)
  result = Port(portNumber)

proc main() =
  ## Program entry point.
  ##
  ## With one argument (LISTEN_PORT) the program runs as server: it binds the
  ## UDP socket, requires client authentication and starts receiving. With
  ## two arguments (SERVER_HOSTNAME SERVER_PORT) it runs as client: it
  ## resolves the host, connects, opens a stream and starts receiving. Any
  ## other argument count prints the usage text and exits with status 1.
  ##
  ## Afterwards the event loop runs forever, alternating between polling for
  ## I/O until the next quicly timeout and sending pending packets.
  var ctx: QuicP2PContext
  let sock = newAsyncSocket(sockType = SOCK_DGRAM, protocol = IPPROTO_UDP,
                            buffered = false)
  case paramCount():
  of 1:
    let port = parsePortArg(paramStr(1))
    sock.bindAddr(port)
    ctx = initContext(sock, serverCertChainPath, serverKeyPath,
                      onServerStreamOpen)
    ctx.tlsCtx.require_client_authentication = 1
    asyncCheck receive(ctx, true)
  of 2:
    let hostname = paramStr(1)
    let port = parsePortArg(paramStr(2))
    ctx = initContext(sock, clientCertChainPath, clientKeyPath,
                      onClientStreamOpen)
    var conn: ptr quicly_conn_t
    let hostent = getHostByName(hostname)
    if hostent.addrList.len == 0:
      echo "cannot resolve hostname ", hostname
      quit(2)
    var destAddr: Sockaddr_storage
    var sockLen: SockLen
    toSockAddr(parseIpAddress(hostent.addrList[0]), port, destAddr, sockLen)
    let addressToken = ptls_iovec_init(nil, 0)
    let connectResult = quicly_connect(addr conn, addr ctx.quiclyCtx,
                                       hostname.cstring, addr destAddr, nil,
                                       addr ctx.nextCid, addressToken, nil, nil)
    if connectResult != 0:
      echo "quicly_connect failed: ", connectResult
      quit(3)
    # Opening the stream triggers onClientStreamOpen, which queues the
    # greeting; without a stream the client would have nothing to send.
    var stream: ptr quicly_stream_t
    let openResult = quicly_open_stream(conn, addr stream, 0)
    if openResult != 0:
      echo "quicly_open_stream failed: ", openResult
      quit(3)
    ctx.addConnection(conn)
    asyncCheck receive(ctx, false)
  else:
    usage()
    quit(1)
  while true:
    let nextTimeout = ctx.getRelativeTimeout()
    poll(nextTimeout)
    ctx.sendPackets()
# Run main() only when this file is compiled as the main module.
when isMainModule:
  main()