Files
Yangmao_Script/WX_Applet/Applet_LWDJG.py

1054 lines
43 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# new Env("浓五的酒馆签到+抽奖")
# cron: 15 11 * * *
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
浓五酒馆 / 五粮浓香积分商城 - 养鸡场自动登录并执行签到、抽奖一体版
version: 1.0.0
功能:
1. 从养鸡场/微信协议服务读取微信账号列表,或手动指定 wxid
2. 对“浓五的酒馆签到”小程序获取 code -> 登录 -> 内部拿 token -> 执行签到
3. 对“五粮浓香积分商城抽奖”小程序获取 code -> 登录 -> 内部拿 token -> 积分抽奖/红包领取
4. 不打印 token只打印执行结果
仅用于你自己有权限的微信账号和服务。
==================== 必填环境变量 ====================
wx_cloud / WECHAT_SERVER 养鸡场地址例如http://127.0.0.1:666
wx_token 养鸡场 Authorization支持裸 token 或 Bearer token脚本自动识别
==================== 可选环境变量 ====================
TASK_MODE 执行模式all / sign / lottery默认 all
ACCOUNT_SOURCE 账号来源cloud / manual默认 cloud
MANUAL_ACCOUNTS 手动账号,格式:备注#wxid一行一个仅 ACCOUNT_SOURCE=manual 时使用
SINGLE_TEST_WXID 只测试一个 wxid
NWDJG_WXIDS 排除 wxid支持换行分隔也兼容英文/中文逗号分隔
CLOUD_PAGE_SIZE 养鸡场分页大小,默认 1用于一个账号处理完再取下一个
AUTO_BEARER 是否自动给 wx_token 补 Bearer默认 true如接口要求裸 token设 false
DELAY 账号间延迟秒数,默认 5
TIMEOUT 请求超时秒数,默认 20
DEBUG true/false默认 false
VERIFY_SSL true/false默认 false
MAX_LOTTERY_FAILS 抽奖连续失败上限,默认 3
LOTTERY_DELAY_MIN 抽奖间隔最小秒数,默认 8
LOTTERY_DELAY_MAX 抽奖间隔最大秒数,默认 15
LOTTERY_ACTIVITY_ID 手动指定【抽奖端】活动ID不填则从浓友购活动列表自动抓取
LOTTERY_DAILY_LIMIT 每个账号每天最多抽奖次数,默认 3
LOTTERY_COST 每次抽奖消耗积分,默认 50
CACHE_TTL token 缓存秒数,默认 259200过期或验证失败自动重新登录
NWJG_CACHE_FILE token/UA 缓存文件,默认脚本同目录 nw_jg_cookie_cache.json
UA_FILE UA 文件路径,默认脚本同目录 UA_LIST.txt一行一个 UA支持 # 注释
==================== 示例 ====================
export wx_cloud="http://127.0.0.1:666"
export wx_token="Bearer xxxxx"
python3 nw_jg_yjc_auto.py
只签到:
export TASK_MODE="sign"
python3 nw_jg_yjc_auto.py
只抽奖:
export TASK_MODE="lottery"
python3 nw_jg_yjc_auto.py
"""
import json
import os
import random
import re
import sys
import time
import traceback
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
from urllib.parse import urlencode
try:
import requests
except ImportError:
print("缺少 requests请先安装pip install requests")
sys.exit(1)
try:
import urllib3
urllib3.disable_warnings()
except Exception:
pass
# ===================== 项目固定配置 =====================
SIGN_APPID = "wxed3cf95a14b58a26"
SIGN_BASE = "https://stdcrm.dtmiller.com"
SIGN_LOGIN_URL = f"{SIGN_BASE}/std-weixin-mp-service/miniApp/custom/login"
SIGN_USER_API = f"{SIGN_BASE}/scrm-promotion-service/mini/wly/user/info"
SIGN_MODULE_API = f"{SIGN_BASE}/scrm-promotion-service/mini/module/config/list"
SIGN_FIXED_API = f"{SIGN_BASE}/scrm-promotion-service/promotion/sign/today?promotionId=PI69eb321d37c48c000a05ee4e"
# 抽奖实际跳转到“浓友购/五粮浓香积分商城”小程序,不是旧 jf.wlnxjc.com/mini 那套。
LOTTERY_APPID = "wx99fa98e883130aa3"
LOTTERY_BASE = "https://www.wlnxjc.com:8088"
LOTTERY_LOGIN_URL = f"{LOTTERY_BASE}/app-api/member/auth/weixin-mini-app-login"
LOTTERY_ACTIVITY_API = f"{LOTTERY_BASE}/app-api/promotion/activity/list"
LOTTERY_ACTIVITY_DETAIL_API = f"{LOTTERY_BASE}/app-api/promotion/activity/get-detail"
LOTTERY_INTEGRAL_API = f"{LOTTERY_BASE}/app-api/member/integral/get"
# 浓友购积分大转盘提交/领取接口。
LOTTERY_DRAW_API_DEFAULT = f"{LOTTERY_BASE}/app-api/promotion/activity/draw"
LOTTERY_RECEIVE_API_DEFAULT = f"{LOTTERY_BASE}/app-api/promotion/activity/receive"
DEFAULT_UA_LIST = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF WindowsWechat(0x63090a13) UnifiedPCWindowsWechat(0xf2541939) XWEB/19841",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.6778.86 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF WindowsWechat(0x63090a13) UnifiedPCWindowsWechat(0xf2541939) XWEB/19763",
"Mozilla/5.0 (Linux; Android 15; 24129PN74C Build/AQ3A.240912.001; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/132.0.6834.163 Mobile Safari/537.36 XWEB/1320093 MMWEBSDK/20250201 MMWEBID/128 MicroMessenger/8.0.56.2800(0x2800383D) WeChat/arm64 Weixin NetType/WIFI Language/zh_CN ABI/arm64 MiniProgramEnv/android",
"Mozilla/5.0 (iPhone; CPU iPhone OS 17_6_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.55(0x1800372f) NetType/WIFI Language/zh_CN MiniProgramEnv/ios",
]
def load_ua_list() -> List[str]:
ua_file = Path(os.environ.get("UA_FILE", Path(__file__).with_name("UA_LIST.txt")))
if ua_file.exists():
uas: List[str] = []
for raw in ua_file.read_text(encoding="utf-8").splitlines():
line = raw.strip()
if not line or line.startswith("#"):
continue
uas.append(line)
if uas:
return uas
return DEFAULT_UA_LIST
UA_LIST = load_ua_list()
# ===================== 环境变量 =====================
ENV = os.environ
WX_CLOUD = (ENV.get("wx_cloud") or ENV.get("WECHAT_SERVER") or "").rstrip("/")
WX_TOKEN = ENV.get("wx_token", "").strip()
AUTO_BEARER = ENV.get("AUTO_BEARER", "true").lower() != "false"
if AUTO_BEARER and WX_TOKEN and not WX_TOKEN.lower().startswith("bearer "):
WX_TOKEN = f"Bearer {WX_TOKEN}"
TASK_MODE = ENV.get("TASK_MODE", "all").lower() # all/sign/lottery
ACCOUNT_SOURCE = ENV.get("ACCOUNT_SOURCE", "cloud").lower()
MANUAL_ACCOUNTS = ENV.get("MANUAL_ACCOUNTS", "")
SINGLE_TEST_WXID = ENV.get("SINGLE_TEST_WXID", "").strip()
NWDJG_WXIDS = [x.strip() for x in re.split(r"[\n,]+", ENV.get("NWDJG_WXIDS", "")) if x.strip()]
CLOUD_PAGE_SIZE = max(1, int(ENV.get("CLOUD_PAGE_SIZE", "1")))
DELAY = float(ENV.get("DELAY", "5"))
TIMEOUT = float(ENV.get("TIMEOUT", "20"))
DEBUG = ENV.get("DEBUG", "false").lower() == "true"
VERIFY_SSL = ENV.get("VERIFY_SSL", "false").lower() == "true"
MAX_LOTTERY_FAILS = int(ENV.get("MAX_LOTTERY_FAILS", "3"))
LOTTERY_DAILY_LIMIT = int(ENV.get("LOTTERY_DAILY_LIMIT", "3"))
LOTTERY_COST = int(ENV.get("LOTTERY_COST", "50"))
LOTTERY_DELAY_MIN = int(ENV.get("LOTTERY_DELAY_MIN", "8"))
LOTTERY_DELAY_MAX = int(ENV.get("LOTTERY_DELAY_MAX", "15"))
LOTTERY_ACTIVITY_ID = ENV.get("LOTTERY_ACTIVITY_ID", "").strip()
LOTTERY_DRAW_API = ENV.get("LOTTERY_DRAW_API", LOTTERY_DRAW_API_DEFAULT).strip()
LOTTERY_RECEIVE_API = ENV.get("LOTTERY_RECEIVE_API", LOTTERY_RECEIVE_API_DEFAULT).strip()
CACHE_FILE = Path(ENV.get("NWJG_CACHE_FILE", Path(__file__).with_name("nw_jg_cookie_cache.json")))
CACHE_TTL = int(ENV.get("CACHE_TTL", "259200")) # 默认 3 天,服务端失效会自动重登
SESSION_LOG_LINES: List[str] = []
RUN_START_TS = time.time()
@dataclass
class Account:
remark: str
wxid: str
sign_token: str = ""
lottery_token: str = ""
ua: str = ""
# ===================== 基础工具 =====================
def emit(prefix: str, *args):
line = f"{prefix} {' '.join(str(a) for a in args)}"
print(line)
SESSION_LOG_LINES.append(line)
def log(*args):
emit("[INFO]", *args)
def ok(*args):
emit("[ OK ]", *args)
def warn(*args):
emit("[WARN]", *args)
def err(*args):
emit("[ERR ]", *args)
def debug(title: str, data: Any):
if not DEBUG:
return
print(f"\n[DEBUG] {title}")
try:
print(json.dumps(data, ensure_ascii=False, indent=2))
except Exception:
print(data)
def mask(value: Any, keep_start: int = 8, keep_end: int = 4) -> str:
s = str(value or "")
if not s:
return ""
if len(s) <= keep_start + keep_end:
return "***"
return s[:keep_start] + "***" + s[-keep_end:]
def safe_json_response(resp: requests.Response) -> Optional[Any]:
try:
return resp.json()
except Exception:
return None
def load_cache() -> Dict[str, Any]:
if not CACHE_FILE.exists():
return {"accounts": {}}
try:
data = json.loads(CACHE_FILE.read_text(encoding="utf-8"))
return data if isinstance(data, dict) else {"accounts": {}}
except Exception:
return {"accounts": {}}
def save_cache(cache: Dict[str, Any]) -> None:
CACHE_FILE.parent.mkdir(parents=True, exist_ok=True)
CACHE_FILE.write_text(json.dumps(cache, ensure_ascii=False, indent=2), encoding="utf-8")
def choose_stable_ua(wxid: str) -> str:
# 只在首次没有缓存时选一次;之后跟随 cookie/token 缓存,避免每次换设备指纹。
idx = abs(hash(wxid)) % len(UA_LIST)
return UA_LIST[idx]
def cache_get_account(wxid: str) -> Dict[str, Any]:
cache = load_cache()
accounts = cache.setdefault("accounts", {})
return accounts.setdefault(wxid, {})
def cache_update_account(wxid: str, updates: Dict[str, Any]) -> None:
cache = load_cache()
accounts = cache.setdefault("accounts", {})
item = accounts.setdefault(wxid, {})
item.update(updates)
item["updated_at"] = int(time.time())
save_cache(cache)
def get_cached_ua(wxid: str) -> str:
item = cache_get_account(wxid)
ua = item.get("ua")
if not ua:
ua = choose_stable_ua(wxid)
cache_update_account(wxid, {"ua": ua})
return ua
def is_cache_fresh(item: Dict[str, Any], key: str) -> bool:
value = item.get(key)
ts = int(item.get(f"{key}_ts") or 0)
return bool(value) and (time.time() - ts < CACHE_TTL)
def ensure_config():
if not WX_CLOUD:
raise RuntimeError("缺少 wx_cloud 或 WECHAT_SERVER")
if not WX_TOKEN:
raise RuntimeError("缺少 wx_token")
if TASK_MODE not in {"all", "sign", "lottery"}:
raise RuntimeError("TASK_MODE 只支持 all / sign / lottery")
# ===================== 养鸡场账号与 code =====================
def account_from_cloud_row(item: Dict[str, Any]) -> Optional[Account]:
wxid = item.get("wxId") or item.get("wxid") or item.get("Wxid") or ""
if not wxid:
return None
remark = item.get("wxName") or item.get("wxname") or item.get("remark") or item.get("nickName") or mask(wxid)
return Account(remark=remark, wxid=wxid)
def fetch_cloud_account_page(page_num: int, page_size: int) -> tuple[List[Account], int]:
"""按页读取养鸡场账号。默认 page_size=1实现处理完一个账号再取下一个账号。"""
url = f"{WX_CLOUD}/prod-api/wechat/wechat/list"
headers = {"Authorization": WX_TOKEN, "Content-Type": "application/json"}
params = {"pageNum": page_num, "pageSize": page_size}
resp = requests.get(url, headers=headers, params=params, timeout=TIMEOUT, verify=VERIFY_SSL)
data = safe_json_response(resp)
if DEBUG:
preview = data
if isinstance(data, dict) and isinstance(data.get("rows"), list):
preview = {**data, "rows": [
{"wxId": r.get("wxId") or r.get("wxid") or r.get("Wxid"), "wxName": r.get("wxName") or r.get("wxname")}
for r in data.get("rows", [])
]}
debug(f"养鸡场账号列表响应 page={page_num} size={page_size}", preview if preview is not None else resp.text[:1000])
if data and data.get("code") == 200 and isinstance(data.get("rows"), list):
accounts = [acc for item in data["rows"] if (acc := account_from_cloud_row(item))]
total = int(data.get("total") or len(accounts))
return accounts, total
raise RuntimeError((data or {}).get("msg") or (data or {}).get("message") or f"获取养鸡场账号列表失败: HTTP {resp.status_code}")
def iter_accounts_from_cloud():
page = 1
seen = 0
while True:
accounts, total = fetch_cloud_account_page(page, CLOUD_PAGE_SIZE)
if not accounts:
break
for acc in accounts:
seen += 1
yield acc, seen, total
if seen >= total:
break
page += 1
def get_accounts_from_cloud() -> List[Account]:
# 兼容 SINGLE_TEST_WXID需要扫描分页直到找到匹配账号。
return [acc for acc, _, _ in iter_accounts_from_cloud()]
def get_accounts_from_manual() -> List[Account]:
accounts: List[Account] = []
for line in MANUAL_ACCOUNTS.splitlines():
line = line.strip()
if not line:
continue
parts = [p.strip() for p in line.split("#") if p.strip()]
if len(parts) >= 2:
remark, wxid = parts[0], parts[1]
else:
remark, wxid = parts[0], parts[0]
accounts.append(Account(remark=remark or mask(wxid), wxid=wxid))
return accounts
def get_mini_program_code(wxid: str, appid: str) -> str:
url = f"{WX_CLOUD}/prod-api/wechat/api/getMiniProgramCode"
headers = {"Authorization": WX_TOKEN, "Content-Type": "application/json"}
payload = {"wxid": wxid, "appid": appid}
resp = requests.post(url, headers=headers, json=payload, timeout=TIMEOUT, verify=VERIFY_SSL)
data = safe_json_response(resp)
debug(f"取 code 响应 appid={appid} wxid={wxid}", data if data is not None else resp.text[:1000])
if data and data.get("code") == 200:
d = data.get("data")
if isinstance(d, dict) and d.get("code"):
return str(d["code"])
if isinstance(d, str) and d:
return d
raise RuntimeError((data or {}).get("msg") or (data or {}).get("message") or f"获取小程序 code 失败: HTTP {resp.status_code}")
# ===================== 浓五酒馆签到端 =====================
def sign_headers(token: str = "", ua: str = "") -> Dict[str, str]:
h = {
"Host": "stdcrm.dtmiller.com",
"Connection": "keep-alive",
"charset": "utf-8",
"User-Agent": ua or random.choice(UA_LIST),
"content-type": "application/json",
"Referer": f"https://servicewechat.com/{SIGN_APPID}/226/page-frame.html",
"Accept-Encoding": "gzip,compress,br,deflate",
}
if token:
h["authorization"] = token if token.lower().startswith("bearer ") else f"Bearer {token}"
h["Authorization"] = h["authorization"]
return h
def sign_login(code: str, ua: str) -> str:
payload = {"code": code, "appId": SIGN_APPID}
resp = requests.post(SIGN_LOGIN_URL, headers=sign_headers(ua=ua), json=payload, timeout=TIMEOUT, verify=VERIFY_SSL)
data = safe_json_response(resp)
debug("浓五酒馆签到登录响应", data if data is not None else resp.text[:1000])
if not data:
raise RuntimeError(f"签到端登录响应不是 JSON: HTTP {resp.status_code}, {resp.text[:200]}")
if data.get("code") != 0:
raise RuntimeError(data.get("msg") or data.get("message") or "签到端登录失败")
token = data.get("data")
if not token:
raise RuntimeError("签到端登录成功但未返回 token")
return token if str(token).lower().startswith("bearer ") else f"Bearer {token}"
def sign_get_json(url: str, token: str, ua: str) -> Dict[str, Any]:
resp = requests.get(url, headers=sign_headers(token, ua), timeout=TIMEOUT, verify=VERIFY_SSL)
data = safe_json_response(resp)
if data is None:
return {"code": -1, "msg": f"JSON解析失败: HTTP {resp.status_code}; body={resp.text[:200]}"}
return data
def sign_post_json(url: str, token: str, ua: str, body: Optional[dict] = None) -> Dict[str, Any]:
resp = requests.post(url, headers=sign_headers(token, ua), json=body or {}, timeout=TIMEOUT, verify=VERIFY_SSL)
data = safe_json_response(resp)
if data is None:
return {"code": -1, "msg": f"JSON解析失败: HTTP {resp.status_code}; body={resp.text[:200]}"}
return data
def get_dynamic_sign_act_id(token: str, ua: str) -> Optional[str]:
data = sign_post_json(SIGN_MODULE_API, token, ua, {})
debug("每日签到模块响应", data)
if not data or data.get("code") != 0:
return None
for module in data.get("data", []) or []:
for detail in module.get("detailList", []) or []:
try:
detail_json = json.loads(detail.get("detailJson") or "{}")
if detail_json.get("title") == "每日签到":
page_path = detail_json.get("jumpData", {}).get("pagePath", "")
m = re.search(r"actId=([^&]+)", page_path)
if m:
return m.group(1)
except Exception:
continue
return None
def run_sign_task(acc: Account) -> Dict[str, Any]:
cache_item = cache_get_account(acc.wxid)
token = cache_item.get("sign_token") if is_cache_fresh(cache_item, "sign_token") else ""
if token:
user_probe = sign_get_json(SIGN_USER_API, token, acc.ua)
if user_probe.get("code") == 0:
ok(f"{acc.remark} 签到端使用缓存 token")
else:
token = ""
warn(f"{acc.remark} 签到端缓存 token 已失效,重新登录")
if not token:
code = get_mini_program_code(acc.wxid, SIGN_APPID)
ok(f"{acc.remark} 签到端获取 code 成功: {mask(code)}")
token = sign_login(code, acc.ua)
cache_update_account(acc.wxid, {"sign_token": token, "sign_token_ts": int(time.time()), "ua": acc.ua, "remark": acc.remark})
acc.sign_token = token
ok(f"{acc.remark} 签到端登录成功,开始签到")
user = sign_get_json(SIGN_USER_API, token, acc.ua)
debug("签到端用户信息", user)
nickname = ""
try:
nickname = (user.get("data") or {}).get("member", {}).get("nick_name") or (user.get("data") or {}).get("visitor", {}).get("open_id") or ""
except Exception:
nickname = ""
act_id = get_dynamic_sign_act_id(token, acc.ua)
if act_id:
sign_url = f"{SIGN_BASE}/scrm-promotion-service/promotion/sign/today?promotionId={act_id}"
else:
sign_url = SIGN_FIXED_API
sign_res = sign_get_json(sign_url, token, acc.ua)
debug("签到响应", sign_res)
msg = sign_res.get("msg") or sign_res.get("message") or sign_res.get("info") or ""
if sign_res.get("code") == 0:
sign_msg = msg or "签到成功"
ok(f"{acc.remark} 签到成功: {sign_msg}")
return {"success": True, "nickname": nickname, "sign_msg": sign_msg}
else:
sign_msg = msg or "签到失败"
if "今日已签到" in sign_msg or "已签到" in sign_msg:
ok(f"{acc.remark} 签到已完成: {sign_msg}")
return {"success": True, "nickname": nickname, "sign_msg": sign_msg}
warn(f"{acc.remark} 签到结果: {sign_msg}")
return {"success": False, "nickname": nickname, "sign_msg": sign_msg}
# ===================== 五粮浓香抽奖端 =====================
def lottery_headers(token: str = "", ua: str = "") -> Dict[str, str]:
h = {
"Connection": "keep-alive",
"User-Agent": ua or random.choice(UA_LIST),
"Accept": "*/*",
"xweb_xhr": "1",
"terminal": "10",
"content-type": "application/json;charset=UTF-8",
"platform": "WechatMiniProgram",
"tenant-id": "1",
"Referer": f"https://servicewechat.com/{LOTTERY_APPID}/46/page-frame.html",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "zh-CN,zh;q=0.9",
}
if token:
# 浓友购接口抓包里是 authorization: token不能加 Bearer。
h["authorization"] = token
return h
def extract_lottery_token(data: Dict[str, Any]) -> str:
candidates = [
data.get("accessToken"),
data.get("token"),
data.get("authorization"),
(data.get("data") or {}).get("accessToken") if isinstance(data.get("data"), dict) else None,
(data.get("data") or {}).get("token") if isinstance(data.get("data"), dict) else None,
(data.get("data") or {}).get("authorization") if isinstance(data.get("data"), dict) else None,
]
for token in candidates:
if token:
return str(token)
return ""
def lottery_login(code: str, ua: str) -> str:
payload = {"loginCode": code, "state": "default", "sourceId": "1"}
resp = requests.post(LOTTERY_LOGIN_URL, headers=lottery_headers(ua=ua), json=payload, timeout=TIMEOUT, verify=VERIFY_SSL)
data = safe_json_response(resp)
debug("浓友购抽奖端登录响应", data if data is not None else resp.text[:1000])
if not data:
raise RuntimeError(f"抽奖端登录响应不是 JSON: HTTP {resp.status_code}, {resp.text[:200]}")
if data.get("code") not in (0, 200):
raise RuntimeError(data.get("msg") or data.get("message") or data.get("info") or "抽奖端登录失败")
token = extract_lottery_token(data)
if not token:
raise RuntimeError("抽奖端登录成功但未返回 accessToken/token请开 DEBUG=true 查看响应字段")
return token
def lottery_get_json(url: str, token: str, ua: str) -> Dict[str, Any]:
resp = requests.get(url, headers=lottery_headers(token, ua), timeout=TIMEOUT, verify=VERIFY_SSL)
data = safe_json_response(resp)
if data is None:
return {"code": -1, "msg": f"JSON解析失败: HTTP {resp.status_code}; body={resp.text[:200]}"}
return data
def lottery_post_json(url: str, token: str, ua: str, body: Dict[str, Any]) -> Dict[str, Any]:
resp = requests.post(url, headers=lottery_headers(token, ua), json=body, timeout=TIMEOUT, verify=VERIFY_SSL)
data = safe_json_response(resp)
if data is None:
return {"code": -1, "msg": f"JSON解析失败: HTTP {resp.status_code}; body={resp.text[:200]}"}
return data
def normalize_activity_list(payload: Any) -> List[Dict[str, Any]]:
"""兼容活动列表返回结构data 可能是 list也可能包在 records/list/items/rows 里。"""
if isinstance(payload, list):
return [x for x in payload if isinstance(x, dict)]
if isinstance(payload, dict):
for key in ("records", "list", "items", "rows", "activityList", "data"):
value = payload.get(key)
if isinstance(value, list):
return [x for x in value if isinstance(x, dict)]
if isinstance(value, dict):
nested = normalize_activity_list(value)
if nested:
return nested
return []
def get_activity_id(token: str, ua: str) -> Optional[Any]:
if LOTTERY_ACTIVITY_ID:
if str(LOTTERY_ACTIVITY_ID).startswith("PI"):
warn("当前 LOTTERY_ACTIVITY_ID 看起来是酒馆签到端 promotionId不是浓友购 activityId将忽略它并尝试自动抓取活动")
else:
log(f"使用手动抽奖/活动ID: {LOTTERY_ACTIVITY_ID}")
return LOTTERY_ACTIVITY_ID
data = lottery_get_json(LOTTERY_ACTIVITY_API, token, ua)
debug("浓友购活动列表响应", data)
if data.get("code") not in (0, 200):
warn(f"活动列表请求失败: {data.get('msg') or data.get('message') or data.get('info') if isinstance(data, dict) else data}")
return None
activities = normalize_activity_list(data.get("data"))
if not activities:
warn("浓友购活动列表为空或结构未识别;请用 DEBUG=true 查看响应,或设置 LOTTERY_ACTIVITY_ID 手动指定")
if not DEBUG:
print("活动列表原始响应预览:")
try:
print(json.dumps(data, ensure_ascii=False)[:800])
except Exception:
print(str(data)[:800])
return None
def get_title(act: Dict[str, Any]) -> str:
return str(act.get("title") or act.get("name") or act.get("activityName") or act.get("activityTitle") or "")
def get_id(act: Dict[str, Any]) -> Any:
return act.get("id") or act.get("activityId") or act.get("activity_id") or act.get("actId")
print(f"📋 抽奖活动列表 ({len(activities)} 个):")
for i, act in enumerate(activities[:10], 1):
print(f" {i}. {get_title(act) or '无标题'} ID={get_id(act)} 状态={act.get('status')}")
lottery_activities = [
act for act in activities
if ("抽奖" in get_title(act) or "转盘" in get_title(act) or "红包" in get_title(act) or "积分" in get_title(act))
and (act.get("status") in (None, 0, 1, "0", "1", True))
and get_id(act)
]
chosen = lottery_activities[0] if lottery_activities else next((act for act in activities if get_id(act)), None)
if not chosen:
warn("活动列表里没有可用 ID请设置 LOTTERY_ACTIVITY_ID 手动指定")
return None
activity_id = get_id(chosen)
log(f"抽奖活动: {get_title(chosen) or '无标题'} ID={activity_id}")
return activity_id
def parse_integral_value(data: Dict[str, Any]) -> Optional[int]:
payload = data.get("data") or {}
integral = payload.get("integral") or payload.get("point") or payload.get("points") or payload.get("balance")
try:
return int(float(integral))
except Exception:
return None
def get_integral_detail(token: str, ua: str) -> Dict[str, Any]:
data = lottery_get_json(LOTTERY_INTEGRAL_API, token, ua)
debug("积分响应", data)
integral = parse_integral_value(data) if isinstance(data, dict) else None
if not isinstance(data, dict):
return {"ok": False, "integral": None, "message": "积分响应不是 JSON", "auth_invalid": False}
if data.get("code") not in (0, 200):
msg = str(data.get("msg") or data.get("message") or data.get("info") or "积分请求失败")
auth_invalid = any(k in msg for k in ["未登录", "登录已过期", "token", "Token", "认证", "授权"])
warn(f"积分请求失败: {msg}")
return {"ok": False, "integral": integral, "message": msg, "auth_invalid": auth_invalid}
if integral is None:
msg = str(data.get("msg") or data.get("message") or data.get("info") or "未找到积分字段")
return {"ok": False, "integral": None, "message": msg, "auth_invalid": False}
return {"ok": True, "integral": integral, "message": "", "auth_invalid": False}
def get_integral(token: str, ua: str) -> Optional[int]:
detail = get_integral_detail(token, ua)
return detail.get("integral") if detail.get("ok") else None
def receive_cash(record_id: Any, token: str, ua: str) -> str:
if not LOTTERY_RECEIVE_API:
return "未配置红包领取接口,跳过自动领取"
data = lottery_post_json(LOTTERY_RECEIVE_API, token, ua, {"id": record_id})
debug("红包领取响应", data)
if data.get("code") in (0, 200):
return "红包领取成功"
return f"红包领取失败:{data.get('msg') or data.get('message') or data.get('info') or '未知错误'}"
def parse_prize_value(prize_name: str) -> Dict[str, float]:
text = str(prize_name or "")
money = 0.0
points = 0.0
money_matches = re.findall(r"(\d+(?:\.\d+)?)\s*元", text)
if money_matches:
try:
money = sum(float(x) for x in money_matches)
except Exception:
money = 0.0
point_matches = re.findall(r"(\d+(?:\.\d+)?)\s*积分", text)
if point_matches:
try:
points = sum(float(x) for x in point_matches)
except Exception:
points = 0.0
return {"money": money, "points": points}
def find_first_key(obj: Any, keys: set) -> Any:
if isinstance(obj, dict):
for k, v in obj.items():
if k in keys and v not in (None, ""):
return v
for v in obj.values():
found = find_first_key(v, keys)
if found not in (None, ""):
return found
elif isinstance(obj, list):
for item in obj:
found = find_first_key(item, keys)
if found not in (None, ""):
return found
return None
def draw_once(activity_id: Any, token: str, ua: str) -> Dict[str, Any]:
draw_url = f"{LOTTERY_DRAW_API}?{urlencode({'activityId': activity_id})}"
data = lottery_post_json(draw_url, token, ua, {"activityId": str(activity_id)})
debug("抽奖响应", data)
if data.get("code") in (0, 200):
body = data.get("data") or {}
prize_name = (
find_first_key(body, {"prizeName", "prize_name", "name", "title", "rewardName", "couponName"})
or "抽奖成功"
)
draw_id = find_first_key(body, {"id", "recordId", "record_id", "drawRecordId", "receiveId", "prizeRecordId"})
receive_msg = ""
if draw_id:
receive_msg = receive_cash(draw_id, token, ua)
return {"status": "success", "prize": str(prize_name), "receive_msg": receive_msg}
info = data.get("msg") or data.get("message") or data.get("info") or "未知错误"
if "积分" in str(info) and ("不足" in str(info) or "余额" in str(info)):
return {"status": "insufficient", "message": info}
return {"status": "failed", "message": info}
def run_lottery_task(acc: Account) -> Dict[str, Any]:
cache_item = cache_get_account(acc.wxid)
token = cache_item.get("lottery_token") if is_cache_fresh(cache_item, "lottery_token") else ""
if token:
probe_detail = get_integral_detail(token, acc.ua)
if probe_detail.get("ok"):
ok(f"{acc.remark} 抽奖端使用缓存 token")
elif probe_detail.get("auth_invalid"):
token = ""
warn(f"{acc.remark} 抽奖端缓存 token 已失效,重新登录")
else:
ok(f"{acc.remark} 抽奖端缓存 token 可用,但当前积分不可用:{probe_detail.get('message')}")
if not token:
code = get_mini_program_code(acc.wxid, LOTTERY_APPID)
ok(f"{acc.remark} 抽奖端获取 code 成功: {mask(code)}")
token = lottery_login(code, acc.ua)
cache_update_account(acc.wxid, {"lottery_token": token, "lottery_token_ts": int(time.time()), "ua": acc.ua, "remark": acc.remark})
acc.lottery_token = token
ok(f"{acc.remark} 抽奖端登录成功,开始抽奖")
activity_id = get_activity_id(token, acc.ua)
if not activity_id:
return {"success": False, "lottery_msgs": [], "draw_records": [], "integral": None, "message": "未找到有效抽奖活动ID"}
lottery_msgs: List[str] = []
draw_records: List[Dict[str, Any]] = []
consecutive_failures = 0
draw_count = 0
total_cost = 0
total_prize_points = 0.0
total_prize_money = 0.0
start_integral: Optional[int] = None
last_integral: Optional[int] = None
first_detail = get_integral_detail(token, acc.ua)
if not first_detail.get("ok"):
lottery_msgs.append(f"积分不可用或账号不是品牌会员,停止抽奖:{first_detail.get('message') or ''}".strip(""))
return {
"success": True,
"lottery_msgs": lottery_msgs,
"draw_records": draw_records,
"integral": first_detail.get("integral"),
"start_integral": first_detail.get("integral"),
"total_cost": 0,
"total_prize_points": 0,
"total_prize_money": 0.0,
"draw_count": 0,
"activity_id": activity_id,
}
start_integral = int(first_detail["integral"])
last_integral = start_integral
log(f"{acc.remark} 签到后/抽奖前积分:{start_integral}")
if start_integral < LOTTERY_COST:
lottery_msgs.append(f"当前积分 {start_integral},不足 {LOTTERY_COST},停止抽奖")
return {
"success": True,
"lottery_msgs": lottery_msgs,
"draw_records": draw_records,
"integral": start_integral,
"start_integral": start_integral,
"total_cost": 0,
"total_prize_points": 0,
"total_prize_money": 0.0,
"draw_count": 0,
"activity_id": activity_id,
}
while consecutive_failures < MAX_LOTTERY_FAILS and draw_count < LOTTERY_DAILY_LIMIT:
before_integral = last_integral
if before_integral is None:
detail = get_integral_detail(token, acc.ua)
if not detail.get("ok"):
lottery_msgs.append(f"积分不可用或账号不是品牌会员,停止抽奖:{detail.get('message') or ''}".strip(""))
break
before_integral = int(detail["integral"])
last_integral = before_integral
if before_integral < LOTTERY_COST:
lottery_msgs.append(f"当前积分 {before_integral},不足 {LOTTERY_COST},停止抽奖")
break
draw = draw_once(activity_id, token, acc.ua)
if draw["status"] == "success":
consecutive_failures = 0
draw_count += 1
prize_name = str(draw.get("prize") or "未知奖励")
prize_value = parse_prize_value(prize_name)
after_detail = get_integral_detail(token, acc.ua)
after_integral = after_detail.get("integral") if after_detail.get("ok") else None
cost = LOTTERY_COST if after_integral is None else max(0, before_integral + int(prize_value.get("points") or 0) - int(after_integral))
total_cost += int(cost)
total_prize_points += float(prize_value.get("points") or 0)
total_prize_money += float(prize_value.get("money") or 0)
last_integral = int(after_integral) if after_integral is not None else None
record = {
"index": draw_count,
"before_integral": before_integral,
"cost": int(cost),
"prize": prize_name,
"prize_points": float(prize_value.get("points") or 0),
"prize_money": float(prize_value.get("money") or 0),
"after_integral": after_integral,
"receive_msg": draw.get("receive_msg") or "",
}
draw_records.append(record)
msg = (
f"{draw_count}/{LOTTERY_DAILY_LIMIT}次抽奖:抽前积分 {before_integral}"
f"消耗 {int(cost)},抽中:{prize_name},中奖后积分 {after_integral if after_integral is not None else '未知'}"
)
if draw.get("receive_msg"):
msg += f"{draw.get('receive_msg')}"
lottery_msgs.append(msg)
ok(f"{acc.remark} {msg}")
elif draw["status"] == "insufficient":
lottery_msgs.append(f"积分不足:{draw.get('message')}")
break
else:
message = str(draw.get('message') or '')
if "次数已达上限" in message or "今日参与次数" in message or "已达上限" in message:
msg = f"抽奖停止:{message}"
lottery_msgs.append(msg)
warn(f"{acc.remark} {msg}")
break
consecutive_failures += 1
msg = f"抽奖失败({consecutive_failures}/{MAX_LOTTERY_FAILS}){message or '未知错误'}"
lottery_msgs.append(msg)
warn(f"{acc.remark} {msg}")
if draw_count >= LOTTERY_DAILY_LIMIT:
limit_msg = f"已达到脚本设置的每日最多参与 {LOTTERY_DAILY_LIMIT} 次,停止抽奖"
lottery_msgs.append(limit_msg)
log(f"{acc.remark} {limit_msg}")
break
delay = random.randint(LOTTERY_DELAY_MIN, LOTTERY_DELAY_MAX)
time.sleep(delay)
return {
"success": True,
"lottery_msgs": lottery_msgs,
"draw_records": draw_records,
"integral": last_integral,
"start_integral": start_integral,
"total_cost": total_cost,
"total_prize_points": total_prize_points,
"total_prize_money": total_prize_money,
"draw_count": draw_count,
"activity_id": activity_id,
}
# ===================== 主流程 =====================
def process_account(acc: Account, index: int, total: int) -> Dict[str, Any]:
acc.ua = get_cached_ua(acc.wxid)
print(f"\n--- 账号 {index}/{total}: {acc.remark} ({mask(acc.wxid)}) ---")
log(f"{acc.remark} 使用固定 UA: {mask(acc.ua, 24, 18)}")
result: Dict[str, Any] = {"remark": acc.remark, "wxid": acc.wxid, "success": True, "ua": acc.ua}
if TASK_MODE in {"all", "sign"}:
try:
result["sign"] = run_sign_task(acc)
except Exception as e:
warn(f"{acc.remark} 签到任务失败: {e}")
if DEBUG:
traceback.print_exc()
result["success"] = False
result["sign"] = {"success": False, "sign_msg": str(e)}
if TASK_MODE in {"all", "lottery"}:
try:
result["lottery"] = run_lottery_task(acc)
except Exception as e:
warn(f"{acc.remark} 抽奖任务失败: {e}")
if DEBUG:
traceback.print_exc()
result["success"] = False
result["lottery"] = {"success": False, "lottery_msgs": [], "message": str(e)}
return result
def should_skip_account(acc: Account) -> bool:
if NWDJG_WXIDS and acc.wxid in NWDJG_WXIDS:
return True
if SINGLE_TEST_WXID and acc.wxid != SINGLE_TEST_WXID:
return True
return False
def load_accounts() -> List[Account]:
accounts = get_accounts_from_manual() if ACCOUNT_SOURCE == "manual" else get_accounts_from_cloud()
if NWDJG_WXIDS:
accounts = [a for a in accounts if a.wxid not in NWDJG_WXIDS]
log(f"已排除 {len(NWDJG_WXIDS)} 个 wxid剩余 {len(accounts)}")
if SINGLE_TEST_WXID:
accounts = [a for a in accounts if a.wxid == SINGLE_TEST_WXID]
log(f"单号测试 wxid={SINGLE_TEST_WXID},匹配 {len(accounts)}")
return accounts
def iter_selected_accounts():
"""逐个产生账号。cloud 模式默认 pageSize=1处理完一个才取下一页。"""
if ACCOUNT_SOURCE == "manual":
accounts = get_accounts_from_manual()
selected = [a for a in accounts if not should_skip_account(a)]
total = len(selected)
for idx, acc in enumerate(selected, 1):
yield acc, idx, total
return
matched = 0
for acc, seen, total in iter_accounts_from_cloud():
if should_skip_account(acc):
continue
matched += 1
yield acc, seen if not SINGLE_TEST_WXID else matched, total if not SINGLE_TEST_WXID else matched
if SINGLE_TEST_WXID and acc.wxid == SINGLE_TEST_WXID:
break
def build_report(results: List[Dict[str, Any]]) -> str:
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
elapsed = time.time() - RUN_START_TS
total = len(results)
success_count = sum(1 for r in results if r.get("success"))
sign_ok = sum(1 for r in results if (r.get("sign") or {}).get("success"))
draw_total = sum(int((r.get("lottery") or {}).get("draw_count") or 0) for r in results)
total_cost = sum(int((r.get("lottery") or {}).get("total_cost") or 0) for r in results)
total_prize_points = sum(float((r.get("lottery") or {}).get("total_prize_points") or 0) for r in results)
total_prize_money = sum(float((r.get("lottery") or {}).get("total_prize_money") or 0) for r in results)
lines = [
"",
"==================================================",
"📊 执行汇总",
"==================================================",
f"⏱️ 执行时间: {now}",
f"👥 总账号数: {total}",
f"✅ 成功账号: {success_count}",
f"📝 签到成功: {sign_ok}",
f"🎰 总成功抽奖: {draw_total}",
f"📱 总消耗积分: {total_cost}",
f"📈 总中奖积分: {int(total_prize_points)}",
f"💰 总中奖金额: {total_prize_money:.2f}",
f"🎯 单号抽奖上限: {LOTTERY_DAILY_LIMIT}",
f"🧩 单次抽奖消耗: {LOTTERY_COST}积分",
f"📦 缓存文件: {CACHE_FILE}",
f"🧭 UA数量: {len(UA_LIST)}",
"",
"📋 账号详情:",
]
for idx, r in enumerate(results, 1):
remark = r.get("remark") or mask(r.get("wxid"))
l = r.get("lottery") or {}
draw_records = l.get("draw_records") or []
if not isinstance(draw_records, list):
draw_records = []
draw_count = int(l.get("draw_count") or 0)
money = float(l.get("total_prize_money") or 0)
points = float(l.get("total_prize_points") or 0)
prizes = [str(x.get("prize") or "未知奖励") for x in draw_records] or [""]
icon = "" if draw_count > 0 else ""
lines.append(
f"{icon} 账号{idx}: {remark} | 抽奖: {draw_count}次 | "
f"消耗: {int(l.get('total_cost') or 0)}积分 | 中奖积分: {int(points)} | "
f"金额: {money:.2f}元 | 奖品: {', '.join(prizes)}"
)
if l:
lines.append(f" ├─ 签到后/抽奖前积分: {l.get('start_integral') if l.get('start_integral') is not None else '-'}")
for rec in draw_records:
lines.append(
f" ├─ 第{rec.get('index')}次: 抽前 {rec.get('before_integral')}"
f"消耗 {rec.get('cost')},奖品 {rec.get('prize')}"
f"中奖积分 +{int(float(rec.get('prize_points') or 0))}"
f"中奖金额 +{float(rec.get('prize_money') or 0):.2f}元,"
f"中奖后积分 {rec.get('after_integral') if rec.get('after_integral') is not None else '-'}"
)
if l.get("integral") is not None:
lines.append(f" └─ 最后积分: {l.get('integral')}")
else:
stop_msg = (l.get("lottery_msgs") or [l.get("message") or "无抽奖结果"])[-1]
lines.append(f" └─ 结果: {stop_msg}")
if "sign" in r:
s = r.get("sign") or {}
lines.append(f" 📝 签到: {s.get('sign_msg') or s.get('message') or '无结果'}")
lines.append(f"=== 执行结束 [{now}] 耗时 {elapsed:.2f} 秒 退出码 0 ===")
return "\n".join(lines)
def write_run_log(report: str) -> Path:
log_dir = Path(__file__).with_name("logs")
log_dir.mkdir(parents=True, exist_ok=True)
path = log_dir / f"nw_jg_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
content = "\n".join(SESSION_LOG_LINES) + "\n" + report + "\n"
path.write_text(content, encoding="utf-8")
return path
def main() -> None:
print("========== 浓五酒馆 - 养鸡场自动登录任务一体版 ==========")
log(f"TASK_MODE={TASK_MODE}")
log(f"WX_CLOUD={WX_CLOUD or '(未设置)'}")
log(f"ACCOUNT_SOURCE={ACCOUNT_SOURCE}")
log(f"SIGN_APPID={SIGN_APPID}")
log(f"LOTTERY_APPID={LOTTERY_APPID}")
ensure_config()
print("\n=== 任务开始 ===")
results: List[Dict[str, Any]] = []
processed = 0
for acc, i, total in iter_selected_accounts():
if processed == 0:
print(f"账号总数: {total if not SINGLE_TEST_WXID else 1}")
processed += 1
results.append(process_account(acc, i if not SINGLE_TEST_WXID else processed, total if not SINGLE_TEST_WXID else 1))
if DELAY > 0:
time.sleep(DELAY)
if not results:
raise RuntimeError("没有可处理账号")
report = build_report(results)
print(report)
log_path = write_run_log(report)
print(f"\n🧾 日志文件:{log_path}")
print("\n=== 任务完成 ===")
if __name__ == "__main__":
try:
main()
except Exception as e:
err("主流程错误:", e)
if DEBUG:
traceback.print_exc()
sys.exit(1)