欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 财经 > 产业 > #渗透测试#SRC漏洞挖掘#红蓝攻防#地址池搭建之自动化编排ZMap输出结果

#渗透测试#SRC漏洞挖掘#红蓝攻防#地址池搭建之自动化编排ZMap输出结果

2025/9/19 21:14:00 来源:https://blog.csdn.net/m0_62828084/article/details/143897967  浏览:    关键词:#渗透测试#SRC漏洞挖掘#红蓝攻防#地址池搭建之自动化编排ZMap输出结果

免责声明 本教程仅为合法的教学目的而准备,严禁用于任何形式的违法犯罪活动及其他商业行为,在使用本教程前,您应确保该行为符合当地的法律法规,继续阅读即表示您需自行承担所有操作的后果,如有异议,请立即停止本文章阅读。                                                             

目录

前情提要:

一、编写需求:

1、导入模块

2、定义全局变量

3、信号处理函数

4、测活函数

5、主函数

7、读取IP地址文件

8、处理文件读取异常

源码如下:

二、 如何优化IP测活脚本性能?

1、使用高效的网络库

2、批量处理请求

3、使用多线程或多进程

4、优化DNS解析

5、减少不必要的日志记录

6、使用高效的IP地址存储结构

7、定期清理和优化

 三、优化脚本:

实现步骤

1. 读取IP地址并添加端口号

2. 使用asyncio进行异步测活

3. 保存存活的IP:端口号组合

4. 主函数

5.  代码实现


前情提要:

接上文ZMap生成的txt文档中只有开放筛选端口的IP,在通过脚本进行验活的过程中,需要采用ip+端口的方式,所以进行python 脚本的编写。

一、编写需求:

编写py脚本实现将单个txt文件中的IP地址,添加规定的端口号,并测活,然后以IP:端口号的格式,
将存活的IP保存在新的txt文件中,并展示每一条测活的过程。并设置强制终止。

1、导入模块
import os 
import socket 
import signal 
import sys 

这部分代码导入了所需的Python标准库模块:

  • os: 用于与操作系统交互。
  • socket: 用于网络通信,特别是TCP/IP连接。
  • signal: 用于处理信号,例如强制终止信号。
  • sys: 提供了与Python解释器交互的功能。
2、定义全局变量
output_file = "alive_ips.txt"  
port = 80  # 规定的端口号 
  • output_file: 定义了输出文件的名称,用于存储存活的IP地址和端口号。
  • port: 定义了要测试的端口号,这里设置为80。
3、信号处理函数
def signal_handler(sig, frame): print("\n脚本被强制终止。") sys.exit(0)  
  • signal_handler: 这是一个信号处理函数,用于在接收到强制终止信号(如Ctrl+C)时执行。它会打印一条消息并退出脚本。
4、测活函数
def check_ip_port(ip, port): try: # 创建socket连接 sock = socket.socket(socket.AF_INET,  socket.SOCK_STREAM) sock.settimeout(2)   # 设置超时时间为2秒 result = sock.connect_ex((ip,  port)) if result == 0: print(f"IP: {ip}:{port} 存活") return True else: print(f"IP: {ip}:{port} 不存活") return False except Exception as e: print(f"IP: {ip}:{port} 测活失败: {e}") return False finally: sock.close()  
 
  • check_ip_port: 这是一个用于测试IP地址和端口是否存活的函数。
    • 创建一个TCP/IP套接字。
    • 设置超时时间为2秒。
    • 尝试连接指定的IP地址和端口。
    • 如果连接成功(result == 0),则打印“存活”并返回True
    • 如果连接失败,则打印“不存活”并返回False
    • 如果发生异常,打印错误信息并返回False
    • 无论结果如何,最后都会关闭套接字。
5、主函数
def main():  # 注册信号处理函数  signal.signal(signal.SIGINT,   signal_handler)  # 读取IP地址文件  with open("文件名字",   "r") as file:  ips = file.readlines()    # 清空输出文件  with open(output_file, "w") as file:  pass  # 遍历每个IP地址并测活  for ip in ips:  ip = ip.strip()    # 去除首尾空白字符  if check_ip_port(ip, port):  # 将存活的IP地址和端口号写入文件  with open(output_file, "a") as file:  file.write(f"{ip}:{port}\n")    if __name__ == "__main__":  main()   
  • main: 这是脚本的主函数。
    • 注册信号处理函数,以便在接收到SIGINT信号(如Ctrl+C)时调用signal_handler
    • 打开并读取包含IP地址的文件raw-443-ip.txt ,将其内容存储在ips列表中。
    • 清空输出文件alive_ips.txt ,以便重新写入存活的IP地址。
    • 遍历每个IP地址,调用check_ip_port函数进行测活。
    • 如果IP地址存活,将其和端口号写入alive_ips.txt 文件。
  • if __name__ == "__main__": main(): 这是Python脚本的入口点,确保只有在直接运行脚本时才会执行main函数。
  • 注意点:    # 读取IP地址文件
        with open("raw-443-ip.txt",  "r") as file:
            ips = file.readlines() 
  • 7、读取IP地址文件

    在读取IP地址文件时,需要注意文件的路径问题。确保文件路径是正确的,否则可能会导致文件读取失败。

    相对路径与绝对路径

  • 相对路径:相对于当前工作目录的路径。例如,如果当前工作目录是 /home/user/,那么 raw-443-ip.txt 文件的相对路径可能是 ./data/raw-443-ip.txt
  • 绝对路径:从根目录开始的完整路径。例如,/home/user/data/raw-443-ip.txt
  • 确认文件路径

    在读取文件之前,建议确认文件路径是否正确。可以通过以下方式确认:

    import os # 确认文件是否存在 
    if os.path.exists("raw-443-ip.txt"):  with open("raw-443-ip.txt",  "r") as file: ips = file.readlines()  
    else: print("文件不存在,请检查路径。") 
    8、处理文件读取异常

    为了增强代码的健壮性,建议使用异常处理来捕获文件读取过程中可能出现的错误。

  • try: with open("raw-443-ip.txt",  "r") as file: ips = file.readlines()  
    except FileNotFoundError: print("文件未找到,请检查路径。") 
    except IOError as e: print(f"文件读取错误: {e}") 

源码如下:
import os 
import socket 
import signal 
import sys # 定义全局变量 output_file = "alive_ips.txt"  
port = 80  # 规定的端口号 # 信号处理函数,用于强制终止脚本 
def signal_handler(sig, frame): print("\n脚本被强制终止。") sys.exit(0)  # 测活函数 
def check_ip_port(ip, port): try: # 创建socket连接 sock = socket.socket(socket.AF_INET,  socket.SOCK_STREAM) sock.settimeout(2)   # 设置超时时间为2秒 result = sock.connect_ex((ip,  port)) if result == 0: print(f"IP: {ip}:{port} 存活") return True else: print(f"IP: {ip}:{port} 不存活") return False except Exception as e: print(f"IP: {ip}:{port} 测活失败: {e}") return False finally: sock.close()  # 主函数 
def main(): # 注册信号处理函数 signal.signal(signal.SIGINT,  signal_handler) # 读取IP地址文件 with open("raw-443-ip.txt",  "r") as file: ips = file.readlines()  # 清空输出文件 with open(output_file, "w") as file: pass # 遍历每个IP地址并测活 for ip in ips: ip = ip.strip()   # 去除首尾空白字符 if check_ip_port(ip, port): # 将存活的IP地址和端口号写入文件 with open(output_file, "a") as file: file.write(f"{ip}:{port}\n")  if __name__ == "__main__": main() 

二、 如何优化IP测活脚本性能?

1、使用高效的网络库

选择高效的网络库可以显著提升IP测活脚本的性能。例如,使用asyncio库可以实现异步I/O操作,从而提高并发处理能力。

2、批量处理请求

将多个IP地址分组进行批量处理,而不是逐个处理,可以减少网络延迟和系统调用开销。

3、使用多线程或多进程

利用多线程或多进程技术可以充分利用多核CPU的性能,提高并发处理能力。

4、优化DNS解析

DNS解析可能会成为性能瓶颈,使用缓存DNS解析结果或使用本地DNS缓存服务可以减少解析时间。

5、减少不必要的日志记录

过多的日志记录会占用大量CPU和I/O资源,适当减少或优化日志记录可以提升脚本性能。

6、使用高效的IP地址存储结构

使用高效的IP地址存储结构(如Trie树)可以加快IP地址的查找和匹配速度。

7、定期清理和优化

定期清理不再使用的IP地址和优化数据库查询可以保持脚本的高效运行。

 三、优化脚本:

编写一个Python脚本,实现以下功能:

  1. 从单个txt文件中读取IP地址。
  2. 为每个IP地址添加规定的端口号(80、8080、443)。
  3. 测试这些IP:端口号组合的连通性。
  4. 将存活的IP:端口号组合保存到新的txt文件中。
  5. 展示每一条测活的过程。
  6. 设置强制终止功能。
  7. 处理文件读取异常。
  8. 使用asyncio库实现异步I/O操作,提高并发处理能力。
  9. 将多个IP地址分组进行批量处理,减少网络延迟和系统调用开销。
  10. 利用多线程或多进程技术充分利用多核CPU的性能。
  11. 使用高效的IP地址存储结构(如Trie树)加快IP地址的查找和匹配速度。

实现步骤

1. 读取IP地址并添加端口号

首先,我们需要从txt文件中读取IP地址,并为每个IP地址添加规定的端口号(80、8080、443)。

def read_ips_from_file(file_path): try: with open(file_path, 'r') as file: ips = [line.strip() for line in file if line.strip()]  return ips except FileNotFoundError: print(f"文件 {file_path} 未找到") return [] except Exception as e: print(f"读取文件时发生错误: {e}") return [] def generate_ip_port_combinations(ips, ports=[80, 8080, 443]): combinations = [(ip, port) for ip in ips for port in ports] return combinations 

2. 使用asyncio进行异步测活

使用asyncio库进行异步I/O操作,提高并发处理能力。

import asyncio 
import aiohttp async def check_ip_port(ip, port): url = f"http://{ip}:{port}" try: async with aiohttp.ClientSession() as session: async with session.get(url,  timeout=5) as response: if response.status  == 200: print(f"IP: {ip}, Port: {port} 存活") return f"{ip}:{port}" else: print(f"IP: {ip}, Port: {port} 不存活") return None except Exception as e: print(f"IP: {ip}, Port: {port} 测活失败: {e}") return None async def check_ips_ports(combinations): tasks = [check_ip_port(ip, port) for ip, port in combinations] results = await asyncio.gather(*tasks)  return [result for result in results if result] 

3. 保存存活的IP:端口号组合

将存活的IP:端口号组合保存到新的txt文件中。

def save_alive_ips(alive_ips, output_file): with open(output_file, 'w') as file: for ip_port in alive_ips: file.write(f"{ip_port}\n")  print(f"存活的IP:端口号组合已保存到 {output_file}") 

4. 主函数

将上述功能整合到主函数中,并设置强制终止功能。

import signal 
import sys def signal_handler(sig, frame): print("强制终止程序") sys.exit(0)  signal.signal(signal.SIGINT,  signal_handler) async def main(): file_path = "ips.txt"  output_file = "alive_ips.txt"  ips = read_ips_from_file(file_path) combinations = generate_ip_port_combinations(ips) alive_ips = await check_ips_ports(combinations) save_alive_ips(alive_ips, output_file) if __name__ == "__main__": asyncio.run(main())  

 5、代码实现

import asyncio 
import socket 
import threading 
import time 
import sys 
from collections import defaultdict # Trie树节点定义 
class TrieNode: def __init__(self): self.children  = defaultdict(TrieNode) self.is_end  = False class Trie: def __init__(self): self.root  = TrieNode() def insert(self, ip): node = self.root  for part in ip.split('.'):  node = node.children[part]  node.is_end  = True def search(self, ip): node = self.root  for part in ip.split('.'):  if part not in node.children:  return False node = node.children[part]  return node.is_end  # 读取IP地址并构建Trie树 
def read_ips_from_file(filename): trie = Trie() try: with open(filename, 'r') as file: for line in file: ip = line.strip()  trie.insert(ip)  except FileNotFoundError: print("文件未找到,请检查文件路径。") sys.exit(1)  return trie # 测试IP:端口号组合的连通性 
async def test_ip_port(ip, port): try: reader, writer = await asyncio.open_connection(ip,  port) writer.close()  await writer.wait_closed()  return (ip, port, True) except (socket.gaierror,  ConnectionRefusedError, TimeoutError): return (ip, port, False) # 批量测试IP:端口号组合 
async def batch_test_ips(trie, ports, batch_size): tasks = [] for ip in trie.root.children:  for port in ports: tasks.append(test_ip_port(ip,  port)) if len(tasks) >= batch_size: results = await asyncio.gather(*tasks)  for result in results: print(f"测试 {result[0]}:{result[1]} - {'存活' if result[2] else '不存活'}") tasks = [] if tasks: results = await asyncio.gather(*tasks)  for result in results: print(f"测试 {result[0]}:{result[1]} - {'存活' if result[2] else '不存活'}") # 保存存活的IP:端口号组合 
def save_alive_ips(results, filename): with open(filename, 'w') as file: for ip, port, alive in results: if alive: file.write(f"{ip}:{port}\n")  # 强制终止功能 
def signal_handler(sig, frame): print("强制终止脚本。") sys.exit(0)  # 主函数 
def main(): filename = 'ips.txt'  output_filename = 'alive_ips.txt'  ports = [80, 8080, 443] batch_size = 10 trie = read_ips_from_file(filename) loop = asyncio.get_event_loop()  results = loop.run_until_complete(batch_test_ips(trie,  ports, batch_size)) save_alive_ips(results, output_filename) if __name__ == "__main__": main() 

未完待续~~~!!!!!

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词