#!/usr/bin/env python3 # -*- coding: utf-8 -*- # cron: 31 9,19 * * * # const $ = new Env('in嘉善阅读'); """ in嘉善 JiaShan 明文 Python 版 环境变量: JiaShan="账号1&密码1\n账号2&密码2" 多账号说明:每个账号一行,账号之间用换行分隔,不用空格分隔。 可选环境变量: JIASHAN_DEBUG="false" # true 打印请求 URL/响应摘要 JIASHAN_TIMEOUT="20" JIASHAN_TASK_DELAY_MIN="5" # 每个任务/业务接口后随机延迟下限,单位秒 JIASHAN_TASK_DELAY_MAX="10" # 每个任务/业务接口后随机延迟上限,单位秒 JIASHAN_ACCOUNT_DELAY_MIN="8" # 多账号之间随机延迟下限,单位秒 JIASHAN_ACCOUNT_DELAY_MAX="15" # 多账号之间随机延迟上限,单位秒 处理方式: - 明文 Python,无混淆。 - 移除原作者广告/通知/作者接口 token/用户校验。 - 多账号仅按换行分隔。 - 任务随机延迟 5-10 秒,多账号随机延迟 8-15 秒。 """ from __future__ import annotations import json import os import random import re import time import uuid import hashlib from dataclasses import dataclass, field from datetime import datetime, timezone, timedelta from typing import Any, Dict, List, Optional from urllib.parse import unquote, quote_plus try: import requests except ImportError: print("缺少依赖 requests,请先安装:pip3 install requests") raise NAME = "in嘉善" BASE = "https://api.app.injs.jsxww.cn" YAPI_BASE = "https://yapi.y-h5.iyunxh.com/api" OAPI_BASE = "https://oapi.injs.jsxww.cn" DEBUG = os.getenv("JIASHAN_DEBUG", "false").lower() in {"1", "true", "yes"} TIMEOUT = int(os.getenv("JIASHAN_TIMEOUT", "20") or "20") TASK_DELAY_MIN = float(os.getenv("JIASHAN_TASK_DELAY_MIN", "5") or "5") TASK_DELAY_MAX = float(os.getenv("JIASHAN_TASK_DELAY_MAX", "10") or "10") ACCOUNT_DELAY_MIN = float(os.getenv("JIASHAN_ACCOUNT_DELAY_MIN", "8") or "8") ACCOUNT_DELAY_MAX = float(os.getenv("JIASHAN_ACCOUNT_DELAY_MAX", "15") or "15") CN_TZ = timezone(timedelta(hours=8)) # 原 JS 追踪到的阅读活动入口。这个接口返回的不是普通新闻列表,而是活动页 q/id。 ARTICLE_LIST_PATH = "/app/layout/dynamic/component/data?layoutId=7853114638635438077&layoutDatasourceId=7853114638635438096&pageNo=1&pageSize=20" READ_ACTIVITY_ID = os.getenv("JIASHAN_READ_ACTIVITY_ID", "11106624") TENANT_CODE = os.getenv("JIASHAN_TENANT_CODE", "in_jiashan") MOBILE_UA = ( "Mozilla/5.0 (Linux; Android 11; 21091116AC Build/RP1A.200720.011; wv) " "AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/94.0.4606.85 " "Mobile Safari/537.36;injiashan;injiashan;native_app" ) def now_cn() -> datetime: return datetime.now(CN_TZ) def cn_month_day() -> str: now = now_cn() return f"{now.month}月{now.day}日" def mask(s: Any, keep: int = 4) -> str: s = "" if s is None else str(s) if len(s) <= keep * 2: return "*" * len(s) return s[:keep] + "***" + s[-keep:] def short_json(obj: Any, limit: int = 500) -> str: try: text = json.dumps(obj, ensure_ascii=False, separators=(",", ":")) except Exception: text = str(obj) text = re.sub( r'("(?:token|Authorization|password|accessToken|access_token|sessionId|session_id)"\s*:\s*")([^"\\]{6,})(")', lambda m: m.group(1) + mask(m.group(2)) + m.group(3), text, flags=re.I, ) return text if len(text) <= limit else text[:limit] + "..." def gen_client_id() -> str: # 原 JS 日志显示为 32 位 hex clientId return uuid.uuid4().hex def random_delay(min_seconds: float, max_seconds: float, label: str = "") -> None: if max_seconds <= 0: return if min_seconds < 0: min_seconds = 0 if max_seconds < min_seconds: max_seconds = min_seconds seconds = random.uniform(min_seconds, max_seconds) if DEBUG and label: print(f"[DEBUG] 等待{label}:{seconds:.2f}s") time.sleep(seconds) def js_wait(label: str = "") -> None: random_delay(TASK_DELAY_MIN, TASK_DELAY_MAX, label) def extract_first_q(obj: Any) -> str: text = json.dumps(obj, ensure_ascii=False) # 原 JS 需要的是活动链接里的 q 参数;不能随便取 layoutDatasourceId/componentId, # 否则 activity-api 会报“链接解密失败”。 for pat in [r'(?<=q=)[^&",\\]+', r'(?<=[?&]q%3D)[^%&",\\]+']: m = re.search(pat, text) if m: return unquote(m.group(0)) # 只有明确的 q 字段才当活动 q 使用。 def walk(obj2: Any): if isinstance(obj2, dict): if obj2.get("q"): yield obj2.get("q") for key in ("url", "jumpUrl", "linkUrl", "h5Url", "targetUrl", "externalUrl", "activityUrl"): val = obj2.get(key) if isinstance(val, str): mm = re.search(r'(?<=q=)[^&",\\]+', val) if mm: yield unquote(mm.group(0)) for v in obj2.values(): yield from walk(v) elif isinstance(obj2, list): for v in obj2: yield from walk(v) for val in walk(obj): val = str(val).strip() if val: return unquote(val) return "" def random_string(length: int = 32, prefix_u: bool = True, radix: Optional[int] = None) -> str: chars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" if radix is None: radix = len(chars) out = "".join(chars[int(random.random() * radix)] for _ in range(length)) return ("u" + out[1:]) if prefix_u and out else out def urlencode_form(obj: Dict[str, Any]) -> str: return "&".join(f"{k}={quote_plus(str(v))}" for k, v in obj.items()) def yapi_signature() -> str: ts = int(time.time() * 1000) nonce = random_string(32, prefix_u=False) raw = f"jsxww{nonce}{ts}3a82b6ac78145c2a6c4ff1f7d3dced1b" return f"jsxww;{nonce};{ts};{hashlib.md5(raw.encode()).hexdigest()}" def walk_values(obj: Any): if isinstance(obj, dict): for v in obj.values(): yield v yield from walk_values(v) elif isinstance(obj, list): for v in obj: yield v yield from walk_values(v) @dataclass class Account: phone: str password: str @dataclass class ClientState: client_id: str token: str = "" user_id: str = "" read_q: str = "" read_token: str = "" yapi_user_id: str = "0" api_dt: str = "" lottery_q: str = "" lottery_token: str = "" third_id: str = "" summary_lines: List[str] = field(default_factory=list) class JiaShanRunner: def __init__(self) -> None: self.sess = requests.Session() def headers(self, state: Optional[ClientState] = None, extra: Optional[Dict[str, str]] = None) -> Dict[str, str]: h = { "Connection": "Keep-Alive", "Accept": "application/json, text/plain, */*", "Content-Type": "application/json;charset=UTF-8", "User-Agent": MOBILE_UA, "Accept-Encoding": "gzip", } if state: h["clientId"] = state.client_id h["X-CLIENT-ID"] = state.client_id if state.token: h["Authorization"] = state.token h["token"] = state.token h["X-Token"] = state.token if extra: h.update(extra) return h def request_json(self, method: str, url: str, *, state: Optional[ClientState] = None, headers: Optional[Dict[str, str]] = None, data: Any = None, json_body: Any = None, desc: str = "", wait: bool = True) -> Dict[str, Any]: req_headers = self.headers(state) if headers: req_headers.update(headers) if DEBUG: print(f"[DEBUG] {method} {url}") if data is not None: print(f"[DEBUG] body={mask(data) if isinstance(data, str) and len(data) > 80 else data}") if json_body is not None: print(f"[DEBUG] json={short_json(json_body)}") resp = self.sess.request(method, url, headers=req_headers, data=data, json=json_body, timeout=TIMEOUT) text = resp.text try: obj = resp.json() except Exception: obj = {"code": resp.status_code, "message": text} if DEBUG: print(f"[DEBUG] status={resp.status_code} resp={short_json(obj)}") if wait: js_wait(desc or url) return obj def api_get(self, state: ClientState, path: str) -> Dict[str, Any]: url = path if path.startswith("http") else BASE + path return self.request_json("GET", url, state=state, desc=path) def api_post(self, state: ClientState, path: str, payload: Dict[str, Any]) -> Dict[str, Any]: url = path if path.startswith("http") else BASE + path return self.request_json("POST", url, state=state, json_body=payload, desc=path) def login(self, account: Account, state: ClientState) -> Optional[Dict[str, Any]]: print("登录") ret = self.request_json( "POST", BASE + "/login", state=state, json_body={"username": account.phone, "password": account.password}, desc="登录", ) data = ret.get("data") or {} token = ( data.get("token") or data.get("accessToken") or data.get("access_token") or data.get("jwt") or ret.get("token") or ret.get("accessToken") or ret.get("access_token") or "" ) user = data.get("user") or {} user_id = str(data.get("userId") or data.get("uid") or data.get("id") or user.get("id") or user.get("userId") or user.get("uid") or "") if not token: print(f"登录失败:{ret.get('message') or ret.get('msg') or short_json(ret)}") return None state.token = str(token) state.user_id = user_id print(f"登录成功:token={mask(state.token)}" + (f" user_id={mask(user_id)}" if user_id else "")) return data def yapi_headers(self, state: ClientState, authed: bool = True) -> Dict[str, str]: h = { "Connection": "Keep-Alive", "Access-T-Id-In": "49", "Access-Wxclient-Type": "wx_app", "User-Agent": "injs;android:11;version:1.1.12;clientid:" + state.client_id + ";" + MOBILE_UA, "Access-Api-Unique-Token": "1", "Access-Api-Dt": state.api_dt or str(int(time.time() * 1000)), "Access-T-Id": "49", "Accept": "*/*", "Origin": "https://jsxww.y-h5.iyunxh.com", "X-Requested-With": "info.ltit.www.cloudjiashan", "Referer": "https://jsxww.y-h5.iyunxh.com/", "Accept-Encoding": "gzip, deflate", "Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7", } if authed: h["Access-User-Id"] = state.yapi_user_id or "0" h["Access-Api-Signature"] = yapi_signature() h["Access-Token"] = state.read_token return h def yapi_get(self, state: ClientState, path: str, *, authed: bool = True, wait: bool = True) -> Dict[str, Any]: return self.request_json("GET", YAPI_BASE + path, headers=self.yapi_headers(state, authed), desc=path, wait=wait) def yapi_post(self, state: ClientState, path: str, payload: Dict[str, Any], *, wait: bool = True) -> Dict[str, Any]: headers = self.yapi_headers(state, True) headers["Content-Type"] = "application/json" return self.request_json("POST", YAPI_BASE + path, headers=headers, json_body=payload, desc=path, wait=wait) def oapi_get(self, state: ClientState, path: str, *, wait: bool = True) -> Dict[str, Any]: headers = { "Connection": "Keep-Alive", "ClientId": state.client_id, "Authorization": state.token, "User-Agent": "injs;android:11;version:1.1.12;clientid:" + state.client_id, "Accept-Encoding": "gzip", } return self.request_json("GET", OAPI_BASE + path, headers=headers, desc=path, wait=wait) def app_user_init(self, account: Account, state: ClientState, app_user_token: str, login_data: Dict[str, Any]) -> bool: userinfo = login_data.get("userinfo") or login_data.get("userInfo") or login_data.get("user") or {} user_id = userinfo.get("id") or state.user_id nickname = userinfo.get("nickname") or userinfo.get("name") or userinfo.get("username") or f"用户{account.phone[-6:]}" avatar = userinfo.get("avatarUrl") or userinfo.get("avatar_url") or "" payload = { "app_user_token": app_user_token, "appid": "jsxww", "noncestr": random_string(6, prefix_u=False), "phone": account.phone, "portrait_url": ("https://oss.injs.jsxww.cn" + avatar) if avatar and str(avatar).startswith("/") else str(avatar), "timestamp": str(int(time.time())), "user_id": user_id, "user_name": nickname, "wx_openid": "", "wx_unionid": "", } payload["signature"] = hashlib.md5((urlencode_form(payload) + "&appkey=0c3eafb13e9f1ac110a432798b021862").encode()).hexdigest() ret = self.yapi_post(state, "/aosbase/_auth_appuserinit", payload) data = ret.get("data") or {} state.read_token = data.get("access_token") or data.get("token") or "" state.yapi_user_id = str((data.get("data") or {}).get("user_id") or data.get("user_id") or "0") if not state.read_token: print(f"阅读登录失败:{short_json(ret)}") return False print(f"阅读token:{state.read_token}") return True def do_read_tasks_and_lottery(self, account: Account, state: ClientState, login_data: Dict[str, Any]) -> None: print("————————————") print("阅读抽奖") print("获取id") article_list = self.api_get(state, ARTICLE_LIST_PATH) state.read_q = extract_first_q(article_list) if not state.read_q: # 历史原 JS 日志中此活动 q 为 11106624;仅在页面未暴露 q 时兜底使用。 state.read_q = READ_ACTIVITY_ID print(state.read_q) print("获取apiDt") auth_dt = self.yapi_get(state, "/aosbase/_auth_dt", authed=False) state.api_dt = str(auth_dt.get("data") or "")[32:68] print(state.api_dt) if not state.api_dt: print(f"获取apiDt失败:{short_json(auth_dt)}") return print("阅读登录") access_ret = self.yapi_get(state, "/admin/_service_custom_jsxww_getaccesstoken?access_t_id=1&access_t_id_in=1", authed=False) access_token = (access_ret.get("data") or "") open_ret = self.oapi_get(state, f"/auth/openid?access_token={access_token}&app_token={state.token}") open_data = open_ret.get("data") or {} app_user_token = f"{open_data.get('openid')}.{open_data.get('ticket')}" if open_data.get("openid") and open_data.get("ticket") else "" if not app_user_token or not self.app_user_init(account, state, app_user_token, login_data): print("未获取到活动token,跳过抽奖") return today_md = cn_month_day() pass_list_ret = self.yapi_get(state, f"/aoslearnfoot/_optionp_list?activity_id={state.read_q}") pass_list = pass_list_ret.get("data") or [] if isinstance(pass_list, dict): pass_list = pass_list.get("list") or pass_list.get("records") or [] for item in pass_list if isinstance(pass_list, list) else []: pass_id = item.get("id") title = item.get("title") or item.get("name") or f"{today_md}阅读" print(title) detail = self.yapi_get(state, f"/aoslearnfoot/optionp_detail?id={pass_id}") data = detail.get("data") or {} task_num = int(data.get("task_num") or item.get("task_num") or 0) done_num = int(data.get("user_done_num") or item.get("user_done_num") or 0) if task_num and task_num == done_num: print("已完成") continue print(f"进度:{done_num}/{task_num}") # 需要滑块时,原 JS 会请求 ddddocr。Python 版先不伪造滑块,避免错误提交。 if task_num != done_num: print("未完成任务需要滑块验证,已跳过阅读提交") if not pass_list: print(f"{today_md}阅读") print("未获取到阅读任务") ac_detail = self.yapi_get(state, f"/aoslearnfoot/_ac_detail?id={state.read_q}") lottery_id = "" try: other_set = json.loads((ac_detail.get("data") or {}).get("other_set") or "{}") lottery_id = str(((other_set.get("lottery") or {}).get("id")) or "") except Exception: lottery_id = "" lottery_times = self.yapi_get(state, f"/aoslottery/ac_lottery_times?id={lottery_id}") if lottery_id else {} try: lottery_count = int((lottery_times.get("data") or {}).get("all_remain") or 0) except Exception: lottery_count = 0 print(f"拥有{lottery_count}次抽奖") if lottery_count <= 0: return print("当前账号有抽奖次数,但抽奖接口需要滑块验证,Python版已跳过自动抽奖") return print("获取抽奖token") activity_info = self.request_json( "GET", "https://act.tmlyun.com/activity-api/task/h5/activity/getActivityInfo", state=state, headers={"Authorization": state.read_token, "X-Requested-With": "com.jsxww.injiashan"}, desc="获取活动信息", ) state.lottery_q = extract_first_q(activity_info) or state.read_q lottery_auth = self.request_json( "POST", "https://act.tmlyun.com/activity-api/lottery/api/auth/userLogin", state=state, json_body={"q": state.lottery_q, "accountId": state.user_id, "sessionId": state.token, "tenantCode": TENANT_CODE}, headers={"Authorization": state.lottery_token, "X-Requested-With": "com.jsxww.injiashan"}, desc="获取抽奖token", ) state.lottery_token = (lottery_auth.get("data") or {}).get("token") or "" state.third_id = (lottery_auth.get("data") or {}).get("thirdId") or "" if not state.lottery_token or not state.third_id: print(f"获取抽奖token失败:{short_json(lottery_auth)}") return print(f"获取抽奖token完成:{mask(state.lottery_token)}") for i in range(lottery_count): ret = self.request_json( "POST", "https://act.tmlyun.com/activity-api/lottery/h5/activity/lottery/userActivityLottery", state=state, json_body={"activityId": state.third_id, "clientId": state.client_id}, headers={"Authorization": state.lottery_token, "X-Requested-With": "com.jsxww.injiashan"}, desc="抽奖", ) prize = ((ret.get("data") or {}).get("prizeName") or ret.get("message") or ret.get("msg") or "未知") print(f"抽奖{i+1}/{lottery_count}获得:{prize}") state.summary_lines.append(f"用户:{account.phone} 抽奖获得:{prize}") def run_account(self, account: Account) -> List[str]: state = ClientState(client_id=gen_client_id()) print("随机生成clientId") print(state.client_id) print(f"用户:{account.phone}开始任务") login_data = self.login(account, state) if not login_data: return state.summary_lines self.do_read_tasks_and_lottery(account, state, login_data) return state.summary_lines def parse_accounts(raw: str) -> List[Account]: accounts: List[Account] = [] for item in raw.strip().splitlines(): item = item.strip() if not item: continue parts = item.split("&") if len(parts) < 2: print(f"账号格式错误,已跳过:{item}") continue accounts.append(Account(phone=parts[0].strip(), password=parts[1].strip())) return accounts def main() -> int: print(f"🔔{NAME}, 开始!") start = time.time() raw = os.getenv("JiaShan", "").strip() if not raw: print("先填写账号密码:export JiaShan=$'账号&密码\\n账号2&密码2'") return 1 accounts = parse_accounts(raw) if not accounts: print("未解析到有效账号") return 1 print(f"共解析账号:{len(accounts)}") all_summary: List[str] = [] runner = JiaShanRunner() for idx, account in enumerate(accounts, 1): print(f"\n========== 账号 {idx}/{len(accounts)}:{account.phone} ==========") try: all_summary.extend(runner.run_account(account)) except Exception as e: print(f"账号 {account.phone} 运行异常:{type(e).__name__}: {e}") if DEBUG: import traceback traceback.print_exc() if idx < len(accounts): random_delay(ACCOUNT_DELAY_MIN, ACCOUNT_DELAY_MAX, "多账号间隔") if all_summary: print("\n========== 汇总 ==========") for line in all_summary: print(line) print(f"\n🔔{NAME}, 结束! 🕛 {time.time() - start:.3f} 秒") return 0 if __name__ == "__main__": raise SystemExit(main())