#!
"""NSP (End Communications and Session Control layers) for DECnet/Python
"""
from collections import deque
from .common import *
from .routing_packets import ShortData, LongData
from . import logging
from . import events
from . import packet
from . import timers
from . import statemachine
from . import modulo
from . import html
from . import nicepackets
SvnFileRev = "$LastChangedRevision: 592 $"
# API exceptions
class NSPException (DNAException): pass
class WrongState (NSPException): "Connection is in the wrong state"
class RangeError (NSPException): "Parameter is out of range"
class ConnectionLimit (NSPException): "Connection limit reached"
class CantSend (NSPException): "Can't send interrupt at this time"
class IntLength (NSPException): "Interrupt message too long"
class UnknownNode (NSPException): "Unknown node name"
# Packet parsing exceptions
class NSPDecodeError (packet.DecodeError): pass
class InvalidAck (NSPDecodeError): "ACK fields in error"
class InvalidLS (NSPDecodeError): "Reserved LSFLAGS value"
# NSP packet layouts. These cover the routing layer payload (or the
# datalink layer payload, in the case of Phase II)
# Sequence numbers are modulo 4096
class Seq (Field, modulo.Mod, mod = 4096):
"""Sequence numbers for NSP -- integers modulo 2^12. Note that
creating one of these (e.g., from packet decode) ignores high
order bits rather than complaining about them.
"""
def __new__ (cls, val):
return modulo.Mod.__new__ (cls, val & 0o7777)
@classmethod
def decode (cls, buf):
if len (buf) < 2:
raise MissingData
v = int.from_bytes (buf[:2], packet.LE)
return cls (v), buf[2:]
def encode (self):
return self.to_bytes (2, packet.LE)
def __bytes__ (self):
return self.encode ()
class AckNum (Field):
"""Class for the (usually optional) ACK field in an NSP packet.
"""
# Values for QUAL:
ACK = 0
NAK = 1
XACK = 2
XNAK = 3
_labels = ( "ACK", "NAK", "XACK", "XNAK" )
def __init__ (self, num = 0, qual = ACK):
if not 0 <= qual <= 3:
raise ValueError ("Invalid QUAL value {}".format (qual))
self.qual = qual
self.num = Seq (num)
def __str__ (self):
return "{} {}".format (self._labels[self.qual], self.num)
def __eq__ (self, other):
return self.num == other.num and self.qual == other.qual
def __ne__ (self, other):
return not self == other
@classmethod
def decode (cls, buf):
if len (buf) >= 2:
v = int.from_bytes (buf[:2], packet.LE)
if v & 0x8000:
# ACK field is present. Always advance past it.
buf = buf[2:]
qual = (v >> 12) & 7
if 0 <= qual <= 3:
# Use the field only if QUAL is valid
return cls (v, qual), buf
return None, buf
@classmethod
def checktype (cls, name, val):
# This allows for the field to be optional, which is
# represented by an attribute value of None.
if val is None:
return val
return super (__class__, cls).checktype (name, val)
def encode (self):
return (0x8000 + (self.qual << 12) + self.num).to_bytes (2, packet.LE)
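    # Illustrative example of the encoding above: bit 15 marks the field
    # as present, bits 12-14 carry QUAL, bits 0-11 the sequence number,
    # so an ACK with QUAL = NAK and number 5 encodes as 0x9005 (sent
    # little endian).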
def is_nak (self):
return self.qual == self.NAK or self.qual == self.XNAK
def is_cross (self):
return self.qual == self.XACK or self.qual == self.XNAK
def chan (self, this, other):
if self.is_cross ():
return other
return this
class tolerantI (packet.I):
    # A version of the I (image string) field, but coded to be tolerant
    # of malformed input, since some implementations (such as Cisco's)
    # send bad values some of the time.
@classmethod
def decode (cls, buf, maxlen):
if not buf:
logging.trace ("Missing I field, empty field substituted")
return cls (b""), b""
flen = buf[0]
        if flen > maxlen or flen > len (buf) - 1:
logging.trace ("Invalid I field, empty field substituted")
return cls (b""), b""
return super (__class__, cls).decode (buf, maxlen)
# Common header -- just the MSGFLG field, expanded into its subfields.
class NspHdr (packet.Packet):
_layout = (( packet.BM,
( "mbz", 0, 2 ),
( "type", 2, 2 ),
( "subtype", 4, 3 ),
( "int_ls", 4, 1 ),
( "bom", 5, 1 ), # if int_ls == 0 (data message)
( "eom", 6, 1 ), # if int_ls == 0 (data message)
( "int", 5, 1 ), # if int_ls == 1 (other-data message)
( "mbz2", 7, 1 )), )
mbz = 0
mbz2 = 0
# type codes
DATA = 0
ACK = 1
CTL = 2
# Ack subtype codes
ACK_DATA = 0
ACK_OTHER = 1
ACK_CONN = 2
# Control subtype codes
NOP = 0 # NOP (normally only in Phase II, but spec doesn't restrict it)
CI = 1
CC = 2
DI = 3
DC = 4
#NI = 5 # Phase 2 node init (handled in routing, doesn't come to NSP)
RCI = 6 # Retransmitted CI
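    # Illustrative assembled MSGFLG values (the code builds these from
    # the bit fields above rather than using literals):
    #   Connect Initiate:          (CTL << 2) + (CI << 4)       = 0x18
    #   Connect Confirm:           (CTL << 2) + (CC << 4)       = 0x28
    #   Data ACK:                  (ACK << 2) + (ACK_DATA << 4) = 0x04
    #   Data segment, BOM and EOM: bits 5 and 6 set             = 0x60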
class AckHdr (NspHdr):
"""The standard packet beginning for packets that have link addresses
and acknum fields. Note that the second ACK field is called "acknum2"
rather than "ackoth" or "ackdat" since those names don't make sense if
we use this header interchangeably for all packet layouts. And while
it is typical to use the first field for "this subchannel" and the
second for "the other subchannel", that isn't required.
"""
_layout = (( packet.B, "dstaddr", 2 ),
( packet.B, "srcaddr", 2 ),
( AckNum, "acknum" ),
( AckNum, "acknum2" ))
def check (self):
# Check that the two acknum fields (if both are supplied) point
# to different subchannels
if self.acknum and self.acknum2 and \
self.acknum.is_cross () == self.acknum2.is_cross ():
logging.debug ("Both acknums refer to the same subchannel")
raise InvalidAck ("Both acknums refer to the same subchannel")
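    # Illustrative example: an AckData packet may carry
    # acknum = AckNum (n) for its own (data) subchannel together with
    # acknum2 = AckNum (m, AckNum.XACK) for the other-data subchannel;
    # check () rejects the case where both refer to the same subchannel.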
# Note on classes for packet layouts:
#
# It is tempting at times to make packet type x a subclass of packet
# type y, when x looks just like y but with some extra stuff, or with
# a change in type code only. IntMsg vs. LinkSvcMsg are an example
# of the former, AckData vs. AckOther or DataSeg vs. IntMsg an example
# of the latter. This is typically not a good idea, because "isinstance"
# will match an object of a subclass of the supplied class. To keep
# the actual message classes distinct, in the hierarchy below they are
# almost always derived from a base class that is not in itself an
# actual message class. However, we can reuse some of the methods of
# other classes (those that we didn't want to use as base class) without
# causing trouble -- consider AckData.check for example.
class AckData (AckHdr):
type = NspHdr.ACK
subtype = NspHdr.ACK_DATA
def check (self):
AckHdr.check (self)
if self.acknum is None:
logging.debug ("acknum field missing")
raise InvalidAck ("acknum field missing")
class AckOther (AckHdr):
type = NspHdr.ACK
subtype = NspHdr.ACK_OTHER
check = AckData.check
class AckConn (NspHdr):
# A Conn Ack doesn't have payload, but VAXELN sends extraneous
# bytes at the end and pretending that's payload will suppress a
# parse error.
_layout = (( packet.B, "dstaddr", 2 ),
packet.Payload)
type = NspHdr.ACK
subtype = NspHdr.ACK_CONN
def __init__ (self, *args, **kwargs):
super ().__init__ (*args, **kwargs)
self.payload = b""
class DataSeg (AckHdr):
_layout = (( packet.BM,
( "segnum", 0, 12, Seq ),
( "dly", 12, 1 )),
packet.Payload)
type = NspHdr.DATA
int_ls = 0
class IntMsg (AckHdr):
_layout = (( Seq, "segnum" ),
packet.Payload)
type = NspHdr.DATA
subtype = 3
int_ls = 1
int = 1
# Link Service message also uses the interrupt subchannel.
class LinkSvcMsg (AckHdr):
_layout = (( Seq, "segnum" ),
( packet.BM,
( "fcmod", 0, 2 ),
( "fcval_int", 2, 2 )),
( packet.SIGNED, "fcval", 1 ))
type = NspHdr.DATA
subtype = 1
int_ls = 1
int = 0
# fcval_int values:
DATA_REQ = 0
INT_REQ = 1
# fcmod values:
NO_CHANGE = 0
XOFF = 1
XON = 2
def check (self):
if self.fcval_int > 1 or self.fcmod == 3:
logging.debug ("Reserved LSFLAGS value")
raise InvalidLS
# Control messages. 5 (Node init) is handled in route_ptp since it is
# a datalink dependent routing layer message. 0 (NOP) is here,
# however.
# Common parts of CI, RCI, and CC
class ConnMsg (NspHdr):
_layout = (( packet.B, "dstaddr", 2 ),
( packet.B, "srcaddr", 2 ),
( packet.BM,
( "mb1", 0, 2 ),
( "fcopt", 2, 2 ),
( "mbz", 4, 4 )),
( packet.EX, "info", 1 ),
( packet.B, "segsize", 2 ))
type = NspHdr.CTL
mb1 = 1
mbz = 0
# Services:
SVC_NONE = 0
SVC_SEG = 1 # Segment flow control
SVC_MSG = 2 # Message flow control
# Info:
VER_PH3 = 0 # Phase 3 (NSP 3.2)
VER_PH2 = 1 # Phase 2 (NSP 3.1)
VER_PH4 = 2 # Phase 4 (NSP 4.0)
VER_41 = 3 # Phase 4+ (NSP 4.1)
nspverstrings = ( "3.2", "3.1", "4.0", "4.1" )
nspver3 = ( (3, 2, 0), (3, 1, 0), (4, 0, 0), (4, 1, 0))
nspphase = { ConnMsg.VER_PH2 : 2, ConnMsg.VER_PH3 : 3,
ConnMsg.VER_PH4 : 4, ConnMsg.VER_41 : 4 }
# This is either Connect Initiate or Retransmitted Connect Initiate
# depending on the subtype value.
class ConnInit (ConnMsg):
_layout = (packet.Payload,)
#subtype = NspHdr.CI
#subtype = NspHdr.RCI
dstaddr = 0
# Connect Confirm is very similar to Connect Init (the differences are
# mainly in the session layer, which is just payload to us).
# However, the srcaddr is now non-zero.
class ConnConf (ConnMsg):
_layout = (( tolerantI, "data_ctl", 16 ),) # CC payload is an I field
subtype = NspHdr.CC
class DiscConf (NspHdr):
_layout = (( packet.B, "dstaddr", 2 ),
( packet.B, "srcaddr", 2 ),
( packet.B, "reason", 2 ))
type = NspHdr.CTL
subtype = NspHdr.DC
# Supply a dummy value in the object to allow common handling with
# DiscInit, which does include a disconnect data field.
data_ctl = b""
# Three reason codes are treated as specific packets in the NSP spec;
# all others are in effect synonyms for disconnect initiate for Phase II
# compatibility. Define subclasses for the three here so we can use
# those later.
class NoRes (DiscConf):
reason = 1
class DiscComp (DiscConf):
reason = 42
class NoLink (DiscConf):
reason = 41
# DI is like DC but it adds session control disconnect data
class DiscInit (NspHdr):
_layout = (( packet.B, "dstaddr", 2 ),
( packet.B, "srcaddr", 2 ),
( packet.B, "reason", 2 ),
( tolerantI, "data_ctl", 16 ))
type = NspHdr.CTL
subtype = NspHdr.DI
OBJ_FAIL = 38 # Object failed (copied from session.py)
UNREACH = 39 # Destination unreachable (copied from session.py)
# Mapping from packet type code (msgflg field) to packet class
msgmap = { (c.type << 2) + (c.subtype << 4) : c
for c in ( AckData, AckOther, AckConn, ConnConf,
DiscConf, DiscInit, IntMsg, LinkSvcMsg ) }
# Put in Connect Init with its two msgflag values
msgmap[(NspHdr.CTL << 2) + (NspHdr.CI << 4)] = ConnInit
msgmap[(NspHdr.CTL << 2) + (NspHdr.RCI << 4)] = ConnInit
# For data segments we put in all 4 combinations of bom/eom flags so
# we can just do the message map without having to check for those cases
# separately.
for st in range (4):
msgmap[NspHdr.DATA + (st << 5)] = DataSeg
# Put in an "ignore me" entry for NOP packets
msgmap[(NspHdr.CTL << 2) + (NspHdr.NOP << 4)] = None
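# Illustrative lookups (not exhaustive): msgmap[0x18] is ConnInit,
# msgmap[0x60] is DataSeg (a segment with both BOM and EOM set), and
# msgmap[0x08] is None (NOP, ignored).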
# Mapping from reason to specific Disconnect Confirm subclass
dcmap = { c.reason : c for c in ( NoRes, DiscComp, NoLink ) }
class NspCounters (BaseCounters):
nodecounters = [
( "time_since_zeroed", "Time since counters zeroed" ),
( "byt_rcv", "User bytes received" ),
( "byt_xmt", "User bytes sent" ),
( "msg_rcv", "User messages received" ),
( "msg_xmt", "User messages sent" ),
( "t_byt_rcv", "Total bytes received" ),
( "t_byt_xmt", "Total bytes sent" ),
( "t_msg_rcv", "Total messages received" ),
( "t_msg_xmt", "Total messages sent" ),
( "con_rcv", "Connects received" ),
( "con_xmt", "Connects sent" ),
( "timeout", "Response timeouts" ),
( "no_res_rcv", "Received connect resource errors" )
]
def __init__ (self, owner):
super ().__init__ (owner)
self.byt_rcv = 0
self.byt_xmt = 0
self.msg_rcv = 0
self.msg_xmt = 0
self.t_byt_rcv = 0
self.t_byt_xmt = 0
self.t_msg_rcv = 0
self.t_msg_xmt = 0
self.con_rcv = 0
self.con_xmt = 0
self.timeout = 0
self.no_res_rcv = 0
class NSPNode (object):
"""The remote node state needed by NSP. This is a base class of
the Nodeinfo object, which is what node.nodeinfo () returns.
"""
# Allow a subclass to change what counters this node has.
counterclass = NspCounters
def __init__ (self):
# NSP specific node state -- see NSP 4.0.1 spec, table 6.
self.delay = 0
self.counters = self.counterclass (self)
def used (self):
# Returns True if this node has been used.
return self.counters.t_byt_rcv or self.counters.t_byt_xmt or \
self.counters.con_rcv or self.counters.con_xmt
def get_api (self):
ret = dict ()
# Supply counts, but only if we have some
if self.used ():
for f, lb in self.counters.nodecounters:
ret[f] = getattr (self.counters, f)
ret["delay"] = self.delay
return ret
# Packet types that trigger No Link response if not mapped to a connection
nolinkset = { ConnConf, DiscInit, DataSeg, IntMsg, LinkSvcMsg }
# These reason codes are internal to NSP and not available to SC
reservedreasons = { NoRes.reason, DiscComp.reason, NoLink.reason }
def shortname (n):
if isinstance (n, Nodeid):
return Nodeid (n)
return n.name
def skey (n):
return not isinstance (n[0], Nodeid), n
class NSP (Element):
"""The NSP Entity. This owns all the connections. It implements
the ECL (formerly NSP) layer of the DECnet Network Architecture.
"""
def __init__ (self, parent, config):
super ().__init__ (parent)
logging.debug ("Initializing NSP")
# Dictionary of connections indexed by local connection address
self.connections = EntityDict ()
# Ditto but indexed by node ID and remote connection address.
self.rconnections = EntityDict ()
self.config = config = config.nsp
self.maxconns = config.max_connections
# Fixed for now
self.inact_time = 300
self.conn_timeout = 30
#
self.ectr = parent.routing.nodeinfo.counters
self.init_id ()
# Create the "reserved port"
self.resport = ReservedPort (self)
# Figure out what NSP version code we will send in CI/CC messages
self.nspver = (ConnMsg.VER_PH2,
ConnMsg.VER_PH3,
ConnMsg.VER_PH4)[self.node.phase - 2]
def start (self):
logging.debug ("Starting NSP")
self.routing = self.parent.routing
def stop (self):
logging.debug ("Stopping NSP")
def get_api (self):
return { "version" : nspverstrings[self.nspver],
"max_connections" : self.maxconns,
"connections" : self.connections.get_api () }
def connect (self, dest, payload):
"""Session control request for an outbound connection. Returns
the connection address if the request was accepted.
"""
c = Connection (self, outbound = (dest, payload))
return c
def dispatch (self, item):
if isinstance (item, Received):
# Arriving packet delivered up from Routing. Map the packet
# to a port (Connection object), see NSP 4.0.1 spec
# section 6.2 (receive dispatcher)
buf = item.packet
# If this packet was looped back from one sent by this
# NSP, it will be in the form of a Packet object, not a
# buffer, so trying to parse it will bring pain. If so,
# just turn it into bytes so the common code works.
buf = makebytes (buf)
msgflg = buf[0]
try:
t = msgmap[msgflg]
except KeyError:
# TYPE or SUBTYPE invalid, or MSGFLG is extended (step 1)
logging.trace ("Ill formatted NSP packet received from {}: {}",
item.src, item.packet)
logging.trace ("Unrecognized msgflg value {}, ignored", msgflg)
# FIXME: this needs to log the message in the right format
self.node.logevent (events.inv_msg, message = buf,
source_node = item.src)
return
if not t:
# NOP message to be ignored, do so.
if logging.tracing:
logging.trace ("NSP NOP packet received from {}: {}",
item.src, item.packet)
return
try:
pkt = t (buf)
except packet.DecodeError:
logging.debug ("Invalid packet {}", buf)
# Ignore it
return
if t is DiscConf:
# Do a further lookup on disconnect confirm reason code
# (step 5)
try:
t = dcmap[pkt.reason]
except KeyError:
# Other Disconnect Confirm, that's Phase II stuff.
# Parse it as a generic DiscConf packet
pass
pkt = t (buf)
if logging.tracing:
logging.trace ("NSP packet received from {}: {}", item.src, pkt)
if t is ConnInit:
# Step 4: if this is a returned CI, find the connection
# that sent it.
# Note that the error case of CI with non-zero dest addr
# is caught by the general DecodeError handling above.
if item.rts:
try:
conn = self.connections[pkt.srcaddr]
if conn.state != conn.ci:
# Unexpected RTS, ignore
return
except KeyError:
# Not there, must have been deleted. Ignore.
logging.trace ("Returned CI not matched, ignored")
return
else:
# Step 3: see if this is a retransmit, otherwise
# map it onto a new Connection if available.
cikey = (item.src, pkt.srcaddr)
if cikey in self.rconnections:
conn = self.rconnections[cikey]
if conn.state not in (conn.cr, conn.cc):
# Unexpected in this state, discard
return
else:
# Set the parsed packet into the work item
item.packet = pkt
try:
conn = Connection (self, inbound = item)
# All done (constructor did all the work)
return
except Exception:
# Can't create another connection, give it
# to the reserved port for a No Resources reply.
logging.debug ("Can't allocate connection for CI",
exc_info = True)
conn = self.resport
else:
# Step 6 or 7: look up via the local link address.
conn = self.connections.get (pkt.dstaddr, None)
# Do all the port mapping checks. The NSP spec
# (section 6.2) lists them in a manner that doesn't
# directly match the flow here, because here we start
# with a lookup based only on the dstaddr (our
# address) field.
if conn:
if conn.state in (conn.ci, conn.cd):
# CI or CD state, check the message (rule 6 first
# part, rule 7 note).
if t in (NoRes, ConnConf, DiscInit, DiscConf):
if conn.dstaddr == 0:
# Dest address not set yet
cikey = (item.src, pkt.srcaddr)
self.rconnections[cikey] = conn
conn.dstaddr = pkt.srcaddr
elif t is not AckConn:
conn = None # Not a valid mapping
else:
# We have a remote address, do a full check
# (rule 6 second part, rule 7)
if t is AckConn:
# Conn Ack only maps to a connection in CI state
conn = None
if conn:
# We still think we have a connection mapping,
# do the source address check.
if item.src != conn.dest and \
isinstance (conn.dest, Nodeid):
# Node address mismatch. Note we don't
# check this for loop nodes, where the
# destination "address" is a circuit
# rather than a Nodeid.
conn = None
elif t is not AckConn and pkt.srcaddr != conn.dstaddr:
# Mismatch, map to reserved port or discard
conn = None
# No valid connection mapping found, send message to the
# reserved port or discard, according to message type.
if not conn:
if t in (AckConn, NoRes, DiscConf, DiscComp, NoLink,
AckData, AckOther):
# discard the packet silently
logging.trace ("Packet with bad address discarded: {}", pkt)
return
# Something that needs a reply, map to reserved port
conn = self.resport
# Packet is mapped to a port, so process it there. Change
# the packet attribute in the work item to be the parsed
# packet from the logic above.
# Adjust the total counters
if conn is not self.resport:
nc = conn.destnode.counters
nc.t_byt_rcv += len (pkt.decoded_from)
nc.t_msg_rcv += 1
item.packet = pkt
conn.dispatch (item)
def init_id (self):
# Initialize the free connection ID list. The algorithm used
# meets the requirements of the Phase II NSP spec for the case
# where the node talks to an intercept node.
#
# This requires:
# 1. The low order bits of the ID must be unique
# 2. The low order bits must not be zero
# 3. Previously used IDs must not be reused for as long as possible.
#
# The solution is to use a circular list (deque) where IDs are
# taken from one end and put back in the other. The list is
# initialized with max-connections entries, each with a
# different non-zero low order value (for example, if
# max-connections is 511, "low order" means the bottom 9 bits).
# Each entry has a random value in the high order bits. When an
# ID is freed, the high order part is incremented.
c = self.maxconns + 1
fc = [ i + random.randrange (0, 65536, c) for i in range (1, c) ]
random.shuffle (fc)
self.freeconns = deque (fc)
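        # Worked example (illustrative, assuming max-connections = 511):
        # c = 512, so the free list holds IDs of the form i + k * 512
        # for i in 1..511 with a random k per entry; the low 9 bits are
        # unique and nonzero, and ret_id () later adds 512, changing
        # only the high order part.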
def get_id (self):
if not self.freeconns:
return None
return self.freeconns.popleft ()
def ret_id (self, i):
i = (i + self.maxconns + 1) & 0xffff
self.freeconns.append (i)
def http_get (self, mobile, parts, qs):
infos = ( "summary", "status", "counters", "characteristics" )
if not parts or parts == ['']:
what = "summary"
elif parts[0] in infos:
what = parts[0]
else:
return None, None
active = infos.index (what) + 1
sb = html.sbelement (html.sblabel ("Information"),
html.sbbutton (mobile, "nsp", "Summary", qs),
html.sbbutton (mobile, "nsp/status", "Status", qs),
html.sbbutton (mobile, "nsp/counters",
"Counters", qs),
html.sbbutton (mobile, "nsp/characteristics",
"Characteristics", qs))
sb.contents[active].__class__ = html.sbbutton_active
ret = self.html (what)
return sb, html.main (*ret)
def html (self, what):
title = "NSP {1} for node {0.nodeid} ({0.name})".format (self.parent.routing, what)
if what == "summary":
body = [ "Version: {}".format (nspverstrings[self.nspver]),
"Current connections: {}".format (len (self.connections)),
"Peak connections: {}".format (self.ectr.peak_conns),
"Max connections: {}".format (self.maxconns) ]
return [ html.firsttextsection (title, body) ]
if what == "characteristics":
echar = [ "Version: {}".format (nspverstrings[self.nspver]),
"Max connections: {}".format (self.maxconns),
"NSP weight: {}".format (self.config.nsp_weight),
"NSP delay: {:.2f}".format (self.config.nsp_delay),
"Queue limit: {}".format (self.config.qmax),
"Max retransmits : {}".format (self.config.retransmits) ]
ret = [ html.firsttextsection (title, echar) ]
objects = self.parent.session.html_objects ()
ret.append (objects)
lpnodes = sorted ([ n.nodename, n.circuit.name ]
for n in self.node.nodeinfo_byid.values ()
if n.loopnode)
hdr = [ "Node name", "Circuit" ]
ret.append (html.tbsection ("Loop nodes", hdr, lpnodes))
nodes = sorted ((Nodeid (k),
html.cell (n.nodename, 'class="double_right"'))
for k, n in self.node.nodeinfo_byid.items ()
if n.nodename and not n.loopnode)
nl = len (nodes)
if nl < 20:
cols = 1
elif nl < 40:
cols = 2
else:
cols = 3
skip = (nl + cols - 1) // cols
nodes3 = list ()
for i in range (skip):
row = list ()
for s in range (cols):
i2 = i + s * skip
if i2 < nl:
row.extend (nodes[i2])
row[-1].markup = ""
nodes3.append (row)
hdr = [ "Node ID",
html.hcell ("Node name", 'class="double_right"') ] * cols
hdr[-1] = html.hcell ("Node name")
ret.append (html.tbsection ("Node database", hdr, nodes3))
return ret
if what == "status":
estat = [ "Version: {}".format (nspverstrings[self.nspver]),
"Current connections: {}".format (len (self.connections)),
"Peak connections: {}".format (self.ectr.peak_conns),
"Max connections: {}".format (self.maxconns) ]
ret = [ html.firsttextsection (title, estat) ]
# Get the list of active nodes (those with traffic)
anodes = [ (shortname (k), n.nodename, "{:.1f}".format (n.delay))
for k, n in self.node.nodeinfo_byid.items ()
if n.used () ]
anodes.sort (key = skey)
hdr = ( "Node ID", "Node name", "Delay" )
if anodes:
ret.append (html.tbsection ("Node status", hdr, anodes))
else:
ret.append (html.textsection ("Node status",
[ "No active nodes" ]))
conns = self.html_conns (what)
ret.append (conns)
return ret
if what == "counters":
estat = [ "Version: {}".format (nspverstrings[self.nspver]),
"Current connections: {}".format (len (self.connections)),
"Peak connections: {}".format (self.ectr.peak_conns),
"Max connections: {}".format (self.maxconns) ]
ret = [ html.firsttextsection (title, estat) ]
# Get the executor counters
ctr = [ ( "{} = ".format (lb), getattr (self.ectr, fn))
for fn, lb in self.ectr.nodecounters
if fn not in self.ectr.exclude ]
ret.append (html.section ("Executor counters",
html.dtable (ctr)))
# Get the list of active nodes (those with traffic)
anodes = list ()
for k, n in self.node.nodeinfo_byid.items ():
if not n.used ():
continue
nc = n.counters
if nc is self.ectr:
continue
ctr = [ ( "{} = ".format (lb), getattr (nc, fn))
for fn, lb in nc.nodecounters ]
anodes.append ([ shortname (k), n.nodename, ctr ])
anodes.sort (key = skey)
hdr = ( "Node ID", "Node name" )
if anodes:
ret.append (html.detail_section ("Node counters", hdr, anodes))
else:
ret.append (html.textsection ("Node counters",
[ "No active nodes" ]))
# Note that currently there are no connection counters;
# traffic on connections is counted in nodes.
return ret
# Should not get here...
return [ "not yet implemented" ]
def html_conns (self, what):
# Return an HTML item for the current connections. For the
# moment this only applies to status (there are no connection
# counters).
title = "Logical links (connections)"
ret = list ()
sc = self.parent.session
if what == "status":
hdr = ("LLA", "State", "Object", "Node", "RLA", "Remote object")
for k, c in sorted (self.connections.items ()):
ret.append ((k, c.state.__name__, sc.html_localuser (c),
c.destnode, c.dstaddr, sc.html_remuser (c)))
if ret:
return html.tbsection (title, hdr, ret)
else:
return html.textsection (title, [ "No connections" ])
def read_node (self, req, nodeinfo, resp, links = None):
# Fill in a NICE read node response record with information
# from nodeinfo, inserting it into resp. Note that asking for
# it causes the entry to be created.
r = resp[nodeinfo]
r.entity = nicepackets.NodeEntity (nodeinfo)
# We have a node for which we have some information. Check
# the information type request to see what is wanted.
if req.sumstat ():
# summary or status
# Count the connections to this node. TODO: should this
# be tracked as state in the NSPNode object?
if links is None:
links = 0
for c in self.connections.values ():
if c.destnode == nodeinfo:
links += 1
if links:
r.active_links = links
if nodeinfo.delay != 0:
r.delay = int (nodeinfo.delay) or 1
elif req.char ():
# characteristics. Nothing except for executor or loop
if nodeinfo.loopnode:
r.circuit = nodeinfo.circuit.name
if nodeinfo == self.node.routing.nodeinfo:
# It's the executor
r.ecl_version = nspver3[self.nspver]
r.maximum_links = self.maxconns
r.delay_factor = int (self.config.nsp_delay * 16)
r.delay_weight = self.config.nsp_weight
r.inactivity_timer = self.inact_time
r.retransmit_factor = self.config.retransmits
r.incoming_timer = self.conn_timeout
r.outgoing_timer = self.conn_timeout
else:
# counters
nodeinfo.counters.copy (r)
def nice_read (self, req, resp):
# We only know about nodes
if not isinstance (req, nicepackets.NiceReadNode):
return
# We know nothing about adjacent nodes
if req.adj ():
return
if req.mult ():
# Multiple nodes: walk the node table
for n in self.node.nodeinfo_byid.values ():
# Add it to the response either if we're doing
# "known", or the executor, or "active" and there is a
# link, or "significant" and we have any information.
if req.known () or \
(n == self.node.routing.nodeinfo and not req.loop ()):
self.read_node (req, n, resp)
elif req.loop ():
# Loop nodes
if n.loopnode:
self.read_node (req, n, resp)
else:
# significant or active. See if we want to
# include this.
if req.sig () and \
((req.counters () and n.used ()) or \
(req.sumstat () and n.delay != 0)):
self.read_node (req, n, resp)
else:
# Active
l = 0
for c in self.connections.values ():
if c.destnode == n:
l += 1
if l:
self.read_node (req, n, resp, l)
else:
# Specific node by name or ID. Name was converted to
# Nodeinfo entry in node.py
try:
n = self.node.nodeinfo_byid[req.entity.value]
except KeyError:
try:
print ("looking for", req.entity.value.nodename)
n = self.node.nodeinfo_byname[req.entity.value.nodename]
except KeyError:
print ("no node", req.entity.value)
return
self.read_node (req, n, resp)
class txqentry (timers.Timer):
"""An entry in the retransmit queue for a subchannel.
Each entry has a timer, which is the retransmit timer for that
particular packet.
"""
__slots__ = ("packet", "txtime", "channel", "tries",
"msgnum", "segnum", "sent")
def __init__ (self, packet, channel, segnum = 0, msgnum = 0):
super ().__init__ ()
self.packet = packet
self.channel = channel
self.tries = self.txtime = 0
self.sent = False
self.segnum = segnum
self.msgnum = msgnum
def send (self):
pkt = self.packet
if isinstance (pkt, ConnInit):
if self.tries:
pkt.subtype = NspHdr.RCI
else:
pkt.subtype = NspHdr.CI
elif isinstance (pkt, AckHdr):
self.channel.set_acks (pkt)
# TODO: Skip this if phase 2 local node?
self.channel.node.timers.start (self, self.channel.parent.acktimeout ())
self.tries += 1
if self.txtime == 0:
self.txtime = time.time ()
self.channel.parent.sendmsg (self.packet)
self.sent = True
def ack (self):
"""Handle acknowledgment of packet. Also used when the packet
is not going to be transmitted again for some other reason
(like connection abort).
"""
self.channel.node.timers.stop (self)
if self.txtime:
self.channel.parent.update_delay (self.txtime)
def dispatch (self, item):
"""Handle timeout for the packet.
"""
# Count a timeout
c = self.channel.parent
c.destnode.counters.timeout += 1
# See if too many tries. It's 1 after the first try,
# incremented in the send operation, so check is >= not >.
if self.tries >= c.parent.config.retransmits:
# Limit exceeded, stop retransmitting. If we're dealing
# with a Connect Initiate, that's all we do. For other
# packets, we disconnect. The connection is simply closed
# because it doesn't seem we're getting across to the
# other end. The reason disconnect isn't done for CI is
# that the remote might be a Phase II node, which doesn't
# send Connect Ack.
logging.trace ("Retransmit limit on {}", self.packet)
if isinstance (self.packet, ConnInit):
# Stop the timer for this packet
self.channel.node.timers.stop (self)
else:
# Not CI, so close due to "destination unreachable"
disc = DiscInit (reason = UNREACH, data_ctl = b"")
c.to_sc (Received (self, packet = disc), False)
c.close ()
# Mark connection as closed
c.set_state (c.closed)
return
self.sent = False
# Don't send just yet if flow control forbids it
if not isinstance (self.packet, DataSeg) or \
self.channel.flow_ok (self):
self.send ()
class Subchannel (Element, timers.Timer):
"""A subchannel (data or other-data) within an NSP connection. This
is where we keep the per-subchannel state: queues, flow control
parameters, sequence numbers, etc.
The timer base class is for the ack holdoff timer. Packet timeout
is handled on a per-packet basis, by the txqentry class.
"""
# Holdoff delay
HOLDOFF = 0.1
def __init__ (self, parent):
Element.__init__ (self, parent)
timers.Timer.__init__ (self)
self.pending_ack = deque () # List of pending txqentry items
self.nextseg = 1 # Next segment number
self.nextmsg = 1 # Next message number
self.maxseg = 0 # Max segment number allowed to be sent
self.maxmsg = 0 # Max message number allowed to be sent
self.maxseqsent = Seq (0) # Highest sequence number actually sent
self.maxackseg = 0 # Highest segment number acked
self.acknum = Seq (0) # Outbound ack number
self.ackpending = False # No deferred ack
# The flow control parameters are remote flow control -- we don't
# do local flow control other than to request another interrupt
# each time we get one. So there are no explicit local flow
# attributes.
self.xon = True # Flow on/off switch
self.flow = ConnMsg.SVC_NONE # Outbound flow control selected
self.ooo = dict () # Pending received out of order packets
def dispatch (self, item):
if isinstance (item, timers.Timeout):
# Send an explicit ack
self.send_ack ()
elif isinstance (item, Received):
# A packet for this subchannel.
pkt = item.packet
# Process any ack number fields -- this is done before we
# look at the sequence number.
self.process_ack (pkt.acknum)
self.process_ack (pkt.acknum2)
if isinstance (pkt, (AckData, AckOther)):
# Explicit ACK message, so no data, we're done
return
# Check the sequence number against the next expected value.
num = pkt.segnum
if num <= self.acknum:
# Duplicate, send an explicit ack, but otherwise ignore it.
self.send_ack ()
return
elif num != self.acknum + 1:
# Not next in sequence, save it
if logging.tracing:
logging.trace ("Saving out of order NSP packet {}", pkt)
self.ooo[num] = item
return
# It's in sequence. Process it, as well as packets waiting
# in the out of order cache that are now in order.
while item:
self.acknum = num
self.ackpending = True
self.process_data (item)
num += 1
# Remove the packet with the next higher sequence number
# from the OOO cache, if it is there, and keep going if
# so.
item = self.ooo.pop (num, None)
# Done with in-sequence packets, start the ACK holdoff timer
# if it isn't already running.
if self.ackpending and not self.islinked ():
# ACK holdoff timer is not yet running, start it
self.node.timers.start (self, self.HOLDOFF)
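        # Illustrative example: with acknum = 10, a received segment 12
        # is parked in self.ooo; when segment 11 arrives it is processed
        # in order, the cached segment 12 follows immediately, and a
        # single deferred ACK (covering 12) is scheduled via the holdoff
        # timer.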
def send_ack (self):
self.node.timers.stop (self)
ack = self.parent.makepacket (self.Ack)
self.set_acks (ack, True)
self.parent.sendmsg (ack)
def set_acks (self, pkt, explicit = False):
if explicit or self.ackpending:
self.ackpending = False
self.node.timers.stop (self)
pkt.acknum = AckNum (self.acknum)
if self.parent.cphase == 4:
# Phase IV, we can use cross-subchannel ACK.
other = self.cross
if other.ackpending:
other.ackpending = False
self.node.timers.stop (other)
pkt.acknum2 = AckNum (other.acknum, AckNum.XACK)
def process_ack (self, num):
if num is not None:
if num.is_cross ():
if self.parent.cphase < 4:
logging.debug ("Cross-subchannel ACK/NAK but phase is {}",
self.parent.cphase)
self.cross.ack (num.num)
else:
self.ack (num.num)
def ack (self, acknum):
"""Handle a received ack on this subchannel.
"""
try:
firsttxq = self.pending_ack[0]
except IndexError:
return
if isinstance (firsttxq.packet, (IntMsg, DataSeg)):
if acknum < firsttxq.packet.segnum or acknum > self.maxseqsent:
# Duplicate or out of range ack, ignore.
# Note that various control packets end up in the Data
# subchannel as well, and those don't have sequence numbers.
logging.trace ("Ignoring ack, first {} last {}, got {}",
firsttxq.packet.segnum, self.maxseqsent, acknum)
return
count = acknum - firsttxq.packet.segnum + 1
else:
count = 1
acked = None
for i in range (count):
acked = self.pending_ack.popleft ()
acked.ack ()
self.maxackseg = acked.segnum
def close (self):
"""Handle connection close actions for this subchannel. This is
also used for connection abort, to discard all pending packets and
stop timers.
"""
self.node.timers.stop (self)
for pkt in self.pending_ack:
pkt.ack ()
self.pending_ack.clear ()
self.ooo.clear ()
class Data_Subchannel (Subchannel):
    # Class for ACKs sent from this subchannel
Ack = AckData
name = "data"
def __init__ (self, parent):
super ().__init__ (parent)
self.qmax = parent.parent.config.qmax
def process_data (self, item):
"""Process a data packet that is next in sequence.
"""
self.parent.to_sc (item)
def process_ack (self, num):
super ().process_ack (num)
# Some transmits may have been blocked that are now ok, try
# again.
self.send_blocked ()
if self.parent.shutdown and not self.pending_ack:
self.parent.disc_rej (*self.parent.pending_disc)
self.parent.set_state (self.parent.di)
def flow_ok (self, qe):
"""Return True if this queue entry can be transmitted now, False
if not, according to the current flow control state.
The rule is: this packet can be sent if:
1. In flight packet count is <= maxq parameter (20 by default), and
2. Flow is on (xon/xoff state is "xon"), and
3. One of:
a. No flow control, or
b. segment flow ctl, and this segment <= max allowed segment, or
c. message flow ctl, and this message <= max allowed message
TODO: add congestion control per DEC-TR-353 (Raj Jain)
"""
if self.pending_ack:
maxq = self.pending_ack[0].segnum + self.qmax - 1
if qe.segnum > maxq:
return False
else:
maxq = self.qmax - 1
return self.xon and qe.segnum <= maxq and \
(self.flow == ConnMsg.SVC_NONE or
(self.flow == ConnMsg.SVC_SEG and qe.segnum <= self.maxseg) or
(self.flow == ConnMsg.SVC_MSG and qe.msgnum <= self.maxmsg))
def send (self, pkt):
"""Queue a packet for transmission, and try to send it.
Note that the data subchannel is used not just for data
segments, but also for control packets that are retransmitted:
Connect Init, Connect Confirm, Disconnect Init.
"""
if isinstance (pkt, DataSeg):
pkt.segnum = Seq (self.nextseg % Seq.modulus)
qe = txqentry (pkt, self, self.nextseg, self.nextmsg)
self.nextseg += 1
if pkt.eom:
self.nextmsg += 1
else:
qe = txqentry (pkt, self)
self.pending_ack.append (qe)
if self.send_qe (qe) and isinstance (pkt, DataSeg):
self.maxseqsent = max (self.maxseqsent, pkt.segnum)
def send_qe (self, qe):
"""Attempt to send an item that has previously been put on the
transmit queue. Return True if it was sent, False if it cannot
be sent right now due to flow control or too many unacknowledged
segments.
"""
if isinstance (qe.packet, DataSeg):
# For data segments, check if we can transmit now. If not,
# just leave it queued; it will be transmitted when flow
# control permits.
if not self.flow_ok (qe):
# Not allowed to send.
return False
# Good to go; send it and start the timeout.
qe.send ()
return True
def process_ls (self, pkt):
"""Process a link service packet that updates the data
subchannel, i.e., a "Data request" packet in the NSP spec
terminology.
"""
if self.flow == ConnMsg.SVC_MSG:
delta = pkt.fcval
if delta >= 0 and self.maxmsg + delta < self.maxackseg + 128:
self.maxmsg += delta
else:
logging.debug ("Invalid LS (Data Request, message mode) message {}", pkt)
return
elif self.flow == ConnMsg.SVC_SEG:
delta = pkt.fcval
if self.maxackseg <= self.maxseg + delta < self.maxackseg + 128:
self.maxseg += delta
else:
logging.debug ("Invalid LS (Data Request, segment mode) message {}", pkt)
return
if pkt.fcmod:
self.xon = pkt.fcmod == pkt.XON
self.send_blocked ()
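        # Illustrative example (segment flow control): with maxackseg = 10
        # and maxseg = 12, a Data Request carrying fcval = 3 passes the
        # window check (10 <= 15 < 138) and raises maxseg to 15; a
        # negative fcval shrinks the window instead.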
def send_blocked (self):
"""Look for not-sent packets in the transmit queue, and retry
sending them. Quit when one is refused again.
"""
for qe in self.pending_ack:
if not qe.sent:
if not self.send_qe (qe):
break
if isinstance (qe.packet, DataSeg):
self.maxseqsent = max (self.maxseqsent, qe.packet.segnum)
class Other_Subchannel (Subchannel):
    # Class for ACKs sent from this subchannel
Ack = AckOther
name = "interrupt"
def __init__ (self, parent):
super ().__init__ (parent)
self.seqnum = Seq (1) # Next transmitted sequence number
self.maxmsg = 1 # Allowed to send one interrupt
# Interrupt flow control is different from data flow control, but
# the closest analog is message flow control.
self.flow = ConnMsg.SVC_MSG
def process_data (self, item):
pkt = item.packet
if isinstance (pkt, IntMsg):
# We don't bother checking inbound flow control, i.e.,
# while we never issue any outbound requests for interrupt
# messages, we still permit the other end to send more
# than one.
return self.parent.to_sc (item)
# Not interrupt, so it's link service.
if pkt.fcval_int == pkt.DATA_REQ:
self.cross.process_ls (pkt)
else:
self.process_ls (pkt)
def process_ls (self, pkt):
"""Process a link service packet that updates the interrupt
subchannel, i.e., an "Interrupt request" packet in the NSP spec
terminology. The only meaningful field is FCVAL, which must be
non-negative, and the total request count cannot exceed 127.
"""
delta = pkt.fcval
if delta >= 0:
self.maxmsg += delta
else:
logging.debug ("Invalid LS (Interrupt Request) message {}", pkt)
def send (self, pkt):
"""Queue a packet for transmission, and send it if we're allowed.
The check for "allowed" does not apply to link service messages.
PyDECnet does not send Link Service messages except for the
no-op Keepalive message. (If we ever allow more than one
Interrupt message inbound that would change; there does not
appear to be any good reason for using data subchannel flow
control.)
"""
if not isinstance (pkt, LinkSvcMsg) and self.maxmsg < self.nextmsg:
# Interrupt sends are refused if we're not allowed to send
# right now.
raise CantSend
qe = txqentry (pkt, self, msgnum = self.nextmsg)
self.nextmsg += 1
self.pending_ack.append (qe)
# Good to go; send it and start the timeout.
qe.send ()
self.maxseqsent = max (self.maxseqsent, pkt.segnum)
class Connection (Element, statemachine.StateMachine):
"""An NSP connection object. This contains the connection state
machine, the data and other-data subchannel state, and the session
control API with the exception of the "connect" call. Arriving
packets that are mapped onto the connection come into the per-state
processing function (via "dispatch" in the Statemachine base class).
Note that packet type validation and address checks have already
been done as part of the "mapping" of arriving packets onto connections.
The timer of a Connection is the inactivity timer when in RUN state,
and the timeout when in CI/CD or CR states.
A note on connection states:
The NSP spec uses a model where the Session Control layer polls NSP
for things it needs to know about. Because of that model, there are
a number of connection states to represent "waiting for SC to poll
NSP for something it has to hear". The implementation we have here
uses queueing of messages to SC instead of polling. The result is that
none of those "waiting for SC to poll" states are needed; instead,
whatever the NSP spec delivers to SC for a poll in such a state is
handled instead by a message to SC and an immediate transition to
the state after.
Specifically, this means that the O, DN, RJ, NC, and NR states
do not exist.
In the same way, states that exist only to model the "waiting for
SC to close the port" case do not exist either. Instead, the
connection is closed immediately.
This means that the DRC, CN, and DIC states do not exist either.
Finally, there is no DR state because it does the same thing as DI;
we use DI state instead.
Note, though, that CL exists after a fashion. When a connection is
closed, NSP no longer knows about it (for example, it is no longer
listed in the connection address lookup tables) but it is possible for
    some other component still to hold a reference to it. State CL is
    represented by Connection.state == None.
"""
def __init__ (self, parent, *, inbound = None, outbound = None):
Element.__init__ (self, parent)
statemachine.StateMachine.__init__ (self)
# srcaddr and dstaddr are the connection identifiers, not
# node addresses -- this matches the spec terminology
self.srcaddr = srcaddr = self.parent.get_id ()
if srcaddr is None:
raise ConnectionLimit
# We will add this connection to the dictionary of connections
# known to NSP. Don't do that yet, but check the max count.
ccount = len (self.parent.connections) + 1
if ccount > self.parent.ectr.peak_conns:
self.parent.ectr.peak_conns = ccount
self.dstaddr = 0
self.shutdown = False
# Parameters
self.inact_time = parent.inact_time
self.conn_timeout = parent.conn_timeout
# Flags
self.rstssegbug = False
# Initialize the data segment reassembly list
self.asmlist = list ()
self.data = Data_Subchannel (self)
# We use the optional "multiple other-data messages allowed at a time"
# model, rather than the one at a time model that the NSP spec uses.
# That makes the two subchannels look basically the same -- same data
# structures, same control machinery.
self.other = Other_Subchannel (self)
# Set the "other subchannel" references
self.other.cross = self.data
self.data.cross = self.other
# Now do the correct action for this new connection, depending
# on whether it was an arriving one (CI packet) or originating
# (session layer "connect" call).
if inbound:
pkt = inbound.packet
# Inbound connection. Save relevant state about the remote
# node, and send the payload up to session control.
self.dest = inbound.src
if isinstance (self.dest, Nodeid):
self.destnode = self.parent.node.nodeinfo (self.dest)
else:
# Circuit, so try to find the loop node.
if self.dest.loop_node is not None:
self.destnode = self.dest.loop_node
else:
self.destnode = self.node.nodeinfo (self.node.nodeid)
self.destnode.counters.con_rcv += 1
self.dstaddr = pkt.srcaddr
self.parent.rconnections[(self.dest, self.dstaddr)] = self
self.setphase (pkt)
self.data.flow = pkt.fcopt
self.segsize = min (pkt.segsize, MSS)
if self.cphase > 2:
# If phase 3 or later, send CA
ca = self.makepacket (AckConn)
self.sendmsg (ca)
# Now add the connection to the dictionary of ones we
# know. This needs to happen before we send it up to
# session control.
self.parent.connections[srcaddr] = self
# Set the new state, and send the packet up to Session Control
self.set_state (self.cr)
self.to_sc (inbound)
elif outbound:
dest, payload = outbound
# Create an outbound connection to the given destination node,
# with the supplied session control layer payload.
if dest == Nodeid (0):
dest = self.parent.node.nodeid
self.dest = dest
try:
self.destnode = self.parent.node.nodeinfo (dest)
except KeyError:
raise UnknownNode from None
dest = self.dest = self.destnode.get_dest ()
self.destnode.counters.con_xmt += 1
ci = self.makepacket (ConnInit, payload = payload,
fcopt = ConnMsg.SVC_NONE,
info = self.parent.nspver,
segsize = MSS)
logging.trace ("Connecting to {}: {}", dest, payload)
# Do this first otherwise that packet is processed in the
# wrong state if it is addressed to ourselves.
self.set_state (self.ci)
# Now add the connection to the dictionary of ones we
# know. That too happens before the message is sent, for
# the same reason.
self.parent.connections[srcaddr] = self
# Send it on the data subchannel
self.data.send (ci)
else:
raise ValueError ("missing inbound or outbound argument")
# Either way we start a timeout to reject the connection if
# the other end (outbound) or the local application (inbound)
# takes too long.
self.node.timers.start (self, self.conn_timeout)
def setphase (self, pkt):
# Remember the connection version (lower of the local and remote
# version numbers). Since the version numbers are not in
# numeric order, map received version to remote DECnet phase,
# and save the lower of that and ours. The field is specified
# as EX format, but only the bottom 2 bits are defined as the
# NSP version code.
self.rphase = nspphase[pkt.info & 3]
self.cphase = min (self.rphase, self.parent.node.phase)
def s0 (self, item):
raise InternalError ("S0 state not used")
def closed (self, item):
raise InternalError ("Closed state should not be reached")
def get_api (self):
ret = { "local_addr" : self.srcaddr,
"remote_addr" : self.dstaddr,
"state" : self.state.name }
if self.destnode:
ret["node"] = self.destnode.nodeid
return ret
def close (self):
"""Get rid of this connection. This doesn't send anything;
if messages are needed, that is up to the caller.
"""
self.node.timers.stop (self)
del self.parent.connections[self.srcaddr]
# dstaddr isn't set yet if we're closing due to timeout after
# CI, or CI returned to sender.
if self.dstaddr:
# The lookup by destination (remote) address in the case
# of a loop node connection may be either the loop node's
# circuit (if it was matched when the incoming packet
# arrived) or the executor's address. So try it both
# ways.
try:
del self.parent.rconnections[(self.dest, self.dstaddr)]
except KeyError:
del self.parent.rconnections[(self.node.nodeid, self.dstaddr)]
self.parent.ret_id (self.srcaddr)
# Clean up the subchannels
self.data.close ()
self.other.close ()
logging.trace ("Deleted connection {} to {}", self.srcaddr, self.dest)
return self.closed
def setsockopt (self, rstssegbug = False, **kwds):
"""Set connection options or flags not directly related to any
standard connection API call.
"""
self.rstssegbug = rstssegbug
def accept (self, payload = b""):
"""Accept an incoming connection, using the supplied payload
as session control accept data.
"""
if self.state != self.cr:
raise WrongState
cc = self.makepacket (ConnConf, data_ctl = payload,
fcopt = ConnMsg.SVC_NONE,
info = self.parent.nspver,
segsize = MSS)
logging.trace ("Accepting to {}: {}", self.srcaddr, payload)
# Send it on the data subchannel as an acknowledged message if
# phase 3 or later, but send it direct (no ack expected) for
# phase 2.
if self.cphase == 2:
self.set_state (self.run)
# Stop the connect timer
self.node.timers.stop (self)
self.sendmsg (cc)
else:
self.set_state (self.cc)
self.data.send (cc)
return True
def reject (self, reason = 0, payload = b""):
"""Reject an incoming connection, using the supplied reason
code and payload as session control reject data.
"""
if self.state != self.cr:
raise WrongState
# Stop the connect timer
self.node.timers.stop (self)
# Do this first so the state will be DI if this is the local
# node where the DiscComp comes back immediately.
self.set_state (self.di)
self.disc_rej (reason, payload)
return True
def disc_rej (self, reason, payload):
# Common code for reject, disconnect, and abort
if reason < 0 or reason > 255 or reason in reservedreasons:
raise RangeError
self.node.timers.stop (self)
# Zap the subchannels:
self.data.close ()
self.other.close ()
di = self.makepacket (DiscInit, reason = reason, data_ctl = payload)
logging.trace ("Disconnecting (or rejecting) to {}: {} {}",
self.dest, reason, payload)
# Send it on the data subchannel
self.data.send (di)
def disconnect (self, reason = 0, payload = b""):
"""Disconnect an active connection, using the supplied reason
code and payload as session control disconnect data. This is
a "clean shutdown", the connection is closed once pending outbound
transmits have been acknowledged.
"""
if self.state != self.run:
raise WrongState
if self.data.pending_ack:
            # Data not all acked yet, so don't send the DI just yet; it
            # will be sent from Data_Subchannel.process_ack once all
            # pending data has been acknowledged.
self.shutdown = True
self.pending_disc = (reason, payload)
else:
# Do this first so the state will be DI if this is the local
# node where the DiscComp comes back immediately.
self.set_state (self.di)
self.disc_rej (reason, payload)
return True
def abort (self, reason = 0, payload = b""):
"""Disconnect an active connection, using the supplied reason
code and payload as session control disconnect data. This is
a "hard shutdown", the connection is closed immediately, any
pending transmits are discarded.
"""
if self.state != self.run:
raise WrongState
self.disc_rej (reason, payload)
self.set_state (self.di)
return True
def send_data (self, data):
"""Send a message. Segmentation will be done here, i.e., we
implement a session control message interface. Messages are
queued without limit, but of course are only transmitted if
flow control rules permit.
"""
if self.state != self.run or self.shutdown:
raise WrongState
self.destnode.counters.byt_xmt += len (data)
self.destnode.counters.msg_xmt += 1
bom = 1
dl = len (data)
for i in range (0, dl, self.segsize):
eom = 0
if dl - i <= self.segsize:
eom = 1
pkt = self.makepacket (DataSeg, bom = bom, eom = eom,
payload = data[i:i + self.segsize])
bom = 0
self.data.send (pkt)
return True
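    # Illustrative example (segsize is negotiated at connect time; 558
    # is just an assumed value): a 1200 byte send_data call produces
    # three DataSeg packets covering bytes 0-557 (BOM set), 558-1115,
    # and 1116-1199 (EOM set).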
def interrupt (self, data):
"""Send an interrupt. This is accepted only if an interrupt
message is allowed to be sent right now. That is true when
the connection is first opened, and whenever the remote node
allows another interrupt to be sent. Typically DECnet nodes
allow only one interrupt at a time, so when this function is
called, permission to send another is denied until the remote
node gets around to sending another flow control message that
issues another interrupt credit.
"""
if self.state != self.run or self.shutdown:
raise WrongState
if len (data) > 16:
raise IntLength
self.destnode.counters.byt_xmt += len (data)
self.destnode.counters.msg_xmt += 1
sc = self.other
pkt = self.makepacket (IntMsg, payload = data,
segnum = sc.seqnum)
sc.send (pkt)
# It was accepted, so increment the sequence number
logging.trace ("sent interrupt seq {}", sc.seqnum)
sc.seqnum += 1
return True
def to_sc (self, item, reject = False):
"""Send a work item to Session Control.
"""
pkt = item.packet
nc = self.destnode.counters
if isinstance (pkt, DataSeg):
if self.asmlist:
# Not first segment
if pkt.bom:
logging.debug ("BOM flag, but not first segment: {}", pkt)
self.asmlist.append (pkt.payload)
if not pkt.eom:
# Not last segment, nothing to give to SC.
return
# Last segment of several. Construct a message for the
# entire payload
pkt = DataSeg (payload = b''.join (self.asmlist))
self.asmlist = list ()
item.packet = pkt
else:
if not pkt.bom:
if self.rstssegbug:
# Workaround requested for this issue. Some
# RSTS (DECnet/E) senders mishandle the flags;
# in RSTS this is done in the application, not
# in the DECnet core. Specifically, the event
# sender is known to get it wrong.
pkt.bom = pkt.eom = 1
else:
logging.debug ("first segment but no BOM flag: {}", pkt)
if not pkt.eom:
# First of several segments, save it
self.asmlist.append (pkt.payload)
return
# Single segment message, pass it up as is.
nc.byt_rcv += len (pkt.payload)
nc.msg_rcv += 1
elif isinstance (pkt, IntMsg):
nc.byt_rcv += len (pkt.payload)
nc.msg_rcv += 1
item.reject = reject
item.src = self
item.connection = self
self.node.addwork (item, self.node.session)
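    # Illustrative example: a message received as three segments flagged
    # BOM, neither, EOM is accumulated in asmlist and handed to Session
    # Control as a single DataSeg whose payload is the concatenation of
    # the three segment payloads.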
def update_delay (self, txtime):
if txtime and self.destnode:
delta = time.time () - txtime
# If the time estimate is smaller than our timer
# granularity, round it up to one tick. Otherwise we may
# end up with retransmit timeouts that are too short and
# produce false timeouts.
if delta < JIFFY:
delta = JIFFY
if self.destnode.delay:
# There is an estimate, do weighted average
self.destnode.delay += (delta - self.destnode.delay) \
/ (self.parent.config.nsp_weight + 1)
else:
# No estimate yet, use this one
self.destnode.delay = delta
if self.destnode.delay > 5:
# Cap it at 5 seconds. Sometimes we have congestion and
# the algorithm doesn't deal with that sanely.
self.destnode.delay = 5
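    # Worked example (illustrative, assuming nsp_weight = 3): a current
    # estimate of 1.0 s and a new sample of 2.0 s give
    # 1.0 + (2.0 - 1.0) / 4 = 1.25 s; acktimeout () then scales the
    # estimate by the configured nsp_delay factor.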
def acktimeout (self):
if self.destnode.delay:
return self.destnode.delay * self.parent.config.nsp_delay
return 2 # Spec says default is 5 but 2 is plenty nowadays
def makepacket (self, cls, **kwds):
pkt = cls (dstaddr = self.dstaddr, **kwds)
# Connect Ack doesn't have a source address, so handle that separately
try:
pkt.srcaddr = self.srcaddr
except AttributeError:
pass
return pkt
def sendmsg (self, pkt):
if logging.tracing:
logging.trace ("NSP sending packet {} to {}", pkt, self.dest)
self.destnode.counters.t_byt_xmt += len (pkt)
self.destnode.counters.t_msg_xmt += 1
self.parent.routing.send (pkt, self.dest,
rqr = isinstance (pkt, ConnInit))
def validate (self, item):
if logging.tracing:
logging.trace ("Processing {} in connection {}", item, self)
return True
def cr (self, item):
"""Connect Received state. Mostly we wait here for Session
Control to decide what to do about an inbound connection.
We also ACK any retransmitted CI messages.
"""
if isinstance (item, Received):
pkt = item.packet
if isinstance (pkt, ConnInit) and self.cphase > 2:
# Retransmitted or out of order CI. Resend the CA.
ca = self.makepacket (AckConn)
self.sendmsg (ca)
elif isinstance (item, timers.Timeout):
# Timeout waiting for application confirm (or reject).
# Reject it with reason 38, and also deliver a disconnect up
# to session control.
self.reject (OBJ_FAIL)
disc = DiscInit (reason = OBJ_FAIL, data_ctl = b"")
self.to_sc (Received (self, packet = disc))
def ci (self, item):
"""Connect Init sent state. This just checks for Connect Ack
and returned Connect Init, everything else is common with the
CD state.
"""
if isinstance (item, Received):
pkt = item.packet
if isinstance (pkt, AckConn) and self.node.phase > 2:
# Connect Ack, go to CD state
self.data.ack (0) # Process ACK of the CI
return self.cd
elif isinstance (pkt, ConnInit):
# Returned outbound CI. Report unreachable to Session
# Control, after substituting a disconnect (reject) message.
#
# Note that inbound CI doesn't come here, it comes in via the
# constructor, or if retransmitted to the CR state handler.
item.packet = DiscInit (reason = UNREACH, data_ctl = b"")
self.to_sc (item, True)
return self.close ()
return self.cd (item)
def cd (self, item):
"""Connect Delivered state. This also serves as common code for
the Connect Init state since they are nearly identical.
"""
if isinstance (item, Received):
pkt = item.packet
if isinstance (pkt, ConnConf):
# Connection was accepted. Save relevant state about the remote
# node, and send the payload up to session control.
self.dstaddr = pkt.srcaddr
# Save connection version information
self.setphase (pkt)
self.data.flow = pkt.fcopt
self.segsize = min (pkt.segsize, MSS)
self.data.ack (0) # Treat this as ACK of the CI
if self.cphase > 2:
# If phase 3 or later, send data Ack
ack = self.data.send_ack ()
self.node.timers.start (self, self.inact_time)
# Send the accept up to Session Control
self.to_sc (item)
# Transition to RUN state
return self.run
elif isinstance (pkt, DiscInit):
# Connect Reject
self.dstaddr = pkt.srcaddr
# Send the reject up to Session Control
self.to_sc (item, True)
# Ack the reject message
ack = self.makepacket (DiscComp)
self.sendmsg (ack)
return self.close ()
elif isinstance (pkt, (NoRes, DiscConf)):
# No resources, or Phase 2 reject.
if isinstance (pkt, NoRes):
# Received a "no resources" reject, count that
destnode = self.parent.node.nodeinfo (item.src)
if destnode:
destnode.counters.no_res_rcv += 1
# Send the reject up to Session Control
self.to_sc (item, True)
return self.close ()
elif isinstance (item, timers.Timeout):
# Timeout waiting for confirm (or reject). We can't send
# anything to the other end because the protocol makes no
# provision for disconnect in CR state. So just deliver
# failure locally and make the connection go away.
disc = DiscInit (reason = OBJ_FAIL, data_ctl = b"")
self.to_sc (Received (self, packet = disc), True)
return self.close ()
def cc (self, item):
"""Connect Confirm state. Accept on an incoming connection
gets us to this point (except in Phase II where that goes
straight to RUN state).
"""
if isinstance (item, Received):
pkt = item.packet
if isinstance (pkt, (DataSeg, AckData, IntMsg,
LinkSvcMsg, AckOther)):
self.data.ack (0) # Treat ack or data as ACK of CC message
self.set_state (self.run)
return self.run (item)
def run (self, item):
if isinstance (item, Received):
pkt = item.packet
# On any received packet, restart the inactivity timer,
# if phase 3 or higher
if self.cphase > 2:
self.node.timers.start (self, self.inact_time)
if isinstance (pkt, (DataSeg, AckData)):
self.data.dispatch (item)
elif isinstance (pkt, (IntMsg, LinkSvcMsg, AckOther)):
self.other.dispatch (item)
elif isinstance (pkt, DiscInit):
self.to_sc (item)
ack = self.makepacket (DiscComp)
self.sendmsg (ack)
return self.close ()
elif isinstance (pkt, DiscConf):
self.to_sc (item)
return self.close ()
elif isinstance (pkt, ConnConf):
# Duplicate confirm
if self.cphase > 2:
# If phase 3 or later, send data Ack
ack = self.data.send_ack ()
return
elif isinstance (item, timers.Timeout):
# Inactivity timeout, send a no-change Link Service message
pkt = self.makepacket (LinkSvcMsg,
segnum = self.other.seqnum,
                                   fcmod = LinkSvcMsg.NO_CHANGE,
fcval_int = LinkSvcMsg.DATA_REQ,
fcval = 0)
self.other.seqnum += 1
self.other.send (pkt)
def di (self, item):
if isinstance (item, Received):
pkt = item.packet
if isinstance (pkt, DiscInit):
ack = self.makepacket (DiscComp)
self.sendmsg (ack)
return self.close ()
elif isinstance (pkt, DiscConf):
return self.close ()
class ReservedPort (Element):
"""An NSP "reserved port". This is a descriptive trick to talk about
error responses not tied to an active connection, things like no such
connection, or no resources. We implement a "reserved port" here
because that makes things simple.
"""
def dispatch (self, item):
"""Handle a work item for the reserved port. Typically these
generate an error response back to the sender; the specific
response depends on what we're replying to. Only Received items
come here.
"""
pkt = item.packet
logging.trace ("Processing {} in reserved port", pkt)
if isinstance (pkt, ConnInit):
# ConnInit could not be mapped, send No Resources
t = NoRes
else:
# Some other message could not be mapped, send No Link
t = NoLink
reply = t (srcaddr = pkt.dstaddr, dstaddr = pkt.srcaddr)
self.node.routing.send (reply, item.src)
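        # Illustrative example: a stray DataSeg whose dstaddr matches no
        # current connection ends up here and is answered with
        # NoLink (srcaddr = pkt.dstaddr, dstaddr = pkt.srcaddr), i.e.
        # with the link addresses swapped so the sender can match it.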