基于多平台耦合信息传播动力学模型的信息传播分析方法
基于python进行模拟三个平台的信息传播,用户参数14亿,考虑平台对其他平台的信息管制,平台的活动引力,用户审美疲劳;
和之前的SIR病毒感染有些相似
代码一些细节需要处理优化....
import numpy as np
import matplotlib.pyplot as plt
from scipy.integrate import solve_ivp
import random
from matplotlib.animation import FuncAnimation# 参数设定
N = 1400000000 # 总人数(14亿)
beta = 0.5 # 基础感染率
gamma_E = 1 / 7 # 暴露期恢复率(假设暴露期大约一周)
gamma_I = 1 / 14 # 感染期恢复率(假设感染期大约两周)
alpha_SE = 0.5 # 初始易感到暴露的转化率
alpha_EI = 0.3 # 初始暴露到感染的转化率
p_cross = 0.05 # 平台间交叉概率# 不同平台的参数(简化示例)
platforms = 3
beta_platform = [beta, beta * 0.8, beta * 0.7] # 假设不同平台的基础感染率不同
gamma_E_platform = [gamma_E, gamma_E, gamma_E]
gamma_I_platform = [gamma_I, gamma_I, gamma_I]
alpha_SE_platform = [alpha_SE, alpha_SE * 0.9, alpha_SE * 0.8]
alpha_EI_platform = [alpha_EI, alpha_EI * 0.9, alpha_EI * 0.8]# 初始条件
# 每个平台的初始分布人数
S0 = [N * 0.,N * 0.3,N * 0.7] # 初始易感人数
E0 = [N * 0.05,N * 0.1,N * 0.15] # 初始暴露人数
I0 = [1,0,0] # 初始感染人数
y0 = [*S0, *E0, *I0] # 初始状态向量# 时间向量
t_span = (0, 365)
t_eval = np.arange(0, 365, 1) # 模拟365天,每天更新一次# 平台活动参数
activity_duration = 7 # 活动持续时间(天)
activity_beta_increase = 1.5 # 活动期间感染率增加倍数
activity_times = [10, 0.25 * 365, 0.75 * 365] # 固定活动开始时间,避免随机性# 审美疲劳参数
fatigue_rate_SE = 0.001 # 易感到暴露转化率下降速率
fatigue_rate_EI = 0.001 # 暴露到感染转化率下降速率# 用户平台切换参数
switch_probability = 0.01 # 用户在每个时间步切换平台的概率# 用户在平台上的停留时间
stay_time = np.zeros(platforms)# 定义微分方程
def multi_platform_seir_model(t, y, N, platforms, beta_platform, gamma_E_platform, gamma_I_platform, alpha_SE_platform,alpha_EI_platform, p_cross, stay_time, switch_probability):S = y[:platforms]E = y[platforms:2 * platforms]I = y[2 * platforms:]dSdt = []dEdt = []dIdt = []for i in range(platforms):# 检查是否在活动期间in_activity = any(start <= t < start + activity_duration for start in activity_times)# 活动期间的感染率if in_activity:current_beta = beta_platform[i] * activity_beta_increaseelse:current_beta = beta_platform[i]# 审美疲劳效应current_alpha_SE = max(0, alpha_SE_platform[i] - fatigue_rate_SE * stay_time[i])current_alpha_EI = max(0, alpha_EI_platform[i] - fatigue_rate_EI * stay_time[i])# 用户平台切换if random.random() < switch_probability:# 随机选择一个平台进行切换new_platform = random.choice([j for j in range(platforms) if j != i])# 更新停留时间stay_time[new_platform] += 1stay_time[i] = 0# 逐渐转移用户transfer_amount = min(S[i] * 0.01, S[new_platform] * 0.01) # 转移量为当前平台和目标平台的较小者S[i] -= transfer_amountS[new_platform] += transfer_amountE[i] -= transfer_amountE[new_platform] += transfer_amountI[i] -= transfer_amountI[new_platform] += transfer_amountelse:# 如果没有切换,增加停留时间stay_time[i] += 1dSdt.append(-current_beta * S[i] * I[i] / N -p_cross * (S[i] - sum(S) / platforms))dEdt.append(current_beta * S[i] * I[i] / N -(gamma_E_platform[i] + current_alpha_SE) * E[i] -p_cross * (E[i] - sum(E) / platforms))dIdt.append(current_alpha_SE * E[i] -(gamma_I_platform[i] + current_alpha_EI) * I[i] -p_cross * (I[i] - sum(I) / platforms))return [*dSdt, *dEdt, *dIdt]# 一次性求解
sol = solve_ivp(lambda t, y: multi_platform_seir_model(t, y, N, platforms, beta_platform, gamma_E_platform, gamma_I_platform,alpha_SE_platform, alpha_EI_platform, p_cross, stay_time,switch_probability),t_span, y0, t_eval=t_eval, method='RK45', rtol=1e-6, atol=1e-6)
solution = sol.y# 解析解
S1, S2, S3, E1, E2, E3, I1, I2, I3 = solution# 可视化设置
fig, ax = plt.subplots()
ax.set_xlim(0, 365)
ax.set_ylim(0, N)
line_S1, = ax.plot([], [], label='Susceptible Platform 1')
line_E1, = ax.plot([], [], label='Exposed Platform 1')
line_I1, = ax.plot([], [], label='Infected Platform 1')
line_S2, = ax.plot([], [], label='Susceptible Platform 2')
line_E2, = ax.plot([], [], label='Exposed Platform 2')
line_I2, = ax.plot([], [], label='Infected Platform 2')
line_S3, = ax.plot([], [], label='Susceptible Platform 3')
line_E3, = ax.plot([], [], label='Exposed Platform 3')
line_I3, = ax.plot([], [], label='Infected Platform 3')
ax.legend()# 初始化动画
def init():for line in [line_S1, line_E1, line_I1, line_S2, line_E2, line_I2, line_S3, line_E3, line_I3]:line.set_data([], [])return [line_S1, line_E1, line_I1, line_S2, line_E2, line_I2, line_S3, line_E3, line_I3]# 更新函数
def update(frame):line_S1.set_data(t_eval[:frame + 1], S1[:frame + 1])line_E1.set_data(t_eval[:frame + 1], E1[:frame + 1])line_I1.set_data(t_eval[:frame + 1], I1[:frame + 1])line_S2.set_data(t_eval[:frame + 1], S2[:frame + 1])line_E2.set_data(t_eval[:frame + 1], E2[:frame + 1])line_I2.set_data(t_eval[:frame + 1], I2[:frame + 1])line_S3.set_data(t_eval[:frame + 1], S3[:frame + 1])line_E3.set_data(t_eval[:frame + 1], E3[:frame + 1])line_I3.set_data(t_eval[:frame + 1], I3[:frame + 1])return [line_S1, line_E1, line_I1, line_S2, line_E2, line_I2, line_S3, line_E3, line_I3]# 创建动画
ani = FuncAnimation(fig, update, frames=len(t_eval), init_func=init, blit=True, interval=50)plt.show()