更新 WX_Applet/Applet_JYHS_DDYX.py

This commit is contained in:
2026-05-24 02:27:18 +08:00
parent 5834aaca48
commit 16b15b71f7

View File

@@ -1,469 +1,604 @@
# cron: 2 7 * * *
# new Env("旧衣回收_铛铛一下")
import hashlib
import random
import time
import requests
import os
import logging
import traceback
import base64
# new Env("旧衣回收_铛铛一下_通用版")
"""
旧衣回收_铛铛一下 通用一体化版本
核心链路:
wx_cloud + wx_token + wxid + appid
-> 养鸡场 getMiniProgramCode
-> https://vues.dd1x.cn/wechat/login?code=xxx&channelId=154
-> data.token
-> 后续接口使用 Header: Token
外部环境变量尽量保持最少:
1. wx_cloud 养鸡场地址,默认 http://192.168.31.203:666
2. wx_token 养鸡场 Authorization默认自动补 Bearer
3. DDYX_TEST_WXID 可选,只跑指定 wxid便于测试
4. DDYX_EXCLUDE_WXIDS 可选,排除 wxid支持换行/逗号/&/@ 分隔
缓存:
同目录 APP_Buffer/ddyx_token_cache.json
日志:
同目录 logs/ddyx_YYYYMMDD_HHMMSS.log
"""
import json
from Crypto.Cipher import AES
from Crypto.Random import get_random_bytes
import logging
import os
import random
import re
import sys
import time
import traceback
from datetime import datetime
from pathlib import Path
# ====================== 养鸡场配置 ======================
WX_TOKEN = os.getenv('wx_token') # 养鸡场 Authorization 令牌
wx_cloud = os.getenv('wx_cloud', 'http://192.168.31.203:666') # 养鸡场服务地址
# ========================================================
import requests
remove_wxids1 = ["wxid_11111111111111"] # 需要剔除的多个wxid
DEFAULT_WITHDRAW_BALANCE = 0.3 # 默认超过该金额进行提现需大于等于0.3
MULTI_ACCOUNT_SPLIT = ["\n", "@"] # 分隔符列表
MULTI_ACCOUNT_PROXY = False # 是否使用多账号代理默认不使用True则使用多账号代理
# ====================== 项目固定配置 ======================
PROJECT_NAME = "旧衣回收_铛铛一下"
APPID = "wxe378d2d7636c180e"
HOST = "vues.dd1x.cn"
BASE_URL = f"https://{HOST}"
CHANNEL_ID = 154
LOTTERY_ACTIVITY_ID = 3438615
# 微信小程序ID根据实际业务调整
WX_APPID = "wxe378d2d7636c180e"
# 默认超过该金额可提现;是否自动提现见 ENABLE_WITHDRAW
WITHDRAW_THRESHOLD = 0.3
ENABLE_WITHDRAW = True # 原脚本默认自动提现;如需关闭改成 False
class WXBizDataCryptUtil:
"""
微信小程序加解密工具
"""
def __init__(self, sessionKey):
self.sessionKey = sessionKey
# Token 缓存有效期,默认 7 天
TOKEN_CACHE_TTL = 7 * 24 * 3600
def encrypt(self, data, iv=None):
"""
data: dict或str若为dict自动转为json字符串
iv: base64字符串若为None自动生成
返回: (加密数据base64, iv base64)
"""
if isinstance(data, dict):
data = json.dumps(data, separators=(',', ':'))
if iv is None:
iv_bytes = get_random_bytes(16)
iv = base64.b64encode(iv_bytes).decode('utf-8')
else:
iv_bytes = base64.b64decode(iv)
sessionKey = base64.b64decode(self.sessionKey)
cipher = AES.new(sessionKey, AES.MODE_CBC, iv_bytes)
padded = self._pad(data.encode('utf-8'))
encrypted = cipher.encrypt(padded)
encrypted_b64 = base64.b64encode(encrypted).decode('utf-8')
return encrypted_b64, iv
# 多账号间隔,防止请求过快
ACCOUNT_DELAY_RANGE = (2, 5)
REQUEST_DELAY_RANGE = (1, 3)
LOTTERY_DELAY_RANGE = (3, 5)
def decrypt(self, encryptedData, iv):
"""
encryptedData: base64字符串
iv: base64字符串
返回: dict或str
"""
sessionKey = base64.b64decode(self.sessionKey)
encryptedData = base64.b64decode(encryptedData)
iv = base64.b64decode(iv)
cipher = AES.new(sessionKey, AES.MODE_CBC, iv)
decrypted = self._unpad(cipher.decrypt(encryptedData))
try:
return json.loads(decrypted)
except Exception:
return decrypted.decode('utf-8')
# 养鸡场分页:每页 1 个账号,真正做到取一个跑一个
CLOUD_PAGE_SIZE = 1
def _pad(self, s):
pad_len = 16 - len(s) % 16
return s + bytes([pad_len] * pad_len)
# 调试开关;普通模式不打印请求/响应详情、不打印完整 token/code
DEBUG = False
AUTO_BEARER = True
def _unpad(self, s):
return s[:-s[-1]]
# 代理开关
MULTI_ACCOUNT_PROXY = False
PROXY_API_URL = os.getenv("PROXY_API_URL", "").strip()
class AutoTask:
def __init__(self, site_name):
"""
初始化自动任务类
:param site_name: 站点名称,用于日志显示
"""
self.site_name = site_name
self.proxy_url = os.getenv("PROXY_API_URL") # 代理api返回一条txt文本内容为代理ip:端口
self.wx_appid = WX_APPID # 微信小程序id
self.host = "vues.dd1x.cn"
self.user_agent = "Mozilla/5.0 (Linux; Android 12; M2012K11AC Build/SKQ1.220303.001; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/134.0.6998.136 Mobile Safari/537.36 XWEB/1340129 MMWEBSDK/20240301 MMWEBID/9871 MicroMessenger/8.0.48.2580(0x28003036) WeChat/arm64 Weixin NetType/WIFI Language/zh_CN ABI/arm64 MiniProgramEnv/android"
self.setup_logging()
def setup_logging(self):
"""
配置日志系统
"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s\t- %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
handlers=[
logging.StreamHandler()
]
DEFAULT_USER_AGENT = (
"Mozilla/5.0 (Linux; Android 12; M2012K11AC Build/SKQ1.220303.001; wv) "
"AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 "
"Chrome/134.0.6998.136 Mobile Safari/537.36 XWEB/1340129 "
"MMWEBSDK/20240301 MMWEBID/9871 MicroMessenger/8.0.48.2580(0x28003036) "
"WeChat/arm64 Weixin NetType/WIFI Language/zh_CN ABI/arm64 MiniProgramEnv/android"
)
def get_proxy(self):
"""
获取代理
:return: 代理
"""
if not self.proxy_url:
logging.warning("[获取代理]没有找到环境变量PROXY_API_URL不使用代理")
return None
url = self.proxy_url
response = requests.get(url)
proxy = response.text.strip() # 去除首尾空格和换行
logging.info(f"[获取代理]: {proxy}")
return proxy
AUTH_FAIL_KEYWORDS = [
"未登录", "请登录", "登录失效", "登录过期", "token", "Token", "TOKEN",
"无效", "过期", "未授权", "unauthorized", "Unauthorized", "401", "403"
]
def check_proxy(self, proxy, session):
"""
检查代理
:param proxy: 代理
:param session: session
:return: 是否可用
"""
try:
url = f"http://{self.host}/api/v2/get_sign_list"
session.headers["Token"] = ""
response = session.get(url, timeout=5)
if response.status_code == 200:
logging.info(f"[检查代理]: {proxy} 应该可用")
return True
# ====================== 路径配置 ======================
BASE_DIR = Path(__file__).resolve().parent
BUFFER_DIR = BASE_DIR / "APP_Buffer"
LOG_DIR = BASE_DIR / "logs"
BUFFER_DIR.mkdir(parents=True, exist_ok=True)
LOG_DIR.mkdir(parents=True, exist_ok=True)
TOKEN_CACHE_FILE = BUFFER_DIR / "ddyx_token_cache.json"
LOG_FILE = LOG_DIR / f"ddyx_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
# ====================== 环境变量 ======================
wx_cloud = os.getenv("wx_cloud", "http://192.168.31.203:666").strip().rstrip("/")
_raw_wx_token = os.getenv("wx_token", "").strip()
if AUTO_BEARER and _raw_wx_token and not _raw_wx_token.lower().startswith("bearer "):
WX_TOKEN = f"Bearer {_raw_wx_token}"
else:
logging.info(f"[检查代理]: {response.text}")
return False
except Exception as e:
logging.error(f"[检查代理]代理 {proxy} 不可用: {str(e)}")
return False
WX_TOKEN = _raw_wx_token
def get_wechat_account_list(self):
"""
从养鸡场接口获取账号列表
:return: 账号列表/False
"""
TEST_WXID = os.getenv("DDYX_TEST_WXID", "").strip()
EXCLUDE_WXIDS_RAW = os.getenv("DDYX_EXCLUDE_WXIDS", "").strip()
# 原脚本内置排除;保留但集中放这里
SCRIPT_EXCLUDE_WXIDS = ["wxid_11111111111111"]
def setup_logging():
logging.basicConfig(
level=logging.DEBUG if DEBUG else logging.INFO,
format="%(asctime)s - %(levelname)s\t- %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler(LOG_FILE, encoding="utf-8"),
],
)
def split_multi_values(raw):
"""支持换行、逗号、中文逗号、&、@ 分隔。"""
if not raw:
return []
parts = re.split(r"[\n,&@]+", raw)
return [x.strip() for x in parts if x.strip()]
def mask_secret(value, keep_start=6, keep_end=4):
if value is None:
return ""
value = str(value)
if len(value) <= keep_start + keep_end:
return "***"
return f"{value[:keep_start]}***{value[-keep_end:]}"
def mask_phone(tel):
if not tel:
return ""
tel = str(tel)
if len(tel) >= 7:
return tel[:3] + "****" + tel[-4:]
return "***"
def load_json_file(path, default):
if not path.exists():
return default
try:
if not WX_TOKEN:
logging.error("[获取账号列表]未设置环境变量 wx_token")
return False
return json.loads(path.read_text(encoding="utf-8"))
except Exception as e:
logging.warning(f"[缓存]读取失败,使用空缓存: {path} | {e}")
return default
url = f"{wx_cloud}/prod-api/wechat/wechat/list?pageNum=1&pageSize=1000"
headers = {
'Authorization': WX_TOKEN,
'Content-Type': 'application/json',
def save_json_file(path, data):
try:
path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
except Exception as e:
logging.warning(f"[缓存]保存失败: {path} | {e}")
class DDYXTask:
def __init__(self):
self.token_cache = load_json_file(TOKEN_CACHE_FILE, {})
self.exclude_wxids = set(SCRIPT_EXCLUDE_WXIDS + split_multi_values(EXCLUDE_WXIDS_RAW))
self.total_accounts = 0
self.success_accounts = 0
self.login_success_accounts = 0
self.sign_success_accounts = 0
self.draw_success_count = 0
self.withdraw_success_count = 0
self.account_reports = []
# ---------------------- 养鸡场相关 ----------------------
def yjc_headers(self):
return {
"Authorization": WX_TOKEN,
"Content-Type": "application/json",
}
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
res_data = response.json()
if res_data and res_data.get('code') == 200 and isinstance(res_data.get('rows'), list):
accounts = res_data['rows']
logging.info(f"[获取账号列表]成功获取到 {len(accounts)} 个账号")
return accounts
else:
logging.error(f"[获取账号列表]接口返回异常: {json.dumps(res_data)}")
return False
except Exception as e:
logging.error(f"[获取账号列表]发生错误: {str(e)}\n{traceback.format_exc()}")
return False
def get_wx_code_yjc(self, wxid):
"""
使用养鸡场接口获取微信小程序Code
:param wxid: 微信ID
:return: code/False
"""
def iter_cloud_accounts(self):
"""分页流式获取账号:取一个账号,跑完,再取下一个。"""
page_num = 1
while True:
url = f"{wx_cloud}/prod-api/wechat/wechat/list"
params = {"pageNum": page_num, "pageSize": CLOUD_PAGE_SIZE}
try:
if not WX_TOKEN:
logging.error("[获取Code]未设置环境变量 wx_token")
return False
res = requests.get(url, params=params, headers=self.yjc_headers(), timeout=15)
res.raise_for_status()
data = res.json()
except Exception as e:
logging.error(f"[养鸡场]获取账号列表失败 page={page_num}: {e}")
if DEBUG:
logging.debug(traceback.format_exc())
return
if data.get("code") != 200 or not isinstance(data.get("rows"), list):
logging.error(f"[养鸡场]账号列表返回异常: {json.dumps(data, ensure_ascii=False)[:500]}")
return
rows = data.get("rows", [])
if not rows:
return
for row in rows:
wxid = row.get("wxId") or row.get("wxid") or ""
wxname = row.get("wxName") or row.get("remark") or row.get("nickName") or "未知昵称"
if not wxid:
logging.warning("[养鸡场]账号缺少 wxId跳过")
continue
yield {"wxid": wxid, "wxname": wxname}
total = data.get("total")
if total is not None and page_num * CLOUD_PAGE_SIZE >= int(total):
return
if len(rows) < CLOUD_PAGE_SIZE:
return
page_num += 1
def get_wx_code(self, wxid):
url = f"{wx_cloud}/prod-api/wechat/api/getMiniProgramCode"
headers = {
'Authorization': WX_TOKEN,
'Content-Type': 'application/json'
}
payload = {
"wxid": wxid,
"appid": self.wx_appid
}
response = requests.post(url, json=payload, headers=headers, timeout=10)
response.raise_for_status()
res_data = response.json()
if res_data and res_data.get('code') == 200 and res_data.get('data', {}).get('code'):
code = res_data['data']['code']
logging.info(f"[获取Code]成功获取 wxid:{wxid} 的code")
payload = {"wxid": wxid, "appid": APPID}
try:
res = requests.post(url, json=payload, headers=self.yjc_headers(), timeout=15)
res.raise_for_status()
data = res.json()
code = data.get("data", {}).get("code")
if data.get("code") == 200 and code:
logging.info("[获取code]完成")
if DEBUG:
logging.debug(f"[获取code][debug] {mask_secret(code)}")
return code
else:
logging.error(f"[获取Code]接口返回异常: {json.dumps(res_data)}")
return False
logging.warning(f"[获取code]失败: {json.dumps(data, ensure_ascii=False)[:500]}")
return None
except Exception as e:
logging.error(f"[获取Code]wxid:{wxid} 发生错误: {str(e)}\n{traceback.format_exc()}")
logging.error(f"[获取code]异常: {e}")
if DEBUG:
logging.debug(traceback.format_exc())
return None
# ---------------------- 请求会话 ----------------------
def build_session(self):
session = requests.Session()
session.headers.update({"User-Agent": DEFAULT_USER_AGENT})
if MULTI_ACCOUNT_PROXY and PROXY_API_URL:
proxy = self.get_proxy()
if proxy:
session.proxies.update({"http": f"http://{proxy}", "https": f"http://{proxy}"})
logging.info("[代理]已启用")
return session
def get_proxy(self):
try:
res = requests.get(PROXY_API_URL, timeout=10)
proxy = res.text.strip()
if DEBUG:
logging.debug(f"[代理]获取到: {proxy}")
return proxy
except Exception as e:
logging.warning(f"[代理]获取失败,不使用代理: {e}")
return None
def is_auth_failed_response(self, data):
try:
text = json.dumps(data, ensure_ascii=False)
except Exception:
text = str(data)
return any(k in text for k in AUTH_FAIL_KEYWORDS)
def validate_token(self, session):
"""用查询余额/提现列表接口做缓存 Token 校验,不写入业务。"""
url = f"{BASE_URL}/api/h/get_withdrawal_trade_list"
try:
res = session.get(url, timeout=15)
if res.status_code in (401, 403):
return False
data = res.json()
if self.is_auth_failed_response(data):
return False
return True
except Exception as e:
if DEBUG:
logging.debug(f"[Token校验]异常: {e}")
return False
def wxlogin(self, session, code):
"""
登录
:param session: session
:param code: 微信code
:return: 登录结果
"""
def login_by_code(self, session, code):
url = f"{BASE_URL}/wechat/login"
params = {"code": code, "channelId": CHANNEL_ID}
try:
url = f"https://{self.host}/wechat/login"
params = {
"code": code,
"channelId": 154
}
response = session.get(url, params=params, timeout=10)
response.raise_for_status()
response_json = response.json()
res = session.get(url, params=params, timeout=15)
res.raise_for_status()
data = res.json()
if DEBUG:
safe_data = json.loads(json.dumps(data, ensure_ascii=False))
if isinstance(safe_data.get("data"), dict) and safe_data["data"].get("token"):
safe_data["data"]["token"] = mask_secret(safe_data["data"]["token"])
logging.debug(f"[登录响应] {json.dumps(safe_data, ensure_ascii=False)[:1000]}")
if response_json.get('code') == 0:
tel = response_json['data'].get('tel', '')
# 号码中间4位*号代替
if tel:
tel = tel[:3] + "****" + tel[-4:]
logging.info(f"[登录]成功: 当前账号 {tel}")
token = response_json['data']['token']
if data.get("code") == 0 and isinstance(data.get("data"), dict) and data["data"].get("token"):
token = data["data"]["token"]
tel = data["data"].get("tel", "")
session.headers["Token"] = token
return True
else:
logging.error(f"[登录]发生错误: {response_json.get('msg', '未知错误')}")
return False
except requests.RequestException as e:
logging.error(f"[登录]发生网络错误: {str(e)}\n{traceback.format_exc()}")
return False
logging.info(f"[登录]完成 | 手机: {mask_phone(tel)}")
return {
"token": token,
"tel": mask_phone(tel),
"rawTel": tel,
"updatedAt": int(time.time()),
}
logging.warning(f"[登录]失败: {data.get('msg', '未知错误')}")
return None
except Exception as e:
logging.error(f"[登录]发生错误: {str(e)}\n{traceback.format_exc()}")
logging.error(f"[登录]异常: {e}")
if DEBUG:
logging.debug(traceback.format_exc())
return None
def prepare_token(self, session, wxid, wxname):
now = int(time.time())
cached = self.token_cache.get(wxid, {})
cached_token = cached.get("token")
updated_at = int(cached.get("updatedAt", 0) or 0)
if cached_token and now - updated_at < TOKEN_CACHE_TTL:
session.headers["Token"] = cached_token
logging.info(f"[Token缓存]命中 | {cached.get('tel') or wxname},开始校验")
if self.validate_token(session):
logging.info("[Token缓存]校验通过")
return True
logging.info("[Token缓存]校验失败,重新登录")
elif cached_token:
logging.info("[Token缓存]已过期,重新登录")
else:
logging.info("[Token缓存]未命中,开始登录")
code = self.get_wx_code(wxid)
if not code:
return False
login_info = self.login_by_code(session, code)
if not login_info:
return False
self.token_cache[wxid] = {"wxName": wxname, **login_info}
save_json_file(TOKEN_CACHE_FILE, self.token_cache)
logging.info("[Token缓存]已更新")
return True
# ---------------------- 业务接口 ----------------------
def sign_in(self, session):
"""
签到
:param session: session
:return: 签到结果
"""
url = f"{BASE_URL}/api/v2/sign_join"
try:
url = f"https://{self.host}/api/v2/sign_join"
response = session.get(url, timeout=10)
response.raise_for_status()
response_json = response.json()
if response_json.get('code') == 0:
logging.info(f"[签到]: 成功")
return True
else:
logging.warning(f"[签到]: {response_json.get('msg', '未知错误')}")
return False
res = session.get(url, timeout=15)
res.raise_for_status()
data = res.json()
if data.get("code") == 0:
logging.info("[签到]完成 | 成功")
return {"ok": True, "msg": "成功"}
msg = data.get("msg", "未知错误")
logging.info(f"[签到]完成 | {msg}")
return {"ok": False, "msg": msg}
except Exception as e:
logging.error(f"[签到]发生错误: {str(e)}\n{traceback.format_exc()}")
return False
logging.warning(f"[签到]异常: {e}")
return {"ok": False, "msg": str(e)}
def draw_once(self, session):
url = f"{BASE_URL}/front/activity/update_lottery_result"
params = {"id": LOTTERY_ACTIVITY_ID}
try:
res = session.get(url, params=params, timeout=15)
res.raise_for_status()
data = res.json()
if data.get("code") == 0:
prize = (data.get("data") or {}).get("goodName", "未知奖励")
logging.info(f"[抽奖]完成 | 获得: {prize}")
return {"ok": True, "prize": prize, "msg": "成功"}
msg = data.get("msg", "未知错误")
logging.info(f"[抽奖]完成 | {msg}")
return {"ok": False, "prize": "", "msg": msg}
except Exception as e:
logging.warning(f"[抽奖]异常: {e}")
return {"ok": False, "prize": "", "msg": str(e)}
def add_lottery_count(self, session):
"""
增加抽奖次数
:param session: session
:return: 增加抽奖次数结果
"""
url = f"{BASE_URL}/front/activity/add_lottery_count"
try:
url = f"https://{self.host}/front/activity/add_lottery_count"
response = session.get(url, timeout=10)
response.raise_for_status()
response_json = response.json()
if response_json.get('code') == 0:
logging.info(f"[增加抽奖次数]: 成功")
return True
elif "达到上限" in response_json.get('msg', ''):
logging.warning(f"[增加抽奖次数]: {response_json.get('msg')}")
return False
else:
logging.error(f"[增加抽奖次数]发生错误: {response_json.get('msg', '未知错误')}")
return False
res = session.get(url, timeout=15)
res.raise_for_status()
data = res.json()
if data.get("code") == 0:
logging.info("[增加抽奖次数]完成 | 成功")
return {"ok": True, "msg": "成功"}
msg = data.get("msg", "未知错误")
logging.info(f"[增加抽奖次数]完成 | {msg}")
return {"ok": False, "msg": msg}
except Exception as e:
logging.error(f"[增加抽奖次数]发生错误: {str(e)}\n{traceback.format_exc()}")
return False
logging.warning(f"[增加抽奖次数]异常: {e}")
return {"ok": False, "msg": str(e)}
def update_lottery_result(self, session):
"""
更新抽奖结果(执行抽奖)
:param session: session
:return: 抽奖结果
"""
try:
url = f"https://{self.host}/front/activity/update_lottery_result"
params = {
"id": 3438615
}
response = session.get(url, params=params, timeout=10)
response.raise_for_status()
response_json = response.json()
def run_lottery_flow(self, session):
prizes = []
if response_json.get('code') == 0:
good_name = response_json['data'].get('goodName', '未知奖励')
logging.info(f"[抽奖]: 获得{good_name}")
return True
else:
logging.warning(f"[抽奖]: {response_json.get('msg', '未知错误')}")
return False
except Exception as e:
logging.error(f"[抽奖]发生错误: {str(e)}\n{traceback.format_exc()}")
return False
# 先直接抽,直到服务器提示不能继续
while True:
result = self.draw_once(session)
if not result["ok"]:
break
prizes.append(result.get("prize", "未知奖励"))
self.draw_success_count += 1
time.sleep(random.randint(*LOTTERY_DELAY_RANGE))
# 尝试增加次数,每次增加成功后抽一次
while True:
add_result = self.add_lottery_count(session)
if not add_result["ok"]:
break
time.sleep(random.randint(*REQUEST_DELAY_RANGE))
draw_result = self.draw_once(session)
if draw_result["ok"]:
prizes.append(draw_result.get("prize", "未知奖励"))
self.draw_success_count += 1
time.sleep(random.randint(*LOTTERY_DELAY_RANGE))
return prizes
def get_withdrawal_trade_list(self, session):
"""
获取提现相关数据
:param session: session
:return: (余额, 提现数据)/False
"""
url = f"{BASE_URL}/api/h/get_withdrawal_trade_list"
try:
url = f"https://{self.host}/api/h/get_withdrawal_trade_list"
response = session.get(url, timeout=10)
response.raise_for_status()
response_json = response.json()
if response_json.get('code') == 0 and isinstance(response_json.get('data'), list) and len(response_json['data']) > 0:
balance = float(response_json['data'][0].get('money', 0))
logging.info(f"[余额]: {balance}")
return balance, response_json['data']
else:
logging.error(f"[获取提现相关数据]发生错误: {response_json.get('msg', '未知错误')}")
return False
res = session.get(url, timeout=15)
res.raise_for_status()
data = res.json()
if data.get("code") == 0 and isinstance(data.get("data"), list) and data["data"]:
balance = float(data["data"][0].get("money", 0) or 0)
logging.info(f"[余额查询]完成 | 余额: {balance}")
return {"ok": True, "balance": balance, "list": data["data"], "msg": "成功"}
msg = data.get("msg", "未知错误")
logging.info(f"[余额查询]完成 | {msg}")
return {"ok": False, "balance": 0.0, "list": [], "msg": msg}
except Exception as e:
logging.error(f"[获取提现相关数据]发生错误: {str(e)}\n{traceback.format_exc()}")
return False
logging.warning(f"[余额查询]异常: {e}")
return {"ok": False, "balance": 0.0, "list": [], "msg": str(e)}
def withdraw(self, session, balance, withdrawal_trade_list):
"""
提现
:param session: session
:param balance: 余额
:param withdrawal_trade_list: 提现相关数据
:return: 提现结果
"""
try:
url = f"https://{self.host}/api/h/withdrawal"
def withdraw(self, session, balance, withdrawal_list):
url = f"{BASE_URL}/api/h/withdrawal"
payload = {
"totalMoney": balance,
"type": 1,
"withdrawalDetailPojoList": withdrawal_trade_list
"withdrawalDetailPojoList": withdrawal_list,
}
response = session.post(url, json=payload, timeout=10)
response.raise_for_status()
response_json = response.json()
if response_json.get('code') == 0:
logging.info(f"[提现]: {response_json.get('msg', '提现成功')}")
return True
else:
logging.warning(f"[提现]: {response_json.get('msg', '提现失败')}")
return False
except Exception as e:
logging.error(f"[提现]发生错误: {str(e)}\n{traceback.format_exc()}")
return False
def run(self):
"""
运行任务
"""
try:
logging.info(f"{self.site_name}】开始执行任务")
res = session.post(url, json=payload, timeout=15)
res.raise_for_status()
data = res.json()
if data.get("code") == 0:
msg = data.get("msg", "提现成功")
logging.info(f"[提现]完成 | {msg}")
self.withdraw_success_count += 1
return {"ok": True, "msg": msg}
msg = data.get("msg", "提现失败")
logging.info(f"[提现]完成 | {msg}")
return {"ok": False, "msg": msg}
except Exception as e:
logging.warning(f"[提现]异常: {e}")
return {"ok": False, "msg": str(e)}
# 从养鸡场获取账号列表
accounts = self.get_wechat_account_list()
if not accounts:
logging.error("{self.site_name}】获取账号列表失败,任务终止")
return
# ---------------------- 单账号执行 ----------------------
def should_skip_account(self, wxid):
if TEST_WXID and wxid != TEST_WXID:
return "非测试账号"
if wxid in self.exclude_wxids:
return "排除列表"
return ""
# 过滤需要剔除的账号
combined_remove = set(remove_wxids1)
filtered_accounts = [acc for acc in accounts if acc.get('wxId') not in combined_remove]
logging.info(f"[账号过滤]剔除后剩余 {len(filtered_accounts)} 个账号")
# 执行每个账号的任务
for index, account in enumerate(filtered_accounts, 1):
wxid = account.get('wxId')
wxname = account.get('wxName', '未知昵称')
def run_one_account(self, index, wxid, wxname):
report = {
"index": index,
"wxid": wxid,
"wxname": wxname,
"login": False,
"sign": "未执行",
"draw_count": 0,
"prizes": [],
"balance": 0.0,
"withdraw": "未执行",
"status": "失败",
}
logging.info("")
logging.info(f"------ 【账号{index}: {wxname} ({wxid})】开始执行任务 ------")
logging.info(f"========== 账号{index} | {wxname} | {wxid} 开始 ==========")
# 初始化session和代理
if MULTI_ACCOUNT_PROXY:
proxy = self.get_proxy()
session = requests.Session()
if proxy:
session.proxies.update({
"http": f"http://{proxy}",
"https": f"http://{proxy}"
})
# 检查代理,不可用则不使用代理
if not self.check_proxy(proxy, session):
session.proxies.clear()
logging.warning("[代理]使用默认网络环境")
else:
session = requests.Session()
skip_reason = self.should_skip_account(wxid)
if skip_reason:
report["status"] = f"跳过:{skip_reason}"
logging.info(f"[账号跳过]原因: {skip_reason}")
self.account_reports.append(report)
return
session.headers["User-Agent"] = self.user_agent
session = self.build_session()
# 执行微信授权获取code
code = self.get_wx_code_yjc(wxid)
if code:
# 登录
login_result = self.wxlogin(session, code)
time.sleep(random.randint(1, 3))
if not self.prepare_token(session, wxid, wxname):
report["status"] = "登录失败"
logging.info(f"========== 账号{index} | 完成 | 登录失败 ==========")
self.account_reports.append(report)
return
if login_result:
# 签到
self.sign_in(session)
time.sleep(random.randint(1, 3))
report["login"] = True
self.login_success_accounts += 1
time.sleep(random.randint(*REQUEST_DELAY_RANGE))
# 抽奖循环
lottery_result = self.update_lottery_result(session)
while lottery_result:
time.sleep(random.randint(3, 5))
lottery_result = self.update_lottery_result(session)
sign_result = self.sign_in(session)
report["sign"] = sign_result.get("msg", "未知")
if sign_result.get("ok"):
self.sign_success_accounts += 1
time.sleep(random.randint(*REQUEST_DELAY_RANGE))
# 增加抽奖次数并继续抽奖
add_count_result = self.add_lottery_count(session)
while add_count_result:
self.update_lottery_result(session)
time.sleep(random.randint(3, 5))
add_count_result = self.add_lottery_count(session)
prizes = self.run_lottery_flow(session)
report["prizes"] = prizes
report["draw_count"] = len(prizes)
# 提现逻辑
withdraw_data = self.get_withdrawal_trade_list(session)
if withdraw_data:
balance, withdrawal_list = withdraw_data
if balance >= DEFAULT_WITHDRAW_BALANCE:
self.withdraw(session, balance, withdrawal_list)
time.sleep(random.randint(1, 3))
if withdraw_data.get("ok"):
balance = withdraw_data.get("balance", 0.0)
report["balance"] = balance
if ENABLE_WITHDRAW:
if balance >= WITHDRAW_THRESHOLD:
withdraw_result = self.withdraw(session, balance, withdraw_data.get("list", []))
report["withdraw"] = withdraw_result.get("msg", "未知")
else:
logging.warning(f"[提现]: 余额{balance}元不足{DEFAULT_WITHDRAW_BALANCE}元,不进行提现")
report["withdraw"] = f"余额不足{WITHDRAW_THRESHOLD}元,不提现"
logging.info(f"[提现]完成 | 余额{balance}元不足{WITHDRAW_THRESHOLD}元,不提现")
else:
logging.warning("[提现]: 获取提现数据失败")
report["withdraw"] = "未开启自动提现"
logging.info("[提现]完成 | 未开启自动提现")
else:
logging.error(f"[授权]wxid:{wxid} 获取Code失败跳过该账号")
report["withdraw"] = f"余额查询失败: {withdraw_data.get('msg', '')}"
logging.info(f"------ 【账号{index}: {wxname} ({wxid})】执行任务完成 ------")
# 账号间增加延迟,避免请求过快
if index < len(filtered_accounts):
time.sleep(random.randint(2, 5))
report["status"] = "完成"
self.success_accounts += 1
self.account_reports.append(report)
logging.info(
f"========== 账号{index} | 完成 | 签到:{report['sign']} | "
f"抽奖:{report['draw_count']}次 | 余额:{report['balance']}元 | 提现:{report['withdraw']} =========="
)
except Exception as e:
logging.error(f"{self.site_name}】执行过程中发生错误: {str(e)}\n{traceback.format_exc()}")
finally:
logging.info(f"{self.site_name}】所有账号任务执行完毕")
# ---------------------- 主入口 ----------------------
def run(self):
logging.info(f"{PROJECT_NAME}】开始执行")
logging.info(f"[配置] APPID: {APPID}")
logging.info(f"[配置] 养鸡场: {wx_cloud}")
logging.info(f"[配置] Token缓存: {TOKEN_CACHE_FILE}")
logging.info(f"[配置] 日志文件: {LOG_FILE}")
logging.info(f"[配置] 自动提现: {'开启' if ENABLE_WITHDRAW else '关闭'} | 阈值: {WITHDRAW_THRESHOLD}")
if not WX_TOKEN:
logging.error("[启动失败]请先设置环境变量 wx_token")
return
index = 0
for account in self.iter_cloud_accounts():
index += 1
self.total_accounts += 1
self.run_one_account(index, account["wxid"], account["wxname"])
time.sleep(random.randint(*ACCOUNT_DELAY_RANGE))
save_json_file(TOKEN_CACHE_FILE, self.token_cache)
self.print_summary()
def print_summary(self):
logging.info("")
logging.info("==================== 执行汇总 ====================")
logging.info(f"项目: {PROJECT_NAME}")
logging.info(f"总账号数: {self.total_accounts}")
logging.info(f"登录成功账号数: {self.login_success_accounts}")
logging.info(f"任务完成账号数: {self.success_accounts}")
logging.info(f"签到成功账号数: {self.sign_success_accounts}")
logging.info(f"成功抽奖次数: {self.draw_success_count}")
logging.info(f"提现成功次数: {self.withdraw_success_count}")
logging.info(f"Token缓存文件: {TOKEN_CACHE_FILE}")
logging.info(f"日志文件: {LOG_FILE}")
logging.info("-------------------- 账号明细 --------------------")
for r in self.account_reports:
prize_text = "".join(r.get("prizes") or []) or ""
logging.info(
f"账号{r['index']} | {r['wxname']} | 状态:{r['status']} | "
f"签到:{r['sign']} | 抽奖:{r['draw_count']}次 | 奖品:{prize_text} | "
f"余额:{r['balance']}元 | 提现:{r['withdraw']}"
)
logging.info("==================================================")
if __name__ == "__main__":
# 检查必要环境变量
if not WX_TOKEN:
print("错误:请先设置环境变量 wx_token")
exit(1)
setup_logging()
try:
DDYXTask().run()
except KeyboardInterrupt:
logging.warning("用户中断执行")
except Exception as e:
logging.error(f"主程序异常: {e}")
if DEBUG:
logging.debug(traceback.format_exc())
auto_task = AutoTask("铛铛一下")
auto_task.run()