上传文件至「WX_Applet」

This commit is contained in:
2026-05-16 00:45:02 +08:00
commit f88af55073
2 changed files with 1062 additions and 0 deletions

359
WX_Applet/Applet_YYYX.py Normal file
View File

@@ -0,0 +1,359 @@
# cron: 28 8 * * *
# const $ = new Env("云影优选");
"""
云影优选 - 完全自动版1并发 + 独立代理 + 5秒延迟
配置通过环境变量加载,不再硬编码敏感信息。
"""
import os
import requests
import json
import uuid
import time
import random
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from typing import List, Dict, Optional
# ========== 从环境变量读取配置 ==========
wx_cloud = os.getenv("wx_cloud", "http://127.0.0.1:666")
API_URL = f"{wx_cloud}/prod-api"
APPID = "wx1661b44e984b6fcb"
API_BASE = "https://cid-cps-api.heliang.cc"
# 敏感凭据:必须在环境变量中设置,脚本内不留痕迹
wx_token = os.getenv("wx_token", "")
if not wx_token:
print("❌ 错误:环境变量 wx_token 未设置,请先 export wx_token='Bearer ...'")
exit(1)
HEADERS = {
"Authorization": f"Bearer {wx_token}",
"Content-Type": "application/json"
}
# ========== 代理配置 ==========
USE_PROXY = os.getenv("USE_PROXY", "true").lower() == "true"
PROXY_YYYX_URL = os.getenv("PROXY_YYYX_URL", "")
# 并发数
MAX_WORKERS = int(os.getenv("MAX_WORKERS", "1"))
# 账号间启动延迟(秒)
ACCOUNT_START_DELAY = float(os.getenv("ACCOUNT_START_DELAY", "5"))
# 线程锁
print_lock = threading.Lock()
def safe_print(msg):
with print_lock:
print(msg)
def get_one_proxy() -> Optional[Dict]:
"""获取一个代理IP每个账号独立调用"""
if not PROXY_YYYX_URL:
safe_print(" ⚠️ 未配置代理提取链接,跳过获取代理")
return None
try:
resp = requests.get(PROXY_YYYX_URL, timeout=10)
proxy_ip = resp.text.strip()
if proxy_ip and len(proxy_ip) > 5 and not proxy_ip.startswith("{"):
proxy_url = f"http://{proxy_ip}"
return {"http": proxy_url, "https": proxy_url}
return None
except Exception as e:
safe_print(f" ⚠️ 获取代理失败: {e}")
return None
# ========== 云端微信接口 ==========
def get_online_accounts() -> List[Dict]:
resp = requests.get(f"{API_URL}/wechat/wechat/list", headers=HEADERS,
params={"pageNum": 1, "pageSize": 999}, verify=False)
result = resp.json()
if result.get("code") == 200:
return [a for a in result.get("rows", []) if a.get("onlineStatus") == "1"]
return []
def get_code(wxid: str) -> Optional[str]:
resp = requests.post(f"{API_URL}/wechat/api/getMiniProgramCode",
headers=HEADERS,
json={"wxid": wxid, "appid": APPID},
verify=False)
if resp.status_code == 200:
data = resp.json()
return data.get("data", {}).get("code") or data.get("code") or data.get("data")
return None
def login_to_get_token(code: str) -> Optional[str]:
headers = {
"User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 16_0 like Mac OS X) AppleWebKit/605.1.15",
"Content-Type": "application/json",
"appid": APPID,
"mp-platform": "weapp",
"x-web-id": str(uuid.uuid4()),
"Referer": f"https://servicewechat.com/{APPID}/91/page-frame.html"
}
body = {"code": code, "errMsg": "login:ok"}
try:
resp = requests.post(f"{API_BASE}/user/login", headers=headers, json=body, verify=False)
result = resp.json()
if result.get("error_code") == 0:
return result.get("data", {}).get("token")
except:
pass
return None
# ========== 云影优选任务 ==========
class YunyingTask:
def __init__(self, token: str, name: str, proxy: dict = None):
self.token = token
self.name = name
self.base_url = "https://cid-cps-api.heliang.cc"
self.appid = APPID
self.proxy = proxy
def _generate_uuid(self) -> str:
return str(uuid.uuid4())
def _build_headers(self, pageurl="pages%2Fpackage%2Fcashback%2Findex", pageurl_pre="pages%2Fcustom-tabbar%2Findex"):
ua = "Mozilla/5.0 (iPhone; CPU iPhone OS 16_0 like Mac OS X) AppleWebKit/605.1.15 Mobile/15E148 MicroMessenger/8.0.50 NetType/WIFI Language/zh_CN"
return {
"User-Agent": ua,
"Content-Type": "application/json",
"appid": self.appid,
"mp-platform": "weapp",
"x-token": self.token,
"x-web-id": self._generate_uuid(),
"pageurl": pageurl,
"pageurl-pre": pageurl_pre,
"Referer": f"https://servicewechat.com/{self.appid}/91/page-frame.html"
}
def _request(self, url: str, data: dict = None, headers: dict = None) -> dict:
try:
resp = requests.post(url, headers=headers or self._build_headers(),
json=data or {}, timeout=30, verify=False, proxies=self.proxy)
return resp.json()
except Exception as e:
safe_print(f" [{self.name}] 请求失败: {e}")
return {}
def checkin(self) -> bool:
result = self._request(f"{self.base_url}/activity/checkin")
if result.get("error_code") == 0:
coin = result.get("data", {}).get("coin", 0)
safe_print(f" [{self.name}] ✅ 签到成功 +{coin}金币")
return True
else:
msg = result.get("msg", "失败")
if "已签到" in msg:
safe_print(f" [{self.name}] ⚠️ 今日已签到")
else:
safe_print(f" [{self.name}] ❌ 签到失败: {msg}")
return False
def get_balance(self) -> Dict:
result = self._request(f"{self.base_url}/coin/mine")
if result.get("error_code") == 0:
coin = result.get("data", {}).get("coin", 0)
amount = result.get("data", {}).get("left_amount", 0)
safe_print(f" [{self.name}] 💰 金币: {coin} | 余额: {amount}")
return {"coin": coin, "amount": amount}
return {"coin": 0, "amount": 0}
def watch_video(self) -> bool:
result = self._request(f"{self.base_url}/activity/video/reward")
if result.get("error_code") == 0:
coin = result.get("data", {}).get("reward_amount", 0)
safe_print(f" [{self.name}] 📺 看广告 +{coin}金币")
return True
return False
def coin_to_rmb(self, coin: int) -> bool:
result = self._request(f"{self.base_url}/coin/to-rmb", data={"version": 1})
if result.get("error_code") == 0:
safe_print(f" [{self.name}] 💱 兑换成功! {coin}金币 → {coin/10}")
return True
else:
safe_print(f" [{self.name}] ❌ 兑换失败: {result.get('msg')}")
return False
def get_withdrawal_list(self) -> List[Dict]:
headers = self._build_headers(
pageurl="pages%2Fpackage%2Fcashback%2Fmy-cash%2Findex",
pageurl_pre="pages%2Fpackage%2Fcashback%2Findex"
)
result = self._request(f"{self.base_url}/coin/withdrawal-list", headers=headers)
if result.get("error_code") == 0:
items = result.get("data", [])
available = [{"permission_id": i.get("permission_id"), "amount": i.get("amount"), "rmb": i.get("rmb")}
for i in items if i.get("is_available") == True]
available.sort(key=lambda x: x.get("rmb", 0), reverse=True)
return available
return []
def withdraw(self, permission_id: str, amount: int, rmb: float) -> bool:
headers = self._build_headers(
pageurl="pages%2Fpackage%2Fcashback%2Fmy-cash%2Findex",
pageurl_pre="pages%2Fpackage%2Fcashback%2Findex"
)
result = self._request(f"{self.base_url}/coin/withdrawal",
data={"permission_id": permission_id, "amount": amount},
headers=headers)
if result.get("error_code") == 0:
safe_print(f" [{self.name}] 💸 提现成功! {rmb}元已到账")
return True
else:
safe_print(f" [{self.name}] ❌ 提现失败: {result.get('msg')}")
return False
def auto_withdraw(self):
balance = self.get_balance()
if balance.get("coin", 0) > 1:
self.coin_to_rmb(balance.get("coin", 0))
time.sleep(random.uniform(1, 2))
balance = self.get_balance()
withdraw_list = self.get_withdrawal_list()
if not withdraw_list:
safe_print(f" [{self.name}] 暂无可用提现选项")
return
for item in withdraw_list:
rmb = item.get("rmb", 0)
if balance.get("amount", 0) >= rmb:
self.withdraw(item.get("permission_id"), item.get("amount"), rmb)
time.sleep(random.uniform(1, 2))
balance = self.get_balance()
else:
safe_print(f" [{self.name}] ⚠️ 余额不足,无法提现 {rmb}")
break
def do_tasks(self):
safe_print(f"\n{'='*50}")
safe_print(f"👤 账号: {self.name}")
if self.proxy:
safe_print(f"🌐 代理: {self.proxy.get('http', '')[:50]}...")
safe_print(f"{'='*50}")
self.checkin()
time.sleep(random.uniform(1, 2))
self.get_balance()
time.sleep(random.uniform(1, 2))
safe_print(f" [{self.name}] 🎬 开始看广告...")
success_count = 0
for i in range(10):
if self.watch_video():
success_count += 1
time.sleep(random.uniform(2, 4))
safe_print(f" [{self.name}] 📊 完成: 成功看广告 {success_count}/5 次")
self.auto_withdraw()
safe_print(f" [{self.name}] 📈 最终状态:")
self.get_balance()
def process_one_account(acc: Dict, index: int, total: int) -> Dict:
"""处理单个账号 - 每个账号独立获取代理"""
wxid = acc.get("wxId")
name = acc.get("wxName", f"账号{index}")
safe_print(f"\n🔑 [{index}/{total}] {name} 开始处理...")
# 每个账号独立获取一个新代理
proxy = get_one_proxy() if USE_PROXY else None
if proxy:
safe_print(f" [{name}] 🌐 获取独立代理成功")
else:
safe_print(f" [{name}] 🌐 无代理/代理未配置")
# 获取code
code = get_code(wxid)
if not code:
safe_print(f" [{name}] ❌ 获取code失败")
return {"name": name, "success": False, "error": "获取code失败"}
# 换取token
token = login_to_get_token(code)
if not token:
safe_print(f" [{name}] ❌ 换取token失败")
return {"name": name, "success": False, "error": "换取token失败"}
safe_print(f" [{name}] ✅ token: {token[:20]}...")
# 执行任务
task = YunyingTask(token, name, proxy)
task.do_tasks()
return {"name": name, "success": True}
def main():
print("=" * 60)
print("🔄 云影优选 - 全自动版")
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"🚀 并发数: {MAX_WORKERS}")
print(f"⏱️ 启动延迟: 每账号 {ACCOUNT_START_DELAY}")
print(f"🌐 代理模式: {'每个账号独立代理' if USE_PROXY else '关闭'}")
print("=" * 60)
# 获取云端微信账号
print("\n📱 获取云端微信账号...")
accounts = get_online_accounts()
if not accounts:
print("❌ 没有在线账号")
return
print(f"✅ 在线账号: {len(accounts)}")
print(f"🚀 开始并发处理,每 {ACCOUNT_START_DELAY} 秒启动一个账号...\n")
results = []
futures = []
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
# 逐个提交任务,每个任务延迟启动
for i, acc in enumerate(accounts):
future = executor.submit(process_one_account, acc, i+1, len(accounts))
futures.append(future)
# 每个任务提交后延迟2秒再提交下一个
if i < len(accounts) - 1:
time.sleep(ACCOUNT_START_DELAY)
# 等待所有任务完成
for future in as_completed(futures):
try:
result = future.result()
results.append(result)
except Exception as e:
safe_print(f"❌ 线程执行失败: {e}")
# 统计结果
success_count = sum(1 for r in results if r.get("success"))
print("\n" + "=" * 60)
print(f"📊 执行完成!")
print(f" 总账号: {len(accounts)}")
print(f" 成功: {success_count}")
print(f" 失败: {len(accounts) - success_count}")
print("=" * 60)
if __name__ == "__main__":
import urllib3
urllib3.disable_warnings()
main()