#!/usr/bin/python3
"""NetCom/NetCAN Network Configuration utility.

Listens for SSDP announcements (UDP port 1900) that mention ``devinfo.xml``
on every interface with an IPv4 address, waits for the device with the
requested source address, makes sure it is reachable, and then writes a
new IP configuration to it via SNMP.
"""

import argparse
import os
import queue
import subprocess
import sys
import urllib.request
import xml.etree.ElementTree as ET
from functools import partial
from time import time

import netifaces
import pyroute2
from pyasn1.codec.ber import encoder, decoder
from pyroute2 import IPRoute
from pysnmp.carrier.asyncore.dgram import udp
from pysnmp.carrier.asyncore.dispatch import AsyncoreDispatcher
from pysnmp.hlapi import (CommunityData, ContextData, Integer, IpAddress,
                          ObjectIdentity, ObjectType, SnmpEngine,
                          UdpTransportTarget, setCmd)
from pysnmp.proto import api
from scapy.all import AsyncSniffer, IP, Raw

# OIDs of the device's network settings (enterprise subtree 12695).
SNMP_IP_ADDRESS = '.1.3.6.1.4.1.12695.1.6.0'
SNMP_NETMASK = '.1.3.6.1.4.1.12695.1.7.0'
SNMP_BROADCAST_ADDRESS = '.1.3.6.1.4.1.12695.1.8.0'
SNMP_DHCP_MODE = '.1.3.6.1.4.1.12695.1.9.0'
SNMP_CTRL = ".1.3.6.1.4.1.12695.1.10.0"
# Values written to SNMP_CTRL.
SNMP_CTRL_REBOOT = 1
SNMP_CTRL_SAVE = 2

DOMAIN_NAME = SNMP_UDP_DOMAIN = (1, 3, 6, 1, 6, 1, 1)

# Broadcast manager settings
maxWaitForResponses = 5
maxNumberResponses = 10

# XML namespace prefix of the tags read from the device description.
_UPNP_NS = "{urn:schemas-upnp-org:device-1-0}"


class VsSnmpError(Exception):
    """Internal SNMP exception class."""

    def __init__(self, message):
        self.message = message
        super().__init__(self.message)

    def __str__(self):
        if self.message:
            return f'VsSnmpError: {self.message}'
        return 'VsSnmpError has been raised'


class StopWaiting(Exception):
    """Raised by the timer callback to end the wait for broadcast replies."""


# Protocol version to use
pMod = api.protoModules[api.protoVersion1]
reqPDU = pMod.GetRequestPDU()


class SnmpBroadcastAgent(object):
    """Broadcast an SNMP GET request and print the replies.

    The request asks for the two OIDs that are also defined above as
    ``SNMP_IP_ADDRESS`` and ``SNMP_NETMASK``. It is sent to
    255.255.255.255:161 from a socket bound to ``if_ip_addr``.
    """

    def __init__(self, if_ip_addr):
        """Build the request message.

        Args:
            if_ip_addr: Local IPv4 address the UDP socket is bound to.
        """
        # Build PDU (the module-level reqPDU is shared by all instances)
        pMod.apiPDU.setDefaults(reqPDU)
        pMod.apiPDU.setVarBinds(
            reqPDU, (('1.3.6.1.4.1.12695.1.6.0', pMod.Null('')),
                     ('1.3.6.1.4.1.12695.1.7.0', pMod.Null('')))
        )

        # Build message
        self.reqMsg = pMod.Message()
        pMod.apiMessage.setDefaults(self.reqMsg)
        pMod.apiMessage.setCommunity(self.reqMsg, 'public')
        pMod.apiMessage.setPDU(self.reqMsg, reqPDU)

        self.startedAt = time()
        self.if_ip_addr = if_ip_addr

    def cbTimerFun(self, timeNow):
        """Abort the dispatcher once the maximum wait time has elapsed.

        Args:
            timeNow: Current time as passed in by the dispatcher
                (presumably a ``time()`` value -- confirm against pysnmp).

        Raises:
            StopWaiting: If more than ``maxWaitForResponses`` have passed
                since ``self.startedAt``.
        """
        if timeNow - self.startedAt > maxWaitForResponses:
            raise StopWaiting()

    # noinspection PyUnusedLocal,PyUnusedLocal
    def cbRecvFun(self, transportDispatcher, transportDomain, transportAddress,
                  wholeMsg, reqPDU=reqPDU):
        """Decode received messages and print the replies to our request.

        Returns:
            The undecoded remainder of ``wholeMsg`` (empty once everything
            has been consumed).
        """
        while wholeMsg:
            rspMsg, wholeMsg = decoder.decode(wholeMsg, asn1Spec=pMod.Message())
            rspPDU = pMod.apiMessage.getPDU(rspMsg)
            # Match response to request
            if pMod.apiPDU.getRequestID(reqPDU) != pMod.apiPDU.getRequestID(rspPDU):
                continue
            # Check for SNMP errors reported
            errorStatus = pMod.apiPDU.getErrorStatus(rspPDU)
            if errorStatus:
                print(errorStatus.prettyPrint())
            else:
                for oid, val in pMod.apiPDU.getVarBinds(rspPDU):
                    print('%s = %s' % (oid.prettyPrint(), val.prettyPrint()))
            transportDispatcher.jobFinished(1)
        return wholeMsg

    def start(self):
        """Send the broadcast request and process replies until done.

        Returns when ``maxNumberResponses`` replies were handled or when
        the timer callback raises ``StopWaiting``. Other errors are
        printed; the dispatcher is always closed.
        """
        transportDispatcher = AsyncoreDispatcher()
        transportDispatcher.registerRecvCbFun(self.cbRecvFun)
        transportDispatcher.registerTimerCbFun(self.cbTimerFun)

        # UDP/IPv4
        udpSocketTransport = udp.UdpSocketTransport().openServerMode(
            (self.if_ip_addr, 161)
        ).enableBroadcast()
        transportDispatcher.registerTransport(DOMAIN_NAME, udpSocketTransport)

        # Pass message to dispatcher
        transportDispatcher.sendMessage(
            encoder.encode(self.reqMsg), DOMAIN_NAME, ('255.255.255.255', 161)
        )

        # wait for a maximum of maxNumberResponses responses or time out
        transportDispatcher.jobStarted(1, maxNumberResponses)

        # Measure the timeout from the moment the request is sent, not from
        # the moment the object was constructed.
        self.startedAt = time()

        # Dispatcher will finish as all jobs counter reaches zero
        try:
            transportDispatcher.runDispatcher()
        except StopWaiting:
            # Expected: the wait time is over.
            pass
        except Exception as err:
            # Discovery is best effort: report the problem, do not abort.
            print(f'SNMP broadcast discovery failed: {err}')
        finally:
            transportDispatcher.closeDispatcher()


class SnmpManager(object):
    """SNMP Manager Class."""

    def __init__(self, ip_addr, verbose=False, broadcast=False):
        """Store the target settings.

        Args:
            ip_addr: Address of the device; requests go to UDP port 161.
            verbose: Print the returned variable bindings after a write.
            broadcast: Enable broadcast on the client transport.
        """
        self.ip_addr = ip_addr
        self.verbose = verbose
        self.broadcast = broadcast

    def write(self, oid, value):
        """Write value.

        Args:
            oid: OID to set.
            value: ``int`` values are sent as ``Integer``, everything else
                as ``IpAddress``.

        Raises:
            VsSnmpError: If the request fails or the device reports an
                error status.
        """
        real_val = Integer(value) if isinstance(value, int) else IpAddress(value)

        target = UdpTransportTarget((self.ip_addr, 161))
        if self.broadcast:
            target.transport = target.openClientMode().enableBroadcast()

        error_indication, error_status, error_index, var_binds = next(
            setCmd(SnmpEngine(),
                   CommunityData('root', mpModel=0),
                   target,
                   ContextData(),
                   ObjectType(ObjectIdentity(oid), real_val))
        )

        if error_indication:
            raise VsSnmpError(error_indication)

        if error_status:
            # error_index is 1-based; 0 means no specific binding is blamed.
            culprit = var_binds[int(error_index) - 1][0] if error_index else '?'
            raise VsSnmpError('%s at %s' % (error_status.prettyPrint(), culprit or '?'))

        if self.verbose:
            for item in var_binds:
                print(item)


def receive_packet(rcv_queue, if_ip_addr, pkt):
    """Receive UDP packet.

    Packets whose payload mentions ``devinfo.xml`` are queued as
    ``(source address, if_ip_addr, payload)``. Packets without the needed
    layers are ignored.

    Args:
        rcv_queue: Queue the matching packets are put on.
        if_ip_addr: IPv4 address of the interface the sniffer runs on.
        pkt: Sniffed scapy packet.
    """
    try:
        payload = pkt[Raw].load
        if b'devinfo.xml' in payload:
            rcv_queue.put((pkt[IP].src, if_ip_addr, payload))
    except IndexError:
        # Raised by scapy when the packet has no Raw or no IP layer.
        pass


def get_xml_tag(child, tag):
    """Get XML tag from device description.

    Returns:
        The text of the first direct sub-element of ``child`` named
        ``tag``, or ``None`` if there is none.
    """
    for item in child:
        if item.tag == tag:
            return item.text
    return None


def parse_xml(text):
    """Extract friendlyName and check if it is NET-CAN.

    Args:
        text: Device description XML.

    Returns:
        A dict with the keys ``model``, ``fw``, ``hw`` and ``sernum`` if a
        ``device`` element has a ``friendlyName`` containing "NET-CAN",
        otherwise ``None``.
    """
    root = ET.fromstring(text)
    for child in root:
        if child.tag != _UPNP_NS + "device":
            continue
        for item in child:
            if item.tag != _UPNP_NS + "friendlyName":
                continue
            # item.text is None for an empty element; treat as no match.
            if item.text and "NET-CAN" in item.text:
                return {
                    'model': get_xml_tag(child, _UPNP_NS + "modelName"),
                    'fw': get_xml_tag(child, _UPNP_NS + "firmWareVersionNumber"),
                    'hw': get_xml_tag(child, _UPNP_NS + "hardWareVersionNumber"),
                    'sernum': get_xml_tag(child, _UPNP_NS + "serialNumber"),
                }
    return None


def get_xml_file(buf):
    """Get devinfo.xml file.

    Args:
        buf: SSDP payload (bytes) with a ``LOCATION:`` header whose URL
            ends in ``xml``.

    Returns:
        The downloaded document decoded as UTF-8.

    Raises:
        ValueError: If no such URL can be found in ``buf``.
    """
    marker = b"LOCATION:"
    location_idx = buf.find(marker)
    if location_idx < 0:
        raise ValueError("no LOCATION header in SSDP payload")
    location_end_idx = buf.find(b"xml", location_idx)
    if location_end_idx < 0:
        raise ValueError("LOCATION header does not reference an xml file")

    url = buf[location_idx + len(marker):location_end_idx + 3].strip()
    # NOTE(review): the URL comes straight from a network packet; consider
    # restricting it to http:// before opening it.
    with urllib.request.urlopen(url.decode("utf-8")) as response:
        return response.read().decode('utf-8')


def _start_sniffers(rcv_queue):
    """Start an SSDP sniffer on every interface that has an IPv4 address.

    Returns:
        ``(sniffers, if_names)``: the started sniffers and a dict mapping
        each interface's IPv4 address to its name.
    """
    sniffers = []
    if_names = {}
    for if_name in netifaces.interfaces():
        try:
            if_ip_addr = netifaces.ifaddresses(if_name)[netifaces.AF_INET][0]["addr"]
        except (KeyError, IndexError):
            # Interface without an IPv4 address.
            continue
        sniffer = AsyncSniffer(iface=if_name, filter='udp and port 1900',
                               prn=partial(receive_packet, rcv_queue, if_ip_addr))
        sniffer.start()
        sniffers.append(sniffer)
        if_names[if_ip_addr] = if_name
    return sniffers, if_names


def _stop_sniffers(sniffers):
    """Stop all sniffers; a failure to stop one does not affect the rest."""
    for sniffer in sniffers:
        try:
            sniffer.stop()
        except Exception as err:
            # Best-effort cleanup, e.g. the capture never came up.
            print(err)


def _wait_for_device(rcv_queue, ip_addr, timeout=10):
    """Return the first queued packet whose source address is ``ip_addr``.

    Raises:
        queue.Empty: If no packet at all arrives within ``timeout`` seconds.
    """
    while True:
        ssdp_pkt = rcv_queue.get(timeout=timeout)
        if ssdp_pkt[0] == ip_addr:
            return ssdp_pkt


def _is_reachable(ip_addr):
    """Return True if a single ping to ``ip_addr`` succeeds."""
    try:
        # Argument list instead of a shell string: nothing is interpreted
        # by a shell.
        return subprocess.run(["ping", "-c", "1", ip_addr], check=False).returncode == 0
    except OSError as err:
        # ping could not be executed; treat the device as unreachable.
        print(err)
        return False


def _add_host_route(ip_addr, if_name):
    """Add a /32 route to ``ip_addr`` via interface ``if_name``."""
    with IPRoute() as ip:
        indices = ip.link_lookup(ifname=if_name)
        if not indices:
            print(f"interface {if_name} not found, no route added for {ip_addr}")
            return
        try:
            ip.route("add", dst=f"{ip_addr}/32", oif=indices[0])
        except pyroute2.netlink.exceptions.NetlinkError as err:
            print(err)


def main():
    """Main routine.

    Waits for an SSDP announcement from ``--ipaddr``, adds a host route
    if the device does not answer a ping, runs the SNMP broadcast
    discovery on the interface the device was seen on, and writes the new
    network settings, followed by save and reboot.
    """
    parser = argparse.ArgumentParser(description='Net Manager')
    parser.add_argument("-i", "--ipaddr", help="IP address to search for",
                        default='192.168.254.254')
    parser.add_argument("-n", "--new-ipaddr", help="IP address to assign",
                        default="192.168.0.31")
    parser.add_argument("-m", "--netmask", help="netmask to assign",
                        default="255.255.255.0")
    parser.add_argument("-b", "--broadcast", help="broadcast address to assign",
                        default="192.168.0.255")
    args = parser.parse_args()

    rcv_queue = queue.Queue()
    sniffers, if_names = _start_sniffers(rcv_queue)
    if not sniffers:
        sys.exit("no network interface with an IPv4 address found")

    try:
        try:
            ssdp_pkt = _wait_for_device(rcv_queue, args.ipaddr)
        except queue.Empty:
            sys.exit(f"timed out waiting for an SSDP announcement from {args.ipaddr}")
        print(ssdp_pkt)

        # Use the interface the announcement actually arrived on.
        dev_ip_addr, if_ip_addr = ssdp_pkt[0], ssdp_pkt[1]

        if not _is_reachable(dev_ip_addr):
            _add_host_route(dev_ip_addr, if_names[if_ip_addr])

        SnmpBroadcastAgent(if_ip_addr).start()

        snmp_mgr = SnmpManager(dev_ip_addr)
        try:
            snmp_mgr.write(SNMP_DHCP_MODE, 0)
            snmp_mgr.write(SNMP_IP_ADDRESS, args.new_ipaddr)
            snmp_mgr.write(SNMP_NETMASK, args.netmask)
            snmp_mgr.write(SNMP_BROADCAST_ADDRESS, args.broadcast)
            snmp_mgr.write(SNMP_CTRL, SNMP_CTRL_SAVE)
            snmp_mgr.write(SNMP_CTRL, SNMP_CTRL_REBOOT)
        except Exception as err:
            print(err)
    finally:
        _stop_sniffers(sniffers)


if __name__ == '__main__':
    main()