Files
Yangmao_Script/Cash_Based/JiaShan.py
2026-06-06 00:53:18 +08:00

391 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
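in嘉善 (JiaShan) - plain-text Python version
cron: 31 9,19 * * *
Environment variable:
JiaShan="account1&password1\naccount2&password2"
Multiple accounts: one account per line; accounts are separated by newlines, not by spaces.
Optional environment variables:
JIASHAN_DEBUG="false" # "true" prints request URLs and response summaries
JIASHAN_TIMEOUT="20"
JIASHAN_TASK_DELAY_MIN="5" # lower bound of the random delay after each task/API call, in seconds
JIASHAN_TASK_DELAY_MAX="10" # upper bound of the random delay after each task/API call, in seconds
JIASHAN_ACCOUNT_DELAY_MIN="8" # lower bound of the random delay between accounts, in seconds
JIASHAN_ACCOUNT_DELAY_MAX="15" # upper bound of the random delay between accounts, in seconds
Notes on this port:
- Plain Python, no obfuscation.
- Removed the original author's ads, notifications, author-API token and user verification.
- Multiple accounts are separated by newlines only.
- Random delay of 5-10 seconds between tasks and 8-15 seconds between accounts.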
"""
from __future__ import annotations
import json
import os
import random
import re
import time
import uuid
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
from urllib.parse import unquote
try:
import requests
except ImportError:
print("缺少依赖 requests请先安装pip3 install requests")
raise
NAME = "in嘉善"
BASE = "https://api.app.injs.jsxww.cn"


def env_number(name: str, default: str, cast: Any = float) -> Any:
    """Read a numeric setting from the environment, falling back to *default*.

    Args:
        name: Environment variable to read.
        default: Textual default, used when the variable is unset, empty or
            cannot be parsed.
        cast: Conversion applied to the text (``int`` or ``float``).

    Returns:
        ``cast(value)`` for a parsable value, otherwise ``cast(default)``.
    """
    raw = (os.getenv(name) or "").strip()
    if not raw:
        return cast(default)
    try:
        return cast(raw)
    except ValueError:
        # A typo in an optional tuning knob should not abort the whole run
        # at import time; report it and continue with the default.
        print(f"环境变量 {name}={raw!r} 不是有效数字,使用默认值 {default}")
        return cast(default)


DEBUG = os.getenv("JIASHAN_DEBUG", "false").strip().lower() in {"1", "true", "yes"}
TIMEOUT = env_number("JIASHAN_TIMEOUT", "20", int)
TASK_DELAY_MIN = env_number("JIASHAN_TASK_DELAY_MIN", "5")
TASK_DELAY_MAX = env_number("JIASHAN_TASK_DELAY_MAX", "10")
ACCOUNT_DELAY_MIN = env_number("JIASHAN_ACCOUNT_DELAY_MIN", "8")
ACCOUNT_DELAY_MAX = env_number("JIASHAN_ACCOUNT_DELAY_MAX", "15")
# Article-list entry point of the home page's dynamic component, traced from the original JS.
ARTICLE_LIST_PATH = "/app/layout/dynamic/component/data?layoutId=7853114638635438077&layoutDatasourceId=7853114638635438096&pageNo=1&pageSize=20"
MOBILE_UA = (
    "Mozilla/5.0 (Linux; Android 11; 21091116AC Build/RP1A.200720.011; wv) "
    "AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/94.0.4606.85 "
    "Mobile Safari/537.36;injiashan;injiashan;native_app"
)
def mask(s: Any, keep: int = 4) -> str:
    """Return *s* as text with its middle replaced by ``***``.

    ``None`` is treated as the empty string and any other value is converted
    with ``str``. A value no longer than ``2 * keep`` characters is replaced
    entirely by asterisks (one per character), so short secrets never show a
    real character. A non-positive *keep* also masks the whole value.

    Args:
        s: Value to mask.
        keep: Number of leading and trailing characters left readable.

    Returns:
        The masked text.
    """
    text = "" if s is None else str(s)
    # keep <= 0 must mask everything: text[-0:] is the *whole* string, so the
    # slicing below would otherwise append the secret in full after "***".
    if keep <= 0 or len(text) <= keep * 2:
        return "*" * len(text)
    return f"{text[:keep]}***{text[-keep:]}"
def short_json(obj: Any, limit: int = 500) -> str:
    """Render *obj* as compact text for log output, masking secrets.

    The object is serialised as compact JSON (falling back to ``str(obj)``
    when it cannot be serialised). String values of the keys ``token``,
    ``Authorization``, ``password``, ``accessToken``, ``access_token``,
    ``sessionId`` and ``session_id`` (case-insensitive) are passed through
    ``mask``. The result is cut to *limit* characters followed by ``...``.

    Args:
        obj: Any value, typically a decoded JSON response or request body.
        limit: Maximum number of characters kept before the ``...`` suffix.

    Returns:
        The masked, possibly truncated text.
    """
    try:
        text = json.dumps(obj, ensure_ascii=False, separators=(",", ":"))
    except (TypeError, ValueError, RecursionError):
        # Not JSON-serialisable (or circular / too deep): log its str() form.
        text = str(obj)
    # The value group accepts any non-empty JSON string body, including
    # backslash escapes, so secrets shorter than six characters or containing
    # escaped characters are masked as well.
    text = re.sub(
        r'("(?:token|Authorization|password|accessToken|access_token|sessionId|session_id)"\s*:\s*")((?:[^"\\]|\\.)+)(")',
        lambda m: m.group(1) + mask(m.group(2)) + m.group(3),
        text,
        flags=re.I,
    )
    return text if len(text) <= limit else text[:limit] + "..."
def gen_client_id() -> str:
    """Return a freshly generated random client id as 32 hexadecimal characters."""
    # The original JS logs show the clientId as a 32-character hex string.
    client_uuid = uuid.uuid4()
    return client_uuid.hex
def random_delay(min_seconds: float, max_seconds: float, label: str = "") -> None:
    """Sleep for a random duration drawn uniformly from the given range.

    Nothing happens when *max_seconds* is not positive. A negative lower
    bound is raised to 0, and an upper bound below the lower bound is raised
    to the lower bound. When DEBUG is enabled and *label* is non-empty, the
    chosen duration is printed before sleeping.

    Args:
        min_seconds: Lower bound of the delay, in seconds.
        max_seconds: Upper bound of the delay, in seconds.
        label: Optional description shown in the debug line.
    """
    if max_seconds <= 0:
        return
    lower = max(min_seconds, 0)
    upper = max(max_seconds, lower)
    seconds = random.uniform(lower, upper)
    if DEBUG and label:
        print(f"[DEBUG] 等待{label}{seconds:.2f}s")
    time.sleep(seconds)
def js_wait(label: str = "") -> None:
    """Pause for the per-task random delay bounded by TASK_DELAY_MIN and TASK_DELAY_MAX."""
    random_delay(min_seconds=TASK_DELAY_MIN, max_seconds=TASK_DELAY_MAX, label=label)
def extract_first_q(obj: Any) -> str:
    """Return the URL-decoded value of the first ``q`` parameter found in *obj*.

    *obj* is serialised to JSON text, which is then searched for ``q=``
    followed by characters up to the next ``&``, ``"`` or ``,``.

    Args:
        obj: Decoded JSON data that may embed URLs carrying a ``q`` parameter.

    Returns:
        The percent-decoded value, or ``""`` when no ``q`` parameter exists.
    """
    text = json.dumps(obj, ensure_ascii=False)
    # Require a non-identifier character (or the start of the text) before
    # "q=" so parameters that merely end in "q" (e.g. "seq=", "freq=") are
    # not mistaken for the q parameter.
    found = re.search(r'(?<![A-Za-z0-9_])q=([^&",]+)', text)
    return unquote(found.group(1)) if found else ""
def walk_values(obj: Any):
    """Yield every value nested inside *obj*, depth-first, parents before children.

    Dict values and list items are yielded, each immediately followed by
    whatever it contains. Anything that is neither a dict nor a list yields
    nothing.
    """
    if isinstance(obj, dict):
        children = obj.values()
    elif isinstance(obj, list):
        children = obj
    else:
        return
    for child in children:
        yield child
        yield from walk_values(child)
@dataclass
class Account:
    """Credentials of one account parsed from the ``JiaShan`` variable."""

    # Login name; sent as "username" in the login request.
    phone: str
    # Kept out of the generated repr so that printing or logging an Account
    # never shows the plaintext password.
    password: str = field(repr=False)
@dataclass
class ClientState:
    """Mutable per-account state collected while the flow runs.

    Token-like fields are excluded from the generated repr, matching the rest
    of the file, which only ever prints them through ``mask``.
    """

    # Random id sent in the clientId / X-CLIENT-ID headers.
    client_id: str
    # App login token; empty until login succeeds.
    token: str = field(default="", repr=False)
    user_id: str = ""
    # "q" value and token used for the reading activity.
    read_q: str = field(default="", repr=False)
    read_token: str = field(default="", repr=False)
    # "q" value and token used for the lottery.
    lottery_q: str = field(default="", repr=False)
    lottery_token: str = field(default="", repr=False)
    # "thirdId" from the lottery login response; sent as activityId when drawing.
    third_id: str = ""
    # Human-readable result lines collected for the final summary.
    summary_lines: List[str] = field(default_factory=list)
class JiaShanRunner:
    """Drives the login, article-reading and lottery flow for in嘉善 accounts.

    A single ``requests.Session`` is reused for every HTTP call. Its cookie
    jar is cleared at the start of each account so that nothing stored for
    one account is sent along with the next one's requests.
    """

    # Value sent as X-Requested-With on every act.tmlyun.com request.
    _APP_PACKAGE = "com.jsxww.injiashan"

    def __init__(self) -> None:
        self.sess = requests.Session()

    @staticmethod
    def _payload(ret: Any) -> Dict[str, Any]:
        """Return the ``data`` object of a response, or ``{}`` if absent or not a dict."""
        data = ret.get("data") if isinstance(ret, dict) else None
        return data if isinstance(data, dict) else {}

    @classmethod
    def _activity_headers(cls, token: str) -> Dict[str, str]:
        """Build the extra headers used for activity/lottery requests."""
        return {"Authorization": token, "X-Requested-With": cls._APP_PACKAGE}

    @staticmethod
    def _url(path: str) -> str:
        """Return *path* unchanged if it is already absolute, else prefix it with BASE."""
        return path if path.startswith("http") else BASE + path

    def headers(self, state: Optional[ClientState] = None, extra: Optional[Dict[str, str]] = None) -> Dict[str, str]:
        """Build the request headers.

        Args:
            state: When given, adds the client-id headers and, once a login
                token exists, the three token headers.
            extra: Headers applied last; they override any of the above.

        Returns:
            A new header dictionary.
        """
        h = {
            "Connection": "Keep-Alive",
            "Accept": "application/json, text/plain, */*",
            "Content-Type": "application/json;charset=UTF-8",
            "User-Agent": MOBILE_UA,
            "Accept-Encoding": "gzip",
        }
        if state:
            h["clientId"] = state.client_id
            h["X-CLIENT-ID"] = state.client_id
            if state.token:
                h["Authorization"] = state.token
                h["token"] = state.token
                h["X-Token"] = state.token
        if extra:
            h.update(extra)
        return h

    def request_json(self, method: str, url: str, *, state: Optional[ClientState] = None,
                     headers: Optional[Dict[str, str]] = None, data: Any = None,
                     json_body: Any = None, desc: str = "", wait: bool = True) -> Dict[str, Any]:
        """Send one HTTP request and return the response as a dict.

        Args:
            method: HTTP method name.
            url: Absolute URL.
            state: Account state used to build the default headers.
            headers: Extra headers overriding the defaults.
            data: Raw request body, if any.
            json_body: Object sent as the JSON body, if any.
            desc: Label for the post-request delay's debug line (defaults to the URL).
            wait: Whether to apply the random task delay after the request.

        Returns:
            The decoded JSON object. A body that is not JSON becomes
            ``{"code": <status>, "message": <text>}``; JSON that is not an
            object becomes ``{"code": <status>, "data": <value>}`` so callers
            can always use ``.get``.

        Raises:
            Whatever ``requests`` raises for connection or timeout failures;
            nothing is caught around the network call itself.
        """
        req_headers = self.headers(state, headers)
        if DEBUG:
            print(f"[DEBUG] {method} {url}")
            if data is not None:
                print(f"[DEBUG] body={mask(data) if isinstance(data, str) and len(data) > 80 else data}")
            if json_body is not None:
                print(f"[DEBUG] json={short_json(json_body)}")
        resp = self.sess.request(method, url, headers=req_headers, data=data, json=json_body, timeout=TIMEOUT)
        try:
            obj = resp.json()
        except ValueError:
            # requests signals an undecodable body with a ValueError subclass.
            obj = {"code": resp.status_code, "message": resp.text}
        if not isinstance(obj, dict):
            # Callers treat the result as a mapping; wrap lists/scalars.
            obj = {"code": resp.status_code, "data": obj}
        if DEBUG:
            print(f"[DEBUG] status={resp.status_code} resp={short_json(obj)}")
        if wait:
            js_wait(desc or url)
        return obj

    def api_get(self, state: ClientState, path: str) -> Dict[str, Any]:
        """GET *path* (relative to BASE unless absolute) with the account's headers."""
        return self.request_json("GET", self._url(path), state=state, desc=path)

    def api_post(self, state: ClientState, path: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        """POST *payload* as JSON to *path* (relative to BASE unless absolute)."""
        return self.request_json("POST", self._url(path), state=state, json_body=payload, desc=path)

    def login(self, account: Account, state: ClientState) -> bool:
        """Log in and store the token and user id on *state*.

        The token is looked up under several key spellings, first inside the
        response's ``data`` object and then at the top level.

        Returns:
            True when a token was obtained, False otherwise.
        """
        print("登录")
        ret = self.request_json(
            "POST",
            BASE + "/login",
            state=state,
            json_body={"username": account.phone, "password": account.password},
            desc="登录",
        )
        data = self._payload(ret)
        user = data.get("user")
        if not isinstance(user, dict):
            user = {}
        token = (
            data.get("token") or data.get("accessToken") or data.get("access_token") or data.get("jwt")
            or ret.get("token") or ret.get("accessToken") or ret.get("access_token") or ""
        )
        user_id = str(
            data.get("userId") or data.get("uid") or data.get("id")
            or user.get("id") or user.get("userId") or user.get("uid") or ""
        )
        if not token:
            print(f"登录失败:{ret.get('message') or ret.get('msg') or short_json(ret)}")
            return False
        state.token = str(token)
        state.user_id = user_id
        print(f"登录成功token={mask(state.token)}" + (f" user_id={mask(user_id)}" if user_id else ""))
        return True

    def pick_article_items(self, obj: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return the first list under ``obj["data"]`` that looks like an article list.

        ``data`` itself is checked first, then every nested value in
        depth-first order. A list qualifies when at least one of its dict
        items has an ``id``, ``articleId`` or ``contentId`` key; only its
        dict items are returned. Returns ``[]`` when nothing qualifies.
        """
        data = obj.get("data")
        id_keys = ("id", "articleId", "contentId")

        def nodes():
            yield data
            yield from walk_values(data)

        # Stop at the first match instead of collecting every candidate.
        for node in nodes():
            if not isinstance(node, list):
                continue
            dict_items = [item for item in node if isinstance(item, dict)]
            if any(key in item for item in dict_items for key in id_keys):
                return dict_items
        return []

    def article_id_of(self, article: Dict[str, Any]) -> str:
        """Return the article's id as text, trying several key names; ``""`` if none is set."""
        return str(article.get("id") or article.get("articleId") or article.get("contentId") or article.get("resourceId") or "")

    def _fetch_read_token(self, state: ClientState) -> None:
        """Exchange ``state.read_q`` for the reading-activity token (stored on *state*)."""
        print("获取阅读token")
        auth = self.request_json(
            "POST",
            "https://act.tmlyun.com/activity-api/task/h5/auth/userLogin",
            state=state,
            json_body={"q": state.read_q, "accountId": state.user_id, "sessionId": state.token, "tenantCode": "in_jiashan"},
            # NOTE(review): read_token is normally still empty at this point,
            # so an empty Authorization header is sent - kept as in the source.
            headers=self._activity_headers(state.read_token),
            desc="获取阅读token",
        )
        state.read_token = self._payload(auth).get("token") or ""
        if state.read_token:
            print(f"获取阅读token完成{mask(state.read_token)}")

    def _read_articles(self, state: ClientState, articles: List[Dict[str, Any]]) -> None:
        """Report a read for every article that has an id and print each reply."""
        total = len(articles)
        for i, article in enumerate(articles, 1):
            aid = self.article_id_of(article)
            if not aid:
                continue
            ret = self.api_get(state, f"/app/article/read_time?channel_article_id={aid}&is_end=1&read_time=1617")
            print(f"阅读{i}/{total}{ret.get('message') or ret.get('msg') or short_json(ret, 120)}")

    def _draw_lottery(self, account: Account, state: ClientState) -> None:
        """Query the remaining draws, obtain the lottery token and perform every draw."""
        lottery_info = self.request_json(
            "GET",
            "https://act.tmlyun.com/activity-api/task/h5/activity/getLotteryInfo",
            state=state,
            headers=self._activity_headers(state.read_token),
            desc="获取抽奖次数",
        )
        try:
            lottery_count = int(self._payload(lottery_info).get("lotteryCount") or 0)
        except (TypeError, ValueError):
            lottery_count = 0
        print(f"拥有{lottery_count}次抽奖")
        if lottery_count <= 0:
            return
        print("获取抽奖token")
        activity_info = self.request_json(
            "GET",
            "https://act.tmlyun.com/activity-api/task/h5/activity/getActivityInfo",
            state=state,
            headers=self._activity_headers(state.read_token),
            desc="获取活动信息",
        )
        state.lottery_q = extract_first_q(activity_info) or state.read_q
        lottery_auth = self.request_json(
            "POST",
            "https://act.tmlyun.com/activity-api/lottery/api/auth/userLogin",
            state=state,
            json_body={"q": state.lottery_q, "accountId": state.user_id, "sessionId": state.token, "tenantCode": "in_jiashan"},
            headers=self._activity_headers(state.lottery_token),
            desc="获取抽奖token",
        )
        auth_data = self._payload(lottery_auth)
        state.lottery_token = auth_data.get("token") or ""
        state.third_id = auth_data.get("thirdId") or ""
        if not state.lottery_token or not state.third_id:
            print(f"获取抽奖token失败{short_json(lottery_auth)}")
            return
        print(f"获取抽奖token完成{mask(state.lottery_token)}")
        for i in range(lottery_count):
            ret = self.request_json(
                "POST",
                "https://act.tmlyun.com/activity-api/lottery/h5/activity/lottery/userActivityLottery",
                state=state,
                json_body={"activityId": state.third_id, "clientId": state.client_id},
                headers=self._activity_headers(state.lottery_token),
                desc="抽奖",
            )
            prize = self._payload(ret).get("prizeName") or ret.get("message") or ret.get("msg") or "未知"
            print(f"抽奖{i+1}/{lottery_count}获得:{prize}")
            state.summary_lines.append(f"用户:{account.phone} 抽奖获得:{prize}")

    def do_read_tasks_and_lottery(self, account: Account, state: ClientState) -> None:
        """Run the reading tasks and then the lottery for a logged-in account.

        Steps, in order: fetch the article list, obtain the reading token
        (only when a ``q`` value was found), report a read for each article,
        then draw the lottery if a reading token is available. Draw results
        are appended to ``state.summary_lines``.
        """
        print("————————————")
        print("阅读抽奖")
        print("获取id")
        article_list = self.api_get(state, ARTICLE_LIST_PATH)
        state.read_q = extract_first_q(article_list)
        articles = self.pick_article_items(article_list)
        if state.read_q:
            print(f"获取id完成{mask(state.read_q)}")
        elif articles:
            print(f"获取文章列表完成:{len(articles)}")
        else:
            print("获取id失败")
            return
        # The original script is mainly read-then-draw. The activity calls
        # are attempted on the assumption that the API matches the tmlyun
        # one; their failure does not prevent the reading step.
        if state.read_q:
            self._fetch_read_token(state)
        # Reading: endpoint fields may differ between app versions, so this
        # uses the common read-report endpoint as a best-effort attempt.
        self._read_articles(state, articles)
        if not state.read_token:
            print("未获取到活动token跳过抽奖")
            return
        self._draw_lottery(account, state)

    def run_account(self, account: Account) -> List[str]:
        """Run the whole flow for one account and return its summary lines."""
        # The session is shared by all accounts; drop cookies left by the
        # previous account so they are not sent with this one's requests.
        self.sess.cookies.clear()
        state = ClientState(client_id=gen_client_id())
        print("随机生成clientId")
        print(state.client_id)
        print(f"用户:{account.phone}开始任务")
        if self.login(account, state):
            self.do_read_tasks_and_lottery(account, state)
        return state.summary_lines
def parse_accounts(raw: str) -> List[Account]:
    """Parse the ``JiaShan`` variable into a list of accounts.

    Each non-empty line has the form ``phone&password``. Only the first two
    ``&``-separated fields are used; further fields are ignored. Lines with
    no ``&``, or with an empty phone or password, are reported and skipped.

    Args:
        raw: The variable's text, one account per line.

    Returns:
        The accounts in input order.
    """
    accounts: List[Account] = []
    # The documented form JiaShan="a&b\nc&d" inside ordinary shell double
    # quotes yields a literal backslash followed by "n", not a newline.
    # Treat that two-character sequence as a line break too; otherwise the
    # second account would end up glued onto the first password.
    normalized = raw.replace("\\r\\n", "\n").replace("\\n", "\n")
    for line in normalized.splitlines():
        entry = line.strip()
        if not entry:
            continue
        parts = entry.split("&")
        if len(parts) < 2:
            print(f"账号格式错误,已跳过:{entry}")
            continue
        phone, password = parts[0].strip(), parts[1].strip()
        if not phone or not password:
            # Do not echo the line itself here: it may contain a password.
            print(f"账号或密码为空,已跳过:{phone or '?'}")
            continue
        accounts.append(Account(phone=phone, password=password))
    return accounts
def main() -> int:
    """Run the flow for every account listed in the ``JiaShan`` variable.

    An exception raised while processing one account is printed (with a
    traceback when DEBUG is on) and the remaining accounts still run. A
    random pause is inserted between consecutive accounts, and the collected
    summary lines are printed at the end.

    Returns:
        1 when the variable is empty or contains no valid account, else 0.
    """
    print(f"🔔{NAME}, 开始!")
    started_at = time.time()

    env_value = os.getenv("JiaShan", "").strip()
    if not env_value:
        print("先填写账号密码export JiaShan=$'账号&密码\\n账号2&密码2'")
        return 1

    accounts = parse_accounts(env_value)
    if not accounts:
        print("未解析到有效账号")
        return 1

    total = len(accounts)
    print(f"共解析账号:{total}")

    collected: List[str] = []
    runner = JiaShanRunner()
    for position, account in enumerate(accounts, start=1):
        print(f"\n========== 账号 {position}/{total}{account.phone} ==========")
        try:
            collected.extend(runner.run_account(account))
        except Exception as exc:
            # Top-level boundary: one failing account must not stop the rest.
            print(f"账号 {account.phone} 运行异常:{type(exc).__name__}: {exc}")
            if DEBUG:
                import traceback
                traceback.print_exc()
        # No pause after the last account.
        if position < total:
            random_delay(ACCOUNT_DELAY_MIN, ACCOUNT_DELAY_MAX, "多账号间隔")

    if collected:
        print("\n========== 汇总 ==========")
        for summary_line in collected:
            print(summary_line)

    elapsed = time.time() - started_at
    print(f"\n🔔{NAME}, 结束! 🕛 {elapsed:.3f}")
    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)