欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 财经 > 产业 > 为MySQL社区版实现审计功能:从插件配置到日志监控全解析

为MySQL社区版实现审计功能:从插件配置到日志监控全解析

2025/6/12 21:47:47 来源:https://blog.csdn.net/heian_99/article/details/148554751  浏览:    关键词:为MySQL社区版实现审计功能:从插件配置到日志监控全解析

背景

公司要把重要的数据库的操作记录留存,方便查询操作可视化和告警监控等待。更能和其余相同的数据库保持一致。

本文将详细介绍如何通过MariaDB的server_audit插件为MySQL社区版实现完整的审计功能,包括版本兼容性处理、插件配置、日志解析和存储方案。

为什么MySQL社区版需要审计功能?

在公司数据库管理中,审计日志是安全合规和故障排查的重要工具。然而,许多使用MySQL社区版的企业面临一个尴尬的现实:原生的审计功能仅在企业版中提供。根据MySQL官方文档,社区版从5.7.34版本后不再支持第三方审计插件,这给需要合规审计的企业带来了挑战。

技术选型与版本兼容性分析

方案优点缺点适用场景
MySQL企业版审计插件官方支持,功能完善需要付费许可预算充足的企业
MariaDB server_audit插件免费开源,功能完整版本兼容性复杂MySQL 5.7.34及以下版本
Percona审计插件完全兼容MySQL需要迁移到Percona Server新建项目或可迁移环境
触发器+通用日志无需额外组件性能影响大,信息不完整简单审计需求

在企业数据管理中,数据库操作审计是安全合规的核心需求。但MySQL社区版存在明显短板:

  1. 缺乏原生审计功能(仅企业版支持audit_log插件)
  2. 版本兼容陷阱
    • MySQL 5.7.34:最后一个可原生使用MariaDB插件的版本
    • MySQL ≥5.7.34 无法使用MariaDB的server_audit.so插件
    • MySQL 8.0 完全移除插件兼容性
  3. 替代方案选择
    • 方案1:MySQL 5.7低版本 + MariaDB插件(需版本匹配)
    • 方案2:迁移至Percona Server(免费开源,兼容MySQL 5.6/5.7/8.0且支持审计)
    • 方案3:全面迁移到MariaDB

实践建议:在测试环境中验证插件兼容性,生产环境部署前进行完整的回归测试。

环境

环境如下:

MySQL

[root@test_db01 ~]# mysql -V
mysql  Ver 14.14 Distrib 5.7.27, for Linux (x86_64) using  EditLine wrapper

Cnetos7

[root@test_db01 ~]# cat /etc/redhat-release 
CentOS Linux release 7.3.1611 (Core)Linux test_db01 3.10.0-514.el7.x86_64 #1 SMP Tue Nov 22 16:42:41 UTC 2016 x86_64 x86_64 x86_64 GNU/Linux

安装插件

查看插件安装目录


mysql>  show global variables like 'plugin_dir';
+---------------+--------------------------+
| Variable_name | Value                    |
+---------------+--------------------------+
| plugin_dir    | /usr/lib64/mysql/plugin/ |
+---------------+--------------------------+
1 row in set (0.00 sec)

提取mariadb审计插件并放置插件目录

此处需要注意版本,直接适配核对测试

wget  https://downloads.mariadb.com/MariaDB/mariadb-10.5.16/bintar-linux-x86_64/mariadb-10.5.16-linux-x86_64.tar.gz
tar -zxvf   mariadb-10.5.3-linux-x86_64.tar.gz
cp ./mariadb-10.5.3-linux-x86_64/lib/plugin/server_audit.so /usr/lib64/mysql/plugin/
chmod +x /usr/lib64/mysql/plugin/

mysql安装server_audit.so插件

mysql>  install plugin server_audit soname 'server_audit.so';
Query OK, 0 rows affected (0.02 sec)           
##也可以在my.cnf 加载插件方式安装
在my.cnf 设置 plugin_load = server_audit=server_audit.so  

查看当前MySQL插件情况

mysql>  show plugins;
| SERVER_AUDIT                             | ACTIVE   | AUDIT              | server_audit.so       | GPL     |

增加审计目录授权

mkdir /opt/mysqldata/auditlogs
chown -R mysql:mysql /opt/mysqldata/auditlogs

开启审计,写入配置文件

#防止server_audit 插件被卸载 进行配置文件配置
server_audit=FORCE_PLUS_PERMANENT
#指定哪些操作被记录到日志文件中
server_audit_events='CONNECT,QUERY,TABLE,QUERY_DDL'
#开启审计功能
server_audit_logging=on
#默认存放路径,可以不写,默认到data文件下
server_audit_file_path=/opt/mysqldata/auditlogs
#设置文件大小 默认1000000,1073741824=1GB
server_audit_file_rotate_size=1073741824
#指定日志文件的数量,如果为0日志将从不轮转
server_audit_file_rotations=0下面是我的配置文件
版本太低,不支持JSON,高版本支持JSON格式,方便Python处理,后续只能re来处理
```bash
#######server audit#####
server_audit_logging=ON
server_audit_file_path=/www/server/data/server_audit.log
server_audit=FORCE_PLUS_PERMANENT
server_audit_file_rotate_size=500M
server_audit_file_rotations=15
max_allowed_packet=32M
#audit_log_format=JSON
#audit_log_file=server_audit.json
#server_audit_logging=ON
#server_audit_file_path=/data/mysql/server_audit.json
#server_audit=FORCE_PLUS_PERMANENT
#server_audit_file_rotate_size=1G
#server_audit_file_rotations=10
#max_allowed_packet=32M
  • FORCE_PLUS_PERMANENT:防止插件被意外卸载
  • 日志轮转设置:避免日志文件无限增长
  • max_allowed_packet:确保大SQL语句能被完整记录

重启mysql生效!

/etc/init.d/mysql restart
Shutting down MySQL............... SUCCESS! 
Starting MySQL.. SUCCESS! 

检查日志状态

-- 检查插件状态 
SHOW PLUGINS WHERE NAME = 'server_audit';-- 查看审计设置 
SHOW GLOBAL VARIABLES LIKE 'server_audit%';

日志格式

20250610 10:42:49,test_db01,root,192.168.102.207,7972452,438352983,QUERY,xxxx,'SELECT  id,ext_id,ext_table_name,target_type,value1,value2,spread_config  FROM t_target \n \n WHERE (ext_id = \'12606411819336451111124\' AND ext_table_name = \'t_discount_coupon\')',0
20250610 10:42:50,test_db01,root,192.168.83.18,7972073,438352984,QUERY,gazelle_model_assemble,'SELECT  id,code,name,handler,status,retry_times,create_time,update_time  FROM batch      WHERE  (status = \'running\')',0
20250610 10:42:50,test_db01,root,192.168.102.207,7969972,438352986,QUERY,xxxx,'select * from t_event_send where sent = 0 order by created_time asc',0
20250610 10:42:50,test_db01,root,192.168.83.36,7853786,438352985,QUERY,hxxxdb,'SELECT\n            MAX(IF(status = 1, create_time, null)) nearBankTime\n        FROM user_bank_cards\n        WHERE create_time >= NOW() - INTERVAL 12 HOUR',0
20250610 10:42:50,test_db01,root,192.168.83.27,7971636,438352987,QUERY,hixxxx_core,'SELECT id, user_id, trade_no, out_trade_no, order_sn, lease_id, period_no, amount, trade_status, alipay_close_status, trade_type, remark, trade_way, business_type, entry_param, coupon_id, coupon_amount, deleted, gmt_create, gmt_modified FROM hire_trade_flows WHERE deleted = 0 AND (trade_status = 3 AND trade_type = 1 AND trade_way IN (1, 3, 4) AND business_type IN (3, 5, 2, 1, 14, 4, 6, 10, 13))',0
20250610 10:42:50,test_db01,root,192.168.83.25,7972144,438352988,QUERY,historical_customer_data,'SELECT 1',0
20250610 10:42:50,test_db01,root,192.168.83.25,7972144,438352989,QUERY,historical_customer_data,'SELECT 1',0

解释这条日志格式

20250610 10:42:50,test_db01,root,192.168.83.36,7853786,438352985,QUERY,hxxxdb,'SELECT\n            MAX(IF(status = 1, create_time, null)) nearBankTime\n        FROM user_bank_cards\n        WHERE create_time >= NOW() - INTERVAL 12 HOUR',0
字段编号字段值含义
120250610 10:42:50时间戳,表示审计事件发生的时间
2test_db01服务器主机名(或 MariaDB 实例名称)
3root数据库用户名,执行该语句的用户
4192.168.83.36客户端 IP 地址
57853786线程 ID(MariaDB 内部线程号)
6438352985查询 ID,用于区分不同的 SQL 查询
7QUERY事件类型QUERY 表示 SQL 查询)
8hxxxdb当前数据库名USE 的数据库)
9'SELECT\n MAX(IF(...SQL 语句内容,使用单引号包裹,\n 为换行符
100返回值 / 错误码0 表示成功(无错误)

审计日志处理系统实现

系统架构设计

[MySQL Server](生成审计日志)
[server_audit.log](Python监控进程)
[日志解析模块][MySQL审计数据库]

Python处理程序核心逻辑

20250610 10:42:50,test_db01,root,192.168.83.36,7853786,438352985,QUERY,hxxxdb,'SELECT\n            MAX(IF(status = 1, create_time, null)) nearBankTime\n        FROM user_bank_cards\n        WHERE create_time >= NOW() - INTERVAL 12 HOUR',0

我们有上面的日志格式,可以进行如下操作处理

日志监控关键特性:

  1. 断点续传:通过state文件记录读取位置
  2. 文件轮转检测:通过inode和dev识别日志切换
  3. 异常处理:完善的错误捕获和日志记录

在这里插入图片描述

def process_log_entry(new_content):"""处理单个审计日志条目"""# 定义正则表达式模式,用于解析审计日志格式# 模式匹配字段:时间,数据库名(1),用户名,源IP,连接ID,查询ID,事件类型,数据库名(2),SQL语句,执行结果pattern = r"^(\d{8} \d{2}:\d{2}:\d{2}),([\w]+),([\w]+),(\d+\.\d+\.\d+\.\d+),(\d+),(\d+),([\w]+),([\w]+),'(.*)',(\d+)"# 尝试匹配日志行match = re.match(pattern, new_content)if match:# 提取匹配的各个字段sql_time = match.group(1)        # 时间 (格式: 年月日 时分秒)database_name = match.group(8)   # 数据库名称 (从第8组获取)sql_content = match.group(9)      # SQL语句内容 (包含原始转义字符)s_sql_content = match.group(9)    # 保留原始SQL语句用于日志记录sql_result = match.group(10)      # SQL执行结果 (0表示成功)# 清洗SQL语句内容:# 1. 去除SQL注释(--、#和/* */类型的注释)COMMENT_PATTERN = re.compile(r"(--.*?\\r\\n|#.*?\\r\\n|/\*.*?\*/)", re.DOTALL | re.MULTILINE)sql_content = COMMENT_PATTERN.sub("", sql_content)# 2. 替换特殊转义字符#    - 将\n换行符替换为空格#    - 将\'转义单引号替换为普通单引号#    - 将\t制表符替换为空格#    - 将\r回车符替换为空格sql_content = sql_content.replace('\\n', ' ').replace('\\\'', '\'').replace('\\t', ' ').replace('\\r', ' ')# 3. 去除SQL语句前后的多余空格sql_content = sql_content.strip()# 检查是否为需要记录的DDL语句:# 1. SQL执行成功 (sql_result == "0")# 2. 是DDL操作 (CREATE/ALTER/DROP/RENAME开头)DDL_PATTERN = re.compile(r"^(CREATE|ALTER|DROP|RENAME)", re.IGNORECASE)if sql_result == "0" and DDL_PATTERN.match(sql_content):# 记录处理日志logging.info("-" * 50)logging.info(f"原始SQL内容: {s_sql_content}")logging.info(f"执行时间: {sql_time}")logging.info(f"数据库名称: {database_name}")logging.info(f"清洗后SQL: {sql_content}")logging.info("-" * 50)# 将审计记录保存到MySQL数据库save_to_mysql(sql_time, database_name, sql_content)
原始: 'SELECT\n            MAX(...) nearBankTime\n        FROM ...'
清洗后: 'SELECT MAX(...) nearBankTime FROM ...'

完整代码

# -*- coding: utf-8 -*-
import os, sys
import re
import time
import json, logging
from datetime import datetime
import pymysql# 日志配置
logging.basicConfig(filename='mysql_monitor.log',level=logging.INFO,format='[%(asctime)s] %(levelname)s: %(message)s',datefmt='%Y-%m-%d %H:%M:%S'
)# 读取缓存文件,定位日志读取位置
def load_state(state_file):"""Load saved state."""try:with open(state_file, 'r', encoding='utf-8') as f:state = json.load(f)return state.get('position', 0), state.get('inode', None), state.get('dev', None)except (IOError, ValueError):return 0, None, Noneexcept Exception as e:logging.error(f"Failed to load state: {str(e)}")return 0, None, None# 保存位置数据到文件中
def save_state(state_file, position, inode, dev):"""Save current state."""try:with open(state_file, 'w', encoding='utf-8') as f:json.dump({'position': position,'inode': inode,'dev': dev}, f)except Exception as e:logging.error(f"Failed to save state: {str(e)}")# 保存sql数据到数据中,方便可视化 或者  告警
def save_to_mysql(sql_time, database_name, sql_content):"""将日志信息写入到 MySQL 数据库的 audit_log 表中"""connection = pymysql.connect(host='192.168.102.201',user='root',password='xxxx.88',database='audit_db',charset='utf8mb4')try:with connection.cursor() as cursor:sql = """INSERT INTO audit_log (sql_time, database_name, sql_content)VALUES (%s, %s, %s)"""sql_time = datetime.strptime(sql_time, "%Y%m%d %H:%M:%S").strftime("%Y-%m-%d %H:%M:%S")cursor.execute(sql, (sql_time, database_name, sql_content))connection.commit()except Exception as e:logging.error(f"Failed to insert data into MySQL: {str(e)}")finally:connection.close()# 读取文件 ,数据 匹配 和 清洗
def process_log_entry(new_content):"""Process a single log entry."""pattern = r"^(\d{8} \d{2}:\d{2}:\d{2}),([\w]+),([\w]+),(\d+\.\d+\.\d+\.\d+),(\d+),(\d+),([\w]+),([\w]+),'(.*)',(\d+)"match = re.match(pattern, new_content)if match:sql_time = match.group(1)database_name = match.group(8)sql_content = match.group(9)s_sql_content = match.group(9)sql_result = match.group(10)# 先匹配存在DDL语句的sql内容# 去除-- /* # 等注释符#logging.info(f"1 {sql_content}")COMMENT_PATTERN = re.compile(r"(--.*?\\r\\n|#.*?\\r\\n|/\*.*?\*/)", re.DOTALL | re.MULTILINE)sql_content = COMMENT_PATTERN.sub("", sql_content)# 去除多余的换行符/'等#logging.info(f"2 {sql_content}")sql_content = sql_content.replace('\\n', ' ').replace('\\\'', '\'').replace('\\t', ' ').replace('\\r', ' ')#logging.info(f"3 {sql_content}")# 去除前后多余的空格sql_content = sql_content.strip()#logging.info(f"4 {sql_content}")# 检查是否为 DDL 语句DDL_PATTERN = re.compile(r"^(CREATE|ALTER|DROP|RENAME)", re.IGNORECASE)if sql_result == "0" and DDL_PATTERN.match(sql_content):logging.info("-" * 50)logging.info(f"SQL Content: {s_sql_content}")logging.info(f"Time: {sql_time}")logging.info(f"Database Name: {database_name}")logging.info(f"SQL Content: {sql_content}")logging.info("-" * 50)save_to_mysql(sql_time, database_name, sql_content)# 启动程序,传入 缓存 文件  和 审计日志记录 文件 位置
def monitor_log(state_file, log_file):"""Monitor MySQL audit logs."""last_position, last_inode, last_dev = load_state(state_file)while True:try:current_st = os.stat(log_file)except OSError:logging.warning("Log file not found, retrying...")time.sleep(1)continueif (current_st.st_ino != last_inode) or (current_st.st_dev != last_dev):logging.info("New log file detected")last_position = 0last_inode = current_st.st_inolast_dev = current_st.st_devsave_state(state_file, last_position, last_inode, last_dev)if current_st.st_size < last_position:logging.info("File truncated")last_position = 0save_state(state_file, last_position, last_inode, last_dev)if current_st.st_size > last_position:try:with open(log_file, 'r', encoding='utf-8', errors='replace') as f:f.seek(last_position)for line in f:process_log_entry(line.strip())last_position = f.tell()save_state(state_file, last_position, last_inode, last_dev)except Exception as e:logging.error(f"Error reading log file: {str(e)}")time.sleep(0.5)if __name__ == "__main__":log_file = '/data/mysql/server_audit.log'state_file = "/opt/mysql_audit_monitor/mysqlMonitor.state"logging.info(f"Starting MySQL audit log monitoring: {log_file}")try:monitor_log(state_file, log_file)except KeyboardInterrupt:logging.info("Monitoring stopped")sys.exit(0)

在这里插入图片描述

在这里插入图片描述

后记

生产环境建议
场景推荐方案
MySQL 5.7.34以下版本MariaDB插件方案
MySQL 8.0+Percona Server或迁移至MariaDB
审计日志存储独立MySQL实例,与业务库物理隔离
高频操作环境写入Elasticsearch替代MySQL

通过server_audit插件+Python监控的组合,可在MySQL社区版低成本实现企业级审计需求。但需注意:

📌 版本兼容性是成功前提,MySQL 8.0用户务必选择Percona/MariaDB
📌 日志清洗环节直接影响分析准确性(如转义符处理)
📌 审计日志存储分离是安全最佳实践

最终效果

  • 所有数据库操作可视化展示
  • 实时捕获高危DDL语句
  • 满足等保2.0/ISO 27001审计要求

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词