punchd/examples/app/app.nim

268 lines
9.5 KiB
Nim
Raw Normal View History

2020-07-06 15:10:10 +02:00
import asyncdispatch, asyncnet, os, strformat, strutils, tables
from nativeSockets import Domain, SockType, Protocol
from net import IpAddress, Port, isIpAddress, `$`
2020-08-23 12:45:05 +02:00
from posix import CMSG_SPACE
2020-07-06 15:10:10 +02:00
import asyncutils
import ../../message
import random
# Rendezvous servers. initServerConnection sends a get-endpoint query to
# every entry and opens its persistent connection to the first one.
const rendezvousServers: seq[tuple[hostname: string, port: Port]] = @[
  ("strangeplace.net", Port(5320)),
  ("ulrich.earth", Port(5320))
]
2020-07-06 15:10:10 +02:00
type
  # Outcome of a punchd request: the content field of the "ok" reply and the
  # socket built from the file descriptor that accompanied it.
  PunchdResponse = Future[tuple[msgContent: string, sock: AsyncSocket]]

  # Callback run for each "progress" message punchd sends about a request.
  PunchdProgressCb = proc (future: PunchdResponse, msgContent: string) {.async.}

  # A punchd request that is still waiting for its final reply.
  OutgoingPunchdMessage = object
    future: PunchdResponse
    progressCb: PunchdProgressCb

  # State of the connection to punchd.
  PunchdConnection = object
    sock: AsyncSocket
    # pending requests, keyed by request id
    outMessages: TableRef[string, OutgoingPunchdMessage]
    # sockets of incoming connections accepted via acceptConnection
    inConnections: FutureStream[AsyncSocket]

  # State of the connection to the rendezvous server.
  ServerConnection = object
    sock: AsyncSocket
    # pending requests, keyed by request id
    outMessages: TableRef[string, Future[string]]
    # payloads of received "notify-peer" messages
    peerNotifications: FutureStream[string]
    # address returned by the most recent get-endpoint query
    probedIp: IpAddress
    # local port the get-endpoint queries are sent from
    srcPort: Port
    # one port per queried rendezvous server, as returned by get-endpoint
    probedSrcPorts: seq[Port]

  # Punchd messages
  Progress* = object
    command, args: string

  # Server messages
  OkGetPeerinfo* = object
    ip: string
    localPort: Port
    probedPorts: seq[Port]

  OkGetEndpoint* = object
    ip: IpAddress
    port: Port

  NotifyPeer* = object
    sender, recipient, technique: string
    srcIp: IpAddress
    srcPort: Port
    probedSrcPorts: seq[Port]
    dstIp: IpAddress
    dstPort: Port
    probedDstPorts: seq[Port]
    extraArgs: string

  # Exceptions
  # Delivered through a request future when punchd answers with "error".
  PunchdError = object of ValueError
  # Delivered through a request future when the server answers with "error".
  ServerError = object of ValueError
proc usage() =
  ## Prints the command-line synopsis, prefixed with the program name.
  let program = paramStr(0)
  echo &"usage: {program} PEER_ID [OTHER_PEER_ID]"
2020-07-06 15:10:10 +02:00
proc handleServerMessages(conn: ServerConnection) {.async.} =
  ## Reads lines from the rendezvous server and dispatches them:
  ##
  ## * "ok" / "error" resolve the pending request whose id is in the second
  ##   field (completed with the third field, or failed with ServerError).
  ## * "notify-peer" forwards everything after the command name to
  ##   `conn.peerNotifications`.
  ##
  ## Raises ValueError for an unknown command, KeyError (a ValueError) for a
  ## reply whose id has no pending request, and ServerError once the server
  ## closes the connection.
  while true:
    let line = await conn.sock.recvLine(maxLength = 400)
    if line.len == 0:
      # recvLine returns "" only when the socket was disconnected (an empty
      # line is reported as "\r\n"). Resolve everything that is still
      # waiting so callers see the real cause instead of a parse error.
      let closed = newException(ServerError, "server closed the connection")
      for pending in conn.outMessages.values:
        pending.fail(closed)
      conn.outMessages.clear()
      conn.peerNotifications.complete()
      raise closed
    # parseArgs comes from the message module; presumably it splits the line
    # into at most three '|'-separated fields -- confirm against that module.
    let args = line.parseArgs(3, 1)
    case args[0]
    of "ok", "error":
      # Retire the pending request before resolving it.
      let future = conn.outMessages[args[1]]
      conn.outMessages.del(args[1])
      if args[0] == "ok":
        future.complete(args[2])
      else:
        future.fail(newException(ServerError, args[2]))
    of "notify-peer":
      asyncCheck conn.peerNotifications.write(line.substr(args[0].len + 1))
    else:
      raise newException(ValueError, "invalid server message")
proc handlePunchdMessages(conn: PunchdConnection) {.async.} =
  ## Receives messages from punchd and dispatches them to the pending request
  ## named by the second field:
  ##
  ## * "ok" completes the request with the third field and a socket created
  ##   from the file descriptor in the first control message.
  ## * "error" fails the request with PunchdError.
  ## * "progress" invokes the request's progress callback, if it has one.
  ##
  ## Raises ValueError for an unknown command or an "ok" without a control
  ## message, and KeyError (a ValueError) for an unknown request id.
  let fd = conn.sock.getFd.AsyncFD
  # Room for a single file descriptor in the ancillary data.
  let cmsgSize = CMSG_SPACE(sizeof(AsyncFD).csize_t)
  while true:
    let resp = await fd.asyncRecvMsg(400, cmsgSize)
    let line = resp.data.strip(leading = false, trailing = true, chars = {'\n'})
    echo "received punchd message: ", line
    let args = line.parseArgs(3, 1)
    case args[0]
    of "ok":
      let outMsg = conn.outMessages[args[1]]
      conn.outMessages.del(args[1])
      if resp.cmsgs.len < 1:
        echo "no cmsg"
        raise newException(ValueError, "invalid punchd message")
      let sock = newAsyncSocket(resp.cmsgs[0].getFd)
      # newAsyncSocket(fd) does not register the descriptor with the
      # dispatcher, so this has to be done explicitly.
      register(sock.getFd.AsyncFD)
      outMsg.future.complete((args[2], sock))
    of "error":
      let outMsg = conn.outMessages[args[1]]
      conn.outMessages.del(args[1])
      outMsg.future.fail(newException(PunchdError, args[2]))
    of "progress":
      let outMsg = conn.outMessages[args[1]]
      # Requests sent without a callback (sendRequest's default, used by
      # acceptConnection) carry a nil progressCb; calling it would crash.
      if not outMsg.progressCb.isNil:
        asyncCheck outMsg.progressCb(outMsg.future, args[2])
    else:
      raise newException(ValueError, "invalid punchd message")
proc sendRequest(connection: PunchdConnection, command: string, content: string,
                 progressCb: PunchdProgressCb = nil): PunchdResponse =
  ## Sends "command|id|content" to punchd under a random request id and
  ## returns the future that handlePunchdMessages resolves for that id.
  ## `progressCb`, when given, is stored alongside the future for "progress"
  ## messages.
  let
    id = $rand(uint32)
    wireMsg = command & "|" & id & "|" & content & "\n"
  result = newFuture[PunchdResponse.T]("sendRequest")
  asyncCheck connection.sock.send(wireMsg)
  echo "id = ", id
  connection.outMessages[id] = OutgoingPunchdMessage(future: result,
                                                     progressCb: progressCb)
proc sendRequest(connection: ServerConnection, command: string,
                 content: string): Future[string] =
  ## Sends `command` to the rendezvous server under a random request id and
  ## returns the future that handleServerMessages resolves for that id.
  ## The content field is omitted from the line when `content` is empty.
  let id = $rand(uint32)
  let request =
    if content.len == 0:
      &"{command}|{id}\n"
    else:
      &"{command}|{id}|{content}\n"
  result = newFuture[string]("sendRequest")
  asyncCheck connection.sock.send(request)
  connection.outMessages[id] = result
proc acceptConnection(punchdConn: PunchdConnection, command: string,
                      msgContent: string) {.async.} =
  ## Issues `command` to punchd and, once it succeeds, queues the resulting
  ## socket on `punchdConn.inConnections`.
  echo "accepting connection ", msgContent
  let (_, peerSock) = await punchdConn.sendRequest(command, msgContent)
  asyncCheck punchdConn.inConnections.write(peerSock)
proc handlePeerNotifications(serverConn: ServerConnection,
                             punchdConn: PunchdConnection,
                             peerId: string) {.async.} =
  ## Consumes `serverConn.peerNotifications` until the stream is completed.
  ## Each notification is parsed as NotifyPeer and turned into a "respond"
  ## request for punchd; notifications that fail with ValueError are printed
  ## and skipped. `peerId` is currently unused.
  while true:
    let (hasData, data) = await serverConn.peerNotifications.read
    if not hasData:
      break
    try:
      let msg = parseMessage[NotifyPeer](data)
      # FIXME: check if we want to receive messages from the sender
      echo "received message from ", msg.sender
      let srcPorts = msg.probedSrcPorts.join(",")
      let dstPorts = msg.probedDstPorts.join(",")
      let fields = [msg.technique, $msg.srcIp, $msg.srcPort, srcPorts,
                    $msg.dstIp, $msg.dstPort, dstPorts, msg.extraArgs]
      asyncCheck acceptConnection(punchdConn, "respond", fields.join("|"))
    except ValueError as e:
      echo e.msg
proc punchHole(punchdConn: PunchdConnection, serverConn: ServerConnection,
               peerId: string, otherPeerId: string, technique: string):
               Future[AsyncSocket] {.async.} =
  ## Initiates a connection to `otherPeerId`: fetches the peer's info from
  ## the server, sends punchd an "initiate" request, and relays punchd's
  ## progress messages to the peer as "notify-peer" requests.
  ## Returns the socket delivered with punchd's reply.
  let sResp = await serverConn.sendRequest("get-peerinfo", otherPeerId)
  let peerInfo = parseMessage[OkGetPeerinfo](sResp)

  proc progressCb(future: PunchdResponse, msgContent: string) {.async.} =
    try:
      let progress = parseMessage[Progress](msgContent)
      let notification = [peerId, otherPeerId, progress.command,
                          progress.args].join("|")
      discard await serverConn.sendRequest("notify-peer", notification)
    except ValueError as e:
      # Also covers ServerError, which derives from ValueError.
      future.fail(e)

  let ownPorts = serverConn.probedSrcPorts.join(",")
  let peerPorts = peerInfo.probedPorts.join(",")
  let fields = [technique, $serverConn.probedIp, $serverConn.srcPort,
                ownPorts, $peerInfo.ip, $peerInfo.localPort, peerPorts]
  let pResp = await punchdConn.sendRequest("initiate", fields.join("|"),
                                           progressCb)
  result = pResp.sock
proc getEndpoint(srcPort: Port, serverHostname: string, serverPort: Port):
                 Future[OkGetEndpoint] {.async.} =
  ## Binds a fresh socket to `srcPort`, sends a "get-endpoint" request to the
  ## given server and returns the parsed reply.
  ##
  ## Binding is retried up to three times, 100 ms apart, before the OSError
  ## is propagated. Raises ValueError when the reply is not the expected
  ## "ok" for our request id or when further data follows it. The socket is
  ## closed on every exit path.
  const maxBindRetries = 3
  let sock = newAsyncSocket()
  try:
    var failCount = 0
    while true:
      try:
        sock.bindAddr(srcPort)
        break
      except OSError as e:
        if failCount == maxBindRetries:
          raise e
        failCount.inc
        await sleepAsync(100)
    await sock.connect(serverHostname, serverPort)
    let id = $rand(uint32)
    await sock.send(&"get-endpoint|{id}\n")
    let line = await sock.recvLine(maxLength = 400)
    let args = line.parseArgs(3)
    # Explicit checks rather than assert: the reply is untrusted network
    # input and asserts are compiled out with -d:danger.
    if args[0] != "ok" or args[1] != id:
      raise newException(ValueError,
                         "unexpected get-endpoint reply: " & line)
    result = parseMessage[OkGetEndpoint](args[2])
    # recvLine returns "" once the peer has disconnected; nothing else is
    # expected after the reply.
    let trailing = await sock.recvLine(maxLength = 400)
    if trailing.len != 0:
      raise newException(ValueError,
                         "unexpected data after get-endpoint reply")
  finally:
    sock.close()
proc initServerConnection(srcPort: Port): Future[ServerConnection] {.async.} =
  ## Queries every rendezvous server for our endpoint as seen from `srcPort`,
  ## then opens the persistent connection to the first server in the list.
  var probedIp: IpAddress
  var probedPorts: seq[Port]
  for server in rendezvousServers:
    let endpoint = await getEndpoint(srcPort, server.hostname, server.port)
    # FIXME: what if we get different IPs from different servers
    probedIp = endpoint.ip
    probedPorts.add(endpoint.port)
  let primary = rendezvousServers[0]
  let sock = await asyncnet.dial(primary.hostname, primary.port)
  result = ServerConnection(
    sock: sock,
    outMessages: newTable[string, Future[string]](),
    peerNotifications: newFutureStream[string]("initServerConnection"),
    probedIp: probedIp,
    srcPort: srcPort,
    probedSrcPorts: probedPorts)
proc runApp(peerId: string, otherPeerId: string = "") {.async.} =
  ## Connects to punchd and then either initiates a connection to
  ## `otherPeerId` (sending "ping" and printing the reply) or, when
  ## `otherPeerId` is empty, registers `peerId` with the server and answers
  ## each incoming connection with "pong".
  randomize() # initialize random number generator
  let punchdConn = PunchdConnection(
    sock: newAsyncSocket(AF_UNIX, SOCK_STREAM, IPPROTO_IP),
    outMessages: newTable[string, OutgoingPunchdMessage](),
    inConnections: newFutureStream[AsyncSocket]("runApp"))
  await punchdConn.sock.connectUnix("/tmp/punchd.socket")
  asyncCheck handlePunchdMessages(punchdConn)
  let srcPort = rand(Port(1024) .. Port.high)
  if otherPeerId.len != 0:
    # initiate a new connection
    let serverConn = await initServerConnection(srcPort)
    asyncCheck handleServerMessages(serverConn)
    let peerSock = await punchHole(punchdConn, serverConn, peerId,
                                   otherPeerId, "udp")
    echo "connected!"
    await peerSock.send("ping")
    let reply = await peerSock.recv(4)
    echo "received message: ", reply
    peerSock.close()
  else:
    # register and wait for connections
    echo &"init server connection, source port: {srcPort}"
    let serverConn = await initServerConnection(srcPort)
    asyncCheck handleServerMessages(serverConn)
    asyncCheck handlePeerNotifications(serverConn, punchdConn, peerId)
    let probedPorts = serverConn.probedSrcPorts.join(",")
    let registration = [peerId, $serverConn.probedIp, $serverConn.srcPort,
                        probedPorts].join("|")
    echo "registering: ", registration
    discard await serverConn.sendRequest("register", registration)
    while true:
      let (hasSock, peerSock) = await punchdConn.inConnections.read
      if not hasSock:
        break
      echo "accepted!"
      let request = await peerSock.recv(4)
      echo "received message: ", request
      await peerSock.send("pong")
      peerSock.close()
2020-07-06 15:10:10 +02:00
proc main() =
  ## Entry point: expects PEER_ID and optionally OTHER_PEER_ID on the command
  ## line; any other argument count prints the usage text and exits with 1.
  case paramCount()
  of 1:
    waitFor runApp(paramStr(1))
  of 2:
    waitFor runApp(paramStr(1), paramStr(2))
  else:
    usage()
    quit(1)

when isMainModule:
  main()