添加 WX_Applet/Applet_DDYX.py

This commit is contained in:
2026-05-19 00:06:43 +08:00
parent a29b0a1813
commit f2ef5c3e06

530
WX_Applet/Applet_DDYX.py Normal file
View File

@@ -0,0 +1,530 @@
# cron: 22 8 * * *
# new Env("铛铛一下")
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
铛铛一下 - 养鸡场版(自动获取账号 + 自动提现 + 推送通知)
直接从养鸡场获取微信账号列表无需手动配置wxid
"""
import os
import sys
import time
import random
import requests
import json
from datetime import datetime
print("=" * 50)
print("脚本开始加载...")
print("=" * 50)
# ==================== 配置 ====================
CHICKEN_FARM_URL = os.getenv("wx_cloud", "http://127.0.0.1:666")
CHICKEN_FARM_TOKEN = os.getenv("wx_token", "")
WX_APPID = "wxe378d2d7636c180e" # 铛铛一下的appid
REMOVE_WXIDS = os.getenv("REMOVE_WXIDS", "") # 需要剔除的wxid逗号分隔
DEFAULT_WITHDRAW_BALANCE = float(os.getenv("WITHDRAW_BALANCE", "0.3")) # 默认超过0.3元提现
NOTIFY = os.getenv("LY_NOTIFY", "false").lower() == "true" # 是否推送通知
print(f"养鸡场地址: {CHICKEN_FARM_URL}")
print(f"Token状态: {'已配置' if CHICKEN_FARM_TOKEN else '未配置'}")
print(f"提现阈值: {DEFAULT_WITHDRAW_BALANCE}")
print(f"通知推送: {'开启' if NOTIFY else '关闭'}")
print(f"AppID: {WX_APPID}")
# ==================== 通知模块 ====================
def send_notify(title, content):
"""发送通知"""
if not NOTIFY:
return
try:
# 尝试导入notify模块
notify_path = "/ql/scripts/notify.py"
if os.path.exists(notify_path):
sys.path.insert(0, "/ql/scripts")
import notify
notify.send(title, content)
print("[通知] 已发送")
else:
# 尝试下载notify模块
print("[通知] notify.py不存在尝试下载...")
url = "https://raw.githubusercontent.com/whyour/qinglong/refs/heads/develop/sample/notify.py"
resp = requests.get(url, timeout=10)
with open(notify_path, "w", encoding="utf-8") as f:
f.write(resp.text)
import notify
notify.send(title, content)
print("[通知] 已发送")
except Exception as e:
print(f"[通知] 发送失败: {e}")
# ==================== 从养鸡场获取账号列表 ====================
def get_accounts_from_farm():
"""从养鸡场获取微信账号列表"""
print("[获取账号] 开始从养鸡场获取账号列表...")
if not CHICKEN_FARM_TOKEN:
print("[获取账号] 错误: 未设置wx_token")
return []
url = f"{CHICKEN_FARM_URL}/prod-api/wechat/wechat/list"
headers = {
"Authorization": CHICKEN_FARM_TOKEN,
"Content-Type": "application/json"
}
params = {"pageNum": 1, "pageSize": 1000}
try:
print(f"[获取账号] 请求URL: {url}")
resp = requests.get(url, headers=headers, params=params, timeout=10)
print(f"[获取账号] 响应状态: {resp.status_code}")
result = resp.json()
if result.get("code") == 200:
accounts = result.get("rows", [])
print(f"[获取账号] 成功,共获取 {len(accounts)} 个账号")
# 剔除指定wxid
if REMOVE_WXIDS:
remove_list = [wxid.strip() for wxid in REMOVE_WXIDS.split(",") if wxid.strip()]
accounts = [a for a in accounts if a.get("wxId") not in remove_list]
print(f"[获取账号] 剔除后剩余 {len(accounts)} 个账号")
return accounts
else:
print(f"[获取账号] 失败: {result.get('msg')}")
return []
except Exception as e:
print(f"[获取账号] 异常: {e}")
return []
# ==================== 从养鸡场获取CODE ====================
def get_code_from_farm(wxid: str):
"""从养鸡场获取code"""
print(f"[获取code] 请求: {wxid}")
if not CHICKEN_FARM_TOKEN:
print("[获取code] 错误: 未设置wx_token")
return None
url = f"{CHICKEN_FARM_URL}/prod-api/wechat/api/getMiniProgramCode"
headers = {
"Authorization": CHICKEN_FARM_TOKEN,
"Content-Type": "application/json"
}
data = {"wxid": wxid, "appid": WX_APPID}
try:
resp = requests.post(url, json=data, headers=headers, timeout=10)
result = resp.json()
if result.get("code") == 200 and result.get("data"):
code = result["data"]
if isinstance(code, dict):
code = code.get("code")
print(f"[获取code] 成功: {code[:10]}...")
return code
else:
print(f"[获取code] 失败: {result.get('msg')}")
return None
except Exception as e:
print(f"[获取code] 异常: {e}")
return None
# ==================== 铛铛一下业务接口 ====================
def wxlogin(session, code):
"""微信登录"""
try:
url = "https://vues.dd1x.cn/wechat/login"
params = {"code": code, "channelId": 154}
resp = session.get(url, params=params, timeout=10)
result = resp.json()
if result.get('code') == 0:
token = result['data']['token']
session.headers["Token"] = token
tel = result['data'].get('tel', '未知')
if len(tel) > 7:
tel = tel[:3] + "****" + tel[-4:]
print(f"[登录] 成功: {tel}")
return True, tel
else:
print(f"[登录] 失败: {result.get('msg')}")
return False, None
except Exception as e:
print(f"[登录] 异常: {e}")
return False, None
def sign_in(session):
"""签到"""
try:
url = "https://vues.dd1x.cn/api/v2/sign_join"
resp = session.get(url, timeout=10)
result = resp.json()
if result.get('code') == 0:
print(f"[签到] 成功")
return True
else:
print(f"[签到] 失败: {result.get('msg')}")
return False
except Exception as e:
print(f"[签到] 异常: {e}")
return False
def lottery(session):
"""抽奖"""
try:
url = "https://vues.dd1x.cn/front/activity/update_lottery_result"
params = {"id": 3438615}
resp = session.get(url, params=params, timeout=10)
result = resp.json()
if result.get('code') == 0:
good_name = result['data']['goodName']
print(f"[抽奖] 获得: {good_name}")
return True, good_name
else:
print(f"[抽奖] 失败: {result.get('msg')}")
return False, None
except Exception as e:
print(f"[抽奖] 异常: {e}")
return False, None
def add_lottery_count(session):
"""增加抽奖次数"""
try:
url = "https://vues.dd1x.cn/front/activity/add_lottery_count"
resp = session.get(url, timeout=10)
result = resp.json()
if result.get('code') == 0:
print(f"[增加次数] 成功")
return True
elif "达到上限" in result.get('msg', ''):
print(f"[增加次数] 已达上限")
return False
else:
print(f"[增加次数] 失败: {result.get('msg')}")
return False
except:
return False
def get_withdrawal_info(session):
"""获取提现相关信息(余额和可提现列表)"""
try:
url = "https://vues.dd1x.cn/api/h/get_withdrawal_trade_list"
resp = session.get(url, timeout=10)
result = resp.json()
if result.get('code') == 0 and result.get('data'):
data_list = result['data']
if data_list:
balance = data_list[0].get('money', 0)
print(f"[余额] {balance}")
return balance, data_list
except Exception as e:
print(f"[余额] 获取失败: {e}")
return 0, None
def withdraw(session, total_money, withdrawal_list):
"""执行提现"""
try:
url = "https://vues.dd1x.cn/api/h/withdrawal"
payload = {
"totalMoney": total_money,
"type": 1,
"withdrawalDetailPojoList": withdrawal_list
}
resp = session.post(url, json=payload, timeout=10)
result = resp.json()
if result.get('code') == 0:
print(f"[提现] 成功!提现 {total_money}")
return True, result.get('msg', '提现成功')
else:
print(f"[提现] 失败: {result.get('msg')}")
return False, result.get('msg')
except Exception as e:
print(f"[提现] 异常: {e}")
return False, str(e)
# ==================== 抽奖循环 ====================
def do_lottery_loop(session, max_rounds=3):
"""
执行抽奖循环
返回中奖记录列表
"""
prizes = []
# 先抽3次
for i in range(3):
success, prize = lottery(session)
if success and prize:
prizes.append(prize)
time.sleep(random.randint(2, 4))
# 增加抽奖次数并继续抽
for i in range(max_rounds):
if not add_lottery_count(session):
break
time.sleep(random.randint(1, 2))
success, prize = lottery(session)
if success and prize:
prizes.append(prize)
time.sleep(random.randint(2, 4))
return prizes
# ==================== 单个账号执行 ====================
def run_account(account, idx, total, results):
"""执行单个账号的任务"""
wxid = account.get("wxId")
name = account.get("wxName", wxid)
print(f"\n{'='*40}")
print(f"账号 {idx}/{total}: {name} ({wxid})")
print(f"{'='*40}")
# 记录结果
account_result = {
"index": idx,
"name": name,
"wxid": wxid,
"success": False,
"phone": "",
"sign_success": False,
"lottery_count": 0,
"prizes": [],
"balance": 0,
"withdrawn": False,
"withdraw_amount": 0,
"error_msg": ""
}
# 1. 获取code
code = get_code_from_farm(wxid)
if not code:
print(f"❌ 获取code失败跳过账号")
account_result["error_msg"] = "获取code失败"
results.append(account_result)
return
# 2. 创建session
session = requests.Session()
session.headers["User-Agent"] = "Mozilla/5.0 (Linux; Android 12) AppleWebKit/537.36"
# 3. 登录
login_success, phone = wxlogin(session, code)
if not login_success:
account_result["error_msg"] = "登录失败"
results.append(account_result)
return
account_result["phone"] = phone
time.sleep(random.randint(1, 3))
# 4. 签到
sign_success = sign_in(session)
account_result["sign_success"] = sign_success
time.sleep(random.randint(1, 3))
# 5. 抽奖
prizes = do_lottery_loop(session, max_rounds=3)
if prizes:
account_result["prizes"] = prizes
account_result["lottery_count"] = len(prizes)
print(f"[中奖记录] {', '.join(prizes)}")
# 6. 获取余额并判断是否提现
balance, withdrawal_list = get_withdrawal_info(session)
account_result["balance"] = balance
if balance >= DEFAULT_WITHDRAW_BALANCE and withdrawal_list:
print(f"[提现判断] 余额 {balance}元 >= {DEFAULT_WITHDRAW_BALANCE}元,开始提现...")
withdraw_success, withdraw_msg = withdraw(session, balance, withdrawal_list)
account_result["withdrawn"] = withdraw_success
account_result["withdraw_amount"] = balance if withdraw_success else 0
account_result["withdraw_msg"] = withdraw_msg
else:
print(f"[提现判断] 余额 {balance}元 < {DEFAULT_WITHDRAW_BALANCE}元,暂不提现")
account_result["success"] = True
results.append(account_result)
print(f"\n✅ 账号 {idx} 执行完成")
# ==================== 生成通知内容 ====================
def build_notify_content(results):
"""生成详细的通知内容"""
total = len(results)
success = sum(1 for r in results if r.get("success"))
fail = total - success
total_withdrawn = sum(1 for r in results if r.get("withdrawn"))
total_withdraw_amount = sum(r.get("withdraw_amount", 0) for r in results)
total_balance = sum(r.get("balance", 0) for r in results)
# 统计中奖
all_prizes = []
prize_count = {}
for r in results:
for prize in r.get("prizes", []):
all_prizes.append(prize)
prize_count[prize] = prize_count.get(prize, 0) + 1
# 计算成功率
success_rate = (success / total * 100) if total > 0 else 0
# 构建通知内容
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
content = f"""
╔══════════════════════════════════════════════════════════╗
║ 铛铛一下 执行报告 ║
{now}
╚══════════════════════════════════════════════════════════╝
📊 整体统计
──────────────────────────────────────────────────────────
总账号数 : {total}
成功账号 : {success}
失败账号 : {fail}
成功率 : {success_rate:.1f}%
💰 收益统计
──────────────────────────────────────────────────────────
当前总余额 : {total_balance:.2f}
本次提现总额 : {total_withdraw_amount:.2f}
提现账号数 : {total_withdrawn}
🎁 中奖统计
──────────────────────────────────────────────────────────
总中奖次数 : {len(all_prizes)}
"""
if prize_count:
content += "\n奖品明细:\n"
for prize, count in prize_count.items():
content += f"{prize}: {count}\n"
content += """
📋 账号详情
──────────────────────────────────────────────────────────
"""
for r in results:
status = "" if r.get("success") else ""
content += f"\n{status} 【账号{r['index']}{r['name']}\n"
if r.get("phone"):
content += f" 手机号: {r['phone']}\n"
if r.get("sign_success"):
content += f" 签到: 成功\n"
else:
content += f" 签到: 失败\n"
if r.get("prizes"):
content += f" 中奖: {', '.join(r['prizes'])}\n"
else:
content += f" 中奖: 无\n"
content += f" 余额: {r.get('balance', 0):.2f}\n"
if r.get("withdrawn"):
content += f" 提现: ✅ 成功 {r.get('withdraw_amount', 0):.2f}\n"
elif r.get("balance", 0) >= DEFAULT_WITHDRAW_BALANCE:
content += f" 提现: ❌ 失败 ({r.get('withdraw_msg', '未知错误')})\n"
else:
content += f" 提现: 未达到阈值({DEFAULT_WITHDRAW_BALANCE}元)\n"
if r.get("error_msg"):
content += f" 错误: {r['error_msg']}\n"
content += """
══════════════════════════════════════════════════════════
"""
return content
# ==================== 打印控制台汇总 ====================
def print_summary(results):
"""在控制台打印汇总"""
print("\n" + "=" * 60)
print(" 执行汇总")
print("=" * 60)
total = len(results)
success = sum(1 for r in results if r.get("success"))
total_balance = sum(r.get("balance", 0) for r in results)
total_withdraw = sum(r.get("withdraw_amount", 0) for r in results)
print(f"\n📊 统计信息:")
print(f" 总账号: {total}")
print(f" 成功: {success}")
print(f" 失败: {total - success}")
print(f" 总余额: {total_balance:.2f}")
print(f" 本次提现: {total_withdraw:.2f}")
print(f"\n📋 账号详情:")
print("-" * 60)
for r in results:
status = "" if r.get("success") else ""
prizes_str = ", ".join(r.get("prizes", [])) if r.get("prizes") else ""
print(f"{status} {r['name']}: {r.get('balance', 0):.2f}元 | 中奖: {prizes_str}")
print("\n" + "=" * 60)
# ==================== 主程序 ====================
def run():
print("=" * 50)
print("铛铛一下 - 养鸡场版(自动获取账号 + 自动提现 + 推送通知)")
print(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 50)
# 检查环境变量
if not CHICKEN_FARM_TOKEN:
print("❌ 请设置环境变量 wx_token")
if NOTIFY:
send_notify("铛铛一下 - 执行失败", "未设置 wx_token 环境变量")
return
# 从养鸡场获取账号列表
accounts = get_accounts_from_farm()
if not accounts:
print("❌ 未获取到任何微信账号")
if NOTIFY:
send_notify("铛铛一下 - 执行失败", "未获取到任何微信账号")
return
print(f"\n{len(accounts)} 个账号\n")
# 存储结果
results = []
# 遍历执行每个账号
for idx, account in enumerate(accounts, 1):
try:
run_account(account, idx, len(accounts), results)
# 账号间延迟
if idx < len(accounts):
delay = random.randint(3, 8)
print(f"\n⏳ 等待 {delay} 秒后执行下一个账号...")
time.sleep(delay)
except Exception as e:
print(f"❌ 账号执行异常: {e}")
results.append({
"index": idx,
"name": account.get("wxName", account.get("wxId")),
"success": False,
"error_msg": str(e)
})
continue
# 打印控制台汇总
print_summary(results)
# 发送通知
if NOTIFY:
success_count = sum(1 for r in results if r.get("success"))
total_balance = sum(r.get("balance", 0) for r in results)
title = f"铛铛一下 - 执行完成 (成功{success_count}/{len(results)})"
content = build_notify_content(results)
send_notify(title, content)
print("\n所有任务执行完成")
# ==================== 入口 ====================
if __name__ == "__main__":
run()