quicp2p/port_prediction.nim

160 lines
6.0 KiB
Nim

import algorithm
import net
import random
import sequtils
import unittest
# Number of consecutive candidate ports returned by the fallback branch of
# predictPortRange, which is used when the probed ports show no progressive
# pattern ("random port mapping").
const RandomPortCount = 10000
proc min(a, b: uint16): uint16 =
  ## Returns the smaller of the two unsigned 16-bit values.
  if a <= b:
    a
  else:
    b
proc toUInt16(p: Port): uint16 =
  ## Converts a `Port` into its plain `uint16` value.
  p.uint16
proc toPort(u: uint16): Port =
  ## Wraps a plain `uint16` value into a `Port`.
  u.Port
proc addOffset(port: uint16, offset: uint16, minValue = 1024'u16,
               maxValue = uint16.high): uint16 =
  ## Returns `port` advanced by `offset` positions inside the inclusive range
  ## `minValue .. maxValue`, continuing at `minValue` after passing
  ## `maxValue`.
  ##
  ## `port` must lie inside the range (checked with `assert`).
  ##
  ## The computation is done in `int` modulo the size of the range, so the
  ## result always stays inside `minValue .. maxValue`, even when `offset` is
  ## larger than the range or when the intermediate sum would not fit into an
  ## `uint16` (unsigned arithmetic would silently wrap in that case).
  assert(port >= minValue)
  assert(port <= maxValue)
  let rangeSize = int(maxValue) - int(minValue) + 1
  # Zero-based position of the result within the range.
  let position = (int(port) - int(minValue) + int(offset)) mod rangeSize
  uint16(int(minValue) + position)
proc subtractOffset(port: uint16, offset: uint16, minValue = 1024'u16,
                    maxValue = uint16.high): uint16 =
  ## Returns `port` moved back by `offset` positions inside the inclusive
  ## range `minValue .. maxValue`, continuing at `maxValue` after passing
  ## `minValue`.
  ##
  ## `port` must lie inside the range (checked with `assert`).
  ##
  ## The computation is done in `int` modulo the size of the range, so the
  ## result always stays inside `minValue .. maxValue`, even when `offset` is
  ## larger than the range.
  assert(port >= minValue)
  assert(port <= maxValue)
  let rangeSize = int(maxValue) - int(minValue) + 1
  # Reduce the offset first so the subtraction below never goes negative
  # (Nim's `mod` keeps the sign of the dividend).
  let steps = int(offset) mod rangeSize
  let position = (int(port) - int(minValue) + rangeSize - steps) mod rangeSize
  uint16(int(minValue) + position)
proc predictPortRange*(localPort: uint16, probedPorts: seq[uint16]): seq[uint16] =
  ## Guesses which external ports a NAT may assign next.
  ##
  ## `localPort` is the local port of the socket; `probedPorts` are the
  ## external ports observed while probing, in the order they were observed.
  ##
  ## The returned candidates depend on what the probes look like:
  ## * no probes: `localPort` only;
  ## * one probe: the probed port, plus one extrapolated port when it differs
  ##   from `localPort`;
  ## * all probes equal: that single port;
  ## * probes in ascending or descending order with gaps below 10: either the
  ##   single next port (constant gap) or a short run of ports;
  ## * anything else: `RandomPortCount` consecutive ports.
  case probedPorts.len
  of 0:
    # Nothing was probed: the only possible guess is a cone-type NAT whose
    # mapping preserves the local port.
    return @[localPort]
  of 1:
    # A single probe cannot tell cone-type and symmetric NATs apart. Offer
    # the probed port (cone-type) and, if it differs from the local port,
    # the next port of a progressive sequence (symmetric NAT).
    let probed = probedPorts[0]
    result.add(probed)
    if probed > localPort:
      result.add(probed.addOffset(probed - localPort))
    elif probed < localPort:
      result.add(probed.subtractOffset(localPort - probed))
    return
  else:
    discard

  let uniquePorts = probedPorts.deduplicate()
  if uniquePorts.len == 1:
    # Every probe saw the same port: looks like a cone-type NAT.
    return uniquePorts

  let ordered = probedPorts.sorted()
  let
    lowest = ordered[0]
    highest = ordered[^1]

  # Smallest and largest gap between neighbouring ports in sorted order.
  # FIXME: use rotated distance
  var
    smallestGap = uint16.high
    largestGap = uint16.low
  for idx in 1 ..< ordered.len:
    let gap = ordered[idx] - ordered[idx - 1]
    smallestGap = min(smallestGap, gap)
    largestGap = max(largestGap, gap)

  if largestGap < 10:
    let constantGap = smallestGap == largestGap
    if probedPorts.isSorted(Ascending):
      # Symmetric NAT with positive-progressive port mapping.
      if constantGap:
        return @[highest.addOffset(largestGap)]
      for step in countup(0'u16, largestGap):
        result.add(lowest.addOffset(step))
      return
    if probedPorts.isSorted(Descending):
      # Symmetric NAT with negative-progressive port mapping.
      if constantGap:
        return @[lowest.subtractOffset(largestGap)]
      for step in countup(0'u16, largestGap):
        result.add(highest.subtractOffset(step))
      return

  # Symmetric NAT with random port mapping.
  # NOTE: randomize() reseeds the global RNG, but no random numbers are drawn
  # below; the returned range is fully determined by the highest probed port.
  randomize()
  let first =
    if highest <= uint16.high - RandomPortCount - 1000'u16:
      highest + 1000'u16
    else:
      uint16.high - RandomPortCount
  result = newSeq[uint16](RandomPortCount)
  for idx in 0 ..< RandomPortCount:
    result[idx] = first + uint16(idx)
proc predictPortRange*(localPort: Port, probedPorts: seq[Port]): seq[Port] =
  ## `Port`-typed convenience overload: converts the arguments to `uint16`,
  ## delegates to the `uint16` overload and converts the prediction back to
  ## `Port` values.
  let rawProbedPorts = probedPorts.mapIt(it.uint16)
  let rawPrediction = predictPortRange(localPort.uint16, rawProbedPorts)
  rawPrediction.mapIt(Port(it))
suite "port prediction tests":
  # Without usable probes the local or the probed port is the best guess.
  test "single port":
    check predictPortRange(Port(1234), @[]) == @[Port(1234)]

  test "single probe equal":
    check predictPortRange(Port(1234), @[Port(1234)]) == @[Port(1234)]

  # One probe that differs from the local port is extrapolated once.
  test "single probe positive-progressive":
    check predictPortRange(Port(1234), @[Port(1236)]) ==
      @[Port(1236), Port(1238)]

  test "single probe negative-progressive":
    check predictPortRange(Port(1234), @[Port(1232)]) ==
      @[Port(1232), Port(1230)]

  test "all equal":
    check predictPortRange(Port(1234), @[Port(1234), Port(1234)]) ==
      @[Port(1234)]

  # Two probes with a constant small gap yield the next port in sequence.
  test "positive-progressive, offset 1":
    check predictPortRange(Port(1234), @[Port(2034), Port(2035)]) ==
      @[Port(2036)]

  test "positive-progressive, offset 9":
    check predictPortRange(Port(1234), @[Port(2034), Port(2043)]) ==
      @[Port(2052)]

  test "negative-progressive, offset 1":
    check predictPortRange(Port(1234), @[Port(1100), Port(1099)]) ==
      @[Port(1098)]

  test "negative-progressive, offset 9":
    check predictPortRange(Port(1234), @[Port(1100), Port(1091)]) ==
      @[Port(1082)]

  # Uneven small gaps yield a short run of candidate ports.
  test "positive-progressive, 3 probed ports, low offset":
    check predictPortRange(Port(1234),
                           @[Port(2000), Port(2000), Port(2002)]) ==
      @[Port(2000), Port(2001), Port(2002)]

  test "negative-progressive, 3 probed ports, low offset":
    check predictPortRange(Port(1234),
                           @[Port(2002), Port(2000), Port(2000)]) ==
      @[Port(2002), Port(2001), Port(2000)]

  # Predictions wrap around at the ends of the 1024 .. 65535 range.
  test "high port, positive-progressive, offset 1":
    check predictPortRange(Port(1234), @[Port(65534), Port(65535)]) ==
      @[Port(1024)]

  test "high port, positive-progressive, offset 9":
    check predictPortRange(Port(1234), @[Port(65520), Port(65529)]) ==
      @[Port(1026)]

  test "low port, negative-progressive, offset 1":
    check predictPortRange(Port(1234), @[Port(1025), Port(1024)]) ==
      @[Port(65535)]

  test "low port, negative-progressive, offset 9":
    check predictPortRange(Port(1234), @[Port(1039), Port(1030)]) ==
      @[Port(65533)]

  # Widely spread probes fall back to a large block of candidates.
  test "random mapping":
    check predictPortRange(Port(1234), @[Port(3546), Port(7624)]).len ==
      RandomPortCount