欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 财经 > 创投人物 > python2.7+flask1.1.4+SQLAlchemy1.3.0+Flask-SQLAlchemy2.1连接mysql稳定方式

python2.7+flask1.1.4+SQLAlchemy1.3.0+Flask-SQLAlchemy2.1连接mysql稳定方式

2025/6/18 12:12:21 来源:https://blog.csdn.net/weixin_45005012/article/details/148712048  浏览:    关键词:python2.7+flask1.1.4+SQLAlchemy1.3.0+Flask-SQLAlchemy2.1连接mysql稳定方式

经过了多次尝试,该版本是python2.7比较稳定好用的版本。以下即是完整代码

#coding=utf-8from config import config
from app.main import platform_app
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.exc import OperationalError, InvalidRequestError, StatementError
import time
import traceback
import threading
import logginglogger = logging.getLogger(__name__)platform_app.config['SQLALCHEMY_DATABASE_URI'] = "mysql+pymysql://{username}:{password}@{host}:{port}/{dbname}"
platform_app.config['SQLALCHEMY_POOL_PRE_PING'] = True
platform_app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = Falseengine_options = {'pool_size': 200,'max_overflow': 100,'pool_timeout': 30,'pool_recycle': 600,'pool_pre_ping': True,'connect_args': {'connect_timeout': 5,'read_timeout': 30,'write_timeout': 30,'charset': 'utf8',}
}platform_app.config['SQLALCHEMY_ENGINE_OPTIONS'] = engine_optionsdb = SQLAlchemy(platform_app)#安全执行SQL函数
def safe_db_execute(func, *args, **kwargs):"""安全执行数据库操作,自动处理连接失效和事务问题"""max_retries = 3for attempt in range(max_retries):try:return func(*args, **kwargs)except (OperationalError, InvalidRequestError, StatementError) as e:error_str = str(e).lower()# 检查是否是需要回滚的错误if "can't reconnect until invalid transaction is rolled back" in error_str or \"mysql server has gone away" in error_str:logger.warning("数据库连接问题 (尝试 %d/%d): %s", attempt + 1, max_retries, str(e))# 尝试回滚事务try:db.session.rollback()logger.info("事务已回滚")except Exception as rollback_error:logger.error("回滚失败: %s", str(rollback_error))# 尝试关闭当前会话try:db.session.close()logger.info("会话已关闭")except Exception as close_error:logger.error("关闭会话失败: %s", str(close_error))# 刷新连接池try:db.engine.dispose()logger.info("连接池已刷新")except Exception as dispose_error:logger.error("刷新连接池失败: %s", str(dispose_error))# 重新创建会话try:# 在旧版本中需要这样重新创建会话db.session = db.create_scoped_session()logger.info("会话已重新创建")except:# 如果上面失败,尝试简单重新创建try:db.session = db.sessionmaker(bind=db.engine)logger.info("会话已重新创建(备用方法)")except Exception as session_error:logger.error("重新创建会话失败: %s", str(session_error))# 等待一段时间后重试time.sleep(1)else:# 如果是其他错误,直接抛出logger.error("数据库操作失败: %s", str(e))traceback.print_exc()raise# 如果重试多次都失败logger.error("数据库操作失败,重试 %d 次后放弃", max_retries)raise Exception("数据库操作失败,重试 {} 次后放弃".format(max_retries))#封装常用数据库操作
class SafeDBSession:@staticmethoddef execute(query, params=None):return safe_db_execute(db.session.execute, query, params)@staticmethoddef query(*entities):return safe_db_execute(db.session.query, *entities)@staticmethoddef add(instance):return safe_db_execute(db.session.add, instance)@staticmethoddef commit():return safe_db_execute(db.session.commit)@staticmethoddef rollback():try:db.session.rollback()logger.info("事务已回滚")except Exception as e:logger.error("回滚失败: %s", str(e))@staticmethoddef update(entity, filter_expr, update_values):"""安全更新操作:param entity: 模型类 (如 User):param filter_expr: 过滤表达式 (如 User.id == 1):param update_values: 更新值字典 (如 {'name': 'New Name'}):return: 更新的行数"""def _update():# 构建更新查询query = db.session.query(entity).filter(filter_expr)# 执行更新并返回影响的行数return query.update(update_values, synchronize_session=False)return safe_db_execute(_update)@staticmethoddef delete(entity, filter_expr=None, instance=None):"""安全删除操作:param entity: 模型类 (如 User):param filter_expr: 过滤表达式 (如 User.id == 1),用于批量删除:param instance: 要删除的实例对象,用于单个对象删除:return: 删除的行数"""def _delete():if instance:# 删除单个实例db.session.delete(instance)return 1  # 单个删除始终返回1elif filter_expr:# 批量删除query = db.session.query(entity).filter(filter_expr)return query.delete(synchronize_session=False)else:raise ValueError("必须提供filter_expr或instance参数")return safe_db_execute(_delete)@staticmethoddef get(entity, primary_key):"""安全获取单个对象"""def _get():return db.session.query(entity).get(primary_key)return safe_db_execute(_get)@staticmethoddef flush():"""安全刷新会话"""return safe_db_execute(db.session.flush)#确保在请求结束时移除会话
@platform_app.teardown_appcontext
def shutdown_session(exception=None):try:db.session.remove()logger.debug("会话已清理")except Exception as e:logger.error("清理会话时出错: %s", str(e))# 确保所有连接都关闭try:db.engine.dispose()logger.debug("连接池已刷新")except Exception as e:logger.error("刷新连接池失败: %s", str(e))def connection_heartbeat():"""定期执行心跳查询保持连接活跃"""while True:time.sleep(300)  # 每5分钟一次try:SafeDBSession.execute("SELECT 1")logger.debug("数据库心跳正常")except Exception as e:logger.error("数据库心跳失败: %s", str(e))#启动心跳线程
if not platform_app.debug:heartbeat_thread = threading.Thread(target=connection_heartbeat)heartbeat_thread.daemon = Trueheartbeat_thread.start()

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词