From 26ab286c9b4e4459e2cd28ccfa01cdb3172af46c Mon Sep 17 00:00:00 2001 From: admin <362324317@qq.com> Date: Sat, 23 May 2026 21:30:00 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=20WX=5FApplet/Applet=5FYCT.p?= =?UTF-8?q?y?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- WX_Applet/Applet_YCT.py | 590 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 590 insertions(+) create mode 100644 WX_Applet/Applet_YCT.py diff --git a/WX_Applet/Applet_YCT.py b/WX_Applet/Applet_YCT.py new file mode 100644 index 0000000..f21133e --- /dev/null +++ b/WX_Applet/Applet_YCT.py @@ -0,0 +1,590 @@ +# cron: 29 7 * * * +# new Env("衣城通_求职") +""" +name: 衣城通 - 养鸡场自动版 +功能: 从养鸡场逐个读取微信账号,获取衣城通小程序 code,登录换 token,签到并完成每日任务。 + +外部环境变量: + wx_cloud 养鸡场地址,例如 http://192.168.0.250:666 + wx_token 养鸡场 Authorization,支持裸 token 或 Bearer token + YCT_TEST_WXID 可选,只跑某个 wxid + YCT_EXCLUDE_WXIDS 可选,排除 wxid,支持换行/英文逗号/中文逗号分隔 + +脚本内配置: + TASK_CONFIG_IDS 每日任务 configId,默认 [4, 5, 6, 7] + TASK_EXECUTE_TIMES 每个任务提交次数,默认 2 + REQUEST_DELAY_MIN/MAX 单账号内请求间隔 + ACCOUNT_DELAY_MIN/MAX 多账号之间间隔 + DEBUG 调试日志,默认 False + +说明: + HAR 已确认登录接口: + POST https://api.yctjob.com/client/web/wechatSession?code={小程序code} + body: {} + token路径: data.userInfo.token +""" + +import json +import logging +import os +import random +import re +import sys +import time +import traceback +from dataclasses import dataclass, field +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, Generator, List, Optional, Tuple + +try: + import requests + requests.packages.urllib3.disable_warnings() +except ImportError: + print("缺少 requests,请先安装:pip install requests") + sys.exit(1) + +# ===================== 固定配置 ===================== +APPID = "wxc4eaf0fd0c97862f" +BASE = "https://api.yctjob.com" +LOGIN_URL = f"{BASE}/client/web/wechatSession" +SIGN_HOME_URL = f"{BASE}/client/user/signHome" +SIGN_URL = f"{BASE}/client/user/sign" +TASK_HOME_URL = f"{BASE}/client/user/taskHome" +TASK_SUB_URL = f"{BASE}/client/user/taskSub" +RESUME_URL = f"{BASE}/client/user/myResume" +REFERER = f"https://servicewechat.com/{APPID}/138/page-frame.html" +SCRIPT_DIR = Path(__file__).resolve().parent +CACHE_DIR = SCRIPT_DIR / "APP_Buffer" +TOKEN_CACHE_FILE = CACHE_DIR / "yct_token_cache.json" +UA_FILE = SCRIPT_DIR / "User_Agent.json" + +# 外部环境变量 +ENV = os.environ +WX_CLOUD = ENV.get("wx_cloud", "").rstrip("/") +WX_TOKEN = ENV.get("wx_token", "").strip() +if WX_TOKEN and not WX_TOKEN.lower().startswith("bearer "): + WX_TOKEN = f"Bearer {WX_TOKEN}" +SINGLE_TEST_WXID = ENV.get("YCT_TEST_WXID", "").strip() +EXCLUDE_WXIDS_RAW = ENV.get("YCT_EXCLUDE_WXIDS", "") +EXCLUDE_WXIDS = [x.strip() for x in re.split(r"[\n,,]+", EXCLUDE_WXIDS_RAW) if x.strip()] + +# 脚本内配置 +DEBUG = False +VERIFY_SSL = False +TIMEOUT = 20.0 +CLOUD_PAGE_SIZE = 1 +REQUEST_DELAY_MIN = 3 +REQUEST_DELAY_MAX = 5 +ACCOUNT_DELAY_MIN = 20 +ACCOUNT_DELAY_MAX = 40 +TASK_CONFIG_IDS = [4, 5, 6, 7] +TASK_EXECUTE_TIMES = 2 +TOKEN_CACHE_TTL = 7 * 24 * 3600 +RUN_START_TS = time.time() + +DEFAULT_UA_LIST = [ + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF WindowsWechat(0x63090a13) UnifiedPCWindowsWechat(0xf2541939) XWEB/19841", +] + + +@dataclass +class Account: + remark: str + wxid: str + ua: str = "" + + +@dataclass +class AccountResult: + remark: str + wxid: str + success: bool = False + login: str = "未执行" + sign: str = "未执行" + log_id: Any = "" + task_success: int = 0 + task_total: int = 0 + task_details: List[str] = field(default_factory=list) + nick_name: str = "" + mobile: str = "" + integral: int = 0 + amount: float = 0.0 + message: str = "" + + +# ===================== 基础工具 ===================== +def setup_logging() -> None: + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(levelname)s\t- %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + handlers=[logging.StreamHandler()], + ) + + +def mask(value: Any, keep_start: int = 6, keep_end: int = 4) -> str: + s = str(value or "") + if not s: + return "" + if len(s) <= keep_start + keep_end: + return "***" + return s[:keep_start] + "***" + s[-keep_end:] + + +def now_ts() -> int: + return int(time.time()) + + +def debug(title: str, data: Any) -> None: + if not DEBUG: + return + try: + text = json.dumps(data, ensure_ascii=False, indent=2) + text = re.sub(r"eyJ[A-Za-z0-9_\-.]+", "", text) + logging.info("[调试] %s: %s", title, text) + except Exception: + logging.info("[调试] %s: %s", title, data) + + +def safe_json(resp: requests.Response) -> Optional[Dict[str, Any]]: + try: + return resp.json() + except Exception: + return None + + +def load_ua_list() -> List[str]: + if UA_FILE.exists(): + try: + uas = [x.strip() for x in UA_FILE.read_text(encoding="utf-8").splitlines() if x.strip() and not x.strip().startswith("#")] + if uas: + return uas + except Exception: + pass + return DEFAULT_UA_LIST + + +UA_LIST = load_ua_list() + + +def stable_ua(key: str) -> str: + return UA_LIST[abs(hash(key)) % len(UA_LIST)] + + +def sleep_request() -> None: + time.sleep(random.randint(REQUEST_DELAY_MIN, REQUEST_DELAY_MAX)) + + +def ensure_cache_dir() -> None: + CACHE_DIR.mkdir(parents=True, exist_ok=True) + + +def load_cache() -> Dict[str, Any]: + ensure_cache_dir() + if not TOKEN_CACHE_FILE.exists(): + return {"_说明": "衣城通自动维护缓存。按 wxid 保存 token、昵称、积分、余额等。", "accounts": {}} + try: + data = json.loads(TOKEN_CACHE_FILE.read_text(encoding="utf-8")) + if not isinstance(data, dict): + raise ValueError("cache root is not dict") + data.setdefault("_说明", "衣城通自动维护缓存。") + data.setdefault("accounts", {}) + return data + except Exception as e: + backup = TOKEN_CACHE_FILE.with_suffix(f".broken.{now_ts()}.json") + try: + TOKEN_CACHE_FILE.rename(backup) + except Exception: + pass + if DEBUG: + logging.warning("[调试] 缓存文件损坏,已重建: %s", e) + return {"_说明": "衣城通自动维护缓存。", "accounts": {}} + + +def save_cache(cache: Dict[str, Any]) -> None: + ensure_cache_dir() + TOKEN_CACHE_FILE.write_text(json.dumps(cache, ensure_ascii=False, indent=2), encoding="utf-8") + + +def get_cached_token(acc: Account, cache: Dict[str, Any]) -> str: + item = (cache.get("accounts") or {}).get(acc.wxid) or {} + token = str(item.get("token") or "") + updated_at = int(item.get("updated_at") or 0) + if token and now_ts() - updated_at < TOKEN_CACHE_TTL: + return token + return "" + + +def update_account_cache(acc: Account, cache: Dict[str, Any], *, token: str = "", nick_name: str = "", mobile: str = "", integral: int = 0, amount: float = 0.0) -> None: + accounts = cache.setdefault("accounts", {}) + old = accounts.get(acc.wxid) or {} + old.update({ + "remark": acc.remark, + "wxid": acc.wxid, + "token": token or old.get("token", ""), + "nick_name": nick_name or old.get("nick_name", ""), + "mobile": mobile or old.get("mobile", ""), + "integral": integral, + "amount": amount, + "ua": acc.ua, + "updated_at": now_ts(), + "updated_at_text": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + }) + accounts[acc.wxid] = old + save_cache(cache) + + +# ===================== 养鸡场账号/code ===================== +def wx_cloud_headers() -> Dict[str, str]: + return {"Authorization": WX_TOKEN, "Content-Type": "application/json"} + + +def account_from_cloud_row(item: Dict[str, Any]) -> Optional[Account]: + wxid = item.get("wxId") or item.get("wxid") or item.get("Wxid") or "" + if not wxid: + return None + remark = item.get("wxName") or item.get("wxname") or item.get("remark") or item.get("nickName") or mask(wxid) + return Account(remark=str(remark), wxid=str(wxid)) + + +def fetch_cloud_account_page(page_num: int, page_size: int) -> Tuple[List[Account], int]: + if not WX_CLOUD or not WX_TOKEN: + raise RuntimeError("缺少 wx_cloud 或 wx_token") + url = f"{WX_CLOUD}/prod-api/wechat/wechat/list" + resp = requests.get(url, headers=wx_cloud_headers(), params={"pageNum": page_num, "pageSize": page_size}, timeout=TIMEOUT, verify=VERIFY_SSL) + data = safe_json(resp) + debug(f"养鸡场账号列表 page={page_num}", data if data is not None else resp.text[:500]) + if data and data.get("code") == 200 and isinstance(data.get("rows"), list): + accounts = [acc for row in data["rows"] if (acc := account_from_cloud_row(row))] + return accounts, int(data.get("total") or len(accounts)) + raise RuntimeError((data or {}).get("msg") or (data or {}).get("message") or f"获取账号失败 HTTP {resp.status_code}") + + +def iter_accounts_from_cloud() -> Generator[Tuple[Account, int, int], None, None]: + page = 1 + seen = 0 + total = None + while True: + accounts, total_count = fetch_cloud_account_page(page, CLOUD_PAGE_SIZE) + total = total_count if total is None else total + if not accounts: + break + for acc in accounts: + seen += 1 + yield acc, seen, total + if seen >= total: + break + page += 1 + + +def should_skip(acc: Account) -> bool: + if acc.wxid in EXCLUDE_WXIDS: + return True + if SINGLE_TEST_WXID and acc.wxid != SINGLE_TEST_WXID: + return True + return False + + +def iter_selected_accounts() -> Generator[Tuple[Account, int, int], None, None]: + processed = 0 + for acc, cloud_index, cloud_total in iter_accounts_from_cloud(): + if should_skip(acc): + continue + processed += 1 + yield acc, processed if SINGLE_TEST_WXID else cloud_index, 1 if SINGLE_TEST_WXID else cloud_total + if SINGLE_TEST_WXID and acc.wxid == SINGLE_TEST_WXID: + break + + +def get_mini_program_code(wxid: str) -> str: + url = f"{WX_CLOUD}/prod-api/wechat/api/getMiniProgramCode" + payload = {"wxid": wxid, "appid": APPID} + resp = requests.post(url, headers=wx_cloud_headers(), json=payload, timeout=TIMEOUT, verify=VERIFY_SSL) + data = safe_json(resp) + debug("获取小程序code", data if data is not None else resp.text[:500]) + if not data: + raise RuntimeError(f"获取 code 失败: HTTP {resp.status_code}") + if data.get("code") != 200: + raise RuntimeError(data.get("msg") or data.get("message") or f"获取 code 失败: {data}") + code = ((data.get("data") or {}).get("code") or data.get("codeData") or data.get("data")) + if isinstance(code, dict): + code = code.get("code") + if not code: + raise RuntimeError("获取 code 失败: 响应中没有 code") + return str(code) + + +# ===================== 衣城通接口 ===================== +def yct_headers(token: str = "", ua: str = "") -> Dict[str, str]: + headers = { + "Accept": "*/*", + "Accept-Language": "zh-CN,zh;q=0.9", + "Content-Type": "application/json", + "Host": "api.yctjob.com", + "Referer": REFERER, + "User-Agent": ua or DEFAULT_UA_LIST[0], + "xweb_xhr": "1", + } + if token: + headers["Authorization"] = f"Bearer {token}" + return headers + + +def yct_request(method: str, url: str, *, token: str = "", ua: str = "", params: Dict[str, Any] = None, json_body: Dict[str, Any] = None) -> Dict[str, Any]: + headers = yct_headers(token=token, ua=ua) + if method.upper() == "POST": + resp = requests.post(url, headers=headers, params=params, json=json_body if json_body is not None else {}, timeout=TIMEOUT, verify=VERIFY_SSL) + else: + resp = requests.get(url, headers=headers, params=params, timeout=TIMEOUT, verify=VERIFY_SSL) + data = safe_json(resp) + debug(f"衣城通接口 {url}", data if data is not None else resp.text[:500]) + if isinstance(data, dict): + return data + return {"code": -1, "msg": f"JSON解析失败 HTTP {resp.status_code}: {resp.text[:200]}"} + + +def token_invalid(data: Dict[str, Any]) -> bool: + msg = str(data.get("msg") or data.get("message") or "") + code = data.get("code") + return code in (401, 403) or any(k in msg for k in ["token", "登录", "授权", "失效", "过期", "无效", "请先"]) + + +def login_with_code(code: str, ua: str) -> Tuple[str, Dict[str, Any]]: + # HAR: POST /client/web/wechatSession?code=*** body {} + data = yct_request("POST", LOGIN_URL, ua=ua, params={"code": code}, json_body={}) + if data.get("code") == 200: + user_info = ((data.get("data") or {}).get("userInfo") or {}) + token = user_info.get("token") + if token: + return str(token), user_info + raise RuntimeError(data.get("msg") or data.get("message") or "code 登录换 token 失败") + + +def validate_token(token: str, ua: str) -> Dict[str, Any]: + data = yct_request("GET", RESUME_URL, token=token, ua=ua) + if data.get("code") == 200: + return data.get("data") or {} + if token_invalid(data): + raise RuntimeError("TOKEN_INVALID") + raise RuntimeError(data.get("msg") or data.get("message") or "验证 token 失败") + + +def sign_home(token: str, ua: str) -> Dict[str, Any]: + data = yct_request("GET", SIGN_HOME_URL, token=token, ua=ua) + if data.get("code") == 200: + return data.get("data") or {} + if token_invalid(data): + raise RuntimeError("TOKEN_INVALID") + raise RuntimeError(data.get("msg") or data.get("message") or "获取签到首页失败") + + +def find_today_log_id(home: Dict[str, Any]) -> Any: + today = datetime.now().strftime("%Y-%m-%d") + for config in home.get("configs") or []: + if config.get("signDate") == today: + return config.get("logId") + return None + + +def do_sign(token: str, ua: str, log_id: Any) -> str: + data = yct_request("POST", SIGN_URL, token=token, ua=ua, json_body={"logId": log_id}) + if data.get("code") == 200: + return data.get("msg") or data.get("message") or "签到成功" + if token_invalid(data): + raise RuntimeError("TOKEN_INVALID") + return data.get("msg") or data.get("message") or f"签到失败: {data.get('code')}" + + +def task_home(token: str, ua: str) -> Dict[str, Any]: + data = yct_request("GET", TASK_HOME_URL, token=token, ua=ua) + if data.get("code") == 200: + return data.get("data") or {} + if token_invalid(data): + raise RuntimeError("TOKEN_INVALID") + raise RuntimeError(data.get("msg") or data.get("message") or "获取任务首页失败") + + +def do_task_sub(token: str, ua: str, config_id: int) -> str: + data = yct_request("POST", TASK_SUB_URL, token=token, ua=ua, json_body={"configId": config_id}) + if data.get("code") == 200: + return data.get("msg") or data.get("message") or "操作成功" + if token_invalid(data): + raise RuntimeError("TOKEN_INVALID") + return data.get("msg") or data.get("message") or f"任务失败: {data.get('code')}" + + +def parse_score(home: Dict[str, Any]) -> Tuple[int, float]: + try: + integral = int(home.get("integral") or 0) + except Exception: + integral = 0 + try: + amount = float(home.get("amount") or 0.0) + except Exception: + amount = 0.0 + return integral, amount + + +# ===================== 主流程 ===================== +def prepare_token(acc: Account, cache: Dict[str, Any], result: AccountResult) -> str: + token = get_cached_token(acc, cache) + if token: + try: + info = validate_token(token, acc.ua) + result.login = "使用缓存" + result.nick_name = str(info.get("name") or info.get("nickName") or "") + result.mobile = str(info.get("mobile") or "") + logging.info("账号 %s 读取并验证token缓存完成", acc.remark) + return token + except RuntimeError as e: + if str(e) != "TOKEN_INVALID": + raise + logging.info("账号 %s 缓存token失效,重新获取code登录", acc.remark) + code = get_mini_program_code(acc.wxid) + logging.info("账号 %s 获取code完成", acc.remark) + sleep_request() + token, user_info = login_with_code(code, acc.ua) + result.login = "成功" + result.nick_name = str(user_info.get("nickName") or "") + result.mobile = str(user_info.get("phonenumber") or user_info.get("userName") or "") + update_account_cache(acc, cache, token=token, nick_name=result.nick_name, mobile=result.mobile) + logging.info("账号 %s 登录完成,token已缓存", acc.remark) + return token + + +def process_account(acc: Account, index: int, total: int) -> AccountResult: + acc.ua = stable_ua(acc.wxid or acc.remark) + result = AccountResult(remark=acc.remark, wxid=acc.wxid) + logging.info("账号%d/%d %s 开始", index, total, acc.remark) + cache = load_cache() + try: + token = prepare_token(acc, cache, result) + if DEBUG: + logging.info("[调试] token=%s 缓存目录=%s", mask(token), CACHE_DIR) + sleep_request() + + home = sign_home(token, acc.ua) + result.integral, result.amount = parse_score(home) + result.log_id = find_today_log_id(home) + logging.info("账号%d/%d %s 签到首页完成 | logId: %s | 积分: %s | 余额: %.2f元", index, total, acc.remark, result.log_id or "无", result.integral, result.amount) + sleep_request() + + if result.log_id: + result.sign = do_sign(token, acc.ua, result.log_id) + else: + result.sign = "未找到今日logId,跳过签到" + logging.info("账号%d/%d %s 签到完成 | %s", index, total, acc.remark, result.sign) + sleep_request() + + thome = task_home(token, acc.ua) + result.integral, result.amount = parse_score(thome) + logging.info("账号%d/%d %s 任务首页完成 | 积分: %s | 余额: %.2f元", index, total, acc.remark, result.integral, result.amount) + sleep_request() + + for config_id in TASK_CONFIG_IDS: + for times in range(1, TASK_EXECUTE_TIMES + 1): + result.task_total += 1 + msg = do_task_sub(token, acc.ua, config_id) + ok = "成功" in msg or "操作成功" in msg or "已" in msg or msg == "OK" + if ok: + result.task_success += 1 + line = f"configId={config_id} 第{times}次: {msg}" + result.task_details.append(line) + logging.info("账号%d/%d %s 任务提交完成 | %s", index, total, acc.remark, line) + sleep_request() + + # 任务后再查一次任务首页,记录最终积分/余额 + final_home = task_home(token, acc.ua) + result.integral, result.amount = parse_score(final_home) + update_account_cache(acc, cache, token=token, nick_name=result.nick_name, mobile=result.mobile, integral=result.integral, amount=result.amount) + logging.info("账号%d/%d %s 最终余额查询完成 | 积分: %s | 余额: %.2f元", index, total, acc.remark, result.integral, result.amount) + result.success = True + except Exception as e: + result.success = False + result.message = str(e).replace("TOKEN_INVALID", "token失效") + if DEBUG: + logging.error("账号 %s 执行失败: %s", acc.remark, e) + traceback.print_exc() + if result.success: + logging.info( + "账号%d/%d %s 完成 | 登录: %s | 签到: %s | 任务: %d/%d | 积分: %s | 余额: %.2f元", + index, total, acc.remark, result.login, result.sign, result.task_success, result.task_total, result.integral, result.amount, + ) + else: + logging.info("账号%d/%d %s 跳过/失败 | %s", index, total, acc.remark, result.message or "失败") + return result + + +def build_report(results: List[AccountResult]) -> str: + now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + elapsed = time.time() - RUN_START_TS + total = len(results) + success = sum(1 for r in results if r.success) + sign_ok = sum(1 for r in results if r.success and ("成功" in r.sign or "已" in r.sign or "操作成功" in r.sign)) + task_success = sum(r.task_success for r in results) + task_total = sum(r.task_total for r in results) + total_integral = sum(r.integral for r in results) + total_amount = sum(r.amount for r in results) + lines = [ + "", + "==================================================", + "📊 衣城通执行汇总", + "==================================================", + f"⏱️ 执行时间: {now}", + f"👥 总账号数: {total}", + f"✅ 成功账号: {success}", + f"📝 签到成功/已签: {sign_ok}", + f"🧩 任务完成: {task_success}/{task_total}", + f"🎯 积分合计: {total_integral}", + f"💰 余额合计: {total_amount:.2f}元", + f"🧭 UA数量: {len(UA_LIST)}", + f"📁 缓存目录: {CACHE_DIR}", + "", + "📋 账号详情:", + ] + for i, r in enumerate(results, 1): + icon = "✅" if r.success else "❌" + lines.append(f"{icon} 账号{i}: {r.remark} | 昵称: {r.nick_name or '无'} | 手机: {r.mobile or '无'}") + lines.append(f" 🔐 登录: {r.login}") + lines.append(f" 📝 签到: {r.sign}") + lines.append(f" 🧩 任务: {r.task_success}/{r.task_total}") + lines.append(f" 🎯 积分: {r.integral}") + lines.append(f" 💰 余额: {r.amount:.2f}元") + if r.message: + lines.append(f" ⚠️ 错误: {r.message}") + lines.append(f"=== 执行结束 [{now}] 耗时 {elapsed:.2f} 秒 退出码 0 ===") + return "\n".join(lines) + + +def main() -> None: + setup_logging() + logging.info("【衣城通】开始执行任务") + if DEBUG: + logging.info("APPID=%s WX_CLOUD=%s", APPID, WX_CLOUD or "(未设置)") + results: List[AccountResult] = [] + handled = 0 + for acc, i, total in iter_selected_accounts(): + handled += 1 + results.append(process_account(acc, i, total)) + if not SINGLE_TEST_WXID and i < total: + delay = random.randint(ACCOUNT_DELAY_MIN, ACCOUNT_DELAY_MAX) + if DEBUG: + logging.info("[调试] 账号间隔等待 %d 秒", delay) + time.sleep(delay) + if handled == 0: + raise RuntimeError("没有可处理账号。请检查 wx_cloud、wx_token、YCT_TEST_WXID、YCT_EXCLUDE_WXIDS 配置") + print(build_report(results)) + + +if __name__ == "__main__": + try: + main() + except Exception as e: + setup_logging() + logging.error("主流程错误: %s", e) + if DEBUG: + traceback.print_exc() + sys.exit(1) + + \ No newline at end of file