NetEaseDSMonitor/download_auto_run.py
2025-04-30 02:05:03 +08:00

561 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Python 2.7 Auto Download Script
For NetEase Da Shen Review Data Monitoring System
"""
import os
import sys
import time
import shutil
import urllib2
import getpass
import base64
import subprocess
import traceback
import codecs
from datetime import datetime
# 先尝试设置控制台代码页为GBK仅Windows环境
try:
if sys.platform.startswith('win'):
# Windows下默认使用GBK编码代码页936
# 先设置控制台代码页
subprocess.call('chcp 936', shell=True)
# 再设置Python默认编码
import sys
reload(sys)
sys.setdefaultencoding('gbk')
except:
pass
# 基本配置
COS_URL = "https://gitea.ui-beam.cn/ui_beam/NetEaseDSMonitor/raw/branch/main"
TEMP_DIR = os.path.join(os.path.expanduser("~"), "Desktop", "monitor_temp")
LOG_FILE = os.path.join(TEMP_DIR, "download_log.txt")
# 要下载的文件列表 - 格式: (URL相对路径, 本地保存路径, 是否必需)
FILES_TO_DOWNLOAD = [
("start_monitor.cmd", "start_monitor.cmd", True),
("dashboard.py", "dashboard.py", True),
("breeze_monitor.py", "breeze_monitor.py", True),
("breeze_monitor_CHAT.py", "breeze_monitor_CHAT.py", True),
("cms_monitor.py", "cms_monitor.py", True),
("templates/dashboard.html", "templates/dashboard.html", True),
("templates/login.html", "templates/login.html", True),
("static/ds-favicon.ico", "static/ds-favicon.ico", True),
("install_dependencies.bat", "install_dependencies.bat", True),
("inspect_monitor.py", "inspect_monitor.py", True)
]
# 记录日志函数
def log_message(message):
"""记录日志消息"""
try:
# 确保日志目录存在
log_dir = os.path.dirname(LOG_FILE)
if not os.path.exists(log_dir):
os.makedirs(log_dir)
# 处理可能的编码问题
try:
# 如果是字符串确保它是unicode
if isinstance(message, str):
message = message.decode('gbk', 'ignore')
# 使用codecs打开文件指定编码为gbk
with codecs.open(LOG_FILE, "a", "gbk", errors='ignore') as f:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_line = "[%s] %s\n" % (timestamp, message)
f.write(log_line)
except:
# 如果上面的方法失败,尝试另一种方式写入
with open(LOG_FILE, "a") as f:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
log_line = "[%s] %s\n" % (timestamp, message)
if isinstance(log_line, unicode):
f.write(log_line.encode('gbk', 'ignore'))
else:
f.write(str(log_line))
except:
f.write("[%s] [ENCODE_ERROR] 无法写入日志内容\n" % timestamp)
# 使用safe_print输出到控制台
safe_print(message)
except Exception as e:
try:
# 以简单的方式输出日志错误
error_msg = "日志写入错误,继续执行程序"
print(error_msg)
except:
pass # 如果连简单输出都失败,就完全忽略错误
# 使用UTF-8安全打印
def safe_print(text):
"""安全打印函数,处理编码问题"""
try:
# 如果是字节串且不是unicode尝试转换
if isinstance(text, str) and not isinstance(text, unicode):
try:
# 先尝试gbk解码
text = text.decode('gbk', 'ignore')
except:
try:
# 再尝试utf-8解码
text = text.decode('utf-8', 'ignore')
except:
# 都失败,直接转为字符串
text = unicode(str(text), 'gbk', 'ignore')
# 输出到控制台
if sys.platform.startswith('win'):
# Windows平台强制使用gbk编码
print(text.encode('gbk', 'ignore'))
else:
# 其他平台使用utf-8编码
print(text.encode('utf-8', 'ignore'))
except Exception as e:
# 如果还是失败,使用最简单的方式
try:
print(u"打印失败")
except:
pass # 完全放弃
# 获取最新版本号
def get_latest_version():
"""获取最新版本号"""
try:
version_url = "https://gitea.ui-beam.cn/ui_beam/NetEaseDSMonitor/raw/branch/main/VERSION.txt"
# 添加身份验证
username = "bug"
password = "12345678"
auth_string = base64.b64encode('%s:%s' % (username, password))
# 创建请求对象并添加认证头
request = urllib2.Request(version_url)
request.add_header('Authorization', 'Basic %s' % auth_string)
# 尝试不使用代理下载
try:
# 禁用代理
proxy_handler = urllib2.ProxyHandler({})
opener = urllib2.build_opener(proxy_handler)
urllib2.install_opener(opener)
response = urllib2.urlopen(request, timeout=30)
version = response.read().strip()
return version
except urllib2.URLError as e:
log_message("[WARNING] 不使用代理获取版本失败,尝试使用代理: %s" % str(e))
# 使用代理重试
proxy_handler = urllib2.ProxyHandler({
'http': 'http://CD-WEBPROXY02.yajuenet.internal:8080',
'https': 'http://CD-WEBPROXY02.yajuenet.internal:8080'
})
opener = urllib2.build_opener(proxy_handler)
urllib2.install_opener(opener)
response = urllib2.urlopen(request, timeout=30)
version = response.read().strip()
return version
except Exception as e:
error_msg = u"获取版本号失败: %s" % unicode(str(e), 'gbk', 'ignore')
log_message("[ERROR] %s" % error_msg)
# 如果获取失败直接从文件URL解析出固定版本号格式
try:
# 搜索files_to_download中最新的版本号
for url_path, _, _ in FILES_TO_DOWNLOAD:
if "VERSION" in url_path.upper():
log_message("[INFO] 尝试从文件路径解析版本号: %s" % url_path)
return "v" + datetime.now().strftime("%Y%m%d%H%M%S")
except:
pass
return u"未知版本"
# 非中文字符界面信息
MESSAGES = {
'tool_title': u"网易大神审核数据监控系统安装工具",
'downloading': u"正在下载文件,请稍候...",
'script_started': u"下载脚本已启动",
'temp_dir': u"临时目录",
'dir_init_failed': u"初始化目录失败,退出",
'created_temp_dir': u"创建临时目录",
'dir_exists': u"临时目录已存在,正在清理内容...",
'dir_cleared': u"临时目录已清理",
'created_templates_dir': u"创建templates目录",
'created_static_dir': u"创建static目录",
'using_proxy': u"使用代理下载",
'retrying_with_proxy': u"尝试使用代理重试...",
'downloaded_files': u"已下载文件",
'of': u"个,共",
'files': u"个文件",
'download_required_failed': u"部分必需文件下载失败",
'all_files_downloaded': u"所有文件下载成功",
'install_deps_not_found': u"依赖安装脚本未找到",
'installing_deps': u"正在安装依赖...",
'deps_output': u"依赖安装输出",
'deps_error': u"依赖安装错误",
'deps_installed': u"依赖安装成功",
'deps_failed': u"依赖安装失败,返回码",
'deps_script_error': u"运行依赖安装脚本出错",
'start_script_not_found': u"启动脚本未找到",
'starting_system': u"正在启动监控系统...",
'system_started': u"监控系统启动成功",
'start_system_failed': u"启动监控系统失败",
'manual_guide_title': u"手动安装指南",
'manual_guide_intro': u"如果自动安装失败,请按照以下步骤手动安装:",
'use_ie': u"1. 获取系统文件:使用浏览器下载这些文件:",
'save_to_structure': u"2. 文件结构:将文件保存到以下结构:",
'run_deps_first': u"3. 安装依赖运行install_dependencies.bat安装依赖",
'then_run_start': u"4. 启动系统运行start_monitor.cmd启动系统",
'created_manual_guide': u"已创建手动安装指南",
'create_guide_failed': u"创建手动指南失败",
'script_copied': u"已将下载脚本复制到临时目录",
'copy_script_failed': u"复制脚本失败",
'deps_install_failed_try_start': u"依赖安装失败,尝试直接启动监控系统",
'steps_failed': u"某些步骤失败,请检查日志获取详情",
'try_manual': u"您可以尝试使用以下指南手动安装",
'unhandled_exception': u"未处理的异常",
'execution_completed': u"脚本执行完成",
'press_enter': u"按回车键退出..."
}
# 初始化目录函数
def init_directory():
"""初始化临时目录"""
try:
# 创建主目录
if not os.path.exists(TEMP_DIR):
os.makedirs(TEMP_DIR)
log_message("[INFO] %s: %s" % (MESSAGES['created_temp_dir'], TEMP_DIR))
else:
log_message("[INFO] %s" % MESSAGES['dir_exists'])
# 清理现有目录内容,只保留日志文件
for item in os.listdir(TEMP_DIR):
item_path = os.path.join(TEMP_DIR, item)
if item != "download_log.txt": # 现在只保留日志文件
if os.path.isdir(item_path):
shutil.rmtree(item_path)
else:
os.remove(item_path)
log_message("[INFO] %s" % MESSAGES['dir_cleared'])
# 创建templates子目录
templates_dir = os.path.join(TEMP_DIR, "templates")
if not os.path.exists(templates_dir):
os.makedirs(templates_dir)
log_message("[INFO] %s" % MESSAGES['created_templates_dir'])
# 创建static子目录
static_dir = os.path.join(TEMP_DIR, "static")
if not os.path.exists(static_dir):
os.makedirs(static_dir)
log_message("[INFO] %s" % MESSAGES['created_static_dir'])
return True
except Exception as e:
log_message("[ERROR] Failed to initialize directory: %s" % str(e))
return False
# 下载函数
def download_file(url_path, local_path, use_proxy=False):
"""下载单个文件,默认不使用代理"""
full_url = "%s/%s" % (COS_URL, url_path)
full_local_path = os.path.join(TEMP_DIR, local_path)
# 确保目标目录存在
local_dir = os.path.dirname(full_local_path)
if not os.path.exists(local_dir):
os.makedirs(local_dir)
# 只显示文件名不显示完整URL
log_message(u"[INFO] 正在下载文件: %s" % os.path.basename(url_path))
try:
# 设置代理信息
if use_proxy:
log_message(u"[INFO] %s" % MESSAGES['using_proxy'])
proxy_handler = urllib2.ProxyHandler({
'http': 'http://CD-WEBPROXY02.yajuenet.internal:8080',
'https': 'http://CD-WEBPROXY02.yajuenet.internal:8080'
})
opener = urllib2.build_opener(proxy_handler)
# 添加基本身份验证
username = "bug"
password = "123454678"
auth_string = base64.b64encode('%s:%s' % (username, password))
opener.addheaders = [('Authorization', 'Basic %s' % auth_string)]
urllib2.install_opener(opener)
else:
# 禁用代理,但仍添加基本身份验证
proxy_handler = urllib2.ProxyHandler({})
opener = urllib2.build_opener(proxy_handler)
# 添加基本身份验证
username = "bug"
password = "12345678"
auth_string = base64.b64encode('%s:%s' % (username, password))
opener.addheaders = [('Authorization', 'Basic %s' % auth_string)]
urllib2.install_opener(opener)
# 下载文件
response = urllib2.urlopen(full_url, timeout=30)
content = response.read()
# 检查是否是批处理文件,如果是则进行行尾转换
is_batch_file = url_path.lower().endswith('.bat') or url_path.lower().endswith('.cmd')
if is_batch_file:
log_message(u"[INFO] 批处理文件行尾转换: %s" % local_path)
# 将LF转换为CRLF
content_str = content.replace('\n', '\r\n')
# 确保没有重复的\r\n
content_str = content_str.replace('\r\r\n', '\r\n')
# 转回二进制
content = content_str
# 写入文件
with open(full_local_path, 'wb') as f:
f.write(content)
log_message(u"[INFO] 文件下载成功: %s" % local_path)
return True
except urllib2.URLError as e:
log_message(u"[ERROR] 下载失败 %s: %s" % (os.path.basename(url_path), unicode(str(e), 'gbk', 'ignore')))
# 如果不使用代理失败,尝试使用代理
if not use_proxy:
log_message(u"[INFO] %s" % MESSAGES['retrying_with_proxy'])
return download_file(url_path, local_path, True)
return False
except Exception as e:
log_message(u"[ERROR] 下载失败 %s: %s" % (os.path.basename(url_path), unicode(str(e), 'gbk', 'ignore')))
return False
# 下载所有文件
def download_all_files():
"""下载所有必要的文件"""
success_count = 0
failed_required = False
for url_path, local_path, required in FILES_TO_DOWNLOAD:
# 先尝试不使用代理下载
if download_file(url_path, local_path, False):
success_count += 1
elif required:
failed_required = True
log_message("[INFO] %s %d %s %d %s" % (
MESSAGES['downloaded_files'],
success_count,
MESSAGES['of'],
len(FILES_TO_DOWNLOAD),
MESSAGES['files']
))
if failed_required:
log_message("[ERROR] %s" % MESSAGES['download_required_failed'])
return False
return True
# 运行依赖安装脚本
def run_install_dependencies():
"""运行依赖安装脚本"""
install_script = os.path.join(TEMP_DIR, "install_dependencies.bat")
if not os.path.exists(install_script):
log_message("[ERROR] %s: %s" % (MESSAGES['install_deps_not_found'], install_script))
return False
try:
log_message("[INFO] %s" % MESSAGES['installing_deps'])
# 记录当前工作目录
original_dir = os.getcwd()
# 切换到临时目录并启动脚本
os.chdir(TEMP_DIR)
# 使用subprocess运行批处理文件并显示在控制台
# 移除stdout和stderr的PIPE重定向让输出直接显示在控制台
process = subprocess.Popen(["cmd", "/c", install_script],
shell=True)
# 等待脚本执行完成
return_code = process.wait()
# 返回到原始目录
os.chdir(original_dir)
if return_code == 0:
log_message("[INFO] %s" % MESSAGES['deps_installed'])
return True
else:
log_message("[ERROR] %s: %d" % (MESSAGES['deps_failed'], return_code))
return False
except Exception as e:
log_message("[ERROR] %s: %s" % (MESSAGES['deps_script_error'], str(e)))
return False
# 启动监控系统
def start_monitor_system():
"""启动监控系统"""
start_script = os.path.join(TEMP_DIR, "start_monitor.cmd")
if not os.path.exists(start_script):
log_message("[ERROR] %s: %s" % (MESSAGES['start_script_not_found'], start_script))
return False
try:
log_message("[INFO] %s" % MESSAGES['starting_system'])
# 记录当前工作目录
original_dir = os.getcwd()
# 切换到临时目录并启动脚本
os.chdir(TEMP_DIR)
# 使用subprocess启动批处理文件不等待其完成
process = subprocess.Popen(["cmd", "/c", "start", "", "start_monitor.cmd"],
shell=True)
# 返回到原始目录
os.chdir(original_dir)
log_message("[INFO] %s" % MESSAGES['system_started'])
return True
except Exception as e:
log_message("[ERROR] %s: %s" % (MESSAGES['start_system_failed'], str(e)))
return False
# 创建手动指南
def create_manual_guide():
"""创建手动安装指南"""
guide_path = os.path.join(TEMP_DIR, "manual_install_guide.txt")
try:
with codecs.open(guide_path, "w", "gbk") as f:
f.write("=" * 50 + "\n")
f.write("%s\n" % MESSAGES['manual_guide_title'])
f.write("=" * 50 + "\n\n")
f.write("%s\n\n" % MESSAGES['manual_guide_intro'])
f.write("%s\n\n" % MESSAGES['use_ie'])
for url_path, local_path, required in FILES_TO_DOWNLOAD:
full_url = "%s/%s" % (COS_URL, url_path)
f.write(" %s\n" % full_url)
f.write("\n%s\n\n" % MESSAGES['save_to_structure'])
f.write(" %s/\n" % TEMP_DIR)
f.write(" |-- start_monitor.cmd\n")
f.write(" |-- dashboard.py\n")
f.write(" |-- breeze_monitor.py\n")
f.write(" |-- breeze_monitor_CHAT.py\n")
f.write(" |-- cms_monitor.py\n")
f.write(" |-- inspect_monitor.py\n")
f.write(" |-- cms_coefficients.json\n")
f.write(" |-- breeze_coefficients.json\n")
f.write(" |-- install_dependencies.bat\n")
f.write(" |-- templates/\n")
f.write(" | |-- dashboard.html\n")
f.write(" | |-- login.html\n")
f.write(" |-- static/\n")
f.write(" |-- ds-favicon.ico\n\n")
f.write("%s\n\n" % MESSAGES['run_deps_first'])
f.write("%s\n\n" % MESSAGES['then_run_start'])
f.write("=" * 50 + "\n")
log_message("[INFO] %s: %s" % (MESSAGES['created_manual_guide'], guide_path))
return True
except Exception as e:
log_message("[ERROR] %s: %s" % (MESSAGES['create_guide_failed'], str(e)))
return False
# 主函数
def main():
"""主函数"""
try:
# 使用print可能会更安全不使用log_message来输出开头的界面
safe_print(u"\n%s" % MESSAGES['tool_title'])
print("======================================================\n")
safe_print(u"%s\n" % MESSAGES['downloading'])
# 初始化日志
if not os.path.exists(os.path.dirname(LOG_FILE)):
os.makedirs(os.path.dirname(LOG_FILE))
log_message("\n" + "=" * 40)
log_message("[INFO] %s" % MESSAGES['script_started'])
log_message("[INFO] %s: %s" % (MESSAGES['temp_dir'], TEMP_DIR))
# 初始化目录
if not init_directory():
log_message("[ERROR] %s" % MESSAGES['dir_init_failed'])
return 1
# 创建手动指南
create_manual_guide()
# 下载文件
if download_all_files():
log_message("[INFO] %s" % MESSAGES['all_files_downloaded'])
# 安装依赖
if run_install_dependencies():
log_message("[INFO] %s" % MESSAGES['deps_installed'])
# 启动监控系统
if start_monitor_system():
# 稍等几秒,让监控系统先启动
time.sleep(3)
return 0
else:
log_message("[WARNING] %s" % MESSAGES['deps_install_failed_try_start'])
if start_monitor_system():
# 稍等几秒,让监控系统先启动
time.sleep(3)
return 0
log_message("[WARNING] %s" % MESSAGES['steps_failed'])
log_message("[INFO] %s: %s" % (
MESSAGES['try_manual'],
os.path.join(TEMP_DIR, "manual_install_guide.txt")
))
return 1
except Exception as e:
log_message("[ERROR] %s: %s" % (MESSAGES['unhandled_exception'], str(e)))
log_message("[TRACE] %s" % traceback.format_exc())
return 1
finally:
log_message("[INFO] %s" % MESSAGES['execution_completed'])
# 入口点
if __name__ == "__main__":
sys.exit(main())