背景
公司要把重要的数据库的操作记录留存,方便查询操作可视化和告警监控等待。更能和其余相同的数据库保持一致。
本文将详细介绍如何通过MariaDB的server_audit插件为MySQL社区版实现完整的审计功能,包括版本兼容性处理、插件配置、日志解析和存储方案。
为什么MySQL社区版需要审计功能?
在公司数据库管理中,审计日志是安全合规和故障排查的重要工具。然而,许多使用MySQL社区版的企业面临一个尴尬的现实:原生的审计功能仅在企业版中提供。根据MySQL官方文档,社区版从5.7.34版本后不再支持第三方审计插件,这给需要合规审计的企业带来了挑战。
技术选型与版本兼容性分析
方案 | 优点 | 缺点 | 适用场景 |
---|---|---|---|
MySQL企业版审计插件 | 官方支持,功能完善 | 需要付费许可 | 预算充足的企业 |
MariaDB server_audit插件 | 免费开源,功能完整 | 版本兼容性复杂 | MySQL 5.7.34及以下版本 |
Percona审计插件 | 完全兼容MySQL | 需要迁移到Percona Server | 新建项目或可迁移环境 |
触发器+通用日志 | 无需额外组件 | 性能影响大,信息不完整 | 简单审计需求 |
在企业数据管理中,数据库操作审计是安全合规的核心需求。但MySQL社区版存在明显短板:
- 缺乏原生审计功能(仅企业版支持
audit_log
插件) - 版本兼容陷阱
- MySQL 5.7.34:最后一个可原生使用MariaDB插件的版本
- MySQL ≥5.7.34 无法使用MariaDB的
server_audit.so
插件 - MySQL 8.0 完全移除插件兼容性
- 替代方案选择
- 方案1:MySQL 5.7低版本 + MariaDB插件(需版本匹配)
- 方案2:迁移至Percona Server(免费开源,兼容MySQL 5.6/5.7/8.0且支持审计)
- 方案3:全面迁移到MariaDB
实践建议:在测试环境中验证插件兼容性,生产环境部署前进行完整的回归测试。
环境
环境如下:
MySQL
[root@test_db01 ~]# mysql -V
mysql Ver 14.14 Distrib 5.7.27, for Linux (x86_64) using EditLine wrapper
Cnetos7
[root@test_db01 ~]# cat /etc/redhat-release
CentOS Linux release 7.3.1611 (Core)Linux test_db01 3.10.0-514.el7.x86_64 #1 SMP Tue Nov 22 16:42:41 UTC 2016 x86_64 x86_64 x86_64 GNU/Linux
安装插件
查看插件安装目录
mysql> show global variables like 'plugin_dir';
+---------------+--------------------------+
| Variable_name | Value |
+---------------+--------------------------+
| plugin_dir | /usr/lib64/mysql/plugin/ |
+---------------+--------------------------+
1 row in set (0.00 sec)
提取mariadb审计插件并放置插件目录
此处需要注意版本,直接适配核对测试
wget https://downloads.mariadb.com/MariaDB/mariadb-10.5.16/bintar-linux-x86_64/mariadb-10.5.16-linux-x86_64.tar.gz
tar -zxvf mariadb-10.5.3-linux-x86_64.tar.gz
cp ./mariadb-10.5.3-linux-x86_64/lib/plugin/server_audit.so /usr/lib64/mysql/plugin/
chmod +x /usr/lib64/mysql/plugin/
mysql安装server_audit.so插件
mysql> install plugin server_audit soname 'server_audit.so';
Query OK, 0 rows affected (0.02 sec)
##也可以在my.cnf 加载插件方式安装
在my.cnf 设置 plugin_load = server_audit=server_audit.so
查看当前MySQL插件情况
mysql> show plugins;
| SERVER_AUDIT | ACTIVE | AUDIT | server_audit.so | GPL |
增加审计目录授权
mkdir /opt/mysqldata/auditlogs
chown -R mysql:mysql /opt/mysqldata/auditlogs
开启审计,写入配置文件
#防止server_audit 插件被卸载 进行配置文件配置
server_audit=FORCE_PLUS_PERMANENT
#指定哪些操作被记录到日志文件中
server_audit_events='CONNECT,QUERY,TABLE,QUERY_DDL'
#开启审计功能
server_audit_logging=on
#默认存放路径,可以不写,默认到data文件下
server_audit_file_path=/opt/mysqldata/auditlogs
#设置文件大小 默认1000000,1073741824=1GB
server_audit_file_rotate_size=1073741824
#指定日志文件的数量,如果为0日志将从不轮转
server_audit_file_rotations=0下面是我的配置文件
版本太低,不支持JSON,高版本支持JSON格式,方便Python处理,后续只能re来处理
```bash
#######server audit#####
server_audit_logging=ON
server_audit_file_path=/www/server/data/server_audit.log
server_audit=FORCE_PLUS_PERMANENT
server_audit_file_rotate_size=500M
server_audit_file_rotations=15
max_allowed_packet=32M
#audit_log_format=JSON
#audit_log_file=server_audit.json
#server_audit_logging=ON
#server_audit_file_path=/data/mysql/server_audit.json
#server_audit=FORCE_PLUS_PERMANENT
#server_audit_file_rotate_size=1G
#server_audit_file_rotations=10
#max_allowed_packet=32M
FORCE_PLUS_PERMANENT
:防止插件被意外卸载- 日志轮转设置:避免日志文件无限增长
max_allowed_packet
:确保大SQL语句能被完整记录
重启mysql生效!
/etc/init.d/mysql restart
Shutting down MySQL............... SUCCESS!
Starting MySQL.. SUCCESS!
检查日志状态
-- 检查插件状态
SHOW PLUGINS WHERE NAME = 'server_audit';-- 查看审计设置
SHOW GLOBAL VARIABLES LIKE 'server_audit%';
日志格式
20250610 10:42:49,test_db01,root,192.168.102.207,7972452,438352983,QUERY,xxxx,'SELECT id,ext_id,ext_table_name,target_type,value1,value2,spread_config FROM t_target \n \n WHERE (ext_id = \'12606411819336451111124\' AND ext_table_name = \'t_discount_coupon\')',0
20250610 10:42:50,test_db01,root,192.168.83.18,7972073,438352984,QUERY,gazelle_model_assemble,'SELECT id,code,name,handler,status,retry_times,create_time,update_time FROM batch WHERE (status = \'running\')',0
20250610 10:42:50,test_db01,root,192.168.102.207,7969972,438352986,QUERY,xxxx,'select * from t_event_send where sent = 0 order by created_time asc',0
20250610 10:42:50,test_db01,root,192.168.83.36,7853786,438352985,QUERY,hxxxdb,'SELECT\n MAX(IF(status = 1, create_time, null)) nearBankTime\n FROM user_bank_cards\n WHERE create_time >= NOW() - INTERVAL 12 HOUR',0
20250610 10:42:50,test_db01,root,192.168.83.27,7971636,438352987,QUERY,hixxxx_core,'SELECT id, user_id, trade_no, out_trade_no, order_sn, lease_id, period_no, amount, trade_status, alipay_close_status, trade_type, remark, trade_way, business_type, entry_param, coupon_id, coupon_amount, deleted, gmt_create, gmt_modified FROM hire_trade_flows WHERE deleted = 0 AND (trade_status = 3 AND trade_type = 1 AND trade_way IN (1, 3, 4) AND business_type IN (3, 5, 2, 1, 14, 4, 6, 10, 13))',0
20250610 10:42:50,test_db01,root,192.168.83.25,7972144,438352988,QUERY,historical_customer_data,'SELECT 1',0
20250610 10:42:50,test_db01,root,192.168.83.25,7972144,438352989,QUERY,historical_customer_data,'SELECT 1',0
解释这条日志格式
20250610 10:42:50,test_db01,root,192.168.83.36,7853786,438352985,QUERY,hxxxdb,'SELECT\n MAX(IF(status = 1, create_time, null)) nearBankTime\n FROM user_bank_cards\n WHERE create_time >= NOW() - INTERVAL 12 HOUR',0
字段编号 | 字段值 | 含义 |
---|---|---|
1 | 20250610 10:42:50 | 时间戳,表示审计事件发生的时间 |
2 | test_db01 | 服务器主机名(或 MariaDB 实例名称) |
3 | root | 数据库用户名,执行该语句的用户 |
4 | 192.168.83.36 | 客户端 IP 地址 |
5 | 7853786 | 线程 ID(MariaDB 内部线程号) |
6 | 438352985 | 查询 ID,用于区分不同的 SQL 查询 |
7 | QUERY | 事件类型(QUERY 表示 SQL 查询) |
8 | hxxxdb | 当前数据库名(USE 的数据库) |
9 | 'SELECT\n MAX(IF(... | SQL 语句内容,使用单引号包裹,\n 为换行符 |
10 | 0 | 返回值 / 错误码,0 表示成功(无错误) |
审计日志处理系统实现
系统架构设计
[MySQL Server] ↓ (生成审计日志)
[server_audit.log] ↓ (Python监控进程)
[日志解析模块] → [MySQL审计数据库]
Python处理程序核心逻辑
20250610 10:42:50,test_db01,root,192.168.83.36,7853786,438352985,QUERY,hxxxdb,'SELECT\n MAX(IF(status = 1, create_time, null)) nearBankTime\n FROM user_bank_cards\n WHERE create_time >= NOW() - INTERVAL 12 HOUR',0
我们有上面的日志格式,可以进行如下操作处理
日志监控关键特性:
- 断点续传:通过state文件记录读取位置
- 文件轮转检测:通过inode和dev识别日志切换
- 异常处理:完善的错误捕获和日志记录
def process_log_entry(new_content):"""处理单个审计日志条目"""# 定义正则表达式模式,用于解析审计日志格式# 模式匹配字段:时间,数据库名(1),用户名,源IP,连接ID,查询ID,事件类型,数据库名(2),SQL语句,执行结果pattern = r"^(\d{8} \d{2}:\d{2}:\d{2}),([\w]+),([\w]+),(\d+\.\d+\.\d+\.\d+),(\d+),(\d+),([\w]+),([\w]+),'(.*)',(\d+)"# 尝试匹配日志行match = re.match(pattern, new_content)if match:# 提取匹配的各个字段sql_time = match.group(1) # 时间 (格式: 年月日 时分秒)database_name = match.group(8) # 数据库名称 (从第8组获取)sql_content = match.group(9) # SQL语句内容 (包含原始转义字符)s_sql_content = match.group(9) # 保留原始SQL语句用于日志记录sql_result = match.group(10) # SQL执行结果 (0表示成功)# 清洗SQL语句内容:# 1. 去除SQL注释(--、#和/* */类型的注释)COMMENT_PATTERN = re.compile(r"(--.*?\\r\\n|#.*?\\r\\n|/\*.*?\*/)", re.DOTALL | re.MULTILINE)sql_content = COMMENT_PATTERN.sub("", sql_content)# 2. 替换特殊转义字符# - 将\n换行符替换为空格# - 将\'转义单引号替换为普通单引号# - 将\t制表符替换为空格# - 将\r回车符替换为空格sql_content = sql_content.replace('\\n', ' ').replace('\\\'', '\'').replace('\\t', ' ').replace('\\r', ' ')# 3. 去除SQL语句前后的多余空格sql_content = sql_content.strip()# 检查是否为需要记录的DDL语句:# 1. SQL执行成功 (sql_result == "0")# 2. 是DDL操作 (CREATE/ALTER/DROP/RENAME开头)DDL_PATTERN = re.compile(r"^(CREATE|ALTER|DROP|RENAME)", re.IGNORECASE)if sql_result == "0" and DDL_PATTERN.match(sql_content):# 记录处理日志logging.info("-" * 50)logging.info(f"原始SQL内容: {s_sql_content}")logging.info(f"执行时间: {sql_time}")logging.info(f"数据库名称: {database_name}")logging.info(f"清洗后SQL: {sql_content}")logging.info("-" * 50)# 将审计记录保存到MySQL数据库save_to_mysql(sql_time, database_name, sql_content)
原始: 'SELECT\n MAX(...) nearBankTime\n FROM ...'
清洗后: 'SELECT MAX(...) nearBankTime FROM ...'
完整代码
# -*- coding: utf-8 -*-
import os, sys
import re
import time
import json, logging
from datetime import datetime
import pymysql# 日志配置
logging.basicConfig(filename='mysql_monitor.log',level=logging.INFO,format='[%(asctime)s] %(levelname)s: %(message)s',datefmt='%Y-%m-%d %H:%M:%S'
)# 读取缓存文件,定位日志读取位置
def load_state(state_file):"""Load saved state."""try:with open(state_file, 'r', encoding='utf-8') as f:state = json.load(f)return state.get('position', 0), state.get('inode', None), state.get('dev', None)except (IOError, ValueError):return 0, None, Noneexcept Exception as e:logging.error(f"Failed to load state: {str(e)}")return 0, None, None# 保存位置数据到文件中
def save_state(state_file, position, inode, dev):"""Save current state."""try:with open(state_file, 'w', encoding='utf-8') as f:json.dump({'position': position,'inode': inode,'dev': dev}, f)except Exception as e:logging.error(f"Failed to save state: {str(e)}")# 保存sql数据到数据中,方便可视化 或者 告警
def save_to_mysql(sql_time, database_name, sql_content):"""将日志信息写入到 MySQL 数据库的 audit_log 表中"""connection = pymysql.connect(host='192.168.102.201',user='root',password='xxxx.88',database='audit_db',charset='utf8mb4')try:with connection.cursor() as cursor:sql = """INSERT INTO audit_log (sql_time, database_name, sql_content)VALUES (%s, %s, %s)"""sql_time = datetime.strptime(sql_time, "%Y%m%d %H:%M:%S").strftime("%Y-%m-%d %H:%M:%S")cursor.execute(sql, (sql_time, database_name, sql_content))connection.commit()except Exception as e:logging.error(f"Failed to insert data into MySQL: {str(e)}")finally:connection.close()# 读取文件 ,数据 匹配 和 清洗
def process_log_entry(new_content):"""Process a single log entry."""pattern = r"^(\d{8} \d{2}:\d{2}:\d{2}),([\w]+),([\w]+),(\d+\.\d+\.\d+\.\d+),(\d+),(\d+),([\w]+),([\w]+),'(.*)',(\d+)"match = re.match(pattern, new_content)if match:sql_time = match.group(1)database_name = match.group(8)sql_content = match.group(9)s_sql_content = match.group(9)sql_result = match.group(10)# 先匹配存在DDL语句的sql内容# 去除-- /* # 等注释符#logging.info(f"1 {sql_content}")COMMENT_PATTERN = re.compile(r"(--.*?\\r\\n|#.*?\\r\\n|/\*.*?\*/)", re.DOTALL | re.MULTILINE)sql_content = COMMENT_PATTERN.sub("", sql_content)# 去除多余的换行符/'等#logging.info(f"2 {sql_content}")sql_content = sql_content.replace('\\n', ' ').replace('\\\'', '\'').replace('\\t', ' ').replace('\\r', ' ')#logging.info(f"3 {sql_content}")# 去除前后多余的空格sql_content = sql_content.strip()#logging.info(f"4 {sql_content}")# 检查是否为 DDL 语句DDL_PATTERN = re.compile(r"^(CREATE|ALTER|DROP|RENAME)", re.IGNORECASE)if sql_result == "0" and DDL_PATTERN.match(sql_content):logging.info("-" * 50)logging.info(f"SQL Content: {s_sql_content}")logging.info(f"Time: {sql_time}")logging.info(f"Database Name: {database_name}")logging.info(f"SQL Content: {sql_content}")logging.info("-" * 50)save_to_mysql(sql_time, database_name, sql_content)# 启动程序,传入 缓存 文件 和 审计日志记录 文件 位置
def monitor_log(state_file, log_file):"""Monitor MySQL audit logs."""last_position, last_inode, last_dev = load_state(state_file)while True:try:current_st = os.stat(log_file)except OSError:logging.warning("Log file not found, retrying...")time.sleep(1)continueif (current_st.st_ino != last_inode) or (current_st.st_dev != last_dev):logging.info("New log file detected")last_position = 0last_inode = current_st.st_inolast_dev = current_st.st_devsave_state(state_file, last_position, last_inode, last_dev)if current_st.st_size < last_position:logging.info("File truncated")last_position = 0save_state(state_file, last_position, last_inode, last_dev)if current_st.st_size > last_position:try:with open(log_file, 'r', encoding='utf-8', errors='replace') as f:f.seek(last_position)for line in f:process_log_entry(line.strip())last_position = f.tell()save_state(state_file, last_position, last_inode, last_dev)except Exception as e:logging.error(f"Error reading log file: {str(e)}")time.sleep(0.5)if __name__ == "__main__":log_file = '/data/mysql/server_audit.log'state_file = "/opt/mysql_audit_monitor/mysqlMonitor.state"logging.info(f"Starting MySQL audit log monitoring: {log_file}")try:monitor_log(state_file, log_file)except KeyboardInterrupt:logging.info("Monitoring stopped")sys.exit(0)
后记
生产环境建议
场景 | 推荐方案 |
---|---|
MySQL 5.7.34以下版本 | MariaDB插件方案 |
MySQL 8.0+ | Percona Server或迁移至MariaDB |
审计日志存储 | 独立MySQL实例,与业务库物理隔离 |
高频操作环境 | 写入Elasticsearch替代MySQL |
通过server_audit
插件+Python监控的组合,可在MySQL社区版低成本实现企业级审计需求。但需注意:
📌 版本兼容性是成功前提,MySQL 8.0用户务必选择Percona/MariaDB
📌 日志清洗环节直接影响分析准确性(如转义符处理)
📌 审计日志存储分离是安全最佳实践
最终效果:
- 所有数据库操作可视化展示
- 实时捕获高危DDL语句
- 满足等保2.0/ISO 27001审计要求