diff --git a/WX_Applet/Applet_JYHS_WTXQ.py b/WX_Applet/Applet_JYHS_WTXQ.py new file mode 100644 index 0000000..6fc4985 --- /dev/null +++ b/WX_Applet/Applet_JYHS_WTXQ.py @@ -0,0 +1,558 @@ +# cron: 32 7 * * * +# new Env("捂碳星球旧衣服回收") +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +捂碳星球旧衣服回收 - 养鸡场自动登录签到版 + +链路: + wx_cloud + wx_token + wxid -> getMiniProgramCode 获取 code + code -> /api/login/getWxLogin 获取 openid / unionid / sessionKey + wxid -> getPhoneEncryptData 获取 encryptedData / iv + encryptedData + iv + openid + unionid + sessionKey -> /api/login/wxLogin 获取 Bearer token + token -> 签到 /api/signin/addSignIn + token -> 查询用户 /api/user/index + +必填环境变量: + wx_cloud 养鸡场地址,例如 http://192.168.0.250:666 + wx_token 养鸡场 token,支持裸 token 或 Bearer token + +可选环境变量: + WTXQ_TEST_WXID 只跑指定 wxid + WTXQ_EXCLUDE_WXIDS 排除 wxid,逗号/换行/& 分隔 + WTXQ_MAX_ACCOUNTS 最大处理账号数,默认 0 不限制 + WTXQ_ACCOUNT_DELAY 账号间隔秒,默认 2 + WTXQ_VERIFY_SSL 1/0,默认 1 + WTXQ_DEBUG 1/0,默认 0 + +缓存: + APP_Buffer/wtxq_token_cache.json +""" + +from __future__ import annotations + +import json +import os +import random +import re +import sys +import time +import traceback +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Dict, Iterable, List, Tuple + +import requests + +SCRIPT_NAME = "捂碳星球" +APPID = "wx54c4768a6050a90e" +BASE = "https://wt.api.5tan.com/api" +REFERER_VERSION = "223" + +SCRIPT_DIR = Path(__file__).resolve().parent +CACHE_DIR = SCRIPT_DIR / "APP_Buffer" +CACHE_FILE = CACHE_DIR / "wtxq_token_cache.json" +UA_FILE = SCRIPT_DIR / "User_Agent.json" + +ENV = os.environ +WX_CLOUD = ENV.get("wx_cloud", "").rstrip("/") +WX_TOKEN_RAW = ENV.get("wx_token", "").strip() +WX_TOKEN = WX_TOKEN_RAW if WX_TOKEN_RAW.lower().startswith("bearer ") else (f"Bearer {WX_TOKEN_RAW}" if WX_TOKEN_RAW else "") +VERIFY_SSL = ENV.get("WTXQ_VERIFY_SSL", "1") not in {"0", "false", "False", "no", "NO"} +TIMEOUT = int(ENV.get("WTXQ_TIMEOUT", "20") or "20") +DEBUG = ENV.get("WTXQ_DEBUG", "0") in {"1", "true", "True", "yes", "YES"} +TEST_WXID = ENV.get("WTXQ_TEST_WXID", "").strip() +MAX_ACCOUNTS = int(ENV.get("WTXQ_MAX_ACCOUNTS", "0") or "0") +ACCOUNT_DELAY = float(ENV.get("WTXQ_ACCOUNT_DELAY", "2") or "2") + +DEFAULT_UA = ( + "Mozilla/5.0 (Linux; Android 15; 22061218C Build/AQ3A.250226.002; wv) " + "AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/146.0.7680.177 " + "Mobile Safari/537.36 XWEB/1460075 MMWEBSDK/20260202 MMWEBID/6435 " + "MicroMessenger/8.0.71.3080(0x18004739) WeChat/arm64 Weixin NetType/WIFI " + "Language/zh_CN ABI/arm64 MiniProgramEnv/android" +) + + +def now() -> str: + return time.strftime("%Y-%m-%d %H:%M:%S") + + +def log(msg: str, level: str = "INFO") -> None: + print(f"{now()} - {level} - {msg}", flush=True) + + +def debug(msg: str) -> None: + if DEBUG: + log(msg, "DEBUG") + + +def short(s: str, left: int = 8, right: int = 4) -> str: + s = str(s or "") + return s if len(s) <= left + right + 3 else f"{s[:left]}...{s[-right:]}" + + +def mask_phone(phone: str) -> str: + return re.sub(r"(\d{3})\d{4}(\d{4})", r"\1****\2", str(phone or "")) or "无" + + +def split_multi(value: str) -> List[str]: + if not value: + return [] + return [x.strip() for x in re.split(r"[,&,\n\r]+", value) if x.strip()] + + +EXCLUDE_WXIDS = set(split_multi(ENV.get("WTXQ_EXCLUDE_WXIDS", ""))) + + +def ensure_dirs() -> None: + CACHE_DIR.mkdir(parents=True, exist_ok=True) + + +def load_json(path: Path, default: Any) -> Any: + try: + if path.exists(): + return json.loads(path.read_text(encoding="utf-8")) + except Exception as e: + log(f"读取 {path} 失败:{e}", "WARN") + return default + + +def save_json(path: Path, data: Any) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + tmp = path.with_suffix(path.suffix + ".tmp") + tmp.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8") + tmp.replace(path) + + +def load_uas() -> List[str]: + data = load_json(UA_FILE, []) + if isinstance(data, list): + uas = [str(x).strip() for x in data if str(x).strip()] + if uas: + return uas + return [DEFAULT_UA] + + +@dataclass +class Account: + wxid: str + remark: str = "" + ua: str = "" + + +@dataclass +class Auth: + wxid: str + remark: str + ua: str + code: str = "" + openid: str = "" + unionid: str = "" + session_key: str = "" + encrypted_data: str = "" + iv: str = "" + token: str = "" + user_id: str = "" + + +@dataclass +class Result: + index: int + total: int + wxid: str + remark: str + auth_mode: str = "" + code_ok: bool = False + openid_ok: bool = False + phone_param_ok: bool = False + login_ok: bool = False + token_status: str = "未执行" + sign_status: str = "未执行" + nick_name: str = "无" + phone: str = "无" + money: str = "0.00" + error: str = "" + runtime: float = 0.0 + + +def wx_headers() -> Dict[str, str]: + return {"Authorization": WX_TOKEN, "Content-Type": "application/json;charset=UTF-8"} + + +def pick_nested(data: Any, keys: Tuple[str, ...]) -> str: + if isinstance(data, dict): + for key in keys: + val = data.get(key) + if isinstance(val, (str, int, float)) and str(val).strip(): + return str(val).strip() + for val in data.values(): + found = pick_nested(val, keys) + if found: + return found + elif isinstance(data, list): + for item in data: + found = pick_nested(item, keys) + if found: + return found + return "" + + +def parse_accounts_response(data: Dict[str, Any]) -> Tuple[List[Account], int]: + container = data.get("data", data) + rows: List[Any] = [] + total = 0 + if isinstance(container, dict): + rows = container.get("rows") or container.get("list") or container.get("records") or [] + total = int(container.get("total") or len(rows) or 0) + elif isinstance(container, list): + rows = container + total = len(rows) + accounts: List[Account] = [] + for row in rows: + if not isinstance(row, dict): + continue + wxid = str(row.get("wxid") or row.get("wxId") or row.get("id") or "").strip() + if not wxid: + continue + remark = str(row.get("remark") or row.get("name") or row.get("nickName") or row.get("nickname") or wxid).strip() + accounts.append(Account(wxid=wxid, remark=remark)) + return accounts, total + + +def fetch_account_page(page_num: int, page_size: int = 100) -> Tuple[List[Account], int]: + url = f"{WX_CLOUD}/prod-api/wechat/wechat/list" + resp = requests.get(url, headers=wx_headers(), params={"pageNum": page_num, "pageSize": page_size}, timeout=TIMEOUT, verify=VERIFY_SSL) + resp.raise_for_status() + return parse_accounts_response(resp.json()) + + +def iter_accounts() -> Iterable[Account]: + if TEST_WXID: + yield Account(wxid=TEST_WXID, remark=TEST_WXID) + return + page = 1 + yielded = 0 + while True: + rows, total = fetch_account_page(page, 100) + if not rows: + break + for acc in rows: + if acc.wxid in EXCLUDE_WXIDS: + continue + yielded += 1 + yield acc + if MAX_ACCOUNTS > 0 and yielded >= MAX_ACCOUNTS: + return + if page * 100 >= total or page >= 50: + break + page += 1 + + +def parse_code_response(data: Dict[str, Any]) -> str: + """兼容养鸡场常见返回: + 1) {'code': 200, 'data': {'code': '0xxx'}} + 2) {'code': 200, 'data': {'miniProgramCode': '0xxx'}} + 3) {'code': 200, 'data': '0xxx'} + 注意:根级 code=200 是业务状态码,不是微信登录 code。 + """ + payload = data.get("data") if isinstance(data, dict) else data + + if isinstance(payload, str) and payload.strip(): + return payload.strip() + + if isinstance(payload, dict): + for key in ("miniProgramCode", "jsCode", "wxCode", "loginCode", "code"): + val = payload.get(key) + if isinstance(val, (str, int, float)) and str(val).strip(): + code = str(val).strip() + if code not in {"0", "200"}: + return code + code = pick_nested(payload, ("miniProgramCode", "jsCode", "wxCode", "loginCode", "code")) + if code and code not in {"0", "200"}: + return code + + # 兜底:只有当根级 code 看起来不是状态码时才使用 + if isinstance(data, dict): + root_code = str(data.get("code") or "").strip() + if root_code and root_code not in {"0", "200"}: + return root_code + + raise RuntimeError(f"未能从养鸡场响应中提取 code:{str(data)[:300]}") + + +def get_mini_program_code(wxid: str) -> str: + url = f"{WX_CLOUD}/prod-api/wechat/api/getMiniProgramCode" + resp = requests.post(url, headers=wx_headers(), json={"wxid": wxid, "appid": APPID}, timeout=TIMEOUT, verify=VERIFY_SSL) + resp.raise_for_status() + return parse_code_response(resp.json()) + + +def get_phone_encrypt_data(wxid: str) -> Tuple[str, str]: + url = f"{WX_CLOUD}/prod-api/wechat/api/getPhoneEncryptData" + payloads = [ + {"wxid": wxid, "appid": APPID}, + {"wxid": wxid, "appId": APPID}, + ] + last_err = "" + for payload in payloads: + try: + resp = requests.post(url, headers=wx_headers(), json=payload, timeout=TIMEOUT, verify=VERIFY_SSL) + resp.raise_for_status() + data = resp.json() + encrypted = pick_nested(data, ("encryptedData", "encrypted_data", "phoneEncryptedData", "phone_encrypted_data", "encryptData")) + iv = pick_nested(data, ("iv", "phoneIv", "phone_iv")) + if encrypted and iv: + return encrypted, iv + last_err = str(data)[:300] + except Exception as e: + last_err = str(e) + raise RuntimeError(f"养鸡场 getPhoneEncryptData 未返回 encryptedData/iv:{last_err}") + + +def api_headers(token: str = "", ua: str = "") -> Dict[str, str]: + return { + "accept": "*/*", + "accept-language": "zh-CN,zh;q=0.9", + "authorization": f"Bearer {token}" if token else "Bearer", + "content-type": "application/json", + "xweb_xhr": "1", + "User-Agent": ua or DEFAULT_UA, + "Referer": f"https://servicewechat.com/{APPID}/{REFERER_VERSION}/page-frame.html", + "Referrer-Policy": "unsafe-url", + } + + +def post_json(path: str, payload: Dict[str, Any], token: str = "", ua: str = "") -> Dict[str, Any]: + resp = requests.post(f"{BASE}{path}", headers=api_headers(token, ua), json=payload, timeout=TIMEOUT, verify=VERIFY_SSL) + resp.raise_for_status() + data = resp.json() + debug(f"POST {path} -> {str(data)[:300]}") + return data + + +def get_json(path: str, token: str = "", ua: str = "") -> Dict[str, Any]: + resp = requests.get(f"{BASE}{path}", headers=api_headers(token, ua), timeout=TIMEOUT, verify=VERIFY_SSL) + resp.raise_for_status() + data = resp.json() + debug(f"GET {path} -> {str(data)[:300]}") + return data + + +def get_wx_login(code: str, ua: str) -> Tuple[str, str, str]: + data = post_json("/login/getWxLogin", {"code": code}, ua=ua) + if data.get("code") != 200: + raise RuntimeError(f"getWxLogin失败:{data.get('msg') or data}") + d = data.get("data") or {} + openid = str(d.get("openid") or "") + unionid = str(d.get("unionid") or "") + session_key = str(d.get("sessionKey") or d.get("session_key") or "") + if not openid or not unionid: + raise RuntimeError(f"getWxLogin未返回openid/unionid:{data}") + return openid, unionid, session_key + + +def wx_login(auth: Auth) -> Tuple[str, str]: + payload = { + "encryptedData": auth.encrypted_data, + "errMsg": "getPhoneNumber:ok", + "iv": auth.iv, + "sessionKey": auth.session_key, + "unionid": auth.unionid, + "openid": auth.openid, + } + data = post_json("/login/wxLogin", payload, ua=auth.ua) + if data.get("code") != 200: + raise RuntimeError(f"wxLogin失败:{data.get('msg') or data}") + d = data.get("data") or {} + token = str(d.get("token") or "") + user_id = str(d.get("user_id") or "") + if not token: + raise RuntimeError(f"wxLogin未返回token:{data}") + return token, user_id + + +def validate_token(token: str, ua: str) -> Tuple[bool, Dict[str, Any]]: + try: + data = get_json("/user/index?platform=1", token=token, ua=ua) + return data.get("code") == 200, data + except Exception: + return False, {} + + +def load_cache() -> Dict[str, Any]: + data = load_json(CACHE_FILE, {}) + return data if isinstance(data, dict) else {} + + +def save_auth_cache(auth: Auth) -> None: + cache = load_cache() + old = cache.get(auth.wxid) if isinstance(cache.get(auth.wxid), dict) else {} + cache[auth.wxid] = { + "remark": auth.remark, + "ua": auth.ua or old.get("ua", ""), + "token": auth.token or old.get("token", ""), + "user_id": auth.user_id or old.get("user_id", ""), + "openid": auth.openid or old.get("openid", ""), + "unionid": auth.unionid or old.get("unionid", ""), + "updated_at": now(), + } + save_json(CACHE_FILE, cache) + + +def get_auth(acc: Account, uas: List[str], result: Result) -> Tuple[Auth, Dict[str, Any]]: + cache = load_cache() + cached = cache.get(acc.wxid) if isinstance(cache.get(acc.wxid), dict) else {} + ua = str(cached.get("ua") or acc.ua or random.choice(uas)) + auth = Auth(wxid=acc.wxid, remark=str(cached.get("remark") or acc.remark or acc.wxid), ua=ua) + + cached_token = str(cached.get("token") or "").strip() + if cached_token: + ok, info = validate_token(cached_token, ua) + if ok: + auth.token = cached_token + auth.user_id = str(cached.get("user_id") or "") + auth.openid = str(cached.get("openid") or "") + auth.unionid = str(cached.get("unionid") or "") + result.auth_mode = "缓存token" + result.login_ok = True + result.token_status = "缓存有效" + return auth, info + result.token_status = "缓存失效,重新登录" + + result.auth_mode = "养鸡场自动登录" + auth.code = get_mini_program_code(acc.wxid) + result.code_ok = True + log(f"账号{result.index}/{result.total} {auth.remark} 获取code完成:{short(auth.code)}") + + auth.openid, auth.unionid, auth.session_key = get_wx_login(auth.code, auth.ua) + result.openid_ok = True + log(f"账号{result.index}/{result.total} {auth.remark} getWxLogin完成:openid={short(auth.openid)} unionid={short(auth.unionid)}") + + auth.encrypted_data, auth.iv = get_phone_encrypt_data(acc.wxid) + result.phone_param_ok = True + log(f"账号{result.index}/{result.total} {auth.remark} 手机号授权参数获取完成") + + auth.token, auth.user_id = wx_login(auth) + result.login_ok = True + result.token_status = "登录成功" + log(f"账号{result.index}/{result.total} {auth.remark} wxLogin完成:token={short(auth.token)} user_id={auth.user_id or '无'}") + + ok, info = validate_token(auth.token, auth.ua) + if not ok: + raise RuntimeError("登录后 token 验证失败") + save_auth_cache(auth) + return auth, info + + +def sign(token: str, ua: str) -> str: + data = post_json("/signin/addSignIn", {"platform": 1}, token=token, ua=ua) + if data.get("code") == 200: + d = data.get("data") or {} + if d.get("title") == "签到成功~": + return "成功" + return f"失败:{d.get('content') or d.get('title') or data.get('msg') or data}" + return f"失败:{data.get('msg') or data}" + + +def parse_user_info(data: Dict[str, Any]) -> Tuple[str, str, str]: + d = data.get("data") or {} + nick = str(d.get("nick_name") or "无") + phone = str(d.get("tel") or "无") + money = str(d.get("money") or "0.00") + return nick, phone, money + + +def run_account(acc: Account, index: int, total: int, uas: List[str]) -> Result: + start = time.time() + result = Result(index=index, total=total, wxid=acc.wxid, remark=acc.remark or acc.wxid) + try: + log(f"账号{index}/{total} {result.remark} 开始 wxid={acc.wxid}") + auth, info = get_auth(acc, uas, result) + result.remark = auth.remark or result.remark + nick, phone, money = parse_user_info(info) + result.nick_name, result.phone, result.money = nick, phone, money + + log(f"账号{index}/{total} {result.remark} 执行签到") + result.sign_status = sign(auth.token, auth.ua) + log(f"账号{index}/{total} {result.remark} 签到:{result.sign_status}") + + info = get_json("/user/index?platform=1", token=auth.token, ua=auth.ua) + nick, phone, money = parse_user_info(info) + result.nick_name, result.phone, result.money = nick, phone, money + log(f"账号{index}/{total} {result.remark} 用户【{nick}】余额:{money}元 手机:{mask_phone(phone)}") + except Exception as e: + result.error = str(e) + log(f"账号{index}/{total} {result.remark} 跳过/失败 | {result.error}", "ERROR") + finally: + result.runtime = time.time() - start + return result + + +def build_report(results: List[Result], ua_count: int) -> str: + ok_accounts = [r for r in results if not r.error] + sign_ok = sum(1 for r in results if r.sign_status == "成功") + money_sum = 0.0 + for r in results: + try: + money_sum += float(str(r.money).replace("元", "") or 0) + except Exception: + pass + lines = [] + lines.append("=" * 50) + lines.append("📊 捂碳星球执行汇总") + lines.append("=" * 50) + lines.append(f"⏱️ 执行时间: {now()}") + lines.append(f"👥 总账号数: {len(results)}") + lines.append(f"✅ 成功账号: {len(ok_accounts)}") + lines.append(f"📝 签到成功: {sign_ok}") + lines.append(f"💰 余额合计: {money_sum:.2f}") + lines.append(f"🧭 UA数量: {ua_count}") + lines.append(f"📁 缓存目录: {CACHE_DIR}") + lines.append(f"📄 缓存文件: {CACHE_FILE}") + lines.append("📋 账号详情:") + for r in results: + lines.append(("✅" if not r.error else "❌") + f" 账号{r.index}: {r.remark}") + lines.append(f"🆔 wxid: {r.wxid}") + lines.append(f"👤 昵称: {r.nick_name}") + lines.append(f"📱 手机: {mask_phone(r.phone)}") + lines.append(f"🔐 登录: {r.auth_mode or '无'}") + lines.append(f"🔑 Code: {'成功' if r.code_ok else '未执行/缓存'}") + lines.append(f"🪪 OpenID: {'成功' if r.openid_ok else '未执行/缓存'}") + lines.append(f"📱 手机授权: {'成功' if r.phone_param_ok else '未执行/缓存'}") + lines.append(f"🎟️ Token: {r.token_status}") + lines.append(f"📝 签到: {r.sign_status}") + lines.append(f"💰 余额: {r.money}") + lines.append(f"⏱️ 耗时: {r.runtime:.2f}秒") + if r.error: + lines.append(f"⚠️ 错误: {r.error}") + lines.append("-" * 30) + return "\n".join(lines) + + +def main() -> None: + ensure_dirs() + log(f"【{SCRIPT_NAME}】开始执行任务") + if not WX_CLOUD or not WX_TOKEN: + raise RuntimeError("缺少 wx_cloud 或 wx_token") + uas = load_uas() + results: List[Result] = [] + accounts = list(iter_accounts()) + if not accounts: + raise RuntimeError("没有可处理账号,请检查 wx_cloud、wx_token、WTXQ_TEST_WXID、WTXQ_EXCLUDE_WXIDS") + total = len(accounts) + for idx, acc in enumerate(accounts, 1): + results.append(run_account(acc, idx, total, uas)) + if idx < total and ACCOUNT_DELAY > 0: + time.sleep(ACCOUNT_DELAY) + print(build_report(results, len(uas))) + + +if __name__ == "__main__": + try: + main() + except Exception: + log(traceback.format_exc(), "ERROR") + sys.exit(1) + + \ No newline at end of file