103 lines
3.2 KiB
Nim
103 lines
3.2 KiB
Nim
import asyncdispatch, asyncnet, message, net, tables, random, strformat
|
|
|
|
type
  # Host name and port identifying a server.
  Endpoint* = tuple
    hostname: string
    port: Port

  ServerConnection* = ref object
    ## State of a client connection to the server.
    sock: AsyncSocket ## Socket connected to the server.
    outMessages: TableRef[string, Future[string]] ## \
      ## Futures of requests still awaiting a reply, keyed by request id.
    peerNotifications*: FutureStream[string] ## \
      ## Payloads of the `notify-peer` messages received from the server.
    probedIp*: IpAddress ## \
      ## IP address reported by the most recently queried probing server.
    srcPort*: Port ## Source port given when the connection was created.
    probedSrcPorts*: seq[Port] ## \
      ## Port reported by each probing server, in query order.

  ServerError* = object of ValueError ## \
    ## Raised for a request that the server answered with an `error` message.

  OkGetPeerinfo* = object
    ## NOTE(review): presumably the payload of a successful peer-info
    ## reply -- it is not used in this part of the file, confirm.
    ip*: IpAddress
    localPort*: Port
    probedPorts*: seq[Port]

  OkGetEndpoint* = object
    ## Payload of the `ok` reply to a `get-endpoint` request.
    ip*: IpAddress
    port*: Port

  NotifyPeer* = object
    ## NOTE(review): presumably the parsed form of a `notify-peer`
    ## notification; field meanings are inferred from their names only.
    sender*: string
    recipient*: string
    srcIp*: IpAddress
    srcPort*: Port
    probedSrcPorts*: seq[Port]
    dstIp*: IpAddress
    dstPort*: Port
    probedDstPorts*: seq[Port]
|
|
|
|
proc bindWithRetry(sock: AsyncSocket, port: Port) {.async.} =
  ## Binds `sock` to `port`. A bind that fails with `OSError` is retried
  ## after a 100 ms pause; the fourth consecutive failure is re-raised.
  var failCount = 0
  while true:
    try:
      sock.bindAddr(port)
      break
    except OSError as e:
      if failCount == 3:
        raise e
      failCount.inc
      await sleepAsync(100)

proc getEndpoint(srcPort: Port, serverHostname: string, serverPort: Port):
    Future[OkGetEndpoint] {.async.} =
  ## Sends a `get-endpoint` request to the server at
  ## `serverHostname:serverPort` from local port `srcPort` and returns the
  ## parsed payload of its `ok` reply.
  ##
  ## Raises `OSError` when the local port cannot be bound after several
  ## attempts, and `ValueError` when the reply is not an `ok` message
  ## carrying the id of the request or is followed by further data.
  ## The socket is closed on every exit path.
  let sock = newAsyncSocket()
  try:
    await sock.bindWithRetry(srcPort)
    await sock.connect(serverHostname, serverPort)
    let id = rand(uint32)
    await sock.send(&"get-endpoint|{id}\n")
    let line = await sock.recvLine(maxLength = 400)
    let args = line.parseArgs(3)
    # The reply comes from the network, so it is checked with real
    # exceptions: `assert` would vanish in builds without assertions.
    if args[0] != "ok" or args[1] != $id:
      raise newException(ValueError, "invalid get-endpoint response")
    result = parseMessage[OkGetEndpoint](args[2])
    # Nothing may follow the reply; `recvLine` yields "" once the peer has
    # disconnected. NOTE(review): presumably this wait lets the server
    # close the connection first -- confirm against the server.
    let emptyLine = await sock.recvLine(maxLength = 400)
    if emptyLine.len != 0:
      raise newException(ValueError,
                         "unexpected data after get-endpoint response")
  finally:
    sock.close()
|
|
|
|
proc initServerConnection*(serverHostname: string, serverPort: Port,
                           srcPort: Port, probingServers: seq[Endpoint]):
                           Future[ServerConnection] {.async.} =
  ## Creates a `ServerConnection` for `srcPort`, asks every server in
  ## `probingServers` for the endpoint it observes, and finally dials
  ## `serverHostname:serverPort`.
  ##
  ## The port reported by each probing server is appended to
  ## `probedSrcPorts`; `probedIp` holds the address reported by the last
  ## server queried.
  let conn = ServerConnection(
    outMessages: newTable[string, Future[string]](),
    peerNotifications: newFutureStream[string]("initServerConnection"),
    srcPort: srcPort)
  for server in probingServers:
    let probed = await getEndpoint(srcPort, server.hostname, server.port)
    # FIXME: what if we get different IPs from different servers
    conn.probedIp = probed.ip
    conn.probedSrcPorts.add(probed.port)
  conn.sock = await asyncnet.dial(serverHostname, serverPort)
  return conn
|
|
|
|
proc handleServerMessages*(conn: ServerConnection) {.async.} =
  ## Reads messages from the server line by line, forever, and dispatches
  ## them on their first field:
  ##
  ## * `ok` completes the pending request whose id is the second field
  ##   with the third field.
  ## * `error` fails that pending request with a `ServerError` whose
  ##   message is the third field.
  ## * `notify-peer` writes the rest of the line to `peerNotifications`.
  ##
  ## Any other message raises `ValueError`; a reply whose id is not
  ## pending raises `KeyError` from the table lookup.
  while true:
    let
      msg = await conn.sock.recvLine(maxLength = 400)
      fields = msg.parseArgs(3, 1)
      kind = fields[0]
    case kind
    of "ok", "error":
      # Both replies settle a pending request: take it out of the table
      # first, then resolve it according to the message kind.
      let pending = conn.outMessages[fields[1]]
      conn.outMessages.del(fields[1])
      if kind == "ok":
        pending.complete(fields[2])
      else:
        pending.fail(newException(ServerError, fields[2]))
    of "notify-peer":
      # Forward everything after the "notify-peer|" prefix.
      asyncCheck conn.peerNotifications.write(msg.substr(kind.len + 1))
    else:
      raise newException(ValueError, "invalid server message")
|
|
|
|
proc sendRequest*(connection: ServerConnection, command: string,
                  content: string): Future[string] =
  ## Sends `command` to the server under a random request id and returns
  ## a future for the reply.
  ##
  ## The wire format is `command|id|content` followed by a newline, or
  ## `command|id` when `content` is empty. The future is stored in the
  ## connection's pending-request table under the id so that
  ## `handleServerMessages` can resolve it.
  ##
  ## If writing to the socket fails, the returned future fails with the
  ## send error and the pending entry is removed, so the caller is not
  ## left waiting for a reply that can never arrive.
  let response = newFuture[string]("sendRequest")
  let id = $rand(uint32)
  let request =
    if content.len != 0: &"{command}|{id}|{content}\n"
    else: &"{command}|{id}\n"
  connection.outMessages[id] = response
  let sending = connection.sock.send(request)

  proc onSent() =
    # Propagate a failed send to the caller instead of raising inside
    # the dispatcher and leaking the pending entry.
    if sending.failed:
      connection.outMessages.del(id)
      if not response.finished:
        response.fail(sending.readError())

  sending.addCallback(onSent)
  result = response
|