# cron: 29 7 * * *
# new Env("衣城通_求职")
# NOTE: the two header lines above are scheduler directives (cron expression
# and task name); they are kept verbatim on purpose.
"""
Yichengtong (YCT job mini-program) - account-farm automated edition.

Purpose:
    Read WeChat accounts one by one from the account-farm service, obtain a
    YCT mini-program login code for each, exchange it for a token, sign in
    and submit the daily tasks.

Environment variables:
    wx_cloud           Account-farm base URL, e.g. http://192.168.0.250:666
    wx_token           Account-farm Authorization value; a bare token or a
                       "Bearer ..." token are both accepted
    YCT_TEST_WXID      Optional; only run this single wxid
    YCT_EXCLUDE_WXIDS  Optional; wxids to skip, separated by newlines,
                       ASCII commas or fullwidth commas

In-script configuration:
    TASK_CONFIG_IDS        configId of each daily task, default [4, 5, 6, 7]
    TASK_EXECUTE_TIMES     submissions per task, default 2
    REQUEST_DELAY_MIN/MAX  delay between requests of one account (seconds)
    ACCOUNT_DELAY_MIN/MAX  delay between accounts (seconds)
    DEBUG                  debug logging, default False

Notes:
    Login endpoint as captured in a HAR file:
    POST https://api.yctjob.com/client/web/wechatSession?code={mini-program code}
    body: {}
    token path: data.userInfo.token
"""

import json
import logging
import os
import random
import re
import sys
import time
import traceback
import zlib
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Generator, List, Optional, Tuple

try:
    import requests

    # Certificate verification is disabled below (VERIFY_SSL = False), so the
    # resulting urllib3 warnings are silenced here.
    requests.packages.urllib3.disable_warnings()
except ImportError:
    print("缺少 requests,请先安装:pip install requests")
    sys.exit(1)


# ===================== Fixed configuration =====================
APPID = "wxc4eaf0fd0c97862f"
BASE = "https://api.yctjob.com"
LOGIN_URL = f"{BASE}/client/web/wechatSession"
SIGN_HOME_URL = f"{BASE}/client/user/signHome"
SIGN_URL = f"{BASE}/client/user/sign"
TASK_HOME_URL = f"{BASE}/client/user/taskHome"
TASK_SUB_URL = f"{BASE}/client/user/taskSub"
RESUME_URL = f"{BASE}/client/user/myResume"
REFERER = f"https://servicewechat.com/{APPID}/138/page-frame.html"

SCRIPT_DIR = Path(__file__).resolve().parent
CACHE_DIR = SCRIPT_DIR / "APP_Buffer"
TOKEN_CACHE_FILE = CACHE_DIR / "yct_token_cache.json"
UA_FILE = SCRIPT_DIR / "User_Agent.json"

# External environment variables
ENV = os.environ
WX_CLOUD = ENV.get("wx_cloud", "").rstrip("/")
WX_TOKEN = ENV.get("wx_token", "").strip()
if WX_TOKEN and not WX_TOKEN.lower().startswith("bearer "):
    WX_TOKEN = f"Bearer {WX_TOKEN}"
SINGLE_TEST_WXID = ENV.get("YCT_TEST_WXID", "").strip()
EXCLUDE_WXIDS_RAW = ENV.get("YCT_EXCLUDE_WXIDS", "")
# Separators: newline, ASCII comma and fullwidth comma (U+FF0C).
EXCLUDE_WXIDS = [x.strip() for x in re.split(r"[\n,\uFF0C]+", EXCLUDE_WXIDS_RAW) if x.strip()]

# In-script configuration
DEBUG = False
# NOTE(review): TLS certificate verification is switched off for every
# request made by this script; set to True if the endpoints allow it.
VERIFY_SSL = False
TIMEOUT = 20.0
CLOUD_PAGE_SIZE = 1
REQUEST_DELAY_MIN = 3
REQUEST_DELAY_MAX = 5
ACCOUNT_DELAY_MIN = 20
ACCOUNT_DELAY_MAX = 40
TASK_CONFIG_IDS = [4, 5, 6, 7]
TASK_EXECUTE_TIMES = 2
TOKEN_CACHE_TTL = 7 * 24 * 3600
RUN_START_TS = time.time()

DEFAULT_UA_LIST = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF WindowsWechat(0x63090a13) UnifiedPCWindowsWechat(0xf2541939) XWEB/19841",
]


@dataclass
class Account:
    """One WeChat account taken from the account-farm list."""

    remark: str  # display name used in logs and the report
    wxid: str  # account identifier; also the cache key
    ua: str = ""  # User-Agent assigned in process_account


@dataclass
class AccountResult:
    """Outcome of processing one account; consumed by build_report."""

    remark: str
    wxid: str
    success: bool = False
    login: str = "未执行"
    sign: str = "未执行"
    log_id: Any = ""
    task_success: int = 0
    task_total: int = 0
    task_details: List[str] = field(default_factory=list)
    nick_name: str = ""
    mobile: str = ""
    integral: int = 0
    amount: float = 0.0
    message: str = ""


# ===================== Basic utilities =====================
def setup_logging() -> None:
    """Configure the root logger to write timestamped INFO lines to a stream handler."""
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s\t- %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
        handlers=[logging.StreamHandler()],
    )


def mask(value: Any, keep_start: int = 6, keep_end: int = 4) -> str:
    """Return *value* as text with its middle replaced by ``***``.

    Empty/falsy values give ``""``; values no longer than
    ``keep_start + keep_end`` are fully hidden as ``"***"``.
    """
    s = str(value or "")
    if not s:
        return ""
    if len(s) <= keep_start + keep_end:
        return "***"
    # Slice from an explicit index: s[-0:] would return the whole string
    # and leak the secret when keep_end is 0.
    return s[:keep_start] + "***" + s[len(s) - keep_end:]


def now_ts() -> int:
    """Return the current Unix time as whole seconds."""
    return int(time.time())


def debug(title: str, data: Any) -> None:
    """Log *data* under *title* when DEBUG is on, stripping JWT-like strings."""
    if not DEBUG:
        return
    try:
        text = json.dumps(data, ensure_ascii=False, indent=2)
        # Remove anything that looks like a JWT ("eyJ..." prefix).
        text = re.sub(r"eyJ[A-Za-z0-9_\-.]+", "", text)
        logging.info("[调试] %s: %s", title, text)
    except Exception:
        # Best effort: fall back to the raw object if it cannot be serialized.
        logging.info("[调试] %s: %s", title, data)


def safe_json(resp: requests.Response) -> Optional[Dict[str, Any]]:
    """Return the decoded JSON body of *resp*, or None if decoding fails."""
    try:
        return resp.json()
    except Exception:
        return None


def load_ua_list() -> List[str]:
    """Load User-Agent strings from UA_FILE, one per line.

    Blank lines and lines starting with ``#`` are ignored. Falls back to
    DEFAULT_UA_LIST when the file is missing, unreadable or yields nothing.
    """
    if UA_FILE.exists():
        try:
            stripped = (line.strip() for line in UA_FILE.read_text(encoding="utf-8").splitlines())
            uas = [line for line in stripped if line and not line.startswith("#")]
        except (OSError, UnicodeError):
            # Deliberate best effort: an unreadable UA file is not fatal.
            uas = []
        if uas:
            return uas
    return DEFAULT_UA_LIST


UA_LIST = load_ua_list()


def stable_ua(key: str) -> str:
    """Pick a User-Agent for *key* that stays the same across runs.

    The built-in ``hash()`` of a string is randomized per interpreter
    process, so a CRC32 of the UTF-8 bytes is used instead.
    """
    return UA_LIST[zlib.crc32(key.encode("utf-8")) % len(UA_LIST)]


def sleep_request() -> None:
    """Sleep a random whole number of seconds in [REQUEST_DELAY_MIN, REQUEST_DELAY_MAX]."""
    time.sleep(random.randint(REQUEST_DELAY_MIN, REQUEST_DELAY_MAX))


def ensure_cache_dir() -> None:
    """Create CACHE_DIR (and parents) if it does not exist."""
    CACHE_DIR.mkdir(parents=True, exist_ok=True)


def load_cache() -> Dict[str, Any]:
    """Read the token cache file and return its content as a dict.

    A missing file yields a fresh cache. A file that cannot be read or
    parsed, or whose root is not a dict, is renamed to a ``.broken.<ts>.json``
    backup (best effort) and a fresh cache is returned.
    """
    ensure_cache_dir()
    if not TOKEN_CACHE_FILE.exists():
        return {"_说明": "衣城通自动维护缓存。按 wxid 保存 token、昵称、积分、余额等。", "accounts": {}}
    try:
        data = json.loads(TOKEN_CACHE_FILE.read_text(encoding="utf-8"))
        if not isinstance(data, dict):
            raise ValueError("cache root is not dict")
    except (OSError, ValueError) as e:
        backup = TOKEN_CACHE_FILE.with_suffix(f".broken.{now_ts()}.json")
        try:
            TOKEN_CACHE_FILE.rename(backup)
        except OSError:
            # Keeping a backup is optional; carry on with a fresh cache.
            pass
        if DEBUG:
            logging.warning("[调试] 缓存文件损坏,已重建: %s", e)
        return {"_说明": "衣城通自动维护缓存。", "accounts": {}}
    data.setdefault("_说明", "衣城通自动维护缓存。")
    # A non-dict "accounts" value (e.g. null) would break later lookups.
    if not isinstance(data.get("accounts"), dict):
        data["accounts"] = {}
    return data


def save_cache(cache: Dict[str, Any]) -> None:
    """Write *cache* to TOKEN_CACHE_FILE as pretty-printed UTF-8 JSON.

    The text is written to a sibling ``.tmp`` file first and then renamed
    over the target, so an interrupted write does not truncate the cache.
    """
    ensure_cache_dir()
    text = json.dumps(cache, ensure_ascii=False, indent=2)
    tmp_file = TOKEN_CACHE_FILE.with_name(TOKEN_CACHE_FILE.name + ".tmp")
    tmp_file.write_text(text, encoding="utf-8")
    tmp_file.replace(TOKEN_CACHE_FILE)


def get_cached_token(acc: Account, cache: Dict[str, Any]) -> str:
    """Return the cached token for *acc* if it is younger than TOKEN_CACHE_TTL, else ``""``."""
    item = (cache.get("accounts") or {}).get(acc.wxid) or {}
    token = str(item.get("token") or "")
    updated_at = int(item.get("updated_at") or 0)
    if token and now_ts() - updated_at < TOKEN_CACHE_TTL:
        return token
    return ""


def update_account_cache(
    acc: Account,
    cache: Dict[str, Any],
    *,
    token: str = "",
    nick_name: str = "",
    mobile: str = "",
    integral: Optional[int] = None,
    amount: Optional[float] = None,
) -> None:
    """Merge the given fields into the cache entry of *acc* and save the cache.

    Empty ``token``/``nick_name``/``mobile`` and ``None`` for
    ``integral``/``amount`` mean "keep the cached value", so the update
    made right after login no longer resets the stored score to zero.
    """
    accounts = cache.setdefault("accounts", {})
    old = accounts.get(acc.wxid) or {}
    old.update({
        "remark": acc.remark,
        "wxid": acc.wxid,
        "token": token or old.get("token", ""),
        "nick_name": nick_name or old.get("nick_name", ""),
        "mobile": mobile or old.get("mobile", ""),
        "integral": old.get("integral", 0) if integral is None else integral,
        "amount": old.get("amount", 0.0) if amount is None else amount,
        "ua": acc.ua,
        "updated_at": now_ts(),
        "updated_at_text": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    })
    accounts[acc.wxid] = old
    save_cache(cache)


# ===================== Account farm: accounts / code =====================
def wx_cloud_headers() -> Dict[str, str]:
    """Return the headers sent to the account-farm service."""
    return {"Authorization": WX_TOKEN, "Content-Type": "application/json"}


def account_from_cloud_row(item: Dict[str, Any]) -> Optional[Account]:
    """Build an Account from one list row, or return None if it has no wxid."""
    wxid = item.get("wxId") or item.get("wxid") or item.get("Wxid") or ""
    if not wxid:
        return None
    remark = (
        item.get("wxName")
        or item.get("wxname")
        or item.get("remark")
        or item.get("nickName")
        or mask(wxid)
    )
    return Account(remark=str(remark), wxid=str(wxid))


def fetch_cloud_account_page(page_num: int, page_size: int) -> Tuple[List[Account], int]:
    """Fetch one page of the account list.

    Returns:
        ``(accounts, total)`` where ``accounts`` holds only rows with a wxid
        and ``total`` is the reported total (or the page length if absent).

    Raises:
        RuntimeError: if wx_cloud/wx_token are unset or the response is not
            a successful list response.
    """
    if not WX_CLOUD or not WX_TOKEN:
        raise RuntimeError("缺少 wx_cloud 或 wx_token")
    url = f"{WX_CLOUD}/prod-api/wechat/wechat/list"
    resp = requests.get(
        url,
        headers=wx_cloud_headers(),
        params={"pageNum": page_num, "pageSize": page_size},
        timeout=TIMEOUT,
        verify=VERIFY_SSL,
    )
    data = safe_json(resp)
    debug(f"养鸡场账号列表 page={page_num}", data if data is not None else resp.text[:500])
    info = data if isinstance(data, dict) else {}
    if info.get("code") == 200 and isinstance(info.get("rows"), list):
        accounts = [
            acc
            for row in info["rows"]
            if isinstance(row, dict) and (acc := account_from_cloud_row(row))
        ]
        return accounts, int(info.get("total") or len(accounts))
    raise RuntimeError(info.get("msg") or info.get("message") or f"获取账号失败 HTTP {resp.status_code}")


def iter_accounts_from_cloud() -> Generator[Tuple[Account, int, int], None, None]:
    """Yield ``(account, running_index, total)`` across all list pages.

    ``total`` is the value reported by the first page. Iteration ends once
    enough accounts were yielded or the requested pages cover ``total``
    rows. A page without usable accounts (e.g. a row lacking a wxid) does
    not end the iteration early.
    """
    page = 1
    seen = 0
    total = None
    while True:
        accounts, total_count = fetch_cloud_account_page(page, CLOUD_PAGE_SIZE)
        if total is None:
            total = total_count
        for acc in accounts:
            seen += 1
            yield acc, seen, total
        if seen >= total or page * CLOUD_PAGE_SIZE >= total:
            break
        page += 1


def should_skip(acc: Account) -> bool:
    """Return True if *acc* is excluded or is not the single test wxid."""
    if acc.wxid in EXCLUDE_WXIDS:
        return True
    return bool(SINGLE_TEST_WXID) and acc.wxid != SINGLE_TEST_WXID


def iter_selected_accounts() -> Generator[Tuple[Account, int, int], None, None]:
    """Yield ``(account, index, total)`` for every account that is not skipped.

    In single-test mode the index counts processed accounts, the total is
    1, and iteration stops after the matching account.
    """
    processed = 0
    for acc, cloud_index, cloud_total in iter_accounts_from_cloud():
        if should_skip(acc):
            continue
        processed += 1
        if SINGLE_TEST_WXID:
            yield acc, processed, 1
            # should_skip already guarantees this is the test wxid.
            break
        yield acc, cloud_index, cloud_total


def get_mini_program_code(wxid: str) -> str:
    """Ask the account farm for a mini-program login code for *wxid*.

    The code is looked up in ``data.code``, then ``codeData``, then ``data``
    itself (which may be a plain string or a dict with a ``code`` key).

    Raises:
        RuntimeError: on a non-JSON/non-200 response or when no code is found.
    """
    url = f"{WX_CLOUD}/prod-api/wechat/api/getMiniProgramCode"
    payload = {"wxid": wxid, "appid": APPID}
    resp = requests.post(url, headers=wx_cloud_headers(), json=payload, timeout=TIMEOUT, verify=VERIFY_SSL)
    data = safe_json(resp)
    debug("获取小程序code", data if data is not None else resp.text[:500])
    if not data or not isinstance(data, dict):
        raise RuntimeError(f"获取 code 失败: HTTP {resp.status_code}")
    if data.get("code") != 200:
        raise RuntimeError(data.get("msg") or data.get("message") or f"获取 code 失败: {data}")
    inner = data.get("data")
    # Only dicts have .get(); a plain-string "data" is used directly as the
    # code further down instead of raising AttributeError here.
    code = inner.get("code") if isinstance(inner, dict) else None
    code = code or data.get("codeData") or inner
    if isinstance(code, dict):
        code = code.get("code")
    if not code:
        raise RuntimeError("获取 code 失败: 响应中没有 code")
    return str(code)


# ===================== YCT API =====================
def yct_headers(token: str = "", ua: str = "") -> Dict[str, str]:
    """Return request headers for the YCT API, with Authorization if *token* is set."""
    headers = {
        "Accept": "*/*",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Content-Type": "application/json",
        "Host": "api.yctjob.com",
        "Referer": REFERER,
        "User-Agent": ua or DEFAULT_UA_LIST[0],
        "xweb_xhr": "1",
    }
    if token:
        headers["Authorization"] = f"Bearer {token}"
    return headers


def yct_request(
    method: str,
    url: str,
    *,
    token: str = "",
    ua: str = "",
    params: Optional[Dict[str, Any]] = None,
    json_body: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Send a GET or POST to the YCT API and return the JSON object.

    Any method other than POST is sent as GET. POST always carries a JSON
    body (``{}`` when *json_body* is None). If the body is not a JSON
    object, a synthetic ``{"code": -1, "msg": ...}`` dict is returned.
    """
    headers = yct_headers(token=token, ua=ua)
    if method.upper() == "POST":
        resp = requests.post(
            url,
            headers=headers,
            params=params,
            json=json_body if json_body is not None else {},
            timeout=TIMEOUT,
            verify=VERIFY_SSL,
        )
    else:
        resp = requests.get(url, headers=headers, params=params, timeout=TIMEOUT, verify=VERIFY_SSL)
    data = safe_json(resp)
    debug(f"衣城通接口 {url}", data if data is not None else resp.text[:500])
    if isinstance(data, dict):
        return data
    return {"code": -1, "msg": f"JSON解析失败 HTTP {resp.status_code}: {resp.text[:200]}"}


def token_invalid(data: Dict[str, Any]) -> bool:
    """Return True if *data* has code 401/403 or a message containing a login-related keyword."""
    msg = str(data.get("msg") or data.get("message") or "")
    if data.get("code") in (401, 403):
        return True
    keywords = ("token", "登录", "授权", "失效", "过期", "无效", "请先")
    return any(k in msg for k in keywords)


def _get_data(url: str, token: str, ua: str, failure_message: str) -> Dict[str, Any]:
    """GET *url* and return its ``data`` field; raise RuntimeError otherwise.

    Raises ``RuntimeError("TOKEN_INVALID")`` when the response looks like an
    authentication failure, else a RuntimeError with the server message or
    *failure_message*.
    """
    data = yct_request("GET", url, token=token, ua=ua)
    if data.get("code") == 200:
        return data.get("data") or {}
    if token_invalid(data):
        raise RuntimeError("TOKEN_INVALID")
    raise RuntimeError(data.get("msg") or data.get("message") or failure_message)


def _post_action(
    url: str,
    token: str,
    ua: str,
    body: Dict[str, Any],
    success_default: str,
    failure_label: str,
) -> str:
    """POST *body* to *url* and return a human-readable result message.

    Raises ``RuntimeError("TOKEN_INVALID")`` on an authentication failure;
    other failures are returned as text, not raised.
    """
    data = yct_request("POST", url, token=token, ua=ua, json_body=body)
    message = data.get("msg") or data.get("message")
    if data.get("code") == 200:
        return message or success_default
    if token_invalid(data):
        raise RuntimeError("TOKEN_INVALID")
    return message or f"{failure_label}: {data.get('code')}"


def login_with_code(code: str, ua: str) -> Tuple[str, Dict[str, Any]]:
    """Exchange a mini-program *code* for a token.

    Returns ``(token, user_info)``; raises RuntimeError if the response has
    no ``data.userInfo.token``.
    """
    # HAR: POST /client/web/wechatSession?code=*** body {}
    data = yct_request("POST", LOGIN_URL, ua=ua, params={"code": code}, json_body={})
    if data.get("code") == 200:
        user_info = (data.get("data") or {}).get("userInfo") or {}
        token = user_info.get("token")
        if token:
            return str(token), user_info
    raise RuntimeError(data.get("msg") or data.get("message") or "code 登录换 token 失败")


def validate_token(token: str, ua: str) -> Dict[str, Any]:
    """Check *token* against the resume endpoint and return its ``data`` field."""
    return _get_data(RESUME_URL, token, ua, "验证 token 失败")


def sign_home(token: str, ua: str) -> Dict[str, Any]:
    """Return the ``data`` field of the sign-in home endpoint."""
    return _get_data(SIGN_HOME_URL, token, ua, "获取签到首页失败")


def find_today_log_id(home: Dict[str, Any]) -> Any:
    """Return the ``logId`` of the config whose ``signDate`` is today (local date), or None."""
    today = datetime.now().strftime("%Y-%m-%d")
    for config in home.get("configs") or []:
        if config.get("signDate") == today:
            return config.get("logId")
    return None


def do_sign(token: str, ua: str, log_id: Any) -> str:
    """Submit the sign-in for *log_id* and return the result message."""
    return _post_action(SIGN_URL, token, ua, {"logId": log_id}, "签到成功", "签到失败")


def task_home(token: str, ua: str) -> Dict[str, Any]:
    """Return the ``data`` field of the task home endpoint."""
    return _get_data(TASK_HOME_URL, token, ua, "获取任务首页失败")


def do_task_sub(token: str, ua: str, config_id: int) -> str:
    """Submit one task for *config_id* and return the result message."""
    return _post_action(TASK_SUB_URL, token, ua, {"configId": config_id}, "操作成功", "任务失败")


def parse_score(home: Dict[str, Any]) -> Tuple[int, float]:
    """Extract ``(integral, amount)`` from *home*, using 0 / 0.0 for missing or unparsable values."""
    try:
        integral = int(home.get("integral") or 0)
    except (TypeError, ValueError):
        integral = 0
    try:
        amount = float(home.get("amount") or 0.0)
    except (TypeError, ValueError):
        amount = 0.0
    return integral, amount


# ===================== Main flow =====================
def prepare_token(acc: Account, cache: Dict[str, Any], result: AccountResult) -> str:
    """Return a usable token for *acc*, from cache if still valid, else via a fresh login.

    Fills ``result.login``, ``result.nick_name`` and ``result.mobile``. A
    fresh token is written to the cache. Errors other than an invalid
    cached token propagate to the caller.
    """
    token = get_cached_token(acc, cache)
    if token:
        try:
            info = validate_token(token, acc.ua)
        except RuntimeError as e:
            if str(e) != "TOKEN_INVALID":
                raise
            logging.info("账号 %s 缓存token失效,重新获取code登录", acc.remark)
        else:
            result.login = "使用缓存"
            result.nick_name = str(info.get("name") or info.get("nickName") or "")
            result.mobile = str(info.get("mobile") or "")
            logging.info("账号 %s 读取并验证token缓存完成", acc.remark)
            return token

    code = get_mini_program_code(acc.wxid)
    logging.info("账号 %s 获取code完成", acc.remark)
    sleep_request()
    token, user_info = login_with_code(code, acc.ua)
    result.login = "成功"
    result.nick_name = str(user_info.get("nickName") or "")
    result.mobile = str(user_info.get("phonenumber") or user_info.get("userName") or "")
    update_account_cache(acc, cache, token=token, nick_name=result.nick_name, mobile=result.mobile)
    logging.info("账号 %s 登录完成,token已缓存", acc.remark)
    return token


def process_account(acc: Account, index: int, total: int) -> AccountResult:
    """Run login, sign-in and all daily tasks for one account.

    Never raises: any exception is recorded in ``result.message`` and the
    result is returned with ``success=False``.
    """
    acc.ua = stable_ua(acc.wxid or acc.remark)
    result = AccountResult(remark=acc.remark, wxid=acc.wxid)
    logging.info("账号%d/%d %s 开始", index, total, acc.remark)
    cache = load_cache()
    try:
        token = prepare_token(acc, cache, result)
        if DEBUG:
            logging.info("[调试] token=%s 缓存目录=%s", mask(token), CACHE_DIR)
        sleep_request()

        home = sign_home(token, acc.ua)
        result.integral, result.amount = parse_score(home)
        result.log_id = find_today_log_id(home)
        logging.info(
            "账号%d/%d %s 签到首页完成 | logId: %s | 积分: %s | 余额: %.2f元",
            index, total, acc.remark, result.log_id or "无", result.integral, result.amount,
        )
        sleep_request()

        if result.log_id:
            result.sign = do_sign(token, acc.ua, result.log_id)
        else:
            result.sign = "未找到今日logId,跳过签到"
        logging.info("账号%d/%d %s 签到完成 | %s", index, total, acc.remark, result.sign)
        sleep_request()

        thome = task_home(token, acc.ua)
        result.integral, result.amount = parse_score(thome)
        logging.info(
            "账号%d/%d %s 任务首页完成 | 积分: %s | 余额: %.2f元",
            index, total, acc.remark, result.integral, result.amount,
        )
        sleep_request()

        for config_id in TASK_CONFIG_IDS:
            for times in range(1, TASK_EXECUTE_TIMES + 1):
                result.task_total += 1
                msg = do_task_sub(token, acc.ua, config_id)
                # A message counts as success if it reports success or says
                # the task was already done.
                if "成功" in msg or "已" in msg or msg == "OK":
                    result.task_success += 1
                line = f"configId={config_id} 第{times}次: {msg}"
                result.task_details.append(line)
                logging.info("账号%d/%d %s 任务提交完成 | %s", index, total, acc.remark, line)
                sleep_request()

        # Query the task home once more to record the final points/balance.
        final_home = task_home(token, acc.ua)
        result.integral, result.amount = parse_score(final_home)
        update_account_cache(
            acc,
            cache,
            token=token,
            nick_name=result.nick_name,
            mobile=result.mobile,
            integral=result.integral,
            amount=result.amount,
        )
        logging.info(
            "账号%d/%d %s 最终余额查询完成 | 积分: %s | 余额: %.2f元",
            index, total, acc.remark, result.integral, result.amount,
        )
        result.success = True
    except Exception as e:
        # Per-account boundary: one failing account must not stop the run.
        result.success = False
        result.message = str(e).replace("TOKEN_INVALID", "token失效")
        if DEBUG:
            logging.error("账号 %s 执行失败: %s", acc.remark, e)
            traceback.print_exc()

    if result.success:
        logging.info(
            "账号%d/%d %s 完成 | 登录: %s | 签到: %s | 任务: %d/%d | 积分: %s | 余额: %.2f元",
            index, total, acc.remark, result.login, result.sign,
            result.task_success, result.task_total, result.integral, result.amount,
        )
    else:
        logging.info("账号%d/%d %s 跳过/失败 | %s", index, total, acc.remark, result.message or "失败")
    return result


def build_report(results: List[AccountResult]) -> str:
    """Return the multi-line summary text for all processed accounts."""
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    elapsed = time.time() - RUN_START_TS
    total = len(results)
    success = sum(1 for r in results if r.success)
    sign_ok = sum(1 for r in results if r.success and ("成功" in r.sign or "已" in r.sign))
    task_success = sum(r.task_success for r in results)
    task_total = sum(r.task_total for r in results)
    total_integral = sum(r.integral for r in results)
    total_amount = sum(r.amount for r in results)
    lines = [
        "",
        "==================================================",
        "📊 衣城通执行汇总",
        "==================================================",
        f"⏱️ 执行时间: {now}",
        f"👥 总账号数: {total}",
        f"✅ 成功账号: {success}",
        f"📝 签到成功/已签: {sign_ok}",
        f"🧩 任务完成: {task_success}/{task_total}",
        f"🎯 积分合计: {total_integral}",
        f"💰 余额合计: {total_amount:.2f}元",
        f"🧭 UA数量: {len(UA_LIST)}",
        f"📁 缓存目录: {CACHE_DIR}",
        "",
        "📋 账号详情:",
    ]
    for i, r in enumerate(results, 1):
        icon = "✅" if r.success else "❌"
        lines.append(f"{icon} 账号{i}: {r.remark} | 昵称: {r.nick_name or '无'} | 手机: {r.mobile or '无'}")
        lines.append(f" 🔐 登录: {r.login}")
        lines.append(f" 📝 签到: {r.sign}")
        lines.append(f" 🧩 任务: {r.task_success}/{r.task_total}")
        lines.append(f" 🎯 积分: {r.integral}")
        lines.append(f" 💰 余额: {r.amount:.2f}元")
        if r.message:
            lines.append(f" ⚠️ 错误: {r.message}")
    lines.append(f"=== 执行结束 [{now}] 耗时 {elapsed:.2f} 秒 退出码 0 ===")
    return "\n".join(lines)


def main() -> None:
    """Process every selected account in turn and print the summary report.

    Raises:
        RuntimeError: if no account was selected for processing.
    """
    setup_logging()
    logging.info("【衣城通】开始执行任务")
    if DEBUG:
        logging.info("APPID=%s WX_CLOUD=%s", APPID, WX_CLOUD or "(未设置)")

    results: List[AccountResult] = []
    for acc, i, total in iter_selected_accounts():
        # Wait only between two processed accounts, so no time is wasted
        # after the last one when the remaining accounts are all excluded.
        if results and not SINGLE_TEST_WXID:
            delay = random.randint(ACCOUNT_DELAY_MIN, ACCOUNT_DELAY_MAX)
            if DEBUG:
                logging.info("[调试] 账号间隔等待 %d 秒", delay)
            time.sleep(delay)
        results.append(process_account(acc, i, total))

    if not results:
        raise RuntimeError("没有可处理账号。请检查 wx_cloud、wx_token、YCT_TEST_WXID、YCT_EXCLUDE_WXIDS 配置")
    print(build_report(results))


if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        setup_logging()
        logging.error("主流程错误: %s", e)
        if DEBUG:
            traceback.print_exc()
        sys.exit(1)