#!/usr/bin/env python # -*- coding: utf-8 -*- import os import json import time import subprocess import threading import sys from datetime import datetime, timedelta import logging import webbrowser from flask import Flask, render_template, request, jsonify, session, redirect, url_for, make_response, send_from_directory, flash import threading from threading import Lock import signal import socket import requests import re from flask_socketio import SocketIO # 添加WebSocket支持 import hashlib # 全局应用状态变量 shutdown_flag = False # 控制应用程序关闭的标志 # 初始化 SocketIO socketio = SocketIO() # 常量定义:警报阈值和系统版本 ALARM_THRESHOLD = 2750 # 系统版本固定在代码中 VERSION = "v20250412034434-dev" # 系统版本号 - 这是固定的本地版本 # 获取或创建版本文件,格式为"当前时间(v版本号)",例如"20250408125815(v1.1)"。 def ensure_version_file(): """确保版本文件存在,写入固定版本号""" try: version_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'VERSION.txt') # 使用固定版本号 version_content = VERSION # 写入版本文件 with open(version_file, 'w') as f: f.write(version_content) logging.info(f"更新版本文件: {version_content}") return version_content except Exception as e: logging.error(f"处理版本文件时出错: {e}") return VERSION # 返回固定版本号 # 获取线上版本进行比较 def get_online_version(): """获取线上版本信息""" try: log(f"正在从 {VERSION_CHECK_URL} 获取线上版本信息...") # 获取系统代理设置 proxies = { 'http': os.environ.get('HTTP_PROXY', ''), 'https': os.environ.get('HTTPS_PROXY', '') } # 如果没有代理设置,尝试不使用代理直接访问 if not any(proxies.values()): log("未检测到系统代理设置,尝试直接连接...") response = requests.get(VERSION_CHECK_URL, timeout=5, verify=False) else: log(f"使用系统代理设置: {proxies}") response = requests.get(VERSION_CHECK_URL, timeout=5, proxies=proxies, verify=False) log(f"获取线上版本响应状态码: {response.status_code}") if response.status_code == 200: version = response.text.strip() log(f"成功获取线上版本: {version}") return version elif response.status_code == 407: log("需要代理认证,尝试不使用代理直接访问...") # 尝试不使用代理直接访问 response = requests.get(VERSION_CHECK_URL, timeout=5, proxies={'http': None, 'https': None}, verify=False) if response.status_code == 200: version = response.text.strip() log(f"直接访问成功获取线上版本: {version}") return version else: log(f"直接访问失败,HTTP状态码: {response.status_code}") return None else: log(f"获取线上版本失败,HTTP状态码: {response.status_code}") return None except requests.exceptions.Timeout: log("获取线上版本超时") return None except requests.exceptions.ConnectionError: log("连接线上版本服务器失败,尝试不使用代理直接访问...") try: # 尝试不使用代理直接访问 response = requests.get(VERSION_CHECK_URL, timeout=5, proxies={'http': None, 'https': None}, verify=False) if response.status_code == 200: version = response.text.strip() log(f"直接访问成功获取线上版本: {version}") return version else: log(f"直接访问失败,HTTP状态码: {response.status_code}") return None except Exception as e: log(f"直接访问失败: {str(e)}") return None except Exception as e: log(f"获取线上版本时发生未知错误: {str(e)}") return None VERSION_CHECK_URL = "http://cos.ui-beam.com/work_scripts/monitor/dev/VERSION.txt" # 测试版本地址 # VERSION_CHECK_URL = "http://cos.ui-beam.com/work_scripts/monitor/releases/VERSION.txt" # 正式版本地址 version_status = { 'last_check_time': None, 'online_version': None, 'has_update': False } version_lock = Lock() # 版本检测线程标志 version_check_thread_running = False # 检查并安装依赖 def check_and_install_dependencies(): try: # 检查plyer库是否安装 try: import plyer except ImportError: log("正在安装plyer库...") subprocess.check_call([sys.executable, "-m", "pip", "install", "plyer"]) log("plyer库安装成功") except Exception as e: log(f"依赖检查/安装过程中出错: {str(e)}") # 配置日志 def setup_logging(): """设置日志系统""" try: log_file = os.path.join('logs', f'dashboard_{datetime.now().strftime("%Y%m%d")}.log') logging.basicConfig( level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s', handlers=[ logging.FileHandler(log_file, encoding='utf-8'), logging.StreamHandler() ] ) return True except Exception as e: print(f"设置日志系统失败: {str(e)}") return False # 禁用 Flask 的访问日志 logging.getLogger('werkzeug').setLevel(logging.ERROR) # 创建Flask应用 app = Flask(__name__) app.secret_key = 'netease-dashboard-secret-key' # 用于session加密 app.permanent_session_lifetime = timedelta(days=30) # 设置会话有效期为30天 socketio.init_app(app) # 初始化 SocketIO # 告警阈值 ALARM_THRESHOLD = 1900 # 两个系统折算总和的告警阈值 alarm_status = { 'last_alarm_time': None, 'alarm_count': 0, 'is_alarming': False, 'alarm_type': None } alarm_lock = Lock() # 进程对象 processes = { 'breeze': None, 'cms': None } def log(message): """记录日志""" try: logging.info(message) except Exception as e: print("Log error: " + str(e)) print("Original message: " + message) def start_monitor_processes(): """启动监控进程""" try: script_dir = os.path.dirname(os.path.abspath(__file__)) # 获取会话中的凭据 breeze_cookie = session.get('breeze_cookie', '') cms_cookie = session.get('cms_cookie', '') username = session.get('username', '') # 获取Python可执行文件路径 python_executable = sys.executable # 启动breeze监控 if processes['breeze'] is None or processes['breeze'].poll() is not None: breeze_script = os.path.join(script_dir, "breeze_monitor.py") # 在命令行参数中传递环境变量 cmd = [python_executable, breeze_script] env = os.environ.copy() env['BREEZE_COOKIE'] = breeze_cookie env['BREEZE_USERNAME'] = username processes['breeze'] = subprocess.Popen( cmd, env=env, shell=False ) log(f"Breeze监控进程已启动, PID: {processes['breeze'].pid}") # 启动cms监控 if processes['cms'] is None or processes['cms'].poll() is not None: cms_script = os.path.join(script_dir, "cms_monitor.py") # 在命令行参数中传递环境变量 cmd = [python_executable, cms_script] env = os.environ.copy() env['CMS_COOKIE'] = cms_cookie env['CMS_USERNAME'] = username processes['cms'] = subprocess.Popen( cmd, env=env, shell=False ) log(f"CMS监控进程已启动, PID: {processes['cms'].pid}") return True except Exception as e: log(f"启动监控进程失败: {str(e)}") return False def add_no_cache_headers(response): """添加禁止缓存的响应头""" response.headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, max-age=0' response.headers['Pragma'] = 'no-cache' response.headers['Expires'] = '0' return response @app.after_request def after_request(response): """每个响应添加禁止缓存的头部""" return add_no_cache_headers(response) @app.route('/favicon.ico') def favicon(): """提供网站图标""" try: script_dir = os.path.dirname(os.path.abspath(__file__)) static_path = os.path.join(script_dir, 'static') return send_from_directory(static_path, 'dashboard-favicon.ico', mimetype='image/vnd.microsoft.icon') except Exception as e: log("Failed to serve favicon: " + str(e)) return '', 204 @app.route('/') def index(): """首页路由""" if not session.get('logged_in'): return redirect(url_for('login')) staff_name = session.get('staff_name', '') staff_id = session.get('username', '') # 获取工号 # 添加登录成功的提示消息 message = None if session.get('just_logged_in'): # 不再在这里设置消息,而是在前端处理 session['just_logged_in'] = False # 清除标记,确保消息只显示一次 return render_template('dashboard.html', staff_name=f"{staff_name}({staff_id})", # 修改显示格式,添加工号 message=message, version=VERSION) def get_staff_name(staff_id): """根据工号获取员工姓名""" try: # 从在线链接获取staff.json url = "http://cos.ui-beam.com/work_scripts/monitor/config/staff.json" # 尝试不使用代理直接获取 try: response = requests.get(url, timeout=5, proxies={'http': None, 'https': None}, verify=False) except: # 如果直接获取失败,尝试使用系统代理 proxies = { 'http': os.environ.get('HTTP_PROXY', ''), 'https': os.environ.get('HTTPS_PROXY', '') } response = requests.get(url, timeout=5, proxies=proxies, verify=False) if response.status_code == 200: staff_data = response.json() return staff_data.get(staff_id) else: log(f"获取在线staff.json失败,HTTP状态码: {response.status_code}") return None except Exception as e: log(f"获取员工姓名失败: {str(e)}") return None @app.route('/login', methods=['GET', 'POST']) def login(): """登录路由""" if request.method == 'POST': staff_id = request.form.get('username', '').strip() # 从username字段获取工号 breeze_cookie = request.form.get('breeze_cookie', '').strip() # 获取Breeze Cookie cms_cookie = request.form.get('cms_cookie', '').strip() # 获取CMS Cookie staff_name = get_staff_name(staff_id) if staff_name: session['logged_in'] = True session['staff_name'] = staff_name session['username'] = staff_id # 保存工号 session['breeze_cookie'] = breeze_cookie # 保存Breeze Cookie session['cms_cookie'] = cms_cookie # 保存CMS Cookie session['just_logged_in'] = True # 添加标记,用于显示欢迎消息 # 添加详细的系统启动日志 current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') print(f"\n{current_time} - INFO - Breeze监控日志系统初始化成功") print(f"{current_time} - INFO - CMS监控日志系统初始化成功") print(f"{current_time} - INFO - 监控服务启动") print(f"{current_time} - INFO - 初始化监控服务...") print(f"{current_time} - INFO - 初始化用户凭据...") print(f"{current_time} - INFO - [Breeze] 已从环境变量加载用户凭据: {staff_id}") print(f"{current_time} - INFO - [CMS] 已从环境变量加载用户凭据: {staff_id}") print(f"{current_time} - INFO - 加载系数配置...") # 加载并显示 Breeze 系数 try: script_dir = os.path.dirname(os.path.abspath(__file__)) breeze_coefficients_file = os.path.join(script_dir, 'breeze_coefficients.json') if os.path.exists(breeze_coefficients_file): with open(breeze_coefficients_file, 'r', encoding='utf-8') as f: breeze_coefficients = json.load(f) print(f"{current_time} - INFO - [Breeze] 从配置文件加载系数: {breeze_coefficients}") print(f"{current_time} - INFO - [Breeze] 当前使用的系数已更新") except Exception as e: print(f"{current_time} - ERROR - [Breeze] 加载系数配置失败: {str(e)}") # 加载并显示 CMS 系数 try: cms_coefficients_file = os.path.join(script_dir, 'cms_coefficients.json') if os.path.exists(cms_coefficients_file): with open(cms_coefficients_file, 'r', encoding='utf-8') as f: cms_coefficients = json.load(f) print(f"{current_time} - INFO - [CMS] 从配置文件加载系数: {cms_coefficients}") print(f"{current_time} - INFO - [CMS] 当前使用的系数: 评论={cms_coefficients['comment']}, 动态={cms_coefficients['feed']}, 举报={cms_coefficients['complaint']}") except Exception as e: print(f"{current_time} - ERROR - [CMS] 加载系数配置失败: {str(e)}") # 启动监控进程 start_monitor_processes() print(f"{current_time} - INFO - Breeze监控线程启动") print(f"{current_time} - INFO - CMS监控线程启动") # 返回成功响应,让前端显示欢迎消息 return jsonify({ 'success': True, 'staff_name': staff_name, 'redirect': url_for('index') }) else: return jsonify({ 'success': False, 'message': f'您的账号 ({staff_id}) 尚未在系统上注册或为不可用状态,无法登录,请联系系统管理员 (od0269@mail.go.com) 协助' }) return render_template('login.html') @app.route('/logout') def logout(): """登出""" try: # 记录登出用户 username = session.get('username', '未知用户') log("用户 [" + username + "] 登出系统") # 清除会话 session.clear() # 标记系统即将退出 global shutdown_flag shutdown_flag = True # 尝试停止监控进程 if processes['breeze'] and processes['breeze'].poll() is None: try: processes['breeze'].terminate() log("已停止Breeze监控进程") except Exception as e: log("停止Breeze监控进程失败: " + str(e)) if processes['cms'] and processes['cms'].poll() is None: try: processes['cms'].terminate() log("已停止CMS监控进程") except Exception as e: log("停止CMS监控进程失败: " + str(e)) # 删除共享数据文件 try: script_dir = os.path.dirname(os.path.abspath(__file__)) data_files = ['breeze_daily.json', 'breeze_hourly.json', 'cms_daily.json', 'cms_hourly.json'] for file_name in data_files: file_path = os.path.join(script_dir, file_name) if os.path.exists(file_path): os.remove(file_path) log(f"已删除共享数据文件: {file_name}") log("成功清理所有共享数据文件") except Exception as e: log(f"删除共享数据文件时出错: {str(e)}") # 在一个后台线程中等待几秒后退出应用程序 def shutdown_app(): time.sleep(3) # 等待3秒 log("用户登出,系统正在完全关闭...") # 关闭所有子进程和当前进程 if sys.platform.startswith('win'): # 在Windows系统下,强制关闭当前进程及所有子进程 try: # 获取当前进程PID current_pid = os.getpid() # 使用taskkill命令强制结束进程树 subprocess.run(f'taskkill /F /T /PID {current_pid}', shell=True) except Exception as e: log(f"关闭进程时出错: {str(e)}") else: # 非Windows系统使用标准退出方式 os._exit(0) # 强制关闭整个程序 # 启动关闭线程 threading.Thread(target=shutdown_app).start() # 返回包含关闭倒计时的页面 return """ 正在关闭系统...

正在关闭系统

数据清理完毕,系统即将退出,请手动关闭浏览器页面和控制台

3
""" except Exception as e: log("Error in logout route: " + str(e)) return "Logout error. Please check logs.", 500 def read_data_file(filename): """读取数据文件""" try: script_dir = os.path.dirname(os.path.abspath(__file__)) file_path = os.path.join(script_dir, filename) if not os.path.exists(file_path): return None with open(file_path, 'r', encoding='utf-8') as f: data = json.load(f) return data except Exception as e: log("Failed to read file {0}: {1}".format(filename, str(e))) return None def check_version(): """检查系统版本并与在线版本比较""" global version_check_thread_running try: log("开始检查系统版本更新...") with version_lock: version_status['last_check_time'] = datetime.now() # 确保VERSION.txt文件存在且内容与VERSION一致 local_version = ensure_version_file() log(f"当前本地版本: {local_version}") # 获取在线版本 online_version = get_online_version() log(f"获取到的在线版本: {online_version}") if online_version: with version_lock: version_status['online_version'] = online_version # 比较版本号,只在线上版本比本地版本新时才提示更新 local_timestamp = int(local_version[1:].split('-')[0]) # 去掉'v'前缀并转换为数字 online_timestamp = int(online_version[1:].split('-')[0]) # 去掉'v'前缀并转换为数字 version_status['has_update'] = online_timestamp > local_timestamp log(f"版本比较结果 - 在线版本: {online_version}, 本地版本: {local_version}, 是否有更新: {version_status['has_update']}") # 如果有更新且没有正在运行的线程,启动告警线程 if version_status['has_update'] and not version_check_thread_running: version_check_thread_running = True threading.Thread(target=notify_version_update).start() return { 'current_version': VERSION, 'online_version': online_version, 'has_update': version_status['has_update'], 'last_check_time': version_status['last_check_time'].strftime('%Y-%m-%d %H:%M:%S') if version_status['last_check_time'] else None } else: log("无法获取在线版本,返回本地版本信息") return { 'current_version': VERSION, 'online_version': None, 'has_update': False, 'last_check_time': version_status['last_check_time'].strftime('%Y-%m-%d %H:%M:%S') if version_status['last_check_time'] else None } except Exception as e: log(f"版本检查过程中发生错误: {str(e)}") return { 'current_version': VERSION, 'online_version': None, 'has_update': False, 'last_check_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'error': str(e) } finally: # 确保线程标志被重置 if version_check_thread_running: version_check_thread_running = False def notify_version_update(): """通知用户有新版本可用""" try: # 确保VERSION.txt文件存在 ensure_version_file() with version_lock: online_version = version_status['online_version'] if online_version: message = f"系统有新版本可用: {online_version},当前版本: {VERSION}" log(message) try: # 发送桌面通知 from plyer import notification notification.notify( title="系统版本更新", message=message + "\n点击'检测更新'按钮了解详情", app_icon=None, timeout=10, toast=False ) # 通过WebSocket通知前端弹出版本检测窗口 socketio.emit('version_update', { 'type': 'show_version_dialog', 'current_version': VERSION, 'online_version': online_version }) except Exception as e: log(f"显示桌面通知失败: {str(e)}") except Exception as e: log(f"通知版本更新失败: {str(e)}") finally: # 重置线程运行标志 global version_check_thread_running version_check_thread_running = False def monitor_version_thread(): """版本检测后台线程""" log("版本监控线程启动") while True: try: # 每30分钟检查一次版本更新 check_version() time.sleep(1800) # 30分钟 except Exception as e: log(f"版本监控线程异常: {str(e)}") time.sleep(600) # 出错后等待10分钟重试 @app.route('/api/get-version') def get_version(): """获取系统版本信息""" try: # 确保VERSION.txt文件存在 ensure_version_file() with version_lock: status = { 'current_version': VERSION, 'local_version': VERSION, # 固定的本地版本号 'online_version': version_status['online_version'], 'has_update': version_status['has_update'], 'last_check_time': version_status['last_check_time'].strftime('%Y-%m-%d %H:%M:%S') if version_status['last_check_time'] else None } return jsonify({ 'success': True, 'data': status }) except Exception as e: log("Error in get_version route: " + str(e)) return jsonify({'success': False, 'message': 'Internal server error'}), 500 @app.route('/api/check-version') def check_version_api(): """手动检查版本更新的API""" try: result = check_version() if result: return jsonify({ 'success': True, 'data': result }) else: # 至少返回当前版本信息 return jsonify({ 'success': False, 'message': '获取在线版本信息失败', 'data': { 'current_version': VERSION, 'local_version': VERSION, 'online_version': None, 'has_update': False, 'last_check_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S') } }) except Exception as e: log("Error in check_version_api route: " + str(e)) return jsonify({'success': False, 'message': f'检测版本更新失败: {str(e)}'}), 500 @app.route('/api/get-stats') def get_stats(): """获取统计数据""" try: breeze_hourly = read_data_file('breeze_hourly.json') breeze_daily = read_data_file('breeze_daily.json') cms_hourly = read_data_file('cms_hourly.json') cms_daily = read_data_file('cms_daily.json') # 获取时间戳 breeze_hourly_time = datetime.strptime(breeze_hourly['timestamp'], '%Y-%m-%d %H:%M:%S').strftime('%H:%M:%S') if breeze_hourly else '' breeze_daily_time = datetime.strptime(breeze_daily['timestamp'], '%Y-%m-%d %H:%M:%S').strftime('%H:%M:%S') if breeze_daily else '' cms_hourly_time = datetime.strptime(cms_hourly['timestamp'], '%Y-%m-%d %H:%M:%S').strftime('%H:%M:%S') if cms_hourly else '' cms_daily_time = datetime.strptime(cms_daily['timestamp'], '%Y-%m-%d %H:%M:%S').strftime('%H:%M:%S') if cms_daily else '' # 构建返回数据 data = { 'breeze_hourly': { 'total': breeze_hourly['stats']['total'] if breeze_hourly else 0, 'weighted_total': round(breeze_hourly['stats']['weighted_total'], 2) if breeze_hourly else 0, 'categories': breeze_hourly['stats']['categories'] if breeze_hourly else {}, 'timestamp': breeze_hourly_time }, 'breeze_daily': { 'total': breeze_daily['stats']['total'] if breeze_daily else 0, 'weighted_total': round(breeze_daily['stats']['weighted_total'], 2) if breeze_daily else 0, 'categories': breeze_daily['stats']['categories'] if breeze_daily else {}, 'timestamp': breeze_daily_time }, 'cms_hourly': { 'stats': cms_hourly['stats']['stats'] if cms_hourly else {'comment': 0, 'feed': 0, 'complaint': 0}, 'weighted_total': round(cms_hourly['stats']['weighted_total'], 2) if cms_hourly else 0, 'total_count': cms_hourly['stats']['total_count'] if cms_hourly else 0, 'coefficients': cms_hourly['stats']['coefficients'] if cms_hourly else {'comment': 0.55, 'feed': 1.54, 'complaint': 5.4}, 'timestamp': cms_hourly_time }, 'cms_daily': { 'stats': cms_daily['stats']['stats'] if cms_daily else {'comment': 0, 'feed': 0, 'complaint': 0}, 'weighted_total': round(cms_daily['stats']['weighted_total'], 2) if cms_daily else 0, 'total_count': cms_daily['stats']['total_count'] if cms_daily else 0, 'coefficients': cms_daily['stats']['coefficients'] if cms_daily else {'comment': 0.55, 'feed': 1.54, 'complaint': 5.4}, 'timestamp': cms_daily_time } } # 计算总折算量 total_weighted = (data['breeze_hourly']['weighted_total'] + data['cms_hourly']['weighted_total']) return jsonify({ 'code': 0, 'data': data, 'total_weighted': round(total_weighted, 2) }) except Exception as e: log(f"获取统计数据时发生错误: {str(e)}") return jsonify({'code': 1, 'message': '获取统计数据失败'}) def check_alarm(total_weighted): """检查并触发告警""" try: with alarm_lock: now = datetime.now() # 如果最近30秒内已经告警过,增加计数但不重复告警 if alarm_status['last_alarm_time'] and (now - alarm_status['last_alarm_time']).total_seconds() < 30: return # 设置告警状态 alarm_status['last_alarm_time'] = now alarm_status['alarm_count'] += 1 alarm_status['is_alarming'] = True # 如果已经达到3次告警,不再继续 if alarm_status['alarm_count'] > 3: return # 启动线程播放告警声音和显示通知 # 将当前加权总值传递给函数 threading.Thread(target=lambda: show_alarm_notification(total_weighted)).start() log("Alarm triggered: Current weighted total {0:.2f}, exceeded threshold {1}".format(total_weighted, ALARM_THRESHOLD)) except Exception as e: log("Error in check_alarm: " + str(e)) def show_alarm_notification(total_weighted, is_test=False, alarm_type="实时告警"): """显示告警通知和播放告警声音""" try: # 播放系统告警声音 try: import winsound winsound.PlaySound("SystemExclamation", winsound.SND_ALIAS) except: # 如果winsound不可用,尝试使用beep print('\a') # 添加桌面通知 try: from plyer import notification # 计算超出阈值的百分比 over_percentage = ((total_weighted - ALARM_THRESHOLD) / ALARM_THRESHOLD) * 100 # 根据是否为测试调整标题 title = "测试告警" if is_test else "审核数量告警" # 根据是否为测试调整消息前缀 prefix = "[测试数据] " if is_test else "" notification.notify( title=title, message=f"{prefix}{alarm_type}当前小时加权总计:{total_weighted:.2f}\n阈值:{ALARM_THRESHOLD}\n超出:{over_percentage:.1f}%\n请立即检查数据监控看板!", app_icon=None, timeout=15, toast=False ) except Exception as e: log("Failed to show desktop notification: " + str(e)) except Exception as e: log("Failed to play alarm sound: " + str(e)) @app.route('/api/get-alarm-status') def get_alarm_status(): """获取告警状态""" try: with alarm_lock: status = { 'is_alarming': alarm_status['is_alarming'], 'alarm_count': alarm_status['alarm_count'], 'last_alarm_time': alarm_status['last_alarm_time'].strftime('%Y-%m-%d %H:%M:%S') if alarm_status['last_alarm_time'] else None, 'threshold': ALARM_THRESHOLD, 'alarm_type': alarm_status['alarm_type'] } return jsonify({ 'success': True, 'data': status }) except Exception as e: log("Error in get_alarm_status route: " + str(e)) return jsonify({'success': False, 'message': 'Internal server error'}), 500 @app.route('/api/reset-alarm') def reset_alarm(): """重置告警状态""" try: with alarm_lock: alarm_status['is_alarming'] = False alarm_status['alarm_count'] = 0 return jsonify({ 'success': True, 'message': 'Alarm has been reset' }) except Exception as e: log("Error in reset_alarm route: " + str(e)) return jsonify({'success': False, 'message': 'Internal server error'}), 500 @app.route('/api/restart-monitoring') def restart_monitoring(): """重启监控进程""" try: if 'logged_in' not in session: return jsonify({'success': False, 'message': 'Not logged in'}) log("正在尝试重启监控进程...") # 停止现有进程 if processes['breeze'] and processes['breeze'].poll() is None: try: processes['breeze'].terminate() log("Breeze监控进程已停止") except Exception as e: log(f"停止Breeze监控进程出错: {str(e)}") if processes['cms'] and processes['cms'].poll() is None: try: processes['cms'].terminate() log("CMS监控进程已停止") except Exception as e: log(f"停止CMS监控进程出错: {str(e)}") # 等待进程完全终止 time.sleep(2) # 获取Python可执行文件路径 python_executable = sys.executable log(f"使用Python可执行文件: {python_executable}") # 重新启动 try: script_dir = os.path.dirname(os.path.abspath(__file__)) # 获取会话中的凭据 breeze_cookie = session.get('breeze_cookie', '') cms_cookie = session.get('cms_cookie', '') username = session.get('username', '') # 启动breeze监控 breeze_script = os.path.join(script_dir, "breeze_monitor.py") env = os.environ.copy() env['BREEZE_COOKIE'] = breeze_cookie env['BREEZE_USERNAME'] = username processes['breeze'] = subprocess.Popen( [python_executable, breeze_script], env=env, shell=False ) log(f"Breeze监控进程已启动, PID: {processes['breeze'].pid}") # 启动cms监控 cms_script = os.path.join(script_dir, "cms_monitor.py") env = os.environ.copy() env['CMS_COOKIE'] = cms_cookie env['CMS_USERNAME'] = username processes['cms'] = subprocess.Popen( [python_executable, cms_script], env=env, shell=False ) log(f"CMS监控进程已启动, PID: {processes['cms'].pid}") return jsonify({ 'success': True, 'message': '监控进程已成功重启' }) except Exception as e: log(f"重启监控进程时出错: {str(e)}") return jsonify({ 'success': False, 'message': f"重启监控进程失败: {str(e)}" }), 500 except Exception as e: log(f"重启监控进程时出错: {str(e)}") return jsonify({ 'success': False, 'message': f"重启监控进程失败: {str(e)}" }), 500 def open_browser(): """在新线程中打开浏览器""" time.sleep(1) # 等待服务器启动 try: webbrowser.open('http://localhost:8000') except Exception as e: log("Failed to open browser: " + str(e)) @app.route('/api/check-now') def check_now(): """立即检查当前小时数据""" try: if 'logged_in' not in session: return jsonify({'success': False, 'message': 'Not logged in'}) # 获取Python可执行文件路径 python_executable = sys.executable # 向子进程发送检查请求 if processes['breeze'] and processes['breeze'].poll() is None: # 获取会话中的凭据 breeze_cookie = session.get('breeze_cookie', '') username = session.get('username', '') # 创建临时环境 env = os.environ.copy() env['BREEZE_COOKIE'] = breeze_cookie env['BREEZE_USERNAME'] = username # 启动一个临时进程来执行检查,添加--force参数强制使用整点时间 # 添加--no-config-check参数防止频繁检查配置文件 script_dir = os.path.dirname(os.path.abspath(__file__)) subprocess.Popen( [python_executable, os.path.join(script_dir, "breeze_monitor.py"), "--check-now", "--force", "--no-config-check"], env=env, shell=False ) log("开始执行Breeze系统手动检查") if processes['cms'] and processes['cms'].poll() is None: # 获取会话中的凭据 cms_cookie = session.get('cms_cookie', '') username = session.get('username', '') # 创建临时环境 env = os.environ.copy() env['CMS_COOKIE'] = cms_cookie env['CMS_USERNAME'] = username # 启动一个临时进程来执行检查,添加--force参数强制使用整点时间 # 添加--no-config-check参数防止频繁检查配置文件 script_dir = os.path.dirname(os.path.abspath(__file__)) subprocess.Popen( [python_executable, os.path.join(script_dir, "cms_monitor.py"), "--check-now", "--force", "--no-config-check"], env=env, shell=False ) log("开始执行CMS系统手动检查") return jsonify({ 'success': True, 'message': '已启动数据检查', 'threshold': ALARM_THRESHOLD }) except Exception as e: log(f"立即检查功能出错: {str(e)}") return jsonify({'success': False, 'message': f'数据检查启动失败: {str(e)}'}), 500 @app.route('/api/test-alarm') def test_alarm(): """测试告警功能""" try: if 'logged_in' not in session: return jsonify({'success': False, 'message': 'Not logged in'}) # 检查是否为实际数据告警 is_real_data = request.args.get('real_data', 'false').lower() == 'true' # 根据请求类型选择不同的消息 message = "实时告警" if is_real_data else "测试告警" # 立即触发第一次告警 if not is_real_data: # 手动触发告警并传递测试告警值 show_alarm_notification(ALARM_THRESHOLD + 100, is_test=True, alarm_type="测试告警") else: # 从API获取当前真实的小时总量 try: breeze_hourly = read_data_file('breeze_hourly.json') or {'stats': {'weighted_total': 0}} cms_hourly = read_data_file('cms_hourly.json') or {'stats': {'weighted_total': 0}} total_hourly = 0 if 'stats' in breeze_hourly and 'weighted_total' in breeze_hourly['stats']: total_hourly += breeze_hourly['stats']['weighted_total'] if 'stats' in cms_hourly and 'weighted_total' in cms_hourly['stats']: total_hourly += cms_hourly['stats']['weighted_total'] # 只有在实际超过阈值时才显示通知 if total_hourly >= ALARM_THRESHOLD: show_alarm_notification(total_hourly, is_test=False, alarm_type="实时告警") except Exception as e: log(f"Error getting real data for alarm: {str(e)}") # 设置告警状态,启用连续告警 with alarm_lock: now = datetime.now() alarm_status['last_alarm_time'] = now alarm_status['is_alarming'] = True alarm_status['alarm_type'] = message # 保存告警类型 # 设置告警计数为1,开始计数 alarm_status['alarm_count'] = 1 # 创建后台线程发送连续告警 if not is_real_data: # 仅对测试告警启用连续通知 threading.Thread(target=lambda: send_sequential_test_alarms("测试告警")).start() log(f"{message} triggered by user") return jsonify({ 'success': True, 'message': f'{message} successful' }) except Exception as e: log("Error in test_alarm route: " + str(e)) return jsonify({'success': False, 'message': 'Failed to trigger alarm: ' + str(e)}), 500 @app.route('/api/get-coefficients') def get_coefficients(): """获取CMS系数配置""" try: if 'logged_in' not in session: return jsonify({'success': False, 'message': 'Not logged in'}) # 从CMS系数配置文件读取 try: script_dir = os.path.dirname(os.path.abspath(__file__)) coefficients_file = os.path.join(script_dir, 'cms_coefficients.json') if not os.path.exists(coefficients_file): # 如果配置文件不存在,返回默认系数 default_coefficients = { 'comment': 0.55, 'feed': 1.54, 'complaint': 5.4 } return jsonify({ 'success': True, 'data': default_coefficients, 'message': 'Using default coefficients' }) with open(coefficients_file, 'r', encoding='utf-8') as f: coefficients = json.load(f) return jsonify({ 'success': True, 'data': coefficients }) except Exception as e: log(f"Error reading coefficients file: {str(e)}") return jsonify({ 'success': False, 'message': f'Error reading coefficients: {str(e)}' }) except Exception as e: log("Error in get_coefficients route: " + str(e)) return jsonify({'success': False, 'message': 'Internal server error'}), 500 @app.route('/api/update-coefficients', methods=['POST']) def update_coefficients(): """更新CMS系数配置""" try: if 'logged_in' not in session: return jsonify({'success': False, 'message': 'Not logged in'}) # 获取请求中的系数 try: data = request.get_json() if not data: return jsonify({ 'success': False, 'message': 'No data provided' }) # 验证系数格式 required_keys = ['comment', 'feed', 'complaint'] for key in required_keys: if key not in data: return jsonify({ 'success': False, 'message': f'Missing required key: {key}' }) # 验证值是数字 try: data[key] = float(data[key]) except ValueError: return jsonify({ 'success': False, 'message': f'Value for {key} must be a number' }) # 确保只有必要的键 coefficients = { 'comment': data['comment'], 'feed': data['feed'], 'complaint': data['complaint'] } # 保存到配置文件 script_dir = os.path.dirname(os.path.abspath(__file__)) coefficients_file = os.path.join(script_dir, 'cms_coefficients.json') with open(coefficients_file, 'w', encoding='utf-8') as f: json.dump(coefficients, f, indent=4, ensure_ascii=False) log(f"CMS系数已更新: 评论={coefficients['comment']}, 动态={coefficients['feed']}, 举报={coefficients['complaint']}") # 使用命令行参数方式通知CMS监控进程更新系数 try: # 获取Python可执行文件路径 python_executable = sys.executable # 获取CMS监控脚本路径 cms_script = os.path.join(script_dir, "cms_monitor.py") # 获取环境变量 env = os.environ.copy() env['CMS_COOKIE'] = session.get('cms_cookie', '') env['CMS_USERNAME'] = session.get('username', '') # 启动进程更新系数 cmd = [ python_executable, cms_script, "--update-coefficients", str(coefficients['comment']), str(coefficients['feed']), str(coefficients['complaint']) ] subprocess.Popen( cmd, env=env, shell=False ) log("已发送系数更新命令给CMS监控进程") except Exception as e: log(f"发送系数更新命令失败: {str(e)}") return jsonify({ 'success': True, 'message': 'Coefficients updated successfully', 'data': coefficients }) except Exception as e: log(f"更新系数失败: {str(e)}") return jsonify({ 'success': False, 'message': f'Error updating coefficients: {str(e)}' }) except Exception as e: log("Error in update_coefficients route: " + str(e)) return jsonify({'success': False, 'message': 'Internal server error'}), 500 def send_sequential_test_alarms(alarm_type): """发送连续的测试告警通知""" try: # 等待10秒后发送第二次告警 time.sleep(10) with alarm_lock: if alarm_status['is_alarming'] and alarm_status['alarm_count'] < 3: alarm_status['alarm_count'] = 2 # 发送第二次告警 show_alarm_notification(ALARM_THRESHOLD + 120, is_test=True, alarm_type=alarm_type) log("Second test alarm notification sent") # 再等待10秒后发送第三次告警 time.sleep(10) with alarm_lock: if alarm_status['is_alarming'] and alarm_status['alarm_count'] < 3: alarm_status['alarm_count'] = 3 # 发送第三次告警 show_alarm_notification(ALARM_THRESHOLD + 150, is_test=True, alarm_type=alarm_type) log("Third test alarm notification sent") except Exception as e: log(f"Error in sequential test alarms: {str(e)}") @app.route('/api/get-breeze-coefficients') def get_breeze_coefficients(): """获取Breeze系数配置""" try: if 'logged_in' not in session: return jsonify({'success': False, 'message': 'Not logged in'}) # 从Breeze系数配置文件读取 try: script_dir = os.path.dirname(os.path.abspath(__file__)) coefficients_file = os.path.join(script_dir, 'breeze_coefficients.json') if not os.path.exists(coefficients_file): # 如果配置文件不存在,返回错误信息 return jsonify({ 'success': False, 'message': 'Breeze coefficients file not found' }) with open(coefficients_file, 'r', encoding='utf-8') as f: coefficients = json.load(f) return jsonify({ 'success': True, 'data': coefficients }) except Exception as e: log(f"Error reading Breeze coefficients file: {str(e)}") return jsonify({ 'success': False, 'message': f'Error reading coefficients: {str(e)}' }) except Exception as e: log("Error in get_breeze_coefficients route: " + str(e)) return jsonify({'success': False, 'message': 'Internal server error'}), 500 @app.route('/api/update-breeze-coefficients', methods=['POST']) def update_breeze_coefficients(): """更新Breeze系数配置""" try: if 'logged_in' not in session: return jsonify({'success': False, 'message': 'Not logged in'}) # 获取请求中的系数 try: data = request.get_json() if not data: return jsonify({ 'success': False, 'message': 'No data provided' }) # 验证系数格式 if not isinstance(data, dict): return jsonify({ 'success': False, 'message': 'Invalid data format, expected a dictionary' }) # 验证所有值是数字 for key, value in data.items(): try: data[key] = float(value) except ValueError: return jsonify({ 'success': False, 'message': f'Value for {key} must be a number' }) # 保存到配置文件 script_dir = os.path.dirname(os.path.abspath(__file__)) coefficients_file = os.path.join(script_dir, 'breeze_coefficients.json') with open(coefficients_file, 'w', encoding='utf-8') as f: json.dump(data, f, indent=4, ensure_ascii=False) log(f"Breeze系数已更新") # 使用命令行参数方式通知Breeze监控进程更新系数 try: # 获取Python可执行文件路径 python_executable = sys.executable # 获取Breeze监控脚本路径 breeze_script = os.path.join(script_dir, "breeze_monitor.py") # 获取环境变量 env = os.environ.copy() env['BREEZE_COOKIE'] = session.get('breeze_cookie', '') env['BREEZE_USERNAME'] = session.get('username', '') # 启动进程更新系数 - 一次只更新一个系数 for coefficient_type, coefficient_value in data.items(): cmd = [ python_executable, breeze_script, "--update-coefficients", coefficient_type, str(coefficient_value) ] subprocess.Popen( cmd, env=env, shell=False ) log("已发送系数更新命令给Breeze监控进程") except Exception as e: log(f"发送系数更新命令失败: {str(e)}") return jsonify({ 'success': True, 'message': 'Breeze coefficients updated successfully', 'data': data }) except Exception as e: log(f"更新Breeze系数失败: {str(e)}") return jsonify({ 'success': False, 'message': f'Error updating coefficients: {str(e)}' }) except Exception as e: log("Error in update_breeze_coefficients route: " + str(e)) return jsonify({'success': False, 'message': 'Internal server error'}), 500 @app.route('/api/acknowledge-alarm') def acknowledge_alarm(): """确认并重置告警状态""" try: with alarm_lock: # 完全重置告警状态 alarm_status['is_alarming'] = False alarm_status['alarm_count'] = 0 alarm_status['last_alarm_time'] = None # 记录告警已被确认 log("告警已被用户确认并重置") return jsonify({ 'success': True, 'message': 'Alarm has been acknowledged and reset' }) except Exception as e: log("Error in acknowledge_alarm route: " + str(e)) return jsonify({'success': False, 'message': 'Internal server error'}), 500 def is_port_available(port): """检查端口是否可用""" try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.settimeout(1) result = s.connect_ex(('127.0.0.1', port)) return result != 0 except Exception as e: log(f"检查端口可用性出错: {str(e)}") return False def free_port(port): """尝试释放被占用的端口""" try: if sys.platform.startswith('win'): # Windows系统 os.system(f'for /f "tokens=5" %a in (\'netstat -ano ^| findstr :{port}\') do taskkill /f /pid %a') return True else: # Linux/Mac系统 os.system(f"kill $(lsof -t -i:{port})") return True except Exception as e: log(f"释放端口时出错: {str(e)}") return False def find_available_port(start_port, end_port): """查找可用端口""" for port in range(start_port, end_port + 1): if is_port_available(port): return port return None @app.route('/api/check-cms-daily', methods=['POST']) def check_cms_daily_data(): """立即检查CMS每日数据""" if 'user' not in session: return jsonify({ 'success': False, 'message': '请先登录系统' }) try: # 准备环境变量 env = os.environ.copy() env['CMS_COOKIE'] = session.get('cms_cookie', '') env['CMS_USERNAME'] = session.get('username', '') # 设置--check-daily参数运行CMS监控脚本 cmd = [sys.executable, os.path.join(os.path.dirname(os.path.abspath(__file__)), 'cms_monitor.py'), '--check-daily'] # 运行子进程 process = subprocess.Popen( cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) # 等待进程完成 stdout, stderr = process.communicate(timeout=60) if process.returncode == 0: app.logger.info('成功执行CMS每日数据检查') return jsonify({ 'success': True, 'message': 'CMS每日数据检查已执行' }) else: app.logger.error(f'CMS每日数据检查失败: {stderr.decode("utf-8", errors="ignore")}') return jsonify({ 'success': False, 'message': f'CMS每日数据检查失败: {stderr.decode("utf-8", errors="ignore")}' }) except subprocess.TimeoutExpired: app.logger.error('CMS每日数据检查超时') return jsonify({ 'success': False, 'message': 'CMS每日数据检查超时,请检查系统日志' }) except Exception as e: app.logger.error(f'执行CMS每日数据检查时发生错误: {str(e)}') return jsonify({ 'success': False, 'message': f'执行CMS每日数据检查时发生错误: {str(e)}' }) @app.route('/api/check-version-service') def check_version_service(): """检查版本检测服务的可用性""" try: log("正在检查版本检测服务可用性...") # 测试连接 try: response = requests.get(VERSION_CHECK_URL, timeout=5) log(f"版本检测服务响应状态码: {response.status_code}") service_status = { 'url': VERSION_CHECK_URL, 'is_available': response.status_code == 200, 'status_code': response.status_code, 'response_time': response.elapsed.total_seconds(), 'content_length': len(response.content) if response.status_code == 200 else 0, 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S') } if response.status_code == 200: service_status['content'] = response.text.strip() log(f"版本检测服务正常,返回内容: {service_status['content']}") else: log(f"版本检测服务异常,HTTP状态码: {response.status_code}") return jsonify({ 'success': True, 'data': service_status }) except requests.exceptions.Timeout: log("版本检测服务连接超时") return jsonify({ 'success': False, 'message': '版本检测服务连接超时', 'data': { 'url': VERSION_CHECK_URL, 'is_available': False, 'error': 'timeout', 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S') } }) except requests.exceptions.ConnectionError: log("无法连接到版本检测服务") return jsonify({ 'success': False, 'message': '无法连接到版本检测服务', 'data': { 'url': VERSION_CHECK_URL, 'is_available': False, 'error': 'connection_error', 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S') } }) except Exception as e: log(f"检查版本检测服务时发生未知错误: {str(e)}") return jsonify({ 'success': False, 'message': f'检查版本检测服务时发生错误: {str(e)}', 'data': { 'url': VERSION_CHECK_URL, 'is_available': False, 'error': 'unknown', 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S') } }) except Exception as e: log(f"版本检测服务检查接口发生错误: {str(e)}") return jsonify({ 'success': False, 'message': '版本检测服务检查失败', 'error': str(e) }), 500 @app.route('/api/update-system', methods=['POST']) def update_system(): """处理系统更新请求""" try: # 获取桌面路径和快捷方式路径 desktop_path = os.path.join(os.path.expanduser('~'), 'Desktop') temp_dir = 'monitor_temp' temp_path = os.path.join(desktop_path, temp_dir) shortcut_path = os.path.join(temp_path, '网易大神审核数据监控看板一键安装.lnk') # 下载快捷方式 shortcut_url = 'http://cos.ui-beam.com/work_scripts/monitor/%E7%BD%91%E6%98%93%E5%A4%A7%E7%A5%9E%E5%AE%A1%E6%A0%B8%E6%95%B0%E6%8D%AE%E7%9B%91%E6%8E%A7%E7%9C%8B%E6%9D%BF%E4%B8%80%E9%94%AE%E5%AE%89%E8%A3%85.lnk' log(f"开始下载更新文件: {shortcut_url}") # 获取系统代理设置 proxies = { 'http': os.environ.get('HTTP_PROXY', ''), 'https': os.environ.get('HTTPS_PROXY', '') } # 首先尝试不使用代理直接下载 try: log("尝试直接下载更新文件...") response = requests.get(shortcut_url, timeout=30, proxies={'http': None, 'https': None}, verify=False) if response.status_code == 200: with open(shortcut_path, 'wb') as f: f.write(response.content) log("更新文件下载成功") else: raise requests.exceptions.RequestException(f"下载失败,HTTP状态码: {response.status_code}") except (requests.exceptions.RequestException, IOError) as e: log(f"直接下载失败: {str(e)},尝试使用系统代理...") # 如果直接下载失败,尝试使用系统代理 try: response = requests.get(shortcut_url, timeout=30, proxies=proxies, verify=False) if response.status_code == 200: with open(shortcut_path, 'wb') as f: f.write(response.content) log("使用代理下载更新文件成功") elif response.status_code == 407: log("代理需要认证,尝试最后一次直接下载...") # 最后一次尝试直接下载 response = requests.get(shortcut_url, timeout=30, proxies={'http': None, 'https': None}, verify=False) if response.status_code == 200: with open(shortcut_path, 'wb') as f: f.write(response.content) log("最后尝试下载成功") else: raise requests.exceptions.RequestException(f"所有下载尝试都失败,最后状态码: {response.status_code}") else: raise requests.exceptions.RequestException(f"使用代理下载失败,HTTP状态码: {response.status_code}") except Exception as proxy_error: log(f"使用代理下载失败: {str(proxy_error)}") return jsonify({ 'success': False, 'message': f'下载更新文件失败: {str(proxy_error)}' }), 500 # 检查文件是否成功下载 if not os.path.exists(shortcut_path): return jsonify({ 'success': False, 'message': '更新文件下载成功但未找到文件' }), 500 # 启动快捷方式 try: log("正在启动更新程序...") subprocess.Popen(['cmd', '/c', 'start', '', shortcut_path], shell=True) log("更新程序启动成功") return jsonify({ 'success': True, 'message': '更新程序已启动' }) except Exception as e: log(f"启动更新程序失败: {str(e)}") return jsonify({ 'success': False, 'message': f'启动更新程序失败: {str(e)}' }), 500 except Exception as e: log(f"系统更新过程中发生错误: {str(e)}") return jsonify({ 'success': False, 'message': f'系统更新失败: {str(e)}' }), 500 def main(): """主函数""" try: print("\n" + "="*50) print("网易大神审核数据监控服务启动中...") print("="*50) print("\n[系统状态]") print("1. 正在初始化监控服务...") # 初始化版本文件 timestamp_version = ensure_version_file() print(f"2. 当前系统版本: {VERSION}") # 检查并安装依赖 print("3. 正在检查系统依赖...") check_and_install_dependencies() # 启动版本检测线程 print("4. 启动版本检测服务...") version_thread = threading.Thread(target=monitor_version_thread) version_thread.daemon = True version_thread.start() # 获取端口 port = 8000 # 检查端口是否被占用 if not is_port_available(port): print(f"5. 端口 {port} 已被占用,尝试释放...") free_port(port) # 再次检查端口 if not is_port_available(port): port = find_available_port(8001, 8100) print(f" - 使用替代端口 {port}") else: print(f"5. 端口 {port} 可用") print("\n[启动完成]") print(f"* 监控系统已启动,请打开浏览器访问: http://localhost:{port}") print("* 等待用户登录...") print("\n" + "!"*50) print("! 警告:请勿关闭此窗口 !") print("! 此窗口是系统监控的核心进程 !") print("! 一旦关闭,所有监控服务将停止工作 !") print("!"*50 + "\n") # 在新线程中启动浏览器 threading.Thread(target=open_browser).start() try: # 使用 SocketIO 启动服务器 socketio.run(app, debug=False, host='0.0.0.0', port=port) except Exception as e: log(f"启动服务器出错: {str(e)}") finally: # 关闭所有子进程 for name, process in processes.items(): if process and process.poll() is None: try: process.terminate() log(f"已关闭 {name} 监控进程") except Exception as e: log(f"关闭 {name} 监控进程失败: {str(e)}") # 清理临时文件 script_dir = os.path.dirname(os.path.abspath(__file__)) data_files = ['breeze_daily.json', 'breeze_hourly.json', 'cms_daily.json', 'cms_hourly.json'] for file_name in data_files: file_path = os.path.join(script_dir, file_name) if os.path.exists(file_path): try: os.remove(file_path) log(f"已删除临时文件: {file_name}") except Exception as e: log(f"删除临时文件 {file_name} 失败: {str(e)}") log("监控系统已关闭") except Exception as e: log("Error in main function: " + str(e)) # 信号处理函数 def signal_handler(sig, frame): """处理信号,用于优雅退出""" log("收到退出信号,正在关闭系统...") global shutdown_flag shutdown_flag = True # 通过os._exit强制终止进程 os._exit(0) if __name__ == "__main__": # 注册信号处理器 signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) # 执行主函数 main()