Files
Yangmao_Script/WX_Applet/Applet_YCT.py
2026-05-23 21:30:00 +08:00

590 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# cron: 29 7 * * *
# new Env("衣城通_求职")
"""
name: 衣城通 - 养鸡场自动版
功能: 从养鸡场逐个读取微信账号,获取衣城通小程序 code登录换 token签到并完成每日任务。
外部环境变量:
wx_cloud 养鸡场地址,例如 http://192.168.0.250:666
wx_token 养鸡场 Authorization支持裸 token 或 Bearer token
YCT_TEST_WXID 可选,只跑某个 wxid
YCT_EXCLUDE_WXIDS 可选,排除 wxid支持换行/英文逗号/中文逗号分隔
脚本内配置:
TASK_CONFIG_IDS 每日任务 configId默认 [4, 5, 6, 7]
TASK_EXECUTE_TIMES 每个任务提交次数,默认 2
REQUEST_DELAY_MIN/MAX 单账号内请求间隔
ACCOUNT_DELAY_MIN/MAX 多账号之间间隔
DEBUG 调试日志,默认 False
说明:
HAR 已确认登录接口:
POST https://api.yctjob.com/client/web/wechatSession?code={小程序code}
body: {}
token路径: data.userInfo.token
"""
import json
import logging
import os
import random
import re
import sys
import time
import traceback
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Generator, List, Optional, Tuple
try:
import requests
requests.packages.urllib3.disable_warnings()
except ImportError:
print("缺少 requests请先安装pip install requests")
sys.exit(1)
# ===================== 固定配置 =====================
APPID = "wxc4eaf0fd0c97862f"
BASE = "https://api.yctjob.com"
LOGIN_URL = f"{BASE}/client/web/wechatSession"
SIGN_HOME_URL = f"{BASE}/client/user/signHome"
SIGN_URL = f"{BASE}/client/user/sign"
TASK_HOME_URL = f"{BASE}/client/user/taskHome"
TASK_SUB_URL = f"{BASE}/client/user/taskSub"
RESUME_URL = f"{BASE}/client/user/myResume"
REFERER = f"https://servicewechat.com/{APPID}/138/page-frame.html"
SCRIPT_DIR = Path(__file__).resolve().parent
CACHE_DIR = SCRIPT_DIR / "APP_Buffer"
TOKEN_CACHE_FILE = CACHE_DIR / "yct_token_cache.json"
UA_FILE = SCRIPT_DIR / "User_Agent.json"
# 外部环境变量
ENV = os.environ
WX_CLOUD = ENV.get("wx_cloud", "").rstrip("/")
WX_TOKEN = ENV.get("wx_token", "").strip()
if WX_TOKEN and not WX_TOKEN.lower().startswith("bearer "):
WX_TOKEN = f"Bearer {WX_TOKEN}"
SINGLE_TEST_WXID = ENV.get("YCT_TEST_WXID", "").strip()
EXCLUDE_WXIDS_RAW = ENV.get("YCT_EXCLUDE_WXIDS", "")
EXCLUDE_WXIDS = [x.strip() for x in re.split(r"[\n,]+", EXCLUDE_WXIDS_RAW) if x.strip()]
# 脚本内配置
DEBUG = False
VERIFY_SSL = False
TIMEOUT = 20.0
CLOUD_PAGE_SIZE = 1
REQUEST_DELAY_MIN = 3
REQUEST_DELAY_MAX = 5
ACCOUNT_DELAY_MIN = 20
ACCOUNT_DELAY_MAX = 40
TASK_CONFIG_IDS = [4, 5, 6, 7]
TASK_EXECUTE_TIMES = 2
TOKEN_CACHE_TTL = 7 * 24 * 3600
RUN_START_TS = time.time()
DEFAULT_UA_LIST = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF WindowsWechat(0x63090a13) UnifiedPCWindowsWechat(0xf2541939) XWEB/19841",
]
@dataclass
class Account:
remark: str
wxid: str
ua: str = ""
@dataclass
class AccountResult:
remark: str
wxid: str
success: bool = False
login: str = "未执行"
sign: str = "未执行"
log_id: Any = ""
task_success: int = 0
task_total: int = 0
task_details: List[str] = field(default_factory=list)
nick_name: str = ""
mobile: str = ""
integral: int = 0
amount: float = 0.0
message: str = ""
# ===================== 基础工具 =====================
def setup_logging() -> None:
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s\t- %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
handlers=[logging.StreamHandler()],
)
def mask(value: Any, keep_start: int = 6, keep_end: int = 4) -> str:
s = str(value or "")
if not s:
return ""
if len(s) <= keep_start + keep_end:
return "***"
return s[:keep_start] + "***" + s[-keep_end:]
def now_ts() -> int:
return int(time.time())
def debug(title: str, data: Any) -> None:
if not DEBUG:
return
try:
text = json.dumps(data, ensure_ascii=False, indent=2)
text = re.sub(r"eyJ[A-Za-z0-9_\-.]+", "<TOKEN>", text)
logging.info("[调试] %s: %s", title, text)
except Exception:
logging.info("[调试] %s: %s", title, data)
def safe_json(resp: requests.Response) -> Optional[Dict[str, Any]]:
try:
return resp.json()
except Exception:
return None
def load_ua_list() -> List[str]:
if UA_FILE.exists():
try:
uas = [x.strip() for x in UA_FILE.read_text(encoding="utf-8").splitlines() if x.strip() and not x.strip().startswith("#")]
if uas:
return uas
except Exception:
pass
return DEFAULT_UA_LIST
UA_LIST = load_ua_list()
def stable_ua(key: str) -> str:
return UA_LIST[abs(hash(key)) % len(UA_LIST)]
def sleep_request() -> None:
time.sleep(random.randint(REQUEST_DELAY_MIN, REQUEST_DELAY_MAX))
def ensure_cache_dir() -> None:
CACHE_DIR.mkdir(parents=True, exist_ok=True)
def load_cache() -> Dict[str, Any]:
ensure_cache_dir()
if not TOKEN_CACHE_FILE.exists():
return {"_说明": "衣城通自动维护缓存。按 wxid 保存 token、昵称、积分、余额等。", "accounts": {}}
try:
data = json.loads(TOKEN_CACHE_FILE.read_text(encoding="utf-8"))
if not isinstance(data, dict):
raise ValueError("cache root is not dict")
data.setdefault("_说明", "衣城通自动维护缓存。")
data.setdefault("accounts", {})
return data
except Exception as e:
backup = TOKEN_CACHE_FILE.with_suffix(f".broken.{now_ts()}.json")
try:
TOKEN_CACHE_FILE.rename(backup)
except Exception:
pass
if DEBUG:
logging.warning("[调试] 缓存文件损坏,已重建: %s", e)
return {"_说明": "衣城通自动维护缓存。", "accounts": {}}
def save_cache(cache: Dict[str, Any]) -> None:
ensure_cache_dir()
TOKEN_CACHE_FILE.write_text(json.dumps(cache, ensure_ascii=False, indent=2), encoding="utf-8")
def get_cached_token(acc: Account, cache: Dict[str, Any]) -> str:
item = (cache.get("accounts") or {}).get(acc.wxid) or {}
token = str(item.get("token") or "")
updated_at = int(item.get("updated_at") or 0)
if token and now_ts() - updated_at < TOKEN_CACHE_TTL:
return token
return ""
def update_account_cache(acc: Account, cache: Dict[str, Any], *, token: str = "", nick_name: str = "", mobile: str = "", integral: int = 0, amount: float = 0.0) -> None:
accounts = cache.setdefault("accounts", {})
old = accounts.get(acc.wxid) or {}
old.update({
"remark": acc.remark,
"wxid": acc.wxid,
"token": token or old.get("token", ""),
"nick_name": nick_name or old.get("nick_name", ""),
"mobile": mobile or old.get("mobile", ""),
"integral": integral,
"amount": amount,
"ua": acc.ua,
"updated_at": now_ts(),
"updated_at_text": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
})
accounts[acc.wxid] = old
save_cache(cache)
# ===================== 养鸡场账号/code =====================
def wx_cloud_headers() -> Dict[str, str]:
return {"Authorization": WX_TOKEN, "Content-Type": "application/json"}
def account_from_cloud_row(item: Dict[str, Any]) -> Optional[Account]:
wxid = item.get("wxId") or item.get("wxid") or item.get("Wxid") or ""
if not wxid:
return None
remark = item.get("wxName") or item.get("wxname") or item.get("remark") or item.get("nickName") or mask(wxid)
return Account(remark=str(remark), wxid=str(wxid))
def fetch_cloud_account_page(page_num: int, page_size: int) -> Tuple[List[Account], int]:
if not WX_CLOUD or not WX_TOKEN:
raise RuntimeError("缺少 wx_cloud 或 wx_token")
url = f"{WX_CLOUD}/prod-api/wechat/wechat/list"
resp = requests.get(url, headers=wx_cloud_headers(), params={"pageNum": page_num, "pageSize": page_size}, timeout=TIMEOUT, verify=VERIFY_SSL)
data = safe_json(resp)
debug(f"养鸡场账号列表 page={page_num}", data if data is not None else resp.text[:500])
if data and data.get("code") == 200 and isinstance(data.get("rows"), list):
accounts = [acc for row in data["rows"] if (acc := account_from_cloud_row(row))]
return accounts, int(data.get("total") or len(accounts))
raise RuntimeError((data or {}).get("msg") or (data or {}).get("message") or f"获取账号失败 HTTP {resp.status_code}")
def iter_accounts_from_cloud() -> Generator[Tuple[Account, int, int], None, None]:
page = 1
seen = 0
total = None
while True:
accounts, total_count = fetch_cloud_account_page(page, CLOUD_PAGE_SIZE)
total = total_count if total is None else total
if not accounts:
break
for acc in accounts:
seen += 1
yield acc, seen, total
if seen >= total:
break
page += 1
def should_skip(acc: Account) -> bool:
if acc.wxid in EXCLUDE_WXIDS:
return True
if SINGLE_TEST_WXID and acc.wxid != SINGLE_TEST_WXID:
return True
return False
def iter_selected_accounts() -> Generator[Tuple[Account, int, int], None, None]:
processed = 0
for acc, cloud_index, cloud_total in iter_accounts_from_cloud():
if should_skip(acc):
continue
processed += 1
yield acc, processed if SINGLE_TEST_WXID else cloud_index, 1 if SINGLE_TEST_WXID else cloud_total
if SINGLE_TEST_WXID and acc.wxid == SINGLE_TEST_WXID:
break
def get_mini_program_code(wxid: str) -> str:
url = f"{WX_CLOUD}/prod-api/wechat/api/getMiniProgramCode"
payload = {"wxid": wxid, "appid": APPID}
resp = requests.post(url, headers=wx_cloud_headers(), json=payload, timeout=TIMEOUT, verify=VERIFY_SSL)
data = safe_json(resp)
debug("获取小程序code", data if data is not None else resp.text[:500])
if not data:
raise RuntimeError(f"获取 code 失败: HTTP {resp.status_code}")
if data.get("code") != 200:
raise RuntimeError(data.get("msg") or data.get("message") or f"获取 code 失败: {data}")
code = ((data.get("data") or {}).get("code") or data.get("codeData") or data.get("data"))
if isinstance(code, dict):
code = code.get("code")
if not code:
raise RuntimeError("获取 code 失败: 响应中没有 code")
return str(code)
# ===================== 衣城通接口 =====================
def yct_headers(token: str = "", ua: str = "") -> Dict[str, str]:
headers = {
"Accept": "*/*",
"Accept-Language": "zh-CN,zh;q=0.9",
"Content-Type": "application/json",
"Host": "api.yctjob.com",
"Referer": REFERER,
"User-Agent": ua or DEFAULT_UA_LIST[0],
"xweb_xhr": "1",
}
if token:
headers["Authorization"] = f"Bearer {token}"
return headers
def yct_request(method: str, url: str, *, token: str = "", ua: str = "", params: Dict[str, Any] = None, json_body: Dict[str, Any] = None) -> Dict[str, Any]:
headers = yct_headers(token=token, ua=ua)
if method.upper() == "POST":
resp = requests.post(url, headers=headers, params=params, json=json_body if json_body is not None else {}, timeout=TIMEOUT, verify=VERIFY_SSL)
else:
resp = requests.get(url, headers=headers, params=params, timeout=TIMEOUT, verify=VERIFY_SSL)
data = safe_json(resp)
debug(f"衣城通接口 {url}", data if data is not None else resp.text[:500])
if isinstance(data, dict):
return data
return {"code": -1, "msg": f"JSON解析失败 HTTP {resp.status_code}: {resp.text[:200]}"}
def token_invalid(data: Dict[str, Any]) -> bool:
msg = str(data.get("msg") or data.get("message") or "")
code = data.get("code")
return code in (401, 403) or any(k in msg for k in ["token", "登录", "授权", "失效", "过期", "无效", "请先"])
def login_with_code(code: str, ua: str) -> Tuple[str, Dict[str, Any]]:
# HAR: POST /client/web/wechatSession?code=*** body {}
data = yct_request("POST", LOGIN_URL, ua=ua, params={"code": code}, json_body={})
if data.get("code") == 200:
user_info = ((data.get("data") or {}).get("userInfo") or {})
token = user_info.get("token")
if token:
return str(token), user_info
raise RuntimeError(data.get("msg") or data.get("message") or "code 登录换 token 失败")
def validate_token(token: str, ua: str) -> Dict[str, Any]:
data = yct_request("GET", RESUME_URL, token=token, ua=ua)
if data.get("code") == 200:
return data.get("data") or {}
if token_invalid(data):
raise RuntimeError("TOKEN_INVALID")
raise RuntimeError(data.get("msg") or data.get("message") or "验证 token 失败")
def sign_home(token: str, ua: str) -> Dict[str, Any]:
data = yct_request("GET", SIGN_HOME_URL, token=token, ua=ua)
if data.get("code") == 200:
return data.get("data") or {}
if token_invalid(data):
raise RuntimeError("TOKEN_INVALID")
raise RuntimeError(data.get("msg") or data.get("message") or "获取签到首页失败")
def find_today_log_id(home: Dict[str, Any]) -> Any:
today = datetime.now().strftime("%Y-%m-%d")
for config in home.get("configs") or []:
if config.get("signDate") == today:
return config.get("logId")
return None
def do_sign(token: str, ua: str, log_id: Any) -> str:
data = yct_request("POST", SIGN_URL, token=token, ua=ua, json_body={"logId": log_id})
if data.get("code") == 200:
return data.get("msg") or data.get("message") or "签到成功"
if token_invalid(data):
raise RuntimeError("TOKEN_INVALID")
return data.get("msg") or data.get("message") or f"签到失败: {data.get('code')}"
def task_home(token: str, ua: str) -> Dict[str, Any]:
data = yct_request("GET", TASK_HOME_URL, token=token, ua=ua)
if data.get("code") == 200:
return data.get("data") or {}
if token_invalid(data):
raise RuntimeError("TOKEN_INVALID")
raise RuntimeError(data.get("msg") or data.get("message") or "获取任务首页失败")
def do_task_sub(token: str, ua: str, config_id: int) -> str:
data = yct_request("POST", TASK_SUB_URL, token=token, ua=ua, json_body={"configId": config_id})
if data.get("code") == 200:
return data.get("msg") or data.get("message") or "操作成功"
if token_invalid(data):
raise RuntimeError("TOKEN_INVALID")
return data.get("msg") or data.get("message") or f"任务失败: {data.get('code')}"
def parse_score(home: Dict[str, Any]) -> Tuple[int, float]:
try:
integral = int(home.get("integral") or 0)
except Exception:
integral = 0
try:
amount = float(home.get("amount") or 0.0)
except Exception:
amount = 0.0
return integral, amount
# ===================== 主流程 =====================
def prepare_token(acc: Account, cache: Dict[str, Any], result: AccountResult) -> str:
token = get_cached_token(acc, cache)
if token:
try:
info = validate_token(token, acc.ua)
result.login = "使用缓存"
result.nick_name = str(info.get("name") or info.get("nickName") or "")
result.mobile = str(info.get("mobile") or "")
logging.info("账号 %s 读取并验证token缓存完成", acc.remark)
return token
except RuntimeError as e:
if str(e) != "TOKEN_INVALID":
raise
logging.info("账号 %s 缓存token失效重新获取code登录", acc.remark)
code = get_mini_program_code(acc.wxid)
logging.info("账号 %s 获取code完成", acc.remark)
sleep_request()
token, user_info = login_with_code(code, acc.ua)
result.login = "成功"
result.nick_name = str(user_info.get("nickName") or "")
result.mobile = str(user_info.get("phonenumber") or user_info.get("userName") or "")
update_account_cache(acc, cache, token=token, nick_name=result.nick_name, mobile=result.mobile)
logging.info("账号 %s 登录完成token已缓存", acc.remark)
return token
def process_account(acc: Account, index: int, total: int) -> AccountResult:
acc.ua = stable_ua(acc.wxid or acc.remark)
result = AccountResult(remark=acc.remark, wxid=acc.wxid)
logging.info("账号%d/%d %s 开始", index, total, acc.remark)
cache = load_cache()
try:
token = prepare_token(acc, cache, result)
if DEBUG:
logging.info("[调试] token=%s 缓存目录=%s", mask(token), CACHE_DIR)
sleep_request()
home = sign_home(token, acc.ua)
result.integral, result.amount = parse_score(home)
result.log_id = find_today_log_id(home)
logging.info("账号%d/%d %s 签到首页完成 | logId: %s | 积分: %s | 余额: %.2f", index, total, acc.remark, result.log_id or "", result.integral, result.amount)
sleep_request()
if result.log_id:
result.sign = do_sign(token, acc.ua, result.log_id)
else:
result.sign = "未找到今日logId跳过签到"
logging.info("账号%d/%d %s 签到完成 | %s", index, total, acc.remark, result.sign)
sleep_request()
thome = task_home(token, acc.ua)
result.integral, result.amount = parse_score(thome)
logging.info("账号%d/%d %s 任务首页完成 | 积分: %s | 余额: %.2f", index, total, acc.remark, result.integral, result.amount)
sleep_request()
for config_id in TASK_CONFIG_IDS:
for times in range(1, TASK_EXECUTE_TIMES + 1):
result.task_total += 1
msg = do_task_sub(token, acc.ua, config_id)
ok = "成功" in msg or "操作成功" in msg or "" in msg or msg == "OK"
if ok:
result.task_success += 1
line = f"configId={config_id}{times}次: {msg}"
result.task_details.append(line)
logging.info("账号%d/%d %s 任务提交完成 | %s", index, total, acc.remark, line)
sleep_request()
# 任务后再查一次任务首页,记录最终积分/余额
final_home = task_home(token, acc.ua)
result.integral, result.amount = parse_score(final_home)
update_account_cache(acc, cache, token=token, nick_name=result.nick_name, mobile=result.mobile, integral=result.integral, amount=result.amount)
logging.info("账号%d/%d %s 最终余额查询完成 | 积分: %s | 余额: %.2f", index, total, acc.remark, result.integral, result.amount)
result.success = True
except Exception as e:
result.success = False
result.message = str(e).replace("TOKEN_INVALID", "token失效")
if DEBUG:
logging.error("账号 %s 执行失败: %s", acc.remark, e)
traceback.print_exc()
if result.success:
logging.info(
"账号%d/%d %s 完成 | 登录: %s | 签到: %s | 任务: %d/%d | 积分: %s | 余额: %.2f",
index, total, acc.remark, result.login, result.sign, result.task_success, result.task_total, result.integral, result.amount,
)
else:
logging.info("账号%d/%d %s 跳过/失败 | %s", index, total, acc.remark, result.message or "失败")
return result
def build_report(results: List[AccountResult]) -> str:
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
elapsed = time.time() - RUN_START_TS
total = len(results)
success = sum(1 for r in results if r.success)
sign_ok = sum(1 for r in results if r.success and ("成功" in r.sign or "" in r.sign or "操作成功" in r.sign))
task_success = sum(r.task_success for r in results)
task_total = sum(r.task_total for r in results)
total_integral = sum(r.integral for r in results)
total_amount = sum(r.amount for r in results)
lines = [
"",
"==================================================",
"📊 衣城通执行汇总",
"==================================================",
f"⏱️ 执行时间: {now}",
f"👥 总账号数: {total}",
f"✅ 成功账号: {success}",
f"📝 签到成功/已签: {sign_ok}",
f"🧩 任务完成: {task_success}/{task_total}",
f"🎯 积分合计: {total_integral}",
f"💰 余额合计: {total_amount:.2f}",
f"🧭 UA数量: {len(UA_LIST)}",
f"📁 缓存目录: {CACHE_DIR}",
"",
"📋 账号详情:",
]
for i, r in enumerate(results, 1):
icon = "" if r.success else ""
lines.append(f"{icon} 账号{i}: {r.remark} | 昵称: {r.nick_name or ''} | 手机: {r.mobile or ''}")
lines.append(f" 🔐 登录: {r.login}")
lines.append(f" 📝 签到: {r.sign}")
lines.append(f" 🧩 任务: {r.task_success}/{r.task_total}")
lines.append(f" 🎯 积分: {r.integral}")
lines.append(f" 💰 余额: {r.amount:.2f}")
if r.message:
lines.append(f" ⚠️ 错误: {r.message}")
lines.append(f"=== 执行结束 [{now}] 耗时 {elapsed:.2f} 秒 退出码 0 ===")
return "\n".join(lines)
def main() -> None:
setup_logging()
logging.info("【衣城通】开始执行任务")
if DEBUG:
logging.info("APPID=%s WX_CLOUD=%s", APPID, WX_CLOUD or "(未设置)")
results: List[AccountResult] = []
handled = 0
for acc, i, total in iter_selected_accounts():
handled += 1
results.append(process_account(acc, i, total))
if not SINGLE_TEST_WXID and i < total:
delay = random.randint(ACCOUNT_DELAY_MIN, ACCOUNT_DELAY_MAX)
if DEBUG:
logging.info("[调试] 账号间隔等待 %d", delay)
time.sleep(delay)
if handled == 0:
raise RuntimeError("没有可处理账号。请检查 wx_cloud、wx_token、YCT_TEST_WXID、YCT_EXCLUDE_WXIDS 配置")
print(build_report(results))
if __name__ == "__main__":
try:
main()
except Exception as e:
setup_logging()
logging.error("主流程错误: %s", e)
if DEBUG:
traceback.print_exc()
sys.exit(1)