Files
Yangmao_Script/WX_Applet/Applet_YYYX.py
2026-05-16 00:45:02 +08:00

359 lines
13 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# cron: 28 8 * * *
# const $ = new Env("云影优选");
"""
云影优选 - 完全自动版1并发 + 独立代理 + 5秒延迟
配置通过环境变量加载,不再硬编码敏感信息。
"""
import os
import requests
import json
import uuid
import time
import random
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from typing import List, Dict, Optional
# ========== 从环境变量读取配置 ==========
wx_cloud = os.getenv("wx_cloud", "http://127.0.0.1:666")
API_URL = f"{wx_cloud}/prod-api"
APPID = "wx1661b44e984b6fcb"
API_BASE = "https://cid-cps-api.heliang.cc"
# 敏感凭据:必须在环境变量中设置,脚本内不留痕迹
wx_token = os.getenv("wx_token", "")
if not wx_token:
print("❌ 错误:环境变量 wx_token 未设置,请先 export wx_token='Bearer ...'")
exit(1)
HEADERS = {
"Authorization": f"Bearer {wx_token}",
"Content-Type": "application/json"
}
# ========== 代理配置 ==========
USE_PROXY = os.getenv("USE_PROXY", "true").lower() == "true"
PROXY_YYYX_URL = os.getenv("PROXY_YYYX_URL", "")
# 并发数
MAX_WORKERS = int(os.getenv("MAX_WORKERS", "1"))
# 账号间启动延迟(秒)
ACCOUNT_START_DELAY = float(os.getenv("ACCOUNT_START_DELAY", "5"))
# 线程锁
print_lock = threading.Lock()
def safe_print(msg):
with print_lock:
print(msg)
def get_one_proxy() -> Optional[Dict]:
"""获取一个代理IP每个账号独立调用"""
if not PROXY_YYYX_URL:
safe_print(" ⚠️ 未配置代理提取链接,跳过获取代理")
return None
try:
resp = requests.get(PROXY_YYYX_URL, timeout=10)
proxy_ip = resp.text.strip()
if proxy_ip and len(proxy_ip) > 5 and not proxy_ip.startswith("{"):
proxy_url = f"http://{proxy_ip}"
return {"http": proxy_url, "https": proxy_url}
return None
except Exception as e:
safe_print(f" ⚠️ 获取代理失败: {e}")
return None
# ========== 云端微信接口 ==========
def get_online_accounts() -> List[Dict]:
resp = requests.get(f"{API_URL}/wechat/wechat/list", headers=HEADERS,
params={"pageNum": 1, "pageSize": 999}, verify=False)
result = resp.json()
if result.get("code") == 200:
return [a for a in result.get("rows", []) if a.get("onlineStatus") == "1"]
return []
def get_code(wxid: str) -> Optional[str]:
resp = requests.post(f"{API_URL}/wechat/api/getMiniProgramCode",
headers=HEADERS,
json={"wxid": wxid, "appid": APPID},
verify=False)
if resp.status_code == 200:
data = resp.json()
return data.get("data", {}).get("code") or data.get("code") or data.get("data")
return None
def login_to_get_token(code: str) -> Optional[str]:
headers = {
"User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 16_0 like Mac OS X) AppleWebKit/605.1.15",
"Content-Type": "application/json",
"appid": APPID,
"mp-platform": "weapp",
"x-web-id": str(uuid.uuid4()),
"Referer": f"https://servicewechat.com/{APPID}/91/page-frame.html"
}
body = {"code": code, "errMsg": "login:ok"}
try:
resp = requests.post(f"{API_BASE}/user/login", headers=headers, json=body, verify=False)
result = resp.json()
if result.get("error_code") == 0:
return result.get("data", {}).get("token")
except:
pass
return None
# ========== 云影优选任务 ==========
class YunyingTask:
def __init__(self, token: str, name: str, proxy: dict = None):
self.token = token
self.name = name
self.base_url = "https://cid-cps-api.heliang.cc"
self.appid = APPID
self.proxy = proxy
def _generate_uuid(self) -> str:
return str(uuid.uuid4())
def _build_headers(self, pageurl="pages%2Fpackage%2Fcashback%2Findex", pageurl_pre="pages%2Fcustom-tabbar%2Findex"):
ua = "Mozilla/5.0 (iPhone; CPU iPhone OS 16_0 like Mac OS X) AppleWebKit/605.1.15 Mobile/15E148 MicroMessenger/8.0.50 NetType/WIFI Language/zh_CN"
return {
"User-Agent": ua,
"Content-Type": "application/json",
"appid": self.appid,
"mp-platform": "weapp",
"x-token": self.token,
"x-web-id": self._generate_uuid(),
"pageurl": pageurl,
"pageurl-pre": pageurl_pre,
"Referer": f"https://servicewechat.com/{self.appid}/91/page-frame.html"
}
def _request(self, url: str, data: dict = None, headers: dict = None) -> dict:
try:
resp = requests.post(url, headers=headers or self._build_headers(),
json=data or {}, timeout=30, verify=False, proxies=self.proxy)
return resp.json()
except Exception as e:
safe_print(f" [{self.name}] 请求失败: {e}")
return {}
def checkin(self) -> bool:
result = self._request(f"{self.base_url}/activity/checkin")
if result.get("error_code") == 0:
coin = result.get("data", {}).get("coin", 0)
safe_print(f" [{self.name}] ✅ 签到成功 +{coin}金币")
return True
else:
msg = result.get("msg", "失败")
if "已签到" in msg:
safe_print(f" [{self.name}] ⚠️ 今日已签到")
else:
safe_print(f" [{self.name}] ❌ 签到失败: {msg}")
return False
def get_balance(self) -> Dict:
result = self._request(f"{self.base_url}/coin/mine")
if result.get("error_code") == 0:
coin = result.get("data", {}).get("coin", 0)
amount = result.get("data", {}).get("left_amount", 0)
safe_print(f" [{self.name}] 💰 金币: {coin} | 余额: {amount}")
return {"coin": coin, "amount": amount}
return {"coin": 0, "amount": 0}
def watch_video(self) -> bool:
result = self._request(f"{self.base_url}/activity/video/reward")
if result.get("error_code") == 0:
coin = result.get("data", {}).get("reward_amount", 0)
safe_print(f" [{self.name}] 📺 看广告 +{coin}金币")
return True
return False
def coin_to_rmb(self, coin: int) -> bool:
result = self._request(f"{self.base_url}/coin/to-rmb", data={"version": 1})
if result.get("error_code") == 0:
safe_print(f" [{self.name}] 💱 兑换成功! {coin}金币 → {coin/10}")
return True
else:
safe_print(f" [{self.name}] ❌ 兑换失败: {result.get('msg')}")
return False
def get_withdrawal_list(self) -> List[Dict]:
headers = self._build_headers(
pageurl="pages%2Fpackage%2Fcashback%2Fmy-cash%2Findex",
pageurl_pre="pages%2Fpackage%2Fcashback%2Findex"
)
result = self._request(f"{self.base_url}/coin/withdrawal-list", headers=headers)
if result.get("error_code") == 0:
items = result.get("data", [])
available = [{"permission_id": i.get("permission_id"), "amount": i.get("amount"), "rmb": i.get("rmb")}
for i in items if i.get("is_available") == True]
available.sort(key=lambda x: x.get("rmb", 0), reverse=True)
return available
return []
def withdraw(self, permission_id: str, amount: int, rmb: float) -> bool:
headers = self._build_headers(
pageurl="pages%2Fpackage%2Fcashback%2Fmy-cash%2Findex",
pageurl_pre="pages%2Fpackage%2Fcashback%2Findex"
)
result = self._request(f"{self.base_url}/coin/withdrawal",
data={"permission_id": permission_id, "amount": amount},
headers=headers)
if result.get("error_code") == 0:
safe_print(f" [{self.name}] 💸 提现成功! {rmb}元已到账")
return True
else:
safe_print(f" [{self.name}] ❌ 提现失败: {result.get('msg')}")
return False
def auto_withdraw(self):
balance = self.get_balance()
if balance.get("coin", 0) > 1:
self.coin_to_rmb(balance.get("coin", 0))
time.sleep(random.uniform(1, 2))
balance = self.get_balance()
withdraw_list = self.get_withdrawal_list()
if not withdraw_list:
safe_print(f" [{self.name}] 暂无可用提现选项")
return
for item in withdraw_list:
rmb = item.get("rmb", 0)
if balance.get("amount", 0) >= rmb:
self.withdraw(item.get("permission_id"), item.get("amount"), rmb)
time.sleep(random.uniform(1, 2))
balance = self.get_balance()
else:
safe_print(f" [{self.name}] ⚠️ 余额不足,无法提现 {rmb}")
break
def do_tasks(self):
safe_print(f"\n{'='*50}")
safe_print(f"👤 账号: {self.name}")
if self.proxy:
safe_print(f"🌐 代理: {self.proxy.get('http', '')[:50]}...")
safe_print(f"{'='*50}")
self.checkin()
time.sleep(random.uniform(1, 2))
self.get_balance()
time.sleep(random.uniform(1, 2))
safe_print(f" [{self.name}] 🎬 开始看广告...")
success_count = 0
for i in range(10):
if self.watch_video():
success_count += 1
time.sleep(random.uniform(2, 4))
safe_print(f" [{self.name}] 📊 完成: 成功看广告 {success_count}/5 次")
self.auto_withdraw()
safe_print(f" [{self.name}] 📈 最终状态:")
self.get_balance()
def process_one_account(acc: Dict, index: int, total: int) -> Dict:
"""处理单个账号 - 每个账号独立获取代理"""
wxid = acc.get("wxId")
name = acc.get("wxName", f"账号{index}")
safe_print(f"\n🔑 [{index}/{total}] {name} 开始处理...")
# 每个账号独立获取一个新代理
proxy = get_one_proxy() if USE_PROXY else None
if proxy:
safe_print(f" [{name}] 🌐 获取独立代理成功")
else:
safe_print(f" [{name}] 🌐 无代理/代理未配置")
# 获取code
code = get_code(wxid)
if not code:
safe_print(f" [{name}] ❌ 获取code失败")
return {"name": name, "success": False, "error": "获取code失败"}
# 换取token
token = login_to_get_token(code)
if not token:
safe_print(f" [{name}] ❌ 换取token失败")
return {"name": name, "success": False, "error": "换取token失败"}
safe_print(f" [{name}] ✅ token: {token[:20]}...")
# 执行任务
task = YunyingTask(token, name, proxy)
task.do_tasks()
return {"name": name, "success": True}
def main():
print("=" * 60)
print("🔄 云影优选 - 全自动版")
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"🚀 并发数: {MAX_WORKERS}")
print(f"⏱️ 启动延迟: 每账号 {ACCOUNT_START_DELAY}")
print(f"🌐 代理模式: {'每个账号独立代理' if USE_PROXY else '关闭'}")
print("=" * 60)
# 获取云端微信账号
print("\n📱 获取云端微信账号...")
accounts = get_online_accounts()
if not accounts:
print("❌ 没有在线账号")
return
print(f"✅ 在线账号: {len(accounts)}")
print(f"🚀 开始并发处理,每 {ACCOUNT_START_DELAY} 秒启动一个账号...\n")
results = []
futures = []
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
# 逐个提交任务,每个任务延迟启动
for i, acc in enumerate(accounts):
future = executor.submit(process_one_account, acc, i+1, len(accounts))
futures.append(future)
# 每个任务提交后延迟2秒再提交下一个
if i < len(accounts) - 1:
time.sleep(ACCOUNT_START_DELAY)
# 等待所有任务完成
for future in as_completed(futures):
try:
result = future.result()
results.append(result)
except Exception as e:
safe_print(f"❌ 线程执行失败: {e}")
# 统计结果
success_count = sum(1 for r in results if r.get("success"))
print("\n" + "=" * 60)
print(f"📊 执行完成!")
print(f" 总账号: {len(accounts)}")
print(f" 成功: {success_count}")
print(f" 失败: {len(accounts) - success_count}")
print("=" * 60)
if __name__ == "__main__":
import urllib3
urllib3.disable_warnings()
main()