punchd/examples/app/app.nim

216 lines
7.9 KiB
Nim

import asyncdispatch, asyncnet, os, strformat, strutils, tables
from nativeSockets import Domain, SockType, Protocol
from net import IpAddress, Port, isIpAddress, `$`
import asyncutils
import ../../message
import random
type
PunchdResponse = Future[tuple[msgContent: string, sock: AsyncSocket]]
PunchdProgressCb = proc (future: PunchdResponse, msgContent: string) {.async.}
OutgoingPunchdMessage = object
future: PunchdResponse
progressCb: PunchdProgressCb
PunchdConnection = object
sock: AsyncSocket
outMessages: TableRef[string, OutgoingPunchdMessage]
inConnections: FutureStream[AsyncSocket]
ServerConnection = object
sock: AsyncSocket
outMessages: TableRef[string, Future[string]]
peerNotifications: FutureStream[string]
# Punchd messages
ProgressTcpSyniConnect* = object
command: string
args: string
# Server messages
OkGetInfo* = object
ip: string
ports: array[3, uint16]
NotifyEndpoint* = object
command: string
ip: IpAddress
port: Port
NotifyPeer* = object
sender: string
recipient: string
command: string
srcIp: IpAddress
srcPorts: array[3, Port]
dstIp: IpAddress
dstPorts: array[3, Port]
seqNumbers: array[10, uint32]
# Exceptions
PunchdError = object of ValueError # FIXME: not used yet
ServerError = object of ValueError
proc usage() =
echo &"usage: {paramStr(0)} SERVER_HOSTNAME SERVER_PORT PEER_ID [OTHER_PEER_ID]"
proc handleServerMessages(connection: ServerConnection) {.async.} =
while true:
let line = await connection.sock.recvLine(maxLength = 400)
let args = line.parseArgs(3, 1)
case args[0]:
of "ok":
let future = connection.outMessages[args[1]]
connection.outMessages.del(args[1])
future.complete(args[2])
of "error":
let future = connection.outMessages[args[1]]
connection.outMessages.del(args[1])
future.fail(newException(ServerError, args[2]))
of "notify-peer":
asyncCheck connection.peerNotifications.write(line.substr(args[0].len + 1))
else:
raise newException(ValueError, "invalid server message")
proc handlePunchdMessages(connection: PunchdConnection) {.async.} =
while true:
let fd = connection.sock.getFd.AsyncFD
let resp = await fd.asyncRecvMsg(size = 400, cmsgSize = sizeof(AsyncFD))
let args = resp.data.parseArgs(3, 1)
case args[0]:
of "ok":
let outMsg = connection.outMessages[args[1]]
connection.outMessages.del(args[1])
if resp.cmsgs.len != 1:
raise newException(ValueError, "invalid punchd message")
let sock = newAsyncSocket(resp.cmsgs[0].getFd)
register(sock.getFd.AsyncFD)
outMsg.future.complete((args[2], sock))
of "error":
let outMsg = connection.outMessages[args[1]]
connection.outMessages.del(args[1])
outMsg.future.fail(newException(ServerError, args[2]))
of "progress":
let outMsg = connection.outMessages[args[1]]
asyncCheck outMsg.progressCb(outMsg.future, args[2])
else:
raise newException(ValueError, "invalid punchd message")
proc sendRequest(connection: PunchdConnection, command: string, content: string,
progressCb: PunchdProgressCb = nil): PunchdResponse =
result = newFuture[PunchdResponse.T]("sendRequest")
let id = $rand(uint32)
asyncCheck connection.sock.send(&"{command}|{id}|{content}\n")
let outMsg = OutgoingPunchdMessage(future: result, progressCb: progressCb)
echo "id = ", id
connection.outMessages[id] = outMsg
proc sendRequest(connection: ServerConnection, command: string,
content: string): Future[string] =
result = newFuture[string]("sendRequest")
let id = $rand(uint32)
asyncCheck connection.sock.send(&"{command}|{id}|{content}\n")
connection.outMessages[id] = result
proc acceptConnection(punchdConn: PunchdConnection, command: string,
msgContent: string) {.async.} =
echo "accepting connection ", msgContent
let resp = await punchdConn.sendRequest(command, msgContent)
asyncCheck punchdConn.inConnections.write(resp.sock)
proc handlePeerNotifications(serverConn: ServerConnection,
punchdConn: PunchdConnection,
peerId: string) {.async.} =
while true:
let (hasData, data) = await serverConn.peerNotifications.read
if not hasData:
break
try:
let msg = parseMessage[NotifyPeer](data)
# FIXME: check if we want to receive messages from the sender
echo "received message from ", msg.sender
let srcPorts = msg.srcPorts.join(",")
let dstPorts = msg.dstPorts.join(",")
let seqNumbers = msg.seqNumbers.join(",")
let req = &"{msg.srcIp}|{srcPorts}|{msg.dstIp}|{dstPorts}|{seqNumbers}"
asyncCheck acceptConnection(punchdConn, msg.command, req)
except ValueError as e:
echo e.msg
discard
proc punchHole(punchdConn: PunchdConnection, serverConn: ServerConnection,
ipAddress: IpAddress, peerId: string,
otherPeerId: string): Future[AsyncSocket] {.async.} =
let sResp = await serverConn.sendRequest("get-info", otherPeerId)
let peerInfo = parseMessage[OkGetInfo](sResp)
proc progressCb(future: PunchdResponse, msgContent: string) {.async.} =
try:
let parsedResp = parseMessage[ProgressTcpSyniConnect](msgContent)
let req = &"{peerId}|{otherPeerId}|{parsedResp.command}|{parsedResp.args}"
discard await serverConn.sendRequest("notify-peer", req)
except ServerError as e:
future.fail(e)
except ValueError as e:
future.fail(e)
let myPorts = &"{1234},{1234},{1234}"
let peerPorts = peerInfo.ports.join(",")
let req = &"{ipAddress}|{myPorts}|{peerInfo.ip}|{peerPorts}"
let pResp = await punchdConn.sendRequest("tcp-syni-connect", req, progressCb)
result = pResp.sock
proc runApp(serverHostname: string, serverPort: Port, peerId: string,
otherPeerId: string = "") {.async.} =
# TODO: determine endpoint in another proc
randomize() # initialize random number generator
var punchdConn = PunchdConnection()
punchdConn.sock = newAsyncSocket(AF_UNIX, SOCK_STREAM, IPPROTO_IP)
punchdConn.outMessages = newTable[string, OutgoingPunchdMessage]()
punchdConn.inConnections = newFutureStream[AsyncSocket]("runApp")
await punchdConn.sock.connectUnix("/tmp/punchd.socket")
var serverConn = ServerConnection()
serverConn.sock = await asyncnet.dial(serverHostname, serverPort, IPPROTO_TCP)
serverConn.outMessages = newTable[string, Future[string]]()
serverConn.peerNotifications = newFutureStream[string]("runApp")
let resp = await serverConn.sock.recvLine(maxLength = 400)
let endpoint = parseMessage[NotifyEndpoint](resp)
echo &"rendezvous server says I am {endpoint.ip}:{endpoint.port.int}"
asyncCheck handlePunchdMessages(punchdConn)
asyncCheck handleServerMessages(serverConn)
asyncCheck handlePeerNotifications(serverConn, punchdConn, peerId)
if otherPeerId.len == 0:
# register and wait for connections
let req = &"{peerId}|{endpoint.ip}|{endpoint.port.int},{endpoint.port.int},{endpoint.port.int}"
discard await serverConn.sendRequest("register", req)
echo "registered"
while true:
let (hasSock, sock) = await punchdConn.inConnections.read
if not hasSock:
break
echo "got connection"
let msg = await sock.recv(1000)
echo "received message: ", msg
await sock.send("pong")
else:
# initiate a new connection
let sock = await punchHole(punchdConn, serverConn, endpoint.ip, peerId,
otherPeerId)
await sock.send("ping")
let msg = await sock.recv(1000)
echo "received message: ", msg
proc main() =
if paramCount() < 3 or paramCount() > 4:
usage()
quit(1)
let portNumber = paramStr(2).parseUInt
if portNumber > uint16.high:
usage()
quit(1)
if paramCount() == 4:
waitFor runApp(paramStr(1), Port(portNumber), paramStr(3), paramStr(4))
else:
waitFor runApp(paramStr(1), Port(portNumber), paramStr(3))
when isMainModule:
main()