#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 爱仙居 AiXianJu 明文 Python 版 cron: 1 6,12 * * * 环境变量: AiXianJu="账号1&密码1&X-TOKEN&支付宝姓名&支付宝账号\n账号2&密码2&X-TOKEN&支付宝姓名&支付宝账号" 多账号说明:每个账号一行,账号之间用换行分隔,不用空格分隔。 可选环境变量: AIXIANJU_ENABLE_WITHDRAW="true" # 还原原 JS:余额>0 自动提现;紧急关闭设 false AIXIANJU_DEBUG="false" # true 打印请求 URL/响应摘要 AIXIANJU_TIMEOUT="20" AIXIANJU_TASK_DELAY_MIN="5" # 每个任务/业务接口后随机延迟下限,单位秒 AIXIANJU_TASK_DELAY_MAX="10" # 每个任务/业务接口后随机延迟上限,单位秒 AIXIANJU_ACCOUNT_DELAY_MIN="8" # 多账号之间随机延迟下限,单位秒 AIXIANJU_ACCOUNT_DELAY_MAX="15" # 多账号之间随机延迟上限,单位秒 依赖:requests(青龙常见环境已内置)。RSA 加密已用纯 Python 实现,无需 pycryptodome。 """ from __future__ import annotations import base64 import hashlib import hmac import json import os import random import re import sys import time import uuid from datetime import datetime from dataclasses import dataclass, field from typing import Any, Dict, Iterable, List, Optional, Tuple from urllib.parse import quote_plus, unquote try: import requests except ImportError as e: print("缺少依赖 requests,请先安装:pip3 install requests") raise NAME = "爱仙居" TENANT_ID = "62" CLIENT_ID = "10016" APP_VERSION = "4.5.2" SIGN_SECRET = "FR*r!isE5W" ENABLE_WITHDRAW = os.getenv("AIXIANJU_ENABLE_WITHDRAW", "true").lower() in {"1", "true", "yes"} DEBUG = os.getenv("AIXIANJU_DEBUG", "false").lower() in {"1", "true", "yes"} TIMEOUT = int(os.getenv("AIXIANJU_TIMEOUT", "20") or "20") TASK_DELAY_MIN = float(os.getenv("AIXIANJU_TASK_DELAY_MIN", "5") or "5") TASK_DELAY_MAX = float(os.getenv("AIXIANJU_TASK_DELAY_MAX", "10") or "10") ACCOUNT_DELAY_MIN = float(os.getenv("AIXIANJU_ACCOUNT_DELAY_MIN", "8") or "8") ACCOUNT_DELAY_MAX = float(os.getenv("AIXIANJU_ACCOUNT_DELAY_MAX", "15") or "15") PASSPORT_BASE = "https://passport.tmuyun.com" VAPP_BASE = "https://vapp.tmuyun.com" ACT_BASE = "https://act.tmlyun.com/activity-api" MY_BASE = "https://my.tmlyun.com/equity-api" ANDROID_MODELS = [ "M1903F2A", "M2001J2E", "M2001J2C", "M2001J1E", "M2001J1C", "M2002J9E", "M2011K2C", "M2102K1C", "M2101K9C", "2107119DC", "2201123C", "2112123AC", "2201122C", "2211133C", "2210132C", "2304FPN6DC", "23127PN0CC", "24031PN0DC", "23090RA98C", "2312DRA50C", "2312CRAD3C", "2312DRAABC", "22101316UCP", "22101316C", ] PASSPORT_RSA_PUB_DER_B64 = ( "MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQD6XO7e9YeAOs+cFqwa7ETJ+WXizPqQeXv68i5vqw9pFREsrqiBTRcg7wB0RIp3rJkDpaeVJLsZqYm5TW7FWx/iOiXFc+zCPvaKZric2dXCw27EvlH5rq+zwIPDAJHGAfnn1nmQH7wR3PCatEIb8pz5GFlTHMlluw4ZYmnOwg+thwIDAQAB" ) MOBILE_UA = ( "Mozilla/5.0 (Linux; Android 11; 21091116AC Build/RP1A.200720.011; wv) " "AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/94.0.4606.85 " "Mobile Safari/537.36;xsb_xiangshan;xsb_xiangshan;4.5.2;native_app;6.8.0" ) def mask(s: Any, keep: int = 4) -> str: s = "" if s is None else str(s) if len(s) <= keep * 2: return "*" * len(s) return s[:keep] + "***" + s[-keep:] def short_json(obj: Any, limit: int = 500) -> str: try: text = json.dumps(obj, ensure_ascii=False, separators=(",", ":")) except Exception: text = str(obj) text = re.sub(r'("(?:token|Authorization|X-TOKEN|password|sessionId|session_id|code)"\s*:\s*")([^"\\]{6,})(")', lambda m: m.group(1) + mask(m.group(2)) + m.group(3), text, flags=re.I) return text if len(text) <= limit else text[:limit] + "..." def gen_uuid() -> str: # 原 JS 模板:xxxxxxxx-xxxx-6xxx-yxxx-xxxxxxxxxxxx chars = [] for ch in "xxxxxxxx-xxxx-6xxx-yxxx-xxxxxxxxxxxx": if ch == "x": chars.append(format(random.randint(0, 15), "x")) elif ch == "y": chars.append(format((random.randint(0, 15) & 3) | 8, "x")) else: chars.append(ch) return "".join(chars) def make_ua() -> Tuple[str, str, str]: device_id = gen_uuid() model = random.choice(ANDROID_MODELS) common_ua = f"{APP_VERSION};{device_id};Xiaomi {model};Android;11;Release;6.8.0" app_ua = f"ANDROID;11;{CLIENT_ID};{APP_VERSION};1.0;null;{model}" return app_ua, common_ua, device_id def parse_der_len(data: bytes, idx: int) -> Tuple[int, int]: first = data[idx] idx += 1 if first < 0x80: return first, idx n = first & 0x7F return int.from_bytes(data[idx:idx+n], "big"), idx + n def read_tlv(data: bytes, idx: int, expected_tag: Optional[int] = None) -> Tuple[int, bytes, int]: tag = data[idx] idx += 1 if expected_tag is not None and tag != expected_tag: raise ValueError(f"ASN.1 tag mismatch: want {expected_tag:#x}, got {tag:#x}") length, idx = parse_der_len(data, idx) return tag, data[idx:idx+length], idx + length def rsa_pub_numbers_from_spki(der_b64: str) -> Tuple[int, int, int]: """Parse SubjectPublicKeyInfo DER and return n, e, key_bytes.""" der = base64.b64decode(der_b64) _, seq, _ = read_tlv(der, 0, 0x30) # seq = algorithm SEQUENCE + BIT STRING _, _alg, idx = read_tlv(seq, 0, 0x30) _, bitstr, _idx2 = read_tlv(seq, idx, 0x03) # first byte of BIT STRING is unused-bit count pk = bitstr[1:] _, rsa_seq, _ = read_tlv(pk, 0, 0x30) _, n_bytes, idx3 = read_tlv(rsa_seq, 0, 0x02) _, e_bytes, _ = read_tlv(rsa_seq, idx3, 0x02) n = int.from_bytes(n_bytes.lstrip(b"\x00"), "big") e = int.from_bytes(e_bytes.lstrip(b"\x00"), "big") return n, e, (n.bit_length() + 7) // 8 def rsa_pkcs1_v15_encrypt_b64(plaintext: str, der_b64: str = PASSPORT_RSA_PUB_DER_B64) -> str: """RSAES-PKCS1-v1_5 encryption, compatible with JSEncrypt.encrypt().""" n, e, k = rsa_pub_numbers_from_spki(der_b64) msg = plaintext.encode("utf-8") if len(msg) > k - 11: raise ValueError("明文过长,无法用当前 RSA 公钥加密") ps_len = k - len(msg) - 3 # PS must be non-zero random bytes ps = bytearray() while len(ps) < ps_len: b = random.randint(1, 255) ps.append(b) em = b"\x00\x02" + bytes(ps) + b"\x00" + msg c = pow(int.from_bytes(em, "big"), e, n) return base64.b64encode(c.to_bytes(k, "big")).decode() def hmac_sha256_hex(data: str, key: str) -> str: return hmac.new(key.encode("utf-8"), data.encode("utf-8"), hashlib.sha256).hexdigest() def sha256_hex(data: str) -> str: return hashlib.sha256(data.encode("utf-8")).hexdigest() def today_start_str() -> str: return time.strftime("%Y-%m-%d 00:00:00") def extract_first_q(obj: Any) -> str: m = re.search(r'(?<=q=)[^&",]+', json.dumps(obj, ensure_ascii=False)) return unquote(m.group(0)) if m else "" def extract_first_u(obj: Any) -> str: m = re.search(r'(?<=u=)[^&",]+', json.dumps(obj, ensure_ascii=False)) return unquote(m.group(0)) if m else "" def get_path_no_query(path: str) -> str: return path.split("?", 1)[0] def random_delay(min_seconds: float, max_seconds: float, label: str = "") -> None: """随机延迟;max <= 0 时不等待。""" if max_seconds <= 0: return if min_seconds < 0: min_seconds = 0 if max_seconds < min_seconds: max_seconds = min_seconds seconds = random.uniform(min_seconds, max_seconds) if DEBUG and label: print(f"[DEBUG] 等待{label}:{seconds:.2f}s") time.sleep(seconds) def js_wait(label: str = "") -> None: """每个任务/业务接口后随机等待 5-10 秒,可由 AIXIANJU_TASK_DELAY_MIN/MAX 调整。""" random_delay(TASK_DELAY_MIN, TASK_DELAY_MAX, label) @dataclass class Account: phone: str password: str x_token: str alipay_name: str = "" alipay_account: str = "" @dataclass class ClientState: app_ua: str common_ua: str device_uuid: str session_id: str = "" account_id: str = "" signature_key: str = "" read_q: str = "" read_token: str = "" lottery_q: str = "" lottery_token: str = "" wallet_u: str = "" wallet_token: str = "" summary_lines: List[str] = field(default_factory=list) class AiXianJuRunner: def __init__(self) -> None: self.sess = requests.Session() def request_json(self, method: str, url: str, *, headers: Optional[Dict[str, Any]] = None, data: Any = None, json_body: Any = None, desc: str = "") -> Dict[str, Any]: if DEBUG: print(f"[DEBUG] {desc or method} {url}") if data is not None: print(f"[DEBUG] body={mask(data) if isinstance(data, str) and len(data) > 80 else data}") if json_body is not None: print(f"[DEBUG] json={short_json(json_body)}") resp = self.sess.request(method, url, headers=headers or {}, data=data, json=json_body, timeout=TIMEOUT) text = resp.text try: obj = resp.json() except Exception: obj = {"code": resp.status_code, "message": text} if DEBUG: print(f"[DEBUG] status={resp.status_code} resp={short_json(obj)}") return obj def passport_get(self, state: ClientState, path: str) -> Dict[str, Any]: headers = { "Connection": "Keep-Alive", "Cache-Control": "no-cache", "X-REQUEST-ID": gen_uuid(), "Accept-Encoding": "gzip", "user-agent": state.app_ua, } return self.request_json("GET", PASSPORT_BASE + path, headers=headers, desc=path) def credential_auth(self, account: Account, state: ClientState) -> Dict[str, Any]: encrypted_pwd = rsa_pkcs1_v15_encrypt_b64(account.password) request_id = gen_uuid() raw_body = f"client_id={CLIENT_ID}&password={encrypted_pwd}&phone_number={account.phone}" sign_src = f"post%%/web/oauth/credential_auth?{raw_body}%%{request_id}%%" signature = hmac_sha256_hex(sign_src, state.signature_key) encoded_body = f"client_id={CLIENT_ID}&password={quote_plus(encrypted_pwd)}&phone_number={quote_plus(account.phone)}" headers = { "Connection": "Keep-Alive", "X-REQUEST-ID": request_id, "X-SIGNATURE": signature, "Cache-Control": "no-cache", "Content-Type": "application/x-www-form-urlencoded;charset=UTF-8", "Accept-Encoding": "gzip", "user-agent": state.app_ua, } return self.request_json("POST", PASSPORT_BASE + "/web/oauth/credential_auth", headers=headers, data=encoded_body, desc="获取code") def vapp_sign(self, state: ClientState, path: str) -> Dict[str, Any]: req_id = gen_uuid() ts = int(time.time() * 1000) sign_path = get_path_no_query(path) signature = sha256_hex(f"{sign_path}&&{state.session_id}&&{req_id}&&{ts}&&{SIGN_SECRET}&&{TENANT_ID}") return {"uuid": req_id, "time": ts, "signature": signature} def vapp_get(self, state: ClientState, path: str) -> Dict[str, Any]: sig = self.vapp_sign(state, path) headers = { "Connection": "Keep-Alive", "X-TIMESTAMP": str(sig["time"]), "X-SESSION-ID": state.session_id, "X-REQUEST-ID": sig["uuid"], "X-SIGNATURE": sig["signature"], "X-TENANT-ID": TENANT_ID, "X-ACCOUNT-ID": state.account_id, "Cache-Control": "no-cache", "Accept-Encoding": "gzip", "user-agent": state.common_ua, } obj = self.request_json("GET", VAPP_BASE + path, headers=headers, desc=path) js_wait(path) return obj def vapp_post(self, state: ClientState, path: str, body: str = "") -> Dict[str, Any]: sig = self.vapp_sign(state, path) headers = { "Connection": "Keep-Alive", "X-TIMESTAMP": str(sig["time"]), "X-SESSION-ID": state.session_id, "X-REQUEST-ID": sig["uuid"], "X-SIGNATURE": sig["signature"], "X-TENANT-ID": TENANT_ID, "X-ACCOUNT-ID": state.account_id, "Cache-Control": "no-cache", "Content-Type": "application/x-www-form-urlencoded", "Accept-Encoding": "gzip", "user-agent": state.common_ua, } obj = self.request_json("POST", VAPP_BASE + path, headers=headers, data=body, desc=path) js_wait(path) return obj def activity_headers(self, authorization: str, referer: Optional[str] = None, x_token: str = "") -> Dict[str, str]: headers = { "Connection": "Keep-Alive", "Accept": "application/json, text/plain, */*", "User-Agent": MOBILE_UA, "X-Requested-With": "com.aheading.news.xiangshanrb", "Sec-Fetch-Site": "same-origin", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Dest": "empty", "Accept-Encoding": "gzip, deflate", "Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7", "Authorization": authorization or "", } if referer: headers["Referer"] = referer if x_token: headers["X-TOKEN"] = x_token return headers def activity_get(self, state: ClientState, path: str) -> Dict[str, Any]: headers = self.activity_headers(state.read_token, f"https://act.tmlyun.com/lottery/?q={state.read_q}") headers["Origin"] = "https://act.tmlyun.com" obj = self.request_json("GET", ACT_BASE + path, headers=headers, desc=path) js_wait(path) return obj def activity_post_full(self, url: str, payload: Dict[str, Any], authorization: str, x_token: str) -> Dict[str, Any]: headers = self.activity_headers(authorization, x_token=x_token) headers["Content-Type"] = "application/json" obj = self.request_json("POST", url, headers=headers, json_body=payload, desc=url) js_wait(url) return obj def activity_get_full(self, url: str, authorization: str, x_token: str) -> Dict[str, Any]: headers = self.activity_headers(authorization, x_token=x_token) headers["Content-Type"] = "application/json" obj = self.request_json("GET", url, headers=headers, desc=url) js_wait(url) return obj def wallet_get(self, state: ClientState, path: str) -> Dict[str, Any]: headers = { "Connection": "Keep-Alive", "Accept": "application/json, text/plain, */*", "User-Agent": MOBILE_UA, "Origin": "https://my.tmlyun.com", "X-Requested-With": "com.aheading.news.xiangshanrb", "Sec-Fetch-Site": "same-origin", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Dest": "empty", "Referer": f"https://my.tmlyun.com/equitypacket/?u={state.wallet_u}", "Accept-Encoding": "gzip, deflate", "Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7", "Authorization": state.wallet_token, } obj = self.request_json("GET", MY_BASE + path, headers=headers, desc=path) js_wait(path) return obj def login(self, account: Account, state: ClientState) -> bool: print("获取sessionId") init = self.vapp_post(state, "/api/account/init") state.session_id = (((init.get("data") or {}).get("session") or {}).get("id") or "") if not state.session_id: print(f"获取sessionId失败:{short_json(init)}") return False print(f"获取sessionId完成:{mask(state.session_id)}") print("获取signature_key") web_init = self.passport_get(state, f"/web/init?client_id={CLIENT_ID}") state.signature_key = (((web_init.get("data") or {}).get("client") or {}).get("signature_key") or "") if not state.signature_key: print(f"获取signature_key失败:{short_json(web_init)}") return False print(f"获取signature_key完成:{mask(state.signature_key)}") print("获取code") cred = self.credential_auth(account, state) code = ((((cred.get("data") or {}).get("authorization_code") or {}).get("code")) or "") if not code: print(f"获取code失败:{cred.get('message') or short_json(cred)}") return False print(f"获取code完成:{mask(code)}") print("登录") body = f"check_token=&code={quote_plus(code)}&token=&type=-1&union_id=" login = self.vapp_post(state, "/api/zbtxz/login", body) session = (login.get("data") or {}).get("session") or {} state.account_id = str(session.get("account_id") or "") state.session_id = str(session.get("id") or state.session_id) if not state.account_id: print(f"登录失败:{login.get('message') or short_json(login)}") return False print(f"登录完成:account_id={mask(state.account_id)} session={mask(state.session_id)}") return True def do_read_tasks_and_lottery(self, account: Account, state: ClientState) -> None: print("————————————") print("阅读抽奖") print("获取阅读活动 q") article_list = self.vapp_get(state, "/api/article/channel_list?channel_id=637c46bbad61a40b77d54c39&isRecommend=0&is_new=1&size=20") state.read_q = extract_first_q(article_list) if not state.read_q: print("获取阅读活动 q 失败,跳过阅读/抽奖") return print(f"获取阅读活动 q 完成:{mask(state.read_q)}") print("获取阅读token") auth = self.activity_post_full( "https://act.tmlyun.com/activity-api/task/h5/auth/userLogin", {"q": state.read_q, "accountId": state.account_id, "sessionId": state.session_id, "tenantCode": "xsb_xianju"}, state.read_token, account.x_token, ) state.read_token = (auth.get("data") or {}).get("token") or "" if not state.read_token: print(f"获取阅读token失败:{short_json(auth)}") return print(f"获取阅读token完成:{mask(state.read_token)}") task_home = self.activity_get(state, "/task/h5/activity/getHomeUserLevelTaskList") task_groups = task_home.get("data") or [] if isinstance(task_groups, dict): task_groups = task_groups.get("list") or task_groups.get("records") or [] for group in task_groups if isinstance(task_groups, list) else []: if group.get("limitTimeStart") != today_start_str(): continue level_id = group.get("taskLevelId") detail = self.activity_get(state, f"/task/h5/activity/getLevelTaskUserList?levelTaskId={level_id}") apps = (detail.get("data") or {}).get("appBaseList") or [] for task in apps: status = task.get("taskUserStatusBO") or {} total = int(status.get("total") or 0) complete = int(status.get("completeNum") or 0) need = max(0, total - complete) print(f"任务:{task.get('name')} 进度:{complete}/{total}") if need <= 0: continue articles = self.vapp_get(state, "/api/article/channel_list?channel_id=637c46bbad61a40b77d54c39&isRecommend=0&is_new=1&size=20") article_items = ((articles.get("data") or {}).get("article_list") or []) for i, article in enumerate(article_items[:need], 1): article_id = article.get("id") if not article_id: continue read_ret = self.vapp_get(state, f"/api/article/read_time?channel_article_id={article_id}&is_end=1&read_time=1617") print(f"阅读{i}/{need}:{read_ret.get('message') or read_ret.get('msg') or short_json(read_ret, 120)}") lottery_info = self.activity_get(state, "/task/h5/activity/getLotteryInfo") lottery_count = int((lottery_info.get("data") or {}).get("lotteryCount") or 0) print(f"拥有{lottery_count}次抽奖") if lottery_count <= 0: return print("获取抽奖token") activity_info = self.activity_get(state, "/task/h5/activity/getActivityInfo") state.lottery_q = extract_first_q(activity_info) or state.read_q lottery_auth = self.activity_post_full( "https://act.tmlyun.com/activity-api/lottery/api/auth/userLogin", {"q": state.lottery_q, "accountId": state.account_id, "sessionId": state.session_id, "tenantCode": "xsb_xianju"}, state.lottery_token, account.x_token, ) state.lottery_token = (lottery_auth.get("data") or {}).get("token") or "" third_id = (lottery_auth.get("data") or {}).get("thirdId") or "" if not state.lottery_token or not third_id: print(f"获取抽奖token失败:{short_json(lottery_auth)}") return print(f"获取抽奖token完成:{mask(state.lottery_token)}") for i in range(lottery_count): ret = self.activity_post_full( "https://act.tmlyun.com/activity-api/lottery/h5/activity/lottery/userActivityLottery", {"activityId": third_id, "clientId": state.device_uuid}, state.lottery_token, account.x_token, ) prize = ((ret.get("data") or {}).get("prizeName") or ret.get("message") or ret.get("msg") or "未知") print(f"抽奖{i+1}/{lottery_count}获得:{prize}") state.summary_lines.append(f"用户:{account.phone} 抽奖获得:{prize}") self.wallet_flow(account, state, third_id) def wallet_flow(self, account: Account, state: ClientState, activity_id: str) -> None: print("获取walletId") prize_record = self.activity_get_full( f"https://act.tmlyun.com/activity-api/lottery/h5/activity/lottery/accountPrizeRecord/userPrizeRecord?activityId={activity_id}", state.lottery_token, account.x_token, ) state.wallet_u = extract_first_u(prize_record) if not state.wallet_u: print("获取walletId失败,跳过钱包/提现") return print(f"获取walletId完成:{mask(state.wallet_u)}") print("获取钱包token") wallet_auth = self.activity_post_full( "https://my.tmlyun.com/equity-api/user/auth/userLogin", {"u": state.wallet_u, "accountId": state.account_id, "sessionId": state.session_id}, "", account.x_token, ) state.wallet_token = (wallet_auth.get("data") or {}).get("token") or "" if not state.wallet_token: print(f"获取钱包token失败:{short_json(wallet_auth)}") return print(f"获取钱包token完成:{mask(state.wallet_token)}") funds = self.wallet_get(state, "/redBag/getFundsDetail?fundsChannelType=0") if not ((funds.get("data") or {}).get("account")) and account.alipay_name and account.alipay_account: bind = self.wallet_get(state, f"/redBag/saveAliPayAccount?userName={quote_plus(account.alipay_name)}&account={quote_plus(account.alipay_account)}") print(f"绑定支付宝:{bind.get('message') or bind.get('msg') or short_json(bind, 120)}") wallet = self.wallet_get(state, f"/redBag/getWalletInfo?device={state.device_uuid}") data = wallet.get("data") or [] balance = 0.0 if isinstance(data, list) and data: try: balance = float(data[0].get("aliPayTotalPrice") or 0) except Exception: balance = 0.0 print(f"钱包余额:{balance}") if balance > 0 and ENABLE_WITHDRAW: ret = self.wallet_get(state, f"/redBag/createTrans?price={balance}&fundsChannelType=0&yToken=PYuUbXTdYvNAd1BVVQaXR3dmgO3dY9bt&deviceId={state.device_uuid}") msg = ret.get("message") or ret.get("msg") or short_json(ret, 120) print(f"提现:{msg}") state.summary_lines.append(f"用户:{account.phone} 提现:{msg}") elif balance > 0: print("提现:未开启自动提现(如需开启:AIXIANJU_ENABLE_WITHDRAW=true)") def run_account(self, account: Account) -> List[str]: app_ua, common_ua, device_uuid = make_ua() state = ClientState(app_ua=app_ua, common_ua=common_ua, device_uuid=device_uuid) print("随机生成UA") print(app_ua) print(common_ua) print(f"用户:{account.phone}开始任务") if not self.login(account, state): return state.summary_lines self.do_read_tasks_and_lottery(account, state) return state.summary_lines def parse_accounts(raw: str) -> List[Account]: accounts: List[Account] = [] # 多账号按行分隔:青龙环境变量里一行一个账号。 # 不再按空格拆分,避免支付宝姓名/账号字段或备注里带空格时被误拆。 for item in raw.strip().splitlines(): item = item.strip() if not item: continue parts = item.split("&") if len(parts) < 3: print(f"账号格式错误,已跳过:{item}") continue accounts.append(Account( phone=parts[0].strip(), password=parts[1].strip(), x_token=parts[2].strip(), alipay_name=parts[3].strip() if len(parts) > 3 else "", alipay_account=parts[4].strip() if len(parts) > 4 else (parts[0].strip()), )) return accounts def main() -> int: print(f"🔔{NAME}, 开始!") start = time.time() raw = os.getenv("AiXianJu", "").strip() if not raw: print("先填写账号密码:export AiXianJu=$'账号&密码&X-TOKEN&支付宝姓名&支付宝账号\\n账号2&密码2&X-TOKEN&支付宝姓名&支付宝账号'") return 1 accounts = parse_accounts(raw) if not accounts: print("未解析到有效账号") return 1 print(f"共解析账号:{len(accounts)}") print(f"自动提现:{'开启' if ENABLE_WITHDRAW else '关闭'}") all_summary: List[str] = [] runner = AiXianJuRunner() for idx, account in enumerate(accounts, 1): print(f"\n========== 账号 {idx}/{len(accounts)}:{account.phone} =========="); try: all_summary.extend(runner.run_account(account)) except Exception as e: print(f"账号 {account.phone} 运行异常:{type(e).__name__}: {e}") if DEBUG: import traceback traceback.print_exc() if idx < len(accounts): random_delay(ACCOUNT_DELAY_MIN, ACCOUNT_DELAY_MAX, "多账号间隔") if all_summary: print("\n========== 汇总 =========="); for line in all_summary: print(line) print(f"\n🔔{NAME}, 结束! 🕛 {time.time() - start:.3f} 秒") return 0 if __name__ == "__main__": raise SystemExit(main())