# new Env("浓五的酒馆签到+抽奖")
# cron: 15 11 * * *
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# NOTE(review): the two leading "new Env" / "cron" lines look like task-panel
# scheduler directives; they are kept verbatim on purpose - confirm before
# editing them.
# NOTE(review): this file was restored from a copy whose line breaks had been
# collapsed. Block nesting was rebuilt from syntax; the few places where the
# nesting could not be derived from syntax alone carry a NOTE(review) comment.
"""
Nongwu Tavern (浓五的酒馆) / Wuliang Nongxiang points mall - all-in-one script
that logs in through the "chicken farm" (养鸡场) WeChat protocol service and
then performs the daily sign-in and the points lottery.

version: 1.0.0

Features:
1. Read the WeChat account list from the chicken-farm service, or use
   manually specified wxids.
2. Sign-in mini program: get code -> log in -> obtain token -> sign in.
3. Lottery mini program: get code -> log in -> obtain token -> draw with
   points and claim red packets.
4. Tokens are not printed in normal output; only task results are.
   (DEBUG=true dumps raw responses, which may contain tokens.)

Use only with WeChat accounts and services you are authorised to operate.

==================== Required environment variables ====================
wx_cloud / WECHAT_SERVER  Chicken-farm base URL, e.g. http://127.0.0.1:666
wx_token                  Chicken-farm Authorization value; a bare token or
                          a "Bearer ..." token (detected automatically)

==================== Optional environment variables ====================
TASK_MODE            all / sign / lottery, default all
ACCOUNT_SOURCE       cloud / manual, default cloud
MANUAL_ACCOUNTS      Manual accounts, one "remark#wxid" per line; only used
                     when ACCOUNT_SOURCE=manual
SINGLE_TEST_WXID     Process only this wxid
EXCLUDE_WXIDS        wxids to skip; separated by newlines, ASCII commas or
                     full-width commas
CLOUD_PAGE_SIZE      Chicken-farm page size, default 1 (one account is
                     processed before the next one is fetched)
AUTO_BEARER          Prefix wx_token with "Bearer " automatically, default
                     true; set false if the service expects a bare token
DELAY                Seconds to wait between accounts, default 5
TIMEOUT              Request timeout in seconds, default 20
DEBUG                true/false, default false
VERIFY_SSL           true/false, default false
MAX_LOTTERY_FAILS    Consecutive draw failures allowed, default 3
LOTTERY_DELAY_MIN    Minimum seconds between draws, default 8
LOTTERY_DELAY_MAX    Maximum seconds between draws, default 15
LOTTERY_ACTIVITY_ID  Lottery activity id; when empty it is discovered from
                     the activity list
LOTTERY_DAILY_LIMIT  Maximum draws per account per run, default 3
LOTTERY_COST         Points spent per draw, default 50
LOTTERY_DRAW_API     Override for the draw endpoint
LOTTERY_RECEIVE_API  Override for the red-packet claim endpoint; empty
                     disables automatic claiming
CACHE_TTL            Token cache lifetime in seconds, default 259200; an
                     expired or rejected token triggers a fresh login
NWJG_CACHE_FILE      Token/UA cache file, default nw_jg_cookie_cache.json
                     next to this script
UA_FILE              User-Agent file, default UA_LIST.txt next to this
                     script; one UA per line, "#" starts a comment line

==================== Examples ====================
export wx_cloud="http://127.0.0.1:666"
export wx_token="Bearer xxxxx"
python3 nw_jg_yjc_auto.py

Sign-in only:
export TASK_MODE="sign"
python3 nw_jg_yjc_auto.py

Lottery only:
export TASK_MODE="lottery"
python3 nw_jg_yjc_auto.py
"""

import json
import os
import random
import re
import sys
import time
import traceback
import zlib
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Tuple
from urllib.parse import urlencode

try:
    import requests
except ImportError:
    print("缺少 requests,请先安装:pip install requests")
    sys.exit(1)

try:
    import urllib3

    urllib3.disable_warnings()
except Exception:
    # Best effort only: the warning filter is cosmetic.
    pass


# ===================== Fixed project configuration =====================
SIGN_APPID = "wxed3cf95a14b58a26"
SIGN_BASE = "https://stdcrm.dtmiller.com"
SIGN_LOGIN_URL = f"{SIGN_BASE}/std-weixin-mp-service/miniApp/custom/login"
SIGN_USER_API = f"{SIGN_BASE}/scrm-promotion-service/mini/wly/user/info"
SIGN_MODULE_API = f"{SIGN_BASE}/scrm-promotion-service/mini/module/config/list"
SIGN_FIXED_API = f"{SIGN_BASE}/scrm-promotion-service/promotion/sign/today?promotionId=PI69eb321d37c48c000a05ee4e"

# The lottery lives in the separate points-mall mini program, not in the old
# jf.wlnxjc.com/mini service.
LOTTERY_APPID = "wx99fa98e883130aa3"
LOTTERY_BASE = "https://www.wlnxjc.com:8088"
LOTTERY_LOGIN_URL = f"{LOTTERY_BASE}/app-api/member/auth/weixin-mini-app-login"
LOTTERY_ACTIVITY_API = f"{LOTTERY_BASE}/app-api/promotion/activity/list"
LOTTERY_ACTIVITY_DETAIL_API = f"{LOTTERY_BASE}/app-api/promotion/activity/get-detail"
LOTTERY_INTEGRAL_API = f"{LOTTERY_BASE}/app-api/member/integral/get"
# Draw / claim endpoints of the points wheel.
LOTTERY_DRAW_API_DEFAULT = f"{LOTTERY_BASE}/app-api/promotion/activity/draw"
LOTTERY_RECEIVE_API_DEFAULT = f"{LOTTERY_BASE}/app-api/promotion/activity/receive"

DEFAULT_UA_LIST = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF WindowsWechat(0x63090a13) UnifiedPCWindowsWechat(0xf2541939) XWEB/19841",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.6778.86 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF WindowsWechat(0x63090a13) UnifiedPCWindowsWechat(0xf2541939) XWEB/19763",
    "Mozilla/5.0 (Linux; Android 15; 24129PN74C Build/AQ3A.240912.001; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/132.0.6834.163 Mobile Safari/537.36 XWEB/1320093 MMWEBSDK/20250201 MMWEBID/128 MicroMessenger/8.0.56.2800(0x2800383D) WeChat/arm64 Weixin NetType/WIFI Language/zh_CN ABI/arm64 MiniProgramEnv/android",
    "Mozilla/5.0 (iPhone; CPU iPhone OS 17_6_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.55(0x1800372f) NetType/WIFI Language/zh_CN MiniProgramEnv/ios",
]


def load_ua_list() -> List[str]:
    """Return the User-Agent pool.

    Reads UA_FILE (default: UA_LIST.txt next to this script), ignoring blank
    lines and lines starting with "#". Falls back to DEFAULT_UA_LIST when the
    file is missing or contains no usable line.
    """
    ua_file = Path(os.environ.get("UA_FILE", Path(__file__).with_name("UA_LIST.txt")))
    if ua_file.exists():
        uas = [
            line
            for line in (raw.strip() for raw in ua_file.read_text(encoding="utf-8").splitlines())
            if line and not line.startswith("#")
        ]
        if uas:
            return uas
    return DEFAULT_UA_LIST


UA_LIST = load_ua_list()

# ===================== Environment variables =====================
ENV = os.environ
WX_CLOUD = (ENV.get("wx_cloud") or ENV.get("WECHAT_SERVER") or "").rstrip("/")
WX_TOKEN = ENV.get("wx_token", "").strip()
AUTO_BEARER = ENV.get("AUTO_BEARER", "true").lower() != "false"
if AUTO_BEARER and WX_TOKEN and not WX_TOKEN.lower().startswith("bearer "):
    WX_TOKEN = f"Bearer {WX_TOKEN}"
TASK_MODE = ENV.get("TASK_MODE", "all").lower()  # all/sign/lottery
ACCOUNT_SOURCE = ENV.get("ACCOUNT_SOURCE", "cloud").lower()
MANUAL_ACCOUNTS = ENV.get("MANUAL_ACCOUNTS", "")
SINGLE_TEST_WXID = ENV.get("SINGLE_TEST_WXID", "").strip()
# Separators: newline, ASCII comma, full-width comma.
EXCLUDE_WXIDS = [x.strip() for x in re.split(r"[\n,,]+", ENV.get("EXCLUDE_WXIDS", "")) if x.strip()]
CLOUD_PAGE_SIZE = max(1, int(ENV.get("CLOUD_PAGE_SIZE", "1")))
DELAY = float(ENV.get("DELAY", "5"))
TIMEOUT = float(ENV.get("TIMEOUT", "20"))
DEBUG = ENV.get("DEBUG", "false").lower() == "true"
VERIFY_SSL = ENV.get("VERIFY_SSL", "false").lower() == "true"
MAX_LOTTERY_FAILS = int(ENV.get("MAX_LOTTERY_FAILS", "3"))
LOTTERY_DAILY_LIMIT = int(ENV.get("LOTTERY_DAILY_LIMIT", "3"))
LOTTERY_COST = int(ENV.get("LOTTERY_COST", "50"))
LOTTERY_DELAY_MIN = int(ENV.get("LOTTERY_DELAY_MIN", "8"))
LOTTERY_DELAY_MAX = int(ENV.get("LOTTERY_DELAY_MAX", "15"))
LOTTERY_ACTIVITY_ID = ENV.get("LOTTERY_ACTIVITY_ID", "").strip()
LOTTERY_DRAW_API = ENV.get("LOTTERY_DRAW_API", LOTTERY_DRAW_API_DEFAULT).strip()
LOTTERY_RECEIVE_API = ENV.get("LOTTERY_RECEIVE_API", LOTTERY_RECEIVE_API_DEFAULT).strip()
CACHE_FILE = Path(ENV.get("NWJG_CACHE_FILE", Path(__file__).with_name("nw_jg_cookie_cache.json")))
CACHE_TTL = int(ENV.get("CACHE_TTL", "259200"))  # 3 days by default; a rejected token triggers a re-login

SESSION_LOG_LINES: List[str] = []
RUN_START_TS = time.time()


@dataclass
class Account:
    """One WeChat account plus the per-run tokens and User-Agent bound to it."""

    remark: str  # display name used in log lines
    wxid: str
    sign_token: str = ""
    lottery_token: str = ""
    ua: str = ""


# ===================== Basic helpers =====================
def emit(prefix: str, *args):
    """Print one prefixed log line and remember it for the run log file."""
    line = f"{prefix} {' '.join(str(a) for a in args)}"
    print(line)
    SESSION_LOG_LINES.append(line)


def log(*args):
    """Emit an informational line."""
    emit("[INFO]", *args)


def ok(*args):
    """Emit a success line."""
    emit("[ OK ]", *args)


def warn(*args):
    """Emit a warning line."""
    emit("[WARN]", *args)


def err(*args):
    """Emit an error line."""
    emit("[ERR ]", *args)


def debug(title: str, data: Any):
    """Pretty-print ``data`` under ``title`` when DEBUG is on (stdout only)."""
    if not DEBUG:
        return
    print(f"\n[DEBUG] {title}")
    try:
        print(json.dumps(data, ensure_ascii=False, indent=2))
    except Exception:
        # Not JSON-serialisable: fall back to the plain repr.
        print(data)


def mask(value: Any, keep_start: int = 8, keep_end: int = 4) -> str:
    """Return ``value`` with its middle replaced by ``***``.

    Empty/falsy values give ``""``; values no longer than
    ``keep_start + keep_end`` characters are hidden completely.
    """
    s = str(value or "")
    if not s:
        return ""
    if len(s) <= keep_start + keep_end:
        return "***"
    # len(s) - keep_end instead of -keep_end: s[-0:] would expose everything.
    return s[:keep_start] + "***" + s[len(s) - keep_end:]


def safe_json_response(resp: requests.Response) -> Optional[Any]:
    """Return the decoded JSON body of ``resp``, or None if it is not JSON."""
    try:
        return resp.json()
    except Exception:
        return None


def load_cache() -> Dict[str, Any]:
    """Load the cache file; any unreadable or malformed content yields an empty cache."""
    if not CACHE_FILE.exists():
        return {"accounts": {}}
    try:
        data = json.loads(CACHE_FILE.read_text(encoding="utf-8"))
    except Exception:
        return {"accounts": {}}
    if not isinstance(data, dict):
        return {"accounts": {}}
    # A hand-edited file could hold a non-dict here, which would break the
    # setdefault() calls made by the cache accessors.
    if not isinstance(data.get("accounts"), dict):
        data["accounts"] = {}
    return data


def save_cache(cache: Dict[str, Any]) -> None:
    """Write ``cache`` to CACHE_FILE as UTF-8 JSON, creating parent directories."""
    CACHE_FILE.parent.mkdir(parents=True, exist_ok=True)
    CACHE_FILE.write_text(json.dumps(cache, ensure_ascii=False, indent=2), encoding="utf-8")


def choose_stable_ua(wxid: str) -> str:
    """Pick a User-Agent for ``wxid`` that is the same on every run.

    Only used when no UA is cached yet; afterwards the cached UA is reused so
    the device fingerprint does not change together with the token.
    """
    # crc32 instead of hash(): str hashes are salted per process, so hash()
    # would select a different UA on each run.
    idx = zlib.crc32(str(wxid).encode("utf-8")) % len(UA_LIST)
    return UA_LIST[idx]


def cache_get_account(wxid: str) -> Dict[str, Any]:
    """Return the cached entry for ``wxid`` (an empty dict when absent)."""
    cache = load_cache()
    accounts = cache.setdefault("accounts", {})
    return accounts.setdefault(wxid, {})


def cache_update_account(wxid: str, updates: Dict[str, Any]) -> None:
    """Merge ``updates`` into the cached entry for ``wxid`` and persist it."""
    cache = load_cache()
    accounts = cache.setdefault("accounts", {})
    item = accounts.setdefault(wxid, {})
    item.update(updates)
    item["updated_at"] = int(time.time())
    save_cache(cache)


def get_cached_ua(wxid: str) -> str:
    """Return the UA bound to ``wxid``, choosing and caching one on first use."""
    ua = cache_get_account(wxid).get("ua")
    if not ua:
        ua = choose_stable_ua(wxid)
        cache_update_account(wxid, {"ua": ua})
    return ua


def is_cache_fresh(item: Dict[str, Any], key: str) -> bool:
    """Return True when ``item[key]`` is set and ``item[key + '_ts']`` is younger than CACHE_TTL."""
    try:
        ts = int(item.get(f"{key}_ts") or 0)
    except (TypeError, ValueError):
        # A corrupt timestamp simply means "not fresh".
        ts = 0
    return bool(item.get(key)) and (time.time() - ts < CACHE_TTL)


def ensure_config():
    """Validate mandatory settings; raise RuntimeError describing the first problem."""
    if not WX_CLOUD:
        raise RuntimeError("缺少 wx_cloud 或 WECHAT_SERVER")
    if not WX_TOKEN:
        raise RuntimeError("缺少 wx_token")
    if TASK_MODE not in {"all", "sign", "lottery"}:
        raise RuntimeError("TASK_MODE 只支持 all / sign / lottery")


# ===================== Chicken-farm accounts and login codes =====================
def account_from_cloud_row(item: Dict[str, Any]) -> Optional[Account]:
    """Build an Account from one account-list row; None when the row has no wxid."""
    wxid = item.get("wxId") or item.get("wxid") or item.get("Wxid") or ""
    if not wxid:
        return None
    remark = item.get("wxName") or item.get("wxname") or item.get("remark") or item.get("nickName") or mask(wxid)
    return Account(remark=remark, wxid=wxid)


def fetch_cloud_account_page(page_num: int, page_size: int) -> Tuple[List[Account], int]:
    """Fetch one page of accounts from the chicken-farm service.

    Returns ``(accounts, total)`` where ``total`` is the service-reported
    total, or the page length when none is reported. Raises RuntimeError when
    the response is not a ``code == 200`` object with a ``rows`` list.
    """
    url = f"{WX_CLOUD}/prod-api/wechat/wechat/list"
    headers = {"Authorization": WX_TOKEN, "Content-Type": "application/json"}
    params = {"pageNum": page_num, "pageSize": page_size}
    resp = requests.get(url, headers=headers, params=params, timeout=TIMEOUT, verify=VERIFY_SSL)
    data = safe_json_response(resp)
    # Non-dict JSON (e.g. a list) is treated like an error payload instead of
    # crashing on .get().
    info: Dict[str, Any] = data if isinstance(data, dict) else {}
    if DEBUG:
        preview = data
        if isinstance(info.get("rows"), list):
            # Keep only the id/name of each row in the debug dump.
            preview = {**info, "rows": [
                {"wxId": r.get("wxId") or r.get("wxid") or r.get("Wxid"), "wxName": r.get("wxName") or r.get("wxname")}
                for r in info.get("rows", [])
            ]}
        debug(f"养鸡场账号列表响应 page={page_num} size={page_size}", preview if preview is not None else resp.text[:1000])
    if info.get("code") == 200 and isinstance(info.get("rows"), list):
        accounts = [acc for item in info["rows"] if (acc := account_from_cloud_row(item))]
        total = int(info.get("total") or len(accounts))
        return accounts, total
    raise RuntimeError(info.get("msg") or info.get("message") or f"获取养鸡场账号列表失败: HTTP {resp.status_code}")


def iter_accounts_from_cloud() -> Iterator[Tuple[Account, int, int]]:
    """Yield ``(account, seen_so_far, total)`` page by page.

    Pages are fetched lazily, so with CLOUD_PAGE_SIZE=1 the next account is
    requested only after the consumer is done with the current one.
    """
    page = 1
    seen = 0
    while True:
        accounts, total = fetch_cloud_account_page(page, CLOUD_PAGE_SIZE)
        if not accounts:
            break
        for acc in accounts:
            seen += 1
            yield acc, seen, total
        if seen >= total:
            break
        page += 1


def get_accounts_from_cloud() -> List[Account]:
    """Return every account from the chicken-farm service (all pages)."""
    return [acc for acc, _, _ in iter_accounts_from_cloud()]


def get_accounts_from_manual() -> List[Account]:
    """Parse MANUAL_ACCOUNTS: one ``remark#wxid`` (or a bare wxid) per line."""
    accounts: List[Account] = []
    for line in MANUAL_ACCOUNTS.splitlines():
        parts = [p.strip() for p in line.split("#") if p.strip()]
        if not parts:
            # Blank line, or a line made only of "#" separators.
            continue
        remark = parts[0]
        wxid = parts[1] if len(parts) >= 2 else parts[0]
        accounts.append(Account(remark=remark or mask(wxid), wxid=wxid))
    return accounts


def get_mini_program_code(wxid: str, appid: str) -> str:
    """Ask the chicken-farm service for a mini-program login code.

    Accepts the code either as ``data.code`` or as a plain string ``data``.
    Raises RuntimeError when no code is returned.
    """
    url = f"{WX_CLOUD}/prod-api/wechat/api/getMiniProgramCode"
    headers = {"Authorization": WX_TOKEN, "Content-Type": "application/json"}
    payload = {"wxid": wxid, "appid": appid}
    resp = requests.post(url, headers=headers, json=payload, timeout=TIMEOUT, verify=VERIFY_SSL)
    data = safe_json_response(resp)
    debug(f"取 code 响应 appid={appid} wxid={wxid}", data if data is not None else resp.text[:1000])
    info: Dict[str, Any] = data if isinstance(data, dict) else {}
    if info.get("code") == 200:
        d = info.get("data")
        if isinstance(d, dict) and d.get("code"):
            return str(d["code"])
        if isinstance(d, str) and d:
            return d
    raise RuntimeError(info.get("msg") or info.get("message") or f"获取小程序 code 失败: HTTP {resp.status_code}")


# ===================== Sign-in side =====================
def sign_headers(token: str = "", ua: str = "") -> Dict[str, str]:
    """Build request headers for the sign-in service.

    A random UA from the pool is used when ``ua`` is empty. When ``token`` is
    given it is sent with a ``Bearer `` prefix (added if missing) under both
    ``authorization`` and ``Authorization``.
    """
    h = {
        "Host": "stdcrm.dtmiller.com",
        "Connection": "keep-alive",
        "charset": "utf-8",
        "User-Agent": ua or random.choice(UA_LIST),
        "content-type": "application/json",
        "Referer": f"https://servicewechat.com/{SIGN_APPID}/226/page-frame.html",
        "Accept-Encoding": "gzip,compress,br,deflate",
    }
    if token:
        h["authorization"] = token if token.lower().startswith("bearer ") else f"Bearer {token}"
        h["Authorization"] = h["authorization"]
    return h


def sign_login(code: str, ua: str) -> str:
    """Exchange a login ``code`` for a sign-in token (returned with ``Bearer `` prefix).

    Raises RuntimeError on a non-JSON body, a non-zero ``code`` or a missing token.
    """
    payload = {"code": code, "appId": SIGN_APPID}
    resp = requests.post(SIGN_LOGIN_URL, headers=sign_headers(ua=ua), json=payload, timeout=TIMEOUT, verify=VERIFY_SSL)
    data = safe_json_response(resp)
    debug("浓五酒馆签到登录响应", data if data is not None else resp.text[:1000])
    if not data:
        raise RuntimeError(f"签到端登录响应不是 JSON: HTTP {resp.status_code}, {resp.text[:200]}")
    if data.get("code") != 0:
        raise RuntimeError(data.get("msg") or data.get("message") or "签到端登录失败")
    token = data.get("data")
    if not token:
        raise RuntimeError("签到端登录成功但未返回 token")
    return token if str(token).lower().startswith("bearer ") else f"Bearer {token}"


def sign_get_json(url: str, token: str, ua: str) -> Dict[str, Any]:
    """GET ``url`` on the sign-in service; a non-JSON body becomes ``{"code": -1, "msg": ...}``."""
    resp = requests.get(url, headers=sign_headers(token, ua), timeout=TIMEOUT, verify=VERIFY_SSL)
    data = safe_json_response(resp)
    if data is None:
        return {"code": -1, "msg": f"JSON解析失败: HTTP {resp.status_code}; body={resp.text[:200]}"}
    return data


def sign_post_json(url: str, token: str, ua: str, body: Optional[dict] = None) -> Dict[str, Any]:
    """POST ``body`` (default ``{}``) to the sign-in service; a non-JSON body becomes ``{"code": -1, "msg": ...}``."""
    resp = requests.post(url, headers=sign_headers(token, ua), json=body or {}, timeout=TIMEOUT, verify=VERIFY_SSL)
    data = safe_json_response(resp)
    if data is None:
        return {"code": -1, "msg": f"JSON解析失败: HTTP {resp.status_code}; body={resp.text[:200]}"}
    return data


def get_dynamic_sign_act_id(token: str, ua: str) -> Optional[str]:
    """Look up the current sign-in promotion id from the module config.

    Scans every module detail whose ``detailJson`` title is "每日签到" and
    returns the ``actId`` query value of its ``jumpData.pagePath``; None when
    nothing matches or the request fails.
    """
    data = sign_post_json(SIGN_MODULE_API, token, ua, {})
    debug("每日签到模块响应", data)
    if not data or data.get("code") != 0:
        return None
    for module in data.get("data", []) or []:
        for detail in module.get("detailList", []) or []:
            try:
                detail_json = json.loads(detail.get("detailJson") or "{}")
                if detail_json.get("title") == "每日签到":
                    page_path = detail_json.get("jumpData", {}).get("pagePath", "")
                    m = re.search(r"actId=([^&]+)", page_path)
                    if m:
                        return m.group(1)
            except Exception:
                # A malformed entry must not hide a valid one further down.
                continue
    return None


def run_sign_task(acc: Account) -> Dict[str, Any]:
    """Run the daily sign-in for ``acc``.

    Reuses a fresh cached token when the user-info probe accepts it, otherwise
    logs in again and caches the new token. Returns
    ``{"success", "nickname", "sign_msg"}``; an "already signed" answer counts
    as success. Login/code failures propagate as RuntimeError.
    """
    cache_item = cache_get_account(acc.wxid)
    token = cache_item.get("sign_token") if is_cache_fresh(cache_item, "sign_token") else ""
    if token:
        user_probe = sign_get_json(SIGN_USER_API, token, acc.ua)
        if user_probe.get("code") == 0:
            ok(f"{acc.remark} 签到端使用缓存 token")
        else:
            token = ""
            warn(f"{acc.remark} 签到端缓存 token 已失效,重新登录")
    if not token:
        code = get_mini_program_code(acc.wxid, SIGN_APPID)
        ok(f"{acc.remark} 签到端获取 code 成功: {mask(code)}")
        token = sign_login(code, acc.ua)
        cache_update_account(acc.wxid, {"sign_token": token, "sign_token_ts": int(time.time()), "ua": acc.ua, "remark": acc.remark})
    # NOTE(review): nesting of the next two statements was ambiguous in the
    # collapsed source; they are assumed to run for cached and fresh tokens.
    acc.sign_token = token
    ok(f"{acc.remark} 签到端登录成功,开始签到")

    user = sign_get_json(SIGN_USER_API, token, acc.ua)
    debug("签到端用户信息", user)
    try:
        user_data = user.get("data") or {}
        nickname = user_data.get("member", {}).get("nick_name") or user_data.get("visitor", {}).get("open_id") or ""
    except Exception:
        nickname = ""

    act_id = get_dynamic_sign_act_id(token, acc.ua)
    if act_id:
        sign_url = f"{SIGN_BASE}/scrm-promotion-service/promotion/sign/today?promotionId={act_id}"
    else:
        sign_url = SIGN_FIXED_API
    sign_res = sign_get_json(sign_url, token, acc.ua)
    debug("签到响应", sign_res)
    msg = sign_res.get("msg") or sign_res.get("message") or sign_res.get("info") or ""

    if sign_res.get("code") == 0:
        sign_msg = msg or "签到成功"
        ok(f"{acc.remark} 签到成功: {sign_msg}")
        return {"success": True, "nickname": nickname, "sign_msg": sign_msg}

    sign_msg = msg or "签到失败"
    # "已签到" also matches the longer "今日已签到" wording.
    if "已签到" in sign_msg:
        ok(f"{acc.remark} 签到已完成: {sign_msg}")
        return {"success": True, "nickname": nickname, "sign_msg": sign_msg}
    warn(f"{acc.remark} 签到结果: {sign_msg}")
    return {"success": False, "nickname": nickname, "sign_msg": sign_msg}


# ===================== Lottery side =====================
def lottery_headers(token: str = "", ua: str = "") -> Dict[str, str]:
    """Build request headers for the points-mall service (token sent without ``Bearer``)."""
    h = {
        "Connection": "keep-alive",
        "User-Agent": ua or random.choice(UA_LIST),
        "Accept": "*/*",
        "xweb_xhr": "1",
        "terminal": "10",
        "content-type": "application/json;charset=UTF-8",
        "platform": "WechatMiniProgram",
        "tenant-id": "1",
        "Referer": f"https://servicewechat.com/{LOTTERY_APPID}/46/page-frame.html",
        "Accept-Encoding": "gzip, deflate, br",
        "Accept-Language": "zh-CN,zh;q=0.9",
    }
    if token:
        # Captured traffic shows "authorization: <token>" with no Bearer prefix.
        h["authorization"] = token
    return h


def extract_lottery_token(data: Dict[str, Any]) -> str:
    """Return the first non-empty token field of a login response, else ``""``.

    Looks at ``accessToken`` / ``token`` / ``authorization`` on the top level
    first, then inside ``data`` when that is an object.
    """
    inner = data.get("data") if isinstance(data.get("data"), dict) else {}
    keys = ("accessToken", "token", "authorization")
    for source in (data, inner):
        for key in keys:
            if source.get(key):
                return str(source[key])
    return ""


def lottery_login(code: str, ua: str) -> str:
    """Exchange a login ``code`` for a points-mall token.

    Raises RuntimeError on a non-JSON body, a ``code`` other than 0/200 or a
    response without any token field.
    """
    payload = {"loginCode": code, "state": "default", "sourceId": "1"}
    resp = requests.post(LOTTERY_LOGIN_URL, headers=lottery_headers(ua=ua), json=payload, timeout=TIMEOUT, verify=VERIFY_SSL)
    data = safe_json_response(resp)
    debug("浓友购抽奖端登录响应", data if data is not None else resp.text[:1000])
    if not data:
        raise RuntimeError(f"抽奖端登录响应不是 JSON: HTTP {resp.status_code}, {resp.text[:200]}")
    if data.get("code") not in (0, 200):
        raise RuntimeError(data.get("msg") or data.get("message") or data.get("info") or "抽奖端登录失败")
    token = extract_lottery_token(data)
    if not token:
        raise RuntimeError("抽奖端登录成功但未返回 accessToken/token;请开 DEBUG=true 查看响应字段")
    return token


def lottery_get_json(url: str, token: str, ua: str) -> Dict[str, Any]:
    """GET ``url`` on the points-mall service; a non-JSON body becomes ``{"code": -1, "msg": ...}``."""
    resp = requests.get(url, headers=lottery_headers(token, ua), timeout=TIMEOUT, verify=VERIFY_SSL)
    data = safe_json_response(resp)
    if data is None:
        return {"code": -1, "msg": f"JSON解析失败: HTTP {resp.status_code}; body={resp.text[:200]}"}
    return data


def lottery_post_json(url: str, token: str, ua: str, body: Dict[str, Any]) -> Dict[str, Any]:
    """POST ``body`` to the points-mall service; a non-JSON body becomes ``{"code": -1, "msg": ...}``."""
    resp = requests.post(url, headers=lottery_headers(token, ua), json=body, timeout=TIMEOUT, verify=VERIFY_SSL)
    data = safe_json_response(resp)
    if data is None:
        return {"code": -1, "msg": f"JSON解析失败: HTTP {resp.status_code}; body={resp.text[:200]}"}
    return data


def normalize_activity_list(payload: Any) -> List[Dict[str, Any]]:
    """Extract the list of activity dicts from an activity-list payload.

    The payload may be the list itself or an object wrapping it under
    records/list/items/rows/activityList/data (searched recursively).
    """
    if isinstance(payload, list):
        return [x for x in payload if isinstance(x, dict)]
    if isinstance(payload, dict):
        for key in ("records", "list", "items", "rows", "activityList", "data"):
            value = payload.get(key)
            if isinstance(value, list):
                return [x for x in value if isinstance(x, dict)]
            if isinstance(value, dict):
                nested = normalize_activity_list(value)
                if nested:
                    return nested
    return []


def get_activity_id(token: str, ua: str) -> Optional[Any]:
    """Return the lottery activity id to draw from, or None when none is usable.

    LOTTERY_ACTIVITY_ID wins unless it starts with "PI" (a sign-in promotion
    id). Otherwise the activity list is fetched; the first activity whose
    title mentions lottery/wheel/red packet/points, has an accepted status and
    an id is chosen, falling back to the first activity with any id.
    """
    if LOTTERY_ACTIVITY_ID:
        if str(LOTTERY_ACTIVITY_ID).startswith("PI"):
            warn("当前 LOTTERY_ACTIVITY_ID 看起来是酒馆签到端 promotionId,不是浓友购 activityId;将忽略它并尝试自动抓取活动")
        else:
            log(f"使用手动抽奖/活动ID: {LOTTERY_ACTIVITY_ID}")
            return LOTTERY_ACTIVITY_ID

    data = lottery_get_json(LOTTERY_ACTIVITY_API, token, ua)
    debug("浓友购活动列表响应", data)
    if data.get("code") not in (0, 200):
        reason = (data.get("msg") or data.get("message") or data.get("info")) if isinstance(data, dict) else data
        warn(f"活动列表请求失败: {reason}")
        return None

    activities = normalize_activity_list(data.get("data"))
    if not activities:
        warn("浓友购活动列表为空或结构未识别;请用 DEBUG=true 查看响应,或设置 LOTTERY_ACTIVITY_ID 手动指定")
        if not DEBUG:
            # With DEBUG on the full response was already dumped above.
            print("活动列表原始响应预览:")
            try:
                print(json.dumps(data, ensure_ascii=False)[:800])
            except Exception:
                print(str(data)[:800])
        return None

    def get_title(act: Dict[str, Any]) -> str:
        return str(act.get("title") or act.get("name") or act.get("activityName") or act.get("activityTitle") or "")

    def get_id(act: Dict[str, Any]) -> Any:
        return act.get("id") or act.get("activityId") or act.get("activity_id") or act.get("actId")

    print(f"📋 抽奖活动列表 ({len(activities)} 个):")
    for i, act in enumerate(activities[:10], 1):
        print(f" {i}. {get_title(act) or '无标题'} ID={get_id(act)} 状态={act.get('status')}")

    title_keywords = ("抽奖", "转盘", "红包", "积分")
    lottery_activities = [
        act for act in activities
        if any(word in get_title(act) for word in title_keywords)
        and (act.get("status") in (None, 0, 1, "0", "1", True))
        and get_id(act)
    ]
    chosen = lottery_activities[0] if lottery_activities else next((act for act in activities if get_id(act)), None)
    if not chosen:
        warn("活动列表里没有可用 ID;请设置 LOTTERY_ACTIVITY_ID 手动指定")
        return None
    activity_id = get_id(chosen)
    log(f"抽奖活动: {get_title(chosen) or '无标题'} ID={activity_id}")
    return activity_id


def parse_integral_value(data: Dict[str, Any]) -> Optional[int]:
    """Extract the points balance from a balance response as an int.

    When ``data["data"]`` is an object, the first of integral/point/points/
    balance that is present (not None/"") is used, so a balance of 0 is
    reported as 0 rather than as "missing". A plain numeric ``data["data"]``
    is taken as the balance itself. Returns None when no numeric value is found.
    """
    payload = data.get("data")
    if isinstance(payload, dict):
        raw = next(
            (payload[k] for k in ("integral", "point", "points", "balance") if payload.get(k) not in (None, "")),
            None,
        )
    elif isinstance(payload, (int, float, str)) and not isinstance(payload, bool):
        raw = payload
    else:
        raw = None
    if raw is None or isinstance(raw, bool):
        return None
    try:
        return int(float(raw))
    except (TypeError, ValueError, OverflowError):
        return None


def get_integral_detail(token: str, ua: str) -> Dict[str, Any]:
    """Query the points balance.

    Returns ``{"ok", "integral", "message", "auth_invalid"}``. ``auth_invalid``
    is True when a failed response's message contains a login/token keyword,
    which callers use to decide on a re-login.
    """
    data = lottery_get_json(LOTTERY_INTEGRAL_API, token, ua)
    debug("积分响应", data)
    if not isinstance(data, dict):
        return {"ok": False, "integral": None, "message": "积分响应不是 JSON", "auth_invalid": False}
    integral = parse_integral_value(data)
    if data.get("code") not in (0, 200):
        msg = str(data.get("msg") or data.get("message") or data.get("info") or "积分请求失败")
        auth_invalid = any(k in msg for k in ["未登录", "登录已过期", "token", "Token", "认证", "授权"])
        warn(f"积分请求失败: {msg}")
        return {"ok": False, "integral": integral, "message": msg, "auth_invalid": auth_invalid}
    if integral is None:
        msg = str(data.get("msg") or data.get("message") or data.get("info") or "未找到积分字段")
        return {"ok": False, "integral": None, "message": msg, "auth_invalid": False}
    return {"ok": True, "integral": integral, "message": "", "auth_invalid": False}


def get_integral(token: str, ua: str) -> Optional[int]:
    """Return the points balance, or None when it cannot be determined."""
    detail = get_integral_detail(token, ua)
    return detail.get("integral") if detail.get("ok") else None


def receive_cash(record_id: Any, token: str, ua: str) -> str:
    """Claim the prize for ``record_id`` and return a human-readable result line."""
    if not LOTTERY_RECEIVE_API:
        return "未配置红包领取接口,跳过自动领取"
    data = lottery_post_json(LOTTERY_RECEIVE_API, token, ua, {"id": record_id})
    debug("红包领取响应", data)
    if data.get("code") in (0, 200):
        return "红包领取成功"
    return f"红包领取失败:{data.get('msg') or data.get('message') or data.get('info') or '未知错误'}"


def parse_prize_value(prize_name: str) -> Dict[str, float]:
    """Sum the amounts mentioned in a prize name.

    Numbers followed by "元" add to ``money``; numbers followed by "积分" add
    to ``points``. Both default to 0.0.
    """
    text = str(prize_name or "")

    def _total(unit: str) -> float:
        # The pattern only matches digits with an optional fraction, so
        # float() cannot fail on a match.
        return sum(float(x) for x in re.findall(r"(\d+(?:\.\d+)?)\s*" + unit, text)) or 0.0

    return {"money": _total("元"), "points": _total("积分")}


def find_first_key(obj: Any, keys: set) -> Any:
    """Return the first non-empty value stored under any of ``keys`` in a nested structure.

    At each dict level direct keys are checked before descending into the
    values; lists are searched in order. Returns None when nothing is found.
    """
    if isinstance(obj, dict):
        for k, v in obj.items():
            if k in keys and v not in (None, ""):
                return v
        children = obj.values()
    elif isinstance(obj, list):
        children = obj
    else:
        return None
    for child in children:
        found = find_first_key(child, keys)
        if found not in (None, ""):
            return found
    return None


def draw_once(activity_id: Any, token: str, ua: str) -> Dict[str, Any]:
    """Perform one draw.

    Returns ``{"status": "success", "prize", "receive_msg"}`` on success (the
    prize is claimed right away when the response carries a record id),
    ``{"status": "insufficient", "message"}`` when the error text speaks of
    missing points, and ``{"status": "failed", "message"}`` otherwise.
    """
    draw_url = f"{LOTTERY_DRAW_API}?{urlencode({'activityId': activity_id})}"
    data = lottery_post_json(draw_url, token, ua, {"activityId": str(activity_id)})
    debug("抽奖响应", data)
    if data.get("code") in (0, 200):
        body = data.get("data") or {}
        prize_name = (
            find_first_key(body, {"prizeName", "prize_name", "name", "title", "rewardName", "couponName"})
            or "抽奖成功"
        )
        draw_id = find_first_key(body, {"id", "recordId", "record_id", "drawRecordId", "receiveId", "prizeRecordId"})
        receive_msg = receive_cash(draw_id, token, ua) if draw_id else ""
        return {"status": "success", "prize": str(prize_name), "receive_msg": receive_msg}

    info = data.get("msg") or data.get("message") or data.get("info") or "未知错误"
    if "积分" in str(info) and ("不足" in str(info) or "余额" in str(info)):
        return {"status": "insufficient", "message": info}
    return {"status": "failed", "message": info}


def run_lottery_task(acc: Account) -> Dict[str, Any]:
    """Run the points lottery for ``acc``.

    Reuses a fresh cached token unless the balance probe reports an auth
    problem, resolves the activity id, then draws until the balance drops
    below LOTTERY_COST, LOTTERY_DAILY_LIMIT draws succeeded, the service
    reports a participation limit, or MAX_LOTTERY_FAILS consecutive failures
    occurred. Returns a summary dict (messages, per-draw records, balances,
    totals). Login/code failures propagate as RuntimeError.
    """
    cache_item = cache_get_account(acc.wxid)
    token = cache_item.get("lottery_token") if is_cache_fresh(cache_item, "lottery_token") else ""
    if token:
        probe_detail = get_integral_detail(token, acc.ua)
        if probe_detail.get("ok"):
            ok(f"{acc.remark} 抽奖端使用缓存 token")
        elif probe_detail.get("auth_invalid"):
            token = ""
            warn(f"{acc.remark} 抽奖端缓存 token 已失效,重新登录")
        else:
            ok(f"{acc.remark} 抽奖端缓存 token 可用,但当前积分不可用:{probe_detail.get('message')}")
    if not token:
        code = get_mini_program_code(acc.wxid, LOTTERY_APPID)
        ok(f"{acc.remark} 抽奖端获取 code 成功: {mask(code)}")
        token = lottery_login(code, acc.ua)
        cache_update_account(acc.wxid, {"lottery_token": token, "lottery_token_ts": int(time.time()), "ua": acc.ua, "remark": acc.remark})
    # NOTE(review): nesting of the next two statements was ambiguous in the
    # collapsed source; they are assumed to run for cached and fresh tokens.
    acc.lottery_token = token
    ok(f"{acc.remark} 抽奖端登录成功,开始抽奖")

    activity_id = get_activity_id(token, acc.ua)
    if not activity_id:
        return {"success": False, "lottery_msgs": [], "draw_records": [], "integral": None, "message": "未找到有效抽奖活动ID"}

    lottery_msgs: List[str] = []
    draw_records: List[Dict[str, Any]] = []
    consecutive_failures = 0
    draw_count = 0
    total_cost = 0
    total_prize_points = 0.0
    total_prize_money = 0.0
    start_integral: Optional[int] = None
    last_integral: Optional[int] = None

    first_detail = get_integral_detail(token, acc.ua)
    if not first_detail.get("ok"):
        lottery_msgs.append(f"积分不可用或账号不是品牌会员,停止抽奖:{first_detail.get('message') or ''}".strip(":"))
        return {
            "success": True,
            "lottery_msgs": lottery_msgs,
            "draw_records": draw_records,
            "integral": first_detail.get("integral"),
            "start_integral": first_detail.get("integral"),
            "total_cost": 0,
            "total_prize_points": 0,
            "total_prize_money": 0.0,
            "draw_count": 0,
            "activity_id": activity_id,
        }

    start_integral = int(first_detail["integral"])
    last_integral = start_integral
    log(f"{acc.remark} 签到后/抽奖前积分:{start_integral}")
    if start_integral < LOTTERY_COST:
        lottery_msgs.append(f"当前积分 {start_integral},不足 {LOTTERY_COST},停止抽奖")
        return {
            "success": True,
            "lottery_msgs": lottery_msgs,
            "draw_records": draw_records,
            "integral": start_integral,
            "start_integral": start_integral,
            "total_cost": 0,
            "total_prize_points": 0,
            "total_prize_money": 0.0,
            "draw_count": 0,
            "activity_id": activity_id,
        }

    # Tolerate swapped MIN/MAX settings instead of failing inside randint().
    delay_low = min(LOTTERY_DELAY_MIN, LOTTERY_DELAY_MAX)
    delay_high = max(LOTTERY_DELAY_MIN, LOTTERY_DELAY_MAX)

    while consecutive_failures < MAX_LOTTERY_FAILS and draw_count < LOTTERY_DAILY_LIMIT:
        before_integral = last_integral
        if before_integral is None:
            # The balance after the previous draw was unknown: ask again.
            detail = get_integral_detail(token, acc.ua)
            if not detail.get("ok"):
                lottery_msgs.append(f"积分不可用或账号不是品牌会员,停止抽奖:{detail.get('message') or ''}".strip(":"))
                break
            before_integral = int(detail["integral"])
            last_integral = before_integral
        if before_integral < LOTTERY_COST:
            lottery_msgs.append(f"当前积分 {before_integral},不足 {LOTTERY_COST},停止抽奖")
            break

        draw = draw_once(activity_id, token, acc.ua)
        if draw["status"] == "success":
            consecutive_failures = 0
            draw_count += 1
            prize_name = str(draw.get("prize") or "未知奖励")
            prize_value = parse_prize_value(prize_name)
            prize_points = float(prize_value.get("points") or 0)
            prize_money = float(prize_value.get("money") or 0)
            after_detail = get_integral_detail(token, acc.ua)
            after_integral = after_detail.get("integral") if after_detail.get("ok") else None
            if after_integral is None:
                cost = LOTTERY_COST
            else:
                # Real cost = balance drop, corrected for points won back.
                cost = max(0, before_integral + int(prize_points) - int(after_integral))
            total_cost += int(cost)
            total_prize_points += prize_points
            total_prize_money += prize_money
            last_integral = int(after_integral) if after_integral is not None else None
            draw_records.append({
                "index": draw_count,
                "before_integral": before_integral,
                "cost": int(cost),
                "prize": prize_name,
                "prize_points": prize_points,
                "prize_money": prize_money,
                "after_integral": after_integral,
                "receive_msg": draw.get("receive_msg") or "",
            })
            msg = (
                f"第{draw_count}/{LOTTERY_DAILY_LIMIT}次抽奖:抽前积分 {before_integral},"
                f"消耗 {int(cost)},抽中:{prize_name},中奖后积分 {after_integral if after_integral is not None else '未知'}"
            )
            if draw.get("receive_msg"):
                msg += f";{draw.get('receive_msg')}"
            lottery_msgs.append(msg)
            ok(f"{acc.remark} {msg}")
        elif draw["status"] == "insufficient":
            lottery_msgs.append(f"积分不足:{draw.get('message')}")
            break
        else:
            message = str(draw.get('message') or '')
            if "次数已达上限" in message or "今日参与次数" in message or "已达上限" in message:
                msg = f"抽奖停止:{message}"
                lottery_msgs.append(msg)
                warn(f"{acc.remark} {msg}")
                break
            consecutive_failures += 1
            msg = f"抽奖失败({consecutive_failures}/{MAX_LOTTERY_FAILS}):{message or '未知错误'}"
            lottery_msgs.append(msg)
            warn(f"{acc.remark} {msg}")

        if draw_count >= LOTTERY_DAILY_LIMIT:
            limit_msg = f"已达到脚本设置的每日最多参与 {LOTTERY_DAILY_LIMIT} 次,停止抽奖"
            lottery_msgs.append(limit_msg)
            log(f"{acc.remark} {limit_msg}")
            break
        if consecutive_failures >= MAX_LOTTERY_FAILS:
            # The loop ends anyway; do not sleep for nothing.
            break
        time.sleep(random.randint(delay_low, delay_high))

    return {
        "success": True,
        "lottery_msgs": lottery_msgs,
        "draw_records": draw_records,
        "integral": last_integral,
        "start_integral": start_integral,
        "total_cost": total_cost,
        "total_prize_points": total_prize_points,
        "total_prize_money": total_prize_money,
        "draw_count": draw_count,
        "activity_id": activity_id,
    }


# ===================== Main flow =====================
def process_account(acc: Account, index: int, total: int) -> Dict[str, Any]:
    """Run the tasks selected by TASK_MODE for one account.

    Task exceptions are caught and recorded, so one failing account does not
    stop the run. Returns the per-account result dict used by the report.
    """
    acc.ua = get_cached_ua(acc.wxid)
    print(f"\n--- 账号 {index}/{total}: {acc.remark} ({mask(acc.wxid)}) ---")
    log(f"{acc.remark} 使用固定 UA: {mask(acc.ua, 24, 18)}")
    result: Dict[str, Any] = {"remark": acc.remark, "wxid": acc.wxid, "success": True, "ua": acc.ua}

    if TASK_MODE in {"all", "sign"}:
        try:
            result["sign"] = run_sign_task(acc)
        except Exception as e:
            warn(f"{acc.remark} 签到任务失败: {e}")
            if DEBUG:
                traceback.print_exc()
            result["success"] = False
            result["sign"] = {"success": False, "sign_msg": str(e)}

    if TASK_MODE in {"all", "lottery"}:
        try:
            result["lottery"] = run_lottery_task(acc)
        except Exception as e:
            warn(f"{acc.remark} 抽奖任务失败: {e}")
            if DEBUG:
                traceback.print_exc()
            result["success"] = False
            result["lottery"] = {"success": False, "lottery_msgs": [], "message": str(e)}

    return result


def should_skip_account(acc: Account) -> bool:
    """Return True when ``acc`` is excluded or is not the single-test wxid."""
    if EXCLUDE_WXIDS and acc.wxid in EXCLUDE_WXIDS:
        return True
    if SINGLE_TEST_WXID and acc.wxid != SINGLE_TEST_WXID:
        return True
    return False


def load_accounts() -> List[Account]:
    """Load all accounts eagerly and apply the exclude / single-test filters."""
    accounts = get_accounts_from_manual() if ACCOUNT_SOURCE == "manual" else get_accounts_from_cloud()
    if EXCLUDE_WXIDS:
        accounts = [a for a in accounts if a.wxid not in EXCLUDE_WXIDS]
        log(f"已排除 {len(EXCLUDE_WXIDS)} 个 wxid,剩余 {len(accounts)}")
    if SINGLE_TEST_WXID:
        accounts = [a for a in accounts if a.wxid == SINGLE_TEST_WXID]
        log(f"单号测试 wxid={SINGLE_TEST_WXID},匹配 {len(accounts)} 个")
    return accounts


def iter_selected_accounts() -> Iterator[Tuple[Account, int, int]]:
    """Yield ``(account, index, total)`` for every account that should run.

    In cloud mode accounts are pulled lazily page by page; ``index``/``total``
    are then the service-side position and total (or the match count when
    SINGLE_TEST_WXID is set), and iteration stops once that wxid was yielded.
    """
    if ACCOUNT_SOURCE == "manual":
        selected = [a for a in get_accounts_from_manual() if not should_skip_account(a)]
        total = len(selected)
        for idx, acc in enumerate(selected, 1):
            yield acc, idx, total
        return

    matched = 0
    for acc, seen, total in iter_accounts_from_cloud():
        if should_skip_account(acc):
            continue
        matched += 1
        if SINGLE_TEST_WXID:
            yield acc, matched, matched
        else:
            yield acc, seen, total
        if SINGLE_TEST_WXID and acc.wxid == SINGLE_TEST_WXID:
            break


def build_report(results: List[Dict[str, Any]]) -> str:
    """Render the end-of-run summary (totals plus one section per account)."""
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    elapsed = time.time() - RUN_START_TS

    def lottery_of(r: Dict[str, Any]) -> Dict[str, Any]:
        return r.get("lottery") or {}

    total = len(results)
    success_count = sum(1 for r in results if r.get("success"))
    sign_ok = sum(1 for r in results if (r.get("sign") or {}).get("success"))
    draw_total = sum(int(lottery_of(r).get("draw_count") or 0) for r in results)
    total_cost = sum(int(lottery_of(r).get("total_cost") or 0) for r in results)
    total_prize_points = sum(float(lottery_of(r).get("total_prize_points") or 0) for r in results)
    total_prize_money = sum(float(lottery_of(r).get("total_prize_money") or 0) for r in results)

    lines = [
        "",
        "==================================================",
        "📊 执行汇总",
        "==================================================",
        f"⏱️ 执行时间: {now}",
        f"👥 总账号数: {total}",
        f"✅ 成功账号: {success_count}",
        f"📝 签到成功: {sign_ok}",
        f"🎰 总成功抽奖: {draw_total}",
        f"📱 总消耗积分: {total_cost}",
        f"📈 总中奖积分: {int(total_prize_points)}",
        f"💰 总中奖金额: {total_prize_money:.2f}元",
        f"🎯 单号抽奖上限: {LOTTERY_DAILY_LIMIT}",
        f"🧩 单次抽奖消耗: {LOTTERY_COST}积分",
        f"📦 缓存文件: {CACHE_FILE}",
        f"🧭 UA数量: {len(UA_LIST)}",
        "",
        "📋 账号详情:",
    ]

    for idx, r in enumerate(results, 1):
        remark = r.get("remark") or mask(r.get("wxid"))
        lottery = lottery_of(r)
        draw_records = lottery.get("draw_records") or []
        if not isinstance(draw_records, list):
            draw_records = []
        draw_count = int(lottery.get("draw_count") or 0)
        money = float(lottery.get("total_prize_money") or 0)
        points = float(lottery.get("total_prize_points") or 0)
        prizes = [str(x.get("prize") or "未知奖励") for x in draw_records] or ["无"]
        icon = "✅" if draw_count > 0 else "❌"
        lines.append(
            f"{icon} 账号{idx}: {remark} | 抽奖: {draw_count}次 | "
            f"消耗: {int(lottery.get('total_cost') or 0)}积分 | 中奖积分: {int(points)} | "
            f"金额: {money:.2f}元 | 奖品: {', '.join(prizes)}"
        )
        if lottery:
            lines.append(f" ├─ 签到后/抽奖前积分: {lottery.get('start_integral') if lottery.get('start_integral') is not None else '-'}")
            for rec in draw_records:
                lines.append(
                    f" ├─ 第{rec.get('index')}次: 抽前 {rec.get('before_integral')},"
                    f"消耗 {rec.get('cost')},奖品 {rec.get('prize')},"
                    f"中奖积分 +{int(float(rec.get('prize_points') or 0))},"
                    f"中奖金额 +{float(rec.get('prize_money') or 0):.2f}元,"
                    f"中奖后积分 {rec.get('after_integral') if rec.get('after_integral') is not None else '-'}"
                )
            # NOTE(review): these closing lines are assumed to belong to the
            # "lottery result present" branch; the collapsed source did not
            # show their nesting.
            if lottery.get("integral") is not None:
                lines.append(f" └─ 最后积分: {lottery.get('integral')}")
            else:
                stop_msg = (lottery.get("lottery_msgs") or [lottery.get("message") or "无抽奖结果"])[-1]
                lines.append(f" └─ 结果: {stop_msg}")
        if "sign" in r:
            s = r.get("sign") or {}
            lines.append(f" 📝 签到: {s.get('sign_msg') or s.get('message') or '无结果'}")

    lines.append(f"=== 执行结束 [{now}] 耗时 {elapsed:.2f} 秒 退出码 0 ===")
    return "\n".join(lines)


def write_run_log(report: str) -> Path:
    """Write this run's log lines plus ``report`` to ``logs/nw_jg_<timestamp>.log``; return the path."""
    log_dir = Path(__file__).with_name("logs")
    log_dir.mkdir(parents=True, exist_ok=True)
    path = log_dir / f"nw_jg_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
    content = "\n".join(SESSION_LOG_LINES) + "\n" + report + "\n"
    path.write_text(content, encoding="utf-8")
    return path


def main() -> None:
    """Entry point: validate config, process every selected account, print and store the report.

    Raises RuntimeError when the configuration is incomplete or no account
    was processed.
    """
    print("========== 浓五酒馆 - 养鸡场自动登录任务一体版 ==========")
    log(f"TASK_MODE={TASK_MODE}")
    log(f"WX_CLOUD={WX_CLOUD or '(未设置)'}")
    log(f"ACCOUNT_SOURCE={ACCOUNT_SOURCE}")
    log(f"SIGN_APPID={SIGN_APPID}")
    log(f"LOTTERY_APPID={LOTTERY_APPID}")
    ensure_config()

    print("\n=== 任务开始 ===")
    results: List[Dict[str, Any]] = []
    processed = 0
    for acc, i, total in iter_selected_accounts():
        if processed == 0:
            print(f"账号总数: {total if not SINGLE_TEST_WXID else 1}")
        elif DELAY > 0:
            # Pause between accounts only, not after the last one.
            time.sleep(DELAY)
        processed += 1
        results.append(process_account(acc, i if not SINGLE_TEST_WXID else processed, total if not SINGLE_TEST_WXID else 1))

    if not results:
        raise RuntimeError("没有可处理账号")

    report = build_report(results)
    print(report)
    log_path = write_run_log(report)
    print(f"\n🧾 日志文件:{log_path}")
    print("\n=== 任务完成 ===")


if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        err("主流程错误:", e)
        if DEBUG:
            traceback.print_exc()
        sys.exit(1)