From 73bbd7cd3d925f293d7f22bf8c05b058b73fdc60 Mon Sep 17 00:00:00 2001 From: admin <362324317@qq.com> Date: Tue, 2 Jun 2026 00:59:21 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=20WX=5FApplet/Applet=5FJYHS?= =?UTF-8?q?=5FDDYX.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- WX_Applet/Applet_JYHS_DDYX.py | 1060 +++++++++++++++------------------ 1 file changed, 482 insertions(+), 578 deletions(-) diff --git a/WX_Applet/Applet_JYHS_DDYX.py b/WX_Applet/Applet_JYHS_DDYX.py index dbcdbe5..68889b1 100644 --- a/WX_Applet/Applet_JYHS_DDYX.py +++ b/WX_Applet/Applet_JYHS_DDYX.py @@ -1,604 +1,508 @@ # cron: 2 7 * * * # new Env("旧衣回收_铛铛一下_通用版") +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- """ -旧衣回收_铛铛一下 通用一体化版本 - -核心链路: -wx_cloud + wx_token + wxid + appid - -> 养鸡场 getMiniProgramCode - -> https://vues.dd1x.cn/wechat/login?code=xxx&channelId=154 - -> data.token - -> 后续接口使用 Header: Token - -外部环境变量尽量保持最少: -1. wx_cloud 养鸡场地址,默认 http://192.168.31.203:666 -2. wx_token 养鸡场 Authorization,默认自动补 Bearer -3. DDYX_TEST_WXID 可选,只跑指定 wxid,便于测试 -4. DDYX_EXCLUDE_WXIDS 可选,排除 wxid,支持换行/逗号/&/@ 分隔 - -缓存: -同目录 APP_Buffer/ddyx_token_cache.json - -日志: -同目录 logs/ddyx_YYYYMMDD_HHMMSS.log +脚本名称:铛铛一下签到抽奖答题(自动版 - 已修复汇总错误) +创建时间:2026-05-25 +功能:自动从养鸡场获取账号 → 登录 → 签到 → 抽奖 → 答题 → 自动提现 +环境变量: + wx_cloud : 养鸡场地址(必填) + wx_token : 养鸡场 Authorization(必填) + SINGLE_TEST_WXID : 可选,只处理指定 wxid + PUSHPLUS_TOKEN : 可选,推送结果到微信 """ +import base64 import json -import logging import os -import random -import re import sys import time -import traceback -from datetime import datetime -from pathlib import Path +import uuid +from dataclasses import dataclass +from urllib.parse import quote, urljoin, urlparse +from typing import Optional, List, Dict, Any -import requests +try: + import httpx + import requests +except ImportError: + print("错误: 需要安装 httpx 和 requests") + sys.exit(1) -# ====================== 项目固定配置 ====================== -PROJECT_NAME = "旧衣回收_铛铛一下" -APPID = "wxe378d2d7636c180e" -HOST = "vues.dd1x.cn" -BASE_URL = f"https://{HOST}" -CHANNEL_ID = 154 -LOTTERY_ACTIVITY_ID = 3438615 +try: + from SendNotify import capture_output +except ImportError: + print("[警告] 通知模块 SendNotify.py 未找到,将跳过通知推送。") + def capture_output(title: str = "脚本运行结果"): + def decorator(func): + return func + return decorator -# 默认超过该金额可提现;是否自动提现见 ENABLE_WITHDRAW -WITHDRAW_THRESHOLD = 0.3 -ENABLE_WITHDRAW = True # 原脚本默认自动提现;如需关闭改成 False +# ---------- 养鸡场客户端 ---------- +class JycClient: + def __init__(self, server_url: str, token: str): + self.server_url = server_url.rstrip('/') + self.token = token + self.client = httpx.Client(timeout=30.0) -# Token 缓存有效期,默认 7 天 -TOKEN_CACHE_TTL = 7 * 24 * 3600 - -# 多账号间隔,防止请求过快 -ACCOUNT_DELAY_RANGE = (2, 5) -REQUEST_DELAY_RANGE = (1, 3) -LOTTERY_DELAY_RANGE = (3, 5) - -# 养鸡场分页:每页 1 个账号,真正做到取一个跑一个 -CLOUD_PAGE_SIZE = 1 - -# 调试开关;普通模式不打印请求/响应详情、不打印完整 token/code -DEBUG = False -AUTO_BEARER = True - -# 代理开关 -MULTI_ACCOUNT_PROXY = False -PROXY_API_URL = os.getenv("PROXY_API_URL", "").strip() - -DEFAULT_USER_AGENT = ( - "Mozilla/5.0 (Linux; Android 12; M2012K11AC Build/SKQ1.220303.001; wv) " - "AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 " - "Chrome/134.0.6998.136 Mobile Safari/537.36 XWEB/1340129 " - "MMWEBSDK/20240301 MMWEBID/9871 MicroMessenger/8.0.48.2580(0x28003036) " - "WeChat/arm64 Weixin NetType/WIFI Language/zh_CN ABI/arm64 MiniProgramEnv/android" -) - -AUTH_FAIL_KEYWORDS = [ - "未登录", "请登录", "登录失效", "登录过期", "token", "Token", "TOKEN", - "无效", "过期", "未授权", "unauthorized", "Unauthorized", "401", "403" -] - -# ====================== 路径配置 ====================== -BASE_DIR = Path(__file__).resolve().parent -BUFFER_DIR = BASE_DIR / "APP_Buffer" -LOG_DIR = BASE_DIR / "logs" -BUFFER_DIR.mkdir(parents=True, exist_ok=True) -LOG_DIR.mkdir(parents=True, exist_ok=True) - -TOKEN_CACHE_FILE = BUFFER_DIR / "ddyx_token_cache.json" -LOG_FILE = LOG_DIR / f"ddyx_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log" - -# ====================== 环境变量 ====================== -wx_cloud = os.getenv("wx_cloud", "http://192.168.31.203:666").strip().rstrip("/") -_raw_wx_token = os.getenv("wx_token", "").strip() -if AUTO_BEARER and _raw_wx_token and not _raw_wx_token.lower().startswith("bearer "): - WX_TOKEN = f"Bearer {_raw_wx_token}" -else: - WX_TOKEN = _raw_wx_token - -TEST_WXID = os.getenv("DDYX_TEST_WXID", "").strip() -EXCLUDE_WXIDS_RAW = os.getenv("DDYX_EXCLUDE_WXIDS", "").strip() - -# 原脚本内置排除;保留但集中放这里 -SCRIPT_EXCLUDE_WXIDS = ["wxid_11111111111111"] - - -def setup_logging(): - logging.basicConfig( - level=logging.DEBUG if DEBUG else logging.INFO, - format="%(asctime)s - %(levelname)s\t- %(message)s", - datefmt="%Y-%m-%d %H:%M:%S", - handlers=[ - logging.StreamHandler(sys.stdout), - logging.FileHandler(LOG_FILE, encoding="utf-8"), - ], - ) - - -def split_multi_values(raw): - """支持换行、逗号、中文逗号、&、@ 分隔。""" - if not raw: - return [] - parts = re.split(r"[\n,,&@]+", raw) - return [x.strip() for x in parts if x.strip()] - - -def mask_secret(value, keep_start=6, keep_end=4): - if value is None: - return "" - value = str(value) - if len(value) <= keep_start + keep_end: - return "***" - return f"{value[:keep_start]}***{value[-keep_end:]}" - - -def mask_phone(tel): - if not tel: - return "" - tel = str(tel) - if len(tel) >= 7: - return tel[:3] + "****" + tel[-4:] - return "***" - - -def load_json_file(path, default): - if not path.exists(): - return default - try: - return json.loads(path.read_text(encoding="utf-8")) - except Exception as e: - logging.warning(f"[缓存]读取失败,使用空缓存: {path} | {e}") - return default - - -def save_json_file(path, data): - try: - path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8") - except Exception as e: - logging.warning(f"[缓存]保存失败: {path} | {e}") - - -class DDYXTask: - def __init__(self): - self.token_cache = load_json_file(TOKEN_CACHE_FILE, {}) - self.exclude_wxids = set(SCRIPT_EXCLUDE_WXIDS + split_multi_values(EXCLUDE_WXIDS_RAW)) - self.total_accounts = 0 - self.success_accounts = 0 - self.login_success_accounts = 0 - self.sign_success_accounts = 0 - self.draw_success_count = 0 - self.withdraw_success_count = 0 - self.account_reports = [] - - # ---------------------- 养鸡场相关 ---------------------- - def yjc_headers(self): - return { - "Authorization": WX_TOKEN, - "Content-Type": "application/json", - } - - def iter_cloud_accounts(self): - """分页流式获取账号:取一个账号,跑完,再取下一个。""" - page_num = 1 - while True: - url = f"{wx_cloud}/prod-api/wechat/wechat/list" - params = {"pageNum": page_num, "pageSize": CLOUD_PAGE_SIZE} - try: - res = requests.get(url, params=params, headers=self.yjc_headers(), timeout=15) - res.raise_for_status() - data = res.json() - except Exception as e: - logging.error(f"[养鸡场]获取账号列表失败 page={page_num}: {e}") - if DEBUG: - logging.debug(traceback.format_exc()) - return - - if data.get("code") != 200 or not isinstance(data.get("rows"), list): - logging.error(f"[养鸡场]账号列表返回异常: {json.dumps(data, ensure_ascii=False)[:500]}") - return - - rows = data.get("rows", []) - if not rows: - return - - for row in rows: - wxid = row.get("wxId") or row.get("wxid") or "" - wxname = row.get("wxName") or row.get("remark") or row.get("nickName") or "未知昵称" - if not wxid: - logging.warning("[养鸡场]账号缺少 wxId,跳过") - continue - yield {"wxid": wxid, "wxname": wxname} - - total = data.get("total") - if total is not None and page_num * CLOUD_PAGE_SIZE >= int(total): - return - if len(rows) < CLOUD_PAGE_SIZE: - return - page_num += 1 - - def get_wx_code(self, wxid): - url = f"{wx_cloud}/prod-api/wechat/api/getMiniProgramCode" - payload = {"wxid": wxid, "appid": APPID} + def get_all_accounts(self) -> List[Dict[str, str]]: + url = f"{self.server_url}/prod-api/wechat/wechat/list" + params = {"pageNum": 1, "pageSize": 1000} + headers = {"Authorization": self.token} try: - res = requests.post(url, json=payload, headers=self.yjc_headers(), timeout=15) - res.raise_for_status() - data = res.json() - code = data.get("data", {}).get("code") - if data.get("code") == 200 and code: - logging.info("[获取code]完成") - if DEBUG: - logging.debug(f"[获取code][debug] {mask_secret(code)}") - return code - logging.warning(f"[获取code]失败: {json.dumps(data, ensure_ascii=False)[:500]}") - return None - except Exception as e: - logging.error(f"[获取code]异常: {e}") - if DEBUG: - logging.debug(traceback.format_exc()) - return None - - # ---------------------- 请求会话 ---------------------- - def build_session(self): - session = requests.Session() - session.headers.update({"User-Agent": DEFAULT_USER_AGENT}) - - if MULTI_ACCOUNT_PROXY and PROXY_API_URL: - proxy = self.get_proxy() - if proxy: - session.proxies.update({"http": f"http://{proxy}", "https": f"http://{proxy}"}) - logging.info("[代理]已启用") - return session - - def get_proxy(self): - try: - res = requests.get(PROXY_API_URL, timeout=10) - proxy = res.text.strip() - if DEBUG: - logging.debug(f"[代理]获取到: {proxy}") - return proxy - except Exception as e: - logging.warning(f"[代理]获取失败,不使用代理: {e}") - return None - - def is_auth_failed_response(self, data): - try: - text = json.dumps(data, ensure_ascii=False) - except Exception: - text = str(data) - return any(k in text for k in AUTH_FAIL_KEYWORDS) - - def validate_token(self, session): - """用查询余额/提现列表接口做缓存 Token 校验,不写入业务。""" - url = f"{BASE_URL}/api/h/get_withdrawal_trade_list" - try: - res = session.get(url, timeout=15) - if res.status_code in (401, 403): - return False - data = res.json() - if self.is_auth_failed_response(data): - return False - return True - except Exception as e: - if DEBUG: - logging.debug(f"[Token校验]异常: {e}") - return False - - def login_by_code(self, session, code): - url = f"{BASE_URL}/wechat/login" - params = {"code": code, "channelId": CHANNEL_ID} - try: - res = session.get(url, params=params, timeout=15) - res.raise_for_status() - data = res.json() - if DEBUG: - safe_data = json.loads(json.dumps(data, ensure_ascii=False)) - if isinstance(safe_data.get("data"), dict) and safe_data["data"].get("token"): - safe_data["data"]["token"] = mask_secret(safe_data["data"]["token"]) - logging.debug(f"[登录响应] {json.dumps(safe_data, ensure_ascii=False)[:1000]}") - - if data.get("code") == 0 and isinstance(data.get("data"), dict) and data["data"].get("token"): - token = data["data"]["token"] - tel = data["data"].get("tel", "") - session.headers["Token"] = token - logging.info(f"[登录]完成 | 手机: {mask_phone(tel)}") - return { - "token": token, - "tel": mask_phone(tel), - "rawTel": tel, - "updatedAt": int(time.time()), - } - logging.warning(f"[登录]失败: {data.get('msg', '未知错误')}") - return None - except Exception as e: - logging.error(f"[登录]异常: {e}") - if DEBUG: - logging.debug(traceback.format_exc()) - return None - - def prepare_token(self, session, wxid, wxname): - now = int(time.time()) - cached = self.token_cache.get(wxid, {}) - cached_token = cached.get("token") - updated_at = int(cached.get("updatedAt", 0) or 0) - - if cached_token and now - updated_at < TOKEN_CACHE_TTL: - session.headers["Token"] = cached_token - logging.info(f"[Token缓存]命中 | {cached.get('tel') or wxname},开始校验") - if self.validate_token(session): - logging.info("[Token缓存]校验通过") - return True - logging.info("[Token缓存]校验失败,重新登录") - elif cached_token: - logging.info("[Token缓存]已过期,重新登录") - else: - logging.info("[Token缓存]未命中,开始登录") - - code = self.get_wx_code(wxid) - if not code: - return False - - login_info = self.login_by_code(session, code) - if not login_info: - return False - - self.token_cache[wxid] = {"wxName": wxname, **login_info} - save_json_file(TOKEN_CACHE_FILE, self.token_cache) - logging.info("[Token缓存]已更新") - return True - - # ---------------------- 业务接口 ---------------------- - def sign_in(self, session): - url = f"{BASE_URL}/api/v2/sign_join" - try: - res = session.get(url, timeout=15) - res.raise_for_status() - data = res.json() - if data.get("code") == 0: - logging.info("[签到]完成 | 成功") - return {"ok": True, "msg": "成功"} - msg = data.get("msg", "未知错误") - logging.info(f"[签到]完成 | {msg}") - return {"ok": False, "msg": msg} - except Exception as e: - logging.warning(f"[签到]异常: {e}") - return {"ok": False, "msg": str(e)} - - def draw_once(self, session): - url = f"{BASE_URL}/front/activity/update_lottery_result" - params = {"id": LOTTERY_ACTIVITY_ID} - try: - res = session.get(url, params=params, timeout=15) - res.raise_for_status() - data = res.json() - if data.get("code") == 0: - prize = (data.get("data") or {}).get("goodName", "未知奖励") - logging.info(f"[抽奖]完成 | 获得: {prize}") - return {"ok": True, "prize": prize, "msg": "成功"} - msg = data.get("msg", "未知错误") - logging.info(f"[抽奖]完成 | {msg}") - return {"ok": False, "prize": "", "msg": msg} - except Exception as e: - logging.warning(f"[抽奖]异常: {e}") - return {"ok": False, "prize": "", "msg": str(e)} - - def add_lottery_count(self, session): - url = f"{BASE_URL}/front/activity/add_lottery_count" - try: - res = session.get(url, timeout=15) - res.raise_for_status() - data = res.json() - if data.get("code") == 0: - logging.info("[增加抽奖次数]完成 | 成功") - return {"ok": True, "msg": "成功"} - msg = data.get("msg", "未知错误") - logging.info(f"[增加抽奖次数]完成 | {msg}") - return {"ok": False, "msg": msg} - except Exception as e: - logging.warning(f"[增加抽奖次数]异常: {e}") - return {"ok": False, "msg": str(e)} - - def run_lottery_flow(self, session): - prizes = [] - - # 先直接抽,直到服务器提示不能继续 - while True: - result = self.draw_once(session) - if not result["ok"]: - break - prizes.append(result.get("prize", "未知奖励")) - self.draw_success_count += 1 - time.sleep(random.randint(*LOTTERY_DELAY_RANGE)) - - # 尝试增加次数,每次增加成功后抽一次 - while True: - add_result = self.add_lottery_count(session) - if not add_result["ok"]: - break - time.sleep(random.randint(*REQUEST_DELAY_RANGE)) - draw_result = self.draw_once(session) - if draw_result["ok"]: - prizes.append(draw_result.get("prize", "未知奖励")) - self.draw_success_count += 1 - time.sleep(random.randint(*LOTTERY_DELAY_RANGE)) - - return prizes - - def get_withdrawal_trade_list(self, session): - url = f"{BASE_URL}/api/h/get_withdrawal_trade_list" - try: - res = session.get(url, timeout=15) - res.raise_for_status() - data = res.json() - if data.get("code") == 0 and isinstance(data.get("data"), list) and data["data"]: - balance = float(data["data"][0].get("money", 0) or 0) - logging.info(f"[余额查询]完成 | 余额: {balance}元") - return {"ok": True, "balance": balance, "list": data["data"], "msg": "成功"} - msg = data.get("msg", "未知错误") - logging.info(f"[余额查询]完成 | {msg}") - return {"ok": False, "balance": 0.0, "list": [], "msg": msg} - except Exception as e: - logging.warning(f"[余额查询]异常: {e}") - return {"ok": False, "balance": 0.0, "list": [], "msg": str(e)} - - def withdraw(self, session, balance, withdrawal_list): - url = f"{BASE_URL}/api/h/withdrawal" - payload = { - "totalMoney": balance, - "type": 1, - "withdrawalDetailPojoList": withdrawal_list, - } - try: - res = session.post(url, json=payload, timeout=15) - res.raise_for_status() - data = res.json() - if data.get("code") == 0: - msg = data.get("msg", "提现成功") - logging.info(f"[提现]完成 | {msg}") - self.withdraw_success_count += 1 - return {"ok": True, "msg": msg} - msg = data.get("msg", "提现失败") - logging.info(f"[提现]完成 | {msg}") - return {"ok": False, "msg": msg} - except Exception as e: - logging.warning(f"[提现]异常: {e}") - return {"ok": False, "msg": str(e)} - - # ---------------------- 单账号执行 ---------------------- - def should_skip_account(self, wxid): - if TEST_WXID and wxid != TEST_WXID: - return "非测试账号" - if wxid in self.exclude_wxids: - return "排除列表" - return "" - - def run_one_account(self, index, wxid, wxname): - report = { - "index": index, - "wxid": wxid, - "wxname": wxname, - "login": False, - "sign": "未执行", - "draw_count": 0, - "prizes": [], - "balance": 0.0, - "withdraw": "未执行", - "status": "失败", - } - - logging.info("") - logging.info(f"========== 账号{index} | {wxname} | {wxid} 开始 ==========") - - skip_reason = self.should_skip_account(wxid) - if skip_reason: - report["status"] = f"跳过:{skip_reason}" - logging.info(f"[账号跳过]原因: {skip_reason}") - self.account_reports.append(report) - return - - session = self.build_session() - - if not self.prepare_token(session, wxid, wxname): - report["status"] = "登录失败" - logging.info(f"========== 账号{index} | 完成 | 登录失败 ==========") - self.account_reports.append(report) - return - - report["login"] = True - self.login_success_accounts += 1 - time.sleep(random.randint(*REQUEST_DELAY_RANGE)) - - sign_result = self.sign_in(session) - report["sign"] = sign_result.get("msg", "未知") - if sign_result.get("ok"): - self.sign_success_accounts += 1 - time.sleep(random.randint(*REQUEST_DELAY_RANGE)) - - prizes = self.run_lottery_flow(session) - report["prizes"] = prizes - report["draw_count"] = len(prizes) - - withdraw_data = self.get_withdrawal_trade_list(session) - if withdraw_data.get("ok"): - balance = withdraw_data.get("balance", 0.0) - report["balance"] = balance - if ENABLE_WITHDRAW: - if balance >= WITHDRAW_THRESHOLD: - withdraw_result = self.withdraw(session, balance, withdraw_data.get("list", [])) - report["withdraw"] = withdraw_result.get("msg", "未知") - else: - report["withdraw"] = f"余额不足{WITHDRAW_THRESHOLD}元,不提现" - logging.info(f"[提现]完成 | 余额{balance}元不足{WITHDRAW_THRESHOLD}元,不提现") + resp = self.client.get(url, headers=headers, params=params) + resp.raise_for_status() + data = resp.json() + if data.get("rows"): + accounts = [] + for row in data["rows"]: + wxid = row.get("wxId") or row.get("wxid") + if wxid: + nick = row.get("nickName", wxid) + accounts.append({"wxid": wxid, "nickName": nick}) + print(f"成功获取 {len(accounts)} 个账号") + return accounts else: - report["withdraw"] = "未开启自动提现" - logging.info("[提现]完成 | 未开启自动提现") + print(f"获取账号列表失败: {data}") + return [] + except Exception as e: + print(f"拉取账号列表异常: {e}") + return [] + + def get_wechat_code(self, wxid: str, appid: str) -> Optional[str]: + url = f"{self.server_url}/prod-api/wechat/api/getMiniProgramCode" + headers = {"Authorization": self.token, "Content-Type": "application/json"} + payload = {"wxid": wxid, "appid": appid} + try: + resp = self.client.post(url, json=payload, headers=headers) + resp.raise_for_status() + result = resp.json() + if result.get("code") == 200: + code = result.get("data", {}).get("code") + if code: + return code + else: + print(f"返回的 data 中没有 code: {result}") + else: + print(f"获取 {wxid} code 失败: {result.get('msg')}") + return None + except Exception as e: + print(f"获取code异常: {e}") + return None + + def close(self): + self.client.close() + + +# ---------- 铛铛一下业务类 ---------- +DEFAULT_BASE_URL = "https://vues.dd1x.cn" +COMMON_HEADERS = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF WindowsWechat(0x63090a13) UnifiedPCWindowsWechat(0xf2541022) XWEB/16467", + "Content-Type": "application/json", + "Accept": "*/*", + "Referer": "https://servicewechat.com/wxe378d2d7636c180e/801/page-frame.html", + "Accept-Language": "zh-CN,zh;q=0.9", +} + +@dataclass +class AccountConfig: + token: str + base_url: str + raw: str + +def parse_account_line(line: str) -> Optional[AccountConfig]: + parts = [part.strip() for part in line.split("#") if part.strip()] + if not parts: + return None + token = parts[0] + base_url = DEFAULT_BASE_URL + for part in parts[1:]: + if part.lower().startswith("base_url="): + base_url = part.split("=", 1)[1].strip() + parsed = urlparse(base_url) + if not parsed.scheme or not parsed.netloc: + return None + return AccountConfig(token=token, base_url=f"{parsed.scheme}://{parsed.netloc}", raw=line) + +def decode_openid_from_jwt(token: str) -> str: + try: + parts = token.split(".") + if len(parts) < 2: + return "" + payload = parts[1].replace("-", "+").replace("_", "/") + payload += "=" * (-len(payload) % 4) + data = json.loads(base64.b64decode(payload).decode("utf-8")) + return data.get("openid") or data.get("openId") or "" + except Exception: + return "" + +def assert_ok(resp: dict) -> None: + if resp.get("code") == 0: + return + raise RuntimeError(str(resp.get("msg") or resp.get("message") or "请求失败")) + +def call_api(acc: AccountConfig, method: str, path: str, body: dict | list | None = None) -> dict: + url = urljoin(acc.base_url, path) + headers = {**COMMON_HEADERS, "token": acc.token} + if method.upper() == "GET": + response = requests.get(url, headers=headers, timeout=30) + else: + response = requests.post(url, headers=headers, json=body or {}, timeout=30) + text = response.text + try: + return response.json() + except Exception as exc: + return {"code": -1, "msg": f"JSON解析失败: {exc}; body={text[:500]}{'...' if len(text) > 500 else ''}"} + +def api_get(acc: AccountConfig, path: str) -> dict: + return call_api(acc, "GET", path) + +def api_post(acc: AccountConfig, path: str, body: dict | list | None = None) -> dict: + return call_api(acc, "POST", path, body) + +def send_tracking(open_id: str, path: str, action: str, page_query_obj: dict | None = None, random_args: dict | None = None) -> None: + payload = { + "type": "1", + "platform": "weapp", + "appLaunch": { + "path": "pages/index/index", + "query": {}, + "scene": 1256, + "referrerInfo": {}, + "apiCategory": "default", + }, + "pageQueryObj": page_query_obj or {}, + "appHeader": { + "platformVersion": "4.1.0.34", + "resolution": "978*519", + "pixelRatio": 1.25, + "os": "windows", + "fontSizeSetting": 15, + "deviceModel": "microsoft", + "deviceBrand": "microsoft", + "deviceManufacturer": "microsoft", + "deviceManuid": "microsoft", + "deviceName": "microsoft", + "osVersion": "Windows 10 x64", + "language": "zh_CN", + "access": "wifi", + }, + "path": path, + "uuid": str(uuid.uuid4()), + "randomArgs": random_args or {}, + "appid": "wxe378d2d7636c180e", + "channelId": "154", + "openId": open_id, + "action": action, + } + try: + requests.post("https://data.dd1x.cn/api/test_api", headers=COMMON_HEADERS, json=payload, timeout=15) + except Exception: + pass + +def xcx_point(acc: AccountConfig, process_id: str, note: str, page_name: str) -> None: + if not process_id: + return + try: + api_get(acc, f"/front/xcxPoint?processId={process_id}&processNote={quote(note)}&channel=154&pageName={quote(page_name)}") + except Exception: + pass + +def build_answer_payload(data) -> list[dict]: + if not isinstance(data, list): + return [] + payload = [] + for item in data: + try: + questions_id = int(item.get("id")) + answer_id = int(item.get("correctAnswerId")) + except Exception: + continue + payload.append({"answerId": answer_id, "questionsId": questions_id}) + return payload + +def get_token_by_code(code: str) -> Optional[str]: + """ + 根据抓包数据实现的登录接口 + 请求 GET https://vues.dd1x.cn/wechat/login?code={code}&channelId=154 + 返回示例: {"code":0,"msg":"success","data":{"token":"xxx"}} + """ + url = f"{DEFAULT_BASE_URL}/wechat/login" + params = {"code": code, "channelId": "154"} + try: + resp = requests.get(url, params=params, timeout=10) + if resp.status_code != 200: + print(f"登录请求失败: HTTP {resp.status_code}") + return None + data = resp.json() + if data.get("code") == 0: + token = data.get("data", {}).get("token") + if token: + print(f"登录成功,获取到 token") + return token + else: + print(f"登录响应中未找到 token 字段: {data}") else: - report["withdraw"] = f"余额查询失败: {withdraw_data.get('msg', '')}" + print(f"登录失败: {data.get('msg')}") + return None + except Exception as e: + print(f"登录请求异常: {e}") + return None - report["status"] = "完成" - self.success_accounts += 1 - self.account_reports.append(report) - logging.info( - f"========== 账号{index} | 完成 | 签到:{report['sign']} | " - f"抽奖:{report['draw_count']}次 | 余额:{report['balance']}元 | 提现:{report['withdraw']} ==========" - ) +def run_for_account(acc: AccountConfig) -> Dict[str, Any]: + # 初始化结果字典,保证所有字段存在 + result = {"nickname": "未知", "success": False, "message": "", "money_before": 0, "money_after": 0} + open_id = decode_openid_from_jwt(acc.token) + print("正在初始化会话...") + access_res = api_get(acc, "/front/accessXcx?channelId=154&processId=") + process_id = str(access_res.get("data") or "") + if process_id: + print(f"会话初始化成功: {process_id}") + api_get(acc, f"/front/accessXcx?channelId=154&processId={process_id}") + else: + print("警告: 未获取到 processId,部分任务可能失效") - # ---------------------- 主入口 ---------------------- - def run(self): - logging.info(f"【{PROJECT_NAME}】开始执行") - logging.info(f"[配置] APPID: {APPID}") - logging.info(f"[配置] 养鸡场: {wx_cloud}") - logging.info(f"[配置] Token缓存: {TOKEN_CACHE_FILE}") - logging.info(f"[配置] 日志文件: {LOG_FILE}") - logging.info(f"[配置] 自动提现: {'开启' if ENABLE_WITHDRAW else '关闭'} | 阈值: {WITHDRAW_THRESHOLD}元") + send_tracking(open_id, "pages/index/index", "page_show") + send_tracking(open_id, "pages/index/index", "page_click", random_args={"event_name": "进入小程序"}) + xcx_point(acc, process_id, "进入小程序", "首页") - if not WX_TOKEN: - logging.error("[启动失败]请先设置环境变量 wx_token") - return + user_info = api_get(acc, "/ali/getUserInfo") + assert_ok(user_info) + nick_name = str(user_info.get("data", {}).get("nickName") or "未知") + result["nickname"] = nick_name + print(f"账号【{nick_name}】Token有效") - index = 0 - for account in self.iter_cloud_accounts(): - index += 1 - self.total_accounts += 1 - self.run_one_account(index, account["wxid"], account["wxname"]) - time.sleep(random.randint(*ACCOUNT_DELAY_RANGE)) + send_tracking(open_id, "pages/index/index", "page_show") + member_info = api_get(acc, "/api/v2/get_member_info") + assert_ok(member_info) + money_before = float(member_info.get("data", {}).get("money") or 0) + result["money_before"] = money_before + print(f"当前余额{money_before}元") - save_json_file(TOKEN_CACHE_FILE, self.token_cache) - self.print_summary() + sign = api_get(acc, "/api/v2/sign_join") + if sign.get("code") == 0: + print(f"签到成功,获得【{sign.get('data', {}).get('name', '未知奖励')}】") + send_tracking(open_id, "pages/index/index", "page_click", random_args={"event_name": "首页-立即签到"}) + xcx_point(acc, process_id, "首页-立即签到", "首页") + else: + msg = str(sign.get("msg") or sign.get("message") or "签到失败") + if "签" in msg and ("过" in msg or "已经" in msg): + print("今天已经签到过,继续执行抽奖/答题任务") + else: + raise RuntimeError(msg) - def print_summary(self): - logging.info("") - logging.info("==================== 执行汇总 ====================") - logging.info(f"项目: {PROJECT_NAME}") - logging.info(f"总账号数: {self.total_accounts}") - logging.info(f"登录成功账号数: {self.login_success_accounts}") - logging.info(f"任务完成账号数: {self.success_accounts}") - logging.info(f"签到成功账号数: {self.sign_success_accounts}") - logging.info(f"成功抽奖次数: {self.draw_success_count}") - logging.info(f"提现成功次数: {self.withdraw_success_count}") - logging.info(f"Token缓存文件: {TOKEN_CACHE_FILE}") - logging.info(f"日志文件: {LOG_FILE}") - logging.info("-------------------- 账号明细 --------------------") - for r in self.account_reports: - prize_text = "、".join(r.get("prizes") or []) or "无" - logging.info( - f"账号{r['index']} | {r['wxname']} | 状态:{r['status']} | " - f"签到:{r['sign']} | 抽奖:{r['draw_count']}次 | 奖品:{prize_text} | " - f"余额:{r['balance']}元 | 提现:{r['withdraw']}" + send_tracking(open_id, "pages/activity/turntable/turntable", "page_show") + lottery_info = api_get(acc, "/front/activity/get_lottery_info?id=13&channelId=154") + assert_ok(lottery_info) + times = max(int(lottery_info.get("data", {}).get("member_count") or 0), 0) + print(f"今日有{times}次抽奖机会") + + for _ in range(times): + send_tracking(open_id, "pages/activity/turntable/turntable", "page_click", random_args={"event_name": "抽奖页-立即抽奖"}) + xcx_point(acc, process_id, "抽奖页-立即抽奖", "抽奖页") + result_data = api_get(acc, "/front/activity/get_lottery_result?id=13") + assert_ok(result_data) + record_id = result_data.get("data", {}).get("record_id") + print(f"获得奖励{result_data.get('data', {}).get('prizeName', '未知奖励')}") + if record_id is not None: + assert_ok(api_get(acc, f"/front/activity/update_lottery_result?id={quote(str(record_id))}")) + + print("开始获取今日题目...") + send_tracking(open_id, "pages/find_page/index", "page_show") + send_tracking(open_id, "pages/index/index", "page_click", random_args={"event_name": "底部导航-发现"}) + send_tracking(open_id, "pages/find_page/index", "page_click", random_args={"event_name": "回收问答-立即参与"}) + send_tracking(open_id, "pages/find_page/answerQues/index", "page_show") + + questions = api_get(acc, "/api/questions/get_questions") + assert_ok(questions) + answer_payload = build_answer_payload(questions.get("data")) + if answer_payload: + send_tracking(open_id, "pages/find_page/answerSelectQues/index", "page_show") + judge = api_post(acc, "/api/questions/judge", answer_payload) + assert_ok(judge) + if judge.get("data") == 2: + print("今日已经答过题了") + else: + print("答题提交完成") + send_tracking(open_id, "pages/find_page/answerSelectQues/index", "page_click", random_args={"event_name": "提现说明-立即提现"}) + xcx_point(acc, process_id, "提现说明-立即提现", "回答问题页") + + send_tracking(open_id, "pages/mine/mine", "page_show") + send_tracking(open_id, "pages/index/index", "page_click", random_args={"event_name": "底部导航-我的"}) + + member_info = api_get(acc, "/api/v2/get_member_info") + assert_ok(member_info) + current_money = float(member_info.get("data", {}).get("money") or 0) + result["money_after"] = current_money + print(f"任务完毕,当前余额{current_money}元") + + if current_money >= 0.3: + print("余额满足提现要求,准备提现...") + send_tracking(open_id, "pages/mine/mine", "page_click", random_args={"event_name": "设置-我的钱包"}) + xcx_point(acc, process_id, "中心首页-我的钱包", "我的") + send_tracking(open_id, "pages/mine/withdrawal/index", "page_show", page_query_obj={"channelId": "154"}) + xcx_point(acc, process_id, "进入钱包", "提现") + send_tracking(open_id, "pages/mine/withdrawal/index", "page_click", random_args={"event_name": "钱包-提现"}) + xcx_point(acc, process_id, "钱包-提现", "提现") + + withdrawal_list = api_get(acc, "/api/h/get_withdrawal_trade_list") + if isinstance(withdrawal_list, list): + trade_list = withdrawal_list + else: + trade_list = withdrawal_list.get("data") if isinstance(withdrawal_list.get("data"), list) else [] + + available = [item for item in trade_list if not item.get("disabled") and float(item.get("money") or 0) >= 0.3] + if available: + total_money = f"{sum(float(item.get('money') or 0) for item in available):.2f}" + print(f"检测到可提现订单 {len(available)} 个,合计 {total_money} 元") + withdraw_res = api_post( + acc, + "/api/h/withdrawal", + {"totalMoney": total_money, "type": 1, "withdrawalDetailPojoList": available}, ) - logging.info("==================================================") + if withdraw_res.get("code") == 1: + print(f"提现成功: {withdraw_res.get('msg') or '确定'}") + send_tracking(open_id, "pages/mine/mine", "page_click", random_args={"event_name": "全选-提现成功"}) + xcx_point(acc, process_id, "全选-提现成功", "提现") + else: + print(f"提现失败: {withdraw_res.get('msg') or '未知错误'}") + else: + print("没有满足提现金额(>=0.3元)的订单") + result["success"] = True + result["message"] = "签到抽奖答题完成" + return result + + +def push_to_pushplus(token: str, title: str, content: str) -> bool: + if not token: + return False + url = "http://www.pushplus.plus/send" + data = {"token": token, "title": title, "content": content, "template": "txt"} + try: + resp = httpx.post(url, json=data, timeout=10) + if resp.status_code == 200: + result = resp.json() + if result.get("code") == 200: + print("推送成功") + return True + else: + print(f"推送失败: {result.get('msg')}") + else: + print(f"推送失败: HTTP {resp.status_code}") + except Exception as e: + print(f"推送异常: {e}") + return False + + +def load_accounts_from_env() -> List[AccountConfig]: + lines = [line.strip() for line in os.getenv("dd1x", "").splitlines() if line.strip()] + accounts = [] + for line in lines: + acc = parse_account_line(line) + if acc: + accounts.append(acc) + else: + print(f"手动配置格式错误,跳过: {line}") + return accounts + + +def load_accounts_from_jyc(jyc: JycClient, single_wxid: str) -> List[AccountConfig]: + if single_wxid: + accounts_info = [{"wxid": single_wxid, "nickName": single_wxid}] + print(f"单号测试模式: {single_wxid}") + else: + accounts_info = jyc.get_all_accounts() + if not accounts_info: + print("未获取到任何在线账号") + return [] + accounts = [] + for info in accounts_info: + wxid = info["wxid"] + nick = info.get("nickName", wxid) + print(f"正在为 {nick} ({wxid}) 获取登录 code...") + code = jyc.get_wechat_code(wxid, "wxe378d2d7636c180e") + if not code: + print(f"获取 {nick} 的 code 失败,跳过") + continue + token = get_token_by_code(code) + if not token: + print(f"登录 {nick} 失败,无法获取 token,跳过") + continue + accounts.append(AccountConfig(token=token, base_url=DEFAULT_BASE_URL, raw=f"{token}#{DEFAULT_BASE_URL}")) + return accounts + + +@capture_output("铛铛一下签到抽奖答题运行结果") +def main(): + jyc_server = os.getenv("wx_cloud") + jyc_token = os.getenv("wx_token") + single_test = os.getenv("SINGLE_TEST_WXID") + pushplus_token = os.getenv("PUSHPLUS_TOKEN") + + # 优先尝试从养鸡场获取账号 + if jyc_server and jyc_token: + jyc = JycClient(jyc_server, jyc_token) + try: + accounts = load_accounts_from_jyc(jyc, single_test) + finally: + jyc.close() + if accounts: + print(f"从养鸡场获取到 {len(accounts)} 个账号") + else: + print("养鸡场未获取到有效账号,尝试使用环境变量 dd1x 手动配置") + accounts = load_accounts_from_env() + else: + print("未配置养鸡场,使用环境变量 dd1x 手动配置") + accounts = load_accounts_from_env() + + if not accounts: + print("未找到任何可用账号,退出") + return + + print(f"共处理 {len(accounts)} 个账号") + results = [] + for idx, acc in enumerate(accounts, 1): + print(f"\n{'='*50}") + print(f"处理第 {idx}/{len(accounts)} 个账号") + print(f"{'='*50}") + try: + res = run_for_account(acc) + results.append(res) + except Exception as e: + print(f"处理失败: {e}") + results.append({ + "nickname": "未知", + "success": False, + "message": str(e), + "money_before": 0, + "money_after": 0 + }) + if idx < len(accounts): + print("等待3秒后处理下一个账号...") + time.sleep(3) + + # 汇总推送(使用 .get 防止 KeyError) + summary_lines = [] + success_cnt = 0 + for r in results: + status = "✅" if r.get("success") else "❌" + nickname = r.get("nickname", "未知") + msg = r.get("message", "") + line = f"{status} {nickname}: {msg}" + if r.get("money_before", 0) != 0 or r.get("money_after", 0) != 0: + line += f",余额 {r['money_before']} → {r['money_after']} 元" + summary_lines.append(line) + if r.get("success"): + success_cnt += 1 + summary = "\n".join(summary_lines) + title = f"铛铛一下签到结果 - 成功 {success_cnt}/{len(results)}" + print("\n" + summary) + if pushplus_token: + push_to_pushplus(pushplus_token, title, summary) + else: + print("未设置 PUSHPLUS_TOKEN,跳过推送") if __name__ == "__main__": - setup_logging() - try: - DDYXTask().run() - except KeyboardInterrupt: - logging.warning("用户中断执行") - except Exception as e: - logging.error(f"主程序异常: {e}") - if DEBUG: - logging.debug(traceback.format_exc()) - - \ No newline at end of file + main() \ No newline at end of file