introduce get-endpoint: server sends endpoint info and closes connection (allows subsequent connections to the same server)

This commit is contained in:
Christian Ulrich 2020-07-26 17:46:17 +02:00
parent f23093fd69
commit 3d40ddec85
No known key found for this signature in database
GPG Key ID: 8241BE099775A097
2 changed files with 65 additions and 43 deletions

View File

@ -34,8 +34,7 @@ type
OkGetInfo* = object OkGetInfo* = object
ip: string ip: string
ports: array[3, uint16] ports: array[3, uint16]
NotifyEndpoint* = object OkGetEndpoint* = object
command: string
ip: IpAddress ip: IpAddress
port: Port port: Port
NotifyPeer* = object NotifyPeer* = object
@ -55,45 +54,45 @@ type
proc usage() = proc usage() =
echo &"usage: {paramStr(0)} SERVER_HOSTNAME SERVER_PORT PEER_ID [OTHER_PEER_ID]" echo &"usage: {paramStr(0)} SERVER_HOSTNAME SERVER_PORT PEER_ID [OTHER_PEER_ID]"
proc handleServerMessages(connection: ServerConnection) {.async.} = proc handleServerMessages(conn: ServerConnection) {.async.} =
while true: while true:
let line = await connection.sock.recvLine(maxLength = 400) let line = await conn.sock.recvLine(maxLength = 400)
let args = line.parseArgs(3, 1) let args = line.parseArgs(3, 1)
case args[0]: case args[0]:
of "ok": of "ok":
let future = connection.outMessages[args[1]] let future = conn.outMessages[args[1]]
connection.outMessages.del(args[1]) conn.outMessages.del(args[1])
future.complete(args[2]) future.complete(args[2])
of "error": of "error":
let future = connection.outMessages[args[1]] let future = conn.outMessages[args[1]]
connection.outMessages.del(args[1]) conn.outMessages.del(args[1])
future.fail(newException(ServerError, args[2])) future.fail(newException(ServerError, args[2]))
of "notify-peer": of "notify-peer":
asyncCheck connection.peerNotifications.write(line.substr(args[0].len + 1)) asyncCheck conn.peerNotifications.write(line.substr(args[0].len + 1))
else: else:
raise newException(ValueError, "invalid server message") raise newException(ValueError, "invalid server message")
proc handlePunchdMessages(connection: PunchdConnection) {.async.} = proc handlePunchdMessages(conn: PunchdConnection) {.async.} =
while true: while true:
let fd = connection.sock.getFd.AsyncFD let fd = conn.sock.getFd.AsyncFD
let resp = await fd.asyncRecvMsg(size = 400, cmsgSize = sizeof(AsyncFD)) let resp = await fd.asyncRecvMsg(size = 400, cmsgSize = sizeof(AsyncFD))
let line = resp.data.strip(leading = false, trailing = true, chars = {'\n'}) let line = resp.data.strip(leading = false, trailing = true, chars = {'\n'})
let args = line.parseArgs(3, 1) let args = line.parseArgs(3, 1)
case args[0]: case args[0]:
of "ok": of "ok":
let outMsg = connection.outMessages[args[1]] let outMsg = conn.outMessages[args[1]]
connection.outMessages.del(args[1]) conn.outMessages.del(args[1])
if resp.cmsgs.len != 1: if resp.cmsgs.len != 1:
raise newException(ValueError, "invalid punchd message") raise newException(ValueError, "invalid punchd message")
let sock = newAsyncSocket(resp.cmsgs[0].getFd) let sock = newAsyncSocket(resp.cmsgs[0].getFd)
register(sock.getFd.AsyncFD) register(sock.getFd.AsyncFD)
outMsg.future.complete((args[2], sock)) outMsg.future.complete((args[2], sock))
of "error": of "error":
let outMsg = connection.outMessages[args[1]] let outMsg = conn.outMessages[args[1]]
connection.outMessages.del(args[1]) conn.outMessages.del(args[1])
outMsg.future.fail(newException(ServerError, args[2])) outMsg.future.fail(newException(ServerError, args[2]))
of "progress": of "progress":
let outMsg = connection.outMessages[args[1]] let outMsg = conn.outMessages[args[1]]
asyncCheck outMsg.progressCb(outMsg.future, args[2]) asyncCheck outMsg.progressCb(outMsg.future, args[2])
else: else:
raise newException(ValueError, "invalid punchd message") raise newException(ValueError, "invalid punchd message")
@ -111,7 +110,12 @@ proc sendRequest(connection: ServerConnection, command: string,
content: string): Future[string] = content: string): Future[string] =
result = newFuture[string]("sendRequest") result = newFuture[string]("sendRequest")
let id = $rand(uint32) let id = $rand(uint32)
asyncCheck connection.sock.send(&"{command}|{id}|{content}\n") var request: string
if content.len != 0:
request = &"{command}|{id}|{content}\n"
else:
request = &"{command}|{id}\n"
asyncCheck connection.sock.send(request)
connection.outMessages[id] = result connection.outMessages[id] = result
proc acceptConnection(punchdConn: PunchdConnection, command: string, proc acceptConnection(punchdConn: PunchdConnection, command: string,
@ -141,8 +145,8 @@ proc handlePeerNotifications(serverConn: ServerConnection,
discard discard
proc punchHole(punchdConn: PunchdConnection, serverConn: ServerConnection, proc punchHole(punchdConn: PunchdConnection, serverConn: ServerConnection,
ipAddress: IpAddress, peerId: string, peerId: string, otherPeerId: string):
otherPeerId: string): Future[AsyncSocket] {.async.} = Future[AsyncSocket] {.async.} =
let sResp = await serverConn.sendRequest("get-info", otherPeerId) let sResp = await serverConn.sendRequest("get-info", otherPeerId)
let peerInfo = parseMessage[OkGetInfo](sResp) let peerInfo = parseMessage[OkGetInfo](sResp)
proc progressCb(future: PunchdResponse, msgContent: string) {.async.} = proc progressCb(future: PunchdResponse, msgContent: string) {.async.} =
@ -156,26 +160,31 @@ proc punchHole(punchdConn: PunchdConnection, serverConn: ServerConnection,
future.fail(e) future.fail(e)
let myPorts = (@[Port(1234)] & serverConn.probedPorts).join(",") let myPorts = (@[Port(1234)] & serverConn.probedPorts).join(",")
let peerPorts = peerInfo.ports.join(",") let peerPorts = peerInfo.ports.join(",")
let req = &"{ipAddress}|{myPorts}|{peerInfo.ip}|{peerPorts}" let req = &"{serverConn.probedIp}|{myPorts}|{peerInfo.ip}|{peerPorts}"
let pResp = await punchdConn.sendRequest("tcp-syni-connect", req, progressCb) let pResp = await punchdConn.sendRequest("tcp-syni-connect", req, progressCb)
result = pResp.sock result = pResp.sock
proc initServerConnection(serverHostname: string, serverPort: Port, proc initServerConnection(serverHostname: string, serverPort: Port,
myPort: Port, probePorts: bool): probingPort: Port): Future[ServerConnection] {.async.} =
Future[ServerConnection] {.async.} = for i in 0 .. 1:
if probePorts: # FIXME: error handling
for i in 0 .. 1: let sock = newAsyncSocket()
let sock = newAsyncSocket() sock.bindAddr(probingPort)
sock.setSockOpt(OptReuseAddr, true) await sock.connect(serverHostname, serverPort)
sock.bindAddr(myPort) let id = rand(uint32)
await sock.connect(serverHostname, serverPort) await sock.send(&"get-endpoint|{id}\n")
let line = await sock.recvLine(maxLength = 400) let line = await sock.recvLine(maxLength = 400)
let endpoint = parseMessage[NotifyEndpoint](line) let args = line.parseArgs(3)
result.probedPorts.add(endpoint.port) assert(args[0] == "ok")
sock.close() assert(args[1] == $id)
let endpoint = parseMessage[OkGetEndpoint](args[2])
result.probedIp = endpoint.ip
result.probedPorts.add(endpoint.port)
let emptyLine = await sock.recvLine(maxLength = 400)
assert(emptyLine.len == 0)
sock.close()
result.sock = await asyncnet.dial(serverHostname, serverPort) result.sock = await asyncnet.dial(serverHostname, serverPort)
let line = await result.sock.recvLine(maxLength = 400)
result.probedIp = parseMessage[NotifyEndpoint](line).ip
result.outMessages = newTable[string, Future[string]]() result.outMessages = newTable[string, Future[string]]()
result.peerNotifications = newFutureStream[string]("initServerConnection") result.peerNotifications = newFutureStream[string]("initServerConnection")
@ -191,7 +200,7 @@ proc runApp(serverHostname: string, serverPort: Port, peerId: string,
if otherPeerId.len == 0: if otherPeerId.len == 0:
# register and wait for connections # register and wait for connections
let serverConn = await initServerConnection(serverHostname, serverPort, let serverConn = await initServerConnection(serverHostname, serverPort,
Port(4321), true) Port(4321))
asyncCheck handleServerMessages(serverConn) asyncCheck handleServerMessages(serverConn)
asyncCheck handlePeerNotifications(serverConn, punchdConn, peerId) asyncCheck handlePeerNotifications(serverConn, punchdConn, peerId)
let myPorts = (@[Port(4321)] & serverConn.probedPorts).join(",") let myPorts = (@[Port(4321)] & serverConn.probedPorts).join(",")
@ -209,11 +218,10 @@ proc runApp(serverHostname: string, serverPort: Port, peerId: string,
else: else:
# initiate a new connection # initiate a new connection
let serverConn = await initServerConnection(serverHostname, serverPort, var serverConn = await initServerConnection(serverHostname, serverPort,
Port(1234), true) Port(1234))
asyncCheck handleServerMessages(serverConn) asyncCheck handleServerMessages(serverConn)
let sock = await punchHole(punchdConn, serverConn, serverConn.probedIp, let sock = await punchHole(punchdConn, serverConn, peerId, otherPeerId)
peerId, otherPeerId)
await sock.send("ping") await sock.send("ping")
let msg = await sock.recv(1000) let msg = await sock.recv(1000)
echo "received message: ", msg echo "received message: ", msg

View File

@ -60,14 +60,17 @@ proc probePublicIp(): Future[IpAddress] {.async.} =
proc removeClient(clients: TableRef[string, Client], peerId: string) = proc removeClient(clients: TableRef[string, Client], peerId: string) =
if peerId.len > 0: clients.del(peerId) if peerId.len > 0: clients.del(peerId)
proc processClient(client: AsyncSocket, proc sendEndpoint(client: AsyncSocket, requestId: string) {.async.} =
clients: TableRef[string, Client]) {.async.} =
let (address, port) = client.getPeerAddr() let (address, port) = client.getPeerAddr()
var ipAddr = parseIpAddress(address) var ipAddr = parseIpAddress(address)
if ipAddr.isPrivateIp() and if ipAddr.isPrivateIp() and
ipAddr.isInNetwork(fromIpAddress(getPrimaryIPAddr(ipAddr))): ipAddr.isInNetwork(fromIpAddress(getPrimaryIPAddr(ipAddr))):
ipAddr = await probePublicIp() ipAddr = await probePublicIp()
await client.send(&"notify-endpoint|{ipAddr}|{port.int}\n") await client.send(&"ok|{requestId}|{ipAddr}|{port.int}\n")
client.close
proc processClient(client: AsyncSocket,
clients: TableRef[string, Client]) {.async.} =
var id = "" var id = ""
var peerId = "" var peerId = ""
while true: while true:
@ -77,25 +80,35 @@ proc processClient(client: AsyncSocket,
removeClient(clients, peerId) removeClient(clients, peerId)
break break
try: try:
let args = line.parseArgs(3) let args = line.parseArgs(3, 1)
id = args[1] id = args[1]
case args[0]: case args[0]:
of "register": of "register":
let req = parseMessage[Register](args[2]) let req = parseMessage[Register](args[2])
echo "register: ", req
peerId = req.peerId peerId = req.peerId
clients[peerId] = Client(sock: client, ip: req.ip, ports: req.ports) clients[peerId] = Client(sock: client, ip: req.ip, ports: req.ports)
asyncCheck client.send(&"ok|{id}\n") asyncCheck client.send(&"ok|{id}\n")
of "get-endpoint":
echo "get-endpoint"
asyncCheck client.sendEndpoint(id)
removeClient(clients, peerId)
break
# FIXME: get-peerinfo
of "get-info": of "get-info":
let req = parseMessage[GetInfo](args[2]) let req = parseMessage[GetInfo](args[2])
echo "get-info: ", req
let peer = clients[req.peerId] let peer = clients[req.peerId]
let peerPorts = peer.ports.join(",") let peerPorts = peer.ports.join(",")
asyncCheck client.send(&"ok|{id}|{peer.ip}|{peerPorts}\n") asyncCheck client.send(&"ok|{id}|{peer.ip}|{peerPorts}\n")
of "notify-peer": of "notify-peer":
let req = parseMessage[NotifyPeer](args[2]) let req = parseMessage[NotifyPeer](args[2])
echo "notify-peer: ", req
let recipient = clients[req.recipient] let recipient = clients[req.recipient]
asyncCheck recipient.sock.send(&"notify-peer|{req.sender}|{req.recipient}|{req.data}\n") asyncCheck recipient.sock.send(&"notify-peer|{req.sender}|{req.recipient}|{req.data}\n")
asyncCheck client.send(&"ok|{id}\n") asyncCheck client.send(&"ok|{id}\n")
else: else:
echo "invalid request"
client.close() client.close()
removeClient(clients, peerId) removeClient(clients, peerId)
break break
@ -104,6 +117,7 @@ proc processClient(client: AsyncSocket,
asyncCheck client.send(&"error|{id}|peer not registered\n") asyncCheck client.send(&"error|{id}|peer not registered\n")
except ValueError: except ValueError:
echo "invalid message"
client.close client.close
removeClient(clients, peerId) removeClient(clients, peerId)
break break