echo application in async Nim (untested)
This commit is contained in:
parent
7923bd96ab
commit
44fd55871a
241
quicp2p.nim
241
quicp2p.nim
|
@ -1,50 +1,241 @@
|
||||||
{.passL: "-l crypto -l quicly -l picotls-core -l picotls-openssl".}
|
{.passL: "-l crypto -l quicly -l picotls-core -l picotls-openssl".}
|
||||||
|
|
||||||
|
import asyncdispatch
|
||||||
|
import asyncnet
|
||||||
|
import net
|
||||||
|
import os
|
||||||
import quicly/quicly
|
import quicly/quicly
|
||||||
|
import quicly/cid
|
||||||
|
import quicly/constants
|
||||||
import quicly/defaults
|
import quicly/defaults
|
||||||
|
import quicly/recvstate
|
||||||
|
import quicly/sendstate
|
||||||
|
import quicly/streambuf
|
||||||
import picotls/picotls
|
import picotls/picotls
|
||||||
import picotls/openssl as ptls_openssl
|
import picotls/openssl as ptls_openssl
|
||||||
|
import strformat
|
||||||
|
|
||||||
from openssl import DLLSSLName, EVP_PKEY, EVP_PKEY_free
|
from nativesockets import SockAddr, Sockaddr_storage, SockLen, getHostByName
|
||||||
|
from openssl import DLLSSLName, EVP_PKEY
|
||||||
|
from posix import IOVec
|
||||||
|
from strutils import parseUInt
|
||||||
|
|
||||||
const certChainPath = "./certs/server-certchain.pem"
|
const certChainPath = "./certs/server-certchain.pem"
|
||||||
const keyPath = "./certs/server-cert.key"
|
const keyPath = "./certs/server-cert.key"
|
||||||
|
|
||||||
|
type
|
||||||
|
QuicP2PContext = ref object
|
||||||
|
sock: AsyncSocket
|
||||||
|
streamOpen: quicly_stream_open_t
|
||||||
|
nextCid: quicly_cid_plaintext_t
|
||||||
|
signCertificate: ptls_openssl_sign_certificate_t
|
||||||
|
tlsCtx: ptls_context_t
|
||||||
|
quiclyCtx: quicly_context_t
|
||||||
|
connections: seq[ptr quicly_conn_t]
|
||||||
|
|
||||||
proc PEM_read_PrivateKey(fp: File, x: ptr EVP_PKEY,
|
proc PEM_read_PrivateKey(fp: File, x: ptr EVP_PKEY,
|
||||||
cb: proc(buf: cstring, size: cint, rwflag: cint, u: pointer): cint {.cdecl.},
|
cb: proc(buf: cstring, size: cint, rwflag: cint, u: pointer): cint {.cdecl.},
|
||||||
u: pointer): EVP_PKEY
|
u: pointer): EVP_PKEY
|
||||||
{.cdecl, dynlib: DLLSSLName, importc.}
|
{.importc, dynlib: DLLSSLName, cdecl.}
|
||||||
|
|
||||||
proc onStreamOpen(self: ptr quicly_stream_open_t, stream: ptr quicly_stream_t):
|
proc EVP_PKEY_free(key: EVP_PKEY) {.importc, dynlib: DLLSSLName, cdecl.}
|
||||||
cint {.cdecl.} =
|
|
||||||
echo "onStreamOpen!"
|
|
||||||
|
|
||||||
proc main() =
|
proc getRelativeTimeout(ctx: QuicP2PContext): int =
|
||||||
# callbacks
|
result = int.high
|
||||||
var streamOpen = quicly_stream_open_t(cb: onStreamOpen)
|
var nextTimeout = int64.high
|
||||||
|
var now = ctx.quiclyCtx.now.cb(ctx.quiclyCtx.now)
|
||||||
|
for c in ctx.connections:
|
||||||
|
let connTimeout = quicly_get_first_timeout(c)
|
||||||
|
if connTimeout < nextTimeout:
|
||||||
|
nextTimeout = connTimeout
|
||||||
|
if now < nextTimeout:
|
||||||
|
let delta = nextTimeout - now
|
||||||
|
# convert from microseconds to milliseconds
|
||||||
|
result = int(delta div 1_000)
|
||||||
|
|
||||||
var tlsCtx = ptls_context_t(randomBytes: ptlsOpensslRandomBytes,
|
proc onStopSending(stream: ptr quicly_stream_t, err: cint) {.cdecl.} =
|
||||||
getTime: addr ptlsGetTime,
|
echo "onStopSending"
|
||||||
keyExchanges: ptlsOpensslKeyExchanges,
|
discard quicly_close(stream.conn, 0x30000, "")
|
||||||
cipherSuites: ptlsOpensslCipherSuites)
|
|
||||||
quiclyAmendPtlsContext(addr tlsCtx)
|
proc onReceiveReset(stream: ptr quicly_stream_t, err: cint) {.cdecl.} =
|
||||||
var ctx = quiclySpecContext
|
echo "onReceiveReset"
|
||||||
ctx.tls = addr tlsCtx
|
discard quicly_close(stream.conn, 0x30000, "")
|
||||||
ctx.stream_open = addr streamOpen
|
|
||||||
if ptlsLoadCertificates(addr tlsCtx, certChainPath.cstring) != 0:
|
proc onServerReceive(stream: ptr quicly_stream_t, offset: csize_t, src: pointer,
|
||||||
echo "cannot load certificate chain ", certChainPath
|
len: csize_t) {.cdecl.} =
|
||||||
quit(1)
|
if quicly_streambuf_ingress_receive(stream, offset, src, len) != 0:
|
||||||
|
return
|
||||||
|
let input = quicly_streambuf_ingress_get(stream)
|
||||||
|
if quicly_sendstate_is_open(addr stream.sendstate) != 0 and input.len > 0:
|
||||||
|
discard quicly_streambuf_egress_write(stream, input.base, input.len)
|
||||||
|
if quicly_recvstate_transfer_complete(addr stream.recvstate) != 0:
|
||||||
|
discard quicly_streambuf_egress_shutdown(stream)
|
||||||
|
quicly_streambuf_ingress_shift(stream, input.len)
|
||||||
|
|
||||||
|
proc onClientReceive(stream: ptr quicly_stream_t, offset: csize_t,
|
||||||
|
src: pointer, len: csize_t) {.cdecl.} =
|
||||||
|
if quicly_streambuf_ingress_receive(stream, offset, src, len) != 0:
|
||||||
|
return
|
||||||
|
let input = quicly_streambuf_ingress_get(stream)
|
||||||
|
let msg = newString(input.len)
|
||||||
|
copyMem(msg.cstring, input.base, input.len)
|
||||||
|
echo "received message from server: ", msg
|
||||||
|
if quicly_recvstate_transfer_complete(addr stream.recvstate) != 0:
|
||||||
|
discard quicly_close(stream.conn, 0, "")
|
||||||
|
quicly_streambuf_ingress_shift(stream, input.len)
|
||||||
|
|
||||||
|
var streamCallbacksServer = quicly_stream_callbacks_t(
|
||||||
|
on_destroy: quicly_streambuf_destroy,
|
||||||
|
on_send_shift: quicly_streambuf_egress_shift,
|
||||||
|
on_send_emit: quicly_streambuf_egress_emit,
|
||||||
|
on_send_stop: onStopSending,
|
||||||
|
on_receive: onServerReceive,
|
||||||
|
on_receive_reset: onReceiveReset)
|
||||||
|
|
||||||
|
var streamCallbacksClient = quicly_stream_callbacks_t(
|
||||||
|
on_destroy: quicly_streambuf_destroy,
|
||||||
|
on_send_shift: quicly_streambuf_egress_shift,
|
||||||
|
on_send_emit: quicly_streambuf_egress_emit,
|
||||||
|
on_send_stop: onStopSending,
|
||||||
|
on_receive: onClientReceive,
|
||||||
|
on_receive_reset: onReceiveReset)
|
||||||
|
|
||||||
|
proc usage() =
|
||||||
|
echo &"usage in server mode: {paramStr(0)} LISTEN_PORT"
|
||||||
|
echo &"usage in client mode: {paramStr(0)} SERVER_HOSTNAME SERVER_PORT"
|
||||||
|
|
||||||
|
proc onServerStreamOpen(self: ptr quicly_stream_open_t,
|
||||||
|
stream: ptr quicly_stream_t): cint {.cdecl.} =
|
||||||
|
result = quicly_streambuf_create(stream, sizeof(quicly_streambuf_t).csize_t)
|
||||||
|
stream.callbacks = addr streamCallbacksServer
|
||||||
|
|
||||||
|
proc onClientStreamOpen(self: ptr quicly_stream_open_t,
|
||||||
|
stream: ptr quicly_stream_t): cint {.cdecl.} =
|
||||||
|
result = quicly_streambuf_create(stream, sizeof(quicly_streambuf_t).csize_t)
|
||||||
|
stream.callbacks = addr streamCallbacksClient
|
||||||
|
|
||||||
|
proc handleMsg(ctx: QuicP2PContext, msg: string) {.async.} =
|
||||||
|
var offset: csize_t = 0
|
||||||
|
while offset < msg.len().csize_t:
|
||||||
|
var decoded: quicly_decoded_packet_t
|
||||||
|
let decodeResult = quicly_decode_packet(addr ctx.quiclyCtx, addr decoded,
|
||||||
|
cast[ptr uint8](msg.cstring),
|
||||||
|
msg.len().csize_t, addr offset)
|
||||||
|
if decode_result == csize_t.high:
|
||||||
|
return
|
||||||
|
let (myAddress, myPort) = ctx.sock.getLocalAddr()
|
||||||
|
var sockAddr: Sockaddr_storage
|
||||||
|
var sockLen: SockLen
|
||||||
|
toSockAddr(parseIpAddress(myAddress), myPort, sockAddr, sockLen)
|
||||||
|
var conn: ptr quicly_conn_t = nil
|
||||||
|
for c in ctx.connections:
|
||||||
|
if quicly_is_destination(c, nil, addr sockAddr, addr decoded) != 0:
|
||||||
|
conn = c
|
||||||
|
break
|
||||||
|
if conn != nil:
|
||||||
|
discard quicly_receive(conn, nil, addr sockAddr, addr decoded)
|
||||||
|
else:
|
||||||
|
discard quicly_accept(addr conn, addr ctx.quiclyCtx, nil, addr sockAddr,
|
||||||
|
addr decoded, nil, addr ctx.nextCid, nil)
|
||||||
|
ctx.connections.add(conn)
|
||||||
|
|
||||||
|
proc initContext(sock: AsyncSocket,
|
||||||
|
streamOpenCb: typeof(quicly_stream_open_t.cb)):
|
||||||
|
QuicP2PContext =
|
||||||
|
result.sock = sock
|
||||||
|
result.streamOpen = quicly_stream_open_t(cb: streamOpenCb)
|
||||||
|
result.tlsCtx = ptls_context_t(randomBytes: ptlsOpensslRandomBytes,
|
||||||
|
getTime: addr ptlsGetTime,
|
||||||
|
keyExchanges: ptlsOpensslKeyExchanges,
|
||||||
|
cipherSuites: ptlsOpensslCipherSuites)
|
||||||
|
quicly_amend_ptls_context(addr result.tlsCtx)
|
||||||
|
result.quiclyCtx = quicly_spec_context
|
||||||
|
result.quiclyCtx.tls = addr result.tlsCtx
|
||||||
|
result.quiclyCtx.stream_open = addr result.streamOpen
|
||||||
|
if ptls_load_certificates(addr result.tlsCtx, certChainPath.cstring) != 0:
|
||||||
|
raise newException(ValueError, &"cannot load certificate chain {certChainPath}")
|
||||||
let pKeyFile = open(keyPath)
|
let pKeyFile = open(keyPath)
|
||||||
let privateKey = PEM_read_PrivateKey(pkeyFile, nil, nil, nil)
|
let privateKey = PEM_read_PrivateKey(pkeyFile, nil, nil, nil)
|
||||||
pkeyFile.close()
|
pkeyFile.close()
|
||||||
if privateKey == nil:
|
if privateKey == nil:
|
||||||
echo "cannot load private key ", keyPath
|
raise newException(ValueError, &"cannot load private key {keyPath}")
|
||||||
quit(2)
|
discard ptls_openssl_init_sign_certificate(addr result.signCertificate, privateKey)
|
||||||
var signCertificate: ptls_openssl_sign_certificate_t
|
|
||||||
discard ptls_openssl_init_sign_certificate(addr signCertificate, privateKey)
|
|
||||||
EVP_PKEY_free(privateKey)
|
EVP_PKEY_free(privateKey)
|
||||||
tlsCtx.signCertificate = addr signCertificate.super
|
result.tlsCtx.signCertificate = addr result.signCertificate.super
|
||||||
echo "hello world"
|
|
||||||
|
proc sendPackets(ctx: QuicP2PContext) =
|
||||||
|
for c in ctx.connections:
|
||||||
|
var srcAddr, dstAddr: quicly_address_t
|
||||||
|
var dgrams: array[10, IOVec]
|
||||||
|
var dgramCount = dgrams.len().csize_t
|
||||||
|
var dgramsBuf = newString(dgramCount * ctx.quiclyCtx.transport_params.max_udp_payload_size)
|
||||||
|
let sendResult = quicly_send(c, addr dstAddr, addr srcAddr, addr dgrams[0],
|
||||||
|
addr dgramCount, addr dgramsBuf[0],
|
||||||
|
dgramsBuf.len().csize_t)
|
||||||
|
case sendResult:
|
||||||
|
of 0:
|
||||||
|
for d in dgrams:
|
||||||
|
var sockLen = quicly_get_socklen(addr dstAddr.sa)
|
||||||
|
asyncCheck sendTo(ctx.sock.getFd().AsyncFD, d.iov_base, d.iov_len.int,
|
||||||
|
addr dstAddr.sa, sockLen)
|
||||||
|
of QUICLY_ERROR_FREE_CONNECTION:
|
||||||
|
ctx.connections.del(ctx.connections.find(c))
|
||||||
|
quicly_free(c)
|
||||||
|
else:
|
||||||
|
raise newException(ValueError, &"quicly_send returned {sendResult}")
|
||||||
|
|
||||||
|
proc handleClients(ctx: QuicP2PContext, sock: AsyncSocket) {.async.} =
|
||||||
|
while true:
|
||||||
|
let msg = await sock.recv(BufferSize)
|
||||||
|
asyncCheck handleMsg(ctx, msg)
|
||||||
|
|
||||||
|
proc main() =
|
||||||
|
var ctx: QuicP2PContext
|
||||||
|
let sock = newAsyncSocket(sockType = SOCK_DGRAM, protocol = IPPROTO_UDP)
|
||||||
|
case paramCount():
|
||||||
|
of 1:
|
||||||
|
let portNumber = paramStr(1).parseUInt()
|
||||||
|
if portNumber > uint16.high:
|
||||||
|
usage()
|
||||||
|
quit(1)
|
||||||
|
ctx = initContext(sock, onServerStreamOpen)
|
||||||
|
sock.bindAddr(Port(portNumber))
|
||||||
|
asyncCheck handleClients(ctx, sock)
|
||||||
|
|
||||||
|
of 2:
|
||||||
|
let hostname = paramStr(1)
|
||||||
|
let portNumber = paramStr(2).parseUInt()
|
||||||
|
if portNumber > uint16.high:
|
||||||
|
usage()
|
||||||
|
quit(1)
|
||||||
|
ctx = initContext(sock, onClientStreamOpen)
|
||||||
|
var conn: ptr quicly_conn_t
|
||||||
|
let hostent = getHostByName(hostname)
|
||||||
|
if hostent.addrList.len == 0:
|
||||||
|
echo "cannot resolve hostname ", hostname
|
||||||
|
quit(2)
|
||||||
|
var destAddr: Sockaddr_storage
|
||||||
|
var sockLen: SockLen
|
||||||
|
toSockAddr(parseIpAddress(hostent.addrList[0]), Port(portNumber), destAddr,
|
||||||
|
sockLen)
|
||||||
|
let addressToken = ptls_iovec_init(nil, 0)
|
||||||
|
let connectResult = quicly_connect(addr conn, addr ctx.quiclyCtx,
|
||||||
|
hostname.cstring, addr destAddr, nil,
|
||||||
|
addr ctx.nextCid, addressToken, nil, nil)
|
||||||
|
if connectResult != 0:
|
||||||
|
echo "quicly_connect failed: ", connectResult
|
||||||
|
quit(3)
|
||||||
|
ctx.connections.add(conn)
|
||||||
|
var stream: ptr quicly_stream_t
|
||||||
|
discard quicly_open_stream(conn, addr stream, 0)
|
||||||
|
|
||||||
|
else:
|
||||||
|
usage()
|
||||||
|
quit(1)
|
||||||
|
|
||||||
|
while true:
|
||||||
|
poll(ctx.getRelativeTimeout())
|
||||||
|
ctx.sendPackets()
|
||||||
|
|
||||||
when isMainModule:
|
when isMainModule:
|
||||||
main()
|
main()
|
||||||
|
|
Loading…
Reference in New Issue