#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import json
import time
import subprocess
import threading
import sys
from datetime import datetime, timedelta
import logging
import webbrowser
from flask import Flask, render_template, request, jsonify, session, redirect, url_for, make_response, send_from_directory, flash
import threading
from threading import Lock
import signal
import socket
import requests
import re
from flask_socketio import SocketIO # 添加WebSocket支持
import hashlib
import psutil
import urllib3
# 禁用不安全请求的警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# 全局应用状态变量
shutdown_flag = False # 控制应用程序关闭的标志
# 初始化 SocketIO
socketio = SocketIO()
# 系统版本固定在代码中
VERSION = "v20250429213045" # 系统版本号 - 这是固定的本地版本
# 配置日志
def setup_logging():
try:
# 获取当前脚本所在目录
script_dir = os.path.dirname(os.path.abspath(__file__))
log_file = os.path.join(script_dir, 'dashboard.log')
# 确保日志文件存在
if not os.path.exists(log_file):
with open(log_file, 'w', encoding='utf-8') as f:
f.write('')
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_file, encoding='utf-8', mode='a'),
logging.StreamHandler()
]
)
logging.info("Dashboard日志系统初始化成功")
except Exception as e:
print(f"日志系统初始化失败: {str(e)}")
# 如果文件日志失败,至少使用控制台日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler()]
)
# 初始化日志系统
setup_logging()
# 获取或创建版本文件,格式为"当前时间(v版本号)",例如"20250408125815(v1.1)"。
def ensure_version_file():
"""确保版本文件存在,写入固定版本号"""
try:
version_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'VERSION.txt')
# 使用固定版本号
version_content = VERSION
# 写入版本文件
with open(version_file, 'w') as f:
f.write(version_content)
logging.info(f"更新版本文件: {version_content}")
return version_content
except Exception as e:
logging.error(f"处理版本文件时出错: {e}")
return VERSION # 返回固定版本号
# 获取线上版本进行比较
def get_online_version():
"""获取线上版本信息"""
try:
log("正在获取线上版本信息...")
# 获取系统代理设置
proxies = {
'http': os.environ.get('HTTP_PROXY', ''),
'https': os.environ.get('HTTPS_PROXY', '')
}
# 如果没有代理设置,尝试不使用代理直接访问
if not any(proxies.values()):
log("未检测到系统代理设置,尝试直接连接...")
response = requests.get(VERSION_CHECK_URL, timeout=5, verify=False)
else:
log(f"使用系统代理设置: {proxies}")
response = requests.get(VERSION_CHECK_URL, timeout=5, proxies=proxies, verify=False)
log(f"获取线上版本响应状态码: {response.status_code}")
if response.status_code == 200:
version = response.text.strip()
log(f"成功获取线上版本: {version}")
return version
elif response.status_code == 407:
log("需要代理认证,尝试不使用代理直接访问...")
# 尝试不使用代理直接访问
response = requests.get(VERSION_CHECK_URL, timeout=5, proxies={'http': None, 'https': None}, verify=False)
if response.status_code == 200:
version = response.text.strip()
log(f"直接访问成功获取线上版本: {version}")
return version
else:
log(f"直接访问失败,HTTP状态码: {response.status_code}")
return None
else:
log(f"获取线上版本失败,HTTP状态码: {response.status_code}")
return None
except requests.exceptions.Timeout:
log("获取线上版本超时")
return None
except requests.exceptions.ConnectionError:
log("连接线上版本服务器失败,尝试不使用代理直接访问...")
try:
# 尝试不使用代理直接访问
response = requests.get(VERSION_CHECK_URL, timeout=5, proxies={'http': None, 'https': None}, verify=False)
if response.status_code == 200:
version = response.text.strip()
log(f"直接访问成功获取线上版本: {version}")
return version
else:
log(f"直接访问失败,HTTP状态码: {response.status_code}")
return None
except Exception as e:
log(f"直接访问失败: {str(e)}")
return None
except Exception as e:
log(f"获取线上版本时发生未知错误: {str(e)}")
return None
VERSION_CHECK_URL = "https://gitea.ui-beam.cn/ui_beam/NetEaseDSMonitor/raw/branch/main/VERSION.txt" # 正式版本地址
version_status = {
'last_check_time': None,
'online_version': None,
'has_update': False
}
version_lock = Lock()
# 版本检测线程标志
version_check_thread_running = False
# 检查并安装依赖
def check_and_install_dependencies():
try:
# 检查plyer库是否安装
try:
import plyer
except ImportError:
log("正在安装plyer库...")
subprocess.check_call([sys.executable, "-m", "pip", "install", "plyer"])
log("plyer库安装成功")
except Exception as e:
log(f"依赖检查/安装过程中出错: {str(e)}")
# 禁用 Flask 的访问日志
logging.getLogger('werkzeug').setLevel(logging.ERROR)
# 创建Flask应用
app = Flask(__name__)
app.secret_key = 'netease-dashboard-secret-key' # 用于session加密
app.permanent_session_lifetime = timedelta(days=30) # 设置会话有效期为30天
socketio.init_app(app) # 初始化 SocketIO
# 告警阈值
ALARM_THRESHOLD = 1900 # 两个系统折算总和的告警阈值
alarm_status = {
'last_alarm_time': None,
'alarm_count': 0,
'is_alarming': False,
'alarm_type': None
}
alarm_lock = Lock()
# 进程对象
processes = {
'breeze': None,
'cms': None,
'inspect': None
}
def log(message):
"""记录日志"""
try:
logging.info(message)
except Exception as e:
print("Log error: " + str(e))
print("Original message: " + message)
def start_monitor_processes():
"""启动监控进程"""
try:
# 获取会话凭据
breeze_cookie = session.get('breeze_cookie', '')
cms_cookie = session.get('cms_cookie', '')
inspect_cookie = session.get('inspect_cookie', '')
username = session.get('username', '')
backend_type = session.get('backend_type', 'breeze_monitor')
# 检查是否至少有一个Cookie可用
if not username or not (breeze_cookie or cms_cookie or inspect_cookie):
logging.error("缺少必要的会话凭据,至少需要一个系统的Cookie和用户名")
return False
# 终止现有进程
terminate_existing_processes()
# 根据提供的Cookie决定启动哪些监控进程
processes_started = 0
# 仅当提供了Breeze cookie时才启动Breeze监控
if breeze_cookie:
# 选择正确的监控脚本
if backend_type == 'breeze_monitor_CHAT':
monitor_script = 'breeze_monitor_CHAT.py'
else:
monitor_script = 'breeze_monitor.py'
if not os.path.exists(monitor_script):
logging.error(f"监控脚本不存在: {monitor_script}")
else:
# 启动Breeze监控进程
breeze_env = os.environ.copy()
breeze_env['BREEZE_COOKIE'] = breeze_cookie
breeze_env['BREEZE_USERNAME'] = username
# 定义最小化窗口的启动参数(仅Windows)
if sys.platform.startswith('win'):
si = subprocess.STARTUPINFO()
si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
si.wShowWindow = 6 # SW_MINIMIZE
else:
si = None
try:
global processes
processes['breeze'] = subprocess.Popen(
['python', monitor_script],
env=breeze_env,
creationflags=subprocess.CREATE_NEW_CONSOLE,
startupinfo=si
)
logging.info(f"已启动Breeze监控进程: {processes['breeze'].pid}")
processes_started += 1
except Exception as e:
logging.error(f"启动Breeze监控进程失败: {str(e)}")
else:
logging.info("未提供Breeze Cookie,跳过启动Breeze监控进程")
# 仅当提供了CMS cookie时才启动CMS监控
if cms_cookie:
if not os.path.exists('cms_monitor.py'):
logging.error("CMS监控脚本不存在")
else:
# 启动CMS监控进程
cms_env = os.environ.copy()
cms_env['CMS_COOKIE'] = cms_cookie
cms_env['CMS_USERNAME'] = username
# 定义最小化窗口的启动参数(仅Windows)
if sys.platform.startswith('win'):
si = subprocess.STARTUPINFO()
si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
si.wShowWindow = 6 # SW_MINIMIZE
else:
si = None
try:
processes['cms'] = subprocess.Popen(
['python', 'cms_monitor.py'],
env=cms_env,
creationflags=subprocess.CREATE_NEW_CONSOLE,
startupinfo=si
)
logging.info(f"已启动CMS监控进程: {processes['cms'].pid}")
processes_started += 1
except Exception as e:
logging.error(f"启动CMS监控进程失败: {str(e)}")
else:
logging.info("未提供CMS Cookie,跳过启动CMS监控进程")
# 仅当提供了CC审核平台cookie时才启动其监控进程
if inspect_cookie:
if not os.path.exists('inspect_monitor.py'):
logging.error("CC审核平台监控脚本不存在")
else:
# 启动CC审核平台监控进程
inspect_env = os.environ.copy()
inspect_env['INSPECT_COOKIE'] = inspect_cookie
inspect_env['INSPECT_USERNAME'] = username
# 定义最小化窗口的启动参数(仅Windows)
if sys.platform.startswith('win'):
si = subprocess.STARTUPINFO()
si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
si.wShowWindow = 6 # SW_MINIMIZE
else:
si = None
try:
processes['inspect'] = subprocess.Popen(
['python', 'inspect_monitor.py'],
env=inspect_env,
creationflags=subprocess.CREATE_NEW_CONSOLE,
startupinfo=si
)
logging.info(f"已启动CC审核平台监控进程: {processes['inspect'].pid}")
processes_started += 1
except Exception as e:
logging.error(f"启动CC审核平台监控进程失败: {str(e)}")
else:
logging.info("未提供CC审核平台Cookie,跳过启动其监控进程")
# 至少启动一个进程才算成功
if processes_started > 0:
logging.info(f"成功启动了 {processes_started} 个监控进程")
return True
else:
logging.error("没有启动任何监控进程")
return False
except Exception as e:
logging.error(f"启动监控进程失败: {str(e)}")
return False
def add_no_cache_headers(response):
"""添加禁止缓存的响应头"""
response.headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, max-age=0'
response.headers['Pragma'] = 'no-cache'
response.headers['Expires'] = '0'
return response
@app.after_request
def after_request(response):
"""每个响应添加禁止缓存的头部"""
return add_no_cache_headers(response)
@app.route('/favicon.ico')
def favicon():
"""提供网站图标"""
try:
script_dir = os.path.dirname(os.path.abspath(__file__))
static_path = os.path.join(script_dir, 'static')
return send_from_directory(static_path, 'ds-favicon.ico', mimetype='image/vnd.microsoft.icon')
except Exception as e:
log("Failed to serve favicon: " + str(e))
return '', 204
@app.route('/')
def index():
"""主页路由"""
if not session.get('logged_in'):
return redirect(url_for('login'))
return redirect(url_for('dashboard'))
@app.route('/dashboard')
def dashboard():
"""仪表盘路由"""
if not session.get('logged_in'):
return redirect(url_for('login'))
# 获取当前登录用户信息
staff_name = session.get('staff_name', '')
username = session.get('username', '')
# 获取当前时间
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# 获取统计数据
breeze_hourly = read_data_file('breeze_hourly.json')
breeze_daily = read_data_file('breeze_daily.json')
cms_hourly = read_data_file('cms_hourly.json')
cms_daily = read_data_file('cms_daily.json')
# 获取告警状态
alarm_status = get_alarm_status()
# 获取系数配置
breeze_coefficients = get_breeze_coefficients()
cms_coefficients = get_coefficients()
return render_template('dashboard.html',
staff_name=staff_name,
username=username,
current_time=current_time,
breeze_hourly=breeze_hourly,
breeze_daily=breeze_daily,
cms_hourly=cms_hourly,
cms_daily=cms_daily,
alarm_status=alarm_status,
breeze_coefficients=breeze_coefficients,
cms_coefficients=cms_coefficients,
version=VERSION)
def get_online_staff_data():
"""从在线链接获取staff.json数据"""
url = "http://scripts.ui-beam.com:5000/NetEaseDSMonitor/config/staff.json"
# 添加重试机制
max_retries = 3
retry_delay = 5 # 重试间隔(秒)
for attempt in range(max_retries):
try:
# 尝试不使用代理直接获取
try:
response = requests.get(url, timeout=10, proxies={'http': None, 'https': None}, verify=False)
except:
# 如果直接获取失败,尝试使用系统代理
proxies = {
'http': os.environ.get('HTTP_PROXY', ''),
'https': os.environ.get('HTTPS_PROXY', '')
}
response = requests.get(url, timeout=10, proxies=proxies, verify=False)
if response.status_code == 200:
staff_data = response.json()
log(f"从在线链接加载员工数据成功")
return staff_data
else:
error_msg = f"获取在线员工数据失败,HTTP状态码: {response.status_code}"
log(error_msg)
if attempt < max_retries - 1: # 如果不是最后一次重试
log(f"第{attempt + 1}次重试失败,{retry_delay}秒后重试...")
time.sleep(retry_delay)
continue
else:
log("所有重试均失败,返回空数据")
return {}
except Exception as e:
error_msg = f"获取在线员工数据时发生错误: {str(e)}"
log(error_msg)
if attempt < max_retries - 1: # 如果不是最后一次重试
log(f"第{attempt + 1}次重试失败,{retry_delay}秒后重试...")
time.sleep(retry_delay)
continue
else:
log("所有重试均失败,返回空数据")
return {}
return {}
def get_staff_name(staff_id):
"""根据工号获取员工姓名"""
try:
staff_data = get_online_staff_data()
if staff_data:
return staff_data.get(staff_id)
return None
except Exception as e:
log(f"获取员工姓名时发生错误: {str(e)}")
return None
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
try:
username = request.form.get('username', '').upper()
breeze_cookie = request.form.get('breeze_cookie', '')
cms_cookie = request.form.get('cms_cookie', '')
inspect_cookie = request.form.get('inspect_cookie', '')
backend_type = request.form.get('backend_type', 'breeze_monitor')
# 检查必填字段 - 用户名
if not username:
return jsonify({'code': 1, 'message': '请填写工号'})
# 检查至少一个Cookie必须填写
if not (breeze_cookie or cms_cookie or inspect_cookie):
return jsonify({'code': 1, 'message': '请至少填写一个系统的Cookie'})
# 尝试从staff.json获取姓名,如果不存在则使用工号
staff_name = get_staff_name(username)
#if not staff_name: # 启用工号验证
# return jsonify({'code': 1, 'message': f'您的工号({username})尚未在系统中注册或为不可用状态,无法登录,请联系系统管理员协助'})
display_name = f"{username} ({staff_name})" if staff_name else username # 禁用工号验证
# 保存会话信息
session['username'] = username
#session['staff_name'] = staff_name # 启用工号验证
session['staff_name'] = display_name # 禁用工号验证
session['breeze_cookie'] = breeze_cookie
session['cms_cookie'] = cms_cookie
session['inspect_cookie'] = inspect_cookie
session['backend_type'] = backend_type
session['logged_in'] = True # 添加登录标志
# 启动监控进程
start_monitor_processes()
return jsonify({
'code': 0,
'message': '登录成功',
'staff_name': display_name, # 禁用工号验证
#'staff_name': staff_name, # 启用工号验证
'redirect': '/dashboard'
})
except Exception as e:
logging.error(f"登录失败: {str(e)}")
return jsonify({'code': 1, 'message': f'登录失败: {str(e)}'})
return render_template('login.html', version=VERSION)
@app.route('/logout')
def logout():
"""退出系统"""
try:
# 记录退出用户
username = session.get('username', '未知用户')
log("用户 [" + username + "] 退出系统")
# 清除会话
session.clear()
# 标记系统即将退出
global shutdown_flag
shutdown_flag = True
# 尝试停止监控进程
if processes['breeze'] and processes['breeze'].poll() is None:
try:
processes['breeze'].terminate()
log("已停止Breeze监控进程")
except Exception as e:
log("停止Breeze监控进程失败: " + str(e))
if processes['cms'] and processes['cms'].poll() is None:
try:
processes['cms'].terminate()
log("已停止CMS监控进程")
except Exception as e:
log("停止CMS监控进程失败: " + str(e))
# 删除共享数据文件
try:
script_dir = os.path.dirname(os.path.abspath(__file__))
data_files = ['breeze_daily.json', 'breeze_hourly.json', 'cms_daily.json', 'cms_hourly.json', 'inspect_daily.json', 'inspect_hourly.json', 'breeze_coefficients.json', 'cms_coefficients.json', 'inspect_coefficients.json']
for file_name in data_files:
file_path = os.path.join(script_dir, file_name)
for attempt in range(3): # 最多重试3次
try:
if os.path.exists(file_path):
os.remove(file_path)
log(f"已删除共享数据文件: {file_name}")
break
except Exception as e:
log(f"删除文件 {file_name} 失败: {str(e)},第{attempt+1}次重试")
time.sleep(1)
log("成功清理所有共享数据文件")
except Exception as e:
log(f"删除共享数据文件时出错: {str(e)}")
# 在一个后台线程中等待几秒后退出应用程序
def shutdown_app():
time.sleep(3) # 等待3秒
log("用户退出系统,系统正在完全关闭...")
# 关闭所有子进程和当前进程
if sys.platform.startswith('win'):
# 在Windows系统下,强制关闭当前进程及所有子进程
try:
# 获取当前进程PID
current_pid = os.getpid()
# 使用taskkill命令强制结束进程树
subprocess.run(f'taskkill /F /T /PID {current_pid}', shell=True)
except Exception as e:
log(f"关闭进程时出错: {str(e)}")
else:
# 非Windows系统使用标准退出方式
os._exit(0) # 强制关闭整个程序
# 启动关闭线程
threading.Thread(target=shutdown_app).start()
# 返回包含关闭倒计时的页面
return """
正在关闭系统...
"""
except Exception as e:
log("Error in logout route: " + str(e))
return "Logout error. Please check logs.", 500
def read_data_file(filename):
"""读取数据文件"""
try:
script_dir = os.path.dirname(os.path.abspath(__file__))
file_path = os.path.join(script_dir, filename)
if not os.path.exists(file_path):
return None
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
return data
except Exception as e:
log("Failed to read file {0}: {1}".format(filename, str(e)))
return None
def check_version():
"""检查系统版本并与在线版本比较"""
global version_check_thread_running
try:
log("开始检查系统版本更新...")
with version_lock:
version_status['last_check_time'] = datetime.now()
# 确保VERSION.txt文件存在且内容与VERSION一致
local_version = ensure_version_file()
log(f"当前本地版本: {local_version}")
# 获取在线版本
online_version = get_online_version()
log(f"获取到的在线版本: {online_version}")
if online_version:
with version_lock:
version_status['online_version'] = online_version
# 比较版本号,只在线上版本比本地版本新时才提示更新
local_timestamp = int(local_version[1:].split('-')[0]) # 去掉'v'前缀并转换为数字
online_timestamp = int(online_version[1:].split('-')[0]) # 去掉'v'前缀并转换为数字
version_status['has_update'] = online_timestamp > local_timestamp
log(f"版本比较结果 - 在线版本: {online_version}, 本地版本: {local_version}, 是否有更新: {version_status['has_update']}")
# 如果有更新且没有正在运行的线程,启动告警线程
if version_status['has_update'] and not version_check_thread_running:
version_check_thread_running = True
threading.Thread(target=notify_version_update).start()
return {
'current_version': VERSION,
'online_version': online_version,
'has_update': version_status['has_update'],
'last_check_time': version_status['last_check_time'].strftime('%Y-%m-%d %H:%M:%S') if version_status['last_check_time'] else None
}
else:
log("无法获取在线版本,返回本地版本信息")
return {
'current_version': VERSION,
'online_version': None,
'has_update': False,
'last_check_time': version_status['last_check_time'].strftime('%Y-%m-%d %H:%M:%S') if version_status['last_check_time'] else None
}
except Exception as e:
log(f"版本检查过程中发生错误: {str(e)}")
return {
'current_version': VERSION,
'online_version': None,
'has_update': False,
'last_check_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'error': str(e)
}
finally:
# 确保线程标志被重置
if version_check_thread_running:
version_check_thread_running = False
def notify_version_update():
"""通知用户有新版本可用"""
try:
# 确保VERSION.txt文件存在
ensure_version_file()
with version_lock:
online_version = version_status['online_version']
if online_version:
message = f"系统有新版本可用: {online_version},当前版本: {VERSION}"
log(message)
try:
# 发送桌面通知
from plyer import notification
notification.notify(
title="系统版本更新",
message=message + "\n点击'检测更新'按钮了解详情",
app_icon=None,
timeout=10,
toast=False
)
# 通过WebSocket通知前端弹出版本检测窗口
socketio.emit('version_update', {
'type': 'show_version_dialog',
'current_version': VERSION,
'online_version': online_version
})
except Exception as e:
log(f"显示桌面通知失败: {str(e)}")
except Exception as e:
log(f"通知版本更新失败: {str(e)}")
finally:
# 重置线程运行标志
global version_check_thread_running
version_check_thread_running = False
def monitor_version_thread():
"""版本检测后台线程"""
log("版本监控线程启动")
while True:
try:
# 每30分钟检查一次版本更新
check_version()
time.sleep(1800) # 30分钟
except Exception as e:
log(f"版本监控线程异常: {str(e)}")
time.sleep(600) # 出错后等待10分钟重试
@app.route('/api/get-version')
def get_version():
"""获取系统版本信息"""
try:
# 确保VERSION.txt文件存在
ensure_version_file()
with version_lock:
status = {
'current_version': VERSION,
'local_version': VERSION, # 固定的本地版本号
'online_version': version_status['online_version'],
'has_update': version_status['has_update'],
'last_check_time': version_status['last_check_time'].strftime('%Y-%m-%d %H:%M:%S') if version_status['last_check_time'] else None
}
return jsonify({
'success': True,
'data': status
})
except Exception as e:
log("Error in get_version route: " + str(e))
return jsonify({'success': False, 'message': 'Internal server error'}), 500
@app.route('/api/check-version')
def check_version_api():
"""手动检查版本更新的API"""
try:
result = check_version()
if result:
return jsonify({
'success': True,
'data': result
})
else:
# 至少返回当前版本信息
return jsonify({
'success': False,
'message': '获取在线版本信息失败',
'data': {
'current_version': VERSION,
'local_version': VERSION,
'online_version': None,
'has_update': False,
'last_check_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
})
except Exception as e:
log("Error in check_version_api route: " + str(e))
return jsonify({'success': False, 'message': f'检测版本更新失败: {str(e)}'}), 500
@app.route('/api/get-stats')
def get_stats():
"""获取所有统计数据"""
try:
if 'logged_in' not in session:
return jsonify({'success': False, 'message': 'Not logged in'})
try:
# 读取各个数据文件
breeze_hourly = read_data_file('breeze_hourly.json') or {
'stats': {
'weighted_total': 0,
'total': 0,
'categories': {},
'details': {}
},
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
breeze_daily = read_data_file('breeze_daily.json') or {
'stats': {
'weighted_total': 0,
'total': 0,
'categories': {},
'details': {}
},
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
cms_hourly = read_data_file('cms_hourly.json') or {
'stats': {
'weighted_total': 0,
'total': 0,
'comment': 0,
'feed': 0,
'complaint': 0
},
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
cms_daily = read_data_file('cms_daily.json') or {
'stats': {
'weighted_total': 0,
'total': 0,
'comment': 0,
'feed': 0,
'complaint': 0
},
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
# 读取CC审核平台数据
inspect_hourly = read_data_file('inspect_hourly.json') or {
'stats': []
}
inspect_daily = read_data_file('inspect_daily.json') or {
'stats': []
}
# 获取最新的统计数据
inspect_hourly_stats = inspect_hourly['stats'][-1] if inspect_hourly['stats'] else {
'total': 0,
'weighted_total': 0,
'timestamp': int(time.time())
}
inspect_daily_stats = inspect_daily['stats'][-1] if inspect_daily['stats'] else {
'total': 0,
'weighted_total': 0,
'timestamp': int(time.time())
}
# 获取当前的CMS系数设置
script_dir = os.path.dirname(os.path.abspath(__file__))
cms_coefficients_file = os.path.join(script_dir, 'cms_coefficients.json')
cms_coefficients = None
if os.path.exists(cms_coefficients_file):
try:
with open(cms_coefficients_file, 'r', encoding='utf-8') as f:
cms_coefficients = json.load(f)
log("从配置文件加载CMS系数")
except Exception as e:
log(f"读取CMS系数配置文件失败: {str(e)}")
# 如果无法读取配置,使用默认系数
if not cms_coefficients:
cms_coefficients = {
'comment': 0.55,
'feed': 1.54,
'complaint': 5.4
}
log("使用默认CMS系数")
# 获取当前的Breeze系数设置
breeze_coefficients_file = os.path.join(script_dir, 'breeze_coefficients.json')
breeze_coefficients = None
if os.path.exists(breeze_coefficients_file):
try:
with open(breeze_coefficients_file, 'r', encoding='utf-8') as f:
breeze_coefficients = json.load(f)
log("从配置文件加载Breeze系数")
except Exception as e:
log(f"读取Breeze系数配置文件失败: {str(e)}")
# 如果无法读取配置,不设置默认值,让前端使用现有值
if not breeze_coefficients:
log("无法读取Breeze系数配置文件")
# 合并结果
result = {
'breeze': {
'hourly': breeze_hourly.get('stats', {}),
'hourly_update': breeze_hourly.get('timestamp', ''),
'daily': breeze_daily.get('stats', {}),
'daily_update': breeze_daily.get('timestamp', ''),
'coefficients': breeze_coefficients
},
'cms': {
'hourly': cms_hourly.get('stats', {}),
'hourly_update': cms_hourly.get('timestamp', ''),
'daily': cms_daily.get('stats', {}),
'daily_update': cms_daily.get('timestamp', ''),
'coefficients': cms_coefficients
},
'inspect': {
'hourly': inspect_hourly_stats,
'hourly_update': datetime.fromtimestamp(inspect_hourly_stats['timestamp']).strftime('%Y-%m-%d %H:%M:%S'),
'daily': inspect_daily_stats,
'daily_update': datetime.fromtimestamp(inspect_daily_stats['timestamp']).strftime('%Y-%m-%d %H:%M:%S'),
'coefficients': {'default': 1.5}
},
'total': {
'hourly': 0,
'daily': 0
}
}
# 计算总折算数
if breeze_hourly and 'stats' in breeze_hourly and 'weighted_total' in breeze_hourly['stats']:
result['total']['hourly'] += breeze_hourly['stats']['weighted_total']
if cms_hourly and 'stats' in cms_hourly and 'weighted_total' in cms_hourly['stats']:
result['total']['hourly'] += cms_hourly['stats']['weighted_total']
if inspect_hourly_stats and 'weighted_total' in inspect_hourly_stats:
result['total']['hourly'] += inspect_hourly_stats['weighted_total']
if breeze_daily and 'stats' in breeze_daily and 'weighted_total' in breeze_daily['stats']:
result['total']['daily'] += breeze_daily['stats']['weighted_total']
if cms_daily and 'stats' in cms_daily and 'weighted_total' in cms_daily['stats']:
result['total']['daily'] += cms_daily['stats']['weighted_total']
if inspect_daily_stats and 'weighted_total' in inspect_daily_stats:
result['total']['daily'] += inspect_daily_stats['weighted_total']
# 检查告警阈值
if result['total']['hourly'] >= ALARM_THRESHOLD:
check_alarm(result['total']['hourly'])
# 通过WebSocket发送更新
try:
socketio.emit('stats_update', {
'success': True,
'data': result,
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
})
except Exception as e:
log(f"WebSocket发送数据更新失败: {str(e)}")
return jsonify({
'success': True,
'data': result,
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
})
except Exception as e:
log("Failed to get stats: " + str(e))
return jsonify({
'success': False,
'message': "Failed to get stats: " + str(e)
})
except Exception as e:
log("Error in get_stats route: " + str(e))
return jsonify({'success': False, 'message': 'Internal server error'}), 500
def check_alarm(total_weighted):
"""检查并触发告警"""
try:
with alarm_lock:
now = datetime.now()
# 如果最近30秒内已经告警过,增加计数但不重复告警
if alarm_status['last_alarm_time'] and (now - alarm_status['last_alarm_time']).total_seconds() < 30:
return
# 设置告警状态
alarm_status['last_alarm_time'] = now
alarm_status['alarm_count'] += 1
alarm_status['is_alarming'] = True
# 如果已经达到3次告警,不再继续
if alarm_status['alarm_count'] > 3:
return
# 启动线程播放告警声音和显示通知
# 将当前加权总值传递给函数
threading.Thread(target=lambda: show_alarm_notification(total_weighted)).start()
log("Alarm triggered: Current weighted total {0:.2f}, exceeded threshold {1}".format(total_weighted, ALARM_THRESHOLD))
except Exception as e:
log("Error in check_alarm: " + str(e))
def show_alarm_notification(total_weighted, is_test=False, alarm_type="实时告警"):
"""显示告警通知和播放告警声音"""
try:
# 播放系统告警声音
try:
import winsound
winsound.PlaySound("SystemExclamation", winsound.SND_ALIAS)
except:
# 如果winsound不可用,尝试使用beep
print('\a')
# 添加桌面通知
try:
from plyer import notification
# 计算超出阈值的百分比
over_percentage = ((total_weighted - ALARM_THRESHOLD) / ALARM_THRESHOLD) * 100
# 根据是否为测试调整标题
title = "测试告警" if is_test else "审核数量告警"
# 根据是否为测试调整消息前缀
prefix = "[测试数据] " if is_test else ""
notification.notify(
title=title,
message=f"{prefix}{alarm_type}当前小时加权总计:{total_weighted:.2f}\n阈值:{ALARM_THRESHOLD}\n超出:{over_percentage:.1f}%\n请立即检查数据监控看板!",
app_icon=None,
timeout=15,
toast=False
)
except Exception as e:
log("Failed to show desktop notification: " + str(e))
# WebSocket推送网页弹窗
try:
from flask_socketio import SocketIO
global socketio
over_percentage = ((total_weighted - ALARM_THRESHOLD) / ALARM_THRESHOLD) * 100
socketio.emit('alarm', {
'type': alarm_type,
'message': f"{alarm_type}:当前值:{total_weighted:.2f},超出{over_percentage:.1f}%"
})
except Exception as e:
log(f"WebSocket推送告警失败: {str(e)}")
except Exception as e:
log("Failed to play alarm sound: " + str(e))
@app.route('/api/get-alarm-status')
def get_alarm_status():
"""获取告警状态"""
try:
with alarm_lock:
status = {
'is_alarming': alarm_status['is_alarming'],
'alarm_count': alarm_status['alarm_count'],
'last_alarm_time': alarm_status['last_alarm_time'].strftime('%Y-%m-%d %H:%M:%S') if alarm_status['last_alarm_time'] else None,
'threshold': ALARM_THRESHOLD,
'alarm_type': alarm_status['alarm_type']
}
return jsonify({
'success': True,
'data': status
})
except Exception as e:
log("Error in get_alarm_status route: " + str(e))
return jsonify({'success': False, 'message': 'Internal server error'}), 500
@app.route('/api/reset-alarm')
def reset_alarm():
"""重置告警状态"""
try:
with alarm_lock:
alarm_status['is_alarming'] = False
alarm_status['alarm_count'] = 0
return jsonify({
'success': True,
'message': 'Alarm has been reset'
})
except Exception as e:
log("Error in reset_alarm route: " + str(e))
return jsonify({'success': False, 'message': 'Internal server error'}), 500
@app.route('/api/restart-monitoring')
def restart_monitoring():
"""重启监控进程"""
try:
if 'logged_in' not in session:
return jsonify({'success': False, 'message': 'Not logged in'})
# 获取Python可执行文件路径
python_executable = sys.executable
# 获取后端类型和凭据
backend_type = session.get('backend_type', 'breeze_monitor')
breeze_cookie = session.get('breeze_cookie', '')
cms_cookie = session.get('cms_cookie', '')
inspect_cookie = session.get('inspect_cookie', '')
username = session.get('username', '')
# 终止现有进程
processes_to_stop = ['breeze', 'cms', 'inspect']
for process_name in processes_to_stop:
if processes.get(process_name) and processes[process_name].poll() is None:
try:
processes[process_name].terminate()
processes[process_name].wait(timeout=5)
log(f"已停止{process_name.upper()}监控进程")
except Exception as e:
log(f"停止{process_name.upper()}监控进程失败: {str(e)}")
try:
processes[process_name].kill()
except:
pass
# 等待一下确保进程完全终止
time.sleep(1)
# 根据后端类型选择监控脚本
if backend_type == 'breeze_monitor_CHAT':
monitor_script = 'breeze_monitor_CHAT.py'
else:
monitor_script = 'breeze_monitor.py'
# 获取脚本所在目录
script_dir = os.path.dirname(os.path.abspath(__file__))
monitor_path = os.path.join(script_dir, monitor_script)
cms_path = os.path.join(script_dir, "cms_monitor.py")
inspect_path = os.path.join(script_dir, "inspect_monitor.py")
# 检查监控脚本是否存在
if not os.path.exists(monitor_path):
log(f"监控脚本不存在: {monitor_path}")
return jsonify({'success': False, 'message': f'监控脚本不存在: {monitor_path}'})
# 启动新的监控进程
try:
# 启动Breeze监控
env = os.environ.copy()
env['BREEZE_COOKIE'] = breeze_cookie
env['BREEZE_USERNAME'] = username
processes['breeze'] = subprocess.Popen(
[python_executable, monitor_path],
env=env,
shell=False,
creationflags=subprocess.CREATE_NEW_CONSOLE
)
log(f"已启动{backend_type}监控进程, PID: {processes['breeze'].pid}")
# 启动CMS监控
env = os.environ.copy()
env['CMS_COOKIE'] = cms_cookie
env['CMS_USERNAME'] = username
processes['cms'] = subprocess.Popen(
[python_executable, cms_path],
env=env,
shell=False,
creationflags=subprocess.CREATE_NEW_CONSOLE
)
log(f"CMS监控进程已启动, PID: {processes['cms'].pid}")
# 启动CC审核平台监控
if inspect_cookie:
env = os.environ.copy()
env['INSPECT_COOKIE'] = inspect_cookie
env['INSPECT_USERNAME'] = username
processes['inspect'] = subprocess.Popen(
[python_executable, inspect_path],
env=env,
shell=False,
creationflags=subprocess.CREATE_NEW_CONSOLE
)
log(f"CC审核平台监控进程已启动, PID: {processes['inspect'].pid}")
return jsonify({
'success': True,
'message': '监控进程已重启'
})
except Exception as e:
log(f"启动监控进程失败: {str(e)}")
return jsonify({
'success': False,
'message': f'启动监控进程失败: {str(e)}'
})
except Exception as e:
log(f"重启监控进程失败: {str(e)}")
return jsonify({
'success': False,
'message': f'重启监控进程失败: {str(e)}'
})
def open_browser():
"""在新线程中打开浏览器"""
time.sleep(1) # 等待服务器启动
try:
webbrowser.open('http://localhost:8000')
except Exception as e:
log("Failed to open browser: " + str(e))
@app.route('/api/check-now')
def check_now():
"""立即检查当前小时数据"""
try:
if 'logged_in' not in session:
return jsonify({'success': False, 'message': 'Not logged in'})
# 获取Python可执行文件路径
python_executable = sys.executable
# 获取后端类型和凭据
backend_type = session.get('backend_type', 'breeze_monitor')
breeze_cookie = session.get('breeze_cookie', '')
cms_cookie = session.get('cms_cookie', '')
inspect_cookie = session.get('inspect_cookie', '')
username = session.get('username', '')
# 终止现有进程
if not terminate_existing_processes():
return jsonify({
'success': False,
'message': '无法完全终止现有进程,请手动关闭后重试'
})
# 获取脚本所在目录
script_dir = os.path.dirname(os.path.abspath(__file__))
# 根据后端类型选择监控脚本
if backend_type == 'breeze_monitor_CHAT':
monitor_script = 'breeze_monitor_CHAT.py'
else:
monitor_script = 'breeze_monitor.py'
monitor_path = os.path.join(script_dir, monitor_script)
cms_path = os.path.join(script_dir, "cms_monitor.py")
inspect_path = os.path.join(script_dir, "inspect_monitor.py")
# 检查监控脚本是否存在
if not os.path.exists(monitor_path):
log(f"监控脚本不存在: {monitor_path}")
return jsonify({'success': False, 'message': f'监控脚本不存在: {monitor_path}'})
# 启动临时检查进程
try:
processes_started = []
# 启动临时Breeze检查
env = os.environ.copy()
env['BREEZE_COOKIE'] = breeze_cookie
env['BREEZE_USERNAME'] = username
temp_breeze = subprocess.Popen(
[python_executable, monitor_path, "--check-now", "--force"],
env=env,
shell=False,
creationflags=subprocess.CREATE_NEW_CONSOLE
)
processes_started.append(('Breeze', temp_breeze))
# 启动临时CMS检查
env = os.environ.copy()
env['CMS_COOKIE'] = cms_cookie
env['CMS_USERNAME'] = username
temp_cms = subprocess.Popen(
[python_executable, cms_path, "--check-now", "--force"],
env=env,
shell=False,
creationflags=subprocess.CREATE_NEW_CONSOLE
)
processes_started.append(('CMS', temp_cms))
# 启动临时CC审核平台检查
if inspect_cookie:
env = os.environ.copy()
env['INSPECT_COOKIE'] = inspect_cookie
env['INSPECT_USERNAME'] = username
temp_inspect = subprocess.Popen(
[python_executable, inspect_path, "--check-now", "--force"],
env=env,
shell=False,
creationflags=subprocess.CREATE_NEW_CONSOLE
)
processes_started.append(('Inspect', temp_inspect))
# 等待所有进程完成或超时
wait_start = time.time()
while time.time() - wait_start < 30: # 最多等待30秒
all_done = True
for name, proc in processes_started:
if proc.poll() is None:
all_done = False
break
if all_done:
break
time.sleep(0.5)
# 检查是否所有进程都已完成
failed_processes = []
for name, proc in processes_started:
if proc.poll() is None:
try:
proc.terminate()
failed_processes.append(name)
except:
pass
if failed_processes:
return jsonify({
'success': False,
'message': f'以下检查进程未能在30秒内完成: {", ".join(failed_processes)}'
})
# 重新启动常规监控进程
if not start_monitor_processes():
return jsonify({
'success': False,
'message': '临时检查完成,但重启常规监控失败'
})
return jsonify({
'success': True,
'message': '数据检查完成并重启了监控进程'
})
except Exception as e:
log(f"启动临时检查进程失败: {str(e)}")
# 确保重启常规监控进程
start_monitor_processes()
return jsonify({
'success': False,
'message': f'启动临时检查进程失败: {str(e)}'
})
except Exception as e:
log(f"临时检查失败: {str(e)}")
# 确保重启常规监控进程
start_monitor_processes()
return jsonify({
'success': False,
'message': f'临时检查失败: {str(e)}'
})
@app.route('/api/test-alarm')
def test_alarm():
"""测试告警功能"""
try:
if 'logged_in' not in session:
return jsonify({'success': False, 'message': 'Not logged in'})
# 检查是否为实际数据告警
is_real_data = request.args.get('real_data', 'false').lower() == 'true'
# 根据请求类型选择不同的消息
message = "实时告警" if is_real_data else "测试告警"
# 立即触发第一次告警
if not is_real_data:
# 手动触发告警并传递测试告警值
show_alarm_notification(ALARM_THRESHOLD + 100, is_test=True, alarm_type="测试告警")
else:
# 从API获取当前真实的小时总量
try:
breeze_hourly = read_data_file('breeze_hourly.json') or {'stats': {'weighted_total': 0}}
cms_hourly = read_data_file('cms_hourly.json') or {'stats': {'weighted_total': 0}}
total_hourly = 0
if 'stats' in breeze_hourly and 'weighted_total' in breeze_hourly['stats']:
total_hourly += breeze_hourly['stats']['weighted_total']
if 'stats' in cms_hourly and 'weighted_total' in cms_hourly['stats']:
total_hourly += cms_hourly['stats']['weighted_total']
# 只有在实际超过阈值时才显示通知
if total_hourly >= ALARM_THRESHOLD:
show_alarm_notification(total_hourly, is_test=False, alarm_type="实时告警")
except Exception as e:
log(f"Error getting real data for alarm: {str(e)}")
# 设置告警状态,启用连续告警
with alarm_lock:
now = datetime.now()
alarm_status['last_alarm_time'] = now
alarm_status['is_alarming'] = True
alarm_status['alarm_type'] = message # 保存告警类型
# 设置告警计数为1,开始计数
alarm_status['alarm_count'] = 1
# 创建后台线程发送连续告警
if not is_real_data: # 仅对测试告警启用连续通知
threading.Thread(target=lambda: send_sequential_test_alarms("测试告警")).start()
log(f"{message} triggered by user")
return jsonify({
'success': True,
'message': f'{message} successful'
})
except Exception as e:
log("Error in test_alarm route: " + str(e))
return jsonify({'success': False, 'message': 'Failed to trigger alarm: ' + str(e)}), 500
@app.route('/api/get-coefficients')
def get_coefficients():
"""获取CMS系数配置"""
try:
if 'logged_in' not in session:
return jsonify({'success': False, 'message': 'Not logged in'})
# 从CMS系数配置文件读取
try:
script_dir = os.path.dirname(os.path.abspath(__file__))
coefficients_file = os.path.join(script_dir, 'cms_coefficients.json')
if not os.path.exists(coefficients_file):
# 如果配置文件不存在,返回默认系数
default_coefficients = {
'comment': 0.55,
'feed': 1.54,
'complaint': 5.4
}
return jsonify({
'success': True,
'data': default_coefficients,
'message': 'Using default coefficients'
})
with open(coefficients_file, 'r', encoding='utf-8') as f:
coefficients = json.load(f)
return jsonify({
'success': True,
'data': coefficients
})
except Exception as e:
log(f"Error reading coefficients file: {str(e)}")
return jsonify({
'success': False,
'message': f'Error reading coefficients: {str(e)}'
})
except Exception as e:
log("Error in get_coefficients route: " + str(e))
return jsonify({'success': False, 'message': 'Internal server error'}), 500
@app.route('/api/update-coefficients', methods=['POST'])
def update_coefficients():
"""更新CMS系数配置"""
try:
if 'logged_in' not in session:
return jsonify({'success': False, 'message': 'Not logged in'})
# 获取请求中的系数
try:
data = request.get_json()
if not data:
return jsonify({
'success': False,
'message': 'No data provided'
})
# 验证系数格式
required_keys = ['comment', 'feed', 'complaint']
for key in required_keys:
if key not in data:
return jsonify({
'success': False,
'message': f'Missing required key: {key}'
})
# 验证值是数字
try:
data[key] = float(data[key])
except ValueError:
return jsonify({
'success': False,
'message': f'Value for {key} must be a number'
})
# 确保只有必要的键
coefficients = {
'comment': data['comment'],
'feed': data['feed'],
'complaint': data['complaint']
}
# 保存到配置文件
script_dir = os.path.dirname(os.path.abspath(__file__))
coefficients_file = os.path.join(script_dir, 'cms_coefficients.json')
with open(coefficients_file, 'w', encoding='utf-8') as f:
json.dump(coefficients, f, indent=4, ensure_ascii=False)
log(f"CMS系数已更新: 评论={coefficients['comment']}, 动态={coefficients['feed']}, 举报={coefficients['complaint']}")
# 使用命令行参数方式通知CMS监控进程更新系数
try:
# 获取Python可执行文件路径
python_executable = sys.executable
# 获取CMS监控脚本路径
cms_script = os.path.join(script_dir, "cms_monitor.py")
# 获取环境变量
env = os.environ.copy()
env['CMS_COOKIE'] = session.get('cms_cookie', '')
env['CMS_USERNAME'] = session.get('username', '')
# 启动进程更新系数
cmd = [
python_executable,
cms_script,
"--update-coefficients",
str(coefficients['comment']),
str(coefficients['feed']),
str(coefficients['complaint'])
]
subprocess.Popen(
cmd,
env=env,
shell=False
)
log("已发送系数更新命令给CMS监控进程")
except Exception as e:
log(f"发送系数更新命令失败: {str(e)}")
return jsonify({
'success': True,
'message': 'Coefficients updated successfully',
'data': coefficients
})
except Exception as e:
log(f"更新系数失败: {str(e)}")
return jsonify({
'success': False,
'message': f'Error updating coefficients: {str(e)}'
})
except Exception as e:
log("Error in update_coefficients route: " + str(e))
return jsonify({'success': False, 'message': 'Internal server error'}), 500
def send_sequential_test_alarms(alarm_type):
"""发送连续的测试告警通知"""
try:
# 等待10秒后发送第二次告警
time.sleep(10)
with alarm_lock:
if alarm_status['is_alarming'] and alarm_status['alarm_count'] < 3:
alarm_status['alarm_count'] = 2
# 发送第二次告警
show_alarm_notification(ALARM_THRESHOLD + 120, is_test=True, alarm_type=alarm_type)
log("Second test alarm notification sent")
# 再等待10秒后发送第三次告警
time.sleep(10)
with alarm_lock:
if alarm_status['is_alarming'] and alarm_status['alarm_count'] < 3:
alarm_status['alarm_count'] = 3
# 发送第三次告警
show_alarm_notification(ALARM_THRESHOLD + 150, is_test=True, alarm_type=alarm_type)
log("Third test alarm notification sent")
except Exception as e:
log(f"Error in sequential test alarms: {str(e)}")
@app.route('/api/get-breeze-coefficients')
def get_breeze_coefficients():
"""获取Breeze系数配置"""
try:
if 'logged_in' not in session:
return jsonify({'success': False, 'message': 'Not logged in'})
# 从Breeze系数配置文件读取
try:
script_dir = os.path.dirname(os.path.abspath(__file__))
coefficients_file = os.path.join(script_dir, 'breeze_coefficients.json')
if not os.path.exists(coefficients_file):
# 如果配置文件不存在,返回错误信息
return jsonify({
'success': False,
'message': 'Breeze coefficients file not found'
})
with open(coefficients_file, 'r', encoding='utf-8') as f:
coefficients = json.load(f)
return jsonify({
'success': True,
'data': coefficients
})
except Exception as e:
log(f"Error reading Breeze coefficients file: {str(e)}")
return jsonify({
'success': False,
'message': f'Error reading coefficients: {str(e)}'
})
except Exception as e:
log("Error in get_breeze_coefficients route: " + str(e))
return jsonify({'success': False, 'message': 'Internal server error'}), 500
@app.route('/api/update-breeze-coefficients', methods=['POST'])
def update_breeze_coefficients():
"""更新Breeze系数配置"""
try:
if 'logged_in' not in session:
return jsonify({'success': False, 'message': 'Not logged in'})
# 获取请求中的系数
try:
data = request.get_json()
if not data:
return jsonify({
'success': False,
'message': 'No data provided'
})
# 验证系数格式
if not isinstance(data, dict):
return jsonify({
'success': False,
'message': 'Invalid data format, expected a dictionary'
})
# 验证所有值是数字
for key, value in data.items():
try:
data[key] = float(value)
except ValueError:
return jsonify({
'success': False,
'message': f'Value for {key} must be a number'
})
# 保存到配置文件
script_dir = os.path.dirname(os.path.abspath(__file__))
coefficients_file = os.path.join(script_dir, 'breeze_coefficients.json')
with open(coefficients_file, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=4, ensure_ascii=False)
log(f"Breeze系数已更新")
# 使用命令行参数方式通知Breeze监控进程更新系数
try:
# 获取Python可执行文件路径
python_executable = sys.executable
# 获取Breeze监控脚本路径
breeze_script = os.path.join(script_dir, "breeze_monitor.py")
# 获取环境变量
env = os.environ.copy()
env['BREEZE_COOKIE'] = session.get('breeze_cookie', '')
env['BREEZE_USERNAME'] = session.get('username', '')
# 启动进程更新系数 - 一次只更新一个系数
for coefficient_type, coefficient_value in data.items():
cmd = [
python_executable,
breeze_script,
"--update-coefficients",
coefficient_type,
str(coefficient_value)
]
subprocess.Popen(
cmd,
env=env,
shell=False
)
log("已发送系数更新命令给Breeze监控进程")
except Exception as e:
log(f"发送系数更新命令失败: {str(e)}")
return jsonify({
'success': True,
'message': 'Breeze coefficients updated successfully',
'data': data
})
except Exception as e:
log(f"更新Breeze系数失败: {str(e)}")
return jsonify({
'success': False,
'message': f'Error updating coefficients: {str(e)}'
})
except Exception as e:
log("Error in update_breeze_coefficients route: " + str(e))
return jsonify({'success': False, 'message': 'Internal server error'}), 500
@app.route('/api/acknowledge-alarm')
def acknowledge_alarm():
"""确认并重置告警状态"""
try:
with alarm_lock:
# 完全重置告警状态
alarm_status['is_alarming'] = False
alarm_status['alarm_count'] = 0
alarm_status['last_alarm_time'] = None
# 记录告警已被确认
log("告警已被用户确认并重置")
return jsonify({
'success': True,
'message': 'Alarm has been acknowledged and reset'
})
except Exception as e:
log("Error in acknowledge_alarm route: " + str(e))
return jsonify({'success': False, 'message': 'Internal server error'}), 500
def is_port_available(port):
"""检查端口是否可用"""
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(1)
result = s.connect_ex(('127.0.0.1', port))
return result != 0
except Exception as e:
log(f"检查端口可用性出错: {str(e)}")
return False
def free_port(port):
"""尝试释放被占用的端口"""
try:
if sys.platform.startswith('win'):
# Windows系统
os.system(f'for /f "tokens=5" %a in (\'netstat -ano ^| findstr :{port}\') do taskkill /f /pid %a')
return True
else:
# Linux/Mac系统
os.system(f"kill $(lsof -t -i:{port})")
return True
except Exception as e:
log(f"释放端口时出错: {str(e)}")
return False
def find_available_port(start_port, end_port):
"""查找可用端口"""
for port in range(start_port, end_port + 1):
if is_port_available(port):
return port
return None
@app.route('/api/check-cms-daily', methods=['POST'])
def check_cms_daily_data():
"""立即检查CMS每日数据"""
if 'user' not in session:
return jsonify({
'success': False,
'message': '请先登录系统'
})
try:
# 获取后端类型
backend_type = session.get('backend_type', 'breeze_monitor')
# 准备环境变量
env = os.environ.copy()
env['CMS_COOKIE'] = session.get('cms_cookie', '')
env['CMS_USERNAME'] = session.get('username', '')
# 根据后端类型选择监控脚本
if backend_type == 'breeze_monitor_CHAT':
monitor_script = 'breeze_monitor_CHAT.py'
else:
monitor_script = 'breeze_monitor.py'
# 设置--check-daily参数运行监控脚本
cmd = [sys.executable, os.path.join(os.path.dirname(os.path.abspath(__file__)), monitor_script), '--check-daily']
# 运行子进程
process = subprocess.Popen(
cmd,
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
# 等待进程完成
stdout, stderr = process.communicate(timeout=60)
if process.returncode == 0:
app.logger.info('成功执行CMS每日数据检查')
return jsonify({
'success': True,
'message': 'CMS每日数据检查已执行'
})
else:
app.logger.error(f'CMS每日数据检查失败: {stderr.decode("utf-8", errors="ignore")}')
return jsonify({
'success': False,
'message': f'CMS每日数据检查失败: {stderr.decode("utf-8", errors="ignore")}'
})
except subprocess.TimeoutExpired:
app.logger.error('CMS每日数据检查超时')
return jsonify({
'success': False,
'message': 'CMS每日数据检查超时,请检查系统日志'
})
except Exception as e:
app.logger.error(f'执行CMS每日数据检查时发生错误: {str(e)}')
return jsonify({
'success': False,
'message': f'执行CMS每日数据检查时发生错误: {str(e)}'
})
@app.route('/api/check-cms-hourly', methods=['POST'])
def check_cms_hourly_data():
"""立即检查CMS当前小时数据"""
if 'user' not in session:
return jsonify({
'success': False,
'message': '请先登录系统'
})
try:
# 获取后端类型
backend_type = session.get('backend_type', 'breeze_monitor')
# 准备环境变量
env = os.environ.copy()
env['CMS_COOKIE'] = session.get('cms_cookie', '')
env['CMS_USERNAME'] = session.get('username', '')
# 根据后端类型选择监控脚本
if backend_type == 'breeze_monitor_CHAT':
monitor_script = 'breeze_monitor_CHAT.py'
else:
monitor_script = 'breeze_monitor.py'
# 设置--check-hourly参数运行监控脚本
cmd = [sys.executable, os.path.join(os.path.dirname(os.path.abspath(__file__)), monitor_script), '--check-hourly']
# 运行子进程
process = subprocess.Popen(
cmd,
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
# 等待进程完成
stdout, stderr = process.communicate(timeout=60)
if process.returncode == 0:
app.logger.info('成功执行CMS当前小时数据检查')
return jsonify({
'success': True,
'message': 'CMS当前小时数据检查已执行'
})
else:
app.logger.error(f'CMS当前小时数据检查失败: {stderr.decode("utf-8", errors="ignore")}')
return jsonify({
'success': False,
'message': f'CMS当前小时数据检查失败: {stderr.decode("utf-8", errors="ignore")}'
})
except subprocess.TimeoutExpired:
app.logger.error('CMS当前小时数据检查超时')
return jsonify({
'success': False,
'message': 'CMS当前小时数据检查超时,请检查系统日志'
})
except Exception as e:
app.logger.error(f'执行CMS当前小时数据检查时发生错误: {str(e)}')
return jsonify({
'success': False,
'message': f'执行CMS当前小时数据检查时发生错误: {str(e)}'
})
@app.route('/api/check-version-service')
def check_version_service():
"""检查版本检测服务的可用性"""
try:
log("正在检查版本检测服务可用性...")
# 测试连接
try:
response = requests.get(VERSION_CHECK_URL, timeout=5)
log(f"版本检测服务响应状态码: {response.status_code}")
service_status = {
'url': VERSION_CHECK_URL,
'is_available': response.status_code == 200,
'status_code': response.status_code,
'response_time': response.elapsed.total_seconds(),
'content_length': len(response.content) if response.status_code == 200 else 0,
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
if response.status_code == 200:
service_status['content'] = response.text.strip()
log(f"版本检测服务正常,返回内容: {service_status['content']}")
else:
log(f"版本检测服务异常,HTTP状态码: {response.status_code}")
return jsonify({
'success': True,
'data': service_status
})
except requests.exceptions.Timeout:
log("版本检测服务连接超时")
return jsonify({
'success': False,
'message': '版本检测服务连接超时',
'data': {
'url': VERSION_CHECK_URL,
'is_available': False,
'error': 'timeout',
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
})
except requests.exceptions.ConnectionError:
log("无法连接到版本检测服务")
return jsonify({
'success': False,
'message': '无法连接到版本检测服务',
'data': {
'url': VERSION_CHECK_URL,
'is_available': False,
'error': 'connection_error',
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
})
except Exception as e:
log(f"检查版本检测服务时发生未知错误: {str(e)}")
return jsonify({
'success': False,
'message': f'检查版本检测服务时发生错误: {str(e)}',
'data': {
'url': VERSION_CHECK_URL,
'is_available': False,
'error': 'unknown',
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
})
except Exception as e:
log(f"版本检测服务检查接口发生错误: {str(e)}")
return jsonify({
'success': False,
'message': '版本检测服务检查失败',
'error': str(e)
}), 500
@app.route('/api/update-system', methods=['POST'])
def update_system():
"""处理系统更新请求"""
try:
# 获取桌面路径和快捷方式路径
desktop_path = os.path.join(os.path.expanduser('~'), 'Desktop')
temp_dir = 'monitor_temp'
temp_path = os.path.join(desktop_path, temp_dir)
shortcut_path = os.path.join(temp_path, '网易大神实时审核数据监控看板一键安装.lnk')
# 下载快捷方式
shortcut_url = 'http://cos.ui-beam.com/work_scripts/monitor/%E7%BD%91%E6%98%93%E5%A4%A7%E7%A5%9E%E5%AE%A1%E6%A0%B8%E6%95%B0%E6%8D%AE%E7%9B%91%E6%8E%A7%E7%9C%8B%E6%9D%BF%E4%B8%80%E9%94%AE%E5%AE%89%E8%A3%85.lnk'
log(f"开始下载更新文件: {shortcut_url}")
# 获取系统代理设置
proxies = {
'http': os.environ.get('HTTP_PROXY', ''),
'https': os.environ.get('HTTPS_PROXY', '')
}
# 首先尝试不使用代理直接下载
try:
log("尝试直接下载更新文件...")
response = requests.get(shortcut_url, timeout=30, proxies={'http': None, 'https': None}, verify=False)
if response.status_code == 200:
with open(shortcut_path, 'wb') as f:
f.write(response.content)
log("更新文件下载成功")
else:
raise requests.exceptions.RequestException(f"下载失败,HTTP状态码: {response.status_code}")
except (requests.exceptions.RequestException, IOError) as e:
log(f"直接下载失败: {str(e)},尝试使用系统代理...")
# 如果直接下载失败,尝试使用系统代理
try:
response = requests.get(shortcut_url, timeout=30, proxies=proxies, verify=False)
if response.status_code == 200:
with open(shortcut_path, 'wb') as f:
f.write(response.content)
log("使用代理下载更新文件成功")
elif response.status_code == 407:
log("代理需要认证,尝试最后一次直接下载...")
# 最后一次尝试直接下载
response = requests.get(shortcut_url, timeout=30, proxies={'http': None, 'https': None}, verify=False)
if response.status_code == 200:
with open(shortcut_path, 'wb') as f:
f.write(response.content)
log("最后尝试下载成功")
else:
raise requests.exceptions.RequestException(f"所有下载尝试都失败,最后状态码: {response.status_code}")
else:
raise requests.exceptions.RequestException(f"使用代理下载失败,HTTP状态码: {response.status_code}")
except Exception as proxy_error:
log(f"使用代理下载失败: {str(proxy_error)}")
return jsonify({
'success': False,
'message': f'下载更新文件失败: {str(proxy_error)}'
}), 500
# 检查文件是否成功下载
if not os.path.exists(shortcut_path):
return jsonify({
'success': False,
'message': '更新文件下载成功但未找到文件'
}), 500
# 启动快捷方式
try:
log("正在启动更新程序...")
subprocess.Popen(['cmd', '/c', 'start', '', shortcut_path], shell=True)
log("更新程序启动成功")
return jsonify({
'success': True,
'message': '更新程序已启动'
})
except Exception as e:
log(f"启动更新程序失败: {str(e)}")
return jsonify({
'success': False,
'message': f'启动更新程序失败: {str(e)}'
}), 500
except Exception as e:
log(f"系统更新过程中发生错误: {str(e)}")
return jsonify({
'success': False,
'message': f'系统更新失败: {str(e)}'
}), 500
def main():
"""主函数"""
try:
print("\n" + "="*50)
print("网易大神审核数据监控服务启动中...")
print("="*50)
print("\n[系统状态]")
print("1. 正在初始化监控服务...")
# 初始化版本文件
timestamp_version = ensure_version_file()
print(f"2. 当前系统版本: {VERSION}")
# 检查并安装依赖
print("3. 正在检查系统依赖...")
check_and_install_dependencies()
# 启动版本检测线程
print("4. 启动版本检测服务...")
version_thread = threading.Thread(target=monitor_version_thread)
version_thread.daemon = True
version_thread.start()
# 获取端口
port = 8000
# 检查端口是否被占用
if not is_port_available(port):
print(f"5. 端口 {port} 已被占用,尝试释放...")
free_port(port)
# 再次检查端口
if not is_port_available(port):
port = find_available_port(8001, 8100)
print(f" - 使用替代端口 {port}")
else:
print(f"5. 端口 {port} 可用")
print("\n[启动完成]")
print(f"* 监控系统已启动,请打开浏览器访问: http://localhost:{port}")
print("* 等待用户登录...")
print("\n" + "!"*50)
print("! 警告:请勿关闭此窗口 !")
print("! 此窗口是系统监控的核心进程 !")
print("! 一旦关闭,所有监控服务将停止工作 !")
print("!"*50 + "\n")
# 在新线程中启动浏览器
threading.Thread(target=open_browser).start()
try:
# 使用 SocketIO 启动服务器
socketio.run(app, debug=False, host='0.0.0.0', port=port)
except Exception as e:
log(f"启动服务器出错: {str(e)}")
finally:
# 关闭所有子进程
for name, process in processes.items():
if process and process.poll() is None:
try:
process.terminate()
log(f"已关闭 {name} 监控进程")
except Exception as e:
log(f"关闭 {name} 监控进程失败: {str(e)}")
# 清理临时文件
script_dir = os.path.dirname(os.path.abspath(__file__))
data_files = ['breeze_daily.json', 'breeze_hourly.json', 'cms_daily.json', 'cms_hourly.json']
for file_name in data_files:
file_path = os.path.join(script_dir, file_name)
for attempt in range(3): # 最多重试3次
try:
if os.path.exists(file_path):
os.remove(file_path)
log(f"已删除共享数据文件: {file_name}")
break
except Exception as e:
log(f"删除文件 {file_name} 失败: {str(e)},第{attempt+1}次重试")
time.sleep(1)
log("监控系统已关闭")
except Exception as e:
log("Error in main function: " + str(e))
# 信号处理函数
def signal_handler(sig, frame):
"""处理信号,用于优雅退出"""
log("收到退出信号,正在关闭系统...")
global shutdown_flag
shutdown_flag = True
# 通过os._exit强制终止进程
os._exit(0)
@app.route('/api/inspect/stats')
def get_inspect_stats():
"""获取CC审核平台统计数据"""
try:
stats = load_inspect_stats()
if not stats:
return jsonify({
'hourly': {'total': 0, 'weighted_total': 0},
'daily': {'total': 0, 'weighted_total': 0}
})
# 获取最新的统计数据
latest_stats = stats.get('stats', [])
if not latest_stats:
return jsonify({
'hourly': {'total': 0, 'weighted_total': 0},
'daily': {'total': 0, 'weighted_total': 0}
})
# 获取最新的一条记录
latest = latest_stats[-1]
return jsonify({
'hourly': {
'total': latest.get('total', 0),
'weighted_total': latest.get('weighted_total', 0)
},
'daily': {
'total': latest.get('daily_total', 0),
'weighted_total': latest.get('daily_weighted_total', 0)
}
})
except Exception as e:
logging.error(f"获取CC审核平台统计数据时出错: {str(e)}")
return jsonify({
'hourly': {'total': 0, 'weighted_total': 0},
'daily': {'total': 0, 'weighted_total': 0}
})
def load_inspect_stats():
"""加载CC审核平台统计数据"""
try:
stats_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'inspect_stats.json')
if not os.path.exists(stats_file):
return None
with open(stats_file, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
logging.error(f"加载CC审核平台统计数据时出错: {str(e)}")
return None
def terminate_existing_processes():
"""终止现有的监控进程"""
try:
terminated_pids = []
# 获取当前目录下的所有Python进程
current_dir = os.path.dirname(os.path.abspath(__file__))
monitor_scripts = ['breeze_monitor.py', 'breeze_monitor_CHAT.py', 'cms_monitor.py', 'inspect_monitor.py']
for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
try:
# 检查进程是否在运行监控脚本
if proc.info['name'] == 'python.exe' and proc.info['cmdline']:
cmdline = ' '.join(proc.info['cmdline'])
if any(script in cmdline for script in monitor_scripts):
# 先尝试正常终止
proc.terminate()
terminated_pids.append(proc.pid)
logging.info(f"已发送终止信号到进程: {proc.pid} - {cmdline}")
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
continue
# 等待进程终止,最多等待10秒
wait_start = time.time()
while time.time() - wait_start < 10:
still_alive = []
for pid in terminated_pids:
try:
proc = psutil.Process(pid)
still_alive.append(pid)
except psutil.NoSuchProcess:
continue
if not still_alive:
break
# 对于仍在运行的进程,强制结束
if time.time() - wait_start > 5: # 如果等待超过5秒,强制结束
for pid in still_alive:
try:
proc = psutil.Process(pid)
proc.kill()
logging.info(f"强制终止进程: {pid}")
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
time.sleep(0.5)
# 最后再检查一次
for pid in terminated_pids:
try:
proc = psutil.Process(pid)
proc.kill() # 最后一次尝试强制结束
logging.warning(f"进程仍在运行,强制终止: {pid}")
except psutil.NoSuchProcess:
continue
except Exception as e:
logging.error(f"终止进程 {pid} 时发生错误: {str(e)}")
# 确保完全等待
time.sleep(2)
return True
except Exception as e:
logging.error(f"终止进程时发生错误: {str(e)}")
return False
@app.route('/api/get_current_stats')
def get_current_stats():
"""获取当前统计数据"""
try:
stats = {
'weighted_total': 0,
'categories': {}
}
# 读取并合并所有监控数据
for monitor_type in ['breeze', 'cms', 'inspect']:
hourly_file = f'{monitor_type}_hourly.json'
if os.path.exists(hourly_file):
try:
with open(hourly_file, 'r', encoding='utf-8') as f:
data = json.load(f)
if isinstance(data, dict):
if 'stats' in data:
monitor_stats = data['stats']
if isinstance(monitor_stats, dict):
if 'weighted_total' in monitor_stats:
stats['weighted_total'] += float(monitor_stats['weighted_total'])
if 'categories' in monitor_stats:
stats['categories'].update(monitor_stats['categories'])
except Exception as e:
app.logger.error(f"读取{monitor_type}数据失败: {str(e)}")
return jsonify({
'success': True,
'stats': stats,
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
})
except Exception as e:
app.logger.error(f"获取统计数据失败: {str(e)}")
return jsonify({
'success': False,
'message': str(e)
})
if __name__ == "__main__":
# 注册信号处理器
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# 执行主函数
main()