558 lines
19 KiB
Python
558 lines
19 KiB
Python
# cron: 32 7 * * *
|
||
# new Env("捂碳星球旧衣服回收")
|
||
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
Wutan Planet (捂碳星球) old-clothes recycling - automatic login and daily sign-in
through a "养鸡场" WeChat account service.

Flow:
    wx_cloud + wx_token + wxid -> getMiniProgramCode returns code
    code -> /api/login/getWxLogin returns openid / unionid / sessionKey
    wxid -> getPhoneEncryptData returns encryptedData / iv
    encryptedData + iv + openid + unionid + sessionKey -> /api/login/wxLogin returns the Bearer token
    token -> sign in via /api/signin/addSignIn
    token -> query the user via /api/user/index

Required environment variables:
    wx_cloud            address of the account service, e.g. http://192.168.0.250:666
    wx_token            token for the account service; a bare token or a "Bearer ..." token is accepted

Optional environment variables:
    WTXQ_TEST_WXID      run only the given wxid
    WTXQ_EXCLUDE_WXIDS  wxids to skip, separated by comma / newline / &
    WTXQ_MAX_ACCOUNTS   maximum number of accounts to process; default 0 = unlimited
    WTXQ_ACCOUNT_DELAY  delay between accounts in seconds; default 2
    WTXQ_TIMEOUT        HTTP request timeout in seconds; default 20
    WTXQ_VERIFY_SSL     1/0, default 1
    WTXQ_DEBUG          1/0, default 0

Cache:
    APP_Buffer/wtxq_token_cache.json
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import os
|
||
import random
|
||
import re
|
||
import sys
|
||
import time
|
||
import traceback
|
||
from dataclasses import dataclass
|
||
from pathlib import Path
|
||
from typing import Any, Dict, Iterable, List, Tuple
|
||
|
||
import requests
|
||
|
||
# Display name used in the start-up log line.
SCRIPT_NAME = "捂碳星球"
# Mini-program appid: sent to the account service and embedded in the Referer header.
APPID = "wx54c4768a6050a90e"
# Base URL of the backend API; request paths are appended to it.
BASE = "https://wt.api.5tan.com/api"
# Version segment of the servicewechat.com Referer URL.
REFERER_VERSION = "223"
|
||
|
||
# Directory containing this script; the cache and UA files are resolved relative to it.
SCRIPT_DIR = Path(__file__).resolve().parent
# Directory that holds the token cache.
CACHE_DIR = SCRIPT_DIR / "APP_Buffer"
# JSON file keyed by wxid, written by save_auth_cache().
CACHE_FILE = CACHE_DIR / "wtxq_token_cache.json"
# Optional JSON list of User-Agent strings, read by load_uas().
UA_FILE = SCRIPT_DIR / "User_Agent.json"
|
||
|
||
ENV = os.environ

# Accepted spellings for boolean flags; matching is case-insensitive.
_TRUE_WORDS = frozenset({"1", "true", "yes"})
_FALSE_WORDS = frozenset({"0", "false", "no"})


def _env_flag(name: str, default: bool) -> bool:
    """Read a boolean environment variable; unknown or empty values yield *default*."""
    word = ENV.get(name, "").strip().lower()
    if word in _TRUE_WORDS:
        return True
    if word in _FALSE_WORDS:
        return False
    return default


def _env_number(name: str, default, cast):
    """Read a numeric environment variable via *cast* (``int`` or ``float``).

    An unset or empty variable yields *default*. A malformed value also yields
    *default* (with a warning on stderr) instead of crashing at import time.
    """
    raw = ENV.get(name, "").strip()
    if not raw:
        return default
    try:
        return cast(raw)
    except ValueError:
        print(f"环境变量 {name}={raw!r} 不是有效数字,使用默认值 {default}", file=sys.stderr, flush=True)
        return default


# Strip surrounding whitespace first so a trailing "/" is still removed when the
# value ends with a space or newline.
WX_CLOUD = ENV.get("wx_cloud", "").strip().rstrip("/")
WX_TOKEN_RAW = ENV.get("wx_token", "").strip()
# Accept either a bare token or one already prefixed with "Bearer ".
if WX_TOKEN_RAW.lower().startswith("bearer "):
    WX_TOKEN = WX_TOKEN_RAW
else:
    WX_TOKEN = f"Bearer {WX_TOKEN_RAW}" if WX_TOKEN_RAW else ""
VERIFY_SSL = _env_flag("WTXQ_VERIFY_SSL", True)
TIMEOUT = _env_number("WTXQ_TIMEOUT", 20, int)
if TIMEOUT <= 0:
    # A non-positive timeout is not usable for HTTP requests; fall back to the default.
    TIMEOUT = 20
DEBUG = _env_flag("WTXQ_DEBUG", False)
TEST_WXID = ENV.get("WTXQ_TEST_WXID", "").strip()
MAX_ACCOUNTS = _env_number("WTXQ_MAX_ACCOUNTS", 0, int)
ACCOUNT_DELAY = _env_number("WTXQ_ACCOUNT_DELAY", 2.0, float)
|
||
|
||
# Fallback User-Agent used when no UA list file is available and no UA is cached.
DEFAULT_UA = " ".join((
    "Mozilla/5.0 (Linux; Android 15; 22061218C Build/AQ3A.250226.002; wv)",
    "AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/146.0.7680.177",
    "Mobile Safari/537.36 XWEB/1460075 MMWEBSDK/20260202 MMWEBID/6435",
    "MicroMessenger/8.0.71.3080(0x18004739) WeChat/arm64 Weixin NetType/WIFI",
    "Language/zh_CN ABI/arm64 MiniProgramEnv/android",
))
|
||
|
||
|
||
def now() -> str:
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    stamp_format = "%Y-%m-%d %H:%M:%S"
    return time.strftime(stamp_format, time.localtime())
|
||
|
||
|
||
def log(msg: str, level: str = "INFO") -> None:
    """Print ``<timestamp> - <level> - <msg>`` to stdout and flush immediately."""
    line = "{} - {} - {}".format(now(), level, msg)
    print(line, flush=True)
|
||
|
||
|
||
def debug(msg: str) -> None:
    """Log *msg* at DEBUG level, but only when the DEBUG flag is enabled."""
    if not DEBUG:
        return
    log(msg, "DEBUG")
|
||
|
||
|
||
def short(s: str, left: int = 8, right: int = 4) -> str:
    """Abbreviate a long string as ``<first left chars>...<last right chars>``.

    Strings no longer than ``left + right + 3`` characters are returned as-is,
    because abbreviating them would not make them shorter. Falsy input
    (``None``, ``""``) yields an empty string.
    """
    text = str(s or "")
    if len(text) <= left + right + 3:
        return text
    # text[-0:] would be the whole string, so a zero-length tail needs its own case.
    tail = text[len(text) - right:] if right > 0 else ""
    return f"{text[:left]}...{tail}"
|
||
|
||
|
||
def mask_phone(phone: str) -> str:
    """Mask the middle four digits of an 11-digit run; return "无" for empty input."""
    text = str(phone or "")
    masked = re.sub(r"(\d{3})\d{4}(\d{4})", r"\1****\2", text)
    if masked:
        return masked
    return "无"
|
||
|
||
|
||
def split_multi(value: str) -> List[str]:
    """Split a multi-value setting into a list of trimmed, non-empty items.

    Items may be separated by an ASCII comma, a full-width comma (",", common
    with Chinese input methods), ``&``, or line breaks. Consecutive separators
    count as one. Empty or ``None`` input yields ``[]``.
    """
    if not value:
        return []
    parts = re.split(r"[,,&\n\r]+", value)
    return [part.strip() for part in parts if part.strip()]
|
||
|
||
|
||
# wxids listed in WTXQ_EXCLUDE_WXIDS; iter_accounts() skips them.
EXCLUDE_WXIDS = {*split_multi(ENV.get("WTXQ_EXCLUDE_WXIDS", ""))}
|
||
|
||
|
||
def ensure_dirs() -> None:
    """Create the cache directory, including missing parents, if it is absent."""
    os.makedirs(CACHE_DIR, exist_ok=True)
|
||
|
||
|
||
def load_json(path: Path, default: Any) -> Any:
    """Load and parse a UTF-8 JSON file.

    Returns *default* when the file does not exist. Any read or parse failure
    is logged as a warning and also yields *default*.
    """
    try:
        if not path.exists():
            return default
        raw = path.read_text(encoding="utf-8")
        return json.loads(raw)
    except Exception as exc:
        log(f"读取 {path} 失败:{exc}", "WARN")
        return default
|
||
|
||
|
||
def save_json(path: Path, data: Any) -> None:
    """Write *data* as pretty-printed UTF-8 JSON to *path*.

    The content is first written to a sibling ``<name>.tmp`` file and then
    moved over *path*, so readers never see a half-written file.
    """
    target_dir = path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    staging = path.with_suffix(f"{path.suffix}.tmp")
    payload = json.dumps(data, ensure_ascii=False, indent=2)
    staging.write_text(payload, encoding="utf-8")
    staging.replace(path)
|
||
|
||
|
||
def load_uas() -> List[str]:
    """Return the User-Agent pool from the UA file, or ``[DEFAULT_UA]`` as fallback.

    The fallback is used when the file is missing, is not a JSON list, or
    contains only blank entries.
    """
    loaded = load_json(UA_FILE, [])
    if not isinstance(loaded, list):
        return [DEFAULT_UA]
    cleaned = []
    for entry in loaded:
        text = str(entry).strip()
        if text:
            cleaned.append(text)
    return cleaned or [DEFAULT_UA]
|
||
|
||
|
||
@dataclass
class Account:
    """One WeChat account as listed by the account service."""

    # WeChat id used for every account-service call.
    wxid: str
    # Display label; callers fall back to the wxid when it is empty.
    remark: str = ""
    # Preferred User-Agent; empty means one is picked later.
    ua: str = ""
|
||
|
||
|
||
@dataclass
class Auth:
    """Login state collected for one account while walking the auth chain."""

    # Identity and request fingerprint.
    wxid: str
    remark: str
    ua: str
    # Mini-program login code returned by the account service.
    code: str = ""
    # Values returned by /login/getWxLogin.
    openid: str = ""
    unionid: str = ""
    session_key: str = ""
    # Phone authorization parameters returned by the account service.
    encrypted_data: str = ""
    iv: str = ""
    # Values returned by /login/wxLogin.
    token: str = ""
    user_id: str = ""
|
||
|
||
|
||
@dataclass
class Result:
    """Per-account outcome that feeds the final report."""

    # Position of the account in this run and the run size.
    index: int
    total: int
    wxid: str
    remark: str
    # How the token was obtained (cached token or fresh login).
    auth_mode: str = ""
    # Progress flags for the individual login steps.
    code_ok: bool = False
    openid_ok: bool = False
    phone_param_ok: bool = False
    login_ok: bool = False
    # Human-readable status texts shown in the report.
    token_status: str = "未执行"
    sign_status: str = "未执行"
    # User profile values shown in the report.
    nick_name: str = "无"
    phone: str = "无"
    money: str = "0.00"
    # Error text; empty when the account completed without an exception.
    error: str = ""
    # Wall-clock seconds spent on this account.
    runtime: float = 0.0
|
||
|
||
|
||
def wx_headers() -> Dict[str, str]:
    """Build the request headers for calls to the account service."""
    headers = {
        "Authorization": WX_TOKEN,
        "Content-Type": "application/json;charset=UTF-8",
    }
    return headers
|
||
|
||
|
||
def pick_nested(data: Any, keys: Tuple[str, ...]) -> str:
    """Depth-first search of nested dicts/lists for the first non-blank value.

    At each dict the given *keys* are tried in order before descending into
    its values. Only str/int/float values count; the match is returned
    stripped. Returns ``""`` when nothing is found.
    """
    if isinstance(data, dict):
        for name in keys:
            candidate = data.get(name)
            if not isinstance(candidate, (str, int, float)):
                continue
            text = str(candidate).strip()
            if text:
                return text
        children = data.values()
    elif isinstance(data, list):
        children = data
    else:
        return ""

    for child in children:
        hit = pick_nested(child, keys)
        if hit:
            return hit
    return ""
|
||
|
||
|
||
def parse_accounts_response(data: Dict[str, Any]) -> Tuple[List[Account], int]:
    """Extract accounts and the reported total from an account-list response.

    The payload is taken from ``data["data"]`` when present, otherwise from
    *data* itself. A dict payload is searched for ``rows`` / ``list`` /
    ``records``; a list payload is used directly. Rows that are not dicts or
    have no wxid are dropped.

    Returns:
        ``(accounts, total)`` where *total* is the server-reported total, or
        the row count when no total is given.
    """

    def first_truthy(row: Dict[str, Any], names: Tuple[str, ...], fallback: Any) -> Any:
        # Mirrors an ``a or b or ... or fallback`` chain over row lookups.
        for name in names:
            value = row.get(name)
            if value:
                return value
        return fallback

    container = data.get("data", data)
    if isinstance(container, dict):
        rows = container.get("rows") or container.get("list") or container.get("records") or []
        total = int(container.get("total") or len(rows) or 0)
    elif isinstance(container, list):
        rows = container
        total = len(rows)
    else:
        rows = []
        total = 0

    accounts: List[Account] = []
    for row in rows:
        if not isinstance(row, dict):
            continue
        wxid = str(first_truthy(row, ("wxid", "wxId", "id"), "")).strip()
        if not wxid:
            continue
        remark = str(first_truthy(row, ("remark", "name", "nickName", "nickname"), wxid)).strip()
        accounts.append(Account(wxid=wxid, remark=remark))
    return accounts, total
|
||
|
||
|
||
def fetch_account_page(page_num: int, page_size: int = 100) -> Tuple[List[Account], int]:
    """Fetch one page of the account list from the account service.

    Raises:
        requests.HTTPError: when the service answers with an error status.
    """
    endpoint = f"{WX_CLOUD}/prod-api/wechat/wechat/list"
    query = {"pageNum": page_num, "pageSize": page_size}
    resp = requests.get(
        endpoint,
        headers=wx_headers(),
        params=query,
        timeout=TIMEOUT,
        verify=VERIFY_SSL,
    )
    resp.raise_for_status()
    body = resp.json()
    return parse_accounts_response(body)
|
||
|
||
|
||
def iter_accounts() -> Iterable[Account]:
    """Yield the accounts to process, honouring the test/exclude/limit settings.

    When ``TEST_WXID`` is set, only that account is yielded and the account
    service is not queried. Otherwise the account list is paged through,
    skipping excluded wxids and stopping once ``MAX_ACCOUNTS`` (if > 0)
    accounts have been yielded.

    Each wxid is yielded at most once. Paging stops when a page is empty, when
    a page contributes no unseen wxid (guards against a backend that ignores
    the page number), when the reported total has been covered, or after
    ``max_pages`` pages.
    """
    if TEST_WXID:
        yield Account(wxid=TEST_WXID, remark=TEST_WXID)
        return

    page_size = 100
    max_pages = 50  # hard cap so a misbehaving backend cannot loop forever
    seen = set()
    yielded = 0
    for page in range(1, max_pages + 1):
        rows, total = fetch_account_page(page, page_size)
        if not rows:
            break
        fresh = 0
        for acc in rows:
            if acc.wxid in seen:
                continue
            seen.add(acc.wxid)
            fresh += 1
            if acc.wxid in EXCLUDE_WXIDS:
                continue
            yielded += 1
            yield acc
            if MAX_ACCOUNTS > 0 and yielded >= MAX_ACCOUNTS:
                return
        if fresh == 0 or page * page_size >= total:
            break
|
||
|
||
|
||
# Keys that may carry the WeChat login code, in priority order.
_LOGIN_CODE_KEYS = ("miniProgramCode", "jsCode", "wxCode", "loginCode", "code")
# Values that look like a business status code rather than a login code.
_STATUS_LIKE_CODES = frozenset({"0", "200"})


def _find_login_code(node: Any) -> str:
    """Depth-first search for the first login-code value that is not status-like."""
    if isinstance(node, dict):
        for key in _LOGIN_CODE_KEYS:
            val = node.get(key)
            if isinstance(val, (str, int, float)):
                text = str(val).strip()
                if text and text not in _STATUS_LIKE_CODES:
                    return text
        children = node.values()
    elif isinstance(node, list):
        children = node
    else:
        return ""
    for child in children:
        found = _find_login_code(child)
        if found:
            return found
    return ""


def parse_code_response(data: Dict[str, Any]) -> str:
    """Extract the WeChat login code from a getMiniProgramCode response.

    Supported response shapes:
      1) {'code': 200, 'data': {'code': '0xxx'}}
      2) {'code': 200, 'data': {'miniProgramCode': '0xxx'}}
      3) {'code': 200, 'data': '0xxx'}

    The root-level ``code`` (e.g. 200) is a business status code, not the
    login code. Status-like values ("0", "200") inside the payload are skipped
    and the search continues, so a real code nested next to or below a status
    field is still found.

    Raises:
        RuntimeError: when no login code can be extracted.
    """
    payload = data.get("data") if isinstance(data, dict) else data

    if isinstance(payload, str) and payload.strip():
        return payload.strip()

    if isinstance(payload, dict):
        code = _find_login_code(payload)
        if code:
            return code

    # Last resort: use the root-level code only when it does not look like a status code.
    if isinstance(data, dict):
        root_code = str(data.get("code") or "").strip()
        if root_code and root_code not in _STATUS_LIKE_CODES:
            return root_code

    raise RuntimeError(f"未能从养鸡场响应中提取 code:{str(data)[:300]}")
|
||
|
||
|
||
def get_mini_program_code(wxid: str) -> str:
    """Ask the account service for a mini-program login code for *wxid*.

    Raises:
        requests.HTTPError: on an HTTP error status.
        RuntimeError: when the response carries no usable code.
    """
    endpoint = f"{WX_CLOUD}/prod-api/wechat/api/getMiniProgramCode"
    body = {"wxid": wxid, "appid": APPID}
    resp = requests.post(
        endpoint,
        headers=wx_headers(),
        json=body,
        timeout=TIMEOUT,
        verify=VERIFY_SSL,
    )
    resp.raise_for_status()
    return parse_code_response(resp.json())
|
||
|
||
|
||
def get_phone_encrypt_data(wxid: str) -> Tuple[str, str]:
    """Fetch the phone-authorization parameters ``(encryptedData, iv)`` for *wxid*.

    The request is tried with the app id under the key ``appid`` and then
    ``appId``. The first response containing both values wins.

    Raises:
        RuntimeError: when no attempt returns both values; the message carries
            the last response excerpt or exception text.
    """
    endpoint = f"{WX_CLOUD}/prod-api/wechat/api/getPhoneEncryptData"
    encrypted_keys = ("encryptedData", "encrypted_data", "phoneEncryptedData", "phone_encrypted_data", "encryptData")
    iv_keys = ("iv", "phoneIv", "phone_iv")

    last_err = ""
    for appid_key in ("appid", "appId"):
        body = {"wxid": wxid, appid_key: APPID}
        try:
            resp = requests.post(endpoint, headers=wx_headers(), json=body, timeout=TIMEOUT, verify=VERIFY_SSL)
            resp.raise_for_status()
            data = resp.json()
            encrypted = pick_nested(data, encrypted_keys)
            iv = pick_nested(data, iv_keys)
            if encrypted and iv:
                return encrypted, iv
            last_err = str(data)[:300]
        except Exception as exc:
            last_err = str(exc)
    raise RuntimeError(f"养鸡场 getPhoneEncryptData 未返回 encryptedData/iv:{last_err}")
|
||
|
||
|
||
def api_headers(token: str = "", ua: str = "") -> Dict[str, str]:
    """Build the request headers for backend API calls.

    Without a token the authorization header is the bare word ``Bearer``.
    An empty *ua* falls back to ``DEFAULT_UA``.
    """
    if token:
        authorization = f"Bearer {token}"
    else:
        authorization = "Bearer"
    user_agent = ua if ua else DEFAULT_UA
    referer = f"https://servicewechat.com/{APPID}/{REFERER_VERSION}/page-frame.html"
    return {
        "accept": "*/*",
        "accept-language": "zh-CN,zh;q=0.9",
        "authorization": authorization,
        "content-type": "application/json",
        "xweb_xhr": "1",
        "User-Agent": user_agent,
        "Referer": referer,
        "Referrer-Policy": "unsafe-url",
    }
|
||
|
||
|
||
def post_json(path: str, payload: Dict[str, Any], token: str = "", ua: str = "") -> Dict[str, Any]:
    """POST *payload* as JSON to ``BASE + path`` and return the decoded JSON reply.

    Raises:
        requests.HTTPError: on an HTTP error status.
    """
    url = f"{BASE}{path}"
    resp = requests.post(
        url,
        headers=api_headers(token, ua),
        json=payload,
        timeout=TIMEOUT,
        verify=VERIFY_SSL,
    )
    resp.raise_for_status()
    reply = resp.json()
    debug(f"POST {path} -> {str(reply)[:300]}")
    return reply
|
||
|
||
|
||
def get_json(path: str, token: str = "", ua: str = "") -> Dict[str, Any]:
    """GET ``BASE + path`` and return the decoded JSON reply.

    Raises:
        requests.HTTPError: on an HTTP error status.
    """
    url = f"{BASE}{path}"
    resp = requests.get(
        url,
        headers=api_headers(token, ua),
        timeout=TIMEOUT,
        verify=VERIFY_SSL,
    )
    resp.raise_for_status()
    reply = resp.json()
    debug(f"GET {path} -> {str(reply)[:300]}")
    return reply
|
||
|
||
|
||
def get_wx_login(code: str, ua: str) -> Tuple[str, str, str]:
    """Exchange a login code for ``(openid, unionid, session_key)``.

    Raises:
        RuntimeError: when the reply code is not 200, or when openid or
            unionid is missing. An empty session key is not an error here.
    """
    reply = post_json("/login/getWxLogin", {"code": code}, ua=ua)
    if reply.get("code") != 200:
        raise RuntimeError(f"getWxLogin失败:{reply.get('msg') or reply}")

    body = reply.get("data") or {}
    openid = str(body.get("openid") or "")
    unionid = str(body.get("unionid") or "")
    session_key = str(body.get("sessionKey") or body.get("session_key") or "")
    if not (openid and unionid):
        raise RuntimeError(f"getWxLogin未返回openid/unionid:{reply}")
    return openid, unionid, session_key
|
||
|
||
|
||
def wx_login(auth: Auth) -> Tuple[str, str]:
    """Complete the phone-authorized login and return ``(token, user_id)``.

    Raises:
        RuntimeError: when the reply code is not 200 or no token is returned.
    """
    body = {
        "encryptedData": auth.encrypted_data,
        "errMsg": "getPhoneNumber:ok",
        "iv": auth.iv,
        "sessionKey": auth.session_key,
        "unionid": auth.unionid,
        "openid": auth.openid,
    }
    reply = post_json("/login/wxLogin", body, ua=auth.ua)
    if reply.get("code") != 200:
        raise RuntimeError(f"wxLogin失败:{reply.get('msg') or reply}")

    detail = reply.get("data") or {}
    token = str(detail.get("token") or "")
    user_id = str(detail.get("user_id") or "")
    if token:
        return token, user_id
    raise RuntimeError(f"wxLogin未返回token:{reply}")
|
||
|
||
|
||
def validate_token(token: str, ua: str) -> Tuple[bool, Dict[str, Any]]:
    """Check *token* by querying the user endpoint.

    Returns:
        ``(is_valid, reply)``. Any exception (network, HTTP, decoding) is
        treated as "not valid" and yields ``(False, {})``.
    """
    try:
        reply = get_json("/user/index?platform=1", token=token, ua=ua)
        is_valid = reply.get("code") == 200
    except Exception:
        return False, {}
    return is_valid, reply
|
||
|
||
|
||
def load_cache() -> Dict[str, Any]:
    """Load the token cache; return ``{}`` when it is missing or not a JSON object."""
    loaded = load_json(CACHE_FILE, {})
    if isinstance(loaded, dict):
        return loaded
    return {}
|
||
|
||
|
||
def save_auth_cache(auth: Auth) -> None:
    """Persist the login data of *auth* in the token cache under its wxid.

    Empty fields on *auth* keep the value previously cached for that wxid.
    The remark is always overwritten and ``updated_at`` is refreshed.
    """
    store = load_cache()
    previous = store.get(auth.wxid)
    if not isinstance(previous, dict):
        previous = {}

    entry = {"remark": auth.remark}
    for field_name in ("ua", "token", "user_id", "openid", "unionid"):
        entry[field_name] = getattr(auth, field_name) or previous.get(field_name, "")
    entry["updated_at"] = now()

    store[auth.wxid] = entry
    save_json(CACHE_FILE, store)
|
||
|
||
|
||
def get_auth(acc: Account, uas: List[str], result: Result) -> Tuple[Auth, Dict[str, Any]]:
    """Obtain a working token for *acc*, preferring the cached one.

    A cached token that still validates is reused. Otherwise the full chain
    runs: login code -> getWxLogin -> phone parameters -> wxLogin -> validate,
    and the fresh token is cached. The progress flags and status texts on
    *result* are updated step by step, so a failure part-way still shows how
    far the login got.

    Returns:
        ``(auth, info)`` where *info* is the user-endpoint reply obtained
        while validating the token.

    Raises:
        RuntimeError: when a login step fails or the new token does not validate.
    """
    store = load_cache()
    entry = store.get(acc.wxid)
    cached = entry if isinstance(entry, dict) else {}

    # A UA is drawn at random only when neither the cache nor the account has one.
    ua = str(cached.get("ua") or acc.ua or random.choice(uas))
    remark = str(cached.get("remark") or acc.remark or acc.wxid)
    auth = Auth(wxid=acc.wxid, remark=remark, ua=ua)
    tag = f"账号{result.index}/{result.total} {auth.remark}"

    cached_token = str(cached.get("token") or "").strip()
    if cached_token:
        ok, info = validate_token(cached_token, ua)
        if ok:
            auth.token = cached_token
            auth.user_id = str(cached.get("user_id") or "")
            auth.openid = str(cached.get("openid") or "")
            auth.unionid = str(cached.get("unionid") or "")
            result.auth_mode = "缓存token"
            result.login_ok = True
            result.token_status = "缓存有效"
            return auth, info
        result.token_status = "缓存失效,重新登录"

    result.auth_mode = "养鸡场自动登录"

    auth.code = get_mini_program_code(acc.wxid)
    result.code_ok = True
    log(f"{tag} 获取code完成:{short(auth.code)}")

    auth.openid, auth.unionid, auth.session_key = get_wx_login(auth.code, auth.ua)
    result.openid_ok = True
    log(f"{tag} getWxLogin完成:openid={short(auth.openid)} unionid={short(auth.unionid)}")

    auth.encrypted_data, auth.iv = get_phone_encrypt_data(acc.wxid)
    result.phone_param_ok = True
    log(f"{tag} 手机号授权参数获取完成")

    auth.token, auth.user_id = wx_login(auth)
    result.login_ok = True
    result.token_status = "登录成功"
    log(f"{tag} wxLogin完成:token={short(auth.token)} user_id={auth.user_id or '无'}")

    ok, info = validate_token(auth.token, auth.ua)
    if not ok:
        raise RuntimeError("登录后 token 验证失败")
    save_auth_cache(auth)
    return auth, info
|
||
|
||
|
||
def sign(token: str, ua: str) -> str:
    """Perform the daily sign-in and return a status text.

    Returns "成功" when the reply title signals success; otherwise
    "失败:<reason>" built from the most specific message available.
    """
    reply = post_json("/signin/addSignIn", {"platform": 1}, token=token, ua=ua)
    if reply.get("code") != 200:
        return f"失败:{reply.get('msg') or reply}"

    detail = reply.get("data") or {}
    if detail.get("title") == "签到成功~":
        return "成功"
    reason = detail.get('content') or detail.get('title') or reply.get('msg') or reply
    return f"失败:{reason}"
|
||
|
||
|
||
def parse_user_info(data: Dict[str, Any]) -> Tuple[str, str, str]:
    """Extract ``(nick_name, phone, money)`` as strings from a user-endpoint reply.

    Missing or falsy values become "无" for nickname and phone, and "0.00" for money.
    """
    profile = data.get("data") or {}
    fields = (("nick_name", "无"), ("tel", "无"), ("money", "0.00"))
    nick, phone, money = [str(profile.get(key) or fallback) for key, fallback in fields]
    return nick, phone, money
|
||
|
||
|
||
def run_account(acc: Account, index: int, total: int, uas: List[str]) -> Result:
    """Run the whole flow for one account: authenticate, sign in, refresh the profile.

    Exceptions never propagate: they are logged and stored in ``result.error``.
    The elapsed time is always recorded in ``result.runtime``.
    """
    started = time.time()
    result = Result(index=index, total=total, wxid=acc.wxid, remark=acc.remark or acc.wxid)

    def apply_profile(info: Dict[str, Any]) -> Tuple[str, str, str]:
        # Copy the parsed user profile onto the result and hand it back.
        profile = parse_user_info(info)
        result.nick_name, result.phone, result.money = profile
        return profile

    try:
        log(f"账号{index}/{total} {result.remark} 开始 wxid={acc.wxid}")
        auth, info = get_auth(acc, uas, result)
        result.remark = auth.remark or result.remark
        apply_profile(info)

        log(f"账号{index}/{total} {result.remark} 执行签到")
        result.sign_status = sign(auth.token, auth.ua)
        log(f"账号{index}/{total} {result.remark} 签到:{result.sign_status}")

        # Query the profile again after signing in so the reported balance is current.
        info = get_json("/user/index?platform=1", token=auth.token, ua=auth.ua)
        nick, phone, money = apply_profile(info)
        log(f"账号{index}/{total} {result.remark} 用户【{nick}】余额:{money}元 手机:{mask_phone(phone)}")
    except Exception as exc:
        result.error = str(exc)
        log(f"账号{index}/{total} {result.remark} 跳过/失败 | {result.error}", "ERROR")
    finally:
        result.runtime = time.time() - started
    return result
|
||
|
||
|
||
def build_report(results: List[Result], ua_count: int) -> str:
    """Render the run summary and per-account details as one multi-line string."""

    def as_amount(value: Any) -> float:
        # Balances that cannot be parsed count as zero.
        try:
            return float(str(value).replace("元", "") or 0)
        except Exception:
            return 0.0

    ok_accounts = [item for item in results if not item.error]
    sign_ok = len([item for item in results if item.sign_status == "成功"])
    money_sum = 0.0
    for item in results:
        money_sum += as_amount(item.money)

    rule = "=" * 50
    lines = [
        rule,
        "📊 捂碳星球执行汇总",
        rule,
        f"⏱️ 执行时间: {now()}",
        f"👥 总账号数: {len(results)}",
        f"✅ 成功账号: {len(ok_accounts)}",
        f"📝 签到成功: {sign_ok}",
        f"💰 余额合计: {money_sum:.2f}",
        f"🧭 UA数量: {ua_count}",
        f"📁 缓存目录: {CACHE_DIR}",
        f"📄 缓存文件: {CACHE_FILE}",
        "📋 账号详情:",
    ]
    for r in results:
        badge = "❌" if r.error else "✅"
        lines.extend([
            badge + f" 账号{r.index}: {r.remark}",
            f"🆔 wxid: {r.wxid}",
            f"👤 昵称: {r.nick_name}",
            f"📱 手机: {mask_phone(r.phone)}",
            f"🔐 登录: {r.auth_mode or '无'}",
            f"🔑 Code: {'成功' if r.code_ok else '未执行/缓存'}",
            f"🪪 OpenID: {'成功' if r.openid_ok else '未执行/缓存'}",
            f"📱 手机授权: {'成功' if r.phone_param_ok else '未执行/缓存'}",
            f"🎟️ Token: {r.token_status}",
            f"📝 签到: {r.sign_status}",
            f"💰 余额: {r.money}",
            f"⏱️ 耗时: {r.runtime:.2f}秒",
        ])
        if r.error:
            lines.append(f"⚠️ 错误: {r.error}")
        lines.append("-" * 30)
    return "\n".join(lines)
|
||
|
||
|
||
def main() -> None:
    """Entry point: process every account in turn and print the final report.

    Raises:
        RuntimeError: when the account-service settings are missing or no
            account is available to process.
    """
    ensure_dirs()
    log(f"【{SCRIPT_NAME}】开始执行任务")
    if not (WX_CLOUD and WX_TOKEN):
        raise RuntimeError("缺少 wx_cloud 或 wx_token")

    uas = load_uas()
    accounts = list(iter_accounts())
    if not accounts:
        raise RuntimeError("没有可处理账号,请检查 wx_cloud、wx_token、WTXQ_TEST_WXID、WTXQ_EXCLUDE_WXIDS")

    total = len(accounts)
    results: List[Result] = []
    for position, account in enumerate(accounts, start=1):
        results.append(run_account(account, position, total, uas))
        is_last = position >= total
        # Pause between accounts, but not after the final one.
        if not is_last and ACCOUNT_DELAY > 0:
            time.sleep(ACCOUNT_DELAY)

    print(build_report(results, len(uas)))
|
||
|
||
|
||
# Script entry: log the traceback of any unhandled error and exit with status 1.
if __name__ == "__main__":
    try:
        main()
    except Exception:
        log(traceback.format_exc(), "ERROR")
        raise SystemExit(1)
|
||
|
||
|