From a778cfdf082019e63471dabc41b75b556e47783e Mon Sep 17 00:00:00 2001 From: admin <362324317@qq.com> Date: Sat, 23 May 2026 17:38:14 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=20WX=5FApplet/Applet=5FLWDJG?= =?UTF-8?q?.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- WX_Applet/Applet_LWDJG.py | 1054 +++++++++++++++++++++++++++++++++++++ 1 file changed, 1054 insertions(+) create mode 100644 WX_Applet/Applet_LWDJG.py diff --git a/WX_Applet/Applet_LWDJG.py b/WX_Applet/Applet_LWDJG.py new file mode 100644 index 0000000..3cce9ce --- /dev/null +++ b/WX_Applet/Applet_LWDJG.py @@ -0,0 +1,1054 @@ +# new Env("浓五的酒馆签到+抽奖") +# cron: 15 11 * * * +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +浓五酒馆 / 五粮浓香积分商城 - 养鸡场自动登录并执行签到、抽奖一体版 +version: 1.0.0 + +功能: +1. 从养鸡场/微信协议服务读取微信账号列表,或手动指定 wxid +2. 对“浓五的酒馆签到”小程序获取 code -> 登录 -> 内部拿 token -> 执行签到 +3. 对“五粮浓香积分商城抽奖”小程序获取 code -> 登录 -> 内部拿 token -> 积分抽奖/红包领取 +4. 不打印 token,只打印执行结果 + +仅用于你自己有权限的微信账号和服务。 + +==================== 必填环境变量 ==================== +wx_cloud / WECHAT_SERVER 养鸡场地址,例如:http://127.0.0.1:666 +wx_token 养鸡场 Authorization,支持裸 token 或 Bearer token,脚本自动识别 + +==================== 可选环境变量 ==================== +TASK_MODE 执行模式:all / sign / lottery,默认 all +ACCOUNT_SOURCE 账号来源:cloud / manual,默认 cloud +MANUAL_ACCOUNTS 手动账号,格式:备注#wxid,一行一个,仅 ACCOUNT_SOURCE=manual 时使用 +SINGLE_TEST_WXID 只测试一个 wxid +EXCLUDE_WXIDS 排除 wxid,支持换行分隔,也兼容英文/中文逗号分隔 +CLOUD_PAGE_SIZE 养鸡场分页大小,默认 1;用于一个账号处理完再取下一个 +AUTO_BEARER 是否自动给 wx_token 补 Bearer,默认 true;如接口要求裸 token,设 false +DELAY 账号间延迟秒数,默认 5 +TIMEOUT 请求超时秒数,默认 20 +DEBUG true/false,默认 false +VERIFY_SSL true/false,默认 false +MAX_LOTTERY_FAILS 抽奖连续失败上限,默认 3 +LOTTERY_DELAY_MIN 抽奖间隔最小秒数,默认 8 +LOTTERY_DELAY_MAX 抽奖间隔最大秒数,默认 15 +LOTTERY_ACTIVITY_ID 手动指定【抽奖端】活动ID;不填则从浓友购活动列表自动抓取 +LOTTERY_DAILY_LIMIT 每个账号每天最多抽奖次数,默认 3 +LOTTERY_COST 每次抽奖消耗积分,默认 50 +CACHE_TTL token 缓存秒数,默认 259200;过期或验证失败自动重新登录 +NWJG_CACHE_FILE token/UA 缓存文件,默认脚本同目录 nw_jg_cookie_cache.json +UA_FILE UA 文件路径,默认脚本同目录 UA_LIST.txt;一行一个 UA,支持 # 注释 + +==================== 示例 ==================== +export wx_cloud="http://127.0.0.1:666" +export wx_token="Bearer xxxxx" +python3 nw_jg_yjc_auto.py + +只签到: +export TASK_MODE="sign" +python3 nw_jg_yjc_auto.py + +只抽奖: +export TASK_MODE="lottery" +python3 nw_jg_yjc_auto.py +""" + +import json +import os +import random +import re +import sys +import time +import traceback +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, List, Optional +from urllib.parse import urlencode + +try: + import requests +except ImportError: + print("缺少 requests,请先安装:pip install requests") + sys.exit(1) + +try: + import urllib3 + urllib3.disable_warnings() +except Exception: + pass + + +# ===================== 项目固定配置 ===================== +SIGN_APPID = "wxed3cf95a14b58a26" +SIGN_BASE = "https://stdcrm.dtmiller.com" +SIGN_LOGIN_URL = f"{SIGN_BASE}/std-weixin-mp-service/miniApp/custom/login" +SIGN_USER_API = f"{SIGN_BASE}/scrm-promotion-service/mini/wly/user/info" +SIGN_MODULE_API = f"{SIGN_BASE}/scrm-promotion-service/mini/module/config/list" +SIGN_FIXED_API = f"{SIGN_BASE}/scrm-promotion-service/promotion/sign/today?promotionId=PI69eb321d37c48c000a05ee4e" + +# 抽奖实际跳转到“浓友购/五粮浓香积分商城”小程序,不是旧 jf.wlnxjc.com/mini 那套。 +LOTTERY_APPID = "wx99fa98e883130aa3" +LOTTERY_BASE = "https://www.wlnxjc.com:8088" +LOTTERY_LOGIN_URL = f"{LOTTERY_BASE}/app-api/member/auth/weixin-mini-app-login" +LOTTERY_ACTIVITY_API = f"{LOTTERY_BASE}/app-api/promotion/activity/list" +LOTTERY_ACTIVITY_DETAIL_API = f"{LOTTERY_BASE}/app-api/promotion/activity/get-detail" +LOTTERY_INTEGRAL_API = f"{LOTTERY_BASE}/app-api/member/integral/get" +# 浓友购积分大转盘提交/领取接口。 +LOTTERY_DRAW_API_DEFAULT = f"{LOTTERY_BASE}/app-api/promotion/activity/draw" +LOTTERY_RECEIVE_API_DEFAULT = f"{LOTTERY_BASE}/app-api/promotion/activity/receive" + +DEFAULT_UA_LIST = [ + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF WindowsWechat(0x63090a13) UnifiedPCWindowsWechat(0xf2541939) XWEB/19841", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.6778.86 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF WindowsWechat(0x63090a13) UnifiedPCWindowsWechat(0xf2541939) XWEB/19763", + "Mozilla/5.0 (Linux; Android 15; 24129PN74C Build/AQ3A.240912.001; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/132.0.6834.163 Mobile Safari/537.36 XWEB/1320093 MMWEBSDK/20250201 MMWEBID/128 MicroMessenger/8.0.56.2800(0x2800383D) WeChat/arm64 Weixin NetType/WIFI Language/zh_CN ABI/arm64 MiniProgramEnv/android", + "Mozilla/5.0 (iPhone; CPU iPhone OS 17_6_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.55(0x1800372f) NetType/WIFI Language/zh_CN MiniProgramEnv/ios", +] + + +def load_ua_list() -> List[str]: + ua_file = Path(os.environ.get("UA_FILE", Path(__file__).with_name("UA_LIST.txt"))) + if ua_file.exists(): + uas: List[str] = [] + for raw in ua_file.read_text(encoding="utf-8").splitlines(): + line = raw.strip() + if not line or line.startswith("#"): + continue + uas.append(line) + if uas: + return uas + return DEFAULT_UA_LIST + + +UA_LIST = load_ua_list() + + +# ===================== 环境变量 ===================== +ENV = os.environ +WX_CLOUD = (ENV.get("wx_cloud") or ENV.get("WECHAT_SERVER") or "").rstrip("/") +WX_TOKEN = ENV.get("wx_token", "").strip() +AUTO_BEARER = ENV.get("AUTO_BEARER", "true").lower() != "false" +if AUTO_BEARER and WX_TOKEN and not WX_TOKEN.lower().startswith("bearer "): + WX_TOKEN = f"Bearer {WX_TOKEN}" + +TASK_MODE = ENV.get("TASK_MODE", "all").lower() # all/sign/lottery +ACCOUNT_SOURCE = ENV.get("ACCOUNT_SOURCE", "cloud").lower() +MANUAL_ACCOUNTS = ENV.get("MANUAL_ACCOUNTS", "") +SINGLE_TEST_WXID = ENV.get("SINGLE_TEST_WXID", "").strip() +EXCLUDE_WXIDS = [x.strip() for x in re.split(r"[\n,,]+", ENV.get("EXCLUDE_WXIDS", "")) if x.strip()] +CLOUD_PAGE_SIZE = max(1, int(ENV.get("CLOUD_PAGE_SIZE", "1"))) +DELAY = float(ENV.get("DELAY", "5")) +TIMEOUT = float(ENV.get("TIMEOUT", "20")) +DEBUG = ENV.get("DEBUG", "false").lower() == "true" +VERIFY_SSL = ENV.get("VERIFY_SSL", "false").lower() == "true" +MAX_LOTTERY_FAILS = int(ENV.get("MAX_LOTTERY_FAILS", "3")) +LOTTERY_DAILY_LIMIT = int(ENV.get("LOTTERY_DAILY_LIMIT", "3")) +LOTTERY_COST = int(ENV.get("LOTTERY_COST", "50")) +LOTTERY_DELAY_MIN = int(ENV.get("LOTTERY_DELAY_MIN", "8")) +LOTTERY_DELAY_MAX = int(ENV.get("LOTTERY_DELAY_MAX", "15")) +LOTTERY_ACTIVITY_ID = ENV.get("LOTTERY_ACTIVITY_ID", "").strip() +LOTTERY_DRAW_API = ENV.get("LOTTERY_DRAW_API", LOTTERY_DRAW_API_DEFAULT).strip() +LOTTERY_RECEIVE_API = ENV.get("LOTTERY_RECEIVE_API", LOTTERY_RECEIVE_API_DEFAULT).strip() +CACHE_FILE = Path(ENV.get("NWJG_CACHE_FILE", Path(__file__).with_name("nw_jg_cookie_cache.json"))) +CACHE_TTL = int(ENV.get("CACHE_TTL", "259200")) # 默认 3 天,服务端失效会自动重登 +SESSION_LOG_LINES: List[str] = [] +RUN_START_TS = time.time() + + +@dataclass +class Account: + remark: str + wxid: str + sign_token: str = "" + lottery_token: str = "" + ua: str = "" + + +# ===================== 基础工具 ===================== +def emit(prefix: str, *args): + line = f"{prefix} {' '.join(str(a) for a in args)}" + print(line) + SESSION_LOG_LINES.append(line) + + +def log(*args): + emit("[INFO]", *args) + + +def ok(*args): + emit("[ OK ]", *args) + + +def warn(*args): + emit("[WARN]", *args) + + +def err(*args): + emit("[ERR ]", *args) + + +def debug(title: str, data: Any): + if not DEBUG: + return + print(f"\n[DEBUG] {title}") + try: + print(json.dumps(data, ensure_ascii=False, indent=2)) + except Exception: + print(data) + + +def mask(value: Any, keep_start: int = 8, keep_end: int = 4) -> str: + s = str(value or "") + if not s: + return "" + if len(s) <= keep_start + keep_end: + return "***" + return s[:keep_start] + "***" + s[-keep_end:] + + +def safe_json_response(resp: requests.Response) -> Optional[Any]: + try: + return resp.json() + except Exception: + return None + + +def load_cache() -> Dict[str, Any]: + if not CACHE_FILE.exists(): + return {"accounts": {}} + try: + data = json.loads(CACHE_FILE.read_text(encoding="utf-8")) + return data if isinstance(data, dict) else {"accounts": {}} + except Exception: + return {"accounts": {}} + + +def save_cache(cache: Dict[str, Any]) -> None: + CACHE_FILE.parent.mkdir(parents=True, exist_ok=True) + CACHE_FILE.write_text(json.dumps(cache, ensure_ascii=False, indent=2), encoding="utf-8") + + +def choose_stable_ua(wxid: str) -> str: + # 只在首次没有缓存时选一次;之后跟随 cookie/token 缓存,避免每次换设备指纹。 + idx = abs(hash(wxid)) % len(UA_LIST) + return UA_LIST[idx] + + +def cache_get_account(wxid: str) -> Dict[str, Any]: + cache = load_cache() + accounts = cache.setdefault("accounts", {}) + return accounts.setdefault(wxid, {}) + + +def cache_update_account(wxid: str, updates: Dict[str, Any]) -> None: + cache = load_cache() + accounts = cache.setdefault("accounts", {}) + item = accounts.setdefault(wxid, {}) + item.update(updates) + item["updated_at"] = int(time.time()) + save_cache(cache) + + +def get_cached_ua(wxid: str) -> str: + item = cache_get_account(wxid) + ua = item.get("ua") + if not ua: + ua = choose_stable_ua(wxid) + cache_update_account(wxid, {"ua": ua}) + return ua + + +def is_cache_fresh(item: Dict[str, Any], key: str) -> bool: + value = item.get(key) + ts = int(item.get(f"{key}_ts") or 0) + return bool(value) and (time.time() - ts < CACHE_TTL) + + +def ensure_config(): + if not WX_CLOUD: + raise RuntimeError("缺少 wx_cloud 或 WECHAT_SERVER") + if not WX_TOKEN: + raise RuntimeError("缺少 wx_token") + if TASK_MODE not in {"all", "sign", "lottery"}: + raise RuntimeError("TASK_MODE 只支持 all / sign / lottery") + + +# ===================== 养鸡场账号与 code ===================== +def account_from_cloud_row(item: Dict[str, Any]) -> Optional[Account]: + wxid = item.get("wxId") or item.get("wxid") or item.get("Wxid") or "" + if not wxid: + return None + remark = item.get("wxName") or item.get("wxname") or item.get("remark") or item.get("nickName") or mask(wxid) + return Account(remark=remark, wxid=wxid) + + +def fetch_cloud_account_page(page_num: int, page_size: int) -> tuple[List[Account], int]: + """按页读取养鸡场账号。默认 page_size=1,实现处理完一个账号再取下一个账号。""" + url = f"{WX_CLOUD}/prod-api/wechat/wechat/list" + headers = {"Authorization": WX_TOKEN, "Content-Type": "application/json"} + params = {"pageNum": page_num, "pageSize": page_size} + resp = requests.get(url, headers=headers, params=params, timeout=TIMEOUT, verify=VERIFY_SSL) + data = safe_json_response(resp) + if DEBUG: + preview = data + if isinstance(data, dict) and isinstance(data.get("rows"), list): + preview = {**data, "rows": [ + {"wxId": r.get("wxId") or r.get("wxid") or r.get("Wxid"), "wxName": r.get("wxName") or r.get("wxname")} + for r in data.get("rows", []) + ]} + debug(f"养鸡场账号列表响应 page={page_num} size={page_size}", preview if preview is not None else resp.text[:1000]) + + if data and data.get("code") == 200 and isinstance(data.get("rows"), list): + accounts = [acc for item in data["rows"] if (acc := account_from_cloud_row(item))] + total = int(data.get("total") or len(accounts)) + return accounts, total + raise RuntimeError((data or {}).get("msg") or (data or {}).get("message") or f"获取养鸡场账号列表失败: HTTP {resp.status_code}") + + +def iter_accounts_from_cloud(): + page = 1 + seen = 0 + while True: + accounts, total = fetch_cloud_account_page(page, CLOUD_PAGE_SIZE) + if not accounts: + break + for acc in accounts: + seen += 1 + yield acc, seen, total + if seen >= total: + break + page += 1 + + +def get_accounts_from_cloud() -> List[Account]: + # 兼容 SINGLE_TEST_WXID:需要扫描分页直到找到匹配账号。 + return [acc for acc, _, _ in iter_accounts_from_cloud()] + + +def get_accounts_from_manual() -> List[Account]: + accounts: List[Account] = [] + for line in MANUAL_ACCOUNTS.splitlines(): + line = line.strip() + if not line: + continue + parts = [p.strip() for p in line.split("#") if p.strip()] + if len(parts) >= 2: + remark, wxid = parts[0], parts[1] + else: + remark, wxid = parts[0], parts[0] + accounts.append(Account(remark=remark or mask(wxid), wxid=wxid)) + return accounts + + +def get_mini_program_code(wxid: str, appid: str) -> str: + url = f"{WX_CLOUD}/prod-api/wechat/api/getMiniProgramCode" + headers = {"Authorization": WX_TOKEN, "Content-Type": "application/json"} + payload = {"wxid": wxid, "appid": appid} + resp = requests.post(url, headers=headers, json=payload, timeout=TIMEOUT, verify=VERIFY_SSL) + data = safe_json_response(resp) + debug(f"取 code 响应 appid={appid} wxid={wxid}", data if data is not None else resp.text[:1000]) + + if data and data.get("code") == 200: + d = data.get("data") + if isinstance(d, dict) and d.get("code"): + return str(d["code"]) + if isinstance(d, str) and d: + return d + raise RuntimeError((data or {}).get("msg") or (data or {}).get("message") or f"获取小程序 code 失败: HTTP {resp.status_code}") + + +# ===================== 浓五酒馆签到端 ===================== +def sign_headers(token: str = "", ua: str = "") -> Dict[str, str]: + h = { + "Host": "stdcrm.dtmiller.com", + "Connection": "keep-alive", + "charset": "utf-8", + "User-Agent": ua or random.choice(UA_LIST), + "content-type": "application/json", + "Referer": f"https://servicewechat.com/{SIGN_APPID}/226/page-frame.html", + "Accept-Encoding": "gzip,compress,br,deflate", + } + if token: + h["authorization"] = token if token.lower().startswith("bearer ") else f"Bearer {token}" + h["Authorization"] = h["authorization"] + return h + + +def sign_login(code: str, ua: str) -> str: + payload = {"code": code, "appId": SIGN_APPID} + resp = requests.post(SIGN_LOGIN_URL, headers=sign_headers(ua=ua), json=payload, timeout=TIMEOUT, verify=VERIFY_SSL) + data = safe_json_response(resp) + debug("浓五酒馆签到登录响应", data if data is not None else resp.text[:1000]) + if not data: + raise RuntimeError(f"签到端登录响应不是 JSON: HTTP {resp.status_code}, {resp.text[:200]}") + if data.get("code") != 0: + raise RuntimeError(data.get("msg") or data.get("message") or "签到端登录失败") + token = data.get("data") + if not token: + raise RuntimeError("签到端登录成功但未返回 token") + return token if str(token).lower().startswith("bearer ") else f"Bearer {token}" + + +def sign_get_json(url: str, token: str, ua: str) -> Dict[str, Any]: + resp = requests.get(url, headers=sign_headers(token, ua), timeout=TIMEOUT, verify=VERIFY_SSL) + data = safe_json_response(resp) + if data is None: + return {"code": -1, "msg": f"JSON解析失败: HTTP {resp.status_code}; body={resp.text[:200]}"} + return data + + +def sign_post_json(url: str, token: str, ua: str, body: Optional[dict] = None) -> Dict[str, Any]: + resp = requests.post(url, headers=sign_headers(token, ua), json=body or {}, timeout=TIMEOUT, verify=VERIFY_SSL) + data = safe_json_response(resp) + if data is None: + return {"code": -1, "msg": f"JSON解析失败: HTTP {resp.status_code}; body={resp.text[:200]}"} + return data + + +def get_dynamic_sign_act_id(token: str, ua: str) -> Optional[str]: + data = sign_post_json(SIGN_MODULE_API, token, ua, {}) + debug("每日签到模块响应", data) + if not data or data.get("code") != 0: + return None + for module in data.get("data", []) or []: + for detail in module.get("detailList", []) or []: + try: + detail_json = json.loads(detail.get("detailJson") or "{}") + if detail_json.get("title") == "每日签到": + page_path = detail_json.get("jumpData", {}).get("pagePath", "") + m = re.search(r"actId=([^&]+)", page_path) + if m: + return m.group(1) + except Exception: + continue + return None + + +def run_sign_task(acc: Account) -> Dict[str, Any]: + cache_item = cache_get_account(acc.wxid) + token = cache_item.get("sign_token") if is_cache_fresh(cache_item, "sign_token") else "" + if token: + user_probe = sign_get_json(SIGN_USER_API, token, acc.ua) + if user_probe.get("code") == 0: + ok(f"{acc.remark} 签到端使用缓存 token") + else: + token = "" + warn(f"{acc.remark} 签到端缓存 token 已失效,重新登录") + + if not token: + code = get_mini_program_code(acc.wxid, SIGN_APPID) + ok(f"{acc.remark} 签到端获取 code 成功: {mask(code)}") + token = sign_login(code, acc.ua) + cache_update_account(acc.wxid, {"sign_token": token, "sign_token_ts": int(time.time()), "ua": acc.ua, "remark": acc.remark}) + + acc.sign_token = token + ok(f"{acc.remark} 签到端登录成功,开始签到") + + user = sign_get_json(SIGN_USER_API, token, acc.ua) + debug("签到端用户信息", user) + nickname = "" + try: + nickname = (user.get("data") or {}).get("member", {}).get("nick_name") or (user.get("data") or {}).get("visitor", {}).get("open_id") or "" + except Exception: + nickname = "" + + act_id = get_dynamic_sign_act_id(token, acc.ua) + if act_id: + sign_url = f"{SIGN_BASE}/scrm-promotion-service/promotion/sign/today?promotionId={act_id}" + else: + sign_url = SIGN_FIXED_API + + sign_res = sign_get_json(sign_url, token, acc.ua) + debug("签到响应", sign_res) + msg = sign_res.get("msg") or sign_res.get("message") or sign_res.get("info") or "" + if sign_res.get("code") == 0: + sign_msg = msg or "签到成功" + ok(f"{acc.remark} 签到成功: {sign_msg}") + return {"success": True, "nickname": nickname, "sign_msg": sign_msg} + else: + sign_msg = msg or "签到失败" + if "今日已签到" in sign_msg or "已签到" in sign_msg: + ok(f"{acc.remark} 签到已完成: {sign_msg}") + return {"success": True, "nickname": nickname, "sign_msg": sign_msg} + warn(f"{acc.remark} 签到结果: {sign_msg}") + return {"success": False, "nickname": nickname, "sign_msg": sign_msg} + + +# ===================== 五粮浓香抽奖端 ===================== +def lottery_headers(token: str = "", ua: str = "") -> Dict[str, str]: + h = { + "Connection": "keep-alive", + "User-Agent": ua or random.choice(UA_LIST), + "Accept": "*/*", + "xweb_xhr": "1", + "terminal": "10", + "content-type": "application/json;charset=UTF-8", + "platform": "WechatMiniProgram", + "tenant-id": "1", + "Referer": f"https://servicewechat.com/{LOTTERY_APPID}/46/page-frame.html", + "Accept-Encoding": "gzip, deflate, br", + "Accept-Language": "zh-CN,zh;q=0.9", + } + if token: + # 浓友购接口抓包里是 authorization: token,不能加 Bearer。 + h["authorization"] = token + return h + + +def extract_lottery_token(data: Dict[str, Any]) -> str: + candidates = [ + data.get("accessToken"), + data.get("token"), + data.get("authorization"), + (data.get("data") or {}).get("accessToken") if isinstance(data.get("data"), dict) else None, + (data.get("data") or {}).get("token") if isinstance(data.get("data"), dict) else None, + (data.get("data") or {}).get("authorization") if isinstance(data.get("data"), dict) else None, + ] + for token in candidates: + if token: + return str(token) + return "" + + +def lottery_login(code: str, ua: str) -> str: + payload = {"loginCode": code, "state": "default", "sourceId": "1"} + resp = requests.post(LOTTERY_LOGIN_URL, headers=lottery_headers(ua=ua), json=payload, timeout=TIMEOUT, verify=VERIFY_SSL) + data = safe_json_response(resp) + debug("浓友购抽奖端登录响应", data if data is not None else resp.text[:1000]) + if not data: + raise RuntimeError(f"抽奖端登录响应不是 JSON: HTTP {resp.status_code}, {resp.text[:200]}") + if data.get("code") not in (0, 200): + raise RuntimeError(data.get("msg") or data.get("message") or data.get("info") or "抽奖端登录失败") + token = extract_lottery_token(data) + if not token: + raise RuntimeError("抽奖端登录成功但未返回 accessToken/token;请开 DEBUG=true 查看响应字段") + return token + + +def lottery_get_json(url: str, token: str, ua: str) -> Dict[str, Any]: + resp = requests.get(url, headers=lottery_headers(token, ua), timeout=TIMEOUT, verify=VERIFY_SSL) + data = safe_json_response(resp) + if data is None: + return {"code": -1, "msg": f"JSON解析失败: HTTP {resp.status_code}; body={resp.text[:200]}"} + return data + + +def lottery_post_json(url: str, token: str, ua: str, body: Dict[str, Any]) -> Dict[str, Any]: + resp = requests.post(url, headers=lottery_headers(token, ua), json=body, timeout=TIMEOUT, verify=VERIFY_SSL) + data = safe_json_response(resp) + if data is None: + return {"code": -1, "msg": f"JSON解析失败: HTTP {resp.status_code}; body={resp.text[:200]}"} + return data + + +def normalize_activity_list(payload: Any) -> List[Dict[str, Any]]: + """兼容活动列表返回结构:data 可能是 list,也可能包在 records/list/items/rows 里。""" + if isinstance(payload, list): + return [x for x in payload if isinstance(x, dict)] + if isinstance(payload, dict): + for key in ("records", "list", "items", "rows", "activityList", "data"): + value = payload.get(key) + if isinstance(value, list): + return [x for x in value if isinstance(x, dict)] + if isinstance(value, dict): + nested = normalize_activity_list(value) + if nested: + return nested + return [] + + +def get_activity_id(token: str, ua: str) -> Optional[Any]: + if LOTTERY_ACTIVITY_ID: + if str(LOTTERY_ACTIVITY_ID).startswith("PI"): + warn("当前 LOTTERY_ACTIVITY_ID 看起来是酒馆签到端 promotionId,不是浓友购 activityId;将忽略它并尝试自动抓取活动") + else: + log(f"使用手动抽奖/活动ID: {LOTTERY_ACTIVITY_ID}") + return LOTTERY_ACTIVITY_ID + + data = lottery_get_json(LOTTERY_ACTIVITY_API, token, ua) + debug("浓友购活动列表响应", data) + if data.get("code") not in (0, 200): + warn(f"活动列表请求失败: {data.get('msg') or data.get('message') or data.get('info') if isinstance(data, dict) else data}") + return None + + activities = normalize_activity_list(data.get("data")) + if not activities: + warn("浓友购活动列表为空或结构未识别;请用 DEBUG=true 查看响应,或设置 LOTTERY_ACTIVITY_ID 手动指定") + if not DEBUG: + print("活动列表原始响应预览:") + try: + print(json.dumps(data, ensure_ascii=False)[:800]) + except Exception: + print(str(data)[:800]) + return None + + def get_title(act: Dict[str, Any]) -> str: + return str(act.get("title") or act.get("name") or act.get("activityName") or act.get("activityTitle") or "") + + def get_id(act: Dict[str, Any]) -> Any: + return act.get("id") or act.get("activityId") or act.get("activity_id") or act.get("actId") + + print(f"📋 抽奖活动列表 ({len(activities)} 个):") + for i, act in enumerate(activities[:10], 1): + print(f" {i}. {get_title(act) or '无标题'} ID={get_id(act)} 状态={act.get('status')}") + + lottery_activities = [ + act for act in activities + if ("抽奖" in get_title(act) or "转盘" in get_title(act) or "红包" in get_title(act) or "积分" in get_title(act)) + and (act.get("status") in (None, 0, 1, "0", "1", True)) + and get_id(act) + ] + chosen = lottery_activities[0] if lottery_activities else next((act for act in activities if get_id(act)), None) + if not chosen: + warn("活动列表里没有可用 ID;请设置 LOTTERY_ACTIVITY_ID 手动指定") + return None + + activity_id = get_id(chosen) + log(f"抽奖活动: {get_title(chosen) or '无标题'} ID={activity_id}") + return activity_id + + +def parse_integral_value(data: Dict[str, Any]) -> Optional[int]: + payload = data.get("data") or {} + integral = payload.get("integral") or payload.get("point") or payload.get("points") or payload.get("balance") + try: + return int(float(integral)) + except Exception: + return None + + +def get_integral_detail(token: str, ua: str) -> Dict[str, Any]: + data = lottery_get_json(LOTTERY_INTEGRAL_API, token, ua) + debug("积分响应", data) + integral = parse_integral_value(data) if isinstance(data, dict) else None + if not isinstance(data, dict): + return {"ok": False, "integral": None, "message": "积分响应不是 JSON", "auth_invalid": False} + if data.get("code") not in (0, 200): + msg = str(data.get("msg") or data.get("message") or data.get("info") or "积分请求失败") + auth_invalid = any(k in msg for k in ["未登录", "登录已过期", "token", "Token", "认证", "授权"]) + warn(f"积分请求失败: {msg}") + return {"ok": False, "integral": integral, "message": msg, "auth_invalid": auth_invalid} + if integral is None: + msg = str(data.get("msg") or data.get("message") or data.get("info") or "未找到积分字段") + return {"ok": False, "integral": None, "message": msg, "auth_invalid": False} + return {"ok": True, "integral": integral, "message": "", "auth_invalid": False} + + +def get_integral(token: str, ua: str) -> Optional[int]: + detail = get_integral_detail(token, ua) + return detail.get("integral") if detail.get("ok") else None + + +def receive_cash(record_id: Any, token: str, ua: str) -> str: + if not LOTTERY_RECEIVE_API: + return "未配置红包领取接口,跳过自动领取" + data = lottery_post_json(LOTTERY_RECEIVE_API, token, ua, {"id": record_id}) + debug("红包领取响应", data) + if data.get("code") in (0, 200): + return "红包领取成功" + return f"红包领取失败:{data.get('msg') or data.get('message') or data.get('info') or '未知错误'}" + + +def parse_prize_value(prize_name: str) -> Dict[str, float]: + text = str(prize_name or "") + money = 0.0 + points = 0.0 + money_matches = re.findall(r"(\d+(?:\.\d+)?)\s*元", text) + if money_matches: + try: + money = sum(float(x) for x in money_matches) + except Exception: + money = 0.0 + point_matches = re.findall(r"(\d+(?:\.\d+)?)\s*积分", text) + if point_matches: + try: + points = sum(float(x) for x in point_matches) + except Exception: + points = 0.0 + return {"money": money, "points": points} + + +def find_first_key(obj: Any, keys: set) -> Any: + if isinstance(obj, dict): + for k, v in obj.items(): + if k in keys and v not in (None, ""): + return v + for v in obj.values(): + found = find_first_key(v, keys) + if found not in (None, ""): + return found + elif isinstance(obj, list): + for item in obj: + found = find_first_key(item, keys) + if found not in (None, ""): + return found + return None + + +def draw_once(activity_id: Any, token: str, ua: str) -> Dict[str, Any]: + draw_url = f"{LOTTERY_DRAW_API}?{urlencode({'activityId': activity_id})}" + data = lottery_post_json(draw_url, token, ua, {"activityId": str(activity_id)}) + debug("抽奖响应", data) + if data.get("code") in (0, 200): + body = data.get("data") or {} + prize_name = ( + find_first_key(body, {"prizeName", "prize_name", "name", "title", "rewardName", "couponName"}) + or "抽奖成功" + ) + draw_id = find_first_key(body, {"id", "recordId", "record_id", "drawRecordId", "receiveId", "prizeRecordId"}) + receive_msg = "" + if draw_id: + receive_msg = receive_cash(draw_id, token, ua) + return {"status": "success", "prize": str(prize_name), "receive_msg": receive_msg} + + info = data.get("msg") or data.get("message") or data.get("info") or "未知错误" + if "积分" in str(info) and ("不足" in str(info) or "余额" in str(info)): + return {"status": "insufficient", "message": info} + return {"status": "failed", "message": info} + + +def run_lottery_task(acc: Account) -> Dict[str, Any]: + cache_item = cache_get_account(acc.wxid) + token = cache_item.get("lottery_token") if is_cache_fresh(cache_item, "lottery_token") else "" + if token: + probe_detail = get_integral_detail(token, acc.ua) + if probe_detail.get("ok"): + ok(f"{acc.remark} 抽奖端使用缓存 token") + elif probe_detail.get("auth_invalid"): + token = "" + warn(f"{acc.remark} 抽奖端缓存 token 已失效,重新登录") + else: + ok(f"{acc.remark} 抽奖端缓存 token 可用,但当前积分不可用:{probe_detail.get('message')}") + + if not token: + code = get_mini_program_code(acc.wxid, LOTTERY_APPID) + ok(f"{acc.remark} 抽奖端获取 code 成功: {mask(code)}") + token = lottery_login(code, acc.ua) + cache_update_account(acc.wxid, {"lottery_token": token, "lottery_token_ts": int(time.time()), "ua": acc.ua, "remark": acc.remark}) + + acc.lottery_token = token + ok(f"{acc.remark} 抽奖端登录成功,开始抽奖") + + activity_id = get_activity_id(token, acc.ua) + if not activity_id: + return {"success": False, "lottery_msgs": [], "draw_records": [], "integral": None, "message": "未找到有效抽奖活动ID"} + + lottery_msgs: List[str] = [] + draw_records: List[Dict[str, Any]] = [] + consecutive_failures = 0 + draw_count = 0 + total_cost = 0 + total_prize_points = 0.0 + total_prize_money = 0.0 + start_integral: Optional[int] = None + last_integral: Optional[int] = None + + first_detail = get_integral_detail(token, acc.ua) + if not first_detail.get("ok"): + lottery_msgs.append(f"积分不可用或账号不是品牌会员,停止抽奖:{first_detail.get('message') or ''}".strip(":")) + return { + "success": True, + "lottery_msgs": lottery_msgs, + "draw_records": draw_records, + "integral": first_detail.get("integral"), + "start_integral": first_detail.get("integral"), + "total_cost": 0, + "total_prize_points": 0, + "total_prize_money": 0.0, + "draw_count": 0, + "activity_id": activity_id, + } + start_integral = int(first_detail["integral"]) + last_integral = start_integral + log(f"{acc.remark} 签到后/抽奖前积分:{start_integral}") + + if start_integral < LOTTERY_COST: + lottery_msgs.append(f"当前积分 {start_integral},不足 {LOTTERY_COST},停止抽奖") + return { + "success": True, + "lottery_msgs": lottery_msgs, + "draw_records": draw_records, + "integral": start_integral, + "start_integral": start_integral, + "total_cost": 0, + "total_prize_points": 0, + "total_prize_money": 0.0, + "draw_count": 0, + "activity_id": activity_id, + } + + while consecutive_failures < MAX_LOTTERY_FAILS and draw_count < LOTTERY_DAILY_LIMIT: + before_integral = last_integral + if before_integral is None: + detail = get_integral_detail(token, acc.ua) + if not detail.get("ok"): + lottery_msgs.append(f"积分不可用或账号不是品牌会员,停止抽奖:{detail.get('message') or ''}".strip(":")) + break + before_integral = int(detail["integral"]) + last_integral = before_integral + + if before_integral < LOTTERY_COST: + lottery_msgs.append(f"当前积分 {before_integral},不足 {LOTTERY_COST},停止抽奖") + break + + draw = draw_once(activity_id, token, acc.ua) + if draw["status"] == "success": + consecutive_failures = 0 + draw_count += 1 + prize_name = str(draw.get("prize") or "未知奖励") + prize_value = parse_prize_value(prize_name) + after_detail = get_integral_detail(token, acc.ua) + after_integral = after_detail.get("integral") if after_detail.get("ok") else None + cost = LOTTERY_COST if after_integral is None else max(0, before_integral + int(prize_value.get("points") or 0) - int(after_integral)) + total_cost += int(cost) + total_prize_points += float(prize_value.get("points") or 0) + total_prize_money += float(prize_value.get("money") or 0) + last_integral = int(after_integral) if after_integral is not None else None + record = { + "index": draw_count, + "before_integral": before_integral, + "cost": int(cost), + "prize": prize_name, + "prize_points": float(prize_value.get("points") or 0), + "prize_money": float(prize_value.get("money") or 0), + "after_integral": after_integral, + "receive_msg": draw.get("receive_msg") or "", + } + draw_records.append(record) + msg = ( + f"第{draw_count}/{LOTTERY_DAILY_LIMIT}次抽奖:抽前积分 {before_integral}," + f"消耗 {int(cost)},抽中:{prize_name},中奖后积分 {after_integral if after_integral is not None else '未知'}" + ) + if draw.get("receive_msg"): + msg += f";{draw.get('receive_msg')}" + lottery_msgs.append(msg) + ok(f"{acc.remark} {msg}") + elif draw["status"] == "insufficient": + lottery_msgs.append(f"积分不足:{draw.get('message')}") + break + else: + message = str(draw.get('message') or '') + if "次数已达上限" in message or "今日参与次数" in message or "已达上限" in message: + msg = f"抽奖停止:{message}" + lottery_msgs.append(msg) + warn(f"{acc.remark} {msg}") + break + consecutive_failures += 1 + msg = f"抽奖失败({consecutive_failures}/{MAX_LOTTERY_FAILS}):{message or '未知错误'}" + lottery_msgs.append(msg) + warn(f"{acc.remark} {msg}") + + if draw_count >= LOTTERY_DAILY_LIMIT: + limit_msg = f"已达到脚本设置的每日最多参与 {LOTTERY_DAILY_LIMIT} 次,停止抽奖" + lottery_msgs.append(limit_msg) + log(f"{acc.remark} {limit_msg}") + break + + delay = random.randint(LOTTERY_DELAY_MIN, LOTTERY_DELAY_MAX) + time.sleep(delay) + + return { + "success": True, + "lottery_msgs": lottery_msgs, + "draw_records": draw_records, + "integral": last_integral, + "start_integral": start_integral, + "total_cost": total_cost, + "total_prize_points": total_prize_points, + "total_prize_money": total_prize_money, + "draw_count": draw_count, + "activity_id": activity_id, + } + + +# ===================== 主流程 ===================== +def process_account(acc: Account, index: int, total: int) -> Dict[str, Any]: + acc.ua = get_cached_ua(acc.wxid) + print(f"\n--- 账号 {index}/{total}: {acc.remark} ({mask(acc.wxid)}) ---") + log(f"{acc.remark} 使用固定 UA: {mask(acc.ua, 24, 18)}") + result: Dict[str, Any] = {"remark": acc.remark, "wxid": acc.wxid, "success": True, "ua": acc.ua} + + if TASK_MODE in {"all", "sign"}: + try: + result["sign"] = run_sign_task(acc) + except Exception as e: + warn(f"{acc.remark} 签到任务失败: {e}") + if DEBUG: + traceback.print_exc() + result["success"] = False + result["sign"] = {"success": False, "sign_msg": str(e)} + + if TASK_MODE in {"all", "lottery"}: + try: + result["lottery"] = run_lottery_task(acc) + except Exception as e: + warn(f"{acc.remark} 抽奖任务失败: {e}") + if DEBUG: + traceback.print_exc() + result["success"] = False + result["lottery"] = {"success": False, "lottery_msgs": [], "message": str(e)} + + return result + + +def should_skip_account(acc: Account) -> bool: + if EXCLUDE_WXIDS and acc.wxid in EXCLUDE_WXIDS: + return True + if SINGLE_TEST_WXID and acc.wxid != SINGLE_TEST_WXID: + return True + return False + + +def load_accounts() -> List[Account]: + accounts = get_accounts_from_manual() if ACCOUNT_SOURCE == "manual" else get_accounts_from_cloud() + if EXCLUDE_WXIDS: + accounts = [a for a in accounts if a.wxid not in EXCLUDE_WXIDS] + log(f"已排除 {len(EXCLUDE_WXIDS)} 个 wxid,剩余 {len(accounts)}") + if SINGLE_TEST_WXID: + accounts = [a for a in accounts if a.wxid == SINGLE_TEST_WXID] + log(f"单号测试 wxid={SINGLE_TEST_WXID},匹配 {len(accounts)} 个") + return accounts + + +def iter_selected_accounts(): + """逐个产生账号。cloud 模式默认 pageSize=1,处理完一个才取下一页。""" + if ACCOUNT_SOURCE == "manual": + accounts = get_accounts_from_manual() + selected = [a for a in accounts if not should_skip_account(a)] + total = len(selected) + for idx, acc in enumerate(selected, 1): + yield acc, idx, total + return + + matched = 0 + for acc, seen, total in iter_accounts_from_cloud(): + if should_skip_account(acc): + continue + matched += 1 + yield acc, seen if not SINGLE_TEST_WXID else matched, total if not SINGLE_TEST_WXID else matched + if SINGLE_TEST_WXID and acc.wxid == SINGLE_TEST_WXID: + break + + +def build_report(results: List[Dict[str, Any]]) -> str: + now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + elapsed = time.time() - RUN_START_TS + total = len(results) + success_count = sum(1 for r in results if r.get("success")) + sign_ok = sum(1 for r in results if (r.get("sign") or {}).get("success")) + draw_total = sum(int((r.get("lottery") or {}).get("draw_count") or 0) for r in results) + total_cost = sum(int((r.get("lottery") or {}).get("total_cost") or 0) for r in results) + total_prize_points = sum(float((r.get("lottery") or {}).get("total_prize_points") or 0) for r in results) + total_prize_money = sum(float((r.get("lottery") or {}).get("total_prize_money") or 0) for r in results) + + lines = [ + "", + "==================================================", + "📊 执行汇总", + "==================================================", + f"⏱️ 执行时间: {now}", + f"👥 总账号数: {total}", + f"✅ 成功账号: {success_count}", + f"📝 签到成功: {sign_ok}", + f"🎰 总成功抽奖: {draw_total}", + f"📱 总消耗积分: {total_cost}", + f"📈 总中奖积分: {int(total_prize_points)}", + f"💰 总中奖金额: {total_prize_money:.2f}元", + f"🎯 单号抽奖上限: {LOTTERY_DAILY_LIMIT}", + f"🧩 单次抽奖消耗: {LOTTERY_COST}积分", + f"📦 缓存文件: {CACHE_FILE}", + f"🧭 UA数量: {len(UA_LIST)}", + "", + "📋 账号详情:", + ] + + for idx, r in enumerate(results, 1): + remark = r.get("remark") or mask(r.get("wxid")) + l = r.get("lottery") or {} + draw_records = l.get("draw_records") or [] + if not isinstance(draw_records, list): + draw_records = [] + draw_count = int(l.get("draw_count") or 0) + + money = float(l.get("total_prize_money") or 0) + points = float(l.get("total_prize_points") or 0) + prizes = [str(x.get("prize") or "未知奖励") for x in draw_records] or ["无"] + icon = "✅" if draw_count > 0 else "❌" + lines.append( + f"{icon} 账号{idx}: {remark} | 抽奖: {draw_count}次 | " + f"消耗: {int(l.get('total_cost') or 0)}积分 | 中奖积分: {int(points)} | " + f"金额: {money:.2f}元 | 奖品: {', '.join(prizes)}" + ) + if l: + lines.append(f" ├─ 签到后/抽奖前积分: {l.get('start_integral') if l.get('start_integral') is not None else '-'}") + for rec in draw_records: + lines.append( + f" ├─ 第{rec.get('index')}次: 抽前 {rec.get('before_integral')}," + f"消耗 {rec.get('cost')},奖品 {rec.get('prize')}," + f"中奖积分 +{int(float(rec.get('prize_points') or 0))}," + f"中奖金额 +{float(rec.get('prize_money') or 0):.2f}元," + f"中奖后积分 {rec.get('after_integral') if rec.get('after_integral') is not None else '-'}" + ) + if l.get("integral") is not None: + lines.append(f" └─ 最后积分: {l.get('integral')}") + else: + stop_msg = (l.get("lottery_msgs") or [l.get("message") or "无抽奖结果"])[-1] + lines.append(f" └─ 结果: {stop_msg}") + if "sign" in r: + s = r.get("sign") or {} + lines.append(f" 📝 签到: {s.get('sign_msg') or s.get('message') or '无结果'}") + + lines.append(f"=== 执行结束 [{now}] 耗时 {elapsed:.2f} 秒 退出码 0 ===") + return "\n".join(lines) + + +def write_run_log(report: str) -> Path: + log_dir = Path(__file__).with_name("logs") + log_dir.mkdir(parents=True, exist_ok=True) + path = log_dir / f"nw_jg_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log" + content = "\n".join(SESSION_LOG_LINES) + "\n" + report + "\n" + path.write_text(content, encoding="utf-8") + return path + + +def main() -> None: + print("========== 浓五酒馆 - 养鸡场自动登录任务一体版 ==========") + log(f"TASK_MODE={TASK_MODE}") + log(f"WX_CLOUD={WX_CLOUD or '(未设置)'}") + log(f"ACCOUNT_SOURCE={ACCOUNT_SOURCE}") + log(f"SIGN_APPID={SIGN_APPID}") + log(f"LOTTERY_APPID={LOTTERY_APPID}") + ensure_config() + + print("\n=== 任务开始 ===") + results: List[Dict[str, Any]] = [] + processed = 0 + for acc, i, total in iter_selected_accounts(): + if processed == 0: + print(f"账号总数: {total if not SINGLE_TEST_WXID else 1}") + processed += 1 + results.append(process_account(acc, i if not SINGLE_TEST_WXID else processed, total if not SINGLE_TEST_WXID else 1)) + if DELAY > 0: + time.sleep(DELAY) + + if not results: + raise RuntimeError("没有可处理账号") + + report = build_report(results) + print(report) + log_path = write_run_log(report) + print(f"\n🧾 日志文件:{log_path}") + print("\n=== 任务完成 ===") + + +if __name__ == "__main__": + try: + main() + except Exception as e: + err("主流程错误:", e) + if DEBUG: + traceback.print_exc() + sys.exit(1) + + \ No newline at end of file