1.包含比较检查和修改步骤
import paramiko import os import time import random import datetime import pandas as pd import re import numpy as np from sqlalchemy import text, create_engine import psycopg2 from psycopg2 import sql from sqlalchemy.orm import sessionmaker from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy import create_engine, MetaData, Table, inspect import schedule import subprocess import telnetlib import telnetlib as tn import threading import time import ipaddress import iodef xieyi_modify(command1,command2):command = bytes(command1, encoding='utf-8')tn.write(command + b'\n')command_result = tn.read_very_eager().decode('ascii')time.sleep(1)# 使用正则表达式匹配并提取配置块# 使用正则表达式匹配并提取配置信息config_patterns = [r'description\s+(.+)',r'ip address\s+(\d+\.\d+\.\d+\.\d+)\s+(\d+\.\d+\.\d+\.\d+)',r'duplex\s+(auto|half|full)',r'speed\s+(auto|\d+)',r'port link-type\s+(access|trunk)',r'port default vlan\s+(\d+)',]# 初始化一个字典来存储配置信息config_dict = {}# 遍历所有的配置模式for pattern in config_patterns:match = re.search(pattern, command_result)if match:if len(match.groups()) == 1:key = pattern.split()[0] # 获取模式的第一个单词作为键config_dict[key] = match.group(1)else:key = pattern.split()[0] # 获取模式的第一个单词作为键config_dict[key] = match.groups()return config_dicttime.sleep(1)def get_ospf(tn):command = "show ip ospf neighbor detail"command = bytes(command, encoding='utf-8')tn.write(command + b'\n')time.sleep(1)result_list = []while (True):command_result = tn.read_very_eager().decode('ascii')# print(command_result) result_list.append(command_result)if re.findall(r"--More--", command_result.strip()):tn.write(b" ")elif re.findall(r"#", command_result.strip()):breakelse:time.sleep(0.05)continueresult_str = "\n".join(result_list)dict_ouput = {}strtext = []dict_ouput["host_ip"] = host_ipstartpattern = re.compile(r'State\s+(\w+)')# print(startpattern)print(re.search(startpattern, result_str))dict_ouput["State"]= 0if re.search(startpattern, result_str) :strtext = re.search(startpattern, result_str).group(1)dict_ouput["State"] = strtext# print(dict_ouput)# 直接使用 result_strif dict_ouput == 0 or dict_ouput["State"] != 'FULL':cmd = "show running-config ospfv2"tn.write(cmd.encode('ascii') + b'\n')time.sleep(1)command_result = tn.read_very_eager().decode('ascii')#print(command_result)# 初始化空字典config_dict = {}# 定义一个正则表达式模式来匹配配置项及其参数pattern = r'(\S+)\s+(.*)'# 遍历每一行配置for line in command_result.split('\n'):# 使用正则表达式匹配配置项match = re.match(pattern, line.strip())if match:key, value = match.groups()# 如果键已经存在,则将其值添加到列表中;否则创建一个新的列表if key in config_dict:config_dict[key].append(value)else:config_dict[key] = [value]for key, values in config_dict.items():print(f"{key}: {values}")return config_dictprint('这个是opsf协议值\n')print(config_dict.items())def get_ip_interface(tn,interface):command = "show ip interface brief"command = bytes(command, encoding='utf-8')tn.write(command + b'\n')time.sleep(1)result_list = []while (True):command_result = tn.read_very_eager().decode('ascii')# print(command_result) result_list.append(command_result)if re.findall(r"--More--", command_result.strip()):tn.write(b" ")elif re.findall(r"#", command_result.strip()):breakelse:time.sleep(0.05)continueresult_str = "\n".join(result_list)# 使用 io.StringIO 将字符串转换为文件对象data_file = io.StringIO(result_str)# 使用 read_csv 加载数据,指定分隔符为一个或多个空格,并跳过第一行df = pd.read_csv(data_file, sep='\s+', skiprows=1, names=None, header=None)# 提取第二行作为列名column_names = df.iloc[0].tolist()# 删除前两行(第一行为表头,第二行为实际数据的第一行)df = df.drop(df.index[[0, 1]])# 重新设置列名df.columns = column_names# 显示 DataFrameinterface_name = interface[0]# 根据接口名称筛选数据filtered_df = df[df["Interface"] == interface_name]# 获取 IP 地址和掩码 mask_value = filtered_df[["IP-Address", "Mask"]]return mask_valuedef ping_test(ip_address,tn):command = f"ping {ip_address}"command = bytes(command, encoding='utf-8')tn.write(command+ b'\n')time.sleep(10)ping_result = tn.read_very_eager().decode('ascii')# 检查ping测试是否成功print(ping_result)return "Success rate is 100 percent" in ping_result#command_result = tn.read_very_eager().decode('ascii')# print(command_result)def serch_modify(pd_result,tn):down_interface = pd_result[pd_result['Admin'] == 'down']# 遍历每一个down的接口for index, row in down_interface.iterrows():interface_name = row['Interface']commands = ["enable",f"configure terminal",f"interface {interface_name}","no shutdown","exit","exit"]for cmd in commands:command = bytes(cmd, encoding='utf-8')tn.write(command + b'\n')time.sleep(1) # 等待命令执行# cmd = f'enable'# command = bytes(cmd, encoding='utf-8')# tn.write(command + b'\n')# cmd = f'configure terminal'# command = bytes(cmd, encoding='utf-8')# tn.write(command + b'\n')# # cmd = f'interface {interface_name}'# command = bytes(cmd, encoding='utf-8')# tn.write(command + b'\n')# command_result = tn.read_very_eager().decode('ascii')# print(command_result)# # cmd = 'no shutdown'# command = bytes(cmd, encoding='utf-8')# tn.write(command + b'\n')# command_result = tn.read_very_eager().decode('ascii')# print(command_result)# # command = "show ip interface brief"# command = bytes(command, encoding='utf-8')# tn.write(command + b'\n')# time.sleep(1)# command_result = tn.read_very_eager().decode('ascii')# print(command_result)# # # # # cmd = 'no shutdown'# command = bytes(cmd, encoding='utf-8')# tn.write(command + b'\n') #主函数 def get_info_telnet(host_ip, username, password,mudi_ip,interface):tn = telnetlib.Telnet()# 定义需要检查的接口列表# interfaces = ['xgei0/3/0/1', 'xgei0/3/0/2']try:tn.open(host_ip, port=23, timeout=5)print('%s connected ssuccess !' % host_ip)tn.read_until(b'Username:', timeout=5)tn.write(username.encode('ascii') + b'\n')tn.read_until(b'Password:', timeout=5)tn.write(password.encode('ascii') + b'\n')time.sleep(1)command_result = tn.read_until(b'#', timeout=5)if b'#' not in command_result:print('%s登录失败' % host_ip)else:print('%s登录成功' % host_ip)except:print('%s网络连接失败' % host_ip)#command = "show clock"#command = bytes(command, encoding='utf-8')#tn.write(command + b'\r\n')#run_time = tn.read_until(b'#')#run_time = re.findall(r"\d+:\d+:\d+\s+\w+\s+\w+\s+\w+\s+\d+\s+2024", run_time.decode('GB18030'))[0] ping_test(mudi_ip,tn)command = "show interface brief"command = bytes(command, encoding='utf-8')tn.write(command + b'\n')time.sleep(1)result_list = []while (True):command_result = tn.read_very_eager().decode('ascii')# print(command_result) result_list.append(command_result)if re.findall(r"--More--", command_result.strip()):tn.write(b" ")elif re.findall(r"#", command_result.strip()):breakelse:time.sleep(0.05)continueresult_str = "\n".join(result_list)list_str = result_str.split('\n')pd_result = pd.DataFrame()list_temperature_vec = []for j in list_str:regex = re.compile(r'\w+gei.+\s+.+\s+.+\s+.+\s+.+\s+.+\s+.+', re.S)# print(regex.findall(j))# print(len(regex.findall(j)))if len(re.findall(r"Interface", j)) > 0:new_columns = list_find_str = re.split(r'\s+', j)new_columns = new_columns[0:8]if len(regex.findall(j)) > 0:list_find_str = regex.findall(j)[0]list_find_str = re.split(r'\s+', list_find_str)list_temperature_vec.append(list_find_str)pd_result = pd.DataFrame(list_temperature_vec)pd_result.columns = new_columnsserch_modify(pd_result,tn)print("接口状态起来后的ping测结果:\n")#print(ping_test(mudi_ip, tn))#获取对应接口ip地址和掩码mask_value = get_ip_interface(tn,interface)#获取ospf的配置ospf_value = get_ospf(tn)#print("269",ospf_value)#print(ospf_value) tn.close()return ospf_value, mask_value def modify_ospf(host_ip, username, password):tn = telnetlib.Telnet()try:tn.open(host_ip, port=23, timeout=5)print('%s connected ssuccess !' % host_ip)tn.read_until(b'Username:', timeout=5)tn.write(username.encode('ascii') + b'\n')tn.read_until(b'Password:', timeout=5)tn.write(password.encode('ascii') + b'\n')time.sleep(1)command_result = tn.read_until(b'#', timeout=5)if b'#' not in command_result:print('%s登录失败' % host_ip)else:print('%s登录成功' % host_ip)except:print('%s网络连接失败' % host_ip)commands = [f"configure terminal",f"router ospf 1",f"area 0",f"interface xgei-0/1/1/49","network point-to-multipoint","exit""exit""exit""exit"]for cmd in commands:tn.write(cmd.encode('ascii') + b'\n')time.sleep(3)command_result = tn.read_very_eager().decode('ascii')print(command_result)print("modyfiOspf:ok")tn.close()def compare_host_ip(pd_output1, pd_output2,interface):if pd_output1["network"][0] != pd_output2["network"][0]:modify_ospf(host_ip[0], username, password)# if pd_output1["area"][0] != pd_output2["area"][0]:# modify_ospf(host_ip[1], username, password)# if pd_output1["interface"][0] and pd_output1["interface"][0]!=interface[0]:# modify_ospf(host_ip[2], username, password)# elif pd_output2["interface"][0] and pd_output2["interface"][0] != interface[1]:# modify_ospf(host_ip[3], username, password)# elif "interface" not in pd_output1.keys():# modify_ospf(host_ip[3], username, password)# elif "interface" in pd_output2.keys():# modify_ospf(host_ip[4], username, password)print("ospf配置ok")def are_ips_in_same_subnet(ip1, mask1, ip2, mask2):# 检查两个IP地址是否在同一个子网内try:net1 = ipaddress.ip_network(f"{ip1}/{mask1}", strict=False)net2 = ipaddress.ip_network(f"{ip2}/{mask2}", strict=False)if net1.overlaps(net2):print("地址配置正确")return Trueelse:print("地址错误")return Falseexcept ValueError as e:print(f"无效的网络字符串: {ip1}/{mask1} 或 {ip2}/{mask2}. 错误: {e}")return Falseif __name__ == '__main__':host_ip = ['192.168.2.1','192.168.1.1']interface = ['xgei-0/1/1/49','gei-0/1/1/33']mudi_ip = ['10.0.0.1','10.0.0.1']username = 'zte'password = 'zte'pd_output1,mask_value1 = get_info_telnet(host_ip[0], username, password,mudi_ip[1],interface)print(mask_value1["IP-Address"].values[0])pd_output2,mask_value2 = get_info_telnet(host_ip[1], username, password,mudi_ip[0],interface)are_ips_in_same_subnet(mask_value1["IP-Address"].values[0], mask_value1["Mask"].values[0], mask_value2["IP-Address"].values[0], mask_value2["Mask"].values[0])#compare_host_ip(pd_output1, pd_output2,interface[0])