#!/usr/bin/env python # -*- coding: utf-8 -*- import os import json import time import subprocess import threading import sys from datetime import datetime, timedelta import logging import webbrowser from flask import Flask, render_template, request, jsonify, session, redirect, url_for, make_response, send_from_directory, flash import threading from threading import Lock import signal import socket import requests import re from flask_socketio import SocketIO # 添加WebSocket支持 import hashlib import psutil import urllib3 # 禁用不安全请求的警告 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # 全局应用状态变量 shutdown_flag = False # 控制应用程序关闭的标志 # 初始化 SocketIO socketio = SocketIO() # 系统版本固定在代码中 VERSION = "v20250429213045" # 系统版本号 - 这是固定的本地版本 # 配置日志 def setup_logging(): try: # 获取当前脚本所在目录 script_dir = os.path.dirname(os.path.abspath(__file__)) log_file = os.path.join(script_dir, 'dashboard.log') # 确保日志文件存在 if not os.path.exists(log_file): with open(log_file, 'w', encoding='utf-8') as f: f.write('') logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(log_file, encoding='utf-8', mode='a'), logging.StreamHandler() ] ) logging.info("Dashboard日志系统初始化成功") except Exception as e: print(f"日志系统初始化失败: {str(e)}") # 如果文件日志失败,至少使用控制台日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[logging.StreamHandler()] ) # 初始化日志系统 setup_logging() # 获取或创建版本文件,格式为"当前时间(v版本号)",例如"20250408125815(v1.1)"。 def ensure_version_file(): """确保版本文件存在,写入固定版本号""" try: version_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'VERSION.txt') # 使用固定版本号 version_content = VERSION # 写入版本文件 with open(version_file, 'w') as f: f.write(version_content) logging.info(f"更新版本文件: {version_content}") return version_content except Exception as e: logging.error(f"处理版本文件时出错: {e}") return VERSION # 返回固定版本号 # 获取线上版本进行比较 def get_online_version(): """获取线上版本信息""" try: log("正在获取线上版本信息...") # 获取系统代理设置 proxies = { 'http': os.environ.get('HTTP_PROXY', ''), 'https': os.environ.get('HTTPS_PROXY', '') } # 如果没有代理设置,尝试不使用代理直接访问 if not any(proxies.values()): log("未检测到系统代理设置,尝试直接连接...") response = requests.get(VERSION_CHECK_URL, timeout=5, verify=False) else: log(f"使用系统代理设置: {proxies}") response = requests.get(VERSION_CHECK_URL, timeout=5, proxies=proxies, verify=False) log(f"获取线上版本响应状态码: {response.status_code}") if response.status_code == 200: version = response.text.strip() log(f"成功获取线上版本: {version}") return version elif response.status_code == 407: log("需要代理认证,尝试不使用代理直接访问...") # 尝试不使用代理直接访问 response = requests.get(VERSION_CHECK_URL, timeout=5, proxies={'http': None, 'https': None}, verify=False) if response.status_code == 200: version = response.text.strip() log(f"直接访问成功获取线上版本: {version}") return version else: log(f"直接访问失败,HTTP状态码: {response.status_code}") return None else: log(f"获取线上版本失败,HTTP状态码: {response.status_code}") return None except requests.exceptions.Timeout: log("获取线上版本超时") return None except requests.exceptions.ConnectionError: log("连接线上版本服务器失败,尝试不使用代理直接访问...") try: # 尝试不使用代理直接访问 response = requests.get(VERSION_CHECK_URL, timeout=5, proxies={'http': None, 'https': None}, verify=False) if response.status_code == 200: version = response.text.strip() log(f"直接访问成功获取线上版本: {version}") return version else: log(f"直接访问失败,HTTP状态码: {response.status_code}") return None except Exception as e: log(f"直接访问失败: {str(e)}") return None except Exception as e: log(f"获取线上版本时发生未知错误: {str(e)}") return None VERSION_CHECK_URL = "https://gitea.ui-beam.cn/ui_beam/NetEaseDSMonitor/raw/branch/main/VERSION.txt" # 正式版本地址 version_status = { 'last_check_time': None, 'online_version': None, 'has_update': False } version_lock = Lock() # 版本检测线程标志 version_check_thread_running = False # 检查并安装依赖 def check_and_install_dependencies(): try: # 检查plyer库是否安装 try: import plyer except ImportError: log("正在安装plyer库...") subprocess.check_call([sys.executable, "-m", "pip", "install", "plyer"]) log("plyer库安装成功") except Exception as e: log(f"依赖检查/安装过程中出错: {str(e)}") # 禁用 Flask 的访问日志 logging.getLogger('werkzeug').setLevel(logging.ERROR) # 创建Flask应用 app = Flask(__name__) app.secret_key = 'netease-dashboard-secret-key' # 用于session加密 app.permanent_session_lifetime = timedelta(days=30) # 设置会话有效期为30天 socketio.init_app(app) # 初始化 SocketIO # 告警阈值 ALARM_THRESHOLD = 1900 # 两个系统折算总和的告警阈值 alarm_status = { 'last_alarm_time': None, 'alarm_count': 0, 'is_alarming': False, 'alarm_type': None } alarm_lock = Lock() # 进程对象 processes = { 'breeze': None, 'cms': None, 'inspect': None } def log(message): """记录日志""" try: logging.info(message) except Exception as e: print("Log error: " + str(e)) print("Original message: " + message) def start_monitor_processes(): """启动监控进程""" try: # 获取会话凭据 breeze_cookie = session.get('breeze_cookie', '') cms_cookie = session.get('cms_cookie', '') inspect_cookie = session.get('inspect_cookie', '') username = session.get('username', '') backend_type = session.get('backend_type', 'breeze_monitor') # 检查是否至少有一个Cookie可用 if not username or not (breeze_cookie or cms_cookie or inspect_cookie): logging.error("缺少必要的会话凭据,至少需要一个系统的Cookie和用户名") return False # 终止现有进程 terminate_existing_processes() # 根据提供的Cookie决定启动哪些监控进程 processes_started = 0 # 仅当提供了Breeze cookie时才启动Breeze监控 if breeze_cookie: # 选择正确的监控脚本 if backend_type == 'breeze_monitor_CHAT': monitor_script = 'breeze_monitor_CHAT.py' else: monitor_script = 'breeze_monitor.py' if not os.path.exists(monitor_script): logging.error(f"监控脚本不存在: {monitor_script}") else: # 启动Breeze监控进程 breeze_env = os.environ.copy() breeze_env['BREEZE_COOKIE'] = breeze_cookie breeze_env['BREEZE_USERNAME'] = username # 定义最小化窗口的启动参数(仅Windows) if sys.platform.startswith('win'): si = subprocess.STARTUPINFO() si.dwFlags |= subprocess.STARTF_USESHOWWINDOW si.wShowWindow = 6 # SW_MINIMIZE else: si = None try: global processes processes['breeze'] = subprocess.Popen( ['python', monitor_script], env=breeze_env, creationflags=subprocess.CREATE_NEW_CONSOLE, startupinfo=si ) logging.info(f"已启动Breeze监控进程: {processes['breeze'].pid}") processes_started += 1 except Exception as e: logging.error(f"启动Breeze监控进程失败: {str(e)}") else: logging.info("未提供Breeze Cookie,跳过启动Breeze监控进程") # 仅当提供了CMS cookie时才启动CMS监控 if cms_cookie: if not os.path.exists('cms_monitor.py'): logging.error("CMS监控脚本不存在") else: # 启动CMS监控进程 cms_env = os.environ.copy() cms_env['CMS_COOKIE'] = cms_cookie cms_env['CMS_USERNAME'] = username # 定义最小化窗口的启动参数(仅Windows) if sys.platform.startswith('win'): si = subprocess.STARTUPINFO() si.dwFlags |= subprocess.STARTF_USESHOWWINDOW si.wShowWindow = 6 # SW_MINIMIZE else: si = None try: processes['cms'] = subprocess.Popen( ['python', 'cms_monitor.py'], env=cms_env, creationflags=subprocess.CREATE_NEW_CONSOLE, startupinfo=si ) logging.info(f"已启动CMS监控进程: {processes['cms'].pid}") processes_started += 1 except Exception as e: logging.error(f"启动CMS监控进程失败: {str(e)}") else: logging.info("未提供CMS Cookie,跳过启动CMS监控进程") # 仅当提供了CC审核平台cookie时才启动其监控进程 if inspect_cookie: if not os.path.exists('inspect_monitor.py'): logging.error("CC审核平台监控脚本不存在") else: # 启动CC审核平台监控进程 inspect_env = os.environ.copy() inspect_env['INSPECT_COOKIE'] = inspect_cookie inspect_env['INSPECT_USERNAME'] = username # 定义最小化窗口的启动参数(仅Windows) if sys.platform.startswith('win'): si = subprocess.STARTUPINFO() si.dwFlags |= subprocess.STARTF_USESHOWWINDOW si.wShowWindow = 6 # SW_MINIMIZE else: si = None try: processes['inspect'] = subprocess.Popen( ['python', 'inspect_monitor.py'], env=inspect_env, creationflags=subprocess.CREATE_NEW_CONSOLE, startupinfo=si ) logging.info(f"已启动CC审核平台监控进程: {processes['inspect'].pid}") processes_started += 1 except Exception as e: logging.error(f"启动CC审核平台监控进程失败: {str(e)}") else: logging.info("未提供CC审核平台Cookie,跳过启动其监控进程") # 至少启动一个进程才算成功 if processes_started > 0: logging.info(f"成功启动了 {processes_started} 个监控进程") return True else: logging.error("没有启动任何监控进程") return False except Exception as e: logging.error(f"启动监控进程失败: {str(e)}") return False def add_no_cache_headers(response): """添加禁止缓存的响应头""" response.headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, max-age=0' response.headers['Pragma'] = 'no-cache' response.headers['Expires'] = '0' return response @app.after_request def after_request(response): """每个响应添加禁止缓存的头部""" return add_no_cache_headers(response) @app.route('/favicon.ico') def favicon(): """提供网站图标""" try: script_dir = os.path.dirname(os.path.abspath(__file__)) static_path = os.path.join(script_dir, 'static') return send_from_directory(static_path, 'ds-favicon.ico', mimetype='image/vnd.microsoft.icon') except Exception as e: log("Failed to serve favicon: " + str(e)) return '', 204 @app.route('/') def index(): """主页路由""" if not session.get('logged_in'): return redirect(url_for('login')) return redirect(url_for('dashboard')) @app.route('/dashboard') def dashboard(): """仪表盘路由""" if not session.get('logged_in'): return redirect(url_for('login')) # 获取当前登录用户信息 staff_name = session.get('staff_name', '') username = session.get('username', '') # 获取当前时间 current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') # 获取统计数据 breeze_hourly = read_data_file('breeze_hourly.json') breeze_daily = read_data_file('breeze_daily.json') cms_hourly = read_data_file('cms_hourly.json') cms_daily = read_data_file('cms_daily.json') # 获取告警状态 alarm_status = get_alarm_status() # 获取系数配置 breeze_coefficients = get_breeze_coefficients() cms_coefficients = get_coefficients() return render_template('dashboard.html', staff_name=staff_name, username=username, current_time=current_time, breeze_hourly=breeze_hourly, breeze_daily=breeze_daily, cms_hourly=cms_hourly, cms_daily=cms_daily, alarm_status=alarm_status, breeze_coefficients=breeze_coefficients, cms_coefficients=cms_coefficients, version=VERSION) def get_online_staff_data(): """从在线链接获取staff.json数据""" url = "http://scripts.ui-beam.com:5000/NetEaseDSMonitor/config/staff.json" # 添加重试机制 max_retries = 3 retry_delay = 5 # 重试间隔(秒) for attempt in range(max_retries): try: # 尝试不使用代理直接获取 try: response = requests.get(url, timeout=10, proxies={'http': None, 'https': None}, verify=False) except: # 如果直接获取失败,尝试使用系统代理 proxies = { 'http': os.environ.get('HTTP_PROXY', ''), 'https': os.environ.get('HTTPS_PROXY', '') } response = requests.get(url, timeout=10, proxies=proxies, verify=False) if response.status_code == 200: staff_data = response.json() log(f"从在线链接加载员工数据成功") return staff_data else: error_msg = f"获取在线员工数据失败,HTTP状态码: {response.status_code}" log(error_msg) if attempt < max_retries - 1: # 如果不是最后一次重试 log(f"第{attempt + 1}次重试失败,{retry_delay}秒后重试...") time.sleep(retry_delay) continue else: log("所有重试均失败,返回空数据") return {} except Exception as e: error_msg = f"获取在线员工数据时发生错误: {str(e)}" log(error_msg) if attempt < max_retries - 1: # 如果不是最后一次重试 log(f"第{attempt + 1}次重试失败,{retry_delay}秒后重试...") time.sleep(retry_delay) continue else: log("所有重试均失败,返回空数据") return {} return {} def get_staff_name(staff_id): """根据工号获取员工姓名""" try: staff_data = get_online_staff_data() if staff_data: return staff_data.get(staff_id) return None except Exception as e: log(f"获取员工姓名时发生错误: {str(e)}") return None @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'POST': try: username = request.form.get('username', '').upper() breeze_cookie = request.form.get('breeze_cookie', '') cms_cookie = request.form.get('cms_cookie', '') inspect_cookie = request.form.get('inspect_cookie', '') backend_type = request.form.get('backend_type', 'breeze_monitor') # 检查必填字段 - 用户名 if not username: return jsonify({'code': 1, 'message': '请填写工号'}) # 检查至少一个Cookie必须填写 if not (breeze_cookie or cms_cookie or inspect_cookie): return jsonify({'code': 1, 'message': '请至少填写一个系统的Cookie'}) # 尝试从staff.json获取姓名,如果不存在则使用工号 staff_name = get_staff_name(username) #if not staff_name: # 启用工号验证 # return jsonify({'code': 1, 'message': f'您的工号({username})尚未在系统中注册或为不可用状态,无法登录,请联系系统管理员协助'}) display_name = f"{username} ({staff_name})" if staff_name else username # 禁用工号验证 # 保存会话信息 session['username'] = username #session['staff_name'] = staff_name # 启用工号验证 session['staff_name'] = display_name # 禁用工号验证 session['breeze_cookie'] = breeze_cookie session['cms_cookie'] = cms_cookie session['inspect_cookie'] = inspect_cookie session['backend_type'] = backend_type session['logged_in'] = True # 添加登录标志 # 启动监控进程 start_monitor_processes() return jsonify({ 'code': 0, 'message': '登录成功', 'staff_name': display_name, # 禁用工号验证 #'staff_name': staff_name, # 启用工号验证 'redirect': '/dashboard' }) except Exception as e: logging.error(f"登录失败: {str(e)}") return jsonify({'code': 1, 'message': f'登录失败: {str(e)}'}) return render_template('login.html', version=VERSION) @app.route('/logout') def logout(): """退出系统""" try: # 记录退出用户 username = session.get('username', '未知用户') log("用户 [" + username + "] 退出系统") # 清除会话 session.clear() # 标记系统即将退出 global shutdown_flag shutdown_flag = True # 尝试停止监控进程 if processes['breeze'] and processes['breeze'].poll() is None: try: processes['breeze'].terminate() log("已停止Breeze监控进程") except Exception as e: log("停止Breeze监控进程失败: " + str(e)) if processes['cms'] and processes['cms'].poll() is None: try: processes['cms'].terminate() log("已停止CMS监控进程") except Exception as e: log("停止CMS监控进程失败: " + str(e)) # 删除共享数据文件 try: script_dir = os.path.dirname(os.path.abspath(__file__)) data_files = ['breeze_daily.json', 'breeze_hourly.json', 'cms_daily.json', 'cms_hourly.json', 'inspect_daily.json', 'inspect_hourly.json', 'breeze_coefficients.json', 'cms_coefficients.json', 'inspect_coefficients.json'] for file_name in data_files: file_path = os.path.join(script_dir, file_name) for attempt in range(3): # 最多重试3次 try: if os.path.exists(file_path): os.remove(file_path) log(f"已删除共享数据文件: {file_name}") break except Exception as e: log(f"删除文件 {file_name} 失败: {str(e)},第{attempt+1}次重试") time.sleep(1) log("成功清理所有共享数据文件") except Exception as e: log(f"删除共享数据文件时出错: {str(e)}") # 在一个后台线程中等待几秒后退出应用程序 def shutdown_app(): time.sleep(3) # 等待3秒 log("用户退出系统,系统正在完全关闭...") # 关闭所有子进程和当前进程 if sys.platform.startswith('win'): # 在Windows系统下,强制关闭当前进程及所有子进程 try: # 获取当前进程PID current_pid = os.getpid() # 使用taskkill命令强制结束进程树 subprocess.run(f'taskkill /F /T /PID {current_pid}', shell=True) except Exception as e: log(f"关闭进程时出错: {str(e)}") else: # 非Windows系统使用标准退出方式 os._exit(0) # 强制关闭整个程序 # 启动关闭线程 threading.Thread(target=shutdown_app).start() # 返回包含关闭倒计时的页面 return """ 正在关闭系统...

正在清理数据

请稍候...

""" except Exception as e: log("Error in logout route: " + str(e)) return "Logout error. Please check logs.", 500 def read_data_file(filename): """读取数据文件""" try: script_dir = os.path.dirname(os.path.abspath(__file__)) file_path = os.path.join(script_dir, filename) if not os.path.exists(file_path): return None with open(file_path, 'r', encoding='utf-8') as f: data = json.load(f) return data except Exception as e: log("Failed to read file {0}: {1}".format(filename, str(e))) return None def check_version(): """检查系统版本并与在线版本比较""" global version_check_thread_running try: log("开始检查系统版本更新...") with version_lock: version_status['last_check_time'] = datetime.now() # 确保VERSION.txt文件存在且内容与VERSION一致 local_version = ensure_version_file() log(f"当前本地版本: {local_version}") # 获取在线版本 online_version = get_online_version() log(f"获取到的在线版本: {online_version}") if online_version: with version_lock: version_status['online_version'] = online_version # 比较版本号,只在线上版本比本地版本新时才提示更新 local_timestamp = int(local_version[1:].split('-')[0]) # 去掉'v'前缀并转换为数字 online_timestamp = int(online_version[1:].split('-')[0]) # 去掉'v'前缀并转换为数字 version_status['has_update'] = online_timestamp > local_timestamp log(f"版本比较结果 - 在线版本: {online_version}, 本地版本: {local_version}, 是否有更新: {version_status['has_update']}") # 如果有更新且没有正在运行的线程,启动告警线程 if version_status['has_update'] and not version_check_thread_running: version_check_thread_running = True threading.Thread(target=notify_version_update).start() return { 'current_version': VERSION, 'online_version': online_version, 'has_update': version_status['has_update'], 'last_check_time': version_status['last_check_time'].strftime('%Y-%m-%d %H:%M:%S') if version_status['last_check_time'] else None } else: log("无法获取在线版本,返回本地版本信息") return { 'current_version': VERSION, 'online_version': None, 'has_update': False, 'last_check_time': version_status['last_check_time'].strftime('%Y-%m-%d %H:%M:%S') if version_status['last_check_time'] else None } except Exception as e: log(f"版本检查过程中发生错误: {str(e)}") return { 'current_version': VERSION, 'online_version': None, 'has_update': False, 'last_check_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'error': str(e) } finally: # 确保线程标志被重置 if version_check_thread_running: version_check_thread_running = False def notify_version_update(): """通知用户有新版本可用""" try: # 确保VERSION.txt文件存在 ensure_version_file() with version_lock: online_version = version_status['online_version'] if online_version: message = f"系统有新版本可用: {online_version},当前版本: {VERSION}" log(message) try: # 发送桌面通知 from plyer import notification notification.notify( title="系统版本更新", message=message + "\n点击'检测更新'按钮了解详情", app_icon=None, timeout=10, toast=False ) # 通过WebSocket通知前端弹出版本检测窗口 socketio.emit('version_update', { 'type': 'show_version_dialog', 'current_version': VERSION, 'online_version': online_version }) except Exception as e: log(f"显示桌面通知失败: {str(e)}") except Exception as e: log(f"通知版本更新失败: {str(e)}") finally: # 重置线程运行标志 global version_check_thread_running version_check_thread_running = False def monitor_version_thread(): """版本检测后台线程""" log("版本监控线程启动") while True: try: # 每30分钟检查一次版本更新 check_version() time.sleep(1800) # 30分钟 except Exception as e: log(f"版本监控线程异常: {str(e)}") time.sleep(600) # 出错后等待10分钟重试 @app.route('/api/get-version') def get_version(): """获取系统版本信息""" try: # 确保VERSION.txt文件存在 ensure_version_file() with version_lock: status = { 'current_version': VERSION, 'local_version': VERSION, # 固定的本地版本号 'online_version': version_status['online_version'], 'has_update': version_status['has_update'], 'last_check_time': version_status['last_check_time'].strftime('%Y-%m-%d %H:%M:%S') if version_status['last_check_time'] else None } return jsonify({ 'success': True, 'data': status }) except Exception as e: log("Error in get_version route: " + str(e)) return jsonify({'success': False, 'message': 'Internal server error'}), 500 @app.route('/api/check-version') def check_version_api(): """手动检查版本更新的API""" try: result = check_version() if result: return jsonify({ 'success': True, 'data': result }) else: # 至少返回当前版本信息 return jsonify({ 'success': False, 'message': '获取在线版本信息失败', 'data': { 'current_version': VERSION, 'local_version': VERSION, 'online_version': None, 'has_update': False, 'last_check_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S') } }) except Exception as e: log("Error in check_version_api route: " + str(e)) return jsonify({'success': False, 'message': f'检测版本更新失败: {str(e)}'}), 500 @app.route('/api/get-stats') def get_stats(): """获取所有统计数据""" try: if 'logged_in' not in session: return jsonify({'success': False, 'message': 'Not logged in'}) try: # 读取各个数据文件 breeze_hourly = read_data_file('breeze_hourly.json') or { 'stats': { 'weighted_total': 0, 'total': 0, 'categories': {}, 'details': {} }, 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S') } breeze_daily = read_data_file('breeze_daily.json') or { 'stats': { 'weighted_total': 0, 'total': 0, 'categories': {}, 'details': {} }, 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S') } cms_hourly = read_data_file('cms_hourly.json') or { 'stats': { 'weighted_total': 0, 'total': 0, 'comment': 0, 'feed': 0, 'complaint': 0 }, 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S') } cms_daily = read_data_file('cms_daily.json') or { 'stats': { 'weighted_total': 0, 'total': 0, 'comment': 0, 'feed': 0, 'complaint': 0 }, 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S') } # 读取CC审核平台数据 inspect_hourly = read_data_file('inspect_hourly.json') or { 'stats': [] } inspect_daily = read_data_file('inspect_daily.json') or { 'stats': [] } # 获取最新的统计数据 inspect_hourly_stats = inspect_hourly['stats'][-1] if inspect_hourly['stats'] else { 'total': 0, 'weighted_total': 0, 'timestamp': int(time.time()) } inspect_daily_stats = inspect_daily['stats'][-1] if inspect_daily['stats'] else { 'total': 0, 'weighted_total': 0, 'timestamp': int(time.time()) } # 获取当前的CMS系数设置 script_dir = os.path.dirname(os.path.abspath(__file__)) cms_coefficients_file = os.path.join(script_dir, 'cms_coefficients.json') cms_coefficients = None if os.path.exists(cms_coefficients_file): try: with open(cms_coefficients_file, 'r', encoding='utf-8') as f: cms_coefficients = json.load(f) log("从配置文件加载CMS系数") except Exception as e: log(f"读取CMS系数配置文件失败: {str(e)}") # 如果无法读取配置,使用默认系数 if not cms_coefficients: cms_coefficients = { 'comment': 0.55, 'feed': 1.54, 'complaint': 5.4 } log("使用默认CMS系数") # 获取当前的Breeze系数设置 breeze_coefficients_file = os.path.join(script_dir, 'breeze_coefficients.json') breeze_coefficients = None if os.path.exists(breeze_coefficients_file): try: with open(breeze_coefficients_file, 'r', encoding='utf-8') as f: breeze_coefficients = json.load(f) log("从配置文件加载Breeze系数") except Exception as e: log(f"读取Breeze系数配置文件失败: {str(e)}") # 如果无法读取配置,不设置默认值,让前端使用现有值 if not breeze_coefficients: log("无法读取Breeze系数配置文件") # 合并结果 result = { 'breeze': { 'hourly': breeze_hourly.get('stats', {}), 'hourly_update': breeze_hourly.get('timestamp', ''), 'daily': breeze_daily.get('stats', {}), 'daily_update': breeze_daily.get('timestamp', ''), 'coefficients': breeze_coefficients }, 'cms': { 'hourly': cms_hourly.get('stats', {}), 'hourly_update': cms_hourly.get('timestamp', ''), 'daily': cms_daily.get('stats', {}), 'daily_update': cms_daily.get('timestamp', ''), 'coefficients': cms_coefficients }, 'inspect': { 'hourly': inspect_hourly_stats, 'hourly_update': datetime.fromtimestamp(inspect_hourly_stats['timestamp']).strftime('%Y-%m-%d %H:%M:%S'), 'daily': inspect_daily_stats, 'daily_update': datetime.fromtimestamp(inspect_daily_stats['timestamp']).strftime('%Y-%m-%d %H:%M:%S'), 'coefficients': {'default': 1.5} }, 'total': { 'hourly': 0, 'daily': 0 } } # 计算总折算数 if breeze_hourly and 'stats' in breeze_hourly and 'weighted_total' in breeze_hourly['stats']: result['total']['hourly'] += breeze_hourly['stats']['weighted_total'] if cms_hourly and 'stats' in cms_hourly and 'weighted_total' in cms_hourly['stats']: result['total']['hourly'] += cms_hourly['stats']['weighted_total'] if inspect_hourly_stats and 'weighted_total' in inspect_hourly_stats: result['total']['hourly'] += inspect_hourly_stats['weighted_total'] if breeze_daily and 'stats' in breeze_daily and 'weighted_total' in breeze_daily['stats']: result['total']['daily'] += breeze_daily['stats']['weighted_total'] if cms_daily and 'stats' in cms_daily and 'weighted_total' in cms_daily['stats']: result['total']['daily'] += cms_daily['stats']['weighted_total'] if inspect_daily_stats and 'weighted_total' in inspect_daily_stats: result['total']['daily'] += inspect_daily_stats['weighted_total'] # 检查告警阈值 if result['total']['hourly'] >= ALARM_THRESHOLD: check_alarm(result['total']['hourly']) # 通过WebSocket发送更新 try: socketio.emit('stats_update', { 'success': True, 'data': result, 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S') }) except Exception as e: log(f"WebSocket发送数据更新失败: {str(e)}") return jsonify({ 'success': True, 'data': result, 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S') }) except Exception as e: log("Failed to get stats: " + str(e)) return jsonify({ 'success': False, 'message': "Failed to get stats: " + str(e) }) except Exception as e: log("Error in get_stats route: " + str(e)) return jsonify({'success': False, 'message': 'Internal server error'}), 500 def check_alarm(total_weighted): """检查并触发告警""" try: with alarm_lock: now = datetime.now() # 如果最近30秒内已经告警过,增加计数但不重复告警 if alarm_status['last_alarm_time'] and (now - alarm_status['last_alarm_time']).total_seconds() < 30: return # 设置告警状态 alarm_status['last_alarm_time'] = now alarm_status['alarm_count'] += 1 alarm_status['is_alarming'] = True # 如果已经达到3次告警,不再继续 if alarm_status['alarm_count'] > 3: return # 启动线程播放告警声音和显示通知 # 将当前加权总值传递给函数 threading.Thread(target=lambda: show_alarm_notification(total_weighted)).start() log("Alarm triggered: Current weighted total {0:.2f}, exceeded threshold {1}".format(total_weighted, ALARM_THRESHOLD)) except Exception as e: log("Error in check_alarm: " + str(e)) def show_alarm_notification(total_weighted, is_test=False, alarm_type="实时告警"): """显示告警通知和播放告警声音""" try: # 播放系统告警声音 try: import winsound winsound.PlaySound("SystemExclamation", winsound.SND_ALIAS) except: # 如果winsound不可用,尝试使用beep print('\a') # 添加桌面通知 try: from plyer import notification # 计算超出阈值的百分比 over_percentage = ((total_weighted - ALARM_THRESHOLD) / ALARM_THRESHOLD) * 100 # 根据是否为测试调整标题 title = "测试告警" if is_test else "审核数量告警" # 根据是否为测试调整消息前缀 prefix = "[测试数据] " if is_test else "" notification.notify( title=title, message=f"{prefix}{alarm_type}当前小时加权总计:{total_weighted:.2f}\n阈值:{ALARM_THRESHOLD}\n超出:{over_percentage:.1f}%\n请立即检查数据监控看板!", app_icon=None, timeout=15, toast=False ) except Exception as e: log("Failed to show desktop notification: " + str(e)) # WebSocket推送网页弹窗 try: from flask_socketio import SocketIO global socketio over_percentage = ((total_weighted - ALARM_THRESHOLD) / ALARM_THRESHOLD) * 100 socketio.emit('alarm', { 'type': alarm_type, 'message': f"{alarm_type}:当前值:{total_weighted:.2f},超出{over_percentage:.1f}%" }) except Exception as e: log(f"WebSocket推送告警失败: {str(e)}") except Exception as e: log("Failed to play alarm sound: " + str(e)) @app.route('/api/get-alarm-status') def get_alarm_status(): """获取告警状态""" try: with alarm_lock: status = { 'is_alarming': alarm_status['is_alarming'], 'alarm_count': alarm_status['alarm_count'], 'last_alarm_time': alarm_status['last_alarm_time'].strftime('%Y-%m-%d %H:%M:%S') if alarm_status['last_alarm_time'] else None, 'threshold': ALARM_THRESHOLD, 'alarm_type': alarm_status['alarm_type'] } return jsonify({ 'success': True, 'data': status }) except Exception as e: log("Error in get_alarm_status route: " + str(e)) return jsonify({'success': False, 'message': 'Internal server error'}), 500 @app.route('/api/reset-alarm') def reset_alarm(): """重置告警状态""" try: with alarm_lock: alarm_status['is_alarming'] = False alarm_status['alarm_count'] = 0 return jsonify({ 'success': True, 'message': 'Alarm has been reset' }) except Exception as e: log("Error in reset_alarm route: " + str(e)) return jsonify({'success': False, 'message': 'Internal server error'}), 500 @app.route('/api/restart-monitoring') def restart_monitoring(): """重启监控进程""" try: if 'logged_in' not in session: return jsonify({'success': False, 'message': 'Not logged in'}) # 获取Python可执行文件路径 python_executable = sys.executable # 获取后端类型和凭据 backend_type = session.get('backend_type', 'breeze_monitor') breeze_cookie = session.get('breeze_cookie', '') cms_cookie = session.get('cms_cookie', '') inspect_cookie = session.get('inspect_cookie', '') username = session.get('username', '') # 终止现有进程 processes_to_stop = ['breeze', 'cms', 'inspect'] for process_name in processes_to_stop: if processes.get(process_name) and processes[process_name].poll() is None: try: processes[process_name].terminate() processes[process_name].wait(timeout=5) log(f"已停止{process_name.upper()}监控进程") except Exception as e: log(f"停止{process_name.upper()}监控进程失败: {str(e)}") try: processes[process_name].kill() except: pass # 等待一下确保进程完全终止 time.sleep(1) # 根据后端类型选择监控脚本 if backend_type == 'breeze_monitor_CHAT': monitor_script = 'breeze_monitor_CHAT.py' else: monitor_script = 'breeze_monitor.py' # 获取脚本所在目录 script_dir = os.path.dirname(os.path.abspath(__file__)) monitor_path = os.path.join(script_dir, monitor_script) cms_path = os.path.join(script_dir, "cms_monitor.py") inspect_path = os.path.join(script_dir, "inspect_monitor.py") # 检查监控脚本是否存在 if not os.path.exists(monitor_path): log(f"监控脚本不存在: {monitor_path}") return jsonify({'success': False, 'message': f'监控脚本不存在: {monitor_path}'}) # 启动新的监控进程 try: # 启动Breeze监控 env = os.environ.copy() env['BREEZE_COOKIE'] = breeze_cookie env['BREEZE_USERNAME'] = username processes['breeze'] = subprocess.Popen( [python_executable, monitor_path], env=env, shell=False, creationflags=subprocess.CREATE_NEW_CONSOLE ) log(f"已启动{backend_type}监控进程, PID: {processes['breeze'].pid}") # 启动CMS监控 env = os.environ.copy() env['CMS_COOKIE'] = cms_cookie env['CMS_USERNAME'] = username processes['cms'] = subprocess.Popen( [python_executable, cms_path], env=env, shell=False, creationflags=subprocess.CREATE_NEW_CONSOLE ) log(f"CMS监控进程已启动, PID: {processes['cms'].pid}") # 启动CC审核平台监控 if inspect_cookie: env = os.environ.copy() env['INSPECT_COOKIE'] = inspect_cookie env['INSPECT_USERNAME'] = username processes['inspect'] = subprocess.Popen( [python_executable, inspect_path], env=env, shell=False, creationflags=subprocess.CREATE_NEW_CONSOLE ) log(f"CC审核平台监控进程已启动, PID: {processes['inspect'].pid}") return jsonify({ 'success': True, 'message': '监控进程已重启' }) except Exception as e: log(f"启动监控进程失败: {str(e)}") return jsonify({ 'success': False, 'message': f'启动监控进程失败: {str(e)}' }) except Exception as e: log(f"重启监控进程失败: {str(e)}") return jsonify({ 'success': False, 'message': f'重启监控进程失败: {str(e)}' }) def open_browser(): """在新线程中打开浏览器""" time.sleep(1) # 等待服务器启动 try: webbrowser.open('http://localhost:8000') except Exception as e: log("Failed to open browser: " + str(e)) @app.route('/api/check-now') def check_now(): """立即检查当前小时数据""" try: if 'logged_in' not in session: return jsonify({'success': False, 'message': 'Not logged in'}) # 获取Python可执行文件路径 python_executable = sys.executable # 获取后端类型和凭据 backend_type = session.get('backend_type', 'breeze_monitor') breeze_cookie = session.get('breeze_cookie', '') cms_cookie = session.get('cms_cookie', '') inspect_cookie = session.get('inspect_cookie', '') username = session.get('username', '') # 终止现有进程 if not terminate_existing_processes(): return jsonify({ 'success': False, 'message': '无法完全终止现有进程,请手动关闭后重试' }) # 获取脚本所在目录 script_dir = os.path.dirname(os.path.abspath(__file__)) # 根据后端类型选择监控脚本 if backend_type == 'breeze_monitor_CHAT': monitor_script = 'breeze_monitor_CHAT.py' else: monitor_script = 'breeze_monitor.py' monitor_path = os.path.join(script_dir, monitor_script) cms_path = os.path.join(script_dir, "cms_monitor.py") inspect_path = os.path.join(script_dir, "inspect_monitor.py") # 检查监控脚本是否存在 if not os.path.exists(monitor_path): log(f"监控脚本不存在: {monitor_path}") return jsonify({'success': False, 'message': f'监控脚本不存在: {monitor_path}'}) # 启动临时检查进程 try: processes_started = [] # 启动临时Breeze检查 env = os.environ.copy() env['BREEZE_COOKIE'] = breeze_cookie env['BREEZE_USERNAME'] = username temp_breeze = subprocess.Popen( [python_executable, monitor_path, "--check-now", "--force"], env=env, shell=False, creationflags=subprocess.CREATE_NEW_CONSOLE ) processes_started.append(('Breeze', temp_breeze)) # 启动临时CMS检查 env = os.environ.copy() env['CMS_COOKIE'] = cms_cookie env['CMS_USERNAME'] = username temp_cms = subprocess.Popen( [python_executable, cms_path, "--check-now", "--force"], env=env, shell=False, creationflags=subprocess.CREATE_NEW_CONSOLE ) processes_started.append(('CMS', temp_cms)) # 启动临时CC审核平台检查 if inspect_cookie: env = os.environ.copy() env['INSPECT_COOKIE'] = inspect_cookie env['INSPECT_USERNAME'] = username temp_inspect = subprocess.Popen( [python_executable, inspect_path, "--check-now", "--force"], env=env, shell=False, creationflags=subprocess.CREATE_NEW_CONSOLE ) processes_started.append(('Inspect', temp_inspect)) # 等待所有进程完成或超时 wait_start = time.time() while time.time() - wait_start < 30: # 最多等待30秒 all_done = True for name, proc in processes_started: if proc.poll() is None: all_done = False break if all_done: break time.sleep(0.5) # 检查是否所有进程都已完成 failed_processes = [] for name, proc in processes_started: if proc.poll() is None: try: proc.terminate() failed_processes.append(name) except: pass if failed_processes: return jsonify({ 'success': False, 'message': f'以下检查进程未能在30秒内完成: {", ".join(failed_processes)}' }) # 重新启动常规监控进程 if not start_monitor_processes(): return jsonify({ 'success': False, 'message': '临时检查完成,但重启常规监控失败' }) return jsonify({ 'success': True, 'message': '数据检查完成并重启了监控进程' }) except Exception as e: log(f"启动临时检查进程失败: {str(e)}") # 确保重启常规监控进程 start_monitor_processes() return jsonify({ 'success': False, 'message': f'启动临时检查进程失败: {str(e)}' }) except Exception as e: log(f"临时检查失败: {str(e)}") # 确保重启常规监控进程 start_monitor_processes() return jsonify({ 'success': False, 'message': f'临时检查失败: {str(e)}' }) @app.route('/api/test-alarm') def test_alarm(): """测试告警功能""" try: if 'logged_in' not in session: return jsonify({'success': False, 'message': 'Not logged in'}) # 检查是否为实际数据告警 is_real_data = request.args.get('real_data', 'false').lower() == 'true' # 根据请求类型选择不同的消息 message = "实时告警" if is_real_data else "测试告警" # 立即触发第一次告警 if not is_real_data: # 手动触发告警并传递测试告警值 show_alarm_notification(ALARM_THRESHOLD + 100, is_test=True, alarm_type="测试告警") else: # 从API获取当前真实的小时总量 try: breeze_hourly = read_data_file('breeze_hourly.json') or {'stats': {'weighted_total': 0}} cms_hourly = read_data_file('cms_hourly.json') or {'stats': {'weighted_total': 0}} total_hourly = 0 if 'stats' in breeze_hourly and 'weighted_total' in breeze_hourly['stats']: total_hourly += breeze_hourly['stats']['weighted_total'] if 'stats' in cms_hourly and 'weighted_total' in cms_hourly['stats']: total_hourly += cms_hourly['stats']['weighted_total'] # 只有在实际超过阈值时才显示通知 if total_hourly >= ALARM_THRESHOLD: show_alarm_notification(total_hourly, is_test=False, alarm_type="实时告警") except Exception as e: log(f"Error getting real data for alarm: {str(e)}") # 设置告警状态,启用连续告警 with alarm_lock: now = datetime.now() alarm_status['last_alarm_time'] = now alarm_status['is_alarming'] = True alarm_status['alarm_type'] = message # 保存告警类型 # 设置告警计数为1,开始计数 alarm_status['alarm_count'] = 1 # 创建后台线程发送连续告警 if not is_real_data: # 仅对测试告警启用连续通知 threading.Thread(target=lambda: send_sequential_test_alarms("测试告警")).start() log(f"{message} triggered by user") return jsonify({ 'success': True, 'message': f'{message} successful' }) except Exception as e: log("Error in test_alarm route: " + str(e)) return jsonify({'success': False, 'message': 'Failed to trigger alarm: ' + str(e)}), 500 @app.route('/api/get-coefficients') def get_coefficients(): """获取CMS系数配置""" try: if 'logged_in' not in session: return jsonify({'success': False, 'message': 'Not logged in'}) # 从CMS系数配置文件读取 try: script_dir = os.path.dirname(os.path.abspath(__file__)) coefficients_file = os.path.join(script_dir, 'cms_coefficients.json') if not os.path.exists(coefficients_file): # 如果配置文件不存在,返回默认系数 default_coefficients = { 'comment': 0.55, 'feed': 1.54, 'complaint': 5.4 } return jsonify({ 'success': True, 'data': default_coefficients, 'message': 'Using default coefficients' }) with open(coefficients_file, 'r', encoding='utf-8') as f: coefficients = json.load(f) return jsonify({ 'success': True, 'data': coefficients }) except Exception as e: log(f"Error reading coefficients file: {str(e)}") return jsonify({ 'success': False, 'message': f'Error reading coefficients: {str(e)}' }) except Exception as e: log("Error in get_coefficients route: " + str(e)) return jsonify({'success': False, 'message': 'Internal server error'}), 500 @app.route('/api/update-coefficients', methods=['POST']) def update_coefficients(): """更新CMS系数配置""" try: if 'logged_in' not in session: return jsonify({'success': False, 'message': 'Not logged in'}) # 获取请求中的系数 try: data = request.get_json() if not data: return jsonify({ 'success': False, 'message': 'No data provided' }) # 验证系数格式 required_keys = ['comment', 'feed', 'complaint'] for key in required_keys: if key not in data: return jsonify({ 'success': False, 'message': f'Missing required key: {key}' }) # 验证值是数字 try: data[key] = float(data[key]) except ValueError: return jsonify({ 'success': False, 'message': f'Value for {key} must be a number' }) # 确保只有必要的键 coefficients = { 'comment': data['comment'], 'feed': data['feed'], 'complaint': data['complaint'] } # 保存到配置文件 script_dir = os.path.dirname(os.path.abspath(__file__)) coefficients_file = os.path.join(script_dir, 'cms_coefficients.json') with open(coefficients_file, 'w', encoding='utf-8') as f: json.dump(coefficients, f, indent=4, ensure_ascii=False) log(f"CMS系数已更新: 评论={coefficients['comment']}, 动态={coefficients['feed']}, 举报={coefficients['complaint']}") # 使用命令行参数方式通知CMS监控进程更新系数 try: # 获取Python可执行文件路径 python_executable = sys.executable # 获取CMS监控脚本路径 cms_script = os.path.join(script_dir, "cms_monitor.py") # 获取环境变量 env = os.environ.copy() env['CMS_COOKIE'] = session.get('cms_cookie', '') env['CMS_USERNAME'] = session.get('username', '') # 启动进程更新系数 cmd = [ python_executable, cms_script, "--update-coefficients", str(coefficients['comment']), str(coefficients['feed']), str(coefficients['complaint']) ] subprocess.Popen( cmd, env=env, shell=False ) log("已发送系数更新命令给CMS监控进程") except Exception as e: log(f"发送系数更新命令失败: {str(e)}") return jsonify({ 'success': True, 'message': 'Coefficients updated successfully', 'data': coefficients }) except Exception as e: log(f"更新系数失败: {str(e)}") return jsonify({ 'success': False, 'message': f'Error updating coefficients: {str(e)}' }) except Exception as e: log("Error in update_coefficients route: " + str(e)) return jsonify({'success': False, 'message': 'Internal server error'}), 500 def send_sequential_test_alarms(alarm_type): """发送连续的测试告警通知""" try: # 等待10秒后发送第二次告警 time.sleep(10) with alarm_lock: if alarm_status['is_alarming'] and alarm_status['alarm_count'] < 3: alarm_status['alarm_count'] = 2 # 发送第二次告警 show_alarm_notification(ALARM_THRESHOLD + 120, is_test=True, alarm_type=alarm_type) log("Second test alarm notification sent") # 再等待10秒后发送第三次告警 time.sleep(10) with alarm_lock: if alarm_status['is_alarming'] and alarm_status['alarm_count'] < 3: alarm_status['alarm_count'] = 3 # 发送第三次告警 show_alarm_notification(ALARM_THRESHOLD + 150, is_test=True, alarm_type=alarm_type) log("Third test alarm notification sent") except Exception as e: log(f"Error in sequential test alarms: {str(e)}") @app.route('/api/get-breeze-coefficients') def get_breeze_coefficients(): """获取Breeze系数配置""" try: if 'logged_in' not in session: return jsonify({'success': False, 'message': 'Not logged in'}) # 从Breeze系数配置文件读取 try: script_dir = os.path.dirname(os.path.abspath(__file__)) coefficients_file = os.path.join(script_dir, 'breeze_coefficients.json') if not os.path.exists(coefficients_file): # 如果配置文件不存在,返回错误信息 return jsonify({ 'success': False, 'message': 'Breeze coefficients file not found' }) with open(coefficients_file, 'r', encoding='utf-8') as f: coefficients = json.load(f) return jsonify({ 'success': True, 'data': coefficients }) except Exception as e: log(f"Error reading Breeze coefficients file: {str(e)}") return jsonify({ 'success': False, 'message': f'Error reading coefficients: {str(e)}' }) except Exception as e: log("Error in get_breeze_coefficients route: " + str(e)) return jsonify({'success': False, 'message': 'Internal server error'}), 500 @app.route('/api/update-breeze-coefficients', methods=['POST']) def update_breeze_coefficients(): """更新Breeze系数配置""" try: if 'logged_in' not in session: return jsonify({'success': False, 'message': 'Not logged in'}) # 获取请求中的系数 try: data = request.get_json() if not data: return jsonify({ 'success': False, 'message': 'No data provided' }) # 验证系数格式 if not isinstance(data, dict): return jsonify({ 'success': False, 'message': 'Invalid data format, expected a dictionary' }) # 验证所有值是数字 for key, value in data.items(): try: data[key] = float(value) except ValueError: return jsonify({ 'success': False, 'message': f'Value for {key} must be a number' }) # 保存到配置文件 script_dir = os.path.dirname(os.path.abspath(__file__)) coefficients_file = os.path.join(script_dir, 'breeze_coefficients.json') with open(coefficients_file, 'w', encoding='utf-8') as f: json.dump(data, f, indent=4, ensure_ascii=False) log(f"Breeze系数已更新") # 使用命令行参数方式通知Breeze监控进程更新系数 try: # 获取Python可执行文件路径 python_executable = sys.executable # 获取Breeze监控脚本路径 breeze_script = os.path.join(script_dir, "breeze_monitor.py") # 获取环境变量 env = os.environ.copy() env['BREEZE_COOKIE'] = session.get('breeze_cookie', '') env['BREEZE_USERNAME'] = session.get('username', '') # 启动进程更新系数 - 一次只更新一个系数 for coefficient_type, coefficient_value in data.items(): cmd = [ python_executable, breeze_script, "--update-coefficients", coefficient_type, str(coefficient_value) ] subprocess.Popen( cmd, env=env, shell=False ) log("已发送系数更新命令给Breeze监控进程") except Exception as e: log(f"发送系数更新命令失败: {str(e)}") return jsonify({ 'success': True, 'message': 'Breeze coefficients updated successfully', 'data': data }) except Exception as e: log(f"更新Breeze系数失败: {str(e)}") return jsonify({ 'success': False, 'message': f'Error updating coefficients: {str(e)}' }) except Exception as e: log("Error in update_breeze_coefficients route: " + str(e)) return jsonify({'success': False, 'message': 'Internal server error'}), 500 @app.route('/api/acknowledge-alarm') def acknowledge_alarm(): """确认并重置告警状态""" try: with alarm_lock: # 完全重置告警状态 alarm_status['is_alarming'] = False alarm_status['alarm_count'] = 0 alarm_status['last_alarm_time'] = None # 记录告警已被确认 log("告警已被用户确认并重置") return jsonify({ 'success': True, 'message': 'Alarm has been acknowledged and reset' }) except Exception as e: log("Error in acknowledge_alarm route: " + str(e)) return jsonify({'success': False, 'message': 'Internal server error'}), 500 def is_port_available(port): """检查端口是否可用""" try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.settimeout(1) result = s.connect_ex(('127.0.0.1', port)) return result != 0 except Exception as e: log(f"检查端口可用性出错: {str(e)}") return False def free_port(port): """尝试释放被占用的端口""" try: if sys.platform.startswith('win'): # Windows系统 os.system(f'for /f "tokens=5" %a in (\'netstat -ano ^| findstr :{port}\') do taskkill /f /pid %a') return True else: # Linux/Mac系统 os.system(f"kill $(lsof -t -i:{port})") return True except Exception as e: log(f"释放端口时出错: {str(e)}") return False def find_available_port(start_port, end_port): """查找可用端口""" for port in range(start_port, end_port + 1): if is_port_available(port): return port return None @app.route('/api/check-cms-daily', methods=['POST']) def check_cms_daily_data(): """立即检查CMS每日数据""" if 'user' not in session: return jsonify({ 'success': False, 'message': '请先登录系统' }) try: # 获取后端类型 backend_type = session.get('backend_type', 'breeze_monitor') # 准备环境变量 env = os.environ.copy() env['CMS_COOKIE'] = session.get('cms_cookie', '') env['CMS_USERNAME'] = session.get('username', '') # 根据后端类型选择监控脚本 if backend_type == 'breeze_monitor_CHAT': monitor_script = 'breeze_monitor_CHAT.py' else: monitor_script = 'breeze_monitor.py' # 设置--check-daily参数运行监控脚本 cmd = [sys.executable, os.path.join(os.path.dirname(os.path.abspath(__file__)), monitor_script), '--check-daily'] # 运行子进程 process = subprocess.Popen( cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) # 等待进程完成 stdout, stderr = process.communicate(timeout=60) if process.returncode == 0: app.logger.info('成功执行CMS每日数据检查') return jsonify({ 'success': True, 'message': 'CMS每日数据检查已执行' }) else: app.logger.error(f'CMS每日数据检查失败: {stderr.decode("utf-8", errors="ignore")}') return jsonify({ 'success': False, 'message': f'CMS每日数据检查失败: {stderr.decode("utf-8", errors="ignore")}' }) except subprocess.TimeoutExpired: app.logger.error('CMS每日数据检查超时') return jsonify({ 'success': False, 'message': 'CMS每日数据检查超时,请检查系统日志' }) except Exception as e: app.logger.error(f'执行CMS每日数据检查时发生错误: {str(e)}') return jsonify({ 'success': False, 'message': f'执行CMS每日数据检查时发生错误: {str(e)}' }) @app.route('/api/check-cms-hourly', methods=['POST']) def check_cms_hourly_data(): """立即检查CMS当前小时数据""" if 'user' not in session: return jsonify({ 'success': False, 'message': '请先登录系统' }) try: # 获取后端类型 backend_type = session.get('backend_type', 'breeze_monitor') # 准备环境变量 env = os.environ.copy() env['CMS_COOKIE'] = session.get('cms_cookie', '') env['CMS_USERNAME'] = session.get('username', '') # 根据后端类型选择监控脚本 if backend_type == 'breeze_monitor_CHAT': monitor_script = 'breeze_monitor_CHAT.py' else: monitor_script = 'breeze_monitor.py' # 设置--check-hourly参数运行监控脚本 cmd = [sys.executable, os.path.join(os.path.dirname(os.path.abspath(__file__)), monitor_script), '--check-hourly'] # 运行子进程 process = subprocess.Popen( cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) # 等待进程完成 stdout, stderr = process.communicate(timeout=60) if process.returncode == 0: app.logger.info('成功执行CMS当前小时数据检查') return jsonify({ 'success': True, 'message': 'CMS当前小时数据检查已执行' }) else: app.logger.error(f'CMS当前小时数据检查失败: {stderr.decode("utf-8", errors="ignore")}') return jsonify({ 'success': False, 'message': f'CMS当前小时数据检查失败: {stderr.decode("utf-8", errors="ignore")}' }) except subprocess.TimeoutExpired: app.logger.error('CMS当前小时数据检查超时') return jsonify({ 'success': False, 'message': 'CMS当前小时数据检查超时,请检查系统日志' }) except Exception as e: app.logger.error(f'执行CMS当前小时数据检查时发生错误: {str(e)}') return jsonify({ 'success': False, 'message': f'执行CMS当前小时数据检查时发生错误: {str(e)}' }) @app.route('/api/check-version-service') def check_version_service(): """检查版本检测服务的可用性""" try: log("正在检查版本检测服务可用性...") # 测试连接 try: response = requests.get(VERSION_CHECK_URL, timeout=5) log(f"版本检测服务响应状态码: {response.status_code}") service_status = { 'url': VERSION_CHECK_URL, 'is_available': response.status_code == 200, 'status_code': response.status_code, 'response_time': response.elapsed.total_seconds(), 'content_length': len(response.content) if response.status_code == 200 else 0, 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S') } if response.status_code == 200: service_status['content'] = response.text.strip() log(f"版本检测服务正常,返回内容: {service_status['content']}") else: log(f"版本检测服务异常,HTTP状态码: {response.status_code}") return jsonify({ 'success': True, 'data': service_status }) except requests.exceptions.Timeout: log("版本检测服务连接超时") return jsonify({ 'success': False, 'message': '版本检测服务连接超时', 'data': { 'url': VERSION_CHECK_URL, 'is_available': False, 'error': 'timeout', 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S') } }) except requests.exceptions.ConnectionError: log("无法连接到版本检测服务") return jsonify({ 'success': False, 'message': '无法连接到版本检测服务', 'data': { 'url': VERSION_CHECK_URL, 'is_available': False, 'error': 'connection_error', 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S') } }) except Exception as e: log(f"检查版本检测服务时发生未知错误: {str(e)}") return jsonify({ 'success': False, 'message': f'检查版本检测服务时发生错误: {str(e)}', 'data': { 'url': VERSION_CHECK_URL, 'is_available': False, 'error': 'unknown', 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S') } }) except Exception as e: log(f"版本检测服务检查接口发生错误: {str(e)}") return jsonify({ 'success': False, 'message': '版本检测服务检查失败', 'error': str(e) }), 500 @app.route('/api/update-system', methods=['POST']) def update_system(): """处理系统更新请求""" try: # 获取桌面路径和快捷方式路径 desktop_path = os.path.join(os.path.expanduser('~'), 'Desktop') temp_dir = 'monitor_temp' temp_path = os.path.join(desktop_path, temp_dir) shortcut_path = os.path.join(temp_path, '网易大神实时审核数据监控看板一键安装.lnk') # 下载快捷方式 shortcut_url = 'http://cos.ui-beam.com/work_scripts/monitor/%E7%BD%91%E6%98%93%E5%A4%A7%E7%A5%9E%E5%AE%A1%E6%A0%B8%E6%95%B0%E6%8D%AE%E7%9B%91%E6%8E%A7%E7%9C%8B%E6%9D%BF%E4%B8%80%E9%94%AE%E5%AE%89%E8%A3%85.lnk' log(f"开始下载更新文件: {shortcut_url}") # 获取系统代理设置 proxies = { 'http': os.environ.get('HTTP_PROXY', ''), 'https': os.environ.get('HTTPS_PROXY', '') } # 首先尝试不使用代理直接下载 try: log("尝试直接下载更新文件...") response = requests.get(shortcut_url, timeout=30, proxies={'http': None, 'https': None}, verify=False) if response.status_code == 200: with open(shortcut_path, 'wb') as f: f.write(response.content) log("更新文件下载成功") else: raise requests.exceptions.RequestException(f"下载失败,HTTP状态码: {response.status_code}") except (requests.exceptions.RequestException, IOError) as e: log(f"直接下载失败: {str(e)},尝试使用系统代理...") # 如果直接下载失败,尝试使用系统代理 try: response = requests.get(shortcut_url, timeout=30, proxies=proxies, verify=False) if response.status_code == 200: with open(shortcut_path, 'wb') as f: f.write(response.content) log("使用代理下载更新文件成功") elif response.status_code == 407: log("代理需要认证,尝试最后一次直接下载...") # 最后一次尝试直接下载 response = requests.get(shortcut_url, timeout=30, proxies={'http': None, 'https': None}, verify=False) if response.status_code == 200: with open(shortcut_path, 'wb') as f: f.write(response.content) log("最后尝试下载成功") else: raise requests.exceptions.RequestException(f"所有下载尝试都失败,最后状态码: {response.status_code}") else: raise requests.exceptions.RequestException(f"使用代理下载失败,HTTP状态码: {response.status_code}") except Exception as proxy_error: log(f"使用代理下载失败: {str(proxy_error)}") return jsonify({ 'success': False, 'message': f'下载更新文件失败: {str(proxy_error)}' }), 500 # 检查文件是否成功下载 if not os.path.exists(shortcut_path): return jsonify({ 'success': False, 'message': '更新文件下载成功但未找到文件' }), 500 # 启动快捷方式 try: log("正在启动更新程序...") subprocess.Popen(['cmd', '/c', 'start', '', shortcut_path], shell=True) log("更新程序启动成功") return jsonify({ 'success': True, 'message': '更新程序已启动' }) except Exception as e: log(f"启动更新程序失败: {str(e)}") return jsonify({ 'success': False, 'message': f'启动更新程序失败: {str(e)}' }), 500 except Exception as e: log(f"系统更新过程中发生错误: {str(e)}") return jsonify({ 'success': False, 'message': f'系统更新失败: {str(e)}' }), 500 def main(): """主函数""" try: print("\n" + "="*50) print("网易大神审核数据监控服务启动中...") print("="*50) print("\n[系统状态]") print("1. 正在初始化监控服务...") # 初始化版本文件 timestamp_version = ensure_version_file() print(f"2. 当前系统版本: {VERSION}") # 检查并安装依赖 print("3. 正在检查系统依赖...") check_and_install_dependencies() # 启动版本检测线程 print("4. 启动版本检测服务...") version_thread = threading.Thread(target=monitor_version_thread) version_thread.daemon = True version_thread.start() # 获取端口 port = 8000 # 检查端口是否被占用 if not is_port_available(port): print(f"5. 端口 {port} 已被占用,尝试释放...") free_port(port) # 再次检查端口 if not is_port_available(port): port = find_available_port(8001, 8100) print(f" - 使用替代端口 {port}") else: print(f"5. 端口 {port} 可用") print("\n[启动完成]") print(f"* 监控系统已启动,请打开浏览器访问: http://localhost:{port}") print("* 等待用户登录...") print("\n" + "!"*50) print("! 警告:请勿关闭此窗口 !") print("! 此窗口是系统监控的核心进程 !") print("! 一旦关闭,所有监控服务将停止工作 !") print("!"*50 + "\n") # 在新线程中启动浏览器 threading.Thread(target=open_browser).start() try: # 使用 SocketIO 启动服务器 socketio.run(app, debug=False, host='0.0.0.0', port=port) except Exception as e: log(f"启动服务器出错: {str(e)}") finally: # 关闭所有子进程 for name, process in processes.items(): if process and process.poll() is None: try: process.terminate() log(f"已关闭 {name} 监控进程") except Exception as e: log(f"关闭 {name} 监控进程失败: {str(e)}") # 清理临时文件 script_dir = os.path.dirname(os.path.abspath(__file__)) data_files = ['breeze_daily.json', 'breeze_hourly.json', 'cms_daily.json', 'cms_hourly.json'] for file_name in data_files: file_path = os.path.join(script_dir, file_name) for attempt in range(3): # 最多重试3次 try: if os.path.exists(file_path): os.remove(file_path) log(f"已删除共享数据文件: {file_name}") break except Exception as e: log(f"删除文件 {file_name} 失败: {str(e)},第{attempt+1}次重试") time.sleep(1) log("监控系统已关闭") except Exception as e: log("Error in main function: " + str(e)) # 信号处理函数 def signal_handler(sig, frame): """处理信号,用于优雅退出""" log("收到退出信号,正在关闭系统...") global shutdown_flag shutdown_flag = True # 通过os._exit强制终止进程 os._exit(0) @app.route('/api/inspect/stats') def get_inspect_stats(): """获取CC审核平台统计数据""" try: stats = load_inspect_stats() if not stats: return jsonify({ 'hourly': {'total': 0, 'weighted_total': 0}, 'daily': {'total': 0, 'weighted_total': 0} }) # 获取最新的统计数据 latest_stats = stats.get('stats', []) if not latest_stats: return jsonify({ 'hourly': {'total': 0, 'weighted_total': 0}, 'daily': {'total': 0, 'weighted_total': 0} }) # 获取最新的一条记录 latest = latest_stats[-1] return jsonify({ 'hourly': { 'total': latest.get('total', 0), 'weighted_total': latest.get('weighted_total', 0) }, 'daily': { 'total': latest.get('daily_total', 0), 'weighted_total': latest.get('daily_weighted_total', 0) } }) except Exception as e: logging.error(f"获取CC审核平台统计数据时出错: {str(e)}") return jsonify({ 'hourly': {'total': 0, 'weighted_total': 0}, 'daily': {'total': 0, 'weighted_total': 0} }) def load_inspect_stats(): """加载CC审核平台统计数据""" try: stats_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'inspect_stats.json') if not os.path.exists(stats_file): return None with open(stats_file, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: logging.error(f"加载CC审核平台统计数据时出错: {str(e)}") return None def terminate_existing_processes(): """终止现有的监控进程""" try: terminated_pids = [] # 获取当前目录下的所有Python进程 current_dir = os.path.dirname(os.path.abspath(__file__)) monitor_scripts = ['breeze_monitor.py', 'breeze_monitor_CHAT.py', 'cms_monitor.py', 'inspect_monitor.py'] for proc in psutil.process_iter(['pid', 'name', 'cmdline']): try: # 检查进程是否在运行监控脚本 if proc.info['name'] == 'python.exe' and proc.info['cmdline']: cmdline = ' '.join(proc.info['cmdline']) if any(script in cmdline for script in monitor_scripts): # 先尝试正常终止 proc.terminate() terminated_pids.append(proc.pid) logging.info(f"已发送终止信号到进程: {proc.pid} - {cmdline}") except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): continue # 等待进程终止,最多等待10秒 wait_start = time.time() while time.time() - wait_start < 10: still_alive = [] for pid in terminated_pids: try: proc = psutil.Process(pid) still_alive.append(pid) except psutil.NoSuchProcess: continue if not still_alive: break # 对于仍在运行的进程,强制结束 if time.time() - wait_start > 5: # 如果等待超过5秒,强制结束 for pid in still_alive: try: proc = psutil.Process(pid) proc.kill() logging.info(f"强制终止进程: {pid}") except (psutil.NoSuchProcess, psutil.AccessDenied): continue time.sleep(0.5) # 最后再检查一次 for pid in terminated_pids: try: proc = psutil.Process(pid) proc.kill() # 最后一次尝试强制结束 logging.warning(f"进程仍在运行,强制终止: {pid}") except psutil.NoSuchProcess: continue except Exception as e: logging.error(f"终止进程 {pid} 时发生错误: {str(e)}") # 确保完全等待 time.sleep(2) return True except Exception as e: logging.error(f"终止进程时发生错误: {str(e)}") return False @app.route('/api/get_current_stats') def get_current_stats(): """获取当前统计数据""" try: stats = { 'weighted_total': 0, 'categories': {} } # 读取并合并所有监控数据 for monitor_type in ['breeze', 'cms', 'inspect']: hourly_file = f'{monitor_type}_hourly.json' if os.path.exists(hourly_file): try: with open(hourly_file, 'r', encoding='utf-8') as f: data = json.load(f) if isinstance(data, dict): if 'stats' in data: monitor_stats = data['stats'] if isinstance(monitor_stats, dict): if 'weighted_total' in monitor_stats: stats['weighted_total'] += float(monitor_stats['weighted_total']) if 'categories' in monitor_stats: stats['categories'].update(monitor_stats['categories']) except Exception as e: app.logger.error(f"读取{monitor_type}数据失败: {str(e)}") return jsonify({ 'success': True, 'stats': stats, 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S') }) except Exception as e: app.logger.error(f"获取统计数据失败: {str(e)}") return jsonify({ 'success': False, 'message': str(e) }) if __name__ == "__main__": # 注册信号处理器 signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) # 执行主函数 main()