如何使用Python自动采集路由器信息
还有一小时就要下班了,老板突然指示马上收集一下所有设备的IP,软件版本,序列号信息,一台一台登录肯定得加班了,要不用Python自动采集吧。
环境
- Python 3.9
- Python模块
Netmiko - ssh登录路由器模块
Getpass - 非明文输入密码
TextFSM - 把无规律的文本内容整理成有序的数据格式
ntc-templates - 用TextFSM开发的用于网络设备的模板集,包括Cisco、Avaya、Huawei等
openpyxl - 把查询结果输出到EXCEL
concurrent.futures - 并发模块
代码
#!/usr/bin/env python3
# -*- coding=utf-8 -*-
# by EFLYTOP
# www.eflytop.com
from netmiko import ConnectHandler,NetmikoTimeoutException,NetmikoAuthenticationException
from openpyxl import Workbook
from openpyxl.styles import PatternFill, Border, Side
from pprint import pprint
from concurrent.futures import ThreadPoolExecutor
import getpass
import sys
import time
print('请把要执行的IP清单保存至device.txt文件')
username = input("请输入设备用户名:")
password = getpass.getpass(prompt='请输入密码:')
#enpass = getpass.getpass(prompt='请输入enable密码:')
succeed_ip_list = []
failed_login_ip_list = []
hostname_list = []
sn_list = []
uptime_list = []
model_list = []
os_version_list = []
image_list = []
wb = Workbook()
ws = wb.active
ws.title = 'Inventory'
ws['A1'] = 'Hostname'
ws['B1'] = 'IP Address'
ws['C1'] = 'Serial Number'
ws['D1'] = 'Uptime'
ws['E1'] = 'Model'
ws['F1'] = 'OS Version'
ws['G1'] = 'Image'
yellowFill = PatternFill(start_color='FFFF00', end_color='FFFF00', fill_type='solid')
thin_border = Border(left=Side(style='thin'), right=Side(style='thin'), top=Side(style='thin'), bottom=Side(style='thin'))
ws['A1'].fill=yellowFill
ws['B1'].fill=yellowFill
ws['C1'].fill=yellowFill
ws['D1'].fill=yellowFill
ws['E1'].fill=yellowFill
ws['F1'].fill=yellowFill
ws['G1'].fill=yellowFill
file_time = time.strftime("%Y-%m-%d_%H_%M_%S", time.localtime())
#print输出到txt文件
class Logger(object):
def __init__(self, filename="Default.log"):
self.terminal = sys.stdout
self.log = open(filename, "a")
def write(self, message):
self.terminal.write(message)
self.log.write(message)
def flush(self):
pass
sys.stdout = Logger('log_' + file_time + '.log')
start_time = time.time()
def get_ip_address():
with open('device.txt') as f:
addresses = f.read().splitlines()
return addresses
def retrieve_data(ipaddr):
connection_info = {
'device_type': 'cisco_ios',
'ip': ipaddr,
'username': username,
'password': password,
#'port': 22,
#'secret': enpass,
}
try:
with ConnectHandler(**connection_info) as conn:
#conn.enable()
print('INFO: ',time.strftime('%x %X'),'已经登录设备:', ipaddr)
output = conn.send_command('show version', use_textfsm=True)
succeed_ip_list.append(ipaddr)
hostname = conn.find_prompt().replace('#','')
hostname_list.append(hostname)
sn = output[0]['serial'][0]
sn_list.append(sn)
uptime = output[0]['uptime']
uptime_list.append(uptime)
model = output[0]['hardware'][0]
model_list.append(model)
os_version = output[0]['version']
os_version_list.append(os_version)
image = output[0]['running_image']
image_list.append(image)
except NetmikoAuthenticationException : #认证失败报错记录
print('INFO: ',time.strftime('%x %X'),ipaddr,'[Error 1] Authentication failed.')
failed_login_ip_list.append(ipaddr)
except NetmikoTimeoutException : #登录超时报错记录
print('INFO: ',time.strftime('%x %X'),ipaddr,'[Error 2] Connection timed out.')
failed_login_ip_list.append(ipaddr)
except Exception as e:
print('INFO: ',time.strftime('%x %X'),ipaddr,e)
failed_login_ip_list.append(ipaddr)
with ThreadPoolExecutor(max_workers=5) as exe:
ip_addresses = get_ip_address()
results = exe.map(retrieve_data, ip_addresses)
row_top_number = len(succeed_ip_list) + 2
for hostname, ip, sn, uptime, model, os_version, image, row in zip(hostname_list, succeed_ip_list, sn_list, uptime_list, model_list, os_version_list, image_list, range(2, row_top_number)):
ws.cell(row=row, column=1, value=hostname)
ws.cell(row=row, column=2, value=ip)
ws.cell(row=row, column=3, value=sn)
ws.cell(row=row, column=4, value=uptime)
ws.cell(row=row, column=5, value=model)
ws.cell(row=row, column=6, value=os_version)
ws.cell(row=row, column=7, value=image)
dims = {}
for row in ws.rows:
for cell in row:
cell.border=thin_border
if cell.value:
dims[cell.column_letter] = max((dims.get(cell.column, 0), len(str(cell.value))))
for col, value in dims.items():
ws.column_dimensions[col].width = value + 5
wb.save('inventory_' + file_time +'.xlsx')
print('下列主机登录失败,请手动检查:')
for i in failed_login_ip_list:
print(i)
#print(succeed_ip_list)
#print(hostname_list)
#print(model_list)
print('本次执行时长:%.2f'%(time.time()-start_time) + '秒')
print("命令执行完毕,执行日志请查看" + 'log_' + file_time + '.log')
print('执行结果请查看' + 'inventory_' + file_time +'.xlsx')