quicp2p/server_connection.nim

104 lines
3.3 KiB
Nim

import asyncdispatch, asyncnet, message, net, tables, random, strformat
type
  # Host name and port of a server. `initServerConnection` takes a seq of
  # these as the servers to probe.
  Endpoint* = tuple[hostname: string, port: Port]
  ServerConnection* = ref object
    ## State of one client connection to the server.
    sock: AsyncSocket ## Socket obtained from `asyncnet.dial` to the server.
    outMessages: TableRef[string, Future[string]] ## Requests awaiting a
      ## reply, keyed by request id; entries are added by `sendRequest` and
      ## removed by `handleServerMessages`.
    peerNotifications*: FutureStream[string] ## Receives the remainder of
      ## every "notify-peer" line (everything after the command name and its
      ## separator).
    probedIp*: IpAddress ## IP reported by the most recently queried probing
      ## server.
    probedSrcPorts*: seq[Port] ## Ports reported by the probing servers, in
      ## the order the servers were queried.
  ServerError* = object of ValueError ## Used to fail a pending request
                                      ## future when the server answers
                                      ## with "error".
  OkGetPeerinfo* = object
    ## Not referenced in this file. Presumably the payload of a successful
    ## peer-info reply -- TODO confirm against the `message` module/callers.
    ip*: IpAddress
    localPort*: Port
    probedPorts*: seq[Port]
  OkGetEndpoint* = object
    ## Payload parsed from the third field of the "ok" reply to a
    ## "get-endpoint" request.
    ip*: IpAddress
    port*: Port
  NotifyPeer* = object
    ## Not referenced in this file. Presumably the parsed form of the
    ## strings delivered through `peerNotifications` -- TODO confirm.
    sender*: string
    recipient*: string
    srcIp*: IpAddress
    srcPort*: Port
    probedSrcPorts*: seq[Port]
    dstIp*: IpAddress
    dstPort*: Port
    probedDstPorts*: seq[Port]
proc getEndpoint(sock: AsyncSocket, serverHostname: string, serverPort: Port):
    Future[OkGetEndpoint] {.async.} =
  ## Asks the server at `serverHostname`:`serverPort` for our endpoint.
  ##
  ## A temporary TCP socket is bound to the same local port number as `sock`,
  ## a "get-endpoint" request is sent, and the third field of the "ok" reply
  ## is parsed into an `OkGetEndpoint`. The temporary socket is always closed
  ## before the future completes, whether it succeeds or fails.
  ##
  ## Raises `OSError` if the local port cannot be bound after the retries (or
  ## if connecting/sending fails), and `AssertionDefect` if the reply does not
  ## have the expected shape. The reply checks use `doAssert` so that they
  ## stay active in builds compiled with assertions turned off.
  # TODO: use sock (UDP socket) for probing
  const
    maxBindRetries = 3     # failed attempts tolerated before giving up
    bindRetryDelayMs = 100 # pause between two bind attempts
  let tcpSock = newAsyncSocket()
  try:
    let (_, srcPort) = sock.getLocalAddr
    var failCount = 0
    while true:
      try:
        tcpSock.bindAddr(srcPort)
        break
      except OSError:
        if failCount == maxBindRetries:
          # Out of retries: propagate the original error unchanged.
          raise
        failCount.inc
        await sleepAsync(bindRetryDelayMs)
    await tcpSock.connect(serverHostname, serverPort)
    # The random id lets us check that the reply belongs to this request.
    let id = rand(uint32)
    await tcpSock.send(&"get-endpoint|{id}\n")
    let line = await tcpSock.recvLine(maxLength = 400)
    let args = line.parseArgs(3)
    doAssert(args[0] == "ok")
    doAssert(args[1] == $id)
    result = parseMessage[OkGetEndpoint](args[2])
    # asyncnet's recvLine yields "" only when the peer has disconnected, so
    # this verifies that the server sent nothing further and hung up.
    let trailer = await tcpSock.recvLine(maxLength = 400)
    doAssert(trailer.len == 0)
  finally:
    # Release the descriptor on every path, including failures above.
    tcpSock.close()
proc initServerConnection*(sock: AsyncSocket, serverHostname: string,
                           serverPort: Port, probingServers: seq[Endpoint]):
    Future[ServerConnection] {.async.} =
  ## Creates a `ServerConnection` to `serverHostname`:`serverPort`.
  ##
  ## Every entry of `probingServers` is queried in order through
  ## `getEndpoint` (using `sock`); the reported port is appended to
  ## `probedSrcPorts` and the reported IP overwrites `probedIp`. Only after
  ## all probes have finished is the connection to the server dialed.
  let notifications = newFutureStream[string]("initServerConnection")
  var conn = ServerConnection(
    outMessages: newTable[string, Future[string]](),
    peerNotifications: notifications)
  for server in probingServers:
    let probed = await getEndpoint(sock, server.hostname, server.port)
    # FIXME: what if we get different IPs from different servers
    conn.probedIp = probed.ip
    conn.probedSrcPorts.add(probed.port)
  conn.sock = await asyncnet.dial(serverHostname, serverPort)
  return conn
proc handleServerMessages*(conn: ServerConnection) {.async.} =
  ## Reads lines from the server forever and dispatches each one.
  ##
  ## * "ok" / "error": the pending request whose id is the second field is
  ##   removed from `outMessages`; its future is completed with the third
  ##   field, or failed with a `ServerError` carrying the third field.
  ## * "notify-peer": everything after the command name and its separator is
  ##   written to `peerNotifications`.
  ## * anything else raises `ValueError`.
  ##
  ## The loop has no exit of its own; it ends only when an exception is
  ## raised (an unknown request id raises `KeyError` from the table lookup).
  while true:
    let line = await conn.sock.recvLine(maxLength = 400)
    let args = line.parseArgs(3, 1)
    let tag = args[0]
    if tag == "ok" or tag == "error":
      # Replies share the lookup/removal; only the resolution differs.
      let pending = conn.outMessages[args[1]]
      conn.outMessages.del(args[1])
      if tag == "ok":
        pending.complete(args[2])
      else:
        pending.fail(newException(ServerError, args[2]))
    elif tag == "notify-peer":
      asyncCheck conn.peerNotifications.write(line.substr(tag.len + 1))
    else:
      raise newException(ValueError, "invalid server message")
proc sendRequest*(connection: ServerConnection, command: string,
                  content: string): Future[string] =
  ## Sends `command` (with `content`, when non-empty) to the server and
  ## returns a future for the reply.
  ##
  ## The request is written as "command|id|content\n", or "command|id\n"
  ## when `content` is empty, where `id` is a random number that is not used
  ## by any request still in flight. The future is stored in `outMessages`
  ## under that id; `handleServerMessages` completes it with the reply
  ## payload or fails it with `ServerError`.
  ##
  ## If writing the request to the socket fails, the entry is removed again
  ## and the returned future fails with the send error, so callers never
  ## wait on a request that was not delivered.
  let response = newFuture[string]("sendRequest")
  # Re-roll on the (unlikely) collision with a pending request; reusing its
  # id would silently replace that request's future.
  var id = $rand(uint32)
  while id in connection.outMessages:
    id = $rand(uint32)
  let request =
    if content.len != 0: &"{command}|{id}|{content}\n"
    else: &"{command}|{id}\n"
  # Register before sending so the reply handler can always find the entry.
  connection.outMessages[id] = response
  let sendFuture = connection.sock.send(request)

  proc onSent() =
    # Forward a failed send to the caller instead of leaving `response`
    # pending forever with a stale table entry.
    if sendFuture.failed and not response.finished:
      connection.outMessages.del(id)
      response.fail(sendFuture.readError())

  sendFuture.addCallback(onSent)
  result = response