diff --git a/WX_Applet/Applet_JYHS_JYXE.py b/WX_Applet/Applet_JYHS_JYXE.py new file mode 100644 index 0000000..cefe638 --- /dev/null +++ b/WX_Applet/Applet_JYHS_JYXE.py @@ -0,0 +1,212 @@ +# cron: 28 7 * * * +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +旧衣小二全自动化脚本 - 支持单号测试/多号批量/自动拉取全部账号 +""" + +import os +import sys +import time +import random +import datetime +import requests +from typing import Optional, List, Tuple, Dict + +# ==================== 固定配置 ==================== +JYXE_APPID = "wx426d52c8130b8559" +JYXE_LOGIN_URL = "https://jiuyixiaoer.fzjingzhou.com/api/login/getWxMiniProgramSessionKey" +JYXE_BASE_URL = "https://jiuyixiaoer.fzjingzhou.com" + +# ==================== 环境变量 ==================== +WX_CLOUD = os.getenv('wx_cloud', '') +AUTH_TOKEN = os.getenv('wx_token', '') +SINGLE_TEST_WXID = os.getenv('SINGLE_TEST_WXID', '') +WX_IDS_ENV = os.getenv('WX_IDS', '') + +# ==================== 获取 wxid 列表 ==================== +def fetch_all_wxids_from_yjc() -> List[str]: + """从养鸡场获取全部可用的 wxid 列表(使用 /wechat/list 接口)""" + if not WX_CLOUD or not AUTH_TOKEN: + print("❌ 养鸡场配置缺失,无法获取全部账号") + return [] + url = f"{WX_CLOUD}/prod-api/wechat/wechat/list?pageNum=1&pageSize=1000" + headers = {"Authorization": f"Bearer {AUTH_TOKEN}"} + try: + resp = requests.get(url, headers=headers, timeout=30) + if resp.status_code != 200: + print(f"❌ 获取账号列表失败: HTTP {resp.status_code}") + return [] + data = resp.json() + rows = data.get('rows', []) + if not rows: + print("⚠️ 养鸡场返回账号列表为空") + return [] + wxids = [] + for item in rows: + wxid = item.get('wxId') or item.get('wxid') + if wxid: + wxids.append(wxid) + print(f"✅ 成功获取 {len(wxids)} 个账号") + return wxids + except Exception as e: + print(f"❌ 请求异常: {e}") + return [] + +def get_wxid_list() -> List[str]: + """获取待处理的 wxid 列表:优先环境变量,否则拉取全部""" + if SINGLE_TEST_WXID: + return [SINGLE_TEST_WXID] + if WX_IDS_ENV: + return [wxid.strip() for wxid in WX_IDS_ENV.split(',') if wxid.strip()] + print("📡 未指定 wxid,正在从养鸡场获取全部账号...") + all_wxids = fetch_all_wxids_from_yjc() + if not all_wxids: + print("❌ 从养鸡场获取全部账号失败,请检查接口或手动设置 WX_IDS") + return [] + return all_wxids + +# ==================== 业务函数 ==================== +def check_config() -> bool: + if not WX_CLOUD: + print("❌ 环境变量 wx_cloud 未设置") + return False + if not AUTH_TOKEN: + print("❌ 环境变量 wx_token 未设置") + return False + return True + +def http_post(url: str, data=None, json_data=None, headers=None, timeout=30) -> Optional[Dict]: + try: + resp = requests.post(url, data=data, json=json_data, headers=headers, timeout=timeout) + if resp.status_code != 200: + print(f"HTTP {resp.status_code}: {url}") + return None + return resp.json() + except Exception as e: + print(f"请求失败 {url}: {str(e)}") + return None + +def get_miniprogram_code(wxid: str) -> Optional[str]: + url = f"{WX_CLOUD}/prod-api/wechat/api/getMiniProgramCode" + payload = {"wxid": wxid, "appid": JYXE_APPID} + headers = {"Authorization": f"Bearer {AUTH_TOKEN}", "Content-Type": "application/json"} + result = http_post(url, json_data=payload, headers=headers, timeout=30) + if result and result.get("code") == 200 and result.get("data", {}).get("code"): + return result["data"]["code"] + print(f"获取code失败: {result}") + return None + +def login_jyxe(code: str) -> Optional[str]: + payload = {"code": code, "appid": JYXE_APPID} + headers = {"Content-Type": "application/json"} + result = http_post(JYXE_LOGIN_URL, json_data=payload, headers=headers, timeout=30) + if result and result.get("code") == 1000 and result.get("data", {}).get("token"): + return result["data"]["token"] + print(f"登录失败: {result}") + return None + +def get_user_info(token: str) -> Tuple[float, str, int]: + url = f"{JYXE_BASE_URL}/api/Person/index" + headers = { + "User-Agent": "Mozilla/5.0 (Linux; Android 10) AppleWebKit/537.36", + "content-type": "application/x-www-form-urlencoded", + "platform": "MP-WEIXIN", + } + data = f"token={token}" + try: + resp = requests.post(url, headers=headers, data=data, timeout=30) + info = resp.json() + if "success" in info.get("msg", ""): + exchange = float(info["data"].get("exchange", 0)) + mobile = info["data"].get("mobile", "未知") + sign_in_num = int(info["data"].get("sign_in_num", 0)) + return exchange, mobile, sign_in_num + except Exception as e: + print(f"获取用户信息异常: {e}") + return 0.0, "未知", 0 + +def do_sign_in(token: str) -> Dict: + url = f"{JYXE_BASE_URL}/api/Person/sign" + headers = { + "User-Agent": "Mozilla/5.0 (Linux; Android 10) AppleWebKit/537.36", + "content-type": "application/x-www-form-urlencoded", + "platform": "MP-WEIXIN", + } + data = f"token={token}" + try: + resp = requests.post(url, headers=headers, data=data, timeout=30) + return resp.json() + except Exception as e: + return {"code": -1, "msg": f"请求失败: {e}"} + +def do_withdraw(token: str) -> str: + url = f"{JYXE_BASE_URL}/api/cash/scoreWithdraw" + headers = { + "User-Agent": "Mozilla/5.0 (Linux; Android 10) AppleWebKit/537.36", + "content-type": "application/x-www-form-urlencoded", + "platform": "MP-WEIXIN", + } + data = f"type=wx_account&score=20&token={token}" + try: + resp = requests.post(url, headers=headers, data=data, timeout=30) + result = resp.json() + return result.get("msg", "未知") + except Exception as e: + return f"异常: {e}" + +def process_one_wxid(wxid: str) -> None: + print(f"\n>>> 账号: {wxid}") + code = get_miniprogram_code(wxid) + if not code: + print("❌ 获取小程序code失败") + return + token = login_jyxe(code) + if not token: + print("❌ 登录旧衣小二失败") + return + exchange, mobile, sign_in_num = get_user_info(token) + sign_result = do_sign_in(token) + exchange_new, mobile_new, sign_in_num_new = get_user_info(token) + sign_msg = sign_result.get("msg", "未知") + is_success = sign_result.get("code") == 1000 + if is_success and (sign_in_num_new % 2 == 1): + print(f"📱:{mobile}\n☁️签到:成功\n☁️获得:0.1元\n🌈现金:{exchange_new}元") + elif is_success: + print(f"📱:{mobile}\n☁️签到:成功\n🌈现金:{exchange_new}元") + else: + if sign_in_num_new % 2 == 1: + print(f"📱:{mobile}\n☁️签到:{sign_msg}\n☁️获得:0.1元\n🌈现金:{exchange_new}元") + else: + print(f"📱:{mobile}\n☁️签到:{sign_msg}\n🌈现金:{exchange_new}元") + if exchange_new >= 2.0: + withdraw_msg = do_withdraw(token) + print(f"☁️提现:{withdraw_msg}") + +def main(): + if not check_config(): + sys.exit(1) + wxid_list = get_wxid_list() + if not wxid_list: + sys.exit(1) + print(f"{' ' * 10}꧁༺ 旧衣༒小二 (全自动版) ༻꧂\n") + total = len(wxid_list) + for idx, wxid in enumerate(wxid_list, 1): + print(f"\n----------- 🍺账号【{idx}/{total}】执行🍺 -----------") + now = datetime.datetime.now().time() + start = datetime.datetime.strptime('05:01', '%H:%M').time() + end = datetime.datetime.strptime('06:59', '%H:%M').time() + if start <= now <= end: + delay = random.randint(100, 500) + print(f"⏰ 凌晨时段,随机延迟 {delay} 秒...") + time.sleep(delay) + try: + process_one_wxid(wxid) + except Exception as e: + print(f"❌ 处理异常: {e}") + if idx < total: + time.sleep(random.randint(1, 2)) + print(f'\n----------- 🎊 执 行 结 束 🎊 -----------') + +if __name__ == '__main__': + main() \ No newline at end of file