Files
Yangmao_Script/WX_Applet/Applet_JYHS_DDYX.py

604 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# cron: 2 7 * * *
# new Env("旧衣回收_铛铛一下_通用版")
"""
旧衣回收_铛铛一下 通用一体化版本
核心链路:
wx_cloud + wx_token + wxid + appid
-> 养鸡场 getMiniProgramCode
-> https://vues.dd1x.cn/wechat/login?code=xxx&channelId=154
-> data.token
-> 后续接口使用 Header: Token
外部环境变量尽量保持最少:
1. wx_cloud 养鸡场地址,默认 http://192.168.31.203:666
2. wx_token 养鸡场 Authorization,默认自动补 Bearer
3. DDYX_TEST_WXID 可选,只跑指定 wxid,便于测试
4. DDYX_EXCLUDE_WXIDS 可选,排除 wxid,支持换行/逗号/&/@ 分隔
缓存:
同目录 APP_Buffer/ddyx_token_cache.json
日志:
同目录 logs/ddyx_YYYYMMDD_HHMMSS.log
"""
import json
import logging
import os
import random
import re
import sys
import time
import traceback
from datetime import datetime
from pathlib import Path
import requests
# ====================== Fixed project settings ======================
PROJECT_NAME = "旧衣回收_铛铛一下"
APPID = "wxe378d2d7636c180e"
HOST = "vues.dd1x.cn"
BASE_URL = "https://" + HOST
CHANNEL_ID = 154
LOTTERY_ACTIVITY_ID = 3438615

# Balances above this amount can be withdrawn; whether the withdrawal is
# performed automatically is controlled by ENABLE_WITHDRAW.
WITHDRAW_THRESHOLD = 0.3
# The original script withdrew automatically by default; set to False to disable.
ENABLE_WITHDRAW = True

# Lifetime of a cached token in seconds (7 days by default).
TOKEN_CACHE_TTL = 7 * 24 * 60 * 60

# Random delay ranges in seconds, used to avoid sending requests too quickly.
ACCOUNT_DELAY_RANGE = (2, 5)
REQUEST_DELAY_RANGE = (1, 3)
LOTTERY_DELAY_RANGE = (3, 5)

# Account-service pagination: one account per page, so that an account is
# fetched, processed, and only then the next one is fetched.
CLOUD_PAGE_SIZE = 1

# Debug switch; in normal mode request/response details and full token/code
# values are not printed.
DEBUG = False
AUTO_BEARER = True

# Proxy switches.
MULTI_ACCOUNT_PROXY = False
PROXY_API_URL = os.getenv("PROXY_API_URL", "").strip()

DEFAULT_USER_AGENT = " ".join(
    [
        "Mozilla/5.0 (Linux; Android 12; M2012K11AC Build/SKQ1.220303.001; wv)",
        "AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0",
        "Chrome/134.0.6998.136 Mobile Safari/537.36 XWEB/1340129",
        "MMWEBSDK/20240301 MMWEBID/9871 MicroMessenger/8.0.48.2580(0x28003036)",
        "WeChat/arm64 Weixin NetType/WIFI Language/zh_CN ABI/arm64 MiniProgramEnv/android",
    ]
)

# Substrings that mark a response as an authentication failure.
AUTH_FAIL_KEYWORDS = [
    "未登录",
    "请登录",
    "登录失效",
    "登录过期",
    "token",
    "Token",
    "TOKEN",
    "无效",
    "过期",
    "未授权",
    "unauthorized",
    "Unauthorized",
    "401",
    "403",
]

# ====================== Paths ======================
BASE_DIR = Path(__file__).resolve().parent
BUFFER_DIR = BASE_DIR / "APP_Buffer"
LOG_DIR = BASE_DIR / "logs"
# Both runtime directories are created at import time if they are missing.
for _runtime_dir in (BUFFER_DIR, LOG_DIR):
    _runtime_dir.mkdir(parents=True, exist_ok=True)
TOKEN_CACHE_FILE = BUFFER_DIR / "ddyx_token_cache.json"
# One log file per run, named after the start time.
_started_at = datetime.now().strftime("%Y%m%d_%H%M%S")
LOG_FILE = LOG_DIR / f"ddyx_{_started_at}.log"

# ====================== Environment variables ======================
wx_cloud = os.getenv("wx_cloud", "http://192.168.31.203:666").strip().rstrip("/")
_raw_wx_token = os.getenv("wx_token", "").strip()
# Prefix "Bearer " only when enabled, a token is set, and it is not already prefixed.
_needs_bearer_prefix = (
    AUTO_BEARER
    and bool(_raw_wx_token)
    and not _raw_wx_token.lower().startswith("bearer ")
)
WX_TOKEN = f"Bearer {_raw_wx_token}" if _needs_bearer_prefix else _raw_wx_token
TEST_WXID = os.getenv("DDYX_TEST_WXID", "").strip()
EXCLUDE_WXIDS_RAW = os.getenv("DDYX_EXCLUDE_WXIDS", "").strip()
# Exclusions built into the original script; kept, but collected in one place.
SCRIPT_EXCLUDE_WXIDS = ["wxid_11111111111111"]
def setup_logging():
    """Configure root logging to write to stdout and to the per-run log file.

    The level is DEBUG when the module-level ``DEBUG`` switch is on,
    otherwise INFO.
    """
    log_level = logging.INFO
    if DEBUG:
        log_level = logging.DEBUG

    console_handler = logging.StreamHandler(sys.stdout)
    file_handler = logging.FileHandler(LOG_FILE, encoding="utf-8")

    logging.basicConfig(
        level=log_level,
        format="%(asctime)s - %(levelname)s\t- %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
        handlers=[console_handler, file_handler],
    )
def split_multi_values(raw):
    """Split a multi-value string into a list of non-empty, stripped items.

    Recognised separators are newline, ASCII comma, the full-width Chinese
    comma (U+FF0C), ``&`` and ``@``; runs of separators count as one.

    Args:
        raw: The raw string (for example an environment variable value).
            ``None`` or an empty string yields an empty list.

    Returns:
        A list of the non-empty items with surrounding whitespace removed.
    """
    if not raw:
        return []
    # The full-width comma is written as an escape so it cannot be lost or
    # confused with the ASCII comma when the file is viewed or copied.
    parts = re.split(r"[\n,\uFF0C&@]+", raw)
    return [item.strip() for item in parts if item.strip()]
def mask_secret(value, keep_start=6, keep_end=4):
    """Return *value* with its middle replaced by ``***`` for safe logging.

    Args:
        value: The secret to mask; converted with ``str``. ``None`` gives "".
        keep_start: Number of leading characters to keep visible.
        keep_end: Number of trailing characters to keep visible.

    Returns:
        ``"<head>***<tail>"``, or just ``"***"`` when the value is not longer
        than ``keep_start + keep_end`` (so nothing would actually be hidden).
    """
    if value is None:
        return ""
    text = str(value)
    # Negative counts would turn the slices below into "all but N characters"
    # and expose most of the secret, so treat them as zero.
    keep_start = max(keep_start, 0)
    keep_end = max(keep_end, 0)
    if len(text) <= keep_start + keep_end:
        return "***"
    # text[-0:] is the whole string, so a zero-length tail needs its own case.
    tail = text[-keep_end:] if keep_end else ""
    return f"{text[:keep_start]}***{tail}"
def mask_phone(tel):
    """Return a phone number with its middle hidden, for logs and reports.

    Args:
        tel: The phone number (string or number). Falsy values give "".

    Returns:
        The first three and last four characters joined by ``****`` when the
        number is longer than seven characters, otherwise ``"***"``.
    """
    if not tel:
        return ""
    number = str(tel)
    # With exactly 7 characters, "first 3 + last 4" would print every
    # character, so at least one must remain hidden.
    if len(number) > 7:
        return f"{number[:3]}****{number[-4:]}"
    return "***"
def load_json_file(path, default):
    """Read and parse a UTF-8 JSON file, falling back to *default*.

    Args:
        path: ``pathlib.Path`` of the file to read.
        default: Value returned when the file does not exist or cannot be
            read or parsed.

    Returns:
        The decoded JSON value, or *default*. A read/parse failure is logged
        as a warning; a missing file is not.
    """
    if path.exists():
        try:
            raw_text = path.read_text(encoding="utf-8")
            return json.loads(raw_text)
        except Exception as e:
            logging.warning(f"[缓存]读取失败,使用空缓存: {path} | {e}")
    return default
def save_json_file(path, data):
    """Serialise *data* as pretty-printed UTF-8 JSON and store it at *path*.

    The JSON is written to a sibling ``<name>.tmp`` file and then moved over
    the target, so a failure part-way through leaves the previous file
    untouched instead of truncated.

    Args:
        path: ``pathlib.Path`` of the destination file.
        data: Any JSON-serialisable value.

    Returns:
        None. Failures are logged as a warning and never raised (best effort).
    """
    tmp_path = path.with_name(f"{path.name}.tmp")
    try:
        payload = json.dumps(data, ensure_ascii=False, indent=2)
        tmp_path.write_text(payload, encoding="utf-8")
        # Path.replace overwrites an existing destination.
        tmp_path.replace(path)
    except Exception as e:
        logging.warning(f"[缓存]保存失败: {path} | {e}")
        # Best-effort cleanup: the temp file may not exist at all, and a
        # failure to delete it must not mask the warning above.
        try:
            tmp_path.unlink()
        except OSError:
            pass
class DDYXTask:
    """Run login, sign-in, lottery and withdrawal for every cloud account.

    Accounts are streamed from the account service configured by
    ``wx_cloud``. For each account a mini-program code is exchanged for a
    site token (cached on disk), then the business endpoints are called with
    that token in the ``Token`` header. Counters and per-account reports are
    accumulated on the instance and printed by ``print_summary``.
    """

    # Safety cap for each lottery loop: the loops otherwise stop only when
    # the server says so, and a misbehaving server must not trap the script.
    MAX_LOTTERY_ROUNDS = 50

    def __init__(self):
        """Load the token cache and the exclusion list; reset all counters."""
        cache = load_json_file(TOKEN_CACHE_FILE, {})
        if not isinstance(cache, dict):
            # A valid-JSON but wrongly shaped file (e.g. a list) would break
            # every later ``.get`` on the cache.
            logging.warning("[缓存]格式异常,已重置为空缓存")
            cache = {}
        self.token_cache = cache
        self.exclude_wxids = set(SCRIPT_EXCLUDE_WXIDS + split_multi_values(EXCLUDE_WXIDS_RAW))
        self.total_accounts = 0
        self.success_accounts = 0
        self.login_success_accounts = 0
        self.sign_success_accounts = 0
        self.draw_success_count = 0
        self.withdraw_success_count = 0
        self.account_reports = []

    # ---------------------- Account service ----------------------
    def yjc_headers(self):
        """Return the headers sent to the account service."""
        return {
            "Authorization": WX_TOKEN,
            "Content-Type": "application/json",
        }

    def iter_cloud_accounts(self):
        """Yield accounts page by page: fetch one, let it run, fetch the next.

        Yields:
            Dicts with keys ``wxid`` and ``wxname``.

        Iteration ends silently on the last page and with an error log when a
        request fails or the response has an unexpected shape.
        """
        page_num = 1
        while True:
            url = f"{wx_cloud}/prod-api/wechat/wechat/list"
            params = {"pageNum": page_num, "pageSize": CLOUD_PAGE_SIZE}
            try:
                res = requests.get(url, params=params, headers=self.yjc_headers(), timeout=15)
                res.raise_for_status()
                data = res.json()
            except Exception as e:
                logging.error(f"[养鸡场]获取账号列表失败 page={page_num}: {e}")
                if DEBUG:
                    logging.debug(traceback.format_exc())
                return

            if (
                not isinstance(data, dict)
                or data.get("code") != 200
                or not isinstance(data.get("rows"), list)
            ):
                logging.error(f"[养鸡场]账号列表返回异常: {json.dumps(data, ensure_ascii=False)[:500]}")
                return

            rows = data.get("rows", [])
            if not rows:
                return

            for row in rows:
                if not isinstance(row, dict):
                    row = {}
                wxid = row.get("wxId") or row.get("wxid") or ""
                wxname = row.get("wxName") or row.get("remark") or row.get("nickName") or "未知昵称"
                if not wxid:
                    logging.warning("[养鸡场]账号缺少 wxId跳过")
                    continue
                yield {"wxid": wxid, "wxname": wxname}

            # A non-numeric "total" must not abort the run; fall back to the
            # short-page check below.
            try:
                total = int(data.get("total"))
            except (TypeError, ValueError):
                total = None
            if total is not None and page_num * CLOUD_PAGE_SIZE >= total:
                return
            if len(rows) < CLOUD_PAGE_SIZE:
                return
            page_num += 1

    def get_wx_code(self, wxid):
        """Ask the account service for a mini-program login code.

        Args:
            wxid: The account id to request the code for.

        Returns:
            The code string, or ``None`` on any failure (which is logged).
        """
        url = f"{wx_cloud}/prod-api/wechat/api/getMiniProgramCode"
        payload = {"wxid": wxid, "appid": APPID}
        try:
            res = requests.post(url, json=payload, headers=self.yjc_headers(), timeout=15)
            res.raise_for_status()
            data = res.json()
            # "data" may be JSON null on failure; treat that like a missing
            # key so the failure branch below logs the real response.
            code = (data.get("data") or {}).get("code")
            if data.get("code") == 200 and code:
                logging.info("[获取code]完成")
                if DEBUG:
                    logging.debug(f"[获取code][debug] {mask_secret(code)}")
                return code
            logging.warning(f"[获取code]失败: {json.dumps(data, ensure_ascii=False)[:500]}")
            return None
        except Exception as e:
            logging.error(f"[获取code]异常: {e}")
            if DEBUG:
                logging.debug(traceback.format_exc())
            return None

    # ---------------------- HTTP session ----------------------
    def build_session(self):
        """Create a ``requests.Session`` with the default User-Agent.

        A proxy is attached when ``MULTI_ACCOUNT_PROXY`` is on, a proxy API
        URL is configured and the API returned an address.
        """
        session = requests.Session()
        session.headers.update({"User-Agent": DEFAULT_USER_AGENT})
        if MULTI_ACCOUNT_PROXY and PROXY_API_URL:
            proxy = self.get_proxy()
            if proxy:
                session.proxies.update({"http": f"http://{proxy}", "https": f"http://{proxy}"})
                logging.info("[代理]已启用")
        return session

    def get_proxy(self):
        """Fetch a proxy address from ``PROXY_API_URL``.

        Returns:
            The response body stripped of whitespace, or ``None`` when the
            request fails, returns an error status, or the body is empty.
        """
        try:
            res = requests.get(PROXY_API_URL, timeout=10)
            # Without this, the body of an error page would be used as the
            # proxy address.
            res.raise_for_status()
            proxy = res.text.strip()
            if DEBUG:
                logging.debug(f"[代理]获取到: {proxy}")
            return proxy or None
        except Exception as e:
            logging.warning(f"[代理]获取失败,不使用代理: {e}")
            return None

    def is_auth_failed_response(self, data):
        """Return True when the response text contains an auth-failure keyword.

        NOTE(review): this is a plain substring search over the whole
        serialised response, so any payload that happens to contain e.g.
        "token" or "401" is treated as an auth failure — confirm this
        strictness is intended.
        """
        try:
            text = json.dumps(data, ensure_ascii=False)
        except Exception:
            text = str(data)
        return any(k in text for k in AUTH_FAIL_KEYWORDS)

    def validate_token(self, session):
        """Check the session's token by calling the withdrawal-list endpoint.

        Returns:
            True when the request succeeds and the response does not look
            like an auth failure; False otherwise, including on any exception.
        """
        url = f"{BASE_URL}/api/h/get_withdrawal_trade_list"
        try:
            res = session.get(url, timeout=15)
            if res.status_code in (401, 403):
                return False
            data = res.json()
            if self.is_auth_failed_response(data):
                return False
            return True
        except Exception as e:
            if DEBUG:
                logging.debug(f"[Token校验]异常: {e}")
            return False

    def login_by_code(self, session, code):
        """Exchange a mini-program code for a site token.

        On success the token is set as the session's ``Token`` header.

        Args:
            session: The account's ``requests.Session``.
            code: The code obtained from ``get_wx_code``.

        Returns:
            A dict with ``token``, ``tel`` (masked), ``rawTel`` and
            ``updatedAt`` (Unix seconds), or ``None`` on failure.
        """
        url = f"{BASE_URL}/wechat/login"
        params = {"code": code, "channelId": CHANNEL_ID}
        try:
            res = session.get(url, params=params, timeout=15)
            res.raise_for_status()
            data = res.json()
            if DEBUG:
                # Log a deep copy with the token masked, never the original.
                safe_data = json.loads(json.dumps(data, ensure_ascii=False))
                if isinstance(safe_data.get("data"), dict) and safe_data["data"].get("token"):
                    safe_data["data"]["token"] = mask_secret(safe_data["data"]["token"])
                logging.debug(f"[登录响应] {json.dumps(safe_data, ensure_ascii=False)[:1000]}")
            if data.get("code") == 0 and isinstance(data.get("data"), dict) and data["data"].get("token"):
                token = data["data"]["token"]
                tel = data["data"].get("tel", "")
                session.headers["Token"] = token
                logging.info(f"[登录]完成 | 手机: {mask_phone(tel)}")
                return {
                    "token": token,
                    "tel": mask_phone(tel),
                    "rawTel": tel,
                    "updatedAt": int(time.time()),
                }
            logging.warning(f"[登录]失败: {data.get('msg', '未知错误')}")
            return None
        except Exception as e:
            logging.error(f"[登录]异常: {e}")
            if DEBUG:
                logging.debug(traceback.format_exc())
            return None

    def prepare_token(self, session, wxid, wxname):
        """Make sure the session carries a usable token.

        A cached token younger than ``TOKEN_CACHE_TTL`` is validated first;
        otherwise, or when validation fails, a fresh login is performed and
        the cache file is updated.

        Returns:
            True when the session is ready, False when login failed.
        """
        now = int(time.time())
        cached = self.token_cache.get(wxid)
        if not isinstance(cached, dict):
            cached = {}
        cached_token = cached.get("token")
        # A hand-edited or damaged cache entry is treated as expired.
        try:
            updated_at = int(cached.get("updatedAt", 0) or 0)
        except (TypeError, ValueError):
            updated_at = 0

        if cached_token and now - updated_at < TOKEN_CACHE_TTL:
            session.headers["Token"] = cached_token
            logging.info(f"[Token缓存]命中 | {cached.get('tel') or wxname},开始校验")
            if self.validate_token(session):
                logging.info("[Token缓存]校验通过")
                return True
            logging.info("[Token缓存]校验失败,重新登录")
            # Do not send the rejected token along with the login request; a
            # cache-miss login carries no Token header either.
            session.headers.pop("Token", None)
        elif cached_token:
            logging.info("[Token缓存]已过期,重新登录")
        else:
            logging.info("[Token缓存]未命中,开始登录")

        code = self.get_wx_code(wxid)
        if not code:
            return False
        login_info = self.login_by_code(session, code)
        if not login_info:
            return False
        self.token_cache[wxid] = {"wxName": wxname, **login_info}
        save_json_file(TOKEN_CACHE_FILE, self.token_cache)
        logging.info("[Token缓存]已更新")
        return True

    # ---------------------- Business endpoints ----------------------
    def sign_in(self, session):
        """Call the daily sign-in endpoint.

        Returns:
            ``{"ok": bool, "msg": str}``; exceptions are reported as
            ``ok=False`` with the exception text.
        """
        url = f"{BASE_URL}/api/v2/sign_join"
        try:
            res = session.get(url, timeout=15)
            res.raise_for_status()
            data = res.json()
            if data.get("code") == 0:
                logging.info("[签到]完成 | 成功")
                return {"ok": True, "msg": "成功"}
            msg = data.get("msg", "未知错误")
            logging.info(f"[签到]完成 | {msg}")
            return {"ok": False, "msg": msg}
        except Exception as e:
            logging.warning(f"[签到]异常: {e}")
            return {"ok": False, "msg": str(e)}

    def draw_once(self, session):
        """Perform one lottery draw.

        Returns:
            ``{"ok": bool, "prize": str, "msg": str}``.
        """
        url = f"{BASE_URL}/front/activity/update_lottery_result"
        params = {"id": LOTTERY_ACTIVITY_ID}
        try:
            res = session.get(url, params=params, timeout=15)
            res.raise_for_status()
            data = res.json()
            if data.get("code") == 0:
                prize = (data.get("data") or {}).get("goodName", "未知奖励")
                logging.info(f"[抽奖]完成 | 获得: {prize}")
                return {"ok": True, "prize": prize, "msg": "成功"}
            msg = data.get("msg", "未知错误")
            logging.info(f"[抽奖]完成 | {msg}")
            return {"ok": False, "prize": "", "msg": msg}
        except Exception as e:
            logging.warning(f"[抽奖]异常: {e}")
            return {"ok": False, "prize": "", "msg": str(e)}

    def add_lottery_count(self, session):
        """Request one additional lottery chance.

        Returns:
            ``{"ok": bool, "msg": str}``.
        """
        url = f"{BASE_URL}/front/activity/add_lottery_count"
        try:
            res = session.get(url, timeout=15)
            res.raise_for_status()
            data = res.json()
            if data.get("code") == 0:
                logging.info("[增加抽奖次数]完成 | 成功")
                return {"ok": True, "msg": "成功"}
            msg = data.get("msg", "未知错误")
            logging.info(f"[增加抽奖次数]完成 | {msg}")
            return {"ok": False, "msg": msg}
        except Exception as e:
            logging.warning(f"[增加抽奖次数]异常: {e}")
            return {"ok": False, "msg": str(e)}

    def run_lottery_flow(self, session):
        """Draw until the server refuses, then keep adding chances and drawing.

        Each of the two phases stops after ``MAX_LOTTERY_ROUNDS`` iterations
        even if the server keeps answering with success.

        Returns:
            The list of prizes won, in order.
        """
        prizes = []

        # Phase 1: draw directly until the server reports no more chances.
        for _ in range(self.MAX_LOTTERY_ROUNDS):
            result = self.draw_once(session)
            if not result["ok"]:
                break
            prizes.append(result.get("prize", "未知奖励"))
            self.draw_success_count += 1
            time.sleep(random.randint(*LOTTERY_DELAY_RANGE))
        else:
            logging.warning(f"[抽奖]已达到单账号上限 {self.MAX_LOTTERY_ROUNDS} 次,停止直接抽奖")

        # Phase 2: try to add a chance; after every successful add, draw once.
        for _ in range(self.MAX_LOTTERY_ROUNDS):
            add_result = self.add_lottery_count(session)
            if not add_result["ok"]:
                break
            time.sleep(random.randint(*REQUEST_DELAY_RANGE))
            draw_result = self.draw_once(session)
            if draw_result["ok"]:
                prizes.append(draw_result.get("prize", "未知奖励"))
                self.draw_success_count += 1
            time.sleep(random.randint(*LOTTERY_DELAY_RANGE))
        else:
            logging.warning(f"[抽奖]已达到单账号上限 {self.MAX_LOTTERY_ROUNDS} 次,停止增加抽奖次数")

        return prizes

    def get_withdrawal_trade_list(self, session):
        """Fetch the withdrawal list; the balance is read from its first entry.

        Returns:
            ``{"ok": bool, "balance": float, "list": list, "msg": str}``.
        """
        url = f"{BASE_URL}/api/h/get_withdrawal_trade_list"
        try:
            res = session.get(url, timeout=15)
            res.raise_for_status()
            data = res.json()
            if data.get("code") == 0 and isinstance(data.get("data"), list) and data["data"]:
                balance = float(data["data"][0].get("money", 0) or 0)
                logging.info(f"[余额查询]完成 | 余额: {balance}")
                return {"ok": True, "balance": balance, "list": data["data"], "msg": "成功"}
            msg = data.get("msg", "未知错误")
            logging.info(f"[余额查询]完成 | {msg}")
            return {"ok": False, "balance": 0.0, "list": [], "msg": msg}
        except Exception as e:
            logging.warning(f"[余额查询]异常: {e}")
            return {"ok": False, "balance": 0.0, "list": [], "msg": str(e)}

    def withdraw(self, session, balance, withdrawal_list):
        """Submit a withdrawal request for *balance*.

        Args:
            session: The account's ``requests.Session``.
            balance: Amount sent as ``totalMoney``.
            withdrawal_list: Entries sent as ``withdrawalDetailPojoList``.

        Returns:
            ``{"ok": bool, "msg": str}``; a success also increments
            ``withdraw_success_count``.
        """
        url = f"{BASE_URL}/api/h/withdrawal"
        payload = {
            "totalMoney": balance,
            "type": 1,
            "withdrawalDetailPojoList": withdrawal_list,
        }
        try:
            res = session.post(url, json=payload, timeout=15)
            res.raise_for_status()
            data = res.json()
            if data.get("code") == 0:
                msg = data.get("msg", "提现成功")
                logging.info(f"[提现]完成 | {msg}")
                self.withdraw_success_count += 1
                return {"ok": True, "msg": msg}
            msg = data.get("msg", "提现失败")
            logging.info(f"[提现]完成 | {msg}")
            return {"ok": False, "msg": msg}
        except Exception as e:
            logging.warning(f"[提现]异常: {e}")
            return {"ok": False, "msg": str(e)}

    # ---------------------- Single-account run ----------------------
    def should_skip_account(self, wxid):
        """Return the reason to skip *wxid*, or "" when it should be processed."""
        if TEST_WXID and wxid != TEST_WXID:
            return "非测试账号"
        if wxid in self.exclude_wxids:
            return "排除列表"
        return ""

    def run_one_account(self, index, wxid, wxname):
        """Run the whole flow for one account and append its report.

        Args:
            index: 1-based position of the account in this run.
            wxid: Account id.
            wxname: Display name used in logs.

        Returns:
            False when the account was skipped without any request being
            sent, True otherwise (including a failed login).
        """
        report = {
            "index": index,
            "wxid": wxid,
            "wxname": wxname,
            "login": False,
            "sign": "未执行",
            "draw_count": 0,
            "prizes": [],
            "balance": 0.0,
            "withdraw": "未执行",
            "status": "失败",
        }
        logging.info("")
        logging.info(f"========== 账号{index} | {wxname} | {wxid} 开始 ==========")

        skip_reason = self.should_skip_account(wxid)
        if skip_reason:
            report["status"] = f"跳过:{skip_reason}"
            logging.info(f"[账号跳过]原因: {skip_reason}")
            self.account_reports.append(report)
            return False

        session = self.build_session()
        if not self.prepare_token(session, wxid, wxname):
            report["status"] = "登录失败"
            logging.info(f"========== 账号{index} | 完成 | 登录失败 ==========")
            self.account_reports.append(report)
            return True

        report["login"] = True
        self.login_success_accounts += 1
        time.sleep(random.randint(*REQUEST_DELAY_RANGE))

        sign_result = self.sign_in(session)
        report["sign"] = sign_result.get("msg", "未知")
        if sign_result.get("ok"):
            self.sign_success_accounts += 1
        time.sleep(random.randint(*REQUEST_DELAY_RANGE))

        prizes = self.run_lottery_flow(session)
        report["prizes"] = prizes
        report["draw_count"] = len(prizes)

        withdraw_data = self.get_withdrawal_trade_list(session)
        if withdraw_data.get("ok"):
            balance = withdraw_data.get("balance", 0.0)
            report["balance"] = balance
            if ENABLE_WITHDRAW:
                if balance >= WITHDRAW_THRESHOLD:
                    withdraw_result = self.withdraw(session, balance, withdraw_data.get("list", []))
                    report["withdraw"] = withdraw_result.get("msg", "未知")
                else:
                    report["withdraw"] = f"余额不足{WITHDRAW_THRESHOLD}元,不提现"
                    logging.info(f"[提现]完成 | 余额{balance}元不足{WITHDRAW_THRESHOLD}元,不提现")
            else:
                report["withdraw"] = "未开启自动提现"
                logging.info("[提现]完成 | 未开启自动提现")
        else:
            report["withdraw"] = f"余额查询失败: {withdraw_data.get('msg', '')}"

        report["status"] = "完成"
        self.success_accounts += 1
        self.account_reports.append(report)
        logging.info(
            f"========== 账号{index} | 完成 | 签到:{report['sign']} | "
            f"抽奖:{report['draw_count']}次 | 余额:{report['balance']}元 | 提现:{report['withdraw']} =========="
        )
        return True

    # ---------------------- Entry point ----------------------
    def run(self):
        """Process every cloud account in turn, then print the summary.

        Returns early with an error log when ``wx_token`` is not configured.
        """
        logging.info(f"【{PROJECT_NAME}】开始执行")
        logging.info(f"[配置] APPID: {APPID}")
        logging.info(f"[配置] 养鸡场: {wx_cloud}")
        logging.info(f"[配置] Token缓存: {TOKEN_CACHE_FILE}")
        logging.info(f"[配置] 日志文件: {LOG_FILE}")
        logging.info(f"[配置] 自动提现: {'开启' if ENABLE_WITHDRAW else '关闭'} | 阈值: {WITHDRAW_THRESHOLD}")
        if not WX_TOKEN:
            logging.error("[启动失败]请先设置环境变量 wx_token")
            return

        for index, account in enumerate(self.iter_cloud_accounts(), start=1):
            self.total_accounts += 1
            attempted = self.run_one_account(index, account["wxid"], account["wxname"])
            # The delay throttles requests to the target site; a skipped
            # account sent none, so waiting would only slow the run down.
            if attempted:
                time.sleep(random.randint(*ACCOUNT_DELAY_RANGE))

        save_json_file(TOKEN_CACHE_FILE, self.token_cache)
        self.print_summary()

    @staticmethod
    def _format_prizes(prizes):
        """Join prize names with "、" for the summary; "无" when there are none."""
        return "、".join(str(prize) for prize in prizes or []) or "无"

    def print_summary(self):
        """Log the run totals followed by one line per processed account."""
        logging.info("")
        logging.info("==================== 执行汇总 ====================")
        logging.info(f"项目: {PROJECT_NAME}")
        logging.info(f"总账号数: {self.total_accounts}")
        logging.info(f"登录成功账号数: {self.login_success_accounts}")
        logging.info(f"任务完成账号数: {self.success_accounts}")
        logging.info(f"签到成功账号数: {self.sign_success_accounts}")
        logging.info(f"成功抽奖次数: {self.draw_success_count}")
        logging.info(f"提现成功次数: {self.withdraw_success_count}")
        logging.info(f"Token缓存文件: {TOKEN_CACHE_FILE}")
        logging.info(f"日志文件: {LOG_FILE}")
        logging.info("-------------------- 账号明细 --------------------")
        for r in self.account_reports:
            prize_text = self._format_prizes(r.get("prizes"))
            logging.info(
                f"账号{r['index']} | {r['wxname']} | 状态:{r['status']} | "
                f"签到:{r['sign']} | 抽奖:{r['draw_count']}次 | 奖品:{prize_text} | "
                f"余额:{r['balance']}元 | 提现:{r['withdraw']}"
            )
        logging.info("==================================================")
def main():
    """Script entry point: configure logging, then run the task.

    A keyboard interrupt is logged as a warning. Any other exception is
    logged as an error, with the traceback added only in DEBUG mode.
    """
    setup_logging()
    try:
        task = DDYXTask()
        task.run()
    except KeyboardInterrupt:
        logging.warning("用户中断执行")
    except Exception as e:
        logging.error(f"主程序异常: {e}")
        if DEBUG:
            logging.debug(traceback.format_exc())


if __name__ == "__main__":
    main()