#!/usr/bin/env python # -*- coding: utf-8 -*- """ Python 2.7 Auto Download Script For NetEase Da Shen Review Data Monitoring System """ import os import sys import time import shutil import urllib2 import getpass import base64 import subprocess import traceback import codecs from datetime import datetime # 先尝试设置控制台代码页为GBK(仅Windows环境) try: if sys.platform.startswith('win'): # Windows下,默认使用GBK编码(代码页936) # 先设置控制台代码页 subprocess.call('chcp 936', shell=True) # 再设置Python默认编码 import sys reload(sys) sys.setdefaultencoding('gbk') except: pass # 基本配置 COS_URL = "https://gitea.ui-beam.cn/ui_beam/NetEaseDSMonitor/raw/branch/main" TEMP_DIR = os.path.join(os.path.expanduser("~"), "Desktop", "monitor_temp") LOG_FILE = os.path.join(TEMP_DIR, "download_log.txt") # 要下载的文件列表 - 格式: (URL相对路径, 本地保存路径, 是否必需) FILES_TO_DOWNLOAD = [ ("start_monitor.cmd", "start_monitor.cmd", True), ("dashboard.py", "dashboard.py", True), ("breeze_monitor.py", "breeze_monitor.py", True), ("breeze_monitor_CHAT.py", "breeze_monitor_CHAT.py", True), ("cms_monitor.py", "cms_monitor.py", True), ("templates/dashboard.html", "templates/dashboard.html", True), ("templates/login.html", "templates/login.html", True), ("static/ds-favicon.ico", "static/ds-favicon.ico", True), ("install_dependencies.bat", "install_dependencies.bat", True), ("inspect_monitor.py", "inspect_monitor.py", True) ] # 记录日志函数 def log_message(message): """记录日志消息""" try: # 确保日志目录存在 log_dir = os.path.dirname(LOG_FILE) if not os.path.exists(log_dir): os.makedirs(log_dir) # 处理可能的编码问题 try: # 如果是字符串,确保它是unicode if isinstance(message, str): message = message.decode('gbk', 'ignore') # 使用codecs打开文件,指定编码为gbk with codecs.open(LOG_FILE, "a", "gbk", errors='ignore') as f: timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") log_line = "[%s] %s\n" % (timestamp, message) f.write(log_line) except: # 如果上面的方法失败,尝试另一种方式写入 with open(LOG_FILE, "a") as f: timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") try: log_line = "[%s] %s\n" % (timestamp, message) if isinstance(log_line, unicode): f.write(log_line.encode('gbk', 'ignore')) else: f.write(str(log_line)) except: f.write("[%s] [ENCODE_ERROR] 无法写入日志内容\n" % timestamp) # 使用safe_print输出到控制台 safe_print(message) except Exception as e: try: # 以简单的方式输出日志错误 error_msg = "日志写入错误,继续执行程序" print(error_msg) except: pass # 如果连简单输出都失败,就完全忽略错误 # 使用UTF-8安全打印 def safe_print(text): """安全打印函数,处理编码问题""" try: # 如果是字节串且不是unicode,尝试转换 if isinstance(text, str) and not isinstance(text, unicode): try: # 先尝试gbk解码 text = text.decode('gbk', 'ignore') except: try: # 再尝试utf-8解码 text = text.decode('utf-8', 'ignore') except: # 都失败,直接转为字符串 text = unicode(str(text), 'gbk', 'ignore') # 输出到控制台 if sys.platform.startswith('win'): # Windows平台,强制使用gbk编码 print(text.encode('gbk', 'ignore')) else: # 其他平台,使用utf-8编码 print(text.encode('utf-8', 'ignore')) except Exception as e: # 如果还是失败,使用最简单的方式 try: print(u"打印失败") except: pass # 完全放弃 # 获取最新版本号 def get_latest_version(): """获取最新版本号""" try: version_url = "https://gitea.ui-beam.cn/ui_beam/NetEaseDSMonitor/raw/branch/main/VERSION.txt" # 添加身份验证 username = "bug" password = "12345678" auth_string = base64.b64encode('%s:%s' % (username, password)) # 创建请求对象并添加认证头 request = urllib2.Request(version_url) request.add_header('Authorization', 'Basic %s' % auth_string) # 尝试不使用代理下载 try: # 禁用代理 proxy_handler = urllib2.ProxyHandler({}) opener = urllib2.build_opener(proxy_handler) urllib2.install_opener(opener) response = urllib2.urlopen(request, timeout=30) version = response.read().strip() return version except urllib2.URLError as e: log_message("[WARNING] 不使用代理获取版本失败,尝试使用代理: %s" % str(e)) # 使用代理重试 proxy_handler = urllib2.ProxyHandler({ 'http': 'http://CD-WEBPROXY02.yajuenet.internal:8080', 'https': 'http://CD-WEBPROXY02.yajuenet.internal:8080' }) opener = urllib2.build_opener(proxy_handler) urllib2.install_opener(opener) response = urllib2.urlopen(request, timeout=30) version = response.read().strip() return version except Exception as e: error_msg = u"获取版本号失败: %s" % unicode(str(e), 'gbk', 'ignore') log_message("[ERROR] %s" % error_msg) # 如果获取失败,直接从文件URL解析出固定版本号格式 try: # 搜索files_to_download中最新的版本号 for url_path, _, _ in FILES_TO_DOWNLOAD: if "VERSION" in url_path.upper(): log_message("[INFO] 尝试从文件路径解析版本号: %s" % url_path) return "v" + datetime.now().strftime("%Y%m%d%H%M%S") except: pass return u"未知版本" # 非中文字符界面信息 MESSAGES = { 'tool_title': u"网易大神审核数据监控系统安装工具", 'downloading': u"正在下载文件,请稍候...", 'script_started': u"下载脚本已启动", 'temp_dir': u"临时目录", 'dir_init_failed': u"初始化目录失败,退出", 'created_temp_dir': u"创建临时目录", 'dir_exists': u"临时目录已存在,正在清理内容...", 'dir_cleared': u"临时目录已清理", 'created_templates_dir': u"创建templates目录", 'created_static_dir': u"创建static目录", 'using_proxy': u"使用代理下载", 'retrying_with_proxy': u"尝试使用代理重试...", 'downloaded_files': u"已下载文件", 'of': u"个,共", 'files': u"个文件", 'download_required_failed': u"部分必需文件下载失败", 'all_files_downloaded': u"所有文件下载成功", 'install_deps_not_found': u"依赖安装脚本未找到", 'installing_deps': u"正在安装依赖...", 'deps_output': u"依赖安装输出", 'deps_error': u"依赖安装错误", 'deps_installed': u"依赖安装成功", 'deps_failed': u"依赖安装失败,返回码", 'deps_script_error': u"运行依赖安装脚本出错", 'start_script_not_found': u"启动脚本未找到", 'starting_system': u"正在启动监控系统...", 'system_started': u"监控系统启动成功", 'start_system_failed': u"启动监控系统失败", 'manual_guide_title': u"手动安装指南", 'manual_guide_intro': u"如果自动安装失败,请按照以下步骤手动安装:", 'use_ie': u"1. 获取系统文件:使用浏览器下载这些文件:", 'save_to_structure': u"2. 文件结构:将文件保存到以下结构:", 'run_deps_first': u"3. 安装依赖:运行install_dependencies.bat安装依赖", 'then_run_start': u"4. 启动系统:运行start_monitor.cmd启动系统", 'created_manual_guide': u"已创建手动安装指南", 'create_guide_failed': u"创建手动指南失败", 'script_copied': u"已将下载脚本复制到临时目录", 'copy_script_failed': u"复制脚本失败", 'deps_install_failed_try_start': u"依赖安装失败,尝试直接启动监控系统", 'steps_failed': u"某些步骤失败,请检查日志获取详情", 'try_manual': u"您可以尝试使用以下指南手动安装", 'unhandled_exception': u"未处理的异常", 'execution_completed': u"脚本执行完成", 'press_enter': u"按回车键退出..." } # 初始化目录函数 def init_directory(): """初始化临时目录""" try: # 创建主目录 if not os.path.exists(TEMP_DIR): os.makedirs(TEMP_DIR) log_message("[INFO] %s: %s" % (MESSAGES['created_temp_dir'], TEMP_DIR)) else: log_message("[INFO] %s" % MESSAGES['dir_exists']) # 清理现有目录内容,只保留日志文件 for item in os.listdir(TEMP_DIR): item_path = os.path.join(TEMP_DIR, item) if item != "download_log.txt": # 现在只保留日志文件 if os.path.isdir(item_path): shutil.rmtree(item_path) else: os.remove(item_path) log_message("[INFO] %s" % MESSAGES['dir_cleared']) # 创建templates子目录 templates_dir = os.path.join(TEMP_DIR, "templates") if not os.path.exists(templates_dir): os.makedirs(templates_dir) log_message("[INFO] %s" % MESSAGES['created_templates_dir']) # 创建static子目录 static_dir = os.path.join(TEMP_DIR, "static") if not os.path.exists(static_dir): os.makedirs(static_dir) log_message("[INFO] %s" % MESSAGES['created_static_dir']) return True except Exception as e: log_message("[ERROR] Failed to initialize directory: %s" % str(e)) return False # 下载函数 def download_file(url_path, local_path, use_proxy=False): """下载单个文件,默认不使用代理""" full_url = "%s/%s" % (COS_URL, url_path) full_local_path = os.path.join(TEMP_DIR, local_path) # 确保目标目录存在 local_dir = os.path.dirname(full_local_path) if not os.path.exists(local_dir): os.makedirs(local_dir) # 只显示文件名,不显示完整URL log_message(u"[INFO] 正在下载文件: %s" % os.path.basename(url_path)) try: # 设置代理信息 if use_proxy: log_message(u"[INFO] %s" % MESSAGES['using_proxy']) proxy_handler = urllib2.ProxyHandler({ 'http': 'http://CD-WEBPROXY02.yajuenet.internal:8080', 'https': 'http://CD-WEBPROXY02.yajuenet.internal:8080' }) opener = urllib2.build_opener(proxy_handler) # 添加基本身份验证 username = "bug" password = "123454678" auth_string = base64.b64encode('%s:%s' % (username, password)) opener.addheaders = [('Authorization', 'Basic %s' % auth_string)] urllib2.install_opener(opener) else: # 禁用代理,但仍添加基本身份验证 proxy_handler = urllib2.ProxyHandler({}) opener = urllib2.build_opener(proxy_handler) # 添加基本身份验证 username = "bug" password = "12345678" auth_string = base64.b64encode('%s:%s' % (username, password)) opener.addheaders = [('Authorization', 'Basic %s' % auth_string)] urllib2.install_opener(opener) # 下载文件 response = urllib2.urlopen(full_url, timeout=30) content = response.read() # 检查是否是批处理文件,如果是则进行行尾转换 is_batch_file = url_path.lower().endswith('.bat') or url_path.lower().endswith('.cmd') if is_batch_file: log_message(u"[INFO] 批处理文件行尾转换: %s" % local_path) # 将LF转换为CRLF content_str = content.replace('\n', '\r\n') # 确保没有重复的\r\n content_str = content_str.replace('\r\r\n', '\r\n') # 转回二进制 content = content_str # 写入文件 with open(full_local_path, 'wb') as f: f.write(content) log_message(u"[INFO] 文件下载成功: %s" % local_path) return True except urllib2.URLError as e: log_message(u"[ERROR] 下载失败 %s: %s" % (os.path.basename(url_path), unicode(str(e), 'gbk', 'ignore'))) # 如果不使用代理失败,尝试使用代理 if not use_proxy: log_message(u"[INFO] %s" % MESSAGES['retrying_with_proxy']) return download_file(url_path, local_path, True) return False except Exception as e: log_message(u"[ERROR] 下载失败 %s: %s" % (os.path.basename(url_path), unicode(str(e), 'gbk', 'ignore'))) return False # 下载所有文件 def download_all_files(): """下载所有必要的文件""" success_count = 0 failed_required = False for url_path, local_path, required in FILES_TO_DOWNLOAD: # 先尝试不使用代理下载 if download_file(url_path, local_path, False): success_count += 1 elif required: failed_required = True log_message("[INFO] %s %d %s %d %s" % ( MESSAGES['downloaded_files'], success_count, MESSAGES['of'], len(FILES_TO_DOWNLOAD), MESSAGES['files'] )) if failed_required: log_message("[ERROR] %s" % MESSAGES['download_required_failed']) return False return True # 运行依赖安装脚本 def run_install_dependencies(): """运行依赖安装脚本""" install_script = os.path.join(TEMP_DIR, "install_dependencies.bat") if not os.path.exists(install_script): log_message("[ERROR] %s: %s" % (MESSAGES['install_deps_not_found'], install_script)) return False try: log_message("[INFO] %s" % MESSAGES['installing_deps']) # 记录当前工作目录 original_dir = os.getcwd() # 切换到临时目录并启动脚本 os.chdir(TEMP_DIR) # 使用subprocess运行批处理文件并显示在控制台 # 移除stdout和stderr的PIPE重定向,让输出直接显示在控制台 process = subprocess.Popen(["cmd", "/c", install_script], shell=True) # 等待脚本执行完成 return_code = process.wait() # 返回到原始目录 os.chdir(original_dir) if return_code == 0: log_message("[INFO] %s" % MESSAGES['deps_installed']) return True else: log_message("[ERROR] %s: %d" % (MESSAGES['deps_failed'], return_code)) return False except Exception as e: log_message("[ERROR] %s: %s" % (MESSAGES['deps_script_error'], str(e))) return False # 启动监控系统 def start_monitor_system(): """启动监控系统""" start_script = os.path.join(TEMP_DIR, "start_monitor.cmd") if not os.path.exists(start_script): log_message("[ERROR] %s: %s" % (MESSAGES['start_script_not_found'], start_script)) return False try: log_message("[INFO] %s" % MESSAGES['starting_system']) # 记录当前工作目录 original_dir = os.getcwd() # 切换到临时目录并启动脚本 os.chdir(TEMP_DIR) # 使用subprocess启动批处理文件,不等待其完成 process = subprocess.Popen(["cmd", "/c", "start", "", "start_monitor.cmd"], shell=True) # 返回到原始目录 os.chdir(original_dir) log_message("[INFO] %s" % MESSAGES['system_started']) return True except Exception as e: log_message("[ERROR] %s: %s" % (MESSAGES['start_system_failed'], str(e))) return False # 创建手动指南 def create_manual_guide(): """创建手动安装指南""" guide_path = os.path.join(TEMP_DIR, "manual_install_guide.txt") try: with codecs.open(guide_path, "w", "gbk") as f: f.write("=" * 50 + "\n") f.write("%s\n" % MESSAGES['manual_guide_title']) f.write("=" * 50 + "\n\n") f.write("%s\n\n" % MESSAGES['manual_guide_intro']) f.write("%s\n\n" % MESSAGES['use_ie']) for url_path, local_path, required in FILES_TO_DOWNLOAD: full_url = "%s/%s" % (COS_URL, url_path) f.write(" %s\n" % full_url) f.write("\n%s\n\n" % MESSAGES['save_to_structure']) f.write(" %s/\n" % TEMP_DIR) f.write(" |-- start_monitor.cmd\n") f.write(" |-- dashboard.py\n") f.write(" |-- breeze_monitor.py\n") f.write(" |-- breeze_monitor_CHAT.py\n") f.write(" |-- cms_monitor.py\n") f.write(" |-- inspect_monitor.py\n") f.write(" |-- cms_coefficients.json\n") f.write(" |-- breeze_coefficients.json\n") f.write(" |-- install_dependencies.bat\n") f.write(" |-- templates/\n") f.write(" | |-- dashboard.html\n") f.write(" | |-- login.html\n") f.write(" |-- static/\n") f.write(" |-- ds-favicon.ico\n\n") f.write("%s\n\n" % MESSAGES['run_deps_first']) f.write("%s\n\n" % MESSAGES['then_run_start']) f.write("=" * 50 + "\n") log_message("[INFO] %s: %s" % (MESSAGES['created_manual_guide'], guide_path)) return True except Exception as e: log_message("[ERROR] %s: %s" % (MESSAGES['create_guide_failed'], str(e))) return False # 主函数 def main(): """主函数""" try: # 使用print可能会更安全,不使用log_message来输出开头的界面 safe_print(u"\n%s" % MESSAGES['tool_title']) print("======================================================\n") safe_print(u"%s\n" % MESSAGES['downloading']) # 初始化日志 if not os.path.exists(os.path.dirname(LOG_FILE)): os.makedirs(os.path.dirname(LOG_FILE)) log_message("\n" + "=" * 40) log_message("[INFO] %s" % MESSAGES['script_started']) log_message("[INFO] %s: %s" % (MESSAGES['temp_dir'], TEMP_DIR)) # 初始化目录 if not init_directory(): log_message("[ERROR] %s" % MESSAGES['dir_init_failed']) return 1 # 创建手动指南 create_manual_guide() # 下载文件 if download_all_files(): log_message("[INFO] %s" % MESSAGES['all_files_downloaded']) # 安装依赖 if run_install_dependencies(): log_message("[INFO] %s" % MESSAGES['deps_installed']) # 启动监控系统 if start_monitor_system(): # 稍等几秒,让监控系统先启动 time.sleep(3) return 0 else: log_message("[WARNING] %s" % MESSAGES['deps_install_failed_try_start']) if start_monitor_system(): # 稍等几秒,让监控系统先启动 time.sleep(3) return 0 log_message("[WARNING] %s" % MESSAGES['steps_failed']) log_message("[INFO] %s: %s" % ( MESSAGES['try_manual'], os.path.join(TEMP_DIR, "manual_install_guide.txt") )) return 1 except Exception as e: log_message("[ERROR] %s: %s" % (MESSAGES['unhandled_exception'], str(e))) log_message("[TRACE] %s" % traceback.format_exc()) return 1 finally: log_message("[INFO] %s" % MESSAGES['execution_completed']) # 入口点 if __name__ == "__main__": sys.exit(main())