Files
Yangmao_Script/WX_Applet/Applet_JYHS_JYXE.py

213 lines
8.1 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# cron: 28 7 * * *
# new Env("旧衣回收_旧衣小二")
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
旧衣小二全自动化脚本 - 支持单号测试/多号批量/自动拉取全部账号
"""
import os
import sys
import time
import random
import datetime
import requests
from typing import Optional, List, Tuple, Dict
# ==================== Fixed configuration ====================
# Mini-program appid; sent as "appid" when requesting a login code and when logging in.
JYXE_APPID = "wx426d52c8130b8559"
# Login endpoint: receives the mini-program code and answers with a token.
JYXE_LOGIN_URL = "https://jiuyixiaoer.fzjingzhou.com/api/login/getWxMiniProgramSessionKey"
# Base URL that the /api/Person/* and /api/cash/* paths are appended to.
JYXE_BASE_URL = "https://jiuyixiaoer.fzjingzhou.com"
# ==================== Environment variables ====================
# Base URL of the account service; "/prod-api/..." paths are appended to it.
WX_CLOUD = os.getenv('wx_cloud', '')
# Token sent as "Authorization: Bearer <token>" to the WX_CLOUD service.
AUTH_TOKEN = os.getenv('wx_token', '')
# When non-empty, only this single wxid is processed.
SINGLE_TEST_WXID = os.getenv('SINGLE_TEST_WXID', '')
# Comma-separated wxids; consulted only when SINGLE_TEST_WXID is empty.
WX_IDS_ENV = os.getenv('WX_IDS', '')
# ==================== 获取 wxid 列表 ====================
def fetch_all_wxids_from_yjc() -> List[str]:
    """Fetch every available wxid from the account service at ``WX_CLOUD``.

    Calls ``/prod-api/wechat/wechat/list`` (page 1, page size 1000) with the
    bearer token and collects the ``wxId`` (falling back to ``wxid``) field of
    each returned row.

    Returns:
        The non-empty wxids found. An empty list is returned when the
        configuration is missing, the HTTP status is not 200, the response
        has no rows, or any exception is raised along the way.
    """
    if not (WX_CLOUD and AUTH_TOKEN):
        print("❌ 养鸡场配置缺失,无法获取全部账号")
        return []

    list_url = f"{WX_CLOUD}/prod-api/wechat/wechat/list?pageNum=1&pageSize=1000"
    auth_headers = {"Authorization": f"Bearer {AUTH_TOKEN}"}
    try:
        response = requests.get(list_url, headers=auth_headers, timeout=30)
        if response.status_code != 200:
            print(f"❌ 获取账号列表失败: HTTP {response.status_code}")
            return []

        rows = response.json().get('rows', [])
        if not rows:
            print("⚠️ 养鸡场返回账号列表为空")
            return []

        # Either key spelling may carry the id; rows without one are skipped.
        candidates = (row.get('wxId') or row.get('wxid') for row in rows)
        found = [candidate for candidate in candidates if candidate]
        print(f"✅ 成功获取 {len(found)} 个账号")
        return found
    except Exception as exc:
        print(f"❌ 请求异常: {exc}")
        return []
def get_wxid_list() -> List[str]:
    """Return the wxids to process.

    Resolution order:
      1. ``SINGLE_TEST_WXID`` alone, when it is set.
      2. The comma-separated entries of ``WX_IDS_ENV`` (blank entries dropped).
      3. Everything returned by ``fetch_all_wxids_from_yjc()``.

    Returns:
        The list of wxids, which is empty when the remote fetch yields nothing.
    """
    if SINGLE_TEST_WXID:
        return [SINGLE_TEST_WXID]

    if WX_IDS_ENV:
        trimmed = (piece.strip() for piece in WX_IDS_ENV.split(','))
        return [piece for piece in trimmed if piece]

    print("📡 未指定 wxid正在从养鸡场获取全部账号...")
    fetched = fetch_all_wxids_from_yjc()
    if fetched:
        return fetched

    print("❌ 从养鸡场获取全部账号失败,请检查接口或手动设置 WX_IDS")
    return []
# ==================== 业务函数 ====================
def check_config() -> bool:
    """Verify that both required environment-derived settings are non-empty.

    Returns:
        True when ``WX_CLOUD`` and ``AUTH_TOKEN`` are both set; otherwise
        prints a message for the first missing one and returns False.
    """
    # Checked in this order so the first missing setting is the one reported.
    requirements = (
        (WX_CLOUD, "❌ 环境变量 wx_cloud 未设置"),
        (AUTH_TOKEN, "❌ 环境变量 wx_token 未设置"),
    )
    for value, complaint in requirements:
        if not value:
            print(complaint)
            return False
    return True
def http_post(url: str, data=None, json_data=None, headers=None, timeout=30) -> Optional[Dict]:
    """POST to ``url`` and return the decoded JSON body.

    Args:
        url: Target URL.
        data: Passed through to ``requests.post`` as ``data``.
        json_data: Passed through to ``requests.post`` as ``json``.
        headers: Optional request headers.
        timeout: Timeout in seconds handed to ``requests.post``.

    Returns:
        The parsed JSON on an HTTP 200 response; ``None`` when the status is
        not 200 or when the request or JSON decoding raises (the problem is
        printed in both cases).
    """
    try:
        response = requests.post(
            url, data=data, json=json_data, headers=headers, timeout=timeout
        )
        if response.status_code == 200:
            return response.json()
        print(f"HTTP {response.status_code}: {url}")
    except Exception as exc:
        print(f"请求失败 {url}: {exc}")
    return None
def get_miniprogram_code(wxid: str) -> Optional[str]:
    """Request a mini-program login code for ``wxid`` from the WX_CLOUD service.

    Args:
        wxid: The account identifier sent as ``wxid`` in the JSON payload.

    Returns:
        The ``data.code`` value when the response carries ``code == 200`` and
        a non-empty code; otherwise ``None`` (the raw response is printed).
    """
    url = f"{WX_CLOUD}/prod-api/wechat/api/getMiniProgramCode"
    payload = {"wxid": wxid, "appid": JYXE_APPID}
    headers = {"Authorization": f"Bearer {AUTH_TOKEN}", "Content-Type": "application/json"}
    result = http_post(url, json_data=payload, headers=headers, timeout=30)

    if isinstance(result, dict) and result.get("code") == 200:
        # dict.get's default is only used when the key is absent, so a
        # response with "data": null must be handled explicitly.
        data = result.get("data")
        code = data.get("code") if isinstance(data, dict) else None
        if code:
            return code

    print(f"获取code失败: {result}")
    return None
def login_jyxe(code: str) -> Optional[str]:
    """Exchange a mini-program login code for a token at ``JYXE_LOGIN_URL``.

    Args:
        code: The login code, sent as ``code`` alongside the appid.

    Returns:
        The ``data.token`` value when the response carries ``code == 1000``
        and a non-empty token; otherwise ``None`` (the raw response is
        printed).
    """
    payload = {"code": code, "appid": JYXE_APPID}
    headers = {"Content-Type": "application/json"}
    result = http_post(JYXE_LOGIN_URL, json_data=payload, headers=headers, timeout=30)

    if isinstance(result, dict) and result.get("code") == 1000:
        # dict.get's default is only used when the key is absent, so a
        # response with "data": null must be handled explicitly.
        data = result.get("data")
        token = data.get("token") if isinstance(data, dict) else None
        if token:
            return token

    print(f"登录失败: {result}")
    return None
def get_user_info(token: str) -> Tuple[float, str, int]:
    """Fetch the account summary from ``/api/Person/index``.

    Args:
        token: Login token, sent as the ``token`` form field.

    Returns:
        A tuple ``(exchange, mobile, sign_in_num)`` read from the response's
        ``data`` object. Falls back to ``(0.0, "未知", 0)`` when the response
        ``msg`` does not contain "success" or when anything raises.
    """
    url = f"{JYXE_BASE_URL}/api/Person/index"
    headers = {
        "User-Agent": "Mozilla/5.0 (Linux; Android 10) AppleWebKit/537.36",
        "content-type": "application/x-www-form-urlencoded",
        "platform": "MP-WEIXIN",
    }
    # A dict body lets requests URL-encode the token; interpolating it into a
    # raw string would corrupt tokens containing '&', '+', '=' or '%'.
    form = {"token": token}
    try:
        resp = requests.post(url, headers=headers, data=form, timeout=30)
        info = resp.json()
        # `or` fallbacks also cover fields that are present but null.
        if "success" in (info.get("msg") or ""):
            data = info["data"]
            exchange = float(data.get("exchange") or 0)
            mobile = data.get("mobile") or "未知"
            sign_in_num = int(data.get("sign_in_num") or 0)
            return exchange, mobile, sign_in_num
    except Exception as e:
        # Best-effort lookup: report the problem and fall through to defaults.
        print(f"获取用户信息异常: {e}")
    return 0.0, "未知", 0
def do_sign_in(token: str) -> Dict:
    """Call the sign-in endpoint ``/api/Person/sign``.

    Args:
        token: Login token, sent as the ``token`` form field.

    Returns:
        The decoded JSON response when it is a dict. When the request or
        decoding fails, or the body is not a JSON object, a dict of the form
        ``{"code": -1, "msg": ...}`` is returned so callers can always use
        ``.get``.
    """
    url = f"{JYXE_BASE_URL}/api/Person/sign"
    headers = {
        "User-Agent": "Mozilla/5.0 (Linux; Android 10) AppleWebKit/537.36",
        "content-type": "application/x-www-form-urlencoded",
        "platform": "MP-WEIXIN",
    }
    # A dict body lets requests URL-encode the token; interpolating it into a
    # raw string would corrupt tokens containing '&', '+', '=' or '%'.
    form = {"token": token}
    try:
        resp = requests.post(url, headers=headers, data=form, timeout=30)
        result = resp.json()
    except Exception as e:
        return {"code": -1, "msg": f"请求失败: {e}"}

    # The annotated return type is Dict and the caller calls .get on it, so a
    # JSON null/list/string body is reported as a failure instead of returned.
    if not isinstance(result, dict):
        return {"code": -1, "msg": f"响应格式异常: {result}"}
    return result
def do_withdraw(token: str, score: int = 20) -> str:
    """Request a withdrawal through ``/api/cash/scoreWithdraw``.

    Args:
        token: Login token, sent as the ``token`` form field.
        score: Value sent as the ``score`` form field. Defaults to 20, the
            amount that was previously hard-coded.

    Returns:
        The response's ``msg`` field ("未知" when absent), or a string
        starting with "异常: " describing the exception when the request,
        decoding, or field lookup fails.
    """
    url = f"{JYXE_BASE_URL}/api/cash/scoreWithdraw"
    headers = {
        "User-Agent": "Mozilla/5.0 (Linux; Android 10) AppleWebKit/537.36",
        "content-type": "application/x-www-form-urlencoded",
        "platform": "MP-WEIXIN",
    }
    # A dict body lets requests URL-encode each field; key order matches the
    # original "type=...&score=...&token=..." layout.
    form = {"type": "wx_account", "score": score, "token": token}
    try:
        resp = requests.post(url, headers=headers, data=form, timeout=30)
        result = resp.json()
        return result.get("msg", "未知")
    except Exception as e:
        return f"异常: {e}"
def process_one_wxid(wxid: str) -> None:
    """Run the full flow for one account: code, login, sign-in, report, withdraw.

    Steps, in order: obtain a mini-program code, log in, read the user info,
    sign in, read the user info again, print a summary, and call
    ``do_withdraw`` when the refreshed ``exchange`` value is at least 2.0.
    Returns early (after printing) if the code or the token cannot be
    obtained.

    Args:
        wxid: The account identifier to process.
    """
    print(f"\n>>> 账号: {wxid}")

    code = get_miniprogram_code(wxid)
    if not code:
        print("❌ 获取小程序code失败")
        return

    token = login_jyxe(code)
    if not token:
        print("❌ 登录旧衣小二失败")
        return

    # The phone number comes from the pre-sign-in snapshot; balance and
    # sign-in count come from the snapshot taken after signing in.
    _, mobile, _ = get_user_info(token)
    sign_result = do_sign_in(token)
    balance, _, sign_count = get_user_info(token)

    sign_msg = sign_result.get("msg", "未知")
    status = "成功" if sign_result.get("code") == 1000 else sign_msg

    report = [f"📱:{mobile}", f"☁️签到:{status}"]
    # The reward line is shown whenever the refreshed sign-in count is odd,
    # regardless of whether this particular sign-in call succeeded.
    if sign_count % 2 == 1:
        report.append("获得0.1元")
    report.append(f"🌈现金:{balance}")
    print("\n".join(report))

    if balance >= 2.0:
        outcome = do_withdraw(token)
        print(f"☁️提现:{outcome}")
def main():
    """Script entry point: validate config, resolve accounts, process each one.

    Exits with status 1 when the configuration check fails or no wxid is
    available. For every account, if the current local time falls between
    05:01 and 06:59 (inclusive) a random 100-500 second delay is applied
    first. Exceptions raised while processing an account are printed and do
    not stop the loop. A random 1-2 second pause separates accounts.
    """
    if not check_config():
        sys.exit(1)

    wxid_list = get_wxid_list()
    if not wxid_list:
        sys.exit(1)

    print(f"{' ' * 10}꧁༺ 旧衣༒小二 (全自动版) ༻꧂\n")

    total = len(wxid_list)
    # Bounds of the early-morning window; they never change between accounts.
    window_open = datetime.time(5, 1)
    window_close = datetime.time(6, 59)

    for idx, wxid in enumerate(wxid_list, 1):
        print(f"\n----------- 🍺账号【{idx}/{total}】执行🍺 -----------")

        current = datetime.datetime.now().time()
        if window_open <= current <= window_close:
            delay = random.randint(100, 500)
            print(f"⏰ 凌晨时段,随机延迟 {delay} 秒...")
            time.sleep(delay)

        try:
            process_one_wxid(wxid)
        except Exception as exc:
            # One failing account must not abort the remaining ones.
            print(f"❌ 处理异常: {exc}")

        if idx < total:
            time.sleep(random.randint(1, 2))

    print(f'\n----------- 🎊 执 行 结 束 🎊 -----------')


if __name__ == '__main__':
    main()