# cron: 52 9 * * * # new Env("回收蛙旧衣服回收") #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 回收蛙 - 养鸡场完整自动版 链路: wx_cloud + wx_token + wxid -> 微信 code code -> openid/session_key/unionid encryptedData + iv + session_key -> 手机号 openid + unionid + phone -> user_id user_id -> 签到、浏览视频、浏览商品、查询余额 必填环境变量: wx_cloud 养鸡场地址,例如 http://192.168.0.250:666 wx_token 养鸡场 token,支持裸 token 或 Bearer token 手机号授权数据: 默认自动调用养鸡场 getPhoneEncryptData 获取 encryptedData/iv,再走目标小程序 phone_new 解密手机号 如果协议端不支持该接口,可用 HSW_PHONE 或 hsw_accounts.json 兜底 可选环境变量: HSW_TEST_WXID 只跑指定 wxid HSW_EXCLUDE_WXIDS 排除 wxid,逗号分隔 HSW_PHONE 固定手机号兜底,仅单号测试建议使用 HSW_ENCRYPTED_DATA encryptedData 兜底 HSW_IV iv 兜底 HSW_ACCOUNT_FILE 账号映射文件,默认同目录 hsw_accounts.json HSW_MAX_ACCOUNTS 最大处理账号数,默认 0 不限制 HSW_ACCOUNT_DELAY 账号间隔秒,默认 2 HSW_VERIFY_SSL 1/0,默认 1 账号映射文件格式 hsw_accounts.json: { "wxid_xxx": { "remark": "账号1", "phone": "13800000000", "encryptedData": "...", "iv": "..." } } 缓存: APP_Buffer/hsw_user_cache.json """ from __future__ import annotations import hashlib import json import os import random import re import sys import time import traceback from dataclasses import dataclass, field from pathlib import Path from typing import Any, Dict, List, Optional, Tuple from urllib.parse import quote_plus, urlencode, urlsplit, parse_qsl import requests SCRIPT_NAME = "回收蛙" APPID = "wx5f671b00a9dfca58" I_ID = "373" M_NAME = "zm_jyf" OA_BASE = "https://oa.syrecovery.com" WWW_BASE = "https://www.syrecovery.com/app/index.php" SIGN_TOKEN = "undified" # 原 JS 源码就是这个拼写 SCRIPT_DIR = Path(__file__).resolve().parent CACHE_DIR = SCRIPT_DIR / "APP_Buffer" CACHE_FILE = CACHE_DIR / "hsw_user_cache.json" UA_FILE = SCRIPT_DIR / "User_Agent.json" ACCOUNT_FILE = Path(os.environ.get("HSW_ACCOUNT_FILE", str(SCRIPT_DIR / "hsw_accounts.json"))) ENV = os.environ WX_CLOUD = ENV.get("wx_cloud", "").rstrip("/") WX_TOKEN_RAW = ENV.get("wx_token", "").strip() WX_TOKEN = WX_TOKEN_RAW if WX_TOKEN_RAW.lower().startswith("bearer ") else (f"Bearer {WX_TOKEN_RAW}" if WX_TOKEN_RAW else "") VERIFY_SSL = ENV.get("HSW_VERIFY_SSL", "1") not in {"0", "false", "False", "no", "NO"} TIMEOUT = int(ENV.get("HSW_TIMEOUT", "20")) TEST_WXID = ENV.get("HSW_TEST_WXID", "wxid_axur9hlf58aq21").strip() EXCLUDE_WXIDS = {x.strip() for x in ENV.get("HSW_EXCLUDE_WXIDS", "").split(",") if x.strip()} MAX_ACCOUNTS = int(ENV.get("HSW_MAX_ACCOUNTS", "0") or "0") ACCOUNT_DELAY = float(ENV.get("HSW_ACCOUNT_DELAY", "2") or "2") GLOBAL_PHONE = ENV.get("HSW_PHONE", "").strip() GLOBAL_ENCRYPTED_DATA = ENV.get("HSW_ENCRYPTED_DATA", "").strip() GLOBAL_IV = ENV.get("HSW_IV", "").strip() DEFAULT_UA = ( "Mozilla/5.0 (Linux; Android 15; 22061218C Build/AQ3A.250226.002; wv) " "AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/146.0.7680.177 " "Mobile Safari/537.36 XWEB/1460075 MMWEBSDK/20260202 MMWEBID/6435 " "MicroMessenger/8.0.71.3080(0x18004739) WeChat/arm64 Weixin NetType/WIFI " "Language/zh_CN ABI/arm64 MiniProgramEnv/android" ) def now() -> str: return time.strftime("%Y-%m-%d %H:%M:%S") def log(msg: str, level: str = "INFO") -> None: print(f"{now()} - {level} - {msg}", flush=True) def mask_phone(phone: str) -> str: if not phone: return "无" return re.sub(r"(\d{3})\d{4}(\d{4})", r"\1****\2", phone) def short(s: str, left: int = 8, right: int = 4) -> str: if not s: return "" return s if len(s) <= left + right + 3 else f"{s[:left]}...{s[-right:]}" def ensure_dirs() -> None: CACHE_DIR.mkdir(parents=True, exist_ok=True) def load_json(path: Path, default: Any) -> Any: try: if path.exists(): return json.loads(path.read_text(encoding="utf-8")) except Exception as e: log(f"读取 {path} 失败:{e}", "WARN") return default def save_json(path: Path, data: Any) -> None: path.parent.mkdir(parents=True, exist_ok=True) tmp = path.with_suffix(path.suffix + ".tmp") tmp.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8") tmp.replace(path) def load_uas() -> List[str]: data = load_json(UA_FILE, []) if isinstance(data, list): uas = [str(x) for x in data if str(x).strip()] if uas: return uas return [DEFAULT_UA] def wx_headers() -> Dict[str, str]: return {"Authorization": WX_TOKEN, "Content-Type": "application/json;charset=UTF-8"} @dataclass class Account: wxid: str remark: str = "" nickname: str = "" @dataclass class HswAuth: wxid: str = "" remark: str = "" code: str = "" openid: str = "" unionid: str = "" session_key: str = "" phone: str = "" user_id: str = "" ua: str = "" @dataclass class Result: index: int total: int wxid: str remark: str auth_mode: str = "" code_ok: bool = False openid_ok: bool = False phone_ok: bool = False login_ok: bool = False user_id: str = "" phone: str = "" sign_status: str = "未执行" video_status: str = "未执行" product_status: str = "未执行" product_count: int = 0 balance_text: str = "未查询" jifen: int = 0 error: str = "" runtime: float = 0.0 def parse_accounts_response(data: Dict[str, Any]) -> Tuple[List[Account], int]: container = data.get("data", data) rows = [] total = 0 if isinstance(container, dict): rows = container.get("rows") or container.get("list") or container.get("records") or [] total = int(container.get("total") or len(rows) or 0) elif isinstance(container, list): rows = container total = len(rows) accounts: List[Account] = [] for row in rows: if not isinstance(row, dict): continue wxid = str(row.get("wxid") or row.get("wxId") or row.get("id") or "").strip() if not wxid: continue remark = str(row.get("remark") or row.get("name") or row.get("nickName") or row.get("nickname") or wxid).strip() nickname = str(row.get("nickName") or row.get("nickname") or "").strip() accounts.append(Account(wxid=wxid, remark=remark, nickname=nickname)) return accounts, total def fetch_account_page(page_num: int, page_size: int = 100) -> Tuple[List[Account], int]: if not WX_CLOUD or not WX_TOKEN: raise RuntimeError("缺少 wx_cloud 或 wx_token") url = f"{WX_CLOUD}/prod-api/wechat/wechat/list" resp = requests.get(url, headers=wx_headers(), params={"pageNum": page_num, "pageSize": page_size}, timeout=TIMEOUT, verify=VERIFY_SSL) resp.raise_for_status() return parse_accounts_response(resp.json()) def get_accounts() -> List[Account]: if TEST_WXID: return [Account(wxid=TEST_WXID, remark=TEST_WXID)] accounts: List[Account] = [] page = 1 while True: rows, total = fetch_account_page(page, 100) accounts.extend(rows) if not rows or len(accounts) >= total or page >= 20: break page += 1 accounts = [a for a in accounts if a.wxid not in EXCLUDE_WXIDS] if MAX_ACCOUNTS > 0: accounts = accounts[:MAX_ACCOUNTS] return accounts def pick_nested(data: Any, keys: Tuple[str, ...]) -> str: """在常见嵌套响应里递归找字段。""" if isinstance(data, dict): for key in keys: val = data.get(key) if isinstance(val, str) and val.strip(): return val.strip() for val in data.values(): found = pick_nested(val, keys) if found: return found elif isinstance(data, list): for item in data: found = pick_nested(item, keys) if found: return found return "" def parse_code_response(data: Dict[str, Any]) -> str: candidates = [data] if isinstance(data.get("data"), dict): candidates.append(data["data"]) for c in candidates: for key in ("code", "miniProgramCode", "jsCode"): val = c.get(key) if isinstance(c, dict) else None if isinstance(val, str) and val.strip() and val.strip() not in {"0", "200"}: return val.strip() if isinstance(data.get("data"), str) and data["data"].strip(): return data["data"].strip() raise RuntimeError(f"未能从养鸡场响应中提取 code:{str(data)[:300]}") def get_mini_program_code(wxid: str) -> str: url = f"{WX_CLOUD}/prod-api/wechat/api/getMiniProgramCode" resp = requests.post(url, headers=wx_headers(), json={"wxid": wxid, "appid": APPID}, timeout=TIMEOUT, verify=VERIFY_SSL) resp.raise_for_status() data = resp.json() code = parse_code_response(data) if not code: raise RuntimeError("养鸡场返回 code 为空") return code def get_phone_encrypt_data(wxid: str) -> Tuple[str, str]: """调用养鸡场手机号授权接口,返回 encryptedData/iv。 不同养鸡场版本字段名可能略有差异,这里兼容常见返回结构。 """ url = f"{WX_CLOUD}/prod-api/wechat/api/getPhoneEncryptData" payloads = [ {"wxid": wxid, "appid": APPID}, {"wxid": wxid, "appId": APPID}, ] last_err = "" for payload in payloads: try: resp = requests.post(url, headers=wx_headers(), json=payload, timeout=TIMEOUT, verify=VERIFY_SSL) resp.raise_for_status() data = resp.json() encrypted = pick_nested(data, ("encryptedData", "encrypted_data", "phoneEncryptedData", "phone_encrypted_data", "encryptData")) iv = pick_nested(data, ("iv", "phoneIv", "phone_iv")) if encrypted and iv: return encrypted, iv last_err = str(data)[:300] except Exception as e: last_err = str(e) raise RuntimeError(f"养鸡场 getPhoneEncryptData 未返回 encryptedData/iv:{last_err}") def hsw_sign(params: Dict[str, Any], token: str = SIGN_TOKEN) -> str: pairs = [] for k, v in params.items(): if k == "sign" or v is None or v == "": continue pairs.append((str(k), str(v))) pairs.sort(key=lambda x: x[0]) param_string = "&".join([f"{k}={v}" for k, v in pairs]) return hashlib.md5((param_string + token).encode("utf-8")).hexdigest() def www_get(params: Dict[str, Any], ua: str) -> Dict[str, Any]: params = dict(params) params["sign"] = hsw_sign(params) headers = { "User-Agent": ua, "Referer": f"https://servicewechat.com/{APPID}/156/page-frame.html", "Content-Type": "application/x-www-form-urlencoded", "xweb_xhr": "1", "Accept-Language": "zh-CN,zh;q=0.9", } resp = requests.get(WWW_BASE, headers=headers, params=params, timeout=TIMEOUT, verify=VERIFY_SSL) resp.raise_for_status() text = resp.text.lstrip("\ufeff") return json.loads(text) def oa_post(path: str, data: Dict[str, Any], ua: str) -> Dict[str, Any]: headers = { "User-Agent": ua, "Referer": f"https://servicewechat.com/{APPID}/156/page-frame.html", "Content-Type": "application/x-www-form-urlencoded", "xweb_xhr": "1", "Accept-Language": "zh-CN,zh;q=0.9", } resp = requests.post(f"{OA_BASE}{path}", headers=headers, data=data, timeout=TIMEOUT, verify=VERIFY_SSL) resp.raise_for_status() return resp.json() def openid_new(code: str, ua: str) -> Tuple[str, str, str]: params = { "i": I_ID, "t": "undefined", "v": "1.0.0", "from": "wxapp", "c": "entry", "a": "wxapp", "do": "openid_new", "m": M_NAME, "code": code, } data = www_get(params, ua) if data.get("errno") != 0: raise RuntimeError(f"openid_new 失败:{data.get('message') or data}") d = data.get("data") or {} return str(d.get("openid") or ""), str(d.get("unionid") or ""), str(d.get("session_key") or "") def phone_new(encrypted_data: str, iv: str, session_key: str, ua: str) -> str: params = { "i": I_ID, "t": "undefined", "v": "1.0.0", "from": "wxapp", "c": "entry", "a": "wxapp", "do": "phone_new", "m": M_NAME, "encryptedData": encrypted_data, "iv": iv, "session_key": session_key, } data = www_get(params, ua) if data.get("errno") != 0: raise RuntimeError(f"phone_new 失败:{data.get('message') or data}") d = data.get("data") or {} return str(d.get("phoneNumber") or d.get("purePhoneNumber") or "") def oa_user_login(openid: str, unionid: str, phone: str, ua: str) -> str: data = oa_post("/api/recycle/app/login/user_login", { "type": "1", "openid": openid, "unionid": unionid, "platform": "1", "phone": phone, }, ua) if data.get("code") != 1: raise RuntimeError(f"oa user_login 失败:{data.get('msg') or data}") user_id = str((data.get("data") or {}).get("user_id") or "") if not user_id: raise RuntimeError(f"oa user_login 未返回 user_id:{data}") return user_id def www_user_login(openid: str, ua: str) -> Dict[str, Any]: params = { "i": I_ID, "t": "undefined", "v": "1.0.0", "from": "wxapp", "c": "entry", "a": "wxapp", "do": "user_login", "m": M_NAME, "openid": openid, "type": "wx", "lat": "undefined", "lng": "undefined", } return www_get(params, ua) def load_account_map() -> Dict[str, Any]: data = load_json(ACCOUNT_FILE, {}) return data if isinstance(data, dict) else {} def get_account_extra(wxid: str) -> Dict[str, Any]: amap = load_account_map() item = amap.get(wxid) or amap.get("default") or {} return item if isinstance(item, dict) else {} def save_auth_cache(auth: HswAuth) -> None: cache = load_json(CACHE_FILE, {}) cache[auth.wxid] = { "remark": auth.remark, "openid": auth.openid, "unionid": auth.unionid, "session_key": auth.session_key, "phone": auth.phone, "user_id": auth.user_id, "ua": auth.ua, "updated_at": now(), } save_json(CACHE_FILE, cache) def load_auth_cache(wxid: str) -> Dict[str, Any]: cache = load_json(CACHE_FILE, {}) item = cache.get(wxid) or {} return item if isinstance(item, dict) else {} def get_auth(account: Account, uas: List[str], result: Result) -> HswAuth: extra = get_account_extra(account.wxid) cached = load_auth_cache(account.wxid) ua = str(cached.get("ua") or extra.get("ua") or random.choice(uas)) auth = HswAuth(wxid=account.wxid, remark=str(extra.get("remark") or account.remark or account.wxid), ua=ua) # 有 user_id 直接复用,任务本身不依赖 token/cookie。 cached_user_id = str(cached.get("user_id") or extra.get("user_id") or "").strip() if cached_user_id: auth.user_id = cached_user_id auth.phone = str(cached.get("phone") or extra.get("phone") or GLOBAL_PHONE or "") auth.openid = str(cached.get("openid") or extra.get("openid") or "") auth.unionid = str(cached.get("unionid") or extra.get("unionid") or "") auth.session_key = str(cached.get("session_key") or "") result.auth_mode = "缓存user_id" result.login_ok = True result.user_id = auth.user_id result.phone = auth.phone return auth result.auth_mode = "养鸡场code登录" auth.code = get_mini_program_code(account.wxid) result.code_ok = True log(f"账号{result.index}/{result.total} {auth.remark} 获取微信code完成:{short(auth.code)}") auth.openid, auth.unionid, auth.session_key = openid_new(auth.code, ua) result.openid_ok = True log(f"账号{result.index}/{result.total} {auth.remark} openid_new完成:openid={short(auth.openid)} unionid={short(auth.unionid)}") phone = str(extra.get("phone") or GLOBAL_PHONE or cached.get("phone") or "").strip() encrypted = str(extra.get("encryptedData") or extra.get("encrypted_data") or GLOBAL_ENCRYPTED_DATA or "").strip() iv = str(extra.get("iv") or GLOBAL_IV or "").strip() if not phone: if not encrypted or not iv: log(f"账号{result.index}/{result.total} {auth.remark} 调用养鸡场手机号授权接口 getPhoneEncryptData") encrypted, iv = get_phone_encrypt_data(account.wxid) log(f"账号{result.index}/{result.total} {auth.remark} 手机号授权参数获取完成") phone = phone_new(encrypted, iv, auth.session_key, ua) auth.phone = phone result.phone_ok = True log(f"账号{result.index}/{result.total} {auth.remark} 手机号准备完成:{mask_phone(phone)}") auth.user_id = oa_user_login(auth.openid, auth.unionid, auth.phone, ua) result.login_ok = True result.user_id = auth.user_id result.phone = auth.phone log(f"账号{result.index}/{result.total} {auth.remark} 登录完成:user_id={auth.user_id}") # 访问 www 侧 user_login,补齐会话/验证 user_id,不强依赖。 try: data = www_user_login(auth.openid, ua) user = ((data.get("data") or {}).get("user") or {}) if isinstance(data, dict) else {} if user.get("id"): auth.user_id = str(user.get("id")) result.user_id = auth.user_id except Exception as e: log(f"账号{result.index}/{result.total} {auth.remark} www_user_login 非关键失败:{e}", "WARN") save_auth_cache(auth) return auth def product_list(user_id: str, ua: str) -> List[Dict[str, str]]: params = { "i": I_ID, "t": "undefined", "v": "1.0.0", "from": "wxapp", "c": "entry", "a": "wxapp", "do": "goods_list_new", "m": M_NAME, "page": "1", "uid": user_id, "state": "0", "type": "wx", } data = www_get(params, ua) if data.get("errno") != 0: raise RuntimeError(f"获取商品id失败:{data.get('message') or data}") rows = ((data.get("data") or {}).get("list") or []) goods = [{"id": str(x.get("id")), "title": str(x.get("title") or x.get("id"))} for x in rows if isinstance(x, dict) and x.get("id")] random.shuffle(goods) return goods[:5] def video_list(user_id: str, ua: str) -> Optional[Dict[str, str]]: params = { "i": I_ID, "t": "undefined", "v": "1.0.0", "from": "wxapp", "c": "entry", "a": "wxapp", "do": "notice_list", "m": M_NAME, "page": "1", "uid": user_id, "state": "0", "type": "wx", } data = www_get(params, ua) if data.get("errno") != 0: raise RuntimeError(f"获取视频id失败:{data.get('message') or data}") rows = ((data.get("data") or {}).get("list") or []) valid = [x for x in rows if isinstance(x, dict) and x.get("video") and x.get("content")] if not valid: return None x = random.choice(valid) return {"video": str(x.get("video")), "content": str(x.get("content"))} def sign_in(user_id: str, ua: str) -> str: data = oa_post("/api/recycle/app/welfare/sign_in", {"user_id": user_id}, ua) if data.get("code") == 1: return "成功" return f"失败:{data.get('errorMsg') or data.get('msg') or data}" def watch_video(user_id: str, video: Dict[str, str], ua: str) -> str: data = oa_post("/api/recycle/app/welfare/watch_video", {"video_id": video["video"], "user_id": user_id}, ua) if data.get("code") == 1: return f"成功:{video.get('content', '')[:20]}" return f"失败:{data.get('errorMsg') or data.get('msg') or data}" def watch_product(user_id: str, product: Dict[str, str], ua: str) -> str: data = oa_post("/api/recycle/app/welfare/watch_product", {"product_id": product["id"], "user_id": user_id}, ua) if data.get("code") == 1: return "成功" return f"失败:{data.get('errorMsg') or data.get('msg') or data}" def user_jf_log(user_id: str, ua: str) -> Tuple[str, int]: params = { "i": I_ID, "t": "undefined", "v": "1.0.0", "from": "wxapp", "c": "entry", "a": "wxapp", "do": "user_jf_log", "m": M_NAME, "page": "1", "uid": user_id, "state": "0", "type": "wx", } data = www_get(params, ua) if data.get("errno") != 0: raise RuntimeError(f"查询积分失败:{data.get('message') or data}") d = data.get("data") or {} jifen = int(float(d.get("jifen") or 0)) tx_min_money = d.get("tx_min_money", "无") return f"余额{jifen / 1000:.2f}元,最低提现{tx_min_money}元", jifen def random_sleep(a: int = 3, b: int = 6) -> None: delay = random.randint(a, b) log(f"等待 {delay} 秒") time.sleep(delay) def run_account(account: Account, index: int, total: int, uas: List[str]) -> Result: start = time.time() result = Result(index=index, total=total, wxid=account.wxid, remark=account.remark or account.wxid) try: log(f"账号{index}/{total} {result.remark} 开始 wxid={account.wxid}") auth = get_auth(account, uas, result) result.remark = auth.remark or result.remark result.user_id = auth.user_id result.phone = auth.phone log(f"账号{index}/{total} {result.remark} 执行签到 user_id={auth.user_id}") result.sign_status = sign_in(auth.user_id, auth.ua) log(f"账号{index}/{total} {result.remark} 签到:{result.sign_status}") random_sleep() log(f"账号{index}/{total} {result.remark} 获取并浏览视频") video = video_list(auth.user_id, auth.ua) if video: result.video_status = watch_video(auth.user_id, video, auth.ua) else: result.video_status = "无可用视频" log(f"账号{index}/{total} {result.remark} 视频:{result.video_status}") random_sleep() log(f"账号{index}/{total} {result.remark} 获取并浏览商品") goods = product_list(auth.user_id, auth.ua) ok_count = 0 statuses = [] for g in goods: st = watch_product(auth.user_id, g, auth.ua) statuses.append(st) if st.startswith("成功"): ok_count += 1 log(f"账号{index}/{total} {result.remark} 浏览商品【{g.get('title')}】:{st}") random_sleep() result.product_count = ok_count result.product_status = f"成功{ok_count}/{len(goods)}" if goods else "无商品" log(f"账号{index}/{total} {result.remark} 查询积分/余额") result.balance_text, result.jifen = user_jf_log(auth.user_id, auth.ua) log(f"账号{index}/{total} {result.remark} 当前:{result.balance_text}") except Exception as e: result.error = str(e) log(f"账号{index}/{total} {result.remark} 跳过/失败 | {result.error}", "ERROR") finally: result.runtime = time.time() - start return result def build_report(results: List[Result]) -> str: success = [r for r in results if not r.error] total_jifen = sum(r.jifen for r in results) lines = [] lines.append("=" * 50) lines.append("📊 回收蛙执行汇总") lines.append("=" * 50) lines.append(f"⏱️ 执行时间: {now()}") lines.append(f"👥 总账号数: {len(results)}") lines.append(f"✅ 成功账号: {len(success)}") lines.append(f"📝 签到成功: {sum(1 for r in results if r.sign_status == '成功')}") lines.append(f"🎬 视频浏览成功: {sum(1 for r in results if r.video_status.startswith('成功'))}") lines.append(f"🛍️ 商品浏览成功次数: {sum(r.product_count for r in results)}") lines.append(f"🎯 总积分: {total_jifen}") lines.append(f"💰 余额合计: {total_jifen / 1000:.2f}") lines.append(f"🧭 UA数量: {len(load_uas())}") lines.append(f"📁 缓存目录: {CACHE_DIR}") lines.append(f"📄 缓存文件: {CACHE_FILE}") lines.append("📋 账号详情:") for r in results: lines.append(("✅" if not r.error else "❌") + f" 账号{r.index}: {r.remark}") lines.append(f"🆔 wxid: {r.wxid}") lines.append(f"👤 user_id: {r.user_id or '无'}") lines.append(f"📱 手机: {mask_phone(r.phone)}") lines.append(f"🔐 登录: {r.auth_mode or '无'}") lines.append(f"🔑 Code: {'成功' if r.code_ok else '未执行/缓存'}") lines.append(f"🪪 OpenID: {'成功' if r.openid_ok else '未执行/缓存'}") lines.append(f"📱 手机授权: {'成功' if r.phone_ok else '未执行/缓存'}") lines.append(f"📝 签到: {r.sign_status}") lines.append(f"🎬 视频: {r.video_status}") lines.append(f"🛍️ 商品: {r.product_status}") lines.append(f"💰 余额: {r.balance_text}") lines.append(f"🎯 积分: {r.jifen}") lines.append(f"⏱️ 耗时: {r.runtime:.2f}秒") if r.error: lines.append(f"⚠️ 错误: {r.error}") lines.append("-" * 30) return "\n".join(lines) def main() -> None: ensure_dirs() log(f"【{SCRIPT_NAME}】开始执行任务") if not WX_CLOUD or not WX_TOKEN: raise RuntimeError("缺少 wx_cloud 或 wx_token") uas = load_uas() accounts = get_accounts() if not accounts: raise RuntimeError("没有可处理账号,请检查 wx_cloud、wx_token、HSW_TEST_WXID、HSW_EXCLUDE_WXIDS") results: List[Result] = [] total = len(accounts) for idx, account in enumerate(accounts, 1): results.append(run_account(account, idx, total, uas)) if idx < total and ACCOUNT_DELAY > 0: log(f"账号间隔 {ACCOUNT_DELAY} 秒") time.sleep(ACCOUNT_DELAY) print(build_report(results)) if __name__ == "__main__": try: main() except Exception: log(traceback.format_exc(), "ERROR") sys.exit(1)