#! """MOP support for DECnet/Python """ import time import socket import os import threading import queue from .common import * from . import events from . import packet from . import datalink from . import timers from . import statemachine from . import logging from . import html from . import nicepackets if not WIN: from fcntl import * SvnFileRev = "$LastChangedRevision: 487 $" class ReceiptGen (object): """Generates MOP message receipt numbers, which are integers in the range 1..0xffff. Note that 0 is not produced, it is used to indicate periodic messages as opposed to request/response exchanges. """ def __init__ (self): self.receipt = random.randint (1, 0xffff) self.lock = threading.Lock () def next (self): with self.lock: ret = self.receipt if ret == 0xffff: self.receipt = 1 else: self.receipt = ret + 1 return ret # Some well known Ethernet addresses CONSMC = Macaddr ("AB-00-00-02-00-00") LOOPMC = Macaddr ("CF-00-00-00-00-00") class C (packet.FieldGroup): """A MOP software identification field. This is either a counted string, or a single byte integer in the range -2..0. """ # We have to do this as a FieldGroup to get access to the owning # Packet object, which is where the "tolerant" flag is found. @classmethod def encode (cls, pkt, fname, maxlen): val = getattr (pkt, fname, None) if val is None: return b"" if isinstance (val, int): if not -2 <= val <= 0: logging.debug ("MOP C-n field integer not in -2..0") raise DecodeError ("Value {} not valid for C field".format (val)) return val.to_bytes (1, "little", signed = True) if isinstance (val, str): val = bytes (val, encoding = "latin1") else: val = makebytes (val) return byte (len (val)) + val @classmethod def decode (cls, buf, pkt, fname, maxlen): """Decode the next field in the buffer according to the rules for the "software" protocol field. Basically this is like an A-n field, but special values 0, -1, and -2 are accepted in the first byte, and string values are taken to be text strings. If the packet attribute "tolerant" is True, the decoder accepts certain non-conforming forms that are found in the wild. """ if not buf: if pkt.tolerant: return buf logging.debug ("No data left for C field") raise MissingData flen = int.from_bytes (buf[:1], "little", signed = True) if flen < -2: logging.debug ("C field with negative length {}", flen) raise DecodeError if flen <= 0: setattr (pkt, fname, flen) return buf[1:] if (flen > maxlen or flen > len (buf)) and pkt.tolerant: setattr (pkt, fname, str (buf, encoding = "latin1")) return b"" val, buf = packet.A.decode (buf, maxlen) setattr (pkt, fname, val) return buf @classmethod def checktype (cls, name, val): if isinstance (val, __class__): return val if isinstance (val, int): return cls (val) return packet.A.checktype (name, val) @classmethod def makecoderow (cls, name, maxlen): return cls, None, (name, maxlen), { name }, False class TIME (Field): __slots__ = ("tm",) def encode (self): """Encode a time.struct_time value into a 10 byte MOP encoding of time. """ try: tzoff = tm.tm_gmtoff // 60 except AttributeError: tzoff = 0 if tzoff < 0: hoff, moff = divmod (-tzoff, 60) hoff = (256 - hoff) & 0xff moff = (256 - moff) & 0xff else: hoff, moff = divmod (tzoff, 60) cent, yr = divmod (self.tm.tm_year, 100) eval = ( cent, yr, self.tm.tm_mon, self.tm.tm_mday, self.tm.tm_hour, self.tm.tm_min, self.tm.tm_sec, 0, hoff, moff ) return bytes (eval) @classmethod def decode (cls, buf): """Decode a MOP time value, which is a 10 byte value vaguely like what's found in a "struct tm" in Unix, or Python time.struct_time. """ require (buf, 10) t = buf[:10] if t[0]: yr = t[0] * 100 + t[1] else: # Not sure if is needed yr = 1900 + t[1] hoff = t[8] if hoff >= 128: hoff -= 256 moff = t[9] if moff >= 128: moff -= 256 tzoff = (hoff * 60 + moff) * 60 ret = cls () ret.tm = time.struct_time ((yr, t[2], t[3], t[4], t[5], t[6], 0, 0, -1, "", tzoff)) return ret, buf[10:] class MopHdr (packet.Packet): _layout = ( ( packet.B, "code", 1 ), ) class SysId (MopHdr): tolerant = True _addslots = ( "last_ts", ) _layout = ( ( packet.RES, 1 ), ( packet.B, "receipt", 2 ), ( packet.TLV, 2, 1, True, ( 1, Version, "version" ), ( 2, packet.BM, ( "loop", 0, 1 ), ( "dump", 1, 1 ), ( "ploader", 2, 1 ), ( "sloader", 3, 1 ), ( "boot", 4, 1 ), ( "carrier", 5, 1 ), ( "counters", 6, 1 ), ( "carrier_reserved", 7, 1 ) ), ( 3, Macaddr, "console_user" ), ( 4, packet.B, "reservation_timer", 2 ), ( 5, packet.B, "console_cmd_size", 2 ), ( 6, packet.B, "console_resp_size", 2 ), ( 7, Macaddr, "hwaddr" ), ( 8, TIME, "time" ), ( 100, packet.B, "device", 1 ), # Spec says max is 17 but sometimes longer values are seen ( 200, C, "software", 127 ), ( 300, packet.B, "processor", 1 ), ( 400, packet.B, "datalink", 1 ), ( 401, packet.B, "bufsize", 2 )) ) code = 7 def_version = Version (3, 0, 0) def services (self): srv = list () for s in ( "loop", "dump", "ploader", "sloader", "boot", "carrier", "counters" ): if getattr (self, s): srv.append (s) return srv class RequestId (MopHdr): _layout = ( ( packet.RES, 1 ), ( packet.B, "receipt", 2 ), ) code = 5 class RequestCounters (MopHdr): _layout = ( ( packet.B, "receipt", 2 ), ) code = 9 class Counters (MopHdr): # Note that most of the error counts don't apply to DECnet/Python, # but we define them so that we can parse and report them in # messages from other systems where they do have meaning. _layout = ( ( packet.B, "receipt", 2 ), ( Timestamp, "time_since_zeroed", 2 ), ( packet.CTR, "bytes_recv", 4 ), ( packet.CTR, "bytes_sent", 4 ), ( packet.CTR, "pkts_recv", 4 ), ( packet.CTR, "pkts_sent", 4 ), ( packet.CTR, "mcbytes_recv", 4 ), ( packet.CTR, "mcpkts_recv", 4 ), ( packet.CTR, "pkts_deferred", 4), ( packet.CTR, "pkts_1_collision", 4), ( packet.CTR, "pkts_mult_collision", 4), ( packet.CTR, "send_fail", 2), ( packet.B, "send_reasons", 2), ( packet.CTR, "recv_fail", 2), ( packet.B, "recv_reasons", 2), ( packet.CTR, "unk_dest", 2 ), ( packet.CTR, "data_overrun", 2), ( packet.CTR, "no_sys_buf", 2), ( packet.CTR, "no_user_buf", 2) ) code = 11 # Bit definitions for send_fail field SEND_FAIL_EXC_COLL = 1 SEND_FAIL_CARR_CHECK_FAIL = 2 SEND_FAIL_SHORT = 4 SEND_FAIL_OPEN = 8 SEND_FAIL_LONG = 16 SEND_FAIL_DEFERFAIL = 32 # Bit definitions for recv_fail field RECV_FAIL_BCC = 1 RECV_FAIL_FRAMING = 2 RECV_FAIL_LONG = 4 class ConsoleRequest (MopHdr): _layout = ( ( packet.BV, "verification", 8 ), ) code = 13 class ConsoleRelease (MopHdr): code = 15 class ConsoleCommand (MopHdr): _layout = ( ( packet.BM, ( "seq", 0, 1 ), ( "break", 1, 1 )), packet.Payload) code = 17 class ConsoleResponse (MopHdr): _layout = ( ( packet.BM, ( "seq", 0, 1 ), ( "cmd_lost", 1, 1 ), ( "resp_lost", 2, 1 ) ), packet.Payload ) code = 19 class LoopSkip (packet.Packet): _layout = ( ( packet.B, "skip", 2 ), packet.Payload ) class LoopFwd (packet.Packet): _layout = ( ( packet.B, "function", 2 ), ( Macaddr, "dest" ), packet.Payload ) function = 2 class LoopReply (packet.Packet): _layout = ( ( packet.B, "function", 2 ), ( packet.B, "receipt", 2 ), packet.Payload ) function = 1 # Dictionary of packet codes to packet layout classes packetformats = { c.code : c for c in globals ().values () if type (c) is packet.packet_encoding_meta and hasattr (c, "code") } class Mop (Element): """The MOP layer. It doesn't do much, other than being the parent of the per-datalink MOP objects. """ def __init__ (self, parent, config): super ().__init__ (parent) self.node.mop = self logging.debug ("Initializing MOP layer") self.config = config self.circuits = EntityDict () dlcirc = self.node.datalink.circuits self.console_config = False for name, c in config.circuit.items (): dl = dlcirc[name] if dl.use_mop: try: self.circuits[name] = MopCircuit (self, name, dl, c) logging.debug ("Initialized MOP circuit {}", name) if c.console: self.console_config = True except Exception: logging.exception ("Error initializing MOP circuit {}", name) def start (self): logging.debug ("Starting MOP layer") for name, c in self.circuits.items (): try: c.start () logging.debug ("Started MOP circuit {}", name) except Exception: logging.exception ("Error starting MOP circuit {}", name) def stop (self): logging.debug ("Stopping MOP layer") for name, c in self.circuits.items (): try: c.stop () logging.debug ("Stopped MOP circuit {}", name) except Exception: logging.exception ("Error stopping MOP circuit {}", name) def http_get (self, mobile, parts, qs): infos = ( "summary", "status", "details" ) if not parts or parts == ['']: what = "summary" elif parts[0] in infos: what = parts[0] else: return None, None active = infos.index (what) + 1 sb = html.sbelement (html.sblabel ("Information"), html.sbbutton (mobile, "mop", "Summary", qs), html.sbbutton (mobile, "mop/status", "Status", qs), html.sbbutton (mobile, "mop/details", "Details", qs)) sb.contents[active].__class__ = html.sbbutton_active ret = [ "

MOP {0}

".format (what) ] first = True if self.console_config: hdr = ( "Name", "MAC address", "HW address", "Console user", "Services" ) else: hdr = ( "Name", "MAC address", "HW address", "Services" ) data = [ c.html (what, self.console_config) for c in self.circuits.values () ] ret.append (html.tbsection ("Circuits", hdr, data)) if what in ("status", "details"): for c in self.circuits.values (): if c.sysid: ret.append (c.sysid.html (what)) return sb, html.main (*ret) def get_api (self): return { "circuits" : self.circuits.get_api () } def nice_read (self, req, resp): if not isinstance (req, nicepackets.NiceReadModule) or \ not req.sumstat (): return # We handle either known/active modules, or the specific # module name "configurator" if req.one () and req.entity.value.upper () != "CONFIGURATOR": return # We expect payload carrying a circuit qualifier. The spec # makes it look like this is encoded in NICE data item # encoding but that is not actually accurate. If there is no # payload we treat that as "known circuits". ce = nicepackets.CircuitReqEntity (-1) if req.payload: p = req.payload require (p, 3) c = int.from_bytes (p[:2], "little") if c != 100: return -6 # Unrecognized parameter c = p[2] if c >= 128: c -= 256 ce.code = c if c > 0: # Specific string require (p, 3 + c) ce.value = str (p[3:3 + c], "latin1") # Parsed the circuit(s) to return, check if ce.code > 0: try: c = self.circuits[ce.value] clist = [ c ] except KeyError: return else: clist = self.circuits.values () # Looks ok, start collecting data ret = [ ] for c in clist: first = True r1 = None if req.stat (): for k, v in sorted (c.sysid.heard.items ()): r = resp.makeitem ("CONFIGURATOR") if first: r1 = r first = False r.circuit = c.name r.physical_address = k ts = time.localtime (v.last_ts) r.last_report = (ts.tm_mday, ts.tm_mon, ts.tm_hour,ts.tm_min, ts.tm_sec) fl = [ ] for i, fn in enumerate (("loop", "dump", "ploader", "sloader", "boot", "carrier", "counters")): if getattr (v, fn, False): fl.append (i) if fl: r.functions = fl for fn in ( "version", "console_user", "reservation_timer", "console_cmd_size", "console_resp_size", "hwaddr", "device", "processor", "datalink", "bufsize" ): setattr (r, fn, getattr (v, fn, None)) s = getattr (v, "software", None) if s: if isinstance (s, int): r.software = ( s, ) else: r.software = ( 0, s ) ret.append (r) if not r1: r1 = resp.makeitem ("CONFIGURATOR") r1.circuit = c.name ret.append (r1) r1.surveillance = 0 dt = time.time () - c.sysid.start_ts dh, dt = divmod (dt, 3600) dm, ds = divmod (dt, 60) r1.elapsed_time = ( dh, dm, ds ) resp["CONFIGURATOR"] = ret class MopCircuit (Element): """The parent of the protocol handlers for the various protocols and services enabled on a particular circuit (datalink instance). """ def __init__ (self, parent, name, datalink, config): super ().__init__ (parent) self.config = config self.name = name self.datalink = datalink self.mop = parent self.loop = self.sysid = None self.conn_clients = dict () self.carrier_client_dest = dict () self.carrier_server = None self.console_verification = config.console self.console = ConnApiHelper (self, CarrierClient) def getentity (self, name): if name == "counters": return self.request_counters return super ().getentity (name) def start (self): if self.datalink.use_mop: # Do this only on datalinks where we want MOP (Ethernet, basically) logging.debug ("Starting mop for {} {}", self.datalink.__class__.__name__, self.name) # Dictionary of pending requests, indexed by receipt number self.requests = dict () self.receipt = ReceiptGen () self.loop = LoopHandler (self, self.datalink) # The various MOP console handlers share a port, so we'll # own it and dispatch received traffic. consport = self.datalink.create_port (self, MOPCONSPROTO) self.consport = consport consport.add_multicast (CONSMC) self.sysid = SysIdHandler (self, consport) self.request_counters = CounterHandler (self, consport) # No console carrier server just now self.carrier_server = None services = list () if self.loop: services.append ("loop") if self.sysid: services.append ("counters") if self.console_verification: services.append ("console") self.services = services def stop (self): logging.debug ("Stopping mop for {} {}", self.datalink.__class__.__name__, self.name) if self.carrier_server: self.carrier_server.release () def request (self, element, pkt, dest, port, receipt = None): """Start a request/response exchange. "element" is the Element instance that will receive the response. "pkt" is the request to send. The receipt number will be filled in. "dest" is the packet destination address. "port" is the datalink port to send the packet to. If "receipt" is supplied, that is the receipt number to assume for this exchange (it must be set in the outgoing packet by the caller). This is for retransmitting requests in the Console Carrier protocol where reuse of a receipt number has a specific meaning, and for loopback where the receipt position in the packet depends on the request. The assigned receipt number is returned. """ if receipt is None: rnum = self.receipt.next () pkt.receipt = rnum else: rnum = receipt self.requests[rnum] = element port.send (pkt, dest) return rnum def deliver (self, item): """Deliver a response. """ rnum = item.receipt if rnum: try: self.requests[rnum].dispatch (item) del self.requests[rnum] except KeyError: pass def done (self, rnum): """Indicate that we're done with the request whose receipt number is rnum. """ try: del self.requests[rnum] except KeyError: pass def exchange (self, pkt, dest, port, timeout = 3, receipt = None): """Perform a request/response exchange. "pkt" is the request to send. The receipt number will be filled in. "dest" is the packet destination address. "port" is the datalink port to send to. This method must not be called from the main node thread, only from worker threads such as the HTTPS API threads. The response packet is returned, or None to indicate timeout. """ listener = WorkHandler () try: rnum = self.request (listener, pkt, dest, port, receipt = receipt) ret = listener.wait () finally: self.done (rnum) return ret def dispatch (self, work): if isinstance (work, datalink.Received): buf = work.packet if not buf: logging.debug ("Null MOP packet received on {}", self.name) return logging.trace ("MOP packet received on {}: {}", self.name, bytes (buf)) header = MopHdr (buf[:1]) msgcode = header.code try: parsed = packetformats[msgcode] (buf) except KeyError: logging.debug ("MOP packet with unknown message code {} on {}", msgcode, self.name) return except DNAException: logging.exception ("MOP packet parse error\n {}", bytes (buf)) return parsed.src = work.src else: # Unknown request return if isinstance (parsed, (SysId, Counters, LoopReply)): # A response packet with a receipt number. if isinstance (parsed, SysId): # Always look at SysId for the stations-heard table self.sysid.dispatch (parsed) # Pick up the receipt number, and dispatch the packet to whoever # is waiting for it. self.deliver (parsed) elif isinstance (parsed, ConsoleResponse): logging.trace ("Mop consoleresponse {} from {}", parsed, parsed.src) try: self.carrier_client_dest[parsed.src].dispatch (parsed) except KeyError: logging.trace ("no address match, {}", repr (self.carrier_client_dest)) pass elif isinstance (parsed, ConsoleRequest): if self.console_verification and not self.carrier_server: if self.console_verification == parsed.verification: self.carrier_server = CarrierServer (self, self.consport, parsed) else: logging.debug ("Console request ignored, wrong verification from {}", parsed.src) else: # Not a response. Give it to the console carrier server, if # one is active, then to the Sysid handler which deals with # other requests. if self.carrier_server: self.carrier_server.dispatch (parsed) self.sysid.dispatch (parsed) def html (self, what, console): services = ", ".join (self.services) if console: cu = (self.carrier_server and self.carrier_server.remote) or "" return [ self.name, self.consport.macaddr, self.datalink.hwaddr, cu, services ] else: return [ self.name, self.consport.macaddr, self.datalink.hwaddr, services ] def get_api (self): return { "name" : self.name, "hwaddr" : self.datalink.hwaddr, "macaddr" : self.consport.macaddr, "services" : self.services } class CounterHandler (Element): """This class defines the API interface for requesting counters. """ def __init__ (self, parent, port): super ().__init__ (parent) self.port = port def post_api (self, data): """Get counters. Input: dest (MAC address), optional timeout in seconds (default: 3) Output: status (a string: timeout or ok). If ok, the counters. """ logging.trace ("processing POST API call, counter request") dest = Macaddr (data["dest"]) timeout = int (data.get ("timeout", 3)) if timeout < 1: return { "status" : "invalid timeout" } pkt = RequestCounters () reply = self.parent.exchange (pkt, dest, self.port, timeout) if reply is None: return { "status" : "timeout" } ret = { "status" : "ok" } for t, n, *x in Counters._layout: if t == packet.CTR: ret[n] = getattr (reply, n) ret["time_since_zeroed"] = int (reply.time_since_zeroed) return ret class SysIdHandler (Element, timers.Timer): """This class defines processing for SysId messages, both sending them (periodically and on request) and receiving them (multicast and directed). We track received ones in a dictionary. """ def __init__ (self, parent, port): Element.__init__ (self, parent) timers.Timer.__init__ (self) # Send the initial ID fairly soon after startup self.node.timers.start (self, self.id_self_delay () // 30) self.port = port self.mop = parent.parent self.heard = dict () self.start_ts = time.time () logging.debug ("Initialized sysid handler for {}", parent.name) def id_self_delay (self): return random.randint (8 * 60, 12 * 60) def dispatch (self, pkt): if isinstance (pkt, packet.Packet): src = pkt.src if isinstance (pkt, SysId): if src in self.heard: logging.trace ("Sysid update on {} from {}", self.parent.name, src) else: logging.trace ("Sysid on {} from new node {}", self.parent.name, src) pkt.last_ts = time.time () self.heard[src] = pkt elif isinstance (pkt, RequestId): self.send_id (src, pkt.receipt) elif isinstance (pkt, RequestCounters): self.send_ctrs (src, pkt.receipt) elif isinstance (pkt, timers.Timeout): logging.trace ("Sending periodic sysid on {}", self.parent.name) self.send_id (CONSMC, 0) self.node.timers.start (self, self.id_self_delay ()) def send_id (self, dest, receipt): sysid = SysId (receipt = receipt, version = SysId.def_version, hwaddr = self.port.parent.hwaddr, loop = True, counters = True, # I want to use a defined code, but one that is # obviously not real. PCL-11 is real but is # not Ethernet, so it's a nice choice. device = 9, # PCL-11 datalink = 1, # Ethernet processor = 2, # Comm server software = "DECnet/Python" # Note: 16 chars max ) if self.parent.console_verification: sysid.carrier = True sysid.reservation_timer = CarrierServer.reservation_timer sysid.console_cmd_size = sysid.console_resp_size = CarrierServer.msgsize if self.parent.carrier_server: sysid.carrier_reserved = True sysid.console_user = self.parent.carrier_server.remote self.port.send (sysid, dest) def send_ctrs (self, dest, receipt): reply = Counters (receipt = receipt) self.port.parent.counters.copy (reply) # Make sure this is an integer reply.time_since_zeroed = int (reply.time_since_zeroed) self.port.send (reply, dest) def html (self, what): title = "Sysid data for {}".format (self.parent.name) if not self.heard: return html.textsection (title, [ "Nothing heard yet" ]) else: header = [ "Source addr", "Services", "HW Address", "Device", "Last heard" ] rows = list () for k, v in sorted (self.heard.items ()): srcaddr = getattr (v, "src", "") or k services = ', '.join (v.services ()) hwaddr = getattr (v, "hwaddr", "") device = getattr (v, "device", "") device = nicepackets.MOPdevices.get (device, device) ts = time.localtime (v.last_ts) last = time.strftime ("%d-%b-%Y %H:%M:%S", ts) row = [ srcaddr, services, hwaddr, device, last ] if what == "details": details = list () for fn in [ "console_user", "reservation_timer", "time", "processor", "datalink", "blocksize", "software" ] + v.xfields (True): val = getattr (v, fn, "") if val: if fn == "time": val = time.strftime ("%d-%b-%Y %H:%M:%S", val) elif fn == "processor": val = nicepackets.MOPCPUs.get (val, val) elif fn == "datalink": val = nicepackets.MOPdatalinks.get (val, val) elif fn == "software": if isinstance (val, int): val = ("Not specified", "Standard OS", "Maintenance system")[-val] elif isinstance (val, bytes): # A byte string, see if it looks printable v1 = "-".join ("{:02x}".format (b) for b in val) try: v2 = str (val, "ascii") if v2.isprintable (): v1 = v2 except UnicodeDecodeError: pass val = v1 fn = v.fieldlabel (fn) details.append (("{} =".format (fn), val)) row.append (details) rows.append (row) if what == "details": return html.detail_section (title, header, rows) return html.tbsection (title, header, rows) def get_api (self): logging.trace ("processing GET API call on sysid listener") ret = list () for k, v in self.heard.items (): item = dict () item["srcaddr"] = getattr (v, "src", "") or k item["console_user"] = getattr (v, "console_user", "") item["reservation_timer"] = getattr (v, "reservation_timer", 0) item["hwaddr"] = getattr (v, "hwaddr", "") systime = getattr (v, "time", None) if systime: systime = systime.tm tzoff = systime.tm_gmtoff systime = time.strftime ("%d-%b-%Y %H:%M:%S", systime) if tzoff: systime += " {:+03d}{:02d}".format (*divmod (tzoff // 60, 60)) item["time"] = systime device = getattr (v, "device", "") item["device"] = nicepackets.MOPdevices.get (device, device) processor = getattr (v, "processor", "") item["processor"] = nicepackets.MOPCPUs.get (processor, processor) datalink = getattr (v, "datalink", "") item["datalink"] = nicepackets.MOPdatalinks.get (datalink, datalink) bs = getattr (v, "bufsize", None) if bs: item["bufsize"] = bs item["software"] = getattr (v, "software", "") item["services"] = v.services () # Add in any implementation dependent fields for k in v.xfields (): item[k] = getattr (v, k) ret.append (item) return ret class ConsolePost (Work): pass class CarrierClient (Element, statemachine.StateMachine): """The client side of the console carrier protocol. """ API_TIMEOUT = 120 def __init__ (self, parent, data, listener): Element.__init__ (self, parent) statemachine.StateMachine.__init__ (self) self.listener = listener self.last_post = time.time () self.port = parent.consport self.handle = random.getrandbits (64) self.outputq = queue.Queue () try: dest = Macaddr (data["dest"]) self.verification = scan_ver (data["verification"]) except KeyError: self.listener.dispatch ({ "status" : "missing arguments" }) return except ValueError: self.listener.dispatch ({ "status" : "Invalid argument value" }) return dest = Macaddr (data["dest"]) if dest in self.parent.carrier_client_dest: self.listener.dispatch ({ "status" : "destination busy" }) return self.dest = dest self.parent.conn_clients[self.handle] = self self.parent.carrier_client_dest[dest] = self self.msg = RequestId () self.sendmsg () logging.debug ("Initialized console carrier client for {}, handle {}", parent.name, self.handle) def post_api (self, data): if not data.get ("data", None) and not data.get ("close", False): # Not close and no data, so it's read if self.state == self.active: try: ret = self.outputq.get (timeout = 60) except queue.Empty: ret = { "status" : "ok", "data" : "" } else: try: ret = self.outputq.get_nowait () except Queue.Empty: ret = { "status" : "closed" } return ret listen = WorkHandler () w = ConsolePost (self, data = data, listener = listen) self.node.addwork (w) ret = listen.wait (timeout = 60) return ret def sendmsg (self, tries = 5, receipt = None): self.retries = tries self.node.timers.stop (self) self.node.timers.start (self, 1) if isinstance (self.msg, ConsoleCommand): self.port.send (self.msg, self.dest) else: self.parent.request (self, self.msg, self.dest, self.port, receipt = receipt) def close (self): """End this console carrier session. Stop any timer and remove its entries in the lookup dictionaries. """ self.node.timers.stop (self) self.msg = self.msg2 = self.listener = None try: del self.parent.conn_clients[self.handle] except KeyError: pass try: del self.parent.carrier_client_dest[self.dest] except KeyError: pass def s0 (self, item): """Initial state: await SysId response, make sure console carrier is available. """ if isinstance (item, SysId): self.node.timers.stop (self) if item.carrier and not item.carrier_reserved: # Looks good, proceed self.cmdsize = item.console_cmd_size self.respsize = item.console_resp_size self.restimer = item.reservation_timer # Now we send a reservation request followed by another # RequestId to see if it worked. self.msg2 = ConsoleRequest (verification = self.verification) self.port.send (self.msg2, self.dest) self.sendmsg () return self.reserve if not item.carrier: self.listener.dispatch ({ "status" : "no console carrier support" }) else: self.listener.dispatch ({ "status" : "console carrier reserved", "client" : str (item.console_user) }) self.close () elif isinstance (item, timers.Timeout): # Timeout, try again if not at the limit self.retries -= 1 if self.retries: self.sendmsg (self.retries) else: self.listener.dispatch ({ "status" : "no reply" }) self.close () def reserve (self, item): """Verify that reservation was successful. """ if isinstance (item, SysId): # If the reservation succeeded, switch to active state to # run the two-way console data stream. if item.carrier_reserved: if item.console_user != self.port.macaddr: self.listener.dispatch ({ "status" : "console carrier reserved", "client" : str (item.console_user) }) self.node.timers.stop (self) self.listener = None return self.s0 self.seq = 0 self.msg = None # No poll message yet self.pendinginput = b"" self.sendpoll () self.listener.dispatch ({ "status" : "ok", "handle" : self.handle }) self.listener = None return self.active elif isinstance (item, timers.Timeout): # Timeout, try again if not at the limit self.retries -= 1 if self.retries: # Resend a reservation request followed by another # RequestId to see if it worked. self.port.send (self.msg2, self.dest) self.sendmsg (self.retries) else: self.listener.dispatch ({ "status" : "no reply" }) self.close () def sendpoll (self): """Send a new poll, or retransmit the previous one. """ tries = self.retries if not self.msg: tries = 5 self.seq ^= 1 indata = self.pendinginput[:self.cmdsize] self.pendinginput = self.pendinginput[self.cmdsize:] self.msg = ConsoleCommand (seq = self.seq, payload = indata) self.node.timers.stop (self) self.node.timers.start (self, 1) self.sendmsg (tries) def sendrelease (self): self.msg2 = ConsoleRelease () self.port.send (self.msg2, self.dest) self.msg = RequestId () self.sendmsg () def active (self, item): """Active (connected) state of the console carrier. """ if isinstance (item, ConsoleResponse) and item.seq == self.seq: # Response packet from our peer, and next in sequence self.retries = 5 data = item.payload if data: data = str (data, encoding = "latin1") self.outputq.put ({ "status" : "ok", "data" : data }) self.msg = None # If there is more data to send, do so now if self.pendinginput: self.sendpoll () return elif isinstance (item, (ConsoleResponse, timers.Timeout)): # Console response but not in sequence, or timeout: for # both, retransmit if it isn't time to give up. If there is # no currently pending message, send the next one. self.retries -= 1 if time.time () - self.last_post > self.API_TIMEOUT: logging.debug ("Closing console client {} due to API timeout", self.dest) self.outputq.put ({ "status" : "api timeout" }) self.sendrelease () return self.release if self.retries: self.sendpoll () else: self.outputq.put ({ "status" : "no response" }) self.close () elif isinstance (item, ConsolePost): data = item.data listener = item.listener if data.get ("close", False): # Close request -- release the console self.sendrelease () listener.dispatch ({ "status" : "ok" }) self.outputq.put ({ "status" : "closed", "data" : "" }) return self.release # Input request, post it and say ok newinput = bytes (data["data"], encoding = "latin1") if self.pendinginput or self.msg: self.pendinginput += newinput else: self.pendinginput = newinput self.sendpoll () listener.dispatch ({ "status" : "ok" }) def release (self, item): """Verify that release was successful. """ if isinstance (item, SysId) and item.receipt == self.msg.receipt: # If the reservation succeeded, switch to active state to # run the two-way console data stream. if not (item.carrier_reserved and item.console_user == self.dest): logging.debug ("Console client closed for {}", self.dest) self.close () elif isinstance (item, timers.Timeout): # Timeout, try again if not at the limit self.retries -= 1 if self.retries: # Resend a release request followed by another # RequestId to see if it worked. self.port.send (self.msg2, self.dest) self.sendmsg (self.retries) else: logging.debug ("Release request timed out for node {}", self.dest) self.close () elif isinstance (item, ConsolePost): # data read or redundant close request when already closing, # say so. item.listener.dispatch ({ "status" : "closed" }) class CarrierServer (Element, timers.Timer): """The server side of the console carrier protocol. """ reservation_timer = 15 msgsize = 512 def __init__ (self, parent, port, reserve): Element.__init__ (self, parent) timers.Timer.__init__ (self) self.port = port self.mop = parent.mop self.remote = reserve.src self.seq = self.pty = 0 self.pendinginput = b"" self.response = None self.pendingoutput = None self.node.timers.start (self, self.reservation_timer) try: pid, fd = os.forkpty () #pty.fork () if pid: # Parent process. Save the pty fd and set it # to non-blocking mode logging.debug ("Started console server for {} {} process {}", parent.name, self.remote, pid) self.pendingoutput = b"" self.pty = fd oldflags = fcntl (fd, F_GETFL, 0) fcntl (fd, F_SETFL, oldflags | os.O_NONBLOCK) else: # Child process, send it off to login. os.execlp ("login", "login") sys._exit (1) except Exception: logging.exception ("Exception starting console client session") self.release () def release (self): self.node.timers.stop (self) logging.debug ("Closed console server for {} {}", self.parent.name, self.remote) if self.pty: try: os.close (self.pty) except Exception: pass self.parent.carrier_server = None def dispatch (self, item): if isinstance (item, timers.Timer): # Reservation timeout, clear any reservation if self.pty: self.release () elif isinstance (item, packet.Packet): # Some received packet. res = self.remote # Ignore any packets from others if item.src != res: return if isinstance (item, ConsoleRelease): # Session ended self.release () elif isinstance (item, ConsoleCommand): # Command/poll message. self.node.timers.stop (self) self.node.timers.start (self, self.reservation_timer) if item.seq == self.seq: # Retransmit, so resend the previous message if self.response: self.port.send (self.response, res) else: # New packet. Save any input, check for output, # build a response packet with pending output. self.pendinginput += item.payload try: accepted = os.write (self.pty, self.pendinginput) except Exception: accepted = len (self.pendinginput) self.pendinginput = self.pendinginput[accepted:] self.seq ^= 1 lp = len (self.pendingoutput) if lp < self.msgsize: try: self.pendingoutput += os.read (self.pty, self.msgsize - lp) except Exception: pass self.response = ConsoleResponse (seq = self.seq, payload = self.pendingoutput) self.pendingoutput = b"" self.port.send (self.response, res) class LoopHandler (Element, timers.Timer): """Handler for loopback protocol """ def __init__ (self, parent, datalink): Element.__init__ (self, parent) timers.Timer.__init__ (self) self.port = port = datalink.create_port (self, LOOPPROTO, pad = False) port.add_multicast (LOOPMC) self.pendingreq = None logging.debug ("Initialized loop handler for {}", parent.name) def dispatch (self, item): """Work item handler """ if isinstance (item, datalink.Received): buf = item.packet top = LoopSkip (buf) skip = top.skip if (skip & 1) or skip > len (buf) - 4: # Invalid skip count, ignore return # Get the function code fun = int.from_bytes (buf[skip + 2:skip + 4], packet.LE) if fun == LoopFwd.function: f = LoopFwd (buf[skip + 2:]) if f.dest[0] & 1: # Forward to multicast, invalid, ignore return top.skip += 8 self.port.send (top, f.dest) elif fun == LoopReply.function: f = LoopReply (buf[skip + 2:]) f.src = item.src self.parent.deliver (f) def post_api (self, data, nml = False): """Perform a loop operation. Input: dest (MAC addresses), optional "timeout" in seconds (default: 3), optional "packets" -- count of packets (default: 1). By default there is a 1 second delay after a successful loop; optional "fast":True suppresses that delay. "nml" is True if the call is from NML rather than from the REST API. In that case, the operation stops on failure. Also, for NML the additional argument "payload" is expected (the data to be sent), and "fast" is implicitly True. Output: a dictionary containing two keys: "status" whose value is a message "ok" for success or an error message string, and "delays", a list of results for each packet: the round trip time in seconds, or -1 to indicate that packet timed out. If "nml" is True, output is the MAC address of the station that replied (a Macaddr) or the count of messages not looped if there was a timeout (an int). If there is something wrong with the inputs, the return value is a dict with element "status" containing an error message. """ if not data: # In case POST data was omitted entirely, substitute an # empty dictionary, which will do the right thing (all # defaults apply). data = { } dest = data.get ("dest", LOOPMC) if not isinstance (dest, list): dest = [ dest ] if not dest: dest = [ LOOPMC ] dest = [ Macaddr (d) for d in dest ] multidest = dest == [ LOOPMC ] if not multidest: for d in dest: if d.ismulti (): return { "status" : "invalid address" } if len (dest) > 3: return { "status" : "too many addresses" } # Add self as the last hop dest.append (self.port.macaddr) timeout = int (data.get ("timeout", 3)) packets = int (data.get ("packets", 1)) if nml: fast = True payload = data["payload"] else: payload = b"Python! " * 12 fast = data.get ("fast", False) if timeout < 1 or packets < 1: return { "status" : "invalid arguments" } ret = { "status" : "ok" } delays = list () for i in range (packets): loopmsg, rnum = self.buildloop (dest[1:], payload) if len (loopmsg) > 1500: return { "status" : "payload too long" } sent = time.time () reply = self.parent.exchange (loopmsg, dest[0], self.port, timeout, receipt = rnum) if reply is None: if nml: return packets - i delays.append (-1) else: delays.append (time.time () - sent) if multidest: dest[0] = reply.src ret["dest"] = str (dest[0]) if not fast: if i < packets - 1: time.sleep (1) else: if nml: return dest[0] ret["delays"] = delays return ret def buildloop (self, destlist, payload): rnum = self.parent.receipt.next () ret = LoopReply (receipt = rnum, payload = payload) for dest in reversed (destlist): ret = LoopFwd (dest = dest, payload = ret) ret = LoopSkip (payload = ret) return ret, rnum