添加 WX_Applet/Applet_JYHS_JYXE.py

This commit is contained in:
2026-06-02 00:51:45 +08:00
parent e675f3a487
commit cdfd9f7c45

View File

@@ -0,0 +1,212 @@
# cron: 28 7 * * *
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
旧衣小二全自动化脚本 - 支持单号测试/多号批量/自动拉取全部账号
"""
import os
import sys
import time
import random
import datetime
import requests
from typing import Optional, List, Tuple, Dict
# ==================== 固定配置 ====================
JYXE_APPID = "wx426d52c8130b8559"
JYXE_LOGIN_URL = "https://jiuyixiaoer.fzjingzhou.com/api/login/getWxMiniProgramSessionKey"
JYXE_BASE_URL = "https://jiuyixiaoer.fzjingzhou.com"
# ==================== 环境变量 ====================
WX_CLOUD = os.getenv('wx_cloud', '')
AUTH_TOKEN = os.getenv('wx_token', '')
SINGLE_TEST_WXID = os.getenv('SINGLE_TEST_WXID', '')
WX_IDS_ENV = os.getenv('WX_IDS', '')
# ==================== 获取 wxid 列表 ====================
def fetch_all_wxids_from_yjc() -> List[str]:
"""从养鸡场获取全部可用的 wxid 列表(使用 /wechat/list 接口)"""
if not WX_CLOUD or not AUTH_TOKEN:
print("❌ 养鸡场配置缺失,无法获取全部账号")
return []
url = f"{WX_CLOUD}/prod-api/wechat/wechat/list?pageNum=1&pageSize=1000"
headers = {"Authorization": f"Bearer {AUTH_TOKEN}"}
try:
resp = requests.get(url, headers=headers, timeout=30)
if resp.status_code != 200:
print(f"❌ 获取账号列表失败: HTTP {resp.status_code}")
return []
data = resp.json()
rows = data.get('rows', [])
if not rows:
print("⚠️ 养鸡场返回账号列表为空")
return []
wxids = []
for item in rows:
wxid = item.get('wxId') or item.get('wxid')
if wxid:
wxids.append(wxid)
print(f"✅ 成功获取 {len(wxids)} 个账号")
return wxids
except Exception as e:
print(f"❌ 请求异常: {e}")
return []
def get_wxid_list() -> List[str]:
"""获取待处理的 wxid 列表:优先环境变量,否则拉取全部"""
if SINGLE_TEST_WXID:
return [SINGLE_TEST_WXID]
if WX_IDS_ENV:
return [wxid.strip() for wxid in WX_IDS_ENV.split(',') if wxid.strip()]
print("📡 未指定 wxid正在从养鸡场获取全部账号...")
all_wxids = fetch_all_wxids_from_yjc()
if not all_wxids:
print("❌ 从养鸡场获取全部账号失败,请检查接口或手动设置 WX_IDS")
return []
return all_wxids
# ==================== 业务函数 ====================
def check_config() -> bool:
if not WX_CLOUD:
print("❌ 环境变量 wx_cloud 未设置")
return False
if not AUTH_TOKEN:
print("❌ 环境变量 wx_token 未设置")
return False
return True
def http_post(url: str, data=None, json_data=None, headers=None, timeout=30) -> Optional[Dict]:
try:
resp = requests.post(url, data=data, json=json_data, headers=headers, timeout=timeout)
if resp.status_code != 200:
print(f"HTTP {resp.status_code}: {url}")
return None
return resp.json()
except Exception as e:
print(f"请求失败 {url}: {str(e)}")
return None
def get_miniprogram_code(wxid: str) -> Optional[str]:
url = f"{WX_CLOUD}/prod-api/wechat/api/getMiniProgramCode"
payload = {"wxid": wxid, "appid": JYXE_APPID}
headers = {"Authorization": f"Bearer {AUTH_TOKEN}", "Content-Type": "application/json"}
result = http_post(url, json_data=payload, headers=headers, timeout=30)
if result and result.get("code") == 200 and result.get("data", {}).get("code"):
return result["data"]["code"]
print(f"获取code失败: {result}")
return None
def login_jyxe(code: str) -> Optional[str]:
payload = {"code": code, "appid": JYXE_APPID}
headers = {"Content-Type": "application/json"}
result = http_post(JYXE_LOGIN_URL, json_data=payload, headers=headers, timeout=30)
if result and result.get("code") == 1000 and result.get("data", {}).get("token"):
return result["data"]["token"]
print(f"登录失败: {result}")
return None
def get_user_info(token: str) -> Tuple[float, str, int]:
url = f"{JYXE_BASE_URL}/api/Person/index"
headers = {
"User-Agent": "Mozilla/5.0 (Linux; Android 10) AppleWebKit/537.36",
"content-type": "application/x-www-form-urlencoded",
"platform": "MP-WEIXIN",
}
data = f"token={token}"
try:
resp = requests.post(url, headers=headers, data=data, timeout=30)
info = resp.json()
if "success" in info.get("msg", ""):
exchange = float(info["data"].get("exchange", 0))
mobile = info["data"].get("mobile", "未知")
sign_in_num = int(info["data"].get("sign_in_num", 0))
return exchange, mobile, sign_in_num
except Exception as e:
print(f"获取用户信息异常: {e}")
return 0.0, "未知", 0
def do_sign_in(token: str) -> Dict:
url = f"{JYXE_BASE_URL}/api/Person/sign"
headers = {
"User-Agent": "Mozilla/5.0 (Linux; Android 10) AppleWebKit/537.36",
"content-type": "application/x-www-form-urlencoded",
"platform": "MP-WEIXIN",
}
data = f"token={token}"
try:
resp = requests.post(url, headers=headers, data=data, timeout=30)
return resp.json()
except Exception as e:
return {"code": -1, "msg": f"请求失败: {e}"}
def do_withdraw(token: str) -> str:
url = f"{JYXE_BASE_URL}/api/cash/scoreWithdraw"
headers = {
"User-Agent": "Mozilla/5.0 (Linux; Android 10) AppleWebKit/537.36",
"content-type": "application/x-www-form-urlencoded",
"platform": "MP-WEIXIN",
}
data = f"type=wx_account&score=20&token={token}"
try:
resp = requests.post(url, headers=headers, data=data, timeout=30)
result = resp.json()
return result.get("msg", "未知")
except Exception as e:
return f"异常: {e}"
def process_one_wxid(wxid: str) -> None:
print(f"\n>>> 账号: {wxid}")
code = get_miniprogram_code(wxid)
if not code:
print("❌ 获取小程序code失败")
return
token = login_jyxe(code)
if not token:
print("❌ 登录旧衣小二失败")
return
exchange, mobile, sign_in_num = get_user_info(token)
sign_result = do_sign_in(token)
exchange_new, mobile_new, sign_in_num_new = get_user_info(token)
sign_msg = sign_result.get("msg", "未知")
is_success = sign_result.get("code") == 1000
if is_success and (sign_in_num_new % 2 == 1):
print(f"📱:{mobile}\n☁️签到:成功\n获得0.1元\n🌈现金:{exchange_new}")
elif is_success:
print(f"📱:{mobile}\n☁️签到:成功\n🌈现金:{exchange_new}")
else:
if sign_in_num_new % 2 == 1:
print(f"📱:{mobile}\n☁️签到:{sign_msg}\n获得0.1元\n🌈现金:{exchange_new}")
else:
print(f"📱:{mobile}\n☁️签到:{sign_msg}\n🌈现金:{exchange_new}")
if exchange_new >= 2.0:
withdraw_msg = do_withdraw(token)
print(f"☁️提现:{withdraw_msg}")
def main():
if not check_config():
sys.exit(1)
wxid_list = get_wxid_list()
if not wxid_list:
sys.exit(1)
print(f"{' ' * 10}꧁༺ 旧衣༒小二 (全自动版) ༻꧂\n")
total = len(wxid_list)
for idx, wxid in enumerate(wxid_list, 1):
print(f"\n----------- 🍺账号【{idx}/{total}】执行🍺 -----------")
now = datetime.datetime.now().time()
start = datetime.datetime.strptime('05:01', '%H:%M').time()
end = datetime.datetime.strptime('06:59', '%H:%M').time()
if start <= now <= end:
delay = random.randint(100, 500)
print(f"⏰ 凌晨时段,随机延迟 {delay} 秒...")
time.sleep(delay)
try:
process_one_wxid(wxid)
except Exception as e:
print(f"❌ 处理异常: {e}")
if idx < total:
time.sleep(random.randint(1, 2))
print(f'\n----------- 🎊 执 行 结 束 🎊 -----------')
if __name__ == '__main__':
main()