From 91423cc2cb49fea3f19148afe9cae067fbbf4ba6 Mon Sep 17 00:00:00 2001 From: admin <362324317@qq.com> Date: Tue, 19 May 2026 04:53:31 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=A0=E9=99=A4=20WX=5FApplet/Applet=5FDDYX.?= =?UTF-8?q?py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- WX_Applet/Applet_DDYX.py | 530 --------------------------------------- 1 file changed, 530 deletions(-) delete mode 100644 WX_Applet/Applet_DDYX.py diff --git a/WX_Applet/Applet_DDYX.py b/WX_Applet/Applet_DDYX.py deleted file mode 100644 index e29f85b..0000000 --- a/WX_Applet/Applet_DDYX.py +++ /dev/null @@ -1,530 +0,0 @@ -# cron: 22 8 * * * -# new Env("铛铛一下") -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -铛铛一下 - 养鸡场版(自动获取账号 + 自动提现 + 推送通知) -直接从养鸡场获取微信账号列表,无需手动配置wxid -""" - -import os -import sys -import time -import random -import requests -import json -from datetime import datetime - -print("=" * 50) -print("脚本开始加载...") -print("=" * 50) - -# ==================== 配置 ==================== -CHICKEN_FARM_URL = os.getenv("wx_cloud", "http://127.0.0.1:666") -CHICKEN_FARM_TOKEN = os.getenv("wx_token", "") -WX_APPID = "wxe378d2d7636c180e" # 铛铛一下的appid -REMOVE_WXIDS = os.getenv("REMOVE_WXIDS", "") # 需要剔除的wxid,逗号分隔 -DEFAULT_WITHDRAW_BALANCE = float(os.getenv("WITHDRAW_BALANCE", "0.3")) # 默认超过0.3元提现 -NOTIFY = os.getenv("LY_NOTIFY", "false").lower() == "true" # 是否推送通知 - -print(f"养鸡场地址: {CHICKEN_FARM_URL}") -print(f"Token状态: {'已配置' if CHICKEN_FARM_TOKEN else '未配置'}") -print(f"提现阈值: {DEFAULT_WITHDRAW_BALANCE}元") -print(f"通知推送: {'开启' if NOTIFY else '关闭'}") -print(f"AppID: {WX_APPID}") - -# ==================== 通知模块 ==================== -def send_notify(title, content): - """发送通知""" - if not NOTIFY: - return - - try: - # 尝试导入notify模块 - notify_path = "/ql/scripts/notify.py" - if os.path.exists(notify_path): - sys.path.insert(0, "/ql/scripts") - import notify - notify.send(title, content) - print("[通知] 已发送") - else: - # 尝试下载notify模块 - print("[通知] notify.py不存在,尝试下载...") - url = "https://raw.githubusercontent.com/whyour/qinglong/refs/heads/develop/sample/notify.py" - resp = requests.get(url, timeout=10) - with open(notify_path, "w", encoding="utf-8") as f: - f.write(resp.text) - import notify - notify.send(title, content) - print("[通知] 已发送") - except Exception as e: - print(f"[通知] 发送失败: {e}") - -# ==================== 从养鸡场获取账号列表 ==================== -def get_accounts_from_farm(): - """从养鸡场获取微信账号列表""" - print("[获取账号] 开始从养鸡场获取账号列表...") - - if not CHICKEN_FARM_TOKEN: - print("[获取账号] 错误: 未设置wx_token") - return [] - - url = f"{CHICKEN_FARM_URL}/prod-api/wechat/wechat/list" - headers = { - "Authorization": CHICKEN_FARM_TOKEN, - "Content-Type": "application/json" - } - params = {"pageNum": 1, "pageSize": 1000} - - try: - print(f"[获取账号] 请求URL: {url}") - resp = requests.get(url, headers=headers, params=params, timeout=10) - print(f"[获取账号] 响应状态: {resp.status_code}") - result = resp.json() - - if result.get("code") == 200: - accounts = result.get("rows", []) - print(f"[获取账号] 成功,共获取 {len(accounts)} 个账号") - - # 剔除指定wxid - if REMOVE_WXIDS: - remove_list = [wxid.strip() for wxid in REMOVE_WXIDS.split(",") if wxid.strip()] - accounts = [a for a in accounts if a.get("wxId") not in remove_list] - print(f"[获取账号] 剔除后剩余 {len(accounts)} 个账号") - - return accounts - else: - print(f"[获取账号] 失败: {result.get('msg')}") - return [] - except Exception as e: - print(f"[获取账号] 异常: {e}") - return [] - -# ==================== 从养鸡场获取CODE ==================== -def get_code_from_farm(wxid: str): - """从养鸡场获取code""" - print(f"[获取code] 请求: {wxid}") - - if not CHICKEN_FARM_TOKEN: - print("[获取code] 错误: 未设置wx_token") - return None - - url = f"{CHICKEN_FARM_URL}/prod-api/wechat/api/getMiniProgramCode" - headers = { - "Authorization": CHICKEN_FARM_TOKEN, - "Content-Type": "application/json" - } - data = {"wxid": wxid, "appid": WX_APPID} - - try: - resp = requests.post(url, json=data, headers=headers, timeout=10) - result = resp.json() - - if result.get("code") == 200 and result.get("data"): - code = result["data"] - if isinstance(code, dict): - code = code.get("code") - print(f"[获取code] 成功: {code[:10]}...") - return code - else: - print(f"[获取code] 失败: {result.get('msg')}") - return None - except Exception as e: - print(f"[获取code] 异常: {e}") - return None - -# ==================== 铛铛一下业务接口 ==================== -def wxlogin(session, code): - """微信登录""" - try: - url = "https://vues.dd1x.cn/wechat/login" - params = {"code": code, "channelId": 154} - resp = session.get(url, params=params, timeout=10) - result = resp.json() - - if result.get('code') == 0: - token = result['data']['token'] - session.headers["Token"] = token - tel = result['data'].get('tel', '未知') - if len(tel) > 7: - tel = tel[:3] + "****" + tel[-4:] - print(f"[登录] 成功: {tel}") - return True, tel - else: - print(f"[登录] 失败: {result.get('msg')}") - return False, None - except Exception as e: - print(f"[登录] 异常: {e}") - return False, None - -def sign_in(session): - """签到""" - try: - url = "https://vues.dd1x.cn/api/v2/sign_join" - resp = session.get(url, timeout=10) - result = resp.json() - if result.get('code') == 0: - print(f"[签到] 成功") - return True - else: - print(f"[签到] 失败: {result.get('msg')}") - return False - except Exception as e: - print(f"[签到] 异常: {e}") - return False - -def lottery(session): - """抽奖""" - try: - url = "https://vues.dd1x.cn/front/activity/update_lottery_result" - params = {"id": 3438615} - resp = session.get(url, params=params, timeout=10) - result = resp.json() - if result.get('code') == 0: - good_name = result['data']['goodName'] - print(f"[抽奖] 获得: {good_name}") - return True, good_name - else: - print(f"[抽奖] 失败: {result.get('msg')}") - return False, None - except Exception as e: - print(f"[抽奖] 异常: {e}") - return False, None - -def add_lottery_count(session): - """增加抽奖次数""" - try: - url = "https://vues.dd1x.cn/front/activity/add_lottery_count" - resp = session.get(url, timeout=10) - result = resp.json() - if result.get('code') == 0: - print(f"[增加次数] 成功") - return True - elif "达到上限" in result.get('msg', ''): - print(f"[增加次数] 已达上限") - return False - else: - print(f"[增加次数] 失败: {result.get('msg')}") - return False - except: - return False - -def get_withdrawal_info(session): - """获取提现相关信息(余额和可提现列表)""" - try: - url = "https://vues.dd1x.cn/api/h/get_withdrawal_trade_list" - resp = session.get(url, timeout=10) - result = resp.json() - if result.get('code') == 0 and result.get('data'): - data_list = result['data'] - if data_list: - balance = data_list[0].get('money', 0) - print(f"[余额] {balance}元") - return balance, data_list - except Exception as e: - print(f"[余额] 获取失败: {e}") - return 0, None - -def withdraw(session, total_money, withdrawal_list): - """执行提现""" - try: - url = "https://vues.dd1x.cn/api/h/withdrawal" - payload = { - "totalMoney": total_money, - "type": 1, - "withdrawalDetailPojoList": withdrawal_list - } - resp = session.post(url, json=payload, timeout=10) - result = resp.json() - if result.get('code') == 0: - print(f"[提现] 成功!提现 {total_money} 元") - return True, result.get('msg', '提现成功') - else: - print(f"[提现] 失败: {result.get('msg')}") - return False, result.get('msg') - except Exception as e: - print(f"[提现] 异常: {e}") - return False, str(e) - -# ==================== 抽奖循环 ==================== -def do_lottery_loop(session, max_rounds=3): - """ - 执行抽奖循环 - 返回中奖记录列表 - """ - prizes = [] - - # 先抽3次 - for i in range(3): - success, prize = lottery(session) - if success and prize: - prizes.append(prize) - time.sleep(random.randint(2, 4)) - - # 增加抽奖次数并继续抽 - for i in range(max_rounds): - if not add_lottery_count(session): - break - time.sleep(random.randint(1, 2)) - success, prize = lottery(session) - if success and prize: - prizes.append(prize) - time.sleep(random.randint(2, 4)) - - return prizes - -# ==================== 单个账号执行 ==================== -def run_account(account, idx, total, results): - """执行单个账号的任务""" - wxid = account.get("wxId") - name = account.get("wxName", wxid) - - print(f"\n{'='*40}") - print(f"账号 {idx}/{total}: {name} ({wxid})") - print(f"{'='*40}") - - # 记录结果 - account_result = { - "index": idx, - "name": name, - "wxid": wxid, - "success": False, - "phone": "", - "sign_success": False, - "lottery_count": 0, - "prizes": [], - "balance": 0, - "withdrawn": False, - "withdraw_amount": 0, - "error_msg": "" - } - - # 1. 获取code - code = get_code_from_farm(wxid) - if not code: - print(f"❌ 获取code失败,跳过账号") - account_result["error_msg"] = "获取code失败" - results.append(account_result) - return - - # 2. 创建session - session = requests.Session() - session.headers["User-Agent"] = "Mozilla/5.0 (Linux; Android 12) AppleWebKit/537.36" - - # 3. 登录 - login_success, phone = wxlogin(session, code) - if not login_success: - account_result["error_msg"] = "登录失败" - results.append(account_result) - return - - account_result["phone"] = phone - time.sleep(random.randint(1, 3)) - - # 4. 签到 - sign_success = sign_in(session) - account_result["sign_success"] = sign_success - time.sleep(random.randint(1, 3)) - - # 5. 抽奖 - prizes = do_lottery_loop(session, max_rounds=3) - if prizes: - account_result["prizes"] = prizes - account_result["lottery_count"] = len(prizes) - print(f"[中奖记录] {', '.join(prizes)}") - - # 6. 获取余额并判断是否提现 - balance, withdrawal_list = get_withdrawal_info(session) - account_result["balance"] = balance - - if balance >= DEFAULT_WITHDRAW_BALANCE and withdrawal_list: - print(f"[提现判断] 余额 {balance}元 >= {DEFAULT_WITHDRAW_BALANCE}元,开始提现...") - withdraw_success, withdraw_msg = withdraw(session, balance, withdrawal_list) - account_result["withdrawn"] = withdraw_success - account_result["withdraw_amount"] = balance if withdraw_success else 0 - account_result["withdraw_msg"] = withdraw_msg - else: - print(f"[提现判断] 余额 {balance}元 < {DEFAULT_WITHDRAW_BALANCE}元,暂不提现") - - account_result["success"] = True - results.append(account_result) - print(f"\n✅ 账号 {idx} 执行完成") - -# ==================== 生成通知内容 ==================== -def build_notify_content(results): - """生成详细的通知内容""" - total = len(results) - success = sum(1 for r in results if r.get("success")) - fail = total - success - - total_withdrawn = sum(1 for r in results if r.get("withdrawn")) - total_withdraw_amount = sum(r.get("withdraw_amount", 0) for r in results) - total_balance = sum(r.get("balance", 0) for r in results) - - # 统计中奖 - all_prizes = [] - prize_count = {} - for r in results: - for prize in r.get("prizes", []): - all_prizes.append(prize) - prize_count[prize] = prize_count.get(prize, 0) + 1 - - # 计算成功率 - success_rate = (success / total * 100) if total > 0 else 0 - - # 构建通知内容 - now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - - content = f""" -╔══════════════════════════════════════════════════════════╗ -║ 铛铛一下 执行报告 ║ -║ {now} ║ -╚══════════════════════════════════════════════════════════╝ - -📊 整体统计 -────────────────────────────────────────────────────────── -总账号数 : {total} -成功账号 : {success} -失败账号 : {fail} -成功率 : {success_rate:.1f}% - -💰 收益统计 -────────────────────────────────────────────────────────── -当前总余额 : {total_balance:.2f} 元 -本次提现总额 : {total_withdraw_amount:.2f} 元 -提现账号数 : {total_withdrawn} - -🎁 中奖统计 -────────────────────────────────────────────────────────── -总中奖次数 : {len(all_prizes)} -""" - - if prize_count: - content += "\n奖品明细:\n" - for prize, count in prize_count.items(): - content += f" • {prize}: {count}次\n" - - content += """ -📋 账号详情 -────────────────────────────────────────────────────────── -""" - - for r in results: - status = "✅" if r.get("success") else "❌" - content += f"\n{status} 【账号{r['index']}】{r['name']}\n" - if r.get("phone"): - content += f" 手机号: {r['phone']}\n" - if r.get("sign_success"): - content += f" 签到: 成功\n" - else: - content += f" 签到: 失败\n" - if r.get("prizes"): - content += f" 中奖: {', '.join(r['prizes'])}\n" - else: - content += f" 中奖: 无\n" - content += f" 余额: {r.get('balance', 0):.2f}元\n" - if r.get("withdrawn"): - content += f" 提现: ✅ 成功 {r.get('withdraw_amount', 0):.2f}元\n" - elif r.get("balance", 0) >= DEFAULT_WITHDRAW_BALANCE: - content += f" 提现: ❌ 失败 ({r.get('withdraw_msg', '未知错误')})\n" - else: - content += f" 提现: 未达到阈值({DEFAULT_WITHDRAW_BALANCE}元)\n" - if r.get("error_msg"): - content += f" 错误: {r['error_msg']}\n" - - content += """ -══════════════════════════════════════════════════════════ -""" - - return content - -# ==================== 打印控制台汇总 ==================== -def print_summary(results): - """在控制台打印汇总""" - print("\n" + "=" * 60) - print(" 执行汇总") - print("=" * 60) - - total = len(results) - success = sum(1 for r in results if r.get("success")) - total_balance = sum(r.get("balance", 0) for r in results) - total_withdraw = sum(r.get("withdraw_amount", 0) for r in results) - - print(f"\n📊 统计信息:") - print(f" 总账号: {total}") - print(f" 成功: {success}") - print(f" 失败: {total - success}") - print(f" 总余额: {total_balance:.2f}元") - print(f" 本次提现: {total_withdraw:.2f}元") - - print(f"\n📋 账号详情:") - print("-" * 60) - for r in results: - status = "✅" if r.get("success") else "❌" - prizes_str = ", ".join(r.get("prizes", [])) if r.get("prizes") else "无" - print(f"{status} {r['name']}: {r.get('balance', 0):.2f}元 | 中奖: {prizes_str}") - - print("\n" + "=" * 60) - -# ==================== 主程序 ==================== -def run(): - print("=" * 50) - print("铛铛一下 - 养鸡场版(自动获取账号 + 自动提现 + 推送通知)") - print(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") - print("=" * 50) - - # 检查环境变量 - if not CHICKEN_FARM_TOKEN: - print("❌ 请设置环境变量 wx_token") - if NOTIFY: - send_notify("铛铛一下 - 执行失败", "未设置 wx_token 环境变量") - return - - # 从养鸡场获取账号列表 - accounts = get_accounts_from_farm() - if not accounts: - print("❌ 未获取到任何微信账号") - if NOTIFY: - send_notify("铛铛一下 - 执行失败", "未获取到任何微信账号") - return - - print(f"\n共 {len(accounts)} 个账号\n") - - # 存储结果 - results = [] - - # 遍历执行每个账号 - for idx, account in enumerate(accounts, 1): - try: - run_account(account, idx, len(accounts), results) - # 账号间延迟 - if idx < len(accounts): - delay = random.randint(3, 8) - print(f"\n⏳ 等待 {delay} 秒后执行下一个账号...") - time.sleep(delay) - except Exception as e: - print(f"❌ 账号执行异常: {e}") - results.append({ - "index": idx, - "name": account.get("wxName", account.get("wxId")), - "success": False, - "error_msg": str(e) - }) - continue - - # 打印控制台汇总 - print_summary(results) - - # 发送通知 - if NOTIFY: - success_count = sum(1 for r in results if r.get("success")) - total_balance = sum(r.get("balance", 0) for r in results) - title = f"铛铛一下 - 执行完成 (成功{success_count}/{len(results)})" - content = build_notify_content(results) - send_notify(title, content) - - print("\n所有任务执行完成") - -# ==================== 入口 ==================== -if __name__ == "__main__": - run() \ No newline at end of file