#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
in嘉善 (JiaShan) - plain, un-obfuscated Python port.
# cron: 31 9,19 * * *
# const $ = new Env('in嘉善阅读');

cron: 31 9,19 * * *

Required environment variable:
    JiaShan   one "phone&password" entry per line. Accounts are separated
              by line breaks only, never by spaces. Only the first "&" on a
              line separates the fields, so a password may contain "&".

Optional environment variables:
    JIASHAN_DEBUG="false"            # "true" prints request URLs and response digests
    JIASHAN_TIMEOUT="20"             # HTTP timeout, whole seconds
    JIASHAN_TASK_DELAY_MIN="5"       # lower bound of the random delay after each request, seconds
    JIASHAN_TASK_DELAY_MAX="10"      # upper bound of the random delay after each request, seconds
    JIASHAN_ACCOUNT_DELAY_MIN="8"    # lower bound of the random delay between accounts, seconds
    JIASHAN_ACCOUNT_DELAY_MAX="15"   # upper bound of the random delay between accounts, seconds

Notes on this port:
- Plain Python, no obfuscation.
- The original author's ads / notifications / author-API token / user
  verification were removed.
- Multiple accounts are separated by line breaks only.
- By default tasks are delayed 5-10 s and accounts 8-15 s, both randomised.
"""

from __future__ import annotations

import json
import os
import random
import re
import time
import uuid
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional
from urllib.parse import unquote

try:
    import requests
except ImportError:
    print("缺少依赖 requests,请先安装:pip3 install requests")
    raise


def _env_number(name: str, default: str, cast: Callable[[str], Any]) -> Any:
    """Read a numeric environment variable, falling back to *default*.

    An unset or empty variable yields ``cast(default)``. A value that *cast*
    rejects is reported and replaced by the default instead of aborting the
    whole script with a bare ``ValueError`` at import time.
    """
    raw = os.getenv(name, default) or default
    try:
        return cast(raw)
    except ValueError:
        print(f"[WARN] {name}={raw!r} is not a valid number, using {default}")
        return cast(default)


NAME = "in嘉善"
BASE = "https://api.app.injs.jsxww.cn"
DEBUG = os.getenv("JIASHAN_DEBUG", "false").lower() in {"1", "true", "yes"}
TIMEOUT = _env_number("JIASHAN_TIMEOUT", "20", int)
TASK_DELAY_MIN = _env_number("JIASHAN_TASK_DELAY_MIN", "5", float)
TASK_DELAY_MAX = _env_number("JIASHAN_TASK_DELAY_MAX", "10", float)
ACCOUNT_DELAY_MIN = _env_number("JIASHAN_ACCOUNT_DELAY_MIN", "8", float)
ACCOUNT_DELAY_MAX = _env_number("JIASHAN_ACCOUNT_DELAY_MAX", "15", float)

# Home-page dynamic-component article list endpoint, traced from the original JS.
ARTICLE_LIST_PATH = "/app/layout/dynamic/component/data?layoutId=7853114638635438077&layoutDatasourceId=7853114638635438096&pageNo=1&pageSize=20"

# Activity (read task / lottery) backend and the fixed values sent to it.
ACTIVITY_BASE = "https://act.tmlyun.com/activity-api"
ACTIVITY_APP_PACKAGE = "com.jsxww.injiashan"
ACTIVITY_TENANT_CODE = "in_jiashan"

MOBILE_UA = (
    "Mozilla/5.0 (Linux; Android 11; 21091116AC Build/RP1A.200720.011; wv) "
    "AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/94.0.4606.85 "
    "Mobile Safari/537.36;injiashan;injiashan;native_app"
)

# JSON string fields whose values must never be printed in clear text.
_SECRET_FIELD_RE = re.compile(
    r'("(?:token|Authorization|password|accessToken|access_token|sessionId|session_id)"\s*:\s*")([^"\\]+)(")',
    flags=re.I,
)

# First "q=" query parameter inside a JSON dump (value ends at & " or ,).
_Q_PARAM_RE = re.compile(r'(?<=q=)[^&",]+')


def _as_dict(value: Any) -> Dict[str, Any]:
    """Return *value* if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _activity_headers(token: str) -> Dict[str, str]:
    """Build the extra headers sent with every activity-backend request."""
    return {"Authorization": token, "X-Requested-With": ACTIVITY_APP_PACKAGE}


def mask(s: Any, keep: int = 4) -> str:
    """Hide the middle of a secret, keeping *keep* characters at each end.

    Values no longer than ``2 * keep`` characters are replaced entirely by
    asterisks. ``keep <= 0`` also masks everything: without that guard
    ``s[-0:]`` would be the whole string and the secret would be printed.
    """
    s = "" if s is None else str(s)
    if keep <= 0 or len(s) <= keep * 2:
        return "*" * len(s)
    return s[:keep] + "***" + s[-keep:]


def short_json(obj: Any, limit: int = 500) -> str:
    """Serialise *obj* compactly for logging, masking secrets and truncating.

    Falls back to ``str(obj)`` when *obj* is not JSON-serialisable. Secret
    fields of any length are masked; output longer than *limit* characters is
    cut and suffixed with ``...``.
    """
    try:
        text = json.dumps(obj, ensure_ascii=False, separators=(",", ":"))
    except (TypeError, ValueError):
        text = str(obj)
    text = _SECRET_FIELD_RE.sub(
        lambda m: m.group(1) + mask(m.group(2)) + m.group(3),
        text,
    )
    return text if len(text) <= limit else text[:limit] + "..."


def gen_client_id() -> str:
    """Return a random 32-character hex client id."""
    # The original JS logs show the clientId as a 32-character hex string.
    return uuid.uuid4().hex


def random_delay(min_seconds: float, max_seconds: float, label: str = "") -> None:
    """Sleep for a random duration in ``[min_seconds, max_seconds]``.

    A non-positive *max_seconds* disables the delay. A negative lower bound is
    clamped to 0, and an upper bound below the lower bound is raised to it.
    """
    if max_seconds <= 0:
        return
    min_seconds = max(min_seconds, 0)
    max_seconds = max(max_seconds, min_seconds)
    seconds = random.uniform(min_seconds, max_seconds)
    if DEBUG and label:
        print(f"[DEBUG] 等待{label}:{seconds:.2f}s")
    time.sleep(seconds)


def js_wait(label: str = "") -> None:
    """Sleep for the configured per-task random delay."""
    random_delay(TASK_DELAY_MIN, TASK_DELAY_MAX, label)


def extract_first_q(obj: Any) -> str:
    """Return the URL-decoded value of the first ``q=`` found in *obj*.

    *obj* is dumped to JSON text and searched as a string; an empty string is
    returned when no ``q=`` parameter is present.
    """
    found = _Q_PARAM_RE.search(json.dumps(obj, ensure_ascii=False))
    return unquote(found.group(0)) if found else ""


def walk_values(obj: Any):
    """Yield every value nested inside dicts and lists, depth first.

    Each container value is yielded itself before its contents.
    """
    if isinstance(obj, dict):
        children = obj.values()
    elif isinstance(obj, list):
        children = obj
    else:
        return
    for child in children:
        yield child
        yield from walk_values(child)


@dataclass
class Account:
    """Login credentials of one account."""

    phone: str
    password: str


@dataclass
class ClientState:
    """Per-account state collected while the tasks run."""

    client_id: str
    token: str = ""          # app token returned by the login call
    user_id: str = ""
    read_q: str = ""         # "q" value extracted from the article list
    read_token: str = ""     # token of the read-task activity
    lottery_q: str = ""      # "q" value extracted from the activity info
    lottery_token: str = ""  # token of the lottery activity
    third_id: str = ""       # sent as activityId when drawing
    summary_lines: List[str] = field(default_factory=list)


class JiaShanRunner:
    """Runs the login, read and lottery flow for each account."""

    def __init__(self) -> None:
        self.sess = requests.Session()

    def headers(self, state: Optional[ClientState] = None, extra: Optional[Dict[str, str]] = None) -> Dict[str, str]:
        """Build the request headers, adding client id and token from *state*.

        Entries in *extra* are applied last and override everything else.
        """
        h = {
            "Connection": "Keep-Alive",
            "Accept": "application/json, text/plain, */*",
            "Content-Type": "application/json;charset=UTF-8",
            "User-Agent": MOBILE_UA,
            "Accept-Encoding": "gzip",
        }
        if state:
            h["clientId"] = state.client_id
            h["X-CLIENT-ID"] = state.client_id
            if state.token:
                h["Authorization"] = state.token
                h["token"] = state.token
                h["X-Token"] = state.token
        if extra:
            h.update(extra)
        return h

    def request_json(
        self,
        method: str,
        url: str,
        *,
        state: Optional[ClientState] = None,
        headers: Optional[Dict[str, str]] = None,
        data: Any = None,
        json_body: Any = None,
        desc: str = "",
        wait: bool = True,
    ) -> Dict[str, Any]:
        """Send one HTTP request and return the response as a dict.

        A body that is not valid JSON becomes
        ``{"code": <status>, "message": <text>}``. Valid JSON that is not an
        object (a list, string, number, ...) becomes
        ``{"code": <status>, "data": <value>}``, so callers can always use
        ``.get``. When *wait* is true the per-task random delay runs after the
        request. Network errors raised by ``requests`` propagate to the caller.
        """
        req_headers = self.headers(state, headers)
        if DEBUG:
            print(f"[DEBUG] {method} {url}")
            if data is not None:
                print(f"[DEBUG] body={mask(data) if isinstance(data, str) and len(data) > 80 else data}")
            if json_body is not None:
                print(f"[DEBUG] json={short_json(json_body)}")
        resp = self.sess.request(method, url, headers=req_headers, data=data, json=json_body, timeout=TIMEOUT)
        try:
            obj = resp.json()
        except ValueError:
            # Every JSON decode error raised by requests subclasses ValueError.
            obj = {"code": resp.status_code, "message": resp.text}
        if not isinstance(obj, dict):
            obj = {"code": resp.status_code, "data": obj}
        if DEBUG:
            print(f"[DEBUG] status={resp.status_code} resp={short_json(obj)}")
        if wait:
            js_wait(desc or url)
        return obj

    def api_get(self, state: ClientState, path: str) -> Dict[str, Any]:
        """GET *path* (relative to ``BASE`` unless it is already a URL)."""
        url = path if path.startswith("http") else BASE + path
        return self.request_json("GET", url, state=state, desc=path)

    def api_post(self, state: ClientState, path: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        """POST *payload* as JSON to *path* (relative to ``BASE`` unless a URL)."""
        url = path if path.startswith("http") else BASE + path
        return self.request_json("POST", url, state=state, json_body=payload, desc=path)

    def login(self, account: Account, state: ClientState) -> bool:
        """Log in and store the token and user id in *state*.

        Returns ``True`` on success and ``False`` when the response carries no
        token. Several alternative field names are tried for both values.
        """
        print("登录")
        ret = self.request_json(
            "POST",
            BASE + "/login",
            state=state,
            json_body={"username": account.phone, "password": account.password},
            desc="登录",
        )
        # "data" / "user" may be missing or of an unexpected type; treat as empty.
        data = _as_dict(ret.get("data"))
        user = _as_dict(data.get("user"))
        token = (
            data.get("token")
            or data.get("accessToken")
            or data.get("access_token")
            or data.get("jwt")
            or ret.get("token")
            or ret.get("accessToken")
            or ret.get("access_token")
            or ""
        )
        user_id = str(
            data.get("userId")
            or data.get("uid")
            or data.get("id")
            or user.get("id")
            or user.get("userId")
            or user.get("uid")
            or ""
        )
        if not token:
            print(f"登录失败:{ret.get('message') or ret.get('msg') or short_json(ret)}")
            return False
        state.token = str(token)
        state.user_id = user_id
        print(f"登录成功:token={mask(state.token)}" + (f" user_id={mask(user_id)}" if user_id else ""))
        return True

    def pick_article_items(self, obj: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return the first list under ``obj["data"]`` that looks like articles.

        A list qualifies when at least one of its dict elements has an ``id``,
        ``articleId`` or ``contentId`` key; only its dict elements are
        returned. An empty list means nothing qualified.
        """
        data = obj.get("data")
        for val in [data, *walk_values(data)]:
            if not isinstance(val, list):
                continue
            dicts = [x for x in val if isinstance(x, dict)]
            if any(("id" in x or "articleId" in x or "contentId" in x) for x in dicts):
                return dicts
        return []

    def article_id_of(self, article: Dict[str, Any]) -> str:
        """Return the article's id as a string, or ``""`` when it has none."""
        return str(
            article.get("id")
            or article.get("articleId")
            or article.get("contentId")
            or article.get("resourceId")
            or ""
        )

    def _fetch_read_token(self, state: ClientState) -> None:
        """Exchange ``state.read_q`` for the read-activity token."""
        print("获取阅读token")
        auth = self.request_json(
            "POST",
            ACTIVITY_BASE + "/task/h5/auth/userLogin",
            state=state,
            json_body={
                "q": state.read_q,
                "accountId": state.user_id,
                "sessionId": state.token,
                "tenantCode": ACTIVITY_TENANT_CODE,
            },
            # NOTE(review): read_token is still empty on a fresh state, so this
            # overrides the app token in Authorization with an empty value.
            # Kept as in the original script - confirm it is intended.
            headers=_activity_headers(state.read_token),
            desc="获取阅读token",
        )
        state.read_token = _as_dict(auth.get("data")).get("token") or ""
        if state.read_token:
            print(f"获取阅读token完成:{mask(state.read_token)}")

    def _read_articles(self, state: ClientState, articles: List[Dict[str, Any]]) -> None:
        """Report a finished read for every article that has an id."""
        # Field names may differ between API versions; this uses the common
        # read-time reporting endpoint as a best-effort attempt.
        for i, article in enumerate(articles, 1):
            aid = self.article_id_of(article)
            if not aid:
                continue
            ret = self.api_get(state, f"/app/article/read_time?channel_article_id={aid}&is_end=1&read_time=1617")
            print(f"阅读{i}/{len(articles)}:{ret.get('message') or ret.get('msg') or short_json(ret, 120)}")

    def _run_lottery(self, account: Account, state: ClientState) -> None:
        """Query the remaining draws and spend all of them."""
        lottery_info = self.request_json(
            "GET",
            ACTIVITY_BASE + "/task/h5/activity/getLotteryInfo",
            state=state,
            headers=_activity_headers(state.read_token),
            desc="获取抽奖次数",
        )
        try:
            lottery_count = int(_as_dict(lottery_info.get("data")).get("lotteryCount") or 0)
        except (TypeError, ValueError):
            lottery_count = 0
        print(f"拥有{lottery_count}次抽奖")
        if lottery_count <= 0:
            return

        print("获取抽奖token")
        activity_info = self.request_json(
            "GET",
            ACTIVITY_BASE + "/task/h5/activity/getActivityInfo",
            state=state,
            headers=_activity_headers(state.read_token),
            desc="获取活动信息",
        )
        # Fall back to the read "q" when the activity info carries none.
        state.lottery_q = extract_first_q(activity_info) or state.read_q
        lottery_auth = self.request_json(
            "POST",
            ACTIVITY_BASE + "/lottery/api/auth/userLogin",
            state=state,
            json_body={
                "q": state.lottery_q,
                "accountId": state.user_id,
                "sessionId": state.token,
                "tenantCode": ACTIVITY_TENANT_CODE,
            },
            # NOTE(review): lottery_token is still empty here, same as the
            # read-token login - confirm it is intended.
            headers=_activity_headers(state.lottery_token),
            desc="获取抽奖token",
        )
        auth_data = _as_dict(lottery_auth.get("data"))
        state.lottery_token = auth_data.get("token") or ""
        state.third_id = auth_data.get("thirdId") or ""
        if not state.lottery_token or not state.third_id:
            print(f"获取抽奖token失败:{short_json(lottery_auth)}")
            return
        print(f"获取抽奖token完成:{mask(state.lottery_token)}")

        for i in range(lottery_count):
            ret = self.request_json(
                "POST",
                ACTIVITY_BASE + "/lottery/h5/activity/lottery/userActivityLottery",
                state=state,
                json_body={"activityId": state.third_id, "clientId": state.client_id},
                headers=_activity_headers(state.lottery_token),
                desc="抽奖",
            )
            prize = (
                _as_dict(ret.get("data")).get("prizeName")
                or ret.get("message")
                or ret.get("msg")
                or "未知"
            )
            print(f"抽奖{i+1}/{lottery_count}获得:{prize}")
            state.summary_lines.append(f"用户:{account.phone} 抽奖获得:{prize}")

    def do_read_tasks_and_lottery(self, account: Account, state: ClientState) -> None:
        """Fetch the article list, read the articles, then draw the lottery.

        Stops early when neither a ``q`` value nor any article is found, and
        skips the lottery when no read-activity token could be obtained.
        Prizes are appended to ``state.summary_lines``.
        """
        print("————————————")
        print("阅读抽奖")
        print("获取id")
        article_list = self.api_get(state, ARTICLE_LIST_PATH)
        state.read_q = extract_first_q(article_list)
        articles = self.pick_article_items(article_list)
        if state.read_q:
            print(f"获取id完成:{mask(state.read_q)}")
        elif articles:
            print(f"获取文章列表完成:{len(articles)}篇")
        else:
            print("获取id失败")
            return

        # The original script is mainly "read, then draw". The activity
        # endpoints are tried on the assumption that they match the shared
        # activity backend; a failure there does not prevent reading.
        if state.read_q:
            self._fetch_read_token(state)

        self._read_articles(state, articles)

        if not state.read_token:
            print("未获取到活动token,跳过抽奖")
            return
        self._run_lottery(account, state)

    def run_account(self, account: Account) -> List[str]:
        """Run the whole flow for one account and return its summary lines."""
        # The HTTP session is shared by all accounts; drop the cookies of the
        # previous account so they are not sent along with this one.
        self.sess.cookies.clear()
        state = ClientState(client_id=gen_client_id())
        print("随机生成clientId")
        print(state.client_id)
        print(f"用户:{account.phone}开始任务")
        if not self.login(account, state):
            return state.summary_lines
        self.do_read_tasks_and_lottery(account, state)
        return state.summary_lines


def parse_accounts(raw: str) -> List[Account]:
    """Parse ``phone&password`` entries, one per line, into accounts.

    Blank lines are ignored. Only the first ``&`` separates the two fields, so
    a password may itself contain ``&``. Entries without ``&`` or with an
    empty phone or password are reported and skipped.
    """
    accounts: List[Account] = []
    for line in raw.strip().splitlines():
        item = line.strip()
        if not item:
            continue
        phone, sep, password = item.partition("&")
        phone, password = phone.strip(), password.strip()
        if not sep or not phone or not password:
            print(f"账号格式错误,已跳过:{item}")
            continue
        accounts.append(Account(phone=phone, password=password))
    return accounts


def main() -> int:
    """Run every configured account; return the process exit code.

    Returns 1 when no usable account is configured and 0 otherwise. A failure
    in one account is reported and does not stop the remaining accounts.
    """
    print(f"🔔{NAME}, 开始!")
    start = time.time()
    raw = os.getenv("JiaShan", "").strip()
    if not raw:
        print("先填写账号密码:export JiaShan=$'账号&密码\\n账号2&密码2'")
        return 1

    accounts = parse_accounts(raw)
    if not accounts:
        print("未解析到有效账号")
        return 1

    print(f"共解析账号:{len(accounts)}")
    all_summary: List[str] = []
    runner = JiaShanRunner()
    for idx, account in enumerate(accounts, 1):
        print(f"\n========== 账号 {idx}/{len(accounts)}:{account.phone} ==========")
        try:
            all_summary.extend(runner.run_account(account))
        except Exception as e:
            # Top-level boundary: report and carry on with the next account.
            print(f"账号 {account.phone} 运行异常:{type(e).__name__}: {e}")
            if DEBUG:
                import traceback

                traceback.print_exc()
        if idx < len(accounts):
            random_delay(ACCOUNT_DELAY_MIN, ACCOUNT_DELAY_MAX, "多账号间隔")

    if all_summary:
        print("\n========== 汇总 ==========")
        for line in all_summary:
            print(line)

    print(f"\n🔔{NAME}, 结束! 🕛 {time.time() - start:.3f} 秒")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())