diff --git a/WX_Applet/Applet_JYHS_LMF.py b/WX_Applet/Applet_JYHS_LMF.py new file mode 100644 index 0000000..8f20df9 --- /dev/null +++ b/WX_Applet/Applet_JYHS_LMF.py @@ -0,0 +1,583 @@ +# cron: 52 7 * * * +# new Env("绿蜜蜂旧衣服回收") +""" +name: 绿蜜蜂旧衣服回收 - 养鸡场自动版 +功能: 从养鸡场逐个读取微信账号,获取小程序 code,登录绿蜜蜂,签到、查余额、满阈值自动提现。 + +外部环境变量: + wx_cloud 养鸡场地址,例如 http://192.168.0.250:666 + wx_token 养鸡场 Authorization,支持裸 token 或 Bearer token + LMF_TEST_WXID 可选,只跑某个 wxid + LMF_EXCLUDE_WXIDS 可选,排除 wxid,支持换行/英文逗号/中文逗号分隔 + +脚本内配置: + ENABLE_WITHDRAW 是否自动提现,默认 True + WITHDRAW_THRESHOLD 提现阈值,默认 3.0 元 + CLOUD_PAGE_SIZE 固定 1,逐个账号读取并执行 + REQUEST_DELAY_MIN/MAX 单账号内请求间隔 + ACCOUNT_DELAY_MIN/MAX 多账号之间间隔 + DEBUG 调试日志,默认 False + +""" + +import json +import logging +import os +import random +import re +import sys +import time +import traceback +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, Generator, List, Optional, Tuple + +try: + import requests + requests.packages.urllib3.disable_warnings() +except ImportError: + print("缺少 requests,请先安装:pip install requests") + sys.exit(1) + +# ===================== 固定配置 ===================== +APPID = "wx6fcde446296d9588" +BASE = "https://lmf.lvmifo.com" +ACCESS_TOKEN_URL = f"{BASE}/api/5a60c77b79875" +LOGIN_URL = f"{BASE}/api/5e05692405c63" +TASK_URL = f"{BASE}/api/5dca57afa379e" +CASH_URL = f"{BASE}/api/5e12a7e1848ba" +REFERER = f"https://servicewechat.com/{APPID}/284/page-frame.html" +SCRIPT_DIR = Path(__file__).resolve().parent +CACHE_DIR = SCRIPT_DIR / "绿蜜蜂缓存" +TOKEN_CACHE_FILE = CACHE_DIR / "lmf_token_cache.json" +UA_FILE = SCRIPT_DIR / "User_Agent.json" + +# 只读取这 4 个外部变量 +ENV = os.environ +WX_CLOUD = ENV.get("wx_cloud", "").rstrip("/") +WX_TOKEN = ENV.get("wx_token", "").strip() +if WX_TOKEN and not WX_TOKEN.lower().startswith("bearer "): + WX_TOKEN = f"Bearer {WX_TOKEN}" +SINGLE_TEST_WXID = ENV.get("LMF_TEST_WXID", "").strip() +EXCLUDE_WXIDS_RAW = ENV.get("LMF_EXCLUDE_WXIDS", "") +EXCLUDE_WXIDS = [x.strip() for x in re.split(r"[\n,,]+", EXCLUDE_WXIDS_RAW) if x.strip()] + +# 脚本内配置 +DEBUG = False +VERIFY_SSL = False +TIMEOUT = 20.0 +CLOUD_PAGE_SIZE = 1 +REQUEST_DELAY_MIN = 3 +REQUEST_DELAY_MAX = 5 +ACCOUNT_DELAY_MIN = 20 +ACCOUNT_DELAY_MAX = 40 +ENABLE_WITHDRAW = True +WITHDRAW_THRESHOLD = 3.0 +RUN_START_TS = time.time() + +DEFAULT_UA_LIST = [ + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF WindowsWechat(0x63090c33) XWEB/14185", +] + +ACCESS_TOKEN_PAYLOAD = { + "app_id": "75762944", + "device_id": "17518763397639", + "rand_str": "lv_mi_feng_uni_app", + "timestamp": "1751876339", + "signature": "e2e80ab21122603c622deb6dd7a89ec6", +} + + +@dataclass +class Account: + remark: str + wxid: str + ua: str = "" + + +@dataclass +class AccountResult: + remark: str + wxid: str + success: bool = False + login: str = "未执行" + sign: str = "未执行" + nick_name: str = "" + amount: float = 0.0 + withdraw: str = "未执行" + message: str = "" + + +# ===================== 基础工具 ===================== +def setup_logging() -> None: + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(levelname)s\t- %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + handlers=[logging.StreamHandler()], + ) + + +def mask(value: Any, keep_start: int = 6, keep_end: int = 4) -> str: + s = str(value or "") + if not s: + return "" + if len(s) <= keep_start + keep_end: + return "***" + return s[:keep_start] + "***" + s[-keep_end:] + + +def debug(title: str, data: Any) -> None: + if not DEBUG: + return + try: + logging.info("[调试] %s: %s", title, json.dumps(data, ensure_ascii=False, indent=2)) + except Exception: + logging.info("[调试] %s: %s", title, data) + + +def safe_json(resp: requests.Response) -> Optional[Dict[str, Any]]: + try: + return resp.json() + except Exception: + return None + + +def load_ua_list() -> List[str]: + if UA_FILE.exists(): + try: + uas = [x.strip() for x in UA_FILE.read_text(encoding="utf-8").splitlines() if x.strip() and not x.strip().startswith("#")] + if uas: + return uas + except Exception: + pass + return DEFAULT_UA_LIST + + +UA_LIST = load_ua_list() + + +def stable_ua(key: str) -> str: + return UA_LIST[abs(hash(key)) % len(UA_LIST)] + + +def now_ts() -> int: + return int(time.time()) + + +def ensure_cache_dir() -> None: + CACHE_DIR.mkdir(parents=True, exist_ok=True) + + +def load_token_cache() -> Dict[str, Any]: + ensure_cache_dir() + if not TOKEN_CACHE_FILE.exists(): + return {"_说明": "绿蜜蜂自动维护缓存。按 wxid 保存 user-token、昵称、余额等,access_token 为全局短期凭据。", "global": {}, "accounts": {}} + try: + data = json.loads(TOKEN_CACHE_FILE.read_text(encoding="utf-8")) + if not isinstance(data, dict): + raise ValueError("cache root is not dict") + data.setdefault("_说明", "绿蜜蜂自动维护缓存。") + data.setdefault("global", {}) + data.setdefault("accounts", {}) + return data + except Exception as e: + backup = TOKEN_CACHE_FILE.with_suffix(f".broken.{now_ts()}.json") + try: + TOKEN_CACHE_FILE.rename(backup) + except Exception: + pass + if DEBUG: + logging.warning("[调试] 缓存文件损坏,已重建: %s", e) + return {"_说明": "绿蜜蜂自动维护缓存。", "global": {}, "accounts": {}} + + +def save_token_cache(cache: Dict[str, Any]) -> None: + ensure_cache_dir() + TOKEN_CACHE_FILE.write_text(json.dumps(cache, ensure_ascii=False, indent=2), encoding="utf-8") + + +def get_cached_account_token(acc: Account, cache: Dict[str, Any]) -> str: + item = (cache.get("accounts") or {}).get(acc.wxid) or {} + token = str(item.get("user_token") or "") + # 绿蜜蜂 user-token 可能长期有效,但保守起见只复用 7 天内缓存。 + updated_at = int(item.get("updated_at") or 0) + if token and now_ts() - updated_at < 7 * 24 * 3600: + return token + return "" + + +def update_account_cache(acc: Account, cache: Dict[str, Any], *, user_token: str = "", nick_name: str = "", amount: float = 0.0) -> None: + accounts = cache.setdefault("accounts", {}) + old = accounts.get(acc.wxid) or {} + old.update({ + "remark": acc.remark, + "wxid": acc.wxid, + "user_token": user_token or old.get("user_token", ""), + "nick_name": nick_name or old.get("nick_name", ""), + "amount": amount, + "ua": acc.ua, + "updated_at": now_ts(), + "updated_at_text": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + }) + accounts[acc.wxid] = old + save_token_cache(cache) + + +def get_cached_access_token(cache: Dict[str, Any]) -> str: + g = cache.get("global") or {} + token = str(g.get("access_token") or "") + updated_at = int(g.get("updated_at") or 0) + # access_token 不确定有效期,保守复用 1 小时。 + if token and now_ts() - updated_at < 3600: + return token + return "" + + +def update_access_cache(cache: Dict[str, Any], access_token: str) -> None: + cache["global"] = { + "access_token": access_token, + "updated_at": now_ts(), + "updated_at_text": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + } + save_token_cache(cache) + + +def sleep_request() -> None: + time.sleep(random.randint(REQUEST_DELAY_MIN, REQUEST_DELAY_MAX)) + + +# ===================== 养鸡场账号/code ===================== +def wx_cloud_headers() -> Dict[str, str]: + return {"Authorization": WX_TOKEN, "Content-Type": "application/json"} + + +def account_from_cloud_row(item: Dict[str, Any]) -> Optional[Account]: + wxid = item.get("wxId") or item.get("wxid") or item.get("Wxid") or "" + if not wxid: + return None + remark = item.get("wxName") or item.get("wxname") or item.get("remark") or item.get("nickName") or mask(wxid) + return Account(remark=str(remark), wxid=str(wxid)) + + +def fetch_cloud_account_page(page_num: int, page_size: int) -> Tuple[List[Account], int]: + if not WX_CLOUD or not WX_TOKEN: + raise RuntimeError("缺少 wx_cloud 或 wx_token") + url = f"{WX_CLOUD}/prod-api/wechat/wechat/list" + resp = requests.get(url, headers=wx_cloud_headers(), params={"pageNum": page_num, "pageSize": page_size}, timeout=TIMEOUT, verify=VERIFY_SSL) + data = safe_json(resp) + debug(f"养鸡场账号列表 page={page_num}", data if data is not None else resp.text[:500]) + if data and data.get("code") == 200 and isinstance(data.get("rows"), list): + accounts = [acc for row in data["rows"] if (acc := account_from_cloud_row(row))] + return accounts, int(data.get("total") or len(accounts)) + raise RuntimeError((data or {}).get("msg") or (data or {}).get("message") or f"获取账号失败 HTTP {resp.status_code}") + + +def iter_accounts_from_cloud() -> Generator[Tuple[Account, int, int], None, None]: + page = 1 + seen = 0 + total = None + while True: + accounts, total_count = fetch_cloud_account_page(page, CLOUD_PAGE_SIZE) + total = total_count if total is None else total + if not accounts: + break + for acc in accounts: + seen += 1 + yield acc, seen, total + if seen >= total: + break + page += 1 + + +def should_skip(acc: Account) -> bool: + if acc.wxid in EXCLUDE_WXIDS: + return True + if SINGLE_TEST_WXID and acc.wxid != SINGLE_TEST_WXID: + return True + return False + + +def iter_selected_accounts() -> Generator[Tuple[Account, int, int], None, None]: + processed = 0 + for acc, cloud_index, cloud_total in iter_accounts_from_cloud(): + if should_skip(acc): + continue + processed += 1 + yield acc, processed if SINGLE_TEST_WXID else cloud_index, 1 if SINGLE_TEST_WXID else cloud_total + if SINGLE_TEST_WXID and acc.wxid == SINGLE_TEST_WXID: + break + + +def get_mini_program_code(wxid: str) -> str: + url = f"{WX_CLOUD}/prod-api/wechat/api/getMiniProgramCode" + payload = {"wxid": wxid, "appid": APPID} + resp = requests.post(url, headers=wx_cloud_headers(), json=payload, timeout=TIMEOUT, verify=VERIFY_SSL) + data = safe_json(resp) + debug("获取小程序code", data if data is not None else resp.text[:500]) + if not data: + raise RuntimeError(f"获取 code 失败: HTTP {resp.status_code}") + if data.get("code") != 200: + raise RuntimeError(data.get("msg") or data.get("message") or f"获取 code 失败: {data}") + code = ((data.get("data") or {}).get("code") or data.get("codeData") or data.get("data")) + if isinstance(code, dict): + code = code.get("code") + if not code: + raise RuntimeError(f"获取 code 失败: 响应中没有 code") + return str(code) + + +# ===================== 绿蜜蜂接口 ===================== +def lmf_headers(access_token: str = "", user_token: str = "", ua: str = "") -> Dict[str, str]: + return { + "User-Agent": ua or DEFAULT_UA_LIST[0], + "lng": "", + "access-token": access_token or "", + "Content-Type": "application/x-www-form-urlencoded", + "xweb_xhr": "1", + "user-token": user_token or "", + "lat": "", + "this-shop-id": "0", + "version": "v1.0.0", + "Sec-Fetch-Site": "cross-site", + "Sec-Fetch-Mode": "cors", + "Sec-Fetch-Dest": "empty", + "Referer": REFERER, + "Accept-Language": "zh-CN,zh;q=0.9", + } + + +def lmf_request( + method: str, + url: str, + *, + headers: Dict[str, str], + params: Dict[str, Any] = None, + data_body: Dict[str, Any] = None, + json_body: Dict[str, Any] = None, +) -> Dict[str, Any]: + if method.upper() == "POST": + # 绿蜜蜂接口对 form 表单更稳定;access_token 接口用 JSON 会返回“timestamp不能为空”。 + if data_body is not None: + resp = requests.post(url, headers=headers, params=params, data=data_body, timeout=TIMEOUT, verify=VERIFY_SSL) + else: + resp = requests.post(url, headers=headers, params=params, json=json_body, timeout=TIMEOUT, verify=VERIFY_SSL) + else: + resp = requests.get(url, headers=headers, params=params, timeout=TIMEOUT, verify=VERIFY_SSL) + data = safe_json(resp) + debug(f"绿蜜蜂接口 {url}", data if data is not None else resp.text[:500]) + if isinstance(data, dict): + return data + return {"code": -1, "msg": f"JSON解析失败 HTTP {resp.status_code}: {resp.text[:200]}"} + + +def get_access_token(ua: str) -> str: + data = lmf_request("POST", ACCESS_TOKEN_URL, headers=lmf_headers(ua=ua), data_body=ACCESS_TOKEN_PAYLOAD) + if data.get("code") == 1: + token = (data.get("data") or {}).get("access_token") + if token: + return str(token) + raise RuntimeError(data.get("msg") or data.get("message") or "获取 access_token 失败") + + +def login_with_code(code: str, access_token: str, ua: str) -> str: + data = lmf_request("POST", LOGIN_URL, headers=lmf_headers(access_token=access_token, ua=ua), data_body={"code": code}) + if data.get("code") == 1: + utoken = (data.get("data") or {}).get("utoken") or (data.get("data") or {}).get("user_token") + if utoken: + return str(utoken) + raise RuntimeError(data.get("msg") or data.get("message") or "code 登录换 user-token 失败") + + +def token_invalid_message(msg: str) -> bool: + return any(k in (msg or "") for k in ["token", "登录", "授权", "失效", "过期", "无效", "请先"]) + + +def to_sign(access_token: str, user_token: str, ua: str) -> str: + data = lmf_request("GET", TASK_URL, headers=lmf_headers(access_token=access_token, user_token=user_token, ua=ua), params={"m": "toSign"}) + msg = data.get("msg") or data.get("message") or "" + if data.get("code") == 1: + return msg or "签到成功" + if token_invalid_message(msg): + raise RuntimeError(f"TOKEN_INVALID: {msg or data.get('code')}") + return msg or f"签到失败: {data.get('code')}" + + +def get_user_info(access_token: str, user_token: str, ua: str) -> Tuple[str, float]: + data = lmf_request("GET", TASK_URL, headers=lmf_headers(access_token=access_token, user_token=user_token, ua=ua), params={"m": "getUserInfo"}) + if data.get("code") == 1 and isinstance(data.get("data"), dict): + d = data["data"] + nick = str(d.get("nick_name") or d.get("nickname") or "") + try: + amount = float(d.get("amount") or 0) + except Exception: + amount = 0.0 + return nick, amount + msg = data.get("msg") or data.get("message") or "" + if token_invalid_message(msg): + raise RuntimeError(f"TOKEN_INVALID: {msg or data.get('code')}") + raise RuntimeError(msg or "获取用户信息失败") + + +def cash_apply(access_token: str, user_token: str, amount: float, ua: str) -> str: + amount_str = f"{amount:.2f}".rstrip("0").rstrip(".") + data = lmf_request("GET", CASH_URL, headers=lmf_headers(access_token=access_token, user_token=user_token, ua=ua), params={"m": "cashApply", "amount": amount_str}) + msg = data.get("msg") or data.get("message") or "" + if data.get("code") == 1: + return msg or f"提现成功: {amount_str}元" + return msg or f"提现失败: {data.get('code')}" + + +# ===================== 主流程 ===================== +def process_account(acc: Account, index: int, total: int) -> AccountResult: + acc.ua = stable_ua(acc.wxid or acc.remark) + result = AccountResult(remark=acc.remark, wxid=acc.wxid) + logging.info("账号%d/%d %s 开始", index, total, acc.remark) + cache = load_token_cache() + try: + access_token = get_cached_access_token(cache) + if access_token: + logging.info("账号%d/%d %s 读取access_token缓存完成", index, total, acc.remark) + else: + access_token = get_access_token(acc.ua) + update_access_cache(cache, access_token) + logging.info("账号%d/%d %s 获取access_token完成", index, total, acc.remark) + sleep_request() + + user_token = get_cached_account_token(acc, cache) + if user_token: + result.login = "使用缓存" + logging.info("账号%d/%d %s 读取user-token缓存完成", index, total, acc.remark) + else: + code = get_mini_program_code(acc.wxid) + logging.info("账号%d/%d %s 获取code完成", index, total, acc.remark) + sleep_request() + user_token = login_with_code(code, access_token, acc.ua) + result.login = "成功" + update_account_cache(acc, cache, user_token=user_token) + logging.info("账号%d/%d %s 登录完成,user-token已缓存", index, total, acc.remark) + if DEBUG: + logging.info("[调试] access_token=%s user_token=%s 缓存目录=%s", mask(access_token), mask(user_token), CACHE_DIR) + sleep_request() + + try: + result.sign = to_sign(access_token, user_token, acc.ua) + except RuntimeError as e: + if str(e).startswith("TOKEN_INVALID") and result.login == "使用缓存": + logging.info("账号%d/%d %s 缓存user-token失效,重新获取code登录", index, total, acc.remark) + code = get_mini_program_code(acc.wxid) + logging.info("账号%d/%d %s 获取code完成", index, total, acc.remark) + sleep_request() + user_token = login_with_code(code, access_token, acc.ua) + result.login = "缓存失效后重登成功" + update_account_cache(acc, cache, user_token=user_token) + logging.info("账号%d/%d %s 重新登录完成,user-token已更新", index, total, acc.remark) + sleep_request() + result.sign = to_sign(access_token, user_token, acc.ua) + else: + raise + logging.info("账号%d/%d %s 签到完成 | %s", index, total, acc.remark, result.sign) + sleep_request() + + result.nick_name, result.amount = get_user_info(access_token, user_token, acc.ua) + update_account_cache(acc, cache, user_token=user_token, nick_name=result.nick_name, amount=result.amount) + logging.info("账号%d/%d %s 余额查询完成 | 昵称: %s | 余额: %.2f元", index, total, acc.remark, result.nick_name or "无", result.amount) + sleep_request() + + if ENABLE_WITHDRAW: + if result.amount >= WITHDRAW_THRESHOLD: + result.withdraw = cash_apply(access_token, user_token, result.amount, acc.ua) + logging.info("账号%d/%d %s 提现完成 | %s", index, total, acc.remark, result.withdraw) + sleep_request() + else: + result.withdraw = "未达到提现阈值" + logging.info("账号%d/%d %s 提现跳过 | 余额 %.2f元 < %.2f元", index, total, acc.remark, result.amount, WITHDRAW_THRESHOLD) + else: + result.withdraw = "未开启自动提现" + logging.info("账号%d/%d %s 提现跳过 | 未开启自动提现", index, total, acc.remark) + result.success = True + except Exception as e: + result.success = False + result.message = str(e).replace("TOKEN_INVALID: ", "") + if DEBUG: + logging.error("账号 %s 执行失败: %s", acc.remark, e) + traceback.print_exc() + if result.success: + logging.info( + "账号%d/%d %s 完成 | 登录: %s | 签到: %s | 昵称: %s | 余额: %.2f元 | 提现: %s", + index, total, acc.remark, result.login, result.sign, result.nick_name or "无", result.amount, result.withdraw, + ) + else: + logging.info("账号%d/%d %s 跳过/失败 | %s", index, total, acc.remark, result.message or "失败") + return result + + +def build_report(results: List[AccountResult]) -> str: + now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + elapsed = time.time() - RUN_START_TS + total = len(results) + success = sum(1 for r in results if r.success) + sign_ok = sum(1 for r in results if r.success and ("成功" in r.sign or "已" in r.sign)) + withdraw_ok = sum(1 for r in results if r.success and "成功" in r.withdraw) + total_amount = sum(r.amount for r in results) + lines = [ + "", + "==================================================", + "📊 绿蜜蜂执行汇总", + "==================================================", + f"⏱️ 执行时间: {now}", + f"👥 总账号数: {total}", + f"✅ 成功账号: {success}", + f"📝 签到成功/已签: {sign_ok}", + f"💸 自动提现: {'开启' if ENABLE_WITHDRAW else '关闭'}", + f"💸 提现成功: {withdraw_ok}", + f"💰 当前余额合计: {total_amount:.2f}元", + f"🧭 UA数量: {len(UA_LIST)}", + f"📁 缓存目录: {CACHE_DIR}", + "", + "📋 账号详情:", + ] + for i, r in enumerate(results, 1): + icon = "✅" if r.success else "❌" + lines.append(f"{icon} 账号{i}: {r.remark} | 昵称: {r.nick_name or '无'} | 余额: {r.amount:.2f}元") + lines.append(f" 🔐 登录: {r.login}") + lines.append(f" 📝 签到: {r.sign}") + lines.append(f" 💸 提现: {r.withdraw}") + if r.message: + lines.append(f" ⚠️ 错误: {r.message}") + lines.append(f"=== 执行结束 [{now}] 耗时 {elapsed:.2f} 秒 退出码 0 ===") + return "\n".join(lines) + + +def main() -> None: + setup_logging() + logging.info("【绿蜜蜂】开始执行任务") + if DEBUG: + logging.info("APPID=%s WX_CLOUD=%s", APPID, WX_CLOUD or "(未设置)") + results: List[AccountResult] = [] + handled = 0 + for acc, i, total in iter_selected_accounts(): + handled += 1 + results.append(process_account(acc, i, total)) + if not SINGLE_TEST_WXID and i < total: + delay = random.randint(ACCOUNT_DELAY_MIN, ACCOUNT_DELAY_MAX) + if DEBUG: + logging.info("[调试] 账号间隔等待 %d 秒", delay) + time.sleep(delay) + if handled == 0: + raise RuntimeError("没有可处理账号。请检查 wx_cloud、wx_token、LMF_TEST_WXID、LMF_EXCLUDE_WXIDS 配置") + print(build_report(results)) + + +if __name__ == "__main__": + try: + main() + except Exception as e: + setup_logging() + logging.error("主流程错误: %s", e) + if DEBUG: + traceback.print_exc() + sys.exit(1) + + \ No newline at end of file