欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 汽车 > 时评 > 基于netmiko模块实现支持SSH or Telnet的多线程多厂商网络设备自动化巡检脚本

基于netmiko模块实现支持SSH or Telnet的多线程多厂商网络设备自动化巡检脚本

2025/5/25 13:03:46 来源:https://blog.csdn.net/a728800/article/details/148193101  浏览:    关键词:基于netmiko模块实现支持SSH or Telnet的多线程多厂商网络设备自动化巡检脚本

自动化巡检的需求

        巡检工作通常包含大量的重复性操作,而这些重复性特征意味着其背后存在明确的规则和逻辑。这种规律性为实现自动化提供了理想的前提条件。

自动化工具

        我们这里采用python作为自动化的执行工具。

过程

        安装 netmiko

pip install netmiko

        模块的使用

import os
from concurrent.futures import ThreadPoolExecutor
from os import makedirs
from netmiko import ConnectHandler
import csv
import chardet

        厂商识别与协议识别

    if i['厂商'] == '华为' and i['协议'] == 'ssh':device_type = 'huawei'cmd_txt = '华为巡检命令.txt'elif i['厂商'] == '华为' and i['协议'] == 'telnet':device_type = 'huawei_telnet'cmd_txt = '华为巡检命令.txt'elif i['厂商'] == '华三' and i['协议'] == 'ssh':device_type = 'hp_comware'cmd_txt = 'H3C巡检命令.txt'elif i['厂商'] == '华三' and i['协议'] == 'telnet':device_type = 'hp_comware_telnet'cmd_txt = 'H3C巡检命令.txt'elif i['厂商'] == '锐捷' and i['协议'] == 'ssh':device_type = 'ruijie_os'cmd_txt = '锐捷巡检命令.txt'elif i['厂商'] == '锐捷' and i['协议'] == 'telnet':device_type = 'ruijie_os_telnet'cmd_txt = '锐捷巡检命令.txt'

        创建设备字典

device = {'device_type': device_type,'host': i['ip'],'username': i['username'],'password': i['password'],}

        调用构造函数并传参,**表示将字典解包并逐一传参

conn = ConnectHandler(**device)

        巡检记录之path变量为True

dir = os.path.join('./巡检命令文件/', cmd_txt)path = './巡检记录/'if os.path.exists(path): #表示如果path变量中存储的路径存在则运行with open(dir, mode='rb') as ffile: #这段open是为了识别字符集编码data_bs = ffile.read()result_dict = chardet.detect(data_bs)encoding_cmd_txt = result_dict['encoding']with open(dir, mode='r', encoding=encoding_cmd_txt) as cmd_read:for cmd in cmd_read:stdout = conn.send_command(cmd.strip())with open(f'{path}{i['ip']}的巡检记录.txt', mode='a', encoding='utf-8') as addwrite:addwrite.write(stdout)

        巡检记录之path变量为false

    else:makedirs(path)  #因为path变量中存储的路径不存在,因此创建一个with open(dir, mode='rb') as ffile:data_bs = ffile.read()result_dict = chardet.detect(data_bs)encoding_cmd_txt = result_dict['encoding']with open(dir, mode='r', encoding=encoding_cmd_txt) as cmd_read:for cmd in cmd_read:stdout = conn.send_command(cmd.strip())with open(f'{path}{i['ip']}的巡检记录.txt', mode='a', encoding='utf-8') as addwrite:addwrite.write(stdout)

        并且以上所有代码细节都封装进一个函数里以方便代码的抽象化引用和减少代码的重复性。

def xijie(i):if i['厂商'] == '华为' and i['协议'] == 'ssh':device_type = 'huawei'cmd_txt = '华为巡检命令.txt'elif i['厂商'] == '华为' and i['协议'] == 'telnet':device_type = 'huawei_telnet'cmd_txt = '华为巡检命令.txt'elif i['厂商'] == '华三' and i['协议'] == 'ssh':device_type = 'hp_comware'cmd_txt = 'H3C巡检命令.txt'elif i['厂商'] == '华三' and i['协议'] == 'telnet':device_type = 'hp_comware_telnet'cmd_txt = 'H3C巡检命令.txt'elif i['厂商'] == '锐捷' and i['协议'] == 'ssh':device_type = 'ruijie_os'cmd_txt = '锐捷巡检命令.txt'elif i['厂商'] == '锐捷' and i['协议'] == 'telnet':device_type = 'ruijie_os_telnet'cmd_txt = '锐捷巡检命令.txt'device = {'device_type': device_type,'host': i['ip'],'username': i['username'],'password': i['password'],}conn = ConnectHandler(**device)dir = os.path.join('./巡检命令文件/', cmd_txt)path = './巡检记录/'if os.path.exists(path):with open(dir, mode='rb') as ffile:data_bs = ffile.read()result_dict = chardet.detect(data_bs)encoding_cmd_txt = result_dict['encoding']with open(dir, mode='r', encoding=encoding_cmd_txt) as cmd_read:for cmd in cmd_read:stdout = conn.send_command(cmd.strip())with open(f'{path}{i['ip']}的巡检记录.txt', mode='a', encoding='utf-8') as addwrite:addwrite.write(stdout)else:makedirs(path)with open(dir, mode='rb') as ffile:data_bs = ffile.read()result_dict = chardet.detect(data_bs)encoding_cmd_txt = result_dict['encoding']with open(dir, mode='r', encoding=encoding_cmd_txt) as cmd_read:for cmd in cmd_read:stdout = conn.send_command(cmd.strip())with open(f'{path}{i['ip']}的巡检记录.txt', mode='a', encoding='utf-8') as addwrite:addwrite.write(stdout)

        主程序入口点

if __name__ == '__main__':with open('host.csv',mode='rb') as file:raw_data = file.read()result = chardet.detect(raw_data)encoding = result['encoding']with open('host.csv',mode='r',encoding=encoding) as file:reader = csv.DictReader(file)max_thread = 10 #定义线程池中使用多少线程for i in reader:with ThreadPoolExecutor(max_workers=max_thread) as t:t.submit(xijie,i) #提交任务

可以看出,由于将代码细节给抽象化成函数,因此整个代码显得更简洁了,特别是在主程序入口的体现,无需了解代码细节,直接引用即可。

具体代码实现

import os
from concurrent.futures import ThreadPoolExecutor
from os import makedirs
from netmiko import ConnectHandler
import csv
import chardetdef xijie(i):if i['厂商'] == '华为' and i['协议'] == 'ssh':device_type = 'huawei'cmd_txt = '华为巡检命令.txt'elif i['厂商'] == '华为' and i['协议'] == 'telnet':device_type = 'huawei_telnet'cmd_txt = '华为巡检命令.txt'elif i['厂商'] == '华三' and i['协议'] == 'ssh':device_type = 'hp_comware'cmd_txt = 'H3C巡检命令.txt'elif i['厂商'] == '华三' and i['协议'] == 'telnet':device_type = 'hp_comware_telnet'cmd_txt = 'H3C巡检命令.txt'elif i['厂商'] == '锐捷' and i['协议'] == 'ssh':device_type = 'ruijie_os'cmd_txt = '锐捷巡检命令.txt'elif i['厂商'] == '锐捷' and i['协议'] == 'telnet':device_type = 'ruijie_os_telnet'cmd_txt = '锐捷巡检命令.txt'device = {'device_type': device_type,'host': i['ip'],'username': i['username'],'password': i['password'],}conn = ConnectHandler(**device)dir = os.path.join('./巡检命令文件/', cmd_txt)path = './巡检记录/'if os.path.exists(path):with open(dir, mode='rb') as ffile:data_bs = ffile.read()result_dict = chardet.detect(data_bs)encoding_cmd_txt = result_dict['encoding']with open(dir, mode='r', encoding=encoding_cmd_txt) as cmd_read:for cmd in cmd_read:stdout = conn.send_command(cmd.strip())with open(f'{path}{i['ip']}的巡检记录.txt', mode='a', encoding='utf-8') as addwrite:addwrite.write(stdout)else:makedirs(path)with open(dir, mode='rb') as ffile:data_bs = ffile.read()result_dict = chardet.detect(data_bs)encoding_cmd_txt = result_dict['encoding']with open(dir, mode='r', encoding=encoding_cmd_txt) as cmd_read:for cmd in cmd_read:stdout = conn.send_command(cmd.strip())with open(f'{path}{i['ip']}的巡检记录.txt', mode='a', encoding='utf-8') as addwrite:addwrite.write(stdout)if __name__ == '__main__':with open('host.csv',mode='rb') as file:raw_data = file.read()result = chardet.detect(raw_data)encoding = result['encoding']with open('host.csv',mode='r',encoding=encoding) as file:reader = csv.DictReader(file)max_thread = 10for i in reader:with ThreadPoolExecutor(max_workers=max_thread) as t:t.submit(xijie,i)

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词