# cron: 32 7 * * * # new Env("旧衣回收-捂碳星球") #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 捂碳星球旧衣服回收 - 养鸡场自动登录签到版 链路: wx_cloud + wx_token + wxid -> getMiniProgramCode 获取 code code -> /api/login/getWxLogin 获取 openid / unionid / sessionKey wxid -> getPhoneEncryptData 获取 encryptedData / iv encryptedData + iv + openid + unionid + sessionKey -> /api/login/wxLogin 获取 Bearer token token -> 签到 /api/signin/addSignIn token -> 查询用户 /api/user/index 必填环境变量: wx_cloud 养鸡场地址,例如 http://192.168.0.250:666 wx_token 养鸡场 token,支持裸 token 或 Bearer token 可选环境变量: WTXQ_TEST_WXID 只跑指定 wxid WTXQ_EXCLUDE_WXIDS 排除 wxid,逗号/换行/& 分隔 WTXQ_MAX_ACCOUNTS 最大处理账号数,默认 0 不限制 WTXQ_ACCOUNT_DELAY 账号间隔秒,默认 2 WTXQ_VERIFY_SSL 1/0,默认 1 WTXQ_DEBUG 1/0,默认 0 缓存: APP_Buffer/wtxq_token_cache.json """ from __future__ import annotations import json import os import random import re import sys import time import traceback from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, Iterable, List, Tuple import requests SCRIPT_NAME = "捂碳星球" APPID = "wx54c4768a6050a90e" BASE = "https://wt.api.5tan.com/api" REFERER_VERSION = "223" SCRIPT_DIR = Path(__file__).resolve().parent CACHE_DIR = SCRIPT_DIR / "APP_Buffer" CACHE_FILE = CACHE_DIR / "wtxq_token_cache.json" UA_FILE = SCRIPT_DIR / "User_Agent.json" ENV = os.environ WX_CLOUD = ENV.get("wx_cloud", "").rstrip("/") WX_TOKEN_RAW = ENV.get("wx_token", "").strip() WX_TOKEN = WX_TOKEN_RAW if WX_TOKEN_RAW.lower().startswith("bearer ") else (f"Bearer {WX_TOKEN_RAW}" if WX_TOKEN_RAW else "") VERIFY_SSL = ENV.get("WTXQ_VERIFY_SSL", "1") not in {"0", "false", "False", "no", "NO"} TIMEOUT = int(ENV.get("WTXQ_TIMEOUT", "20") or "20") DEBUG = ENV.get("WTXQ_DEBUG", "0") in {"1", "true", "True", "yes", "YES"} TEST_WXID = ENV.get("WTXQ_TEST_WXID", "").strip() MAX_ACCOUNTS = int(ENV.get("WTXQ_MAX_ACCOUNTS", "0") or "0") ACCOUNT_DELAY = float(ENV.get("WTXQ_ACCOUNT_DELAY", "2") or "2") DEFAULT_UA = ( "Mozilla/5.0 (Linux; Android 15; 22061218C Build/AQ3A.250226.002; wv) " "AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/146.0.7680.177 " "Mobile Safari/537.36 XWEB/1460075 MMWEBSDK/20260202 MMWEBID/6435 " "MicroMessenger/8.0.71.3080(0x18004739) WeChat/arm64 Weixin NetType/WIFI " "Language/zh_CN ABI/arm64 MiniProgramEnv/android" ) def now() -> str: return time.strftime("%Y-%m-%d %H:%M:%S") def log(msg: str, level: str = "INFO") -> None: print(f"{now()} - {level} - {msg}", flush=True) def debug(msg: str) -> None: if DEBUG: log(msg, "DEBUG") def short(s: str, left: int = 8, right: int = 4) -> str: s = str(s or "") return s if len(s) <= left + right + 3 else f"{s[:left]}...{s[-right:]}" def mask_phone(phone: str) -> str: return re.sub(r"(\d{3})\d{4}(\d{4})", r"\1****\2", str(phone or "")) or "无" def split_multi(value: str) -> List[str]: if not value: return [] return [x.strip() for x in re.split(r"[,&,\n\r]+", value) if x.strip()] EXCLUDE_WXIDS = set(split_multi(ENV.get("WTXQ_EXCLUDE_WXIDS", ""))) def ensure_dirs() -> None: CACHE_DIR.mkdir(parents=True, exist_ok=True) def load_json(path: Path, default: Any) -> Any: try: if path.exists(): return json.loads(path.read_text(encoding="utf-8")) except Exception as e: log(f"读取 {path} 失败:{e}", "WARN") return default def save_json(path: Path, data: Any) -> None: path.parent.mkdir(parents=True, exist_ok=True) tmp = path.with_suffix(path.suffix + ".tmp") tmp.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8") tmp.replace(path) def load_uas() -> List[str]: data = load_json(UA_FILE, []) if isinstance(data, list): uas = [str(x).strip() for x in data if str(x).strip()] if uas: return uas return [DEFAULT_UA] @dataclass class Account: wxid: str remark: str = "" ua: str = "" @dataclass class Auth: wxid: str remark: str ua: str code: str = "" openid: str = "" unionid: str = "" session_key: str = "" encrypted_data: str = "" iv: str = "" token: str = "" user_id: str = "" @dataclass class Result: index: int total: int wxid: str remark: str auth_mode: str = "" code_ok: bool = False openid_ok: bool = False phone_param_ok: bool = False login_ok: bool = False token_status: str = "未执行" sign_status: str = "未执行" nick_name: str = "无" phone: str = "无" money: str = "0.00" error: str = "" runtime: float = 0.0 def wx_headers() -> Dict[str, str]: return {"Authorization": WX_TOKEN, "Content-Type": "application/json;charset=UTF-8"} def pick_nested(data: Any, keys: Tuple[str, ...]) -> str: if isinstance(data, dict): for key in keys: val = data.get(key) if isinstance(val, (str, int, float)) and str(val).strip(): return str(val).strip() for val in data.values(): found = pick_nested(val, keys) if found: return found elif isinstance(data, list): for item in data: found = pick_nested(item, keys) if found: return found return "" def parse_accounts_response(data: Dict[str, Any]) -> Tuple[List[Account], int]: container = data.get("data", data) rows: List[Any] = [] total = 0 if isinstance(container, dict): rows = container.get("rows") or container.get("list") or container.get("records") or [] total = int(container.get("total") or len(rows) or 0) elif isinstance(container, list): rows = container total = len(rows) accounts: List[Account] = [] for row in rows: if not isinstance(row, dict): continue wxid = str(row.get("wxid") or row.get("wxId") or row.get("id") or "").strip() if not wxid: continue remark = str(row.get("remark") or row.get("name") or row.get("nickName") or row.get("nickname") or wxid).strip() accounts.append(Account(wxid=wxid, remark=remark)) return accounts, total def fetch_account_page(page_num: int, page_size: int = 100) -> Tuple[List[Account], int]: url = f"{WX_CLOUD}/prod-api/wechat/wechat/list" resp = requests.get(url, headers=wx_headers(), params={"pageNum": page_num, "pageSize": page_size}, timeout=TIMEOUT, verify=VERIFY_SSL) resp.raise_for_status() return parse_accounts_response(resp.json()) def iter_accounts() -> Iterable[Account]: if TEST_WXID: yield Account(wxid=TEST_WXID, remark=TEST_WXID) return page = 1 yielded = 0 while True: rows, total = fetch_account_page(page, 100) if not rows: break for acc in rows: if acc.wxid in EXCLUDE_WXIDS: continue yielded += 1 yield acc if MAX_ACCOUNTS > 0 and yielded >= MAX_ACCOUNTS: return if page * 100 >= total or page >= 50: break page += 1 def parse_code_response(data: Dict[str, Any]) -> str: """兼容养鸡场常见返回: 1) {'code': 200, 'data': {'code': '0xxx'}} 2) {'code': 200, 'data': {'miniProgramCode': '0xxx'}} 3) {'code': 200, 'data': '0xxx'} 注意:根级 code=200 是业务状态码,不是微信登录 code。 """ payload = data.get("data") if isinstance(data, dict) else data if isinstance(payload, str) and payload.strip(): return payload.strip() if isinstance(payload, dict): for key in ("miniProgramCode", "jsCode", "wxCode", "loginCode", "code"): val = payload.get(key) if isinstance(val, (str, int, float)) and str(val).strip(): code = str(val).strip() if code not in {"0", "200"}: return code code = pick_nested(payload, ("miniProgramCode", "jsCode", "wxCode", "loginCode", "code")) if code and code not in {"0", "200"}: return code # 兜底:只有当根级 code 看起来不是状态码时才使用 if isinstance(data, dict): root_code = str(data.get("code") or "").strip() if root_code and root_code not in {"0", "200"}: return root_code raise RuntimeError(f"未能从养鸡场响应中提取 code:{str(data)[:300]}") def get_mini_program_code(wxid: str) -> str: url = f"{WX_CLOUD}/prod-api/wechat/api/getMiniProgramCode" resp = requests.post(url, headers=wx_headers(), json={"wxid": wxid, "appid": APPID}, timeout=TIMEOUT, verify=VERIFY_SSL) resp.raise_for_status() return parse_code_response(resp.json()) def get_phone_encrypt_data(wxid: str) -> Tuple[str, str]: url = f"{WX_CLOUD}/prod-api/wechat/api/getPhoneEncryptData" payloads = [ {"wxid": wxid, "appid": APPID}, {"wxid": wxid, "appId": APPID}, ] last_err = "" for payload in payloads: try: resp = requests.post(url, headers=wx_headers(), json=payload, timeout=TIMEOUT, verify=VERIFY_SSL) resp.raise_for_status() data = resp.json() encrypted = pick_nested(data, ("encryptedData", "encrypted_data", "phoneEncryptedData", "phone_encrypted_data", "encryptData")) iv = pick_nested(data, ("iv", "phoneIv", "phone_iv")) if encrypted and iv: return encrypted, iv last_err = str(data)[:300] except Exception as e: last_err = str(e) raise RuntimeError(f"养鸡场 getPhoneEncryptData 未返回 encryptedData/iv:{last_err}") def api_headers(token: str = "", ua: str = "") -> Dict[str, str]: return { "accept": "*/*", "accept-language": "zh-CN,zh;q=0.9", "authorization": f"Bearer {token}" if token else "Bearer", "content-type": "application/json", "xweb_xhr": "1", "User-Agent": ua or DEFAULT_UA, "Referer": f"https://servicewechat.com/{APPID}/{REFERER_VERSION}/page-frame.html", "Referrer-Policy": "unsafe-url", } def post_json(path: str, payload: Dict[str, Any], token: str = "", ua: str = "") -> Dict[str, Any]: resp = requests.post(f"{BASE}{path}", headers=api_headers(token, ua), json=payload, timeout=TIMEOUT, verify=VERIFY_SSL) resp.raise_for_status() data = resp.json() debug(f"POST {path} -> {str(data)[:300]}") return data def get_json(path: str, token: str = "", ua: str = "") -> Dict[str, Any]: resp = requests.get(f"{BASE}{path}", headers=api_headers(token, ua), timeout=TIMEOUT, verify=VERIFY_SSL) resp.raise_for_status() data = resp.json() debug(f"GET {path} -> {str(data)[:300]}") return data def get_wx_login(code: str, ua: str) -> Tuple[str, str, str]: data = post_json("/login/getWxLogin", {"code": code}, ua=ua) if data.get("code") != 200: raise RuntimeError(f"getWxLogin失败:{data.get('msg') or data}") d = data.get("data") or {} openid = str(d.get("openid") or "") unionid = str(d.get("unionid") or "") session_key = str(d.get("sessionKey") or d.get("session_key") or "") if not openid or not unionid: raise RuntimeError(f"getWxLogin未返回openid/unionid:{data}") return openid, unionid, session_key def wx_login(auth: Auth) -> Tuple[str, str]: payload = { "encryptedData": auth.encrypted_data, "errMsg": "getPhoneNumber:ok", "iv": auth.iv, "sessionKey": auth.session_key, "unionid": auth.unionid, "openid": auth.openid, } data = post_json("/login/wxLogin", payload, ua=auth.ua) if data.get("code") != 200: raise RuntimeError(f"wxLogin失败:{data.get('msg') or data}") d = data.get("data") or {} token = str(d.get("token") or "") user_id = str(d.get("user_id") or "") if not token: raise RuntimeError(f"wxLogin未返回token:{data}") return token, user_id def validate_token(token: str, ua: str) -> Tuple[bool, Dict[str, Any]]: try: data = get_json("/user/index?platform=1", token=token, ua=ua) return data.get("code") == 200, data except Exception: return False, {} def load_cache() -> Dict[str, Any]: data = load_json(CACHE_FILE, {}) return data if isinstance(data, dict) else {} def save_auth_cache(auth: Auth) -> None: cache = load_cache() old = cache.get(auth.wxid) if isinstance(cache.get(auth.wxid), dict) else {} cache[auth.wxid] = { "remark": auth.remark, "ua": auth.ua or old.get("ua", ""), "token": auth.token or old.get("token", ""), "user_id": auth.user_id or old.get("user_id", ""), "openid": auth.openid or old.get("openid", ""), "unionid": auth.unionid or old.get("unionid", ""), "updated_at": now(), } save_json(CACHE_FILE, cache) def get_auth(acc: Account, uas: List[str], result: Result) -> Tuple[Auth, Dict[str, Any]]: cache = load_cache() cached = cache.get(acc.wxid) if isinstance(cache.get(acc.wxid), dict) else {} ua = str(cached.get("ua") or acc.ua or random.choice(uas)) auth = Auth(wxid=acc.wxid, remark=str(cached.get("remark") or acc.remark or acc.wxid), ua=ua) cached_token = str(cached.get("token") or "").strip() if cached_token: ok, info = validate_token(cached_token, ua) if ok: auth.token = cached_token auth.user_id = str(cached.get("user_id") or "") auth.openid = str(cached.get("openid") or "") auth.unionid = str(cached.get("unionid") or "") result.auth_mode = "缓存token" result.login_ok = True result.token_status = "缓存有效" return auth, info result.token_status = "缓存失效,重新登录" result.auth_mode = "养鸡场自动登录" auth.code = get_mini_program_code(acc.wxid) result.code_ok = True log(f"账号{result.index}/{result.total} {auth.remark} 获取code完成:{short(auth.code)}") auth.openid, auth.unionid, auth.session_key = get_wx_login(auth.code, auth.ua) result.openid_ok = True log(f"账号{result.index}/{result.total} {auth.remark} getWxLogin完成:openid={short(auth.openid)} unionid={short(auth.unionid)}") auth.encrypted_data, auth.iv = get_phone_encrypt_data(acc.wxid) result.phone_param_ok = True log(f"账号{result.index}/{result.total} {auth.remark} 手机号授权参数获取完成") auth.token, auth.user_id = wx_login(auth) result.login_ok = True result.token_status = "登录成功" log(f"账号{result.index}/{result.total} {auth.remark} wxLogin完成:token={short(auth.token)} user_id={auth.user_id or '无'}") ok, info = validate_token(auth.token, auth.ua) if not ok: raise RuntimeError("登录后 token 验证失败") save_auth_cache(auth) return auth, info def sign(token: str, ua: str) -> str: data = post_json("/signin/addSignIn", {"platform": 1}, token=token, ua=ua) if data.get("code") == 200: d = data.get("data") or {} if d.get("title") == "签到成功~": return "成功" return f"失败:{d.get('content') or d.get('title') or data.get('msg') or data}" return f"失败:{data.get('msg') or data}" def parse_user_info(data: Dict[str, Any]) -> Tuple[str, str, str]: d = data.get("data") or {} nick = str(d.get("nick_name") or "无") phone = str(d.get("tel") or "无") money = str(d.get("money") or "0.00") return nick, phone, money def run_account(acc: Account, index: int, total: int, uas: List[str]) -> Result: start = time.time() result = Result(index=index, total=total, wxid=acc.wxid, remark=acc.remark or acc.wxid) try: log(f"账号{index}/{total} {result.remark} 开始 wxid={acc.wxid}") auth, info = get_auth(acc, uas, result) result.remark = auth.remark or result.remark nick, phone, money = parse_user_info(info) result.nick_name, result.phone, result.money = nick, phone, money log(f"账号{index}/{total} {result.remark} 执行签到") result.sign_status = sign(auth.token, auth.ua) log(f"账号{index}/{total} {result.remark} 签到:{result.sign_status}") info = get_json("/user/index?platform=1", token=auth.token, ua=auth.ua) nick, phone, money = parse_user_info(info) result.nick_name, result.phone, result.money = nick, phone, money log(f"账号{index}/{total} {result.remark} 用户【{nick}】余额:{money}元 手机:{mask_phone(phone)}") except Exception as e: result.error = str(e) log(f"账号{index}/{total} {result.remark} 跳过/失败 | {result.error}", "ERROR") finally: result.runtime = time.time() - start return result def build_report(results: List[Result], ua_count: int) -> str: ok_accounts = [r for r in results if not r.error] sign_ok = sum(1 for r in results if r.sign_status == "成功") money_sum = 0.0 for r in results: try: money_sum += float(str(r.money).replace("元", "") or 0) except Exception: pass lines = [] lines.append("=" * 50) lines.append("📊 捂碳星球执行汇总") lines.append("=" * 50) lines.append(f"⏱️ 执行时间: {now()}") lines.append(f"👥 总账号数: {len(results)}") lines.append(f"✅ 成功账号: {len(ok_accounts)}") lines.append(f"📝 签到成功: {sign_ok}") lines.append(f"💰 余额合计: {money_sum:.2f}") lines.append(f"🧭 UA数量: {ua_count}") lines.append(f"📁 缓存目录: {CACHE_DIR}") lines.append(f"📄 缓存文件: {CACHE_FILE}") lines.append("📋 账号详情:") for r in results: lines.append(("✅" if not r.error else "❌") + f" 账号{r.index}: {r.remark}") lines.append(f"🆔 wxid: {r.wxid}") lines.append(f"👤 昵称: {r.nick_name}") lines.append(f"📱 手机: {mask_phone(r.phone)}") lines.append(f"🔐 登录: {r.auth_mode or '无'}") lines.append(f"🔑 Code: {'成功' if r.code_ok else '未执行/缓存'}") lines.append(f"🪪 OpenID: {'成功' if r.openid_ok else '未执行/缓存'}") lines.append(f"📱 手机授权: {'成功' if r.phone_param_ok else '未执行/缓存'}") lines.append(f"🎟️ Token: {r.token_status}") lines.append(f"📝 签到: {r.sign_status}") lines.append(f"💰 余额: {r.money}") lines.append(f"⏱️ 耗时: {r.runtime:.2f}秒") if r.error: lines.append(f"⚠️ 错误: {r.error}") lines.append("-" * 30) return "\n".join(lines) def main() -> None: ensure_dirs() log(f"【{SCRIPT_NAME}】开始执行任务") if not WX_CLOUD or not WX_TOKEN: raise RuntimeError("缺少 wx_cloud 或 wx_token") uas = load_uas() results: List[Result] = [] accounts = list(iter_accounts()) if not accounts: raise RuntimeError("没有可处理账号,请检查 wx_cloud、wx_token、WTXQ_TEST_WXID、WTXQ_EXCLUDE_WXIDS") total = len(accounts) for idx, acc in enumerate(accounts, 1): results.append(run_account(acc, idx, total, uas)) if idx < total and ACCOUNT_DELAY > 0: time.sleep(ACCOUNT_DELAY) print(build_report(results, len(uas))) if __name__ == "__main__": try: main() except Exception: log(traceback.format_exc(), "ERROR") sys.exit(1)