# cron: 28 8 * * * # const $ = new Env("云影优选"); """ 云影优选 - 完全自动版(1并发 + 独立代理 + 5秒延迟) 配置通过环境变量加载,不再硬编码敏感信息。 """ import os import requests import json import uuid import time import random import threading from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime from typing import List, Dict, Optional # ========== 从环境变量读取配置 ========== wx_cloud = os.getenv("wx_cloud", "http://127.0.0.1:666") API_URL = f"{wx_cloud}/prod-api" APPID = "wx1661b44e984b6fcb" API_BASE = "https://cid-cps-api.heliang.cc" # 敏感凭据:必须在环境变量中设置,脚本内不留痕迹 wx_token = os.getenv("wx_token", "") if not wx_token: print("❌ 错误:环境变量 wx_token 未设置,请先 export wx_token='Bearer ...'") exit(1) HEADERS = { "Authorization": f"Bearer {wx_token}", "Content-Type": "application/json" } # ========== 代理配置 ========== USE_PROXY = os.getenv("USE_PROXY", "true").lower() == "true" PROXY_YYYX_URL = os.getenv("PROXY_YYYX_URL", "") # 并发数 MAX_WORKERS = int(os.getenv("MAX_WORKERS", "1")) # 账号间启动延迟(秒) ACCOUNT_START_DELAY = float(os.getenv("ACCOUNT_START_DELAY", "5")) # 线程锁 print_lock = threading.Lock() def safe_print(msg): with print_lock: print(msg) def get_one_proxy() -> Optional[Dict]: """获取一个代理IP(每个账号独立调用)""" if not PROXY_YYYX_URL: safe_print(" ⚠️ 未配置代理提取链接,跳过获取代理") return None try: resp = requests.get(PROXY_YYYX_URL, timeout=10) proxy_ip = resp.text.strip() if proxy_ip and len(proxy_ip) > 5 and not proxy_ip.startswith("{"): proxy_url = f"http://{proxy_ip}" return {"http": proxy_url, "https": proxy_url} return None except Exception as e: safe_print(f" ⚠️ 获取代理失败: {e}") return None # ========== 云端微信接口 ========== def get_online_accounts() -> List[Dict]: resp = requests.get(f"{API_URL}/wechat/wechat/list", headers=HEADERS, params={"pageNum": 1, "pageSize": 999}, verify=False) result = resp.json() if result.get("code") == 200: return [a for a in result.get("rows", []) if a.get("onlineStatus") == "1"] return [] def get_code(wxid: str) -> Optional[str]: resp = requests.post(f"{API_URL}/wechat/api/getMiniProgramCode", headers=HEADERS, json={"wxid": wxid, "appid": APPID}, verify=False) if resp.status_code == 200: data = resp.json() return data.get("data", {}).get("code") or data.get("code") or data.get("data") return None def login_to_get_token(code: str) -> Optional[str]: headers = { "User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 16_0 like Mac OS X) AppleWebKit/605.1.15", "Content-Type": "application/json", "appid": APPID, "mp-platform": "weapp", "x-web-id": str(uuid.uuid4()), "Referer": f"https://servicewechat.com/{APPID}/91/page-frame.html" } body = {"code": code, "errMsg": "login:ok"} try: resp = requests.post(f"{API_BASE}/user/login", headers=headers, json=body, verify=False) result = resp.json() if result.get("error_code") == 0: return result.get("data", {}).get("token") except: pass return None # ========== 云影优选任务 ========== class YunyingTask: def __init__(self, token: str, name: str, proxy: dict = None): self.token = token self.name = name self.base_url = "https://cid-cps-api.heliang.cc" self.appid = APPID self.proxy = proxy def _generate_uuid(self) -> str: return str(uuid.uuid4()) def _build_headers(self, pageurl="pages%2Fpackage%2Fcashback%2Findex", pageurl_pre="pages%2Fcustom-tabbar%2Findex"): ua = "Mozilla/5.0 (iPhone; CPU iPhone OS 16_0 like Mac OS X) AppleWebKit/605.1.15 Mobile/15E148 MicroMessenger/8.0.50 NetType/WIFI Language/zh_CN" return { "User-Agent": ua, "Content-Type": "application/json", "appid": self.appid, "mp-platform": "weapp", "x-token": self.token, "x-web-id": self._generate_uuid(), "pageurl": pageurl, "pageurl-pre": pageurl_pre, "Referer": f"https://servicewechat.com/{self.appid}/91/page-frame.html" } def _request(self, url: str, data: dict = None, headers: dict = None) -> dict: try: resp = requests.post(url, headers=headers or self._build_headers(), json=data or {}, timeout=30, verify=False, proxies=self.proxy) return resp.json() except Exception as e: safe_print(f" [{self.name}] 请求失败: {e}") return {} def checkin(self) -> bool: result = self._request(f"{self.base_url}/activity/checkin") if result.get("error_code") == 0: coin = result.get("data", {}).get("coin", 0) safe_print(f" [{self.name}] ✅ 签到成功 +{coin}金币") return True else: msg = result.get("msg", "失败") if "已签到" in msg: safe_print(f" [{self.name}] ⚠️ 今日已签到") else: safe_print(f" [{self.name}] ❌ 签到失败: {msg}") return False def get_balance(self) -> Dict: result = self._request(f"{self.base_url}/coin/mine") if result.get("error_code") == 0: coin = result.get("data", {}).get("coin", 0) amount = result.get("data", {}).get("left_amount", 0) safe_print(f" [{self.name}] 💰 金币: {coin} | 余额: {amount}元") return {"coin": coin, "amount": amount} return {"coin": 0, "amount": 0} def watch_video(self) -> bool: result = self._request(f"{self.base_url}/activity/video/reward") if result.get("error_code") == 0: coin = result.get("data", {}).get("reward_amount", 0) safe_print(f" [{self.name}] 📺 看广告 +{coin}金币") return True return False def coin_to_rmb(self, coin: int) -> bool: result = self._request(f"{self.base_url}/coin/to-rmb", data={"version": 1}) if result.get("error_code") == 0: safe_print(f" [{self.name}] 💱 兑换成功! {coin}金币 → {coin/10}元") return True else: safe_print(f" [{self.name}] ❌ 兑换失败: {result.get('msg')}") return False def get_withdrawal_list(self) -> List[Dict]: headers = self._build_headers( pageurl="pages%2Fpackage%2Fcashback%2Fmy-cash%2Findex", pageurl_pre="pages%2Fpackage%2Fcashback%2Findex" ) result = self._request(f"{self.base_url}/coin/withdrawal-list", headers=headers) if result.get("error_code") == 0: items = result.get("data", []) available = [{"permission_id": i.get("permission_id"), "amount": i.get("amount"), "rmb": i.get("rmb")} for i in items if i.get("is_available") == True] available.sort(key=lambda x: x.get("rmb", 0), reverse=True) return available return [] def withdraw(self, permission_id: str, amount: int, rmb: float) -> bool: headers = self._build_headers( pageurl="pages%2Fpackage%2Fcashback%2Fmy-cash%2Findex", pageurl_pre="pages%2Fpackage%2Fcashback%2Findex" ) result = self._request(f"{self.base_url}/coin/withdrawal", data={"permission_id": permission_id, "amount": amount}, headers=headers) if result.get("error_code") == 0: safe_print(f" [{self.name}] 💸 提现成功! {rmb}元已到账") return True else: safe_print(f" [{self.name}] ❌ 提现失败: {result.get('msg')}") return False def auto_withdraw(self): balance = self.get_balance() if balance.get("coin", 0) > 1: self.coin_to_rmb(balance.get("coin", 0)) time.sleep(random.uniform(1, 2)) balance = self.get_balance() withdraw_list = self.get_withdrawal_list() if not withdraw_list: safe_print(f" [{self.name}] ℹ️ 暂无可用提现选项") return for item in withdraw_list: rmb = item.get("rmb", 0) if balance.get("amount", 0) >= rmb: self.withdraw(item.get("permission_id"), item.get("amount"), rmb) time.sleep(random.uniform(1, 2)) balance = self.get_balance() else: safe_print(f" [{self.name}] ⚠️ 余额不足,无法提现 {rmb}元") break def do_tasks(self): safe_print(f"\n{'='*50}") safe_print(f"👤 账号: {self.name}") if self.proxy: safe_print(f"🌐 代理: {self.proxy.get('http', '')[:50]}...") safe_print(f"{'='*50}") self.checkin() time.sleep(random.uniform(1, 2)) self.get_balance() time.sleep(random.uniform(1, 2)) safe_print(f" [{self.name}] 🎬 开始看广告...") success_count = 0 for i in range(10): if self.watch_video(): success_count += 1 time.sleep(random.uniform(2, 4)) safe_print(f" [{self.name}] 📊 完成: 成功看广告 {success_count}/5 次") self.auto_withdraw() safe_print(f" [{self.name}] 📈 最终状态:") self.get_balance() def process_one_account(acc: Dict, index: int, total: int) -> Dict: """处理单个账号 - 每个账号独立获取代理""" wxid = acc.get("wxId") name = acc.get("wxName", f"账号{index}") safe_print(f"\n🔑 [{index}/{total}] {name} 开始处理...") # 每个账号独立获取一个新代理 proxy = get_one_proxy() if USE_PROXY else None if proxy: safe_print(f" [{name}] 🌐 获取独立代理成功") else: safe_print(f" [{name}] 🌐 无代理/代理未配置") # 获取code code = get_code(wxid) if not code: safe_print(f" [{name}] ❌ 获取code失败") return {"name": name, "success": False, "error": "获取code失败"} # 换取token token = login_to_get_token(code) if not token: safe_print(f" [{name}] ❌ 换取token失败") return {"name": name, "success": False, "error": "换取token失败"} safe_print(f" [{name}] ✅ token: {token[:20]}...") # 执行任务 task = YunyingTask(token, name, proxy) task.do_tasks() return {"name": name, "success": True} def main(): print("=" * 60) print("🔄 云影优选 - 全自动版") print(f"⏰ {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print(f"🚀 并发数: {MAX_WORKERS}") print(f"⏱️ 启动延迟: 每账号 {ACCOUNT_START_DELAY} 秒") print(f"🌐 代理模式: {'每个账号独立代理' if USE_PROXY else '关闭'}") print("=" * 60) # 获取云端微信账号 print("\n📱 获取云端微信账号...") accounts = get_online_accounts() if not accounts: print("❌ 没有在线账号") return print(f"✅ 在线账号: {len(accounts)}个") print(f"🚀 开始并发处理,每 {ACCOUNT_START_DELAY} 秒启动一个账号...\n") results = [] futures = [] with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: # 逐个提交任务,每个任务延迟启动 for i, acc in enumerate(accounts): future = executor.submit(process_one_account, acc, i+1, len(accounts)) futures.append(future) # 每个任务提交后延迟2秒再提交下一个 if i < len(accounts) - 1: time.sleep(ACCOUNT_START_DELAY) # 等待所有任务完成 for future in as_completed(futures): try: result = future.result() results.append(result) except Exception as e: safe_print(f"❌ 线程执行失败: {e}") # 统计结果 success_count = sum(1 for r in results if r.get("success")) print("\n" + "=" * 60) print(f"📊 执行完成!") print(f" 总账号: {len(accounts)}") print(f" 成功: {success_count}") print(f" 失败: {len(accounts) - success_count}") print("=" * 60) if __name__ == "__main__": import urllib3 urllib3.disable_warnings() main()