# cron: 2 7 * * *
# new Env("旧衣回收_铛铛一下_通用版")
"""
All-in-one task script for the "铛铛一下" (DDYX) clothes-recycling mini program.

Core flow:
    wx_cloud + wx_token + wxid + appid
    -> code server ("养鸡场") getMiniProgramCode
    -> https://vues.dd1x.cn/wechat/login?code=xxx&channelId=154
    -> data.token
    -> every later request sends it in the ``Token`` header

External environment variables are kept to a minimum:
1. wx_cloud            code-server address, default http://192.168.31.203:666
2. wx_token            code-server Authorization value; "Bearer " is prepended
                       automatically when missing
3. DDYX_TEST_WXID      optional; run only this wxid (handy for testing)
4. DDYX_EXCLUDE_WXIDS  optional; wxids to skip, separated by newline / comma /
                       & / @

Cache: APP_Buffer/ddyx_token_cache.json next to this script
Logs:  logs/ddyx_YYYYMMDD_HHMMSS.log next to this script
"""

import json
import logging
import os
import random
import re
import sys
import time
import traceback
from datetime import datetime
from pathlib import Path

import requests

# ====================== Fixed project configuration ======================
PROJECT_NAME = "旧衣回收_铛铛一下"
APPID = "wxe378d2d7636c180e"
HOST = "vues.dd1x.cn"
BASE_URL = f"https://{HOST}"
CHANNEL_ID = 154
LOTTERY_ACTIVITY_ID = 3438615

# Balances at or above this amount are withdrawn (only if ENABLE_WITHDRAW).
WITHDRAW_THRESHOLD = 0.3
ENABLE_WITHDRAW = True  # auto-withdraw is on by default; set False to disable

# Token cache lifetime: 7 days.
TOKEN_CACHE_TTL = 7 * 24 * 3600

# Random delay ranges in seconds (inclusive) to avoid hammering the servers.
ACCOUNT_DELAY_RANGE = (2, 5)
REQUEST_DELAY_RANGE = (1, 3)
LOTTERY_DELAY_RANGE = (3, 5)

# Safety cap for each loop of the lottery flow, so a server that keeps
# answering "success" can never make the script spin forever.
MAX_LOTTERY_ROUNDS = 20

# Code-server paging: one account per page, so an account is fetched only
# right before it is processed.
CLOUD_PAGE_SIZE = 1

# Debug switch; in normal mode no request/response details and no full
# token/code values are logged.
DEBUG = False
AUTO_BEARER = True

# Proxy switch.
MULTI_ACCOUNT_PROXY = False
PROXY_API_URL = os.getenv("PROXY_API_URL", "").strip()

DEFAULT_USER_AGENT = (
    "Mozilla/5.0 (Linux; Android 12; M2012K11AC Build/SKQ1.220303.001; wv) "
    "AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 "
    "Chrome/134.0.6998.136 Mobile Safari/537.36 XWEB/1340129 "
    "MMWEBSDK/20240301 MMWEBID/9871 MicroMessenger/8.0.48.2580(0x28003036) "
    "WeChat/arm64 Weixin NetType/WIFI Language/zh_CN ABI/arm64 MiniProgramEnv/android"
)

# Substrings that mark a *failed* response as an authentication failure.
AUTH_FAIL_KEYWORDS = [
    "未登录", "请登录", "登录失效", "登录过期", "token", "Token", "TOKEN",
    "无效", "过期", "未授权", "unauthorized", "Unauthorized", "401", "403"
]

# ====================== Paths ======================
BASE_DIR = Path(__file__).resolve().parent
BUFFER_DIR = BASE_DIR / "APP_Buffer"
LOG_DIR = BASE_DIR / "logs"
BUFFER_DIR.mkdir(parents=True, exist_ok=True)
LOG_DIR.mkdir(parents=True, exist_ok=True)

TOKEN_CACHE_FILE = BUFFER_DIR / "ddyx_token_cache.json"
LOG_FILE = LOG_DIR / f"ddyx_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"

# ====================== Environment variables ======================
wx_cloud = os.getenv("wx_cloud", "http://192.168.31.203:666").strip().rstrip("/")
_raw_wx_token = os.getenv("wx_token", "").strip()
if AUTO_BEARER and _raw_wx_token and not _raw_wx_token.lower().startswith("bearer "):
    WX_TOKEN = f"Bearer {_raw_wx_token}"
else:
    WX_TOKEN = _raw_wx_token

TEST_WXID = os.getenv("DDYX_TEST_WXID", "").strip()
EXCLUDE_WXIDS_RAW = os.getenv("DDYX_EXCLUDE_WXIDS", "").strip()

# Exclusions built into the script; kept here in one place.
SCRIPT_EXCLUDE_WXIDS = ["wxid_11111111111111"]


def setup_logging():
    """Configure root logging to write to both stdout and LOG_FILE."""
    logging.basicConfig(
        level=logging.DEBUG if DEBUG else logging.INFO,
        format="%(asctime)s - %(levelname)s\t- %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
        handlers=[
            logging.StreamHandler(sys.stdout),
            logging.FileHandler(LOG_FILE, encoding="utf-8"),
        ],
    )


def split_multi_values(raw):
    """Split *raw* on newline, comma, full-width comma, ``&`` or ``@``.

    Returns a list of the non-empty, whitespace-stripped parts; an empty or
    None input yields an empty list.
    """
    if not raw:
        return []
    # \uff0c is the full-width (Chinese) comma promised by the contract.
    parts = re.split(r"[\n,\uff0c&@]+", raw)
    return [x.strip() for x in parts if x.strip()]


def mask_secret(value, keep_start=6, keep_end=4):
    """Return *value* with its middle replaced by ``***`` for safe logging.

    Values no longer than ``keep_start + keep_end`` are fully masked; None
    becomes an empty string.
    """
    if value is None:
        return ""
    value = str(value)
    if len(value) <= keep_start + keep_end:
        return "***"
    return f"{value[:keep_start]}***{value[-keep_end:]}"


def mask_phone(tel):
    """Return *tel* with the middle digits hidden (first 3 + last 4 kept)."""
    if not tel:
        return ""
    tel = str(tel)
    if len(tel) >= 7:
        return tel[:3] + "****" + tel[-4:]
    return "***"


def load_json_file(path, default):
    """Load JSON from *path*; return *default* if missing or unreadable."""
    if not path.exists():
        return default
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as e:
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        logging.warning(f"[缓存]读取失败,使用空缓存: {path} | {e}")
        return default


def save_json_file(path, data):
    """Write *data* as pretty-printed JSON to *path* (best effort).

    The content is written to a sibling temp file and then moved into place,
    so an interrupted run cannot leave a truncated cache file behind.
    Failures are logged, never raised.
    """
    tmp_path = path.with_name(path.name + ".tmp")
    try:
        tmp_path.write_text(
            json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8"
        )
        os.replace(tmp_path, path)
    except (OSError, TypeError, ValueError) as e:
        logging.warning(f"[缓存]保存失败: {path} | {e}")


class DDYXTask:
    """Runs sign-in, lottery and withdrawal for every code-server account."""

    def __init__(self):
        cache = load_json_file(TOKEN_CACHE_FILE, {})
        # A cache file holding anything but a JSON object is unusable.
        self.token_cache = cache if isinstance(cache, dict) else {}
        self.exclude_wxids = set(
            SCRIPT_EXCLUDE_WXIDS + split_multi_values(EXCLUDE_WXIDS_RAW)
        )
        self.total_accounts = 0
        self.success_accounts = 0
        self.login_success_accounts = 0
        self.sign_success_accounts = 0
        self.draw_success_count = 0
        self.withdraw_success_count = 0
        self.account_reports = []

    # ---------------------- Code server ----------------------
    def yjc_headers(self):
        """Return the headers used for every code-server request."""
        return {
            "Authorization": WX_TOKEN,
            "Content-Type": "application/json",
        }

    def iter_cloud_accounts(self):
        """Yield ``{"wxid", "wxname"}`` dicts, fetching one page at a time.

        Paging is lazy: the next page is requested only after the caller has
        finished with the accounts of the current one. Iteration stops on any
        request error or unexpected response (logged, not raised).
        """
        page_num = 1
        while True:
            url = f"{wx_cloud}/prod-api/wechat/wechat/list"
            params = {"pageNum": page_num, "pageSize": CLOUD_PAGE_SIZE}
            try:
                res = requests.get(
                    url, params=params, headers=self.yjc_headers(), timeout=15
                )
                res.raise_for_status()
                data = res.json()
            except Exception as e:
                logging.error(f"[养鸡场]获取账号列表失败 page={page_num}: {e}")
                if DEBUG:
                    logging.debug(traceback.format_exc())
                return

            if (
                not isinstance(data, dict)
                or data.get("code") != 200
                or not isinstance(data.get("rows"), list)
            ):
                logging.error(
                    f"[养鸡场]账号列表返回异常: {json.dumps(data, ensure_ascii=False)[:500]}"
                )
                return

            rows = data["rows"]
            if not rows:
                return

            for row in rows:
                if not isinstance(row, dict):
                    row = {}  # handled below as "missing wxId"
                wxid = row.get("wxId") or row.get("wxid") or ""
                wxname = (
                    row.get("wxName")
                    or row.get("remark")
                    or row.get("nickName")
                    or "未知昵称"
                )
                if not wxid:
                    logging.warning("[养鸡场]账号缺少 wxId,跳过")
                    continue
                yield {"wxid": wxid, "wxname": wxname}

            # A missing or non-numeric total just disables this stop condition;
            # the short-page check below still terminates the loop.
            try:
                total = int(data["total"]) if data.get("total") is not None else None
            except (TypeError, ValueError):
                total = None
            if total is not None and page_num * CLOUD_PAGE_SIZE >= total:
                return
            if len(rows) < CLOUD_PAGE_SIZE:
                return
            page_num += 1

    def get_wx_code(self, wxid):
        """Ask the code server for a mini-program login code for *wxid*.

        Returns the code string, or None on any failure (logged).
        """
        url = f"{wx_cloud}/prod-api/wechat/api/getMiniProgramCode"
        payload = {"wxid": wxid, "appid": APPID}
        try:
            res = requests.post(
                url, json=payload, headers=self.yjc_headers(), timeout=15
            )
            res.raise_for_status()
            data = res.json()
            # "data" may be present but null on failure responses.
            inner = data.get("data") or {}
            code = inner.get("code") if isinstance(inner, dict) else None
            if data.get("code") == 200 and code:
                logging.info("[获取code]完成")
                if DEBUG:
                    logging.debug(f"[获取code][debug] {mask_secret(code)}")
                return code
            logging.warning(
                f"[获取code]失败: {json.dumps(data, ensure_ascii=False)[:500]}"
            )
            return None
        except Exception as e:
            logging.error(f"[获取code]异常: {e}")
            if DEBUG:
                logging.debug(traceback.format_exc())
            return None

    # ---------------------- HTTP session ----------------------
    def build_session(self):
        """Create a requests session with the default UA and optional proxy."""
        session = requests.Session()
        session.headers.update({"User-Agent": DEFAULT_USER_AGENT})
        if MULTI_ACCOUNT_PROXY and PROXY_API_URL:
            proxy = self.get_proxy()
            if proxy:
                session.proxies.update(
                    {"http": f"http://{proxy}", "https": f"http://{proxy}"}
                )
                logging.info("[代理]已启用")
        return session

    def get_proxy(self):
        """Fetch one proxy address from PROXY_API_URL.

        Returns the first whitespace-delimited token of the response body, or
        None when the request fails or the body is empty.
        """
        try:
            res = requests.get(PROXY_API_URL, timeout=10)
            # An error page must never be mistaken for a proxy address.
            res.raise_for_status()
            tokens = res.text.split()
            if not tokens:
                logging.warning("[代理]获取失败,不使用代理: 返回内容为空")
                return None
            proxy = tokens[0]
            if DEBUG:
                logging.debug(f"[代理]获取到: {proxy}")
            return proxy
        except Exception as e:
            logging.warning(f"[代理]获取失败,不使用代理: {e}")
            return None

    def is_auth_failed_response(self, data):
        """Return True if *data* looks like an authentication failure.

        A successful business response (``code == 0``) is never treated as an
        auth failure; otherwise the serialized payload is searched for any of
        AUTH_FAIL_KEYWORDS.
        """
        # Without this guard, payload fields such as an id containing "401"
        # or a key named "token" would invalidate a perfectly good token.
        if isinstance(data, dict) and data.get("code") == 0:
            return False
        try:
            text = json.dumps(data, ensure_ascii=False)
        except (TypeError, ValueError):
            text = str(data)
        return any(k in text for k in AUTH_FAIL_KEYWORDS)

    def validate_token(self, session):
        """Check the session's cached Token via the withdrawal-list query.

        This request only reads data. Returns False on HTTP 401/403, on an
        auth-failure response, or on any request/parse error.
        """
        url = f"{BASE_URL}/api/h/get_withdrawal_trade_list"
        try:
            res = session.get(url, timeout=15)
            if res.status_code in (401, 403):
                return False
            data = res.json()
            if self.is_auth_failed_response(data):
                return False
            return True
        except Exception as e:
            if DEBUG:
                logging.debug(f"[Token校验]异常: {e}")
            return False

    def login_by_code(self, session, code):
        """Exchange a mini-program *code* for a Token and set it on *session*.

        Returns a dict with ``token``, masked ``tel``, ``rawTel`` and
        ``updatedAt`` on success, or None on failure (logged).
        """
        url = f"{BASE_URL}/wechat/login"
        params = {"code": code, "channelId": CHANNEL_ID}
        try:
            res = session.get(url, params=params, timeout=15)
            res.raise_for_status()
            data = res.json()

            if DEBUG:
                # Deep copy through JSON so masking never touches `data`.
                safe_data = json.loads(json.dumps(data, ensure_ascii=False))
                if (
                    isinstance(safe_data, dict)
                    and isinstance(safe_data.get("data"), dict)
                    and safe_data["data"].get("token")
                ):
                    safe_data["data"]["token"] = mask_secret(
                        safe_data["data"]["token"]
                    )
                logging.debug(
                    f"[登录响应] {json.dumps(safe_data, ensure_ascii=False)[:1000]}"
                )

            if (
                data.get("code") == 0
                and isinstance(data.get("data"), dict)
                and data["data"].get("token")
            ):
                token = data["data"]["token"]
                tel = data["data"].get("tel", "")
                session.headers["Token"] = token
                logging.info(f"[登录]完成 | 手机: {mask_phone(tel)}")
                return {
                    "token": token,
                    "tel": mask_phone(tel),
                    "rawTel": tel,
                    "updatedAt": int(time.time()),
                }

            logging.warning(f"[登录]失败: {data.get('msg', '未知错误')}")
            return None
        except Exception as e:
            logging.error(f"[登录]异常: {e}")
            if DEBUG:
                logging.debug(traceback.format_exc())
            return None

    def prepare_token(self, session, wxid, wxname):
        """Make sure *session* carries a usable Token for *wxid*.

        A cached token younger than TOKEN_CACHE_TTL is validated first; if it
        is missing, expired or rejected, a fresh login is performed and the
        cache file is updated. Returns True when a token is ready.
        """
        now = int(time.time())
        cached = self.token_cache.get(wxid, {})
        if not isinstance(cached, dict):
            cached = {}  # malformed cache entry: treat as a miss
        cached_token = cached.get("token")
        try:
            updated_at = int(cached.get("updatedAt", 0) or 0)
        except (TypeError, ValueError):
            updated_at = 0  # unreadable timestamp counts as expired

        if cached_token and now - updated_at < TOKEN_CACHE_TTL:
            session.headers["Token"] = cached_token
            logging.info(f"[Token缓存]命中 | {cached.get('tel') or wxname},开始校验")
            if self.validate_token(session):
                logging.info("[Token缓存]校验通过")
                return True
            logging.info("[Token缓存]校验失败,重新登录")
            # Do not send the rejected token along with the login request.
            session.headers.pop("Token", None)
        elif cached_token:
            logging.info("[Token缓存]已过期,重新登录")
        else:
            logging.info("[Token缓存]未命中,开始登录")

        code = self.get_wx_code(wxid)
        if not code:
            return False

        login_info = self.login_by_code(session, code)
        if not login_info:
            return False

        self.token_cache[wxid] = {"wxName": wxname, **login_info}
        save_json_file(TOKEN_CACHE_FILE, self.token_cache)
        logging.info("[Token缓存]已更新")
        return True

    # ---------------------- Business endpoints ----------------------
    def sign_in(self, session):
        """Perform the daily sign-in; return ``{"ok": bool, "msg": str}``."""
        url = f"{BASE_URL}/api/v2/sign_join"
        try:
            res = session.get(url, timeout=15)
            res.raise_for_status()
            data = res.json()
            if data.get("code") == 0:
                logging.info("[签到]完成 | 成功")
                return {"ok": True, "msg": "成功"}
            msg = data.get("msg", "未知错误")
            logging.info(f"[签到]完成 | {msg}")
            return {"ok": False, "msg": msg}
        except Exception as e:
            logging.warning(f"[签到]异常: {e}")
            return {"ok": False, "msg": str(e)}

    def draw_once(self, session):
        """Draw the lottery once; return ``{"ok", "prize", "msg"}``."""
        url = f"{BASE_URL}/front/activity/update_lottery_result"
        params = {"id": LOTTERY_ACTIVITY_ID}
        try:
            res = session.get(url, params=params, timeout=15)
            res.raise_for_status()
            data = res.json()
            if data.get("code") == 0:
                prize = (data.get("data") or {}).get("goodName", "未知奖励")
                logging.info(f"[抽奖]完成 | 获得: {prize}")
                return {"ok": True, "prize": prize, "msg": "成功"}
            msg = data.get("msg", "未知错误")
            logging.info(f"[抽奖]完成 | {msg}")
            return {"ok": False, "prize": "", "msg": msg}
        except Exception as e:
            logging.warning(f"[抽奖]异常: {e}")
            return {"ok": False, "prize": "", "msg": str(e)}

    def add_lottery_count(self, session):
        """Request one extra lottery chance; return ``{"ok", "msg"}``."""
        url = f"{BASE_URL}/front/activity/add_lottery_count"
        try:
            res = session.get(url, timeout=15)
            res.raise_for_status()
            data = res.json()
            if data.get("code") == 0:
                logging.info("[增加抽奖次数]完成 | 成功")
                return {"ok": True, "msg": "成功"}
            msg = data.get("msg", "未知错误")
            logging.info(f"[增加抽奖次数]完成 | {msg}")
            return {"ok": False, "msg": msg}
        except Exception as e:
            logging.warning(f"[增加抽奖次数]异常: {e}")
            return {"ok": False, "msg": str(e)}

    def run_lottery_flow(self, session):
        """Use up all lottery chances and return the list of prize names.

        Phase 1 draws until the server refuses. Phase 2 repeatedly requests an
        extra chance and draws once after each success. Each phase is capped
        at MAX_LOTTERY_ROUNDS iterations.
        """
        prizes = []

        # Phase 1: draw directly until the server says no more.
        for _ in range(MAX_LOTTERY_ROUNDS):
            result = self.draw_once(session)
            if not result["ok"]:
                break
            prizes.append(result.get("prize", "未知奖励"))
            self.draw_success_count += 1
            time.sleep(random.randint(*LOTTERY_DELAY_RANGE))
        else:
            logging.warning("[抽奖]达到单账号最大抽奖轮数上限,停止直接抽奖")

        # Phase 2: try to add a chance; draw once after each successful add.
        for _ in range(MAX_LOTTERY_ROUNDS):
            add_result = self.add_lottery_count(session)
            if not add_result["ok"]:
                break
            time.sleep(random.randint(*REQUEST_DELAY_RANGE))
            draw_result = self.draw_once(session)
            if draw_result["ok"]:
                prizes.append(draw_result.get("prize", "未知奖励"))
                self.draw_success_count += 1
            time.sleep(random.randint(*LOTTERY_DELAY_RANGE))
        else:
            logging.warning("[抽奖]达到单账号最大增加次数轮数上限,停止继续抽奖")

        return prizes

    def get_withdrawal_trade_list(self, session):
        """Query the withdrawal list and balance.

        Returns ``{"ok", "balance", "list", "msg"}``; the balance is read from
        the ``money`` field of the first list entry.
        """
        url = f"{BASE_URL}/api/h/get_withdrawal_trade_list"
        try:
            res = session.get(url, timeout=15)
            res.raise_for_status()
            data = res.json()
            if (
                data.get("code") == 0
                and isinstance(data.get("data"), list)
                and data["data"]
            ):
                balance = float(data["data"][0].get("money", 0) or 0)
                logging.info(f"[余额查询]完成 | 余额: {balance}元")
                return {
                    "ok": True,
                    "balance": balance,
                    "list": data["data"],
                    "msg": "成功",
                }
            msg = data.get("msg", "未知错误")
            logging.info(f"[余额查询]完成 | {msg}")
            return {"ok": False, "balance": 0.0, "list": [], "msg": msg}
        except Exception as e:
            logging.warning(f"[余额查询]异常: {e}")
            return {"ok": False, "balance": 0.0, "list": [], "msg": str(e)}

    def withdraw(self, session, balance, withdrawal_list):
        """Submit a withdrawal of *balance*; return ``{"ok", "msg"}``."""
        url = f"{BASE_URL}/api/h/withdrawal"
        payload = {
            "totalMoney": balance,
            "type": 1,
            "withdrawalDetailPojoList": withdrawal_list,
        }
        try:
            res = session.post(url, json=payload, timeout=15)
            res.raise_for_status()
            data = res.json()
            if data.get("code") == 0:
                msg = data.get("msg", "提现成功")
                logging.info(f"[提现]完成 | {msg}")
                self.withdraw_success_count += 1
                return {"ok": True, "msg": msg}
            msg = data.get("msg", "提现失败")
            logging.info(f"[提现]完成 | {msg}")
            return {"ok": False, "msg": msg}
        except Exception as e:
            logging.warning(f"[提现]异常: {e}")
            return {"ok": False, "msg": str(e)}

    # ---------------------- Single account ----------------------
    def should_skip_account(self, wxid):
        """Return the skip reason for *wxid*, or "" if it should run."""
        if TEST_WXID and wxid != TEST_WXID:
            return "非测试账号"
        if wxid in self.exclude_wxids:
            return "排除列表"
        return ""

    def run_one_account(self, index, wxid, wxname):
        """Run the whole flow for one account and record its report.

        Returns False when the account was skipped without any request being
        made, True otherwise (including a failed login), so the caller can
        decide whether an inter-account delay is needed.
        """
        report = {
            "index": index,
            "wxid": wxid,
            "wxname": wxname,
            "login": False,
            "sign": "未执行",
            "draw_count": 0,
            "prizes": [],
            "balance": 0.0,
            "withdraw": "未执行",
            "status": "失败",
        }

        logging.info("")
        logging.info(f"========== 账号{index} | {wxname} | {wxid} 开始 ==========")

        skip_reason = self.should_skip_account(wxid)
        if skip_reason:
            report["status"] = f"跳过:{skip_reason}"
            logging.info(f"[账号跳过]原因: {skip_reason}")
            self.account_reports.append(report)
            return False

        session = self.build_session()
        if not self.prepare_token(session, wxid, wxname):
            report["status"] = "登录失败"
            logging.info(f"========== 账号{index} | 完成 | 登录失败 ==========")
            self.account_reports.append(report)
            return True

        report["login"] = True
        self.login_success_accounts += 1

        time.sleep(random.randint(*REQUEST_DELAY_RANGE))
        sign_result = self.sign_in(session)
        report["sign"] = sign_result.get("msg", "未知")
        if sign_result.get("ok"):
            self.sign_success_accounts += 1

        time.sleep(random.randint(*REQUEST_DELAY_RANGE))
        prizes = self.run_lottery_flow(session)
        report["prizes"] = prizes
        report["draw_count"] = len(prizes)

        withdraw_data = self.get_withdrawal_trade_list(session)
        if withdraw_data.get("ok"):
            balance = withdraw_data.get("balance", 0.0)
            report["balance"] = balance
            if ENABLE_WITHDRAW:
                if balance >= WITHDRAW_THRESHOLD:
                    withdraw_result = self.withdraw(
                        session, balance, withdraw_data.get("list", [])
                    )
                    report["withdraw"] = withdraw_result.get("msg", "未知")
                else:
                    report["withdraw"] = f"余额不足{WITHDRAW_THRESHOLD}元,不提现"
                    logging.info(
                        f"[提现]完成 | 余额{balance}元不足{WITHDRAW_THRESHOLD}元,不提现"
                    )
            else:
                report["withdraw"] = "未开启自动提现"
                logging.info("[提现]完成 | 未开启自动提现")
        else:
            report["withdraw"] = f"余额查询失败: {withdraw_data.get('msg', '')}"

        report["status"] = "完成"
        self.success_accounts += 1
        self.account_reports.append(report)
        logging.info(
            f"========== 账号{index} | 完成 | 签到:{report['sign']} | "
            f"抽奖:{report['draw_count']}次 | 余额:{report['balance']}元 | 提现:{report['withdraw']} =========="
        )
        return True

    # ---------------------- Entry point ----------------------
    def run(self):
        """Process every account from the code server, then print a summary."""
        logging.info(f"【{PROJECT_NAME}】开始执行")
        logging.info(f"[配置] APPID: {APPID}")
        logging.info(f"[配置] 养鸡场: {wx_cloud}")
        logging.info(f"[配置] Token缓存: {TOKEN_CACHE_FILE}")
        logging.info(f"[配置] 日志文件: {LOG_FILE}")
        logging.info(
            f"[配置] 自动提现: {'开启' if ENABLE_WITHDRAW else '关闭'} | 阈值: {WITHDRAW_THRESHOLD}元"
        )

        if not WX_TOKEN:
            logging.error("[启动失败]请先设置环境变量 wx_token")
            return

        for index, account in enumerate(self.iter_cloud_accounts(), start=1):
            self.total_accounts += 1
            executed = self.run_one_account(
                index, account["wxid"], account["wxname"]
            )
            # Skipped accounts made no business requests, so no delay needed.
            if executed:
                time.sleep(random.randint(*ACCOUNT_DELAY_RANGE))

        save_json_file(TOKEN_CACHE_FILE, self.token_cache)
        self.print_summary()

    def print_summary(self):
        """Log the overall counters followed by one line per account."""
        logging.info("")
        logging.info("==================== 执行汇总 ====================")
        logging.info(f"项目: {PROJECT_NAME}")
        logging.info(f"总账号数: {self.total_accounts}")
        logging.info(f"登录成功账号数: {self.login_success_accounts}")
        logging.info(f"任务完成账号数: {self.success_accounts}")
        logging.info(f"签到成功账号数: {self.sign_success_accounts}")
        logging.info(f"成功抽奖次数: {self.draw_success_count}")
        logging.info(f"提现成功次数: {self.withdraw_success_count}")
        logging.info(f"Token缓存文件: {TOKEN_CACHE_FILE}")
        logging.info(f"日志文件: {LOG_FILE}")
        logging.info("-------------------- 账号明细 --------------------")
        for r in self.account_reports:
            prize_text = "、".join(r.get("prizes") or []) or "无"
            logging.info(
                f"账号{r['index']} | {r['wxname']} | 状态:{r['status']} | "
                f"签到:{r['sign']} | 抽奖:{r['draw_count']}次 | 奖品:{prize_text} | "
                f"余额:{r['balance']}元 | 提现:{r['withdraw']}"
            )
        logging.info("==================================================")


if __name__ == "__main__":
    setup_logging()
    try:
        DDYXTask().run()
    except KeyboardInterrupt:
        logging.warning("用户中断执行")
    except Exception as e:
        logging.error(f"主程序异常: {e}")
        if DEBUG:
            logging.debug(traceback.format_exc())