第十章 SSH协议
10.1 简介
- SSH(secure Shell,安全外科协议)是一种用于在不安全网络上进行安全远程登录和实现其他安全网络服务的协议。
- 由三个组件构成:
- SSH传输层协议:版本协商、算法协商、密钥协商
- SSH用户认证协议:用户认证(口令、密钥)
- SSH连接协议:建立会话连接
10.2 密码学补充
10.2.1 对称加密和非对称加密
-
对称加密:对明文进行加密的密钥和对密文进行解密的密钥相同,即加解密密钥相同。
- 优点:加密速度快;
- 缺点:不安全,密钥传输过程中存在安全风险

-
非对称加密:双方都有两把钥匙:公钥和私钥
-
优点:公钥发送给对端,私钥本地保存,加密和解密过程中使用不同的密钥进行,公钥加密则私钥解密,私钥加密则公钥解密
-
优点:安全,私钥保存在本地
-
缺点:加密速度慢。虽然更急着无法通过密文得到明文信息,但是可以修改信息。

-
10.2.2 数字信封

- 核心概念:对于明文还是采用对称密钥加密的方式,而对于用于加密的对称密钥,采用非对称密钥加密进行保护。
- 优点:保证安全性的前提下,又提升了加密速度。
- 缺点:虽然攻击者无法通过密文得到明文信息,但是可以篡改信息。
10.2.3 数字签名

- 假设甲要向乙发送信息:
- 甲采用对称密钥对明文信息进行加密
- 甲使用乙的公钥对对称密钥进行加密,得到就是数字信封。
- 甲将最原始的明文信息通过Hash计算得到一段Hash-1值,并将该Hash值使用甲的私钥进行加密,得到一个数字签名。
- 甲将密文、数字信封以及数字签名发送给乙。
- 乙通过自己的私钥对数字信封进行解密得到对称密钥,使用对称密钥对密文信息进行解密,得到明文信息
- 乙使用该明文信息通过Hash计算得到一段Hash-2值。
- 乙使用甲的公钥对数字签名进行解密,得到Hash-1值。
- 乙将Hash-1和Hash-2值比较,相同,则认为发送端为甲,否则报文为伪造报文。
10.3 SSH传输层协议
- SSH传输层协议是一个安全传输协议,SSH传输层通常建立在TCP/IP链接上,协议算法如下:
| 算法类型 | 算法功能 | 算法名称 |
|---|---|---|
| 密钥交换算法 | 用于产生会话密钥 | Diffie-Hellman-Group14-sha1,Diffie-Hellman-Group1-sha1… |
| 公钥算法 | 用于进行数字签名和用户认证 | SSH-RSA,SSH-DSS… |
| 对称加密算法 | 用于会话的加密 | aes128-ctr,3des-cbc… |
| 消息认证算法 | 用于数据完整性认证 | hmac-sha1,hmac-md5… |
密钥交换算法:双发根据该算法生成一个对称加密的密钥用于后续报文的加密
公钥加密算法:用于用户认证时选择哪种算法进行非对称加密
对称加密算法:用于报文在进行对称加密时采用哪种堆成加密的算法
消息认证算法:用于数据完整性的认证
10.4 用户认证原理
- 口令认证(账号密码认证)

- 公钥认证

- 免密登录虽然不用输入账号密码,但是需要实现将客户端的公钥存储在服务器中
10.5 SSH连接协议

第十一 章 Paramiko组件
11.1 Paramiko常用类
- Channel类:该类用创建在SSH Transport上的安全通道
- Message类:SSH Message是字节流。该类对字符串、整数、bools和无线精度整数(python中称为long)的某些组合进行编码。
- Packetizer类:数据包处理类
- Transport类:该类型用于在现有套接字或类套接字对象上创建一个Transport会话对象。开启一个SSH的会话通道。
- SFTPClient类:该类通过打开一个SSH Transport 会话通道并执行远程文件操作。
- SSHClient类:SSHClient类是与SSH服务器会话的高级表示。该类集成了Transport,channel和SFTPClient类。
11.2 Paramiko密钥相关类
- SSH Agent类:用于SSH代理
- Host Keys类:该类与OpenSSH Known_hosts文件相关,用于创建一个host keys对象
- OpenSSH是SSH协议的免费开源实现。Open SSH提供了服务端后台程序和客户端工具。所有Linux操作系统均集成了OpenSSH。
- OpenSSH把用户访问过每一个计算机的公钥都记录在~/.ssh/known_hosts。当下次访问相同计算机时,OpenSSH会核对公钥。如果公钥不同,则发出警告,避免用户受到中间人攻击。
- Key handing类:该类用于创建对应密钥类型的实例,如RSA实例,DSS(DSA)密钥。
11.3 Paramiko使用流程图

11.4 Paramiko实例
11.4.1 SSHClient类实例
- 实例一:
import paramiko
import timessh_client = paramiko.SSHClient()
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 默认进行用户认证时,需要先获取服务器的公钥,服务器将公钥发送给客户端时,客户端默认不会立即接受该公钥。
# set_missing_host_key_policy(paramiko.AutoAddPolicy())实现自动保存公钥。
ssh_client.connect(hostname='192.168.0.1', username='python', password='Huawei@123')
vtysh = ssh_client.invoke_shell()
res = vtysh.recv(65535).decode('utf-8')
print(res)
vtysh.send("N\n") # 不修改密码
time.sleep(0.5) # 输入一次命令后等待命令行反应,进入到用户视图
vtysh.send("screen-length 0 temporary\n") # 将命令行的显示命令数量调整为显示所有
time.sleep(0.5)
vtysh.send("dis cu\n")
time.sleep(1)
res = vtysh.recv(65535).decode('utf-8')
print(res)
- 实例二:通过SSHClient类实现自动配置SFTP功能
import paramiko
import timessh_client = paramiko.SSHClient()
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 默认进行用户认证时,需要先获取服务器的公钥,服务器将公钥发送给客户端时,客户端默认不会立即接受该公钥。
# set_missing_host_key_policy(paramiko.AutoAddPolicy())实现自动保存公钥。
ssh_client.connect(hostname='192.168.0.1', username='python', password='Huawei@123')
vtysh = ssh_client.invoke_shell()
res = vtysh.recv(65535).decode('utf-8')
print(res)
vtysh.send("N\n") # 不修改密码
time.sleep(0.5) # 输入一次命令后等待命令行反应,进入到用户视图
vtysh.send("system-view im\n")
time.sleep(0.5)
with open("sftp.txt", "r", encoding="utf-8") as f: # 开启sftp文件,该文件中包含了sftp的配置命令,通过for循环读取。for i in f.readlines():vtysh.send(i)time.sleep(0.5) # 每条输入后等待0.5秒,等待命令行反应。
- SFTP脚本命令如下:
sftp server enable
ssh user python service-type stelnet sftp
ssh user python sftp-directory cfcard:
ssh authorization-type default root
11.4.2 Transport
- 使用Transport方法下载配置文件。
- 多次下载使用相同的文件名则会覆盖之前下载的。
- 路径中不能写额外的字符,否则会报文件不存在的错。
import paramikotran = paramiko.Transport(('192.168.0.1', 22))
tran.connect(username='python', password='Huawei@123')
sftp = paramiko.SFTPClient.from_transport(tran) # 从SSH会话的通道中建立SFTP会话的通道
sftp.get("/vrpcfg.cfg", r"D:\PycharmProjects\PythonProject\vrpcfg.cfg") # 前一个路径是相对路径,后一个路径是绝对路径
sftp.put(xxx,xxx) #上传文件
11.5 OS模块
11.5.1 路径操作:常用四个方法
os.对象.exists():判断路径是否存在。os.对象.abspath():获取当前py文件的绝对路径。os.对象.dirname():获取上层目录。os.对象.join():连接两个路径形成新的路径。
import osPath_dirs = r"files"
if not os.path.exists(Path_dirs): # 如果Path_dirs变量表示的目录不存在,则创建目录。os.mkdir(Path_dirs)path = os.path.abspath(__file__) # 获取当前py文件的绝对路径
root_path = os.path.dirname(path) # 获取上层目录
new_path = os.path.join(root_path, Path_dirs) # 连接root_path与Path_dirs,形成新的路径。
print(path)
print(root_path)
print(new_path)
D:\PycharmProjects\PythonProject\test.py
D:\PycharmProjects\PythonProject
D:\PycharmProjects\PythonProject\files
11.5.2 例一:形成新文件名(文件路径)
- 备份配置文件,根据日期形成用户名
import os
from datetime import datetimePath_dirs = r"files"
if not os.path.exists(Path_dirs): # 如果Path_dirs变量表示的目录不存在,则创建目录。os.mkdir(Path_dirs)path = os.path.abspath(__file__) # 获取当前py文件的绝对路径
root_path = os.path.dirname(path) # 获取上层目录time_now = datetime.today().strftime('%Y-%m-%d-%H-%M-%S')
print(time_now, type(time_now))
new_path = os.path.join(root_path, Path_dirs, "{}.cfg".format(time_now)) # 拼接路径
new_path1 = os.path.join(root_path, Path_dirs, time_now + ".cfg") # 拼接路径
print(new_path)
print(new_path1)
11.5.3 例二:下载配置文件名包含当前时间
- 使用SFTP连接设备,并下载设备配置文件,并按时间形成不同的路径
import os
from datetime import datetimeimport paramikoPath_dirs = r"files"
if not os.path.exists(Path_dirs): # 如果Path_dirs变量表示的目录不存在,则创建目录。os.mkdir(Path_dirs)path = os.path.abspath(__file__) # 获取当前py文件的绝对路径
root_path = os.path.dirname(path) # 获取上层目录time_now = datetime.today().strftime('%Y-%m-%d-%H-%M-%S')
print(time_now, type(time_now))
new_path = os.path.join(root_path, Path_dirs, "{}.cfg".format(time_now)) # 拼接路径将日期也放入到文件路径中tran = paramiko.Transport(("192.168.0.1", 22))
tran.connect(username='python', password='Huawei@123')
sftp = paramiko.SFTPClient.from_transport(tran) # 从SSH会话通道中建立SFTP会话通道
sftp.get("/vrpcfg.cfg", new_path) # 下载配置文件
11.6 paramiko模块
11.6.1 paramiko自动配置
import os
import time
from datetime import datetime
import paramikodef ssh_connect(ip, port, username, password):ssh_client = paramiko.SSHClient()ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())ssh_client.connect(hostname=ip, port=port, username=username, password=password)return ssh_clientdef ssh_config(ip, port, username, password, config_file):ssh_client = ssh_connect(ip, port, username, password)vty = ssh_client.invoke_shell() # 打开命令行vty.send("N\n")vty.send("system-view im\n")time.sleep(0.5)with open(config_file, 'r', encoding="utf-8") as f:for i in f.readlines():vty.send(i)print(i)time.sleep(0.5)return vty.recv(65535).decode('utf-8')
res = ssh_config("192.168.0.1","22","python","Huawei@123","snmp.txt")
- 再次进行函数优化,批量进行配置
import os
import time
from datetime import datetime
import paramikodef ssh_connect(ip, port, username, password):ssh_client = paramiko.SSHClient()ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())ssh_client.connect(hostname=ip, port=port, username=username, password=password)return ssh_clientdef ssh_config(ip, port, username, password, config_file):ssh_client = ssh_connect(ip, port, username, password)vty = ssh_client.invoke_shell() # 打开命令行vty.send(b"N\n")vty.send(b"system-view im\n")time.sleep(0.5)with open(config_file, 'r', encoding="utf-8") as f:for i in f.readlines():vty.send(i)time.sleep(0.5)return vty.recv(65535).decode('utf-8')def run(num):data_list = [{f"CE{i}": {"ip": f"192.168.0.{i}", "port": 22, "username": "python", "password": "Huawei@123"}}for i in range(1, 4)]print(data_list)for i in data_list:res = ssh_config(i[f"CE{num}"]["ip"], 22, "python", "Huawei@123", "snmp.txt")print(res)num += 1run(num=1) # 结果将snmp.txt中的配置信息都导入到三个交换机中
- 自动批量配置了snmp.txt中的命令行之后,将保存,并下载配置文件。
import os
import time
from datetime import datetime
import paramikodef ssh_connect(ip, port, username, password):ssh_client = paramiko.SSHClient()ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())ssh_client.connect(hostname=ip, port=port, username=username, password=password)return ssh_clientdef ssh_config(ip, port, username, password, config_file):ssh_client = ssh_connect(ip, port, username, password)vty = ssh_client.invoke_shell() # 打开命令行vty.send(b"N\n")vty.send(b"system-view im\n")time.sleep(0.5)with open(config_file, 'r', encoding="utf-8") as f:for i in f.readlines():vty.send(i)time.sleep(0.5)return vty.recv(65535).decode('utf-8')def download(ip, port, username, password, sysname):tran = paramiko.Transport((ip, port))tran.connect(username=username, password=password) # 一定要使用关键字参数sftp = paramiko.SFTPClient.from_transport(tran)time_now = datetime.now().strftime('%Y_%m_%d_%H_%M_%S')path = os.path.join(os.path.dirname(os.path.abspath(__file__)), time_now + f"{sysname}.cfg")sftp.get("/vrpcfg.cfg", path)data_list = [{"ip": f"192.168.0.{i}", "sysname": f"CE{i}", "username": "python", "password": "Huawei@123"} for i inrange(1, 4)]
for i in data_list:download(ip=i["ip"], port=22, sysname=i["sysname"], username=i["username"], password=i["password"])
11.6.2 paramiko进阶实验
- 在使用paramiko进行SSH登录时,可能会存在网络或者设备故障的问题
- 如何判断设备登录成功
- 如何保障回显完毕
- 如何判断进入了系统视图
- 如何判断命令执行成功
- 命令执行成功后如何返回最初的界面
import time
import re
import paramikoclass LoginError(Exception):def __init__(self, ip):self.ip = ipdef __str__(self):return f"{self.ip}连接失败"class GetMarkError(Exception):def __str__(self):return "GetMarkError"class SSH:def __init__(self, ip: str, port: int, username: str, password: str) -> None:self.ip = ipself.port = portself.username = usernameself.password = passwordself.session = Noneself.vty = Noneself.mark = Noneself.login = Falseself.config = Falseself.old_mark = Nonedef transport(self) -> None: # 定义打开SSH会话的函数ssh_session = paramiko.SSHClient() # 定义一个SSHClient会话对象ssh_session.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 自动加载公钥ssh_session.connect(hostname=self.ip, port=self.port, username=self.username,password=self.password) # 使用connect方法连接服务器self.session = ssh_session # 将ssh_session 赋值给self.session,方便该程序其他位置调用。def open_vty(self) -> None: # 定义一个打开命令行的函数self.vty = self.session.invoke_shell()def get_mark(self, output: str) -> None:pat = re.compile("<.+?>") # 创建正则表达式匹配<>用户视图。res = pat.findall(output) # 将pat正则表达式采用findall方法在output中查找,查看是否匹配正则表达式。返回值是一个列表if res.__len__() != 1: # 查看列表是否不为1长度,如果不为1,则没有匹配到对应的<>用户视图标记,返回获取标记失败异常,否则为1,则匹配到用户视图标记。raise GetMarkErrorelse:self.mark = self.old_mark = res[0]# print(self.mark)def login_device(self) -> None: # 定义一个登录设备的函数,调用两个函数try:self.transport() # 调用函数创建一个会话对象self.open_vty() # 调用函数创建一个命令行通道对象except Exception: # 报错则抛出以下错误信息raise LoginError(self.ip)self.vty.send("n\n") # self.vty是上述存在的命令行通道函数time.sleep(1)output = self.vty.recv(65535).decode("utf-8") # 将回显赋值给output变量,使用output判断self.get_mark(output) # 调用get_mark函数,查询是否在output中看到<>用户视图标记self.login = True # 记录登录状态def recv_result(self, nbytes, interval=1): # 设置函数参数,nbytes表示回显的最大内容属,interval表示时间间隔默认1spat = re.compile(self.mark) # 调用self.mark属性,该属性在get_mark函数中的xx行已被赋值,表示“<设备名称>”ret = "" # 定义变量接收回显内容,将循环中的回显内容添加到ret中,避免被覆盖。while True:res = self.vty.recv(nbytes).decode("utf-8")# print(ret)ret += resif not pat.search(res): # 通过正则表达式pat匹配字符串res,如果没有匹配,执行if语句块内的语句self.vty.send(" ")time.sleep(interval)continueelse:breakreturn ret # 返回全部回显结果def test_command(self, command: str):if not self.login:self.login_device()self.vty.send(command + "\n")res = self.recv_result(65535) # 调用函数recv_result函数,执行函数循环,获取全部回显参数return resdef config_mode(self):if not self.login:self.login_device()self.mark = "\[.+?\]"self.vty.send("system-view" + "\n")self.config = Truedef exit(self):self.mark = "<.+?>"return self.test_command("return")def send_command(self, command: list):if not self.config:self.config_mode()ret = ""for i in command:res = self.test_command(i)ret += resret += self.test_command("commit")ret += self.exit()return retif __name__ == "__main__":ssh_client = SSH(ip="192.168.0.1", port=22, username="python", password="Huawei@123") # 定义SSH_client类,初始化参数res = ssh_client.send_command(["interface G1/0/2", "description tutu11"])print(res)
