From 845c08c4121d63985dc016f6e870d8f1fc112424 Mon Sep 17 00:00:00 2001 From: admin <362324317@qq.com> Date: Wed, 20 May 2026 20:39:59 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=20WX=5FApplet/Applet=5FWLJi.?= =?UTF-8?q?py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- WX_Applet/Applet_WLJi.py | 1278 ++++++++++++++++++++++++-------------- 1 file changed, 808 insertions(+), 470 deletions(-) diff --git a/WX_Applet/Applet_WLJi.py b/WX_Applet/Applet_WLJi.py index 185b0f3..85e7dd8 100644 --- a/WX_Applet/Applet_WLJi.py +++ b/WX_Applet/Applet_WLJi.py @@ -5,50 +5,51 @@ """ 青龙面板微信协议自动化脚本 - 王老吉扫码抽奖 账号从环境变量 WLJ_ID 获取,格式:备注#wxid#手机号,多行分割 -码字从同目录 WLJ_MZ.txt 文件中获取,每行一个码,用完自动删除 -抽奖参数从环境变量 WLJ_VT 获取,格式:上限#最大用码#最小间隔#最大间隔#成本 +码字从同目录的 WLJ_MZ.txt 中获取 """ import os import sys -import io -import time import json -import random -import hashlib +import time +import io import requests import logging -from typing import Optional, List, Dict, Any +import random +import re +import hashlib +from datetime import datetime +from typing import List, Dict, Any, Optional from urllib.parse import urlencode # ==================== 养鸡场配置 ==================== +# 养鸡场服务地址 WX_CLOUD = os.getenv('wx_cloud', '') + +# 养鸡场认证Token (需要配置) AUTH_TOKEN = os.getenv('wx_token', '') +# ==================== 码字文件配置 ==================== +# 码字文件路径(与脚本同目录) +CODE_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "WLJ_MZ.txt") + # ==================== 代理配置 ==================== +# 代理API地址(返回格式:ip:端口),留空则不使用代理 PROXY_DEFAULT = "" +# 青龙变量 pgdl(如果有值且脚本默认也为空时使用) PROXY_API_URL = "" + +# 是否使用多账号代理(True=每个账号使用不同代理,False=不使用代理) USE_PROXY = True -# ==================== 抽奖参数(从环境变量 WLJ_VT 整合) ==================== -WLJ_VT_STR = os.getenv('WLJ_VT', '2#5#5#10#0.34') -try: - parts = WLJ_VT_STR.split('#') - DAILY_LOTTERY_LIMIT = int(parts[0]) - MAX_CODE_ATTEMPTS = int(parts[1]) - LOTTERY_INTERVAL_MIN = float(parts[2]) - LOTTERY_INTERVAL_MAX = float(parts[3]) - LOTTERY_CODE_PRICE = float(parts[4]) -except: - DAILY_LOTTERY_LIMIT = 2 - MAX_CODE_ATTEMPTS = 5 - LOTTERY_INTERVAL_MIN = 5 - LOTTERY_INTERVAL_MAX = 10 - LOTTERY_CODE_PRICE = 0.34 - # ==================== 账号配置 ==================== +# 是否从养鸡场API获取账号 → 固定关闭 +USE_API_ACCOUNTS = False + +# 手动配置的账号列表 → 从环境变量 WLJ_ID 读取 WX_LIST_MANUAL = os.getenv("WLJ_ID", "") +# 需要剔除的wxid列表 REMOVE_WXIDS = [ "wxid_x4nz2s4th45k22", "wxid_fog306otfw9q22", @@ -56,42 +57,63 @@ REMOVE_WXIDS = [ "wxid_5jtncnh3v8ud2", ] +# 账号间延迟(秒) DELAY_BETWEEN_ACCOUNTS = 3 + +# 小程序AppID(王老吉) WX_APPID = "wxd25dc8ba975776e3" +# ==================== 抽奖配置 ==================== +# 每账号最大成功抽奖次数 +DAILY_LOTTERY_LIMIT = 1 + +# 每账号最大使用码次数(包括成功和失败的码) +MAX_CODE_ATTEMPTS = 4 + +# 抽奖间隔时间(秒) +LOTTERY_INTERVAL_MIN = 5 +LOTTERY_INTERVAL_MAX = 15 + +# 最大重试次数 +MAX_RETRY_COUNT = 3 + +# 有奖码统一价格(元) +LOTTERY_CODE_PRICE = 0.34 + # ==================== API配置 ==================== WLJ_BASE_URL = "https://wechatec.brand.wljhealth.com" S3_BASE_URL = "https://s3.lsa0.cn" +# 固定参数(从HAR抓包获取) POSSESSOR = "50c7d6a87202429a9871bf61ec85ad99" MALL_CODE = "wxd25dc8ba975776e3" SALE_CHANNEL = "mall" # ==================== 位置配置 ==================== -SCAN_LOCATIONS = [ - ("28.228521", "112.939423"), - ("28.364827", "112.815102"), - ("28.157439", "113.632571"), - ("28.261334", "112.548920"), - ("27.832155", "113.158607"), - ("27.672908", "113.497244"), - ("27.869672", "112.912388"), - ("27.538421", "112.287695"), - ("27.751183", "112.503816"), - ("28.358012", "112.196543"), - ("28.498775", "112.221098"), - ("28.487346", "113.032817"), - ("28.683512", "112.869411"), - ("27.438907", "111.594236"), -] +# 扫码位置列表(经纬度),按顺序轮换 +SCAN_LOCATIONS = [ +    ("28.228521", "112.939423"),  # 湖南长沙 +    ("28.364827", "112.815102"),  # 望城 +    ("28.157439", "113.632571"),  # 浏阳 +    ("28.261334", "112.548920"),  # 宁乡 +    ("27.832155", "113.158607"),  # 株洲 +    ("27.672908", "113.497244"),  # 醴陵 +    ("27.869672", "112.912388"),  # 湘潭 +    ("27.538421", "112.287695"),  # 韶山 +    ("27.751183", "112.503816"),  # 湘乡 +    ("28.358012", "112.196543"),  # 益阳 +    ("28.498775", "112.221098"),  # 沅江 +    ("28.487346", "113.032817"),  # 汨罗 +    ("28.683512", "112.869411"),  # 湘阴 +    ("27.438907", "111.594236"),  # 娄底 +] -# ==================== 码字文件路径 ==================== -CODE_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "WLJ_MZ.txt") - -# ==================== 调试模式 ==================== +# ==================== 调试配置 ==================== +# 是否显示调试信息 True 开启 False 关闭 DEBUG_MODE = False -# ==================== User-Agent 轮换池 ==================== + +# ==================== User-Agent轮换池 ==================== USER_AGENTS = [ "Mozilla/5.0 (Linux; Android 14; HUAWEI Mate 60 RS Build/Mate60RS; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/142.0.7444.173 Mobile Safari/537.36 XWEB/1420273 MMWEBSDK/20250201 MMWEBID/935 MicroMessenger/8.0.60.2860(0x28003C55) WeChat/arm64 Weixin NetType/WIFI Language/zh_CN ABI/arm64 MiniProgramEnv/android", "Mozilla/5.0 (Linux; Android 14; Honor Magic V2 Build/MagicV2; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/142.0.7444.173 Mobile Safari/537.36 XWEB/1420273 MMWEBSDK/20250201 MMWEBID/870 MicroMessenger/8.0.60.2860(0x28003C55) WeChat/arm64 Weixin NetType/4G Language/zh_CN ABI/arm64 MiniProgramEnv/android", @@ -163,13 +185,16 @@ DEVICE_MODELS = [ "Xiaomi-RedmiK60", ] -# -------------------- 日志配置 -------------------- + +# 自定义日志格式 class SimpleFormatter(logging.Formatter): def format(self, record): return record.getMessage() + handler = logging.StreamHandler(io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")) handler.setFormatter(SimpleFormatter()) + logging.basicConfig(level=logging.INFO, handlers=[handler]) logger = logging.getLogger(__name__) @@ -182,42 +207,45 @@ class WangLaoJiAutomation: self.delay_between_accounts = DELAY_BETWEEN_ACCOUNTS self.lottery_interval_min = LOTTERY_INTERVAL_MIN self.lottery_interval_max = LOTTERY_INTERVAL_MAX + # 待使用的码字(失败后顺延,下次优先使用) self.pending_code = None - - # 代理 + # 运行时读取代理配置 env_proxy = os.getenv('pgdl', '') if PROXY_DEFAULT: self.proxy_url = PROXY_DEFAULT + logger.info(f"🌐 代理来源: 脚本配置") + if env_proxy: + logger.info(f"🌐 青龙变量 pgdl 已设置但未使用") elif env_proxy: self.proxy_url = env_proxy + logger.info(f"🌐 代理来源: 青龙变量 pgdl") else: self.proxy_url = "" - self.use_proxy = USE_PROXY and bool(self.proxy_url) + logger.info(f"⚠️ 未配置代理(脚本和青龙变量都为空)") + self.use_proxy = USE_PROXY self.current_proxy = None self.codes_exhausted = False - self.scan_loc_index = 0 - self.ua_index = 0 - self.ssl_error_count = 0 - - # 码字从文件加载 - self.code_list = self.read_codes_from_file() - + self.scan_loc_index = 0 # 位置轮换索引 + self.ua_index = 0 # UA轮换索引 + self.ssl_error_count = 0 # SSL错误计数,连续2次则重新获取IP self.load_config() self.setup_clients() - logger.info("🏮 王老吉扫码抽奖启动") - logger.info(f"👥 账号: {len(self.wx_accounts)} | 🎯 每号抽奖: {self.daily_lottery_limit} 次 | 🔢 码字剩余: {len(self.code_list)}") - if self.use_proxy: - logger.info(f"🌐 代理已启用") + logger.info("🏮 王老吉扫码抽奖 - 自动化脚本启动") + logger.info(f"👥 账号数量: {len(self.wx_accounts)}") + logger.info(f"🎯 每账号执行: {self.daily_lottery_limit} 次抽奖") + logger.info(f"📄 码字文件: {CODE_FILE}") + if self.use_proxy and self.proxy_url: + logger.info(f"🌐 代理模式: 开启") - # ==================== 码字文件管理 ==================== + # ==================== 码字文件操作 ==================== def read_codes_from_file(self) -> List[str]: - """从 WLJ_MZ.txt 读取所有码字""" + """从文件读取所有码字""" try: if os.path.exists(CODE_FILE): with open(CODE_FILE, 'r', encoding='utf-8') as f: - codes = [line.strip() for line in f if line.strip()] - return codes + lines = [line.strip() for line in f if line.strip()] + return lines else: logger.error(f"❌ 码字文件不存在: {CODE_FILE}") return [] @@ -225,91 +253,111 @@ class WangLaoJiAutomation: logger.error(f"❌ 读取码字文件失败: {e}") return [] - def remove_code_from_file(self, code: str): - """从 WLJ_MZ.txt 中删除指定码字""" + def remove_used_code(self, code: str) -> bool: + """从文件中删除已使用的码字""" try: codes = self.read_codes_from_file() if code in codes: codes.remove(code) with open(CODE_FILE, 'w', encoding='utf-8') as f: f.write('\n'.join(codes)) - logger.info(f"✅ 已从文件删除码字,剩余 {len(codes)} 个") + logger.info(f"✅ 已从文件删除码字: {code} (剩余: {len(codes)} 个)") + return True else: logger.warning(f"⚠️ 码字不在文件中: {code}") + return False except Exception as e: logger.error(f"❌ 删除码字失败: {e}") + return False - def get_scan_code(self) -> Optional[str]: - """获取一个码字(优先待重试的,其次从列表第一个取)""" - if self.pending_code: - code = self.pending_code + def get_pending_code(self) -> Optional[str]: + """获取待重试的码字""" + code = self.pending_code + if code: + logger.info(f"🔄 复用上次失败的码字: {code}") self.pending_code = None - logger.info(f"🔄 复用失败码: {code}") - return code - - if not self.code_list: - self.codes_exhausted = True - return None - - code = self.code_list[0] - logger.info(f"📄 获取码字: {code} (剩余 {len(self.code_list)-1})") return code - def commit_scan_code(self, code: str): - """确认码字使用成功,从列表和文件中删除""" - if code in self.code_list: - self.code_list.remove(code) - self.remove_code_from_file(code) - def set_pending_code(self, code: str): - """将失败码字设置为下次优先使用(不删除)""" - self.pending_code = code - logger.warning(f"⚠️ 码字失败,顺延下次: {code}") + """设置待重试的码字(失败时保留)""" + if code: + logger.warning(f"⚠️ 码字失败,顺延下次使用: {code}") + self.pending_code = code - # ==================== 代理、位置、UA ==================== + def commit_scan_code(self, code: str): + """确认码字使用成功,从文件删除""" + self.remove_used_code(code) + + # ==================== 代理相关 ==================== def get_proxy(self) -> Optional[str]: + """获取代理""" if not self.proxy_url: return None try: - resp = requests.get(self.proxy_url, timeout=10) - if resp.status_code == 200: - return resp.text.strip() + response = requests.get(self.proxy_url, timeout=10) + if response.status_code == 200: + try: + data = response.json() + if isinstance(data, dict) and "data" in data: + proxy_list = data["data"] + if isinstance(proxy_list, list) and len(proxy_list) > 0: + first_proxy = proxy_list[0] + ip = first_proxy.get("ip") + port = first_proxy.get("port") + if ip and port: + proxy = f"{ip}:{port}" + logger.info(f"🌐 获取代理成功: {proxy}") + return proxy + except: + pass + proxy = response.text.strip() + if proxy: + logger.info(f"🌐 获取代理成功: {proxy}") + return proxy except Exception as e: logger.error(f"🌐 获取代理失败: {e}") return None - def get_scan_location(self): + def get_scan_location(self) -> tuple: + """获取轮换的扫码位置""" lat, lon = SCAN_LOCATIONS[self.scan_loc_index % len(SCAN_LOCATIONS)] self.scan_loc_index += 1 return lat, lon - def get_ua(self): + def get_ua(self) -> str: + """获取轮换的User-Agent""" ua = USER_AGENTS[self.ua_index % len(USER_AGENTS)] self.ua_index += 1 return ua - def get_proxies(self, proxy): + def get_proxies(self, proxy: Optional[str] = None) -> Optional[dict]: + """获取代理字典格式""" if proxy: return {"http": f"http://{proxy}", "https": f"http://{proxy}"} return None @staticmethod def is_ssl_error(error_str: str) -> bool: - return any(k in error_str.lower() for k in ['ssl', 'eof', 'ssl.c:', 'ssl_eof']) + """检测是否为SSL错误""" + ssl_keywords = ['ssl', 'eof', 'SSLEOF', 'EOF occurred', 'ssl.c:', 'SSL'] + return any(keyword in error_str.lower() for keyword in ssl_keywords) - def refresh_proxy_ip(self): - self.ssl_error_count = 0 - new = self.get_proxy() - if new: - logger.info(f"🔄 SSL错误,更换IP: {new}") - return new + def refresh_proxy_ip(self) -> Optional[str]: + """重新获取代理IP""" + self.ssl_error_count = 0 # 重置计数 + new_proxy = self.get_proxy() + if new_proxy: + logger.info(f"🔄 SSL错误达到2次,重新获取IP: {new_proxy}") + return new_proxy + logger.error(f"❌ 重新获取IP失败") return None - # ==================== 账号加载 ==================== def load_config(self): + """加载配置""" self.wx_accounts = self.parse_manual_accounts() def setup_clients(self): + """设置HTTP客户端""" self.session.headers.update({ "Content-Type": "application/x-www-form-urlencoded", "charset": "utf-8", @@ -317,13 +365,15 @@ class WangLaoJiAutomation: "Referer": "https://servicewechat.com/wxd25dc8ba975776e3/409/page-frame.html", }) - def parse_manual_accounts(self): + def parse_manual_accounts(self) -> List[Dict[str, str]]: + """解析从环境变量 WLJ_ID 读取的账号""" accounts = [] - text = os.getenv("WLJ_ID", "").strip() - if not text: - logger.error("❌ 环境变量 WLJ_ID 为空") + account_text = os.getenv("WLJ_ID", "").strip() + if not account_text: + logger.error("❌ 未配置环境变量 WLJ_ID") return accounts - for line in text.splitlines(): + + for line in account_text.splitlines(): line = line.strip() if not line: continue @@ -336,498 +386,786 @@ class WangLaoJiAutomation: "wxName": parts[0], }) accounts = [a for a in accounts if a.get("wxId") not in REMOVE_WXIDS] - logger.info(f"✅ 加载账号 {len(accounts)} 个") + logger.info(f"✅ 从 WLJ_ID 加载账号 {len(accounts)} 个") return accounts - # ==================== 签名 ==================== - def compute_sign(self, user_code: str) -> str: - secret = "cU9(yZ3{zD6!pE4.xX7#" - return hashlib.md5((user_code + secret).encode()).hexdigest().upper() - - # ==================== API调用 ==================== - def get_code_from_chicken_farm(self, wxid): + def get_code_from_chicken_farm(self, wxid: str = None) -> Optional[str]: + """从养鸡场获取微信code""" try: url = f"{WX_CLOUD}/prod-api/wechat/api/getMiniProgramCode" - headers = {"Authorization": AUTH_TOKEN, "Content-Type": "application/json"} - payload = {"wxid": wxid, "appid": WX_APPID} - resp = requests.post(url, json=payload, headers=headers, timeout=10) - data = resp.json() - if data.get("code") == 200 and data.get("data", {}).get("code"): - return data["data"]["code"] + headers = { + "Authorization": AUTH_TOKEN, + "Content-Type": "application/json", + } + + payload = { + "wxid": wxid if wxid else "", + "appid": WX_APPID + } + + if DEBUG_MODE: + logger.info(f"📤 养鸡场API: {url}") + logger.info(f"📤 请求体: {payload}") + + response = requests.post(url, json=payload, headers=headers, timeout=10) + response.raise_for_status() + res_data = response.json() + + if DEBUG_MODE: + logger.info(f"📥 养鸡场返回: {res_data}") + + if res_data.get("code") == 200 and res_data.get("data", {}).get("code"): + code = res_data["data"]["code"] + logger.info(f"✅ 获取微信code成功: {code}") + return code + else: + logger.warning(f"⚠️ 养鸡场无可用code") + return None + except Exception as e: logger.error(f"❌ 获取code失败: {e}") - return None + return None - def step1_wlj_login(self, code, proxy): + def get_scan_code(self) -> Optional[str]: + """获取扫码码(优先待重试的码,其次从文件读取)""" + # 1. 优先使用待重试的码字 + pending = self.get_pending_code() + if pending: + return pending + + # 2. 检查文件是否还有码字 + if self.codes_exhausted: + logger.warning(f"⚠️ 扫码码已用完,不再获取") + return None + + codes = self.read_codes_from_file() + if codes: + code = codes[0] + remaining = len(codes) - 1 + logger.info(f"✅ 从文件获取扫码码: {code} (剩余: {remaining} 个)") + return code + else: + logger.warning(f"⚠️ 文件中无码字") + self.codes_exhausted = True + return None + + @staticmethod + def compute_sign(user_code: str) -> str: + """计算sign: MD5(userCode + 'cU9(yZ3{zD6!pE4.xX7#')""" + secret = "cU9(yZ3{zD6!pE4.xX7#" + raw = (user_code or "") + secret + return hashlib.md5(raw.encode('utf-8')).hexdigest().upper() + + def step1_wlj_login(self, code: str, proxy: Optional[str] = None) -> Optional[Dict]: + """步骤1: 微信code换取wljhealth登录token""" url = f"{WLJ_BASE_URL}/userInfoMini/userMemberLogin" - sign = self.compute_sign("") - payload = { - "code": code, "possessor": POSSESSOR, - "userCode": "", "mallCode": MALL_CODE, - "saleChannel": SALE_CHANNEL, "sign": sign + ua = self.get_ua() + + # sign: MD5(userCode + secret), userCode初始为空 + user_code = "" + sign = self.compute_sign(user_code) + + data = { + "code": code, + "possessor": POSSESSOR, + "userCode": user_code, + "mallCode": MALL_CODE, + "saleChannel": SALE_CHANNEL, + "sign": sign, } - headers = {"User-Agent": self.get_ua(), "Content-Type": "application/x-www-form-urlencoded"} + proxies = self.get_proxies(proxy) + + headers = { + "Content-Type": "application/x-www-form-urlencoded", + "charset": "utf-8", + "Accept-Encoding": "gzip, deflate, br", + "Referer": "https://servicewechat.com/wxd25dc8ba975776e3/409/page-frame.html", + "User-Agent": ua, + } + try: - resp = self.session.post(url, data=urlencode(payload).encode(), - headers=headers, proxies=self.get_proxies(proxy), timeout=10) - result = resp.json() + if DEBUG_MODE: + logger.info(f"📤 请求URL: {url}") + logger.info(f"📤 请求头: {headers}") + logger.info(f"📤 请求体: {data}") + + body = urlencode(data).encode('utf-8') + response = self.session.post(url, data=body, headers=headers, proxies=proxies, timeout=10) + response.raise_for_status() + + if DEBUG_MODE: + logger.info(f"📥 响应状态码: {response.status_code}") + logger.info(f"📥 响应体: {response.text}") + logger.info(f"📥 响应Set-Cookie: {response.headers.get('Set-Cookie', '无')}") + + result = response.json() + if result.get("success") and result.get("status") == 1: content = result["content"] - if isinstance(content, dict): - token = content.get("token") - user = content.get("user_summary", {}) - return { - "token": token, - "openid": user.get("openid"), - "userCode": user.get("userCode"), - "phone": user.get("phone") or user.get("userPhone"), - } - except Exception as e: - logger.error(f"❌ 登录异常: {e}") - return None + # content可能是字符串(错误信息)也可能是字典 + if isinstance(content, str): + logger.error(f"❌ 步骤1登录失败: {content}") + return None + if not isinstance(content, dict): + logger.error(f"❌ 步骤1登录失败: content类型异常 {type(content)}") + return None + token = content.get("token", "") + # user_summary 嵌套在里面 + user_summary = content.get("user_summary", {}) + openid = user_summary.get("openid", "") + user_code = user_summary.get("userCode", "") + user_nickname = user_summary.get("userNickname", "") + phone = user_summary.get("phone", "") or user_summary.get("userPhone", "") + logger.info(f"✅ 步骤1成功 - OpenID: {openid}, 昵称: {user_nickname}, 手机: {phone}") + return { + "token": token, + "openid": openid, + "userCode": user_code, + "userNickname": user_nickname, + "phone": phone, + } + else: + logger.error(f"❌ 步骤1登录失败: {result.get('msg')}") + return None - def step2_get_s3_token(self, wlj_token, serial_code, user_code, lat, lon, proxy): + except Exception as e: + logger.error(f"❌ 步骤1请求失败: {e}") + return None + + def step2_get_s3_token(self, wlj_token: str, serial_code: str, user_code: str, scan_lat: str, scan_lon: str, proxy: Optional[str] = None) -> Optional[str]: + """步骤2: 用wljtoken换取s3.lsa0.cn的token""" url = f"{WLJ_BASE_URL}/openapi/getToken" - payload = { - "serialCode": serial_code, "latitude": lat, "longitude": lon, - "possessor": POSSESSOR, "userCode": user_code, - "mallCode": MALL_CODE, "saleChannel": SALE_CHANNEL, - } - headers = {"Authorization": wlj_token, "User-Agent": self.get_ua()} - try: - resp = self.session.post(url, data=urlencode(payload).encode(), - headers=headers, proxies=self.get_proxies(proxy), timeout=10) - data = resp.json() - if data.get("success") and data.get("status") == 1: - return data["content"] - except Exception as e: - logger.error(f"❌ getToken失败: {e}") - return None + ua = self.get_ua() - def step3_s3_third_login(self, s3_token, proxy): + # HAR中getToken没有sign参数,userCode从step1获取 + form_data = { + "serialCode": serial_code, + "latitude": scan_lat, + "longitude": scan_lon, + "possessor": POSSESSOR, + "userCode": user_code, + "mallCode": MALL_CODE, + "saleChannel": SALE_CHANNEL, + } + proxies = self.get_proxies(proxy) + + headers = { + "Content-Type": "application/x-www-form-urlencoded", + "charset": "utf-8", + "Accept-Encoding": "gzip, deflate, br", + "Referer": "https://servicewechat.com/wxd25dc8ba975776e3/409/page-frame.html", + "User-Agent": ua, + "Authorization": wlj_token, + } + + try: + if DEBUG_MODE: + logger.info(f"📤 请求URL: {url}") + logger.info(f"📤 请求头: {headers}") + logger.info(f"📤 请求体: {form_data}") + + body = urlencode(form_data).encode('utf-8') + response = self.session.post(url, data=body, headers=headers, proxies=proxies, timeout=10) + response.raise_for_status() + + if DEBUG_MODE: + logger.info(f"📥 实际请求Cookie: {self.session.cookies.get_dict()}") + logger.info(f"📥 响应体: {response.text}") + + result = response.json() + + if result.get("success") and result.get("status") == 1: + s3_token = result["content"] + logger.info(f"✅ 步骤2成功 - 获取s3 token") + return s3_token + else: + logger.error(f"❌ 步骤2获取s3token失败: {result.get('msg')}") + return None + + except Exception as e: + logger.error(f"❌ 步骤2请求失败: {e}") + return None + + def step3_s3_third_login(self, s3_token: str, proxy: Optional[str] = None) -> Optional[str]: + """步骤3: s3第三方登录,获取最终s3 Bearer token""" url = f"{S3_BASE_URL}/openapi/promotion/consumer/auth/thirdLogin" + + payload = { + "appid": WX_APPID, + "sign": s3_token, + } + proxies = self.get_proxies(proxy) + + ua = self.get_ua() headers = { "Authorization": f"Bearer {s3_token}", - "User-Agent": self.get_ua(), "Content-Type": "application/json", + "Origin": "https://s3.lsa0.cn", + "X-Requested-With": "com.tencent.mm", + "Referer": "https://s3.lsa0.cn/", + "Accept-Encoding": "gzip, deflate, br", + "User-Agent": ua, } - payload = {"appid": WX_APPID, "sign": s3_token} + try: - resp = requests.post(url, json=payload, headers=headers, - proxies=self.get_proxies(proxy), timeout=10) - data = resp.json() - if data.get("success") and data.get("code") == "200": - return data["data"]["token"], False + if DEBUG_MODE: + logger.info(f"📤 请求URL: {url}") + logger.info(f"📤 请求头: {headers}") + logger.info(f"📤 请求体: {payload}") + + response = requests.post(url, json=payload, headers=headers, proxies=proxies, timeout=10) + response.raise_for_status() + + if DEBUG_MODE: + logger.info(f"📥 响应体: {response.text}") + + result = response.json() + + if result.get("success") and result.get("code") == "200": + final_token = result["data"]["token"] + open_id = result["data"].get("openId", "") + logger.info(f"✅ 步骤3成功 - s3登录成功") + return final_token, False + else: + logger.error(f"❌ 步骤3 s3登录失败: {result.get('message')}") + return None, False + except Exception as e: - is_ssl = self.is_ssl_error(str(e)) - logger.error(f"❌ thirdLogin异常: {e}") + error_str = str(e) + logger.error(f"❌ 步骤3请求失败: {e}") + # 判断是否为SSL错误 + is_ssl = self.is_ssl_error(error_str) + if is_ssl: + logger.warning(f"⚠️ SSL错误,码字将顺延下次使用") return None, is_ssl - return None, False - def step4_scan_mask_code(self, s3_token, mask_code, lat, lon, proxy): + def step4_scan_mask_code(self, s3_token: str, mask_code: str, scan_lat: str, scan_lon: str, proxy: Optional[str] = None) -> tuple: + """步骤4: 扫码核销 返回 (结果, 是否代理错误, 是否每日上限)""" url = f"{S3_BASE_URL}/openapi/promotion/campaignExecute/scanMaskCode" + + payload = { + "maskCode": mask_code, + "lng": scan_lon, + "lat": scan_lat, + "lastScanResult": False, + } + proxies = self.get_proxies(proxy) + + ua = self.get_ua() headers = { "Authorization": f"Bearer {s3_token}", - "User-Agent": self.get_ua(), "Content-Type": "application/json", + "Origin": "https://s3.lsa0.cn", + "X-Requested-With": "com.tencent.mm", + "Referer": "https://s3.lsa0.cn/", + "Accept-Encoding": "gzip, deflate, br", + "User-Agent": ua, } - payload = {"maskCode": mask_code, "lng": lon, "lat": lat, "lastScanResult": False} - try: - resp = requests.post(url, json=payload, headers=headers, - proxies=self.get_proxies(proxy), timeout=10) - data = resp.json() - if data.get("success") and data.get("code") == "200": - biz_msg = data["data"].get("bizMessage", "") - if "已超上限" in biz_msg or "超限" in biz_msg: - return data["data"], False, True - if data["data"].get("bizCode") == "0000000": - return data["data"], False, False - except Exception as e: - logger.error(f"❌ 扫码失败: {e}") - is_proxy_err = any(k in str(e) for k in ['Proxy', 'timeout', 'Cannot connect']) - return None, is_proxy_err, False - return None, False, False - def step5_mask_code_lottery(self, s3_token, mask_code, lat, lon, proxy): + try: + if DEBUG_MODE: + logger.info(f"📤 请求URL: {url}") + logger.info(f"📤 请求头: {headers}") + logger.info(f"📤 请求体: {payload}") + + response = requests.post(url, json=payload, headers=headers, proxies=proxies, timeout=10) + response.raise_for_status() + + if DEBUG_MODE: + logger.info(f"📥 响应体: {response.text}") + + result = response.json() + + if result.get("success") and result.get("code") == "200": + data = result["data"] + biz_code = data.get("bizCode", "") + biz_message = data.get("bizMessage", "") + auto_lottery = data.get("autoLottery", False) + campaign_name = data.get("campaignName", "") + # 每日上限检测优先判断 + if "已超上限" in biz_message or "超限" in biz_message: + logger.warning(f"⚠️ 步骤4扫码 - 每日次数已上限: {biz_message}") + return data, False, True + if biz_code == "0000000": + logger.info(f"✅ 步骤4扫码成功 - 活动: {campaign_name}, 状态: {biz_message}, 自动抽奖: {auto_lottery}") + return data, False, False + else: + logger.error(f"❌ 步骤4扫码失败 - bizCode: {biz_code}, msg: {biz_message}") + return None, False, False + else: + logger.error(f"❌ 步骤4扫码失败: {result.get('message')}") + return None, False, False + + except Exception as e: + error_str = str(e) + logger.error(f"❌ 步骤4请求失败: {e}") + is_proxy_error = any(pe in error_str for pe in ['Proxy', 'timeout', 'timed out', 'Cannot connect']) + return None, is_proxy_error, False + + def step5_mask_code_lottery(self, s3_token: str, mask_code: str, scan_lat: str, scan_lon: str, proxy: Optional[str] = None) -> tuple: + """步骤5: 抽奖 返回 (结果, 是否代理错误)""" url = f"{S3_BASE_URL}/openapi/promotion/campaignExecute/maskCodeLottery" + + payload = { + "maskCode": mask_code, + "lng": scan_lon, + "lat": scan_lat, + } + proxies = self.get_proxies(proxy) + + ua = self.get_ua() headers = { "Authorization": f"Bearer {s3_token}", - "User-Agent": self.get_ua(), "Content-Type": "application/json", + "Origin": "https://s3.lsa0.cn", + "X-Requested-With": "com.tencent.mm", + "Referer": "https://s3.lsa0.cn/", + "Accept-Encoding": "gzip, deflate, br", + "User-Agent": ua, } - payload = {"maskCode": mask_code, "lng": lon, "lat": lat} - try: - resp = requests.post(url, json=payload, headers=headers, - proxies=self.get_proxies(proxy), timeout=10) - data = resp.json() - if data.get("success") and data.get("code") == "200": - return data["data"], False - except Exception as e: - logger.error(f"❌ 抽奖失败: {e}") - is_proxy_err = any(k in str(e) for k in ['Proxy', 'timeout', 'Cannot connect']) - return None, is_proxy_err - return None, False - def step6_receive_reward(self, s3_token, user_reward_id, proxy): + try: + if DEBUG_MODE: + logger.info(f"📤 请求URL: {url}") + logger.info(f"📤 请求头: {headers}") + logger.info(f"📤 请求体: {payload}") + + response = requests.post(url, json=payload, headers=headers, proxies=proxies, timeout=10) + response.raise_for_status() + + if DEBUG_MODE: + logger.info(f"📥 响应体: {response.text}") + + result = response.json() + + if result.get("success") and result.get("code") == "200": + data = result["data"] + biz_code = data.get("bizCode", "") + biz_message = data.get("bizMessage", "") + lucky_list = data.get("luckyRewardList", []) + campaign_name = data.get("campaignName", "") + if biz_code == "0000000": + logger.info(f"✅ 步骤5抽奖成功 - 活动: {campaign_name}, 结果: {biz_message}") + return data, False + else: + logger.error(f"❌ 步骤5抽奖失败 - bizCode: {biz_code}, msg: {biz_message}") + return None, False + else: + logger.error(f"❌ 步骤5抽奖失败: {result.get('message')}") + return None, False + + except Exception as e: + error_str = str(e) + logger.error(f"❌ 步骤5请求失败: {e}") + is_proxy_error = any(pe in error_str for pe in ['Proxy', 'timeout', 'timed out', 'Cannot connect']) + return None, is_proxy_error + + def step6_receive_reward(self, s3_token: str, user_reward_id: str, proxy: Optional[str] = None) -> tuple: + """步骤6: 领取红包(先调用receiveReward)""" url = f"{S3_BASE_URL}/openapi/promotion/consumer/receiveReward" - headers = { - "Authorization": f"Bearer {s3_token}", - "User-Agent": self.get_ua(), - "Content-Type": "application/json", - } payload = {"userRewardId": user_reward_id} - try: - resp = requests.post(url, json=payload, headers=headers, - proxies=self.get_proxies(proxy), timeout=10) - data = resp.json() - if data.get("success") and data.get("code") == "200": - if data["data"].get("bizCode") == "0000000": - return data["data"]["data"], False - except Exception as e: - is_proxy = any(k in str(e) for k in ['Proxy', 'timeout', 'Cannot connect']) - return None, is_proxy - return None, False - - def step7_claim_reward(self, s3_token, user_reward_id, proxy): - url = f"{S3_BASE_URL}/openapi/promotion/consumer/claimReward" + proxies = self.get_proxies(proxy) + ua = self.get_ua() headers = { "Authorization": f"Bearer {s3_token}", - "User-Agent": self.get_ua(), "Content-Type": "application/json", + "Origin": "https://s3.lsa0.cn", + "X-Requested-With": "com.tencent.mm", + "Referer": "https://s3.lsa0.cn/", + "Accept-Encoding": "gzip, deflate, br", + "User-Agent": ua, } - payload = {"id": user_reward_id} + try: - resp = requests.post(url, json=payload, headers=headers, - proxies=self.get_proxies(proxy), timeout=10) - data = resp.json() - if data.get("success") and data.get("code") == "200": - if data["data"].get("bizCode") == "0000000": - return data["data"], False + if DEBUG_MODE: + logger.info(f"📤 请求URL: {url}") + logger.info(f"📤 请求体: {payload}") + + response = requests.post(url, json=payload, headers=headers, proxies=proxies, timeout=10) + response.raise_for_status() + + if DEBUG_MODE: + logger.info(f"📥 响应体: {response.text}") + + result = response.json() + + if result.get("success") and result.get("code") == "200": + biz_code = result.get("data", {}).get("bizCode", "") + biz_msg = result.get("data", {}).get("bizMessage", "") + inner_data = result.get("data", {}).get("data", {}) + claim_end = inner_data.get("claimEndTime", "") + auto_withdraw = inner_data.get("autoWithdraw", False) + if biz_code == "0000000": + logger.info(f"✅ 步骤6领取红包成功 - 领取截止: {claim_end}, 自动提现: {auto_withdraw}") + return inner_data, False + else: + logger.error(f"❌ 步骤6领取红包失败 - bizCode: {biz_code}, msg: {biz_msg}") + return None, False + else: + logger.error(f"❌ 步骤6领取红包失败: {result.get('message')}") + return None, False + except Exception as e: - is_proxy = any(k in str(e) for k in ['Proxy', 'timeout', 'Cannot connect']) - return None, is_proxy - return None, False + error_str = str(e) + logger.error(f"❌ 步骤6请求失败: {e}") + is_proxy_error = any(pe in error_str for pe in ['Proxy', 'timeout', 'timed out', 'Cannot connect']) + return None, is_proxy_error - # ==================== 领取奖励完整流程 ==================== - def claim_reward_flow(self, final_s3_token, reward_id, reward_value, current_proxy): - """执行步骤6和步骤7,领取并确认奖励""" - claim_data, is_claim_err = self.step6_receive_reward(final_s3_token, reward_id, current_proxy) - if not claim_data and self.use_proxy and is_claim_err: - new_proxy = self.get_proxy() - if new_proxy: - current_proxy = new_proxy - claim_data, _ = self.step6_receive_reward(final_s3_token, reward_id, current_proxy) - if claim_data: - _, is_receive_err = self.step7_claim_reward(final_s3_token, reward_id, current_proxy) - if _: - logger.info(f"💰 到账 {reward_value/100:.2f}元") - return reward_value / 100.0, current_proxy - return 0.0, current_proxy + def step7_claim_reward(self, s3_token: str, user_reward_id: str, proxy: Optional[str] = None) -> tuple: + """步骤7: 确认领取奖励(后调用claimReward)""" + url = f"{S3_BASE_URL}/openapi/promotion/consumer/claimReward" + payload = {"id": user_reward_id} + proxies = self.get_proxies(proxy) + ua = self.get_ua() + headers = { + "Authorization": f"Bearer {s3_token}", + "Content-Type": "application/json", + "Origin": "https://s3.lsa0.cn", + "X-Requested-With": "com.tencent.mm", + "Referer": "https://s3.lsa0.cn/", + "Accept-Encoding": "gzip, deflate, br", + "User-Agent": ua, + } - # ==================== 单次完整扫码抽奖流程 ==================== - def do_single_scan_and_lottery(self, final_s3_token, mask_code, scan_lat, scan_lon, current_proxy, seen_reward_ids, hits_count): - """ - 执行一次完整的扫码+抽奖流程,返回: - (lottery_result, new_hits, amount_added, prizes_list, current_proxy) - """ - # 步骤4 扫码核销 - scan_result, is_proxy_err, is_daily_limit = self.step4_scan_mask_code( - final_s3_token, mask_code, scan_lat, scan_lon, current_proxy) + try: + if DEBUG_MODE: + logger.info(f"📤 请求URL: {url}") + logger.info(f"📤 请求体: {payload}") - if is_daily_limit: - logger.warning("⚠️ 每日上限") - return "daily_limit", hits_count, 0.0, [], current_proxy + response = requests.post(url, json=payload, headers=headers, proxies=proxies, timeout=10) + response.raise_for_status() - if not scan_result: - if self.use_proxy and is_proxy_err: - new_proxy = self.get_proxy() - if new_proxy: - current_proxy = new_proxy - return None, hits_count, 0.0, [], current_proxy + if DEBUG_MODE: + logger.info(f"📥 响应体: {response.text}") - # 步骤5 抽奖 - lottery_result, is_lottery_proxy_err = self.step5_mask_code_lottery( - final_s3_token, mask_code, scan_lat, scan_lon, current_proxy) - if not lottery_result: - if self.use_proxy and is_lottery_proxy_err: - new_proxy = self.get_proxy() - if new_proxy: - current_proxy = new_proxy - lottery_result, _ = self.step5_mask_code_lottery( - final_s3_token, mask_code, scan_lat, scan_lon, current_proxy) + result = response.json() - if not lottery_result: - return None, hits_count, 0.0, [], current_proxy + if result.get("success") and result.get("code") == "200": + biz_code = result.get("data", {}).get("bizCode", "") + biz_msg = result.get("data", {}).get("bizMessage", "") + if biz_code == "0000000": + logger.info(f"✅ 步骤7确认领取成功 - {biz_msg}") + return result.get("data", {}), False + else: + logger.error(f"❌ 步骤7确认领取失败 - bizCode: {biz_code}, msg: {biz_msg}") + return None, False + else: + logger.error(f"❌ 步骤7确认领取失败: {result.get('message')}") + return None, False - amount_added = 0.0 - new_prizes = [] + except Exception as e: + error_str = str(e) + logger.error(f"❌ 步骤7请求失败: {e}") + is_proxy_error = any(pe in error_str for pe in ['Proxy', 'timeout', 'timed out', 'Cannot connect']) + return None, is_proxy_error - lucky_list = lottery_result.get("luckyRewardList", []) - if lucky_list: - hits_count += 1 - for prize in lucky_list: - reward_id = prize.get("userRewardId") - # 去重 - if reward_id and reward_id in seen_reward_ids: - continue - if reward_id: - seen_reward_ids.add(reward_id) - - name = prize.get("rewardName", "未知") - value = prize.get("rewardValue", 0) - reward_type = prize.get("rewardType", "") - new_prizes.append(name) - logger.info(f"🎁 获得 {name},价值 {value/100:.2f}元") - - custom = prize.get("customData", {}) - if reward_id and reward_type == "redpacket" and custom.get("incentive"): - logger.info("💰 红包已激活,领取中...") - earned, current_proxy = self.claim_reward_flow(final_s3_token, reward_id, value, current_proxy) - amount_added += earned - else: - logger.info(f"🎟️ 未中奖") - - return lottery_result, hits_count, amount_added, new_prizes, current_proxy - - # ==================== 账号处理 ==================== - def process_account(self, account): + def process_account(self, account: Dict) -> Dict: + """处理单个账号""" result = { - "nickname": account.get("wxName"), - "wxid": account.get("wxId"), - "phone": account.get("phone"), - "attempts": 0, # 实际用码抽奖次数 - "hits": 0, # 中奖次数 - "prizes": [], # 去重奖品名称 + "nickname": account.get("wxName", ""), + "wxid": account.get("wxId", ""), + "phone": account.get("phone", ""), + "lottery_count": 0, + "code_used_count": 0, + "prizes": [], "total_amount": 0.0, "success": False, } - wxid = account.get("wxId") + # 重置SSL错误计数(每个账号独立计算) + self.ssl_error_count = 0 + + wxid = account.get("wxId", "") + + # 获取代理 account_proxy = None if self.use_proxy: account_proxy = self.get_proxy() if account_proxy: - logger.info(f"🌐 代理: {account_proxy}") + logger.info(f"🌐 账号使用代理: {account_proxy}") + # ====== 完整登录流程(获取s3 token)====== + # 步骤1: 获取微信code并登录wljhealth login_code = self.get_code_from_chicken_farm(wxid) if not login_code: - logger.error("❌ 无code,跳过") + logger.error(f"❌ 无法获取微信code,跳过账号") return result - login_result = self.step1_wlj_login(login_code, account_proxy) + # 步骤1: wljhealth登录 + login_result = None + retry_count = 0 + + while retry_count < MAX_RETRY_COUNT and not login_result: + login_result = self.step1_wlj_login(login_code, account_proxy) + + if not login_result: + if self.use_proxy and account_proxy: + logger.warning(f"⚠️ wlj登录失败,尝试更换代理...") + new_proxy = self.get_proxy() + if new_proxy: + account_proxy = new_proxy + logger.info(f"🌐 更换新代理: {account_proxy}") + login_result = self.step1_wlj_login(login_code, account_proxy) + if login_result: + break + + if not login_result and retry_count < MAX_RETRY_COUNT - 1: + logger.warning(f"⚠️ wlj登录失败,获取新code重试 ({retry_count + 1}/{MAX_RETRY_COUNT})") + login_code = self.get_code_from_chicken_farm(wxid) + if login_code: + retry_count += 1 + time.sleep(1) + continue + else: + logger.error(f"❌ 无法获取新code") + break + else: + break + if not login_result: - logger.error("❌ 登录失败,跳过") + logger.error(f"❌ 登录失败,已重试{retry_count}次,跳过账号") return result wlj_token = login_result["token"] - phone = login_result.get("phone", "") - logger.info(f"✅ 登录成功,手机: {phone}") + logger.info(f"✅ 登录完成 - 用户: {login_result.get('userNickname', '')}, 手机: {login_result.get('phone', '')}") - hits = 0 - attempts = 0 + # ====== 抽奖主循环 ====== + success_lottery_count = 0 + code_used_count = 0 current_proxy = account_proxy + s3_token = None # 缓存s3 token + s3_token_code = None # 关联的serialCode + # 每个账号固定一个经纬度 scan_lat, scan_lon = self.get_scan_location() - logger.info(f"📍 扫码位置: {scan_lat},{scan_lon}") + logger.info(f"📍 账号扫码位置 - 纬度: {scan_lat}, 经度: {scan_lon}") - seen_reward_ids = set() # 用于奖品去重 - pending_activation = None # 待激活的奖品信息: {"reward_id": str, "value": int} - - while hits < self.daily_lottery_limit and attempts < MAX_CODE_ATTEMPTS: - # ===== 如果存在待激活的奖品,优先进行激活 ===== - if pending_activation: - logger.info(f"🔔 有待激活红包,优先激活...") - activation_code = self.get_scan_code() - if not activation_code: - logger.error("❌ 无可用码字进行激活") - break - - # 用新码走一遍步骤2-5进行激活 - s3_token = self.step2_get_s3_token(wlj_token, activation_code, login_result["userCode"], - scan_lat, scan_lon, current_proxy) - if not s3_token: - self.set_pending_code(activation_code) - break - - final_s3_token, is_ssl = self.step3_s3_third_login(s3_token, current_proxy) - if not final_s3_token: - self.set_pending_code(activation_code) - break - - # 扫码+抽奖(用于激活) - attempts += 1 - scan_result, new_hits, amount_added, new_prizes, current_proxy = self.do_single_scan_and_lottery( - final_s3_token, activation_code, scan_lat, scan_lon, current_proxy, seen_reward_ids, hits) - - # 提交激活码 - self.commit_scan_code(activation_code) - - if scan_result == "daily_limit": - break - - # 检查这次激活的结果 - if amount_added > 0: - # 激活成功,领取到钱 - result["total_amount"] += amount_added - # 将奖品名称加入列表(去重已在 do_single_scan_and_lottery 中处理) - result["prizes"].extend(new_prizes) - pending_activation = None # 激活完成 - logger.info(f"✅ 红包激活成功") - elif new_hits > hits: - # 又中奖了但未激活(incentive=false),更新hits但保持待激活状态 - hits = new_hits - result["prizes"].extend(new_prizes) - # 待激活信息可能已更新,继续激活流程 - logger.info(f"🎁 再次中奖,仍待激活") - else: - # 未中奖,待激活奖品仍在,继续尝试 - logger.info(f"🎟️ 激活未成功(未中奖),继续尝试激活") - - if hits < self.daily_lottery_limit: - time.sleep(random.uniform(self.lottery_interval_min, self.lottery_interval_max)) - continue - - # ===== 正常抽奖流程 ===== - logger.info(f"🎯 第 {hits+1}/{self.daily_lottery_limit} 次中奖 (已用码 {attempts})") - - mask_code = self.get_scan_code() - if not mask_code: + while success_lottery_count < self.daily_lottery_limit: + if code_used_count >= MAX_CODE_ATTEMPTS: + logger.warning(f"⚠️ 已使用{code_used_count}个码,退出循环") break - # 步骤2 - s3_token = self.step2_get_s3_token(wlj_token, mask_code, login_result["userCode"], - scan_lat, scan_lon, current_proxy) + logger.info(f"📝 第 {success_lottery_count + 1}/{self.daily_lottery_limit} 次成功抽奖 (已使用 {code_used_count} 个码)") + + # 获取扫码码(优先待重试的码) + scan_code = self.get_scan_code() + if not scan_code: + logger.error(f"❌ 无法获取扫码码") + break + + # 步骤2: 获取s3 token(需要serialCode) + s3_token = self.step2_get_s3_token(wlj_token, scan_code, login_result.get("userCode", ""), scan_lat, scan_lon, current_proxy) if not s3_token: - self.set_pending_code(mask_code) - time.sleep(random.uniform(self.lottery_interval_min, self.lottery_interval_max)) + logger.warning(f"⚠️ 获取s3token失败,码字顺延") + self.set_pending_code(scan_code) # 保留码字 + interval = random.uniform(self.lottery_interval_min, self.lottery_interval_max) + time.sleep(interval) continue - # 步骤3 + # 步骤3: s3第三方登录 final_s3_token, is_ssl = self.step3_s3_third_login(s3_token, current_proxy) + + # 处理SSL错误 if is_ssl: self.ssl_error_count += 1 - self.set_pending_code(mask_code) + logger.warning(f"⚠️ SSL错误 ({self.ssl_error_count}/2),码字顺延") + self.set_pending_code(scan_code) # 保留码字 + if self.ssl_error_count >= 2: + # 连续2次SSL错误,重新获取IP new_proxy = self.refresh_proxy_ip() if new_proxy: current_proxy = new_proxy else: + logger.error(f"❌ 无法重新获取IP,退出") break - time.sleep(random.uniform(self.lottery_interval_min, self.lottery_interval_max)) + interval = random.uniform(self.lottery_interval_min, self.lottery_interval_max) + time.sleep(interval) continue if not final_s3_token: - self.ssl_error_count = 0 - self.set_pending_code(mask_code) - time.sleep(random.uniform(self.lottery_interval_min, self.lottery_interval_max)) + self.ssl_error_count = 0 # 非SSL错误,重置计数 + logger.warning(f"⚠️ s3登录失败,码字顺延") + self.set_pending_code(scan_code) # 保留码字,不计入已使用 + interval = random.uniform(self.lottery_interval_min, self.lottery_interval_max) + time.sleep(interval) continue - # 使用码字 - attempts += 1 + # 步骤4: 扫码核销 + scan_result, is_proxy_error, is_daily_limit = self.step4_scan_mask_code(final_s3_token, scan_code, scan_lat, scan_lon, current_proxy) - # 执行扫码+抽奖 - lottery_result, new_hits, amount_added, new_prizes, current_proxy = self.do_single_scan_and_lottery( - final_s3_token, mask_code, scan_lat, scan_lon, current_proxy, seen_reward_ids, hits) - - # 提交码字 - self.commit_scan_code(mask_code) - - if lottery_result == "daily_limit": + if is_daily_limit: + # 每日上限,不删除码字(可以下次再用) + logger.warning(f"⚠️ 每日上限,停止此账号") break - if lottery_result is None: - time.sleep(random.uniform(self.lottery_interval_min, self.lottery_interval_max)) + if not scan_result: + if self.use_proxy and is_proxy_error: + logger.warning(f"⚠️ 扫码代理错误,尝试更换代理...") + new_proxy = self.get_proxy() + if new_proxy: + current_proxy = new_proxy + logger.info(f"🌐 更换新代理: {current_proxy}") + # 重新获取s3 token并扫码 + s3_token = self.step2_get_s3_token(wlj_token, scan_code, login_result.get("userCode", ""), scan_lat, scan_lon, current_proxy) + if s3_token: + final_s3_token = self.step3_s3_third_login(s3_token, current_proxy) + if final_s3_token: + scan_result, _, _ = self.step4_scan_mask_code(final_s3_token, scan_code, scan_lat, scan_lon, current_proxy) + if scan_result: + # 扫码成功后再检查一遍每日上限 + scan_biz_msg = scan_result.get("bizMessage", "") + if "已超上限" in scan_biz_msg or "超限" in scan_biz_msg: + logger.warning(f"⚠️ {scan_biz_msg},停止此账号") + break + + if not scan_result: + logger.warning(f"⚠️ 扫码失败,码字顺延") + self.set_pending_code(scan_code) # 保留码字 + interval = random.uniform(self.lottery_interval_min, self.lottery_interval_max) + time.sleep(interval) continue - # 更新结果 - if amount_added > 0: - result["total_amount"] += amount_added - # 如果金额是在 do_single_scan_and_lottery 中通过激活获得的, - # 说明之前有一个待激活的奖品已成功激活 - pending_activation = None - logger.info(f"✅ 红包已激活并到账") + # ====== 扫码成功,使用码字并抽奖 ====== + code_used_count += 1 + result["code_used_count"] = code_used_count - # 检查是否有待激活的奖品 - lucky_list = lottery_result.get("luckyRewardList", []) if lottery_result and lottery_result != "daily_limit" else [] - has_unactivated = False - if lucky_list: - for prize in lucky_list: - reward_id = prize.get("userRewardId") - custom = prize.get("customData", {}) - reward_type = prize.get("rewardType", "") - if reward_id and reward_type == "redpacket" and not custom.get("incentive"): - # 有未激活的红包 - pending_activation = { - "reward_id": reward_id, - "value": prize.get("rewardValue", 0) - } - has_unactivated = True - logger.info(f"🔔 红包待激活: {prize.get('rewardName', '未知')}") - break + # 步骤5: 抽奖 + lottery_result, is_lottery_proxy_error = self.step5_mask_code_lottery(final_s3_token, scan_code, scan_lat, scan_lon, current_proxy) - if new_hits > hits: - hits = new_hits - result["prizes"].extend(new_prizes) + if not lottery_result and self.use_proxy and is_lottery_proxy_error: + logger.warning(f"⚠️ 抽奖代理错误,尝试更换代理...") + new_proxy = self.get_proxy() + if new_proxy: + current_proxy = new_proxy + logger.info(f"🌐 更换新代理: {current_proxy}") + lottery_result, _ = self.step5_mask_code_lottery(final_s3_token, scan_code, scan_lat, scan_lon, current_proxy) - if hits < self.daily_lottery_limit and not has_unactivated: - time.sleep(random.uniform(self.lottery_interval_min, self.lottery_interval_max)) + if lottery_result is not None: + lucky_list = lottery_result.get("luckyRewardList", []) + # 只有抽中奖品(有luckyRewardList)才计入成功次数 + if lucky_list: + success_lottery_count += 1 + result["lottery_count"] = success_lottery_count + if lucky_list: + for prize in lucky_list: + prize_name = prize.get("rewardName", "未知") + reward_value = prize.get("rewardValue", 0) + user_reward_id = prize.get("userRewardId", "") + reward_type = prize.get("rewardType", "") + result["prizes"].append(prize_name) + logger.info(f"🎁 奖品: {prize_name}, 价值: {reward_value/100:.2f}元") - result["attempts"] = attempts - result["hits"] = hits - result["success"] = hits > 0 + # 领取奖励(步骤6+7)- 只有红包类型且incentive=true(激活)才领取 + custom_data = prize.get("customData", {}) + incentive = custom_data.get("incentive", False) + if user_reward_id and reward_type == "redpacket": + if incentive: + logger.info(f"💰 红包已激活,执行领取流程") + claim_data, is_claim_error = self.step6_receive_reward(final_s3_token, user_reward_id, current_proxy) + if not claim_data and self.use_proxy and is_claim_error: + new_proxy = self.get_proxy() + if new_proxy: + current_proxy = new_proxy + claim_data, _ = self.step6_receive_reward(final_s3_token, user_reward_id, current_proxy) + if claim_data: + receive_data, is_receive_error = self.step7_claim_reward(final_s3_token, user_reward_id, current_proxy) + if not receive_data and self.use_proxy and is_receive_error: + new_proxy = self.get_proxy() + if new_proxy: + current_proxy = new_proxy + self.step7_claim_reward(final_s3_token, user_reward_id, current_proxy) + if receive_data: + result["total_amount"] += float(reward_value) / 100.0 + logger.info(f"💰 红包已到账: {reward_value/100:.2f}元") + else: + logger.info(f"⏳ 红包未激活,等待下次扫码激活") + else: + logger.info(f"✅ 复购类型,不计入金额") + else: + biz_message = lottery_result.get("bizMessage", "") + logger.info(f"🎁 抽奖结果: {biz_message}") + else: + interval = random.uniform(self.lottery_interval_min, self.lottery_interval_max) + time.sleep(interval) + + # ====== 抽奖完成,从文件删除码字 ====== + self.commit_scan_code(scan_code) + + # 间隔时间 + if success_lottery_count < self.daily_lottery_limit: + interval = random.uniform(self.lottery_interval_min, self.lottery_interval_max) + time.sleep(interval) + + result["success"] = result["lottery_count"] > 0 return result def run(self): + """运行主流程""" if not self.wx_accounts: - logger.error("❌ 无账号,退出") + logger.error("❌ 没有可用账号") return all_results = [] - total_attempts = 0 + total_lottery = 0 total_amount = 0.0 - total_hits = 0 + success_count = 0 for idx, account in enumerate(self.wx_accounts): - nick = account.get('wxName', '未知') - logger.info(f"\n==== 账号 {idx+1}/{len(self.wx_accounts)}: {nick} ====") - res = self.process_account(account) - all_results.append(res) + nickname = account.get('wxName', '未知') + logger.info(f"\n========== 账号 {idx+1}/{len(self.wx_accounts)} 🏮 {nickname} ==========") - total_attempts += res["attempts"] - total_hits += res["hits"] - total_amount += res["total_amount"] + result = self.process_account(account) + all_results.append(result) - cost = res["attempts"] * LOTTERY_CODE_PRICE - profit = res["total_amount"] - cost - logger.info(f"💰 账号{idx+1}: 抽奖{res['attempts']}次 | 中奖{res['hits']}次 | 金额{res['total_amount']:.2f} | 成本{cost:.2f} | 利润{profit:.2f}") + total_lottery += result["lottery_count"] + total_amount += result["total_amount"] + if result["success"]: + success_count += 1 + + account_cost = result.get("code_used_count", result["lottery_count"]) * LOTTERY_CODE_PRICE + account_profit = result["total_amount"] - account_cost + logger.info(f"💰 账号 {idx+1} 成功: {result['lottery_count']}次 | 用码: {result.get('code_used_count', 0)}个 | 金额: {result['total_amount']:.2f}元 | 成本: {account_cost:.2f}元 | 利润: {account_profit:.2f}元") if self.codes_exhausted: - logger.warning("⚠️ 码字耗尽,停止后续账号") + logger.warning(f"⚠️ 扫码码已用完,停止后续账号") break if idx < len(self.wx_accounts) - 1: + logger.info(f"⏳ 等待 {self.delay_between_accounts} 秒后处理下一个账号...") time.sleep(self.delay_between_accounts) - self.print_summary(all_results, total_attempts, total_hits, total_amount) + self.print_summary(all_results, total_lottery, total_amount, success_count) - def print_summary(self, results, total_attempts, total_hits, total_amount): - total_cost = total_attempts * LOTTERY_CODE_PRICE - total_profit = total_amount - total_cost + def print_summary(self, results: List[Dict], total_lottery: int, total_amount: float, success_count: int): + """打印汇总报告""" + total_code_used = sum(r.get("code_used_count", r.get("lottery_count", 0)) for r in results) - logger.info("\n" + "="*40) + logger.info("\n" + "="*50) logger.info("📊 执行汇总") - logger.info(f"👥 账号: {len(results)} | ✅ 中奖账号: {sum(1 for r in results if r['success'])}") - logger.info(f"🎰 总抽奖: {total_attempts} 次 | 🎁 中奖: {total_hits} 次") - logger.info(f"💰 总金额: {total_amount:.2f} | 💵 总成本: {total_cost:.2f} | 📈 总利润: {total_profit:.2f}") - logger.info("📋 详情:") + logger.info("="*50) + logger.info(f"👥 总账号数: {len(results)}") + logger.info(f"✅ 成功账号: {success_count}") + logger.info(f"🎰 总成功抽奖: {total_lottery} 次") + logger.info(f"📱 总用码: {total_code_used} 个") + logger.info(f"💰 总金额: {total_amount:.2f} 元") + + cost = total_code_used * LOTTERY_CODE_PRICE + profit = total_amount - cost + logger.info(f"💵 成本: {cost:.2f} 元") + logger.info(f"📈 利润: {profit:.2f} 元") + + logger.info("\n📋 账号详情:") for i, r in enumerate(results): status = "✅" if r["success"] else "❌" - prizes = ", ".join(r["prizes"]) if r["prizes"] else "无" - logger.info(f" {status} 账号{i+1}: {r['nickname']} | 抽奖{r['attempts']}次 | 中奖{r['hits']} | 金额{r['total_amount']:.2f} | {prizes}") + prize_info = ", ".join(r["prizes"]) if r["prizes"] else "无" + logger.info(f" {status} 账号{i+1}: {r['nickname']} | 抽奖: {r['lottery_count']}次 | 金额: {r['total_amount']:.2f}元 | 奖品: {prize_info}") def main(): + """主入口""" automation = WangLaoJiAutomation() automation.run() + if __name__ == "__main__": - main() \ No newline at end of file + main()