Files
Yangmao_Script/WX_Applet/Applet_JYHS_HSW.py

742 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# cron: 52 9 * * *
# new Env("回收蛙旧衣服回收")
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
回收蛙 - 养鸡场完整自动版
链路:
wx_cloud + wx_token + wxid -> 微信 code
code -> openid/session_key/unionid
encryptedData + iv + session_key -> 手机号
openid + unionid + phone -> user_id
user_id -> 签到、浏览视频、浏览商品、查询余额
必填环境变量:
wx_cloud 养鸡场地址,例如 http://192.168.0.250:666
wx_token 养鸡场 token支持裸 token 或 Bearer token
手机号授权数据:
默认自动调用养鸡场 getPhoneEncryptData 获取 encryptedData/iv再走目标小程序 phone_new 解密手机号
如果协议端不支持该接口,可用 HSW_PHONE 或 hsw_accounts.json 兜底
可选环境变量:
HSW_TEST_WXID 只跑指定 wxid
HSW_EXCLUDE_WXIDS 排除 wxid逗号分隔
HSW_PHONE 固定手机号兜底,仅单号测试建议使用
HSW_ENCRYPTED_DATA encryptedData 兜底
HSW_IV iv 兜底
HSW_ACCOUNT_FILE 账号映射文件,默认同目录 hsw_accounts.json
HSW_MAX_ACCOUNTS 最大处理账号数,默认 0 不限制
HSW_ACCOUNT_DELAY 账号间隔秒,默认 2
HSW_VERIFY_SSL 1/0默认 1
账号映射文件格式 hsw_accounts.json
{
"wxid_xxx": {
"remark": "账号1",
"phone": "13800000000",
"encryptedData": "...",
"iv": "..."
}
}
缓存:
APP_Buffer/hsw_user_cache.json
"""
from __future__ import annotations
import hashlib
import json
import os
import random
import re
import sys
import time
import traceback
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import quote_plus, urlencode, urlsplit, parse_qsl
import requests
SCRIPT_NAME = "回收蛙"
APPID = "wx5f671b00a9dfca58"
I_ID = "373"
M_NAME = "zm_jyf"
OA_BASE = "https://oa.syrecovery.com"
WWW_BASE = "https://www.syrecovery.com/app/index.php"
SIGN_TOKEN = "undified" # 原 JS 源码就是这个拼写
SCRIPT_DIR = Path(__file__).resolve().parent
CACHE_DIR = SCRIPT_DIR / "APP_Buffer"
CACHE_FILE = CACHE_DIR / "hsw_user_cache.json"
UA_FILE = SCRIPT_DIR / "User_Agent.json"
ACCOUNT_FILE = Path(os.environ.get("HSW_ACCOUNT_FILE", str(SCRIPT_DIR / "hsw_accounts.json")))
ENV = os.environ
WX_CLOUD = ENV.get("wx_cloud", "").rstrip("/")
WX_TOKEN_RAW = ENV.get("wx_token", "").strip()
WX_TOKEN = WX_TOKEN_RAW if WX_TOKEN_RAW.lower().startswith("bearer ") else (f"Bearer {WX_TOKEN_RAW}" if WX_TOKEN_RAW else "")
VERIFY_SSL = ENV.get("HSW_VERIFY_SSL", "1") not in {"0", "false", "False", "no", "NO"}
TIMEOUT = int(ENV.get("HSW_TIMEOUT", "20"))
TEST_WXID = ENV.get("HSW_TEST_WXID", "").strip()
EXCLUDE_WXIDS = {x.strip() for x in ENV.get("HSW_EXCLUDE_WXIDS", "").split(",") if x.strip()}
MAX_ACCOUNTS = int(ENV.get("HSW_MAX_ACCOUNTS", "0") or "0")
ACCOUNT_DELAY = float(ENV.get("HSW_ACCOUNT_DELAY", "2") or "2")
GLOBAL_PHONE = ENV.get("HSW_PHONE", "").strip()
GLOBAL_ENCRYPTED_DATA = ENV.get("HSW_ENCRYPTED_DATA", "").strip()
GLOBAL_IV = ENV.get("HSW_IV", "").strip()
DEFAULT_UA = (
"Mozilla/5.0 (Linux; Android 15; 22061218C Build/AQ3A.250226.002; wv) "
"AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/146.0.7680.177 "
"Mobile Safari/537.36 XWEB/1460075 MMWEBSDK/20260202 MMWEBID/6435 "
"MicroMessenger/8.0.71.3080(0x18004739) WeChat/arm64 Weixin NetType/WIFI "
"Language/zh_CN ABI/arm64 MiniProgramEnv/android"
)
def now() -> str:
return time.strftime("%Y-%m-%d %H:%M:%S")
def log(msg: str, level: str = "INFO") -> None:
print(f"{now()} - {level} - {msg}", flush=True)
def mask_phone(phone: str) -> str:
if not phone:
return ""
return re.sub(r"(\d{3})\d{4}(\d{4})", r"\1****\2", phone)
def short(s: str, left: int = 8, right: int = 4) -> str:
if not s:
return ""
return s if len(s) <= left + right + 3 else f"{s[:left]}...{s[-right:]}"
def ensure_dirs() -> None:
CACHE_DIR.mkdir(parents=True, exist_ok=True)
def load_json(path: Path, default: Any) -> Any:
try:
if path.exists():
return json.loads(path.read_text(encoding="utf-8"))
except Exception as e:
log(f"读取 {path} 失败:{e}", "WARN")
return default
def save_json(path: Path, data: Any) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.with_suffix(path.suffix + ".tmp")
tmp.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
tmp.replace(path)
def load_uas() -> List[str]:
data = load_json(UA_FILE, [])
if isinstance(data, list):
uas = [str(x) for x in data if str(x).strip()]
if uas:
return uas
return [DEFAULT_UA]
def wx_headers() -> Dict[str, str]:
return {"Authorization": WX_TOKEN, "Content-Type": "application/json;charset=UTF-8"}
@dataclass
class Account:
wxid: str
remark: str = ""
nickname: str = ""
@dataclass
class HswAuth:
wxid: str = ""
remark: str = ""
code: str = ""
openid: str = ""
unionid: str = ""
session_key: str = ""
phone: str = ""
user_id: str = ""
ua: str = ""
@dataclass
class Result:
index: int
total: int
wxid: str
remark: str
auth_mode: str = ""
code_ok: bool = False
openid_ok: bool = False
phone_ok: bool = False
login_ok: bool = False
user_id: str = ""
phone: str = ""
sign_status: str = "未执行"
video_status: str = "未执行"
product_status: str = "未执行"
product_count: int = 0
balance_text: str = "未查询"
jifen: int = 0
error: str = ""
runtime: float = 0.0
def parse_accounts_response(data: Dict[str, Any]) -> Tuple[List[Account], int]:
container = data.get("data", data)
rows = []
total = 0
if isinstance(container, dict):
rows = container.get("rows") or container.get("list") or container.get("records") or []
total = int(container.get("total") or len(rows) or 0)
elif isinstance(container, list):
rows = container
total = len(rows)
accounts: List[Account] = []
for row in rows:
if not isinstance(row, dict):
continue
wxid = str(row.get("wxid") or row.get("wxId") or row.get("id") or "").strip()
if not wxid:
continue
remark = str(row.get("remark") or row.get("name") or row.get("nickName") or row.get("nickname") or wxid).strip()
nickname = str(row.get("nickName") or row.get("nickname") or "").strip()
accounts.append(Account(wxid=wxid, remark=remark, nickname=nickname))
return accounts, total
def fetch_account_page(page_num: int, page_size: int = 100) -> Tuple[List[Account], int]:
if not WX_CLOUD or not WX_TOKEN:
raise RuntimeError("缺少 wx_cloud 或 wx_token")
url = f"{WX_CLOUD}/prod-api/wechat/wechat/list"
resp = requests.get(url, headers=wx_headers(), params={"pageNum": page_num, "pageSize": page_size}, timeout=TIMEOUT, verify=VERIFY_SSL)
resp.raise_for_status()
return parse_accounts_response(resp.json())
def get_accounts() -> List[Account]:
if TEST_WXID:
return [Account(wxid=TEST_WXID, remark=TEST_WXID)]
accounts: List[Account] = []
page = 1
while True:
rows, total = fetch_account_page(page, 100)
accounts.extend(rows)
if not rows or len(accounts) >= total or page >= 20:
break
page += 1
accounts = [a for a in accounts if a.wxid not in EXCLUDE_WXIDS]
if MAX_ACCOUNTS > 0:
accounts = accounts[:MAX_ACCOUNTS]
return accounts
def pick_nested(data: Any, keys: Tuple[str, ...]) -> str:
"""在常见嵌套响应里递归找字段。"""
if isinstance(data, dict):
for key in keys:
val = data.get(key)
if isinstance(val, str) and val.strip():
return val.strip()
for val in data.values():
found = pick_nested(val, keys)
if found:
return found
elif isinstance(data, list):
for item in data:
found = pick_nested(item, keys)
if found:
return found
return ""
def parse_code_response(data: Dict[str, Any]) -> str:
candidates = [data]
if isinstance(data.get("data"), dict):
candidates.append(data["data"])
for c in candidates:
for key in ("code", "miniProgramCode", "jsCode"):
val = c.get(key) if isinstance(c, dict) else None
if isinstance(val, str) and val.strip() and val.strip() not in {"0", "200"}:
return val.strip()
if isinstance(data.get("data"), str) and data["data"].strip():
return data["data"].strip()
raise RuntimeError(f"未能从养鸡场响应中提取 code{str(data)[:300]}")
def get_mini_program_code(wxid: str) -> str:
url = f"{WX_CLOUD}/prod-api/wechat/api/getMiniProgramCode"
resp = requests.post(url, headers=wx_headers(), json={"wxid": wxid, "appid": APPID}, timeout=TIMEOUT, verify=VERIFY_SSL)
resp.raise_for_status()
data = resp.json()
code = parse_code_response(data)
if not code:
raise RuntimeError("养鸡场返回 code 为空")
return code
def get_phone_encrypt_data(wxid: str) -> Tuple[str, str]:
"""调用养鸡场手机号授权接口,返回 encryptedData/iv。
不同养鸡场版本字段名可能略有差异,这里兼容常见返回结构。
"""
url = f"{WX_CLOUD}/prod-api/wechat/api/getPhoneEncryptData"
payloads = [
{"wxid": wxid, "appid": APPID},
{"wxid": wxid, "appId": APPID},
]
last_err = ""
for payload in payloads:
try:
resp = requests.post(url, headers=wx_headers(), json=payload, timeout=TIMEOUT, verify=VERIFY_SSL)
resp.raise_for_status()
data = resp.json()
encrypted = pick_nested(data, ("encryptedData", "encrypted_data", "phoneEncryptedData", "phone_encrypted_data", "encryptData"))
iv = pick_nested(data, ("iv", "phoneIv", "phone_iv"))
if encrypted and iv:
return encrypted, iv
last_err = str(data)[:300]
except Exception as e:
last_err = str(e)
raise RuntimeError(f"养鸡场 getPhoneEncryptData 未返回 encryptedData/iv{last_err}")
def hsw_sign(params: Dict[str, Any], token: str = SIGN_TOKEN) -> str:
pairs = []
for k, v in params.items():
if k == "sign" or v is None or v == "":
continue
pairs.append((str(k), str(v)))
pairs.sort(key=lambda x: x[0])
param_string = "&".join([f"{k}={v}" for k, v in pairs])
return hashlib.md5((param_string + token).encode("utf-8")).hexdigest()
def www_get(params: Dict[str, Any], ua: str) -> Dict[str, Any]:
params = dict(params)
params["sign"] = hsw_sign(params)
headers = {
"User-Agent": ua,
"Referer": f"https://servicewechat.com/{APPID}/156/page-frame.html",
"Content-Type": "application/x-www-form-urlencoded",
"xweb_xhr": "1",
"Accept-Language": "zh-CN,zh;q=0.9",
}
resp = requests.get(WWW_BASE, headers=headers, params=params, timeout=TIMEOUT, verify=VERIFY_SSL)
resp.raise_for_status()
text = resp.text.lstrip("\ufeff")
return json.loads(text)
def oa_post(path: str, data: Dict[str, Any], ua: str) -> Dict[str, Any]:
headers = {
"User-Agent": ua,
"Referer": f"https://servicewechat.com/{APPID}/156/page-frame.html",
"Content-Type": "application/x-www-form-urlencoded",
"xweb_xhr": "1",
"Accept-Language": "zh-CN,zh;q=0.9",
}
resp = requests.post(f"{OA_BASE}{path}", headers=headers, data=data, timeout=TIMEOUT, verify=VERIFY_SSL)
resp.raise_for_status()
return resp.json()
def openid_new(code: str, ua: str) -> Tuple[str, str, str]:
params = {
"i": I_ID,
"t": "undefined",
"v": "1.0.0",
"from": "wxapp",
"c": "entry",
"a": "wxapp",
"do": "openid_new",
"m": M_NAME,
"code": code,
}
data = www_get(params, ua)
if data.get("errno") != 0:
raise RuntimeError(f"openid_new 失败:{data.get('message') or data}")
d = data.get("data") or {}
return str(d.get("openid") or ""), str(d.get("unionid") or ""), str(d.get("session_key") or "")
def phone_new(encrypted_data: str, iv: str, session_key: str, ua: str) -> str:
params = {
"i": I_ID,
"t": "undefined",
"v": "1.0.0",
"from": "wxapp",
"c": "entry",
"a": "wxapp",
"do": "phone_new",
"m": M_NAME,
"encryptedData": encrypted_data,
"iv": iv,
"session_key": session_key,
}
data = www_get(params, ua)
if data.get("errno") != 0:
raise RuntimeError(f"phone_new 失败:{data.get('message') or data}")
d = data.get("data") or {}
return str(d.get("phoneNumber") or d.get("purePhoneNumber") or "")
def oa_user_login(openid: str, unionid: str, phone: str, ua: str) -> str:
data = oa_post("/api/recycle/app/login/user_login", {
"type": "1",
"openid": openid,
"unionid": unionid,
"platform": "1",
"phone": phone,
}, ua)
if data.get("code") != 1:
raise RuntimeError(f"oa user_login 失败:{data.get('msg') or data}")
user_id = str((data.get("data") or {}).get("user_id") or "")
if not user_id:
raise RuntimeError(f"oa user_login 未返回 user_id{data}")
return user_id
def www_user_login(openid: str, ua: str) -> Dict[str, Any]:
params = {
"i": I_ID,
"t": "undefined",
"v": "1.0.0",
"from": "wxapp",
"c": "entry",
"a": "wxapp",
"do": "user_login",
"m": M_NAME,
"openid": openid,
"type": "wx",
"lat": "undefined",
"lng": "undefined",
}
return www_get(params, ua)
def load_account_map() -> Dict[str, Any]:
data = load_json(ACCOUNT_FILE, {})
return data if isinstance(data, dict) else {}
def get_account_extra(wxid: str) -> Dict[str, Any]:
amap = load_account_map()
item = amap.get(wxid) or amap.get("default") or {}
return item if isinstance(item, dict) else {}
def save_auth_cache(auth: HswAuth) -> None:
cache = load_json(CACHE_FILE, {})
cache[auth.wxid] = {
"remark": auth.remark,
"openid": auth.openid,
"unionid": auth.unionid,
"session_key": auth.session_key,
"phone": auth.phone,
"user_id": auth.user_id,
"ua": auth.ua,
"updated_at": now(),
}
save_json(CACHE_FILE, cache)
def load_auth_cache(wxid: str) -> Dict[str, Any]:
cache = load_json(CACHE_FILE, {})
item = cache.get(wxid) or {}
return item if isinstance(item, dict) else {}
def get_auth(account: Account, uas: List[str], result: Result) -> HswAuth:
extra = get_account_extra(account.wxid)
cached = load_auth_cache(account.wxid)
ua = str(cached.get("ua") or extra.get("ua") or random.choice(uas))
auth = HswAuth(wxid=account.wxid, remark=str(extra.get("remark") or account.remark or account.wxid), ua=ua)
# 有 user_id 直接复用,任务本身不依赖 token/cookie。
cached_user_id = str(cached.get("user_id") or extra.get("user_id") or "").strip()
if cached_user_id:
auth.user_id = cached_user_id
auth.phone = str(cached.get("phone") or extra.get("phone") or GLOBAL_PHONE or "")
auth.openid = str(cached.get("openid") or extra.get("openid") or "")
auth.unionid = str(cached.get("unionid") or extra.get("unionid") or "")
auth.session_key = str(cached.get("session_key") or "")
result.auth_mode = "缓存user_id"
result.login_ok = True
result.user_id = auth.user_id
result.phone = auth.phone
return auth
result.auth_mode = "养鸡场code登录"
auth.code = get_mini_program_code(account.wxid)
result.code_ok = True
log(f"账号{result.index}/{result.total} {auth.remark} 获取微信code完成{short(auth.code)}")
auth.openid, auth.unionid, auth.session_key = openid_new(auth.code, ua)
result.openid_ok = True
log(f"账号{result.index}/{result.total} {auth.remark} openid_new完成openid={short(auth.openid)} unionid={short(auth.unionid)}")
phone = str(extra.get("phone") or GLOBAL_PHONE or cached.get("phone") or "").strip()
encrypted = str(extra.get("encryptedData") or extra.get("encrypted_data") or GLOBAL_ENCRYPTED_DATA or "").strip()
iv = str(extra.get("iv") or GLOBAL_IV or "").strip()
if not phone:
if not encrypted or not iv:
log(f"账号{result.index}/{result.total} {auth.remark} 调用养鸡场手机号授权接口 getPhoneEncryptData")
encrypted, iv = get_phone_encrypt_data(account.wxid)
log(f"账号{result.index}/{result.total} {auth.remark} 手机号授权参数获取完成")
phone = phone_new(encrypted, iv, auth.session_key, ua)
auth.phone = phone
result.phone_ok = True
log(f"账号{result.index}/{result.total} {auth.remark} 手机号准备完成:{mask_phone(phone)}")
auth.user_id = oa_user_login(auth.openid, auth.unionid, auth.phone, ua)
result.login_ok = True
result.user_id = auth.user_id
result.phone = auth.phone
log(f"账号{result.index}/{result.total} {auth.remark} 登录完成user_id={auth.user_id}")
# 访问 www 侧 user_login补齐会话/验证 user_id不强依赖。
try:
data = www_user_login(auth.openid, ua)
user = ((data.get("data") or {}).get("user") or {}) if isinstance(data, dict) else {}
if user.get("id"):
auth.user_id = str(user.get("id"))
result.user_id = auth.user_id
except Exception as e:
log(f"账号{result.index}/{result.total} {auth.remark} www_user_login 非关键失败:{e}", "WARN")
save_auth_cache(auth)
return auth
def product_list(user_id: str, ua: str) -> List[Dict[str, str]]:
params = {
"i": I_ID,
"t": "undefined",
"v": "1.0.0",
"from": "wxapp",
"c": "entry",
"a": "wxapp",
"do": "goods_list_new",
"m": M_NAME,
"page": "1",
"uid": user_id,
"state": "0",
"type": "wx",
}
data = www_get(params, ua)
if data.get("errno") != 0:
raise RuntimeError(f"获取商品id失败{data.get('message') or data}")
rows = ((data.get("data") or {}).get("list") or [])
goods = [{"id": str(x.get("id")), "title": str(x.get("title") or x.get("id"))} for x in rows if isinstance(x, dict) and x.get("id")]
random.shuffle(goods)
return goods[:5]
def video_list(user_id: str, ua: str) -> Optional[Dict[str, str]]:
params = {
"i": I_ID,
"t": "undefined",
"v": "1.0.0",
"from": "wxapp",
"c": "entry",
"a": "wxapp",
"do": "notice_list",
"m": M_NAME,
"page": "1",
"uid": user_id,
"state": "0",
"type": "wx",
}
data = www_get(params, ua)
if data.get("errno") != 0:
raise RuntimeError(f"获取视频id失败{data.get('message') or data}")
rows = ((data.get("data") or {}).get("list") or [])
valid = [x for x in rows if isinstance(x, dict) and x.get("video") and x.get("content")]
if not valid:
return None
x = random.choice(valid)
return {"video": str(x.get("video")), "content": str(x.get("content"))}
def sign_in(user_id: str, ua: str) -> str:
data = oa_post("/api/recycle/app/welfare/sign_in", {"user_id": user_id}, ua)
if data.get("code") == 1:
return "成功"
return f"失败:{data.get('errorMsg') or data.get('msg') or data}"
def watch_video(user_id: str, video: Dict[str, str], ua: str) -> str:
data = oa_post("/api/recycle/app/welfare/watch_video", {"video_id": video["video"], "user_id": user_id}, ua)
if data.get("code") == 1:
return f"成功:{video.get('content', '')[:20]}"
return f"失败:{data.get('errorMsg') or data.get('msg') or data}"
def watch_product(user_id: str, product: Dict[str, str], ua: str) -> str:
data = oa_post("/api/recycle/app/welfare/watch_product", {"product_id": product["id"], "user_id": user_id}, ua)
if data.get("code") == 1:
return "成功"
return f"失败:{data.get('errorMsg') or data.get('msg') or data}"
def user_jf_log(user_id: str, ua: str) -> Tuple[str, int]:
params = {
"i": I_ID,
"t": "undefined",
"v": "1.0.0",
"from": "wxapp",
"c": "entry",
"a": "wxapp",
"do": "user_jf_log",
"m": M_NAME,
"page": "1",
"uid": user_id,
"state": "0",
"type": "wx",
}
data = www_get(params, ua)
if data.get("errno") != 0:
raise RuntimeError(f"查询积分失败:{data.get('message') or data}")
d = data.get("data") or {}
jifen = int(float(d.get("jifen") or 0))
tx_min_money = d.get("tx_min_money", "")
return f"余额{jifen / 1000:.2f}元,最低提现{tx_min_money}", jifen
def random_sleep(a: int = 3, b: int = 6) -> None:
delay = random.randint(a, b)
log(f"等待 {delay}")
time.sleep(delay)
def run_account(account: Account, index: int, total: int, uas: List[str]) -> Result:
start = time.time()
result = Result(index=index, total=total, wxid=account.wxid, remark=account.remark or account.wxid)
try:
log(f"账号{index}/{total} {result.remark} 开始 wxid={account.wxid}")
auth = get_auth(account, uas, result)
result.remark = auth.remark or result.remark
result.user_id = auth.user_id
result.phone = auth.phone
log(f"账号{index}/{total} {result.remark} 执行签到 user_id={auth.user_id}")
result.sign_status = sign_in(auth.user_id, auth.ua)
log(f"账号{index}/{total} {result.remark} 签到:{result.sign_status}")
random_sleep()
log(f"账号{index}/{total} {result.remark} 获取并浏览视频")
video = video_list(auth.user_id, auth.ua)
if video:
result.video_status = watch_video(auth.user_id, video, auth.ua)
else:
result.video_status = "无可用视频"
log(f"账号{index}/{total} {result.remark} 视频:{result.video_status}")
random_sleep()
log(f"账号{index}/{total} {result.remark} 获取并浏览商品")
goods = product_list(auth.user_id, auth.ua)
ok_count = 0
statuses = []
for g in goods:
st = watch_product(auth.user_id, g, auth.ua)
statuses.append(st)
if st.startswith("成功"):
ok_count += 1
log(f"账号{index}/{total} {result.remark} 浏览商品【{g.get('title')}】:{st}")
random_sleep()
result.product_count = ok_count
result.product_status = f"成功{ok_count}/{len(goods)}" if goods else "无商品"
log(f"账号{index}/{total} {result.remark} 查询积分/余额")
result.balance_text, result.jifen = user_jf_log(auth.user_id, auth.ua)
log(f"账号{index}/{total} {result.remark} 当前:{result.balance_text}")
except Exception as e:
result.error = str(e)
log(f"账号{index}/{total} {result.remark} 跳过/失败 | {result.error}", "ERROR")
finally:
result.runtime = time.time() - start
return result
def build_report(results: List[Result]) -> str:
success = [r for r in results if not r.error]
total_jifen = sum(r.jifen for r in results)
lines = []
lines.append("=" * 50)
lines.append("📊 回收蛙执行汇总")
lines.append("=" * 50)
lines.append(f"⏱️ 执行时间: {now()}")
lines.append(f"👥 总账号数: {len(results)}")
lines.append(f"✅ 成功账号: {len(success)}")
lines.append(f"📝 签到成功: {sum(1 for r in results if r.sign_status == '成功')}")
lines.append(f"🎬 视频浏览成功: {sum(1 for r in results if r.video_status.startswith('成功'))}")
lines.append(f"🛍️ 商品浏览成功次数: {sum(r.product_count for r in results)}")
lines.append(f"🎯 总积分: {total_jifen}")
lines.append(f"💰 余额合计: {total_jifen / 1000:.2f}")
lines.append(f"🧭 UA数量: {len(load_uas())}")
lines.append(f"📁 缓存目录: {CACHE_DIR}")
lines.append(f"📄 缓存文件: {CACHE_FILE}")
lines.append("📋 账号详情:")
for r in results:
lines.append(("" if not r.error else "") + f" 账号{r.index}: {r.remark}")
lines.append(f"🆔 wxid: {r.wxid}")
lines.append(f"👤 user_id: {r.user_id or ''}")
lines.append(f"📱 手机: {mask_phone(r.phone)}")
lines.append(f"🔐 登录: {r.auth_mode or ''}")
lines.append(f"🔑 Code: {'成功' if r.code_ok else '未执行/缓存'}")
lines.append(f"🪪 OpenID: {'成功' if r.openid_ok else '未执行/缓存'}")
lines.append(f"📱 手机授权: {'成功' if r.phone_ok else '未执行/缓存'}")
lines.append(f"📝 签到: {r.sign_status}")
lines.append(f"🎬 视频: {r.video_status}")
lines.append(f"🛍️ 商品: {r.product_status}")
lines.append(f"💰 余额: {r.balance_text}")
lines.append(f"🎯 积分: {r.jifen}")
lines.append(f"⏱️ 耗时: {r.runtime:.2f}")
if r.error:
lines.append(f"⚠️ 错误: {r.error}")
lines.append("-" * 30)
return "\n".join(lines)
def main() -> None:
ensure_dirs()
log(f"{SCRIPT_NAME}】开始执行任务")
if not WX_CLOUD or not WX_TOKEN:
raise RuntimeError("缺少 wx_cloud 或 wx_token")
uas = load_uas()
accounts = get_accounts()
if not accounts:
raise RuntimeError("没有可处理账号,请检查 wx_cloud、wx_token、HSW_TEST_WXID、HSW_EXCLUDE_WXIDS")
results: List[Result] = []
total = len(accounts)
for idx, account in enumerate(accounts, 1):
results.append(run_account(account, idx, total, uas))
if idx < total and ACCOUNT_DELAY > 0:
log(f"账号间隔 {ACCOUNT_DELAY}")
time.sleep(ACCOUNT_DELAY)
print(build_report(results))
if __name__ == "__main__":
try:
main()
except Exception:
log(traceback.format_exc(), "ERROR")
sys.exit(1)