Files
Yangmao_Script/WX_Applet/Applet_JYHS_LMF.py

583 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# cron: 52 7 * * *
# new Env("绿蜜蜂旧衣服回收")
"""
name: 绿蜜蜂旧衣服回收 - 养鸡场自动版
功能: 从养鸡场逐个读取微信账号,获取小程序 code登录绿蜜蜂签到、查余额、满阈值自动提现。
外部环境变量:
wx_cloud 养鸡场地址,例如 http://192.168.0.250:666
wx_token 养鸡场 Authorization支持裸 token 或 Bearer token
LMF_TEST_WXID 可选,只跑某个 wxid
LMF_EXCLUDE_WXIDS 可选,排除 wxid支持换行/英文逗号/中文逗号分隔
脚本内配置:
ENABLE_WITHDRAW 是否自动提现,默认 True
WITHDRAW_THRESHOLD 提现阈值,默认 3.0 元
CLOUD_PAGE_SIZE 固定 1逐个账号读取并执行
REQUEST_DELAY_MIN/MAX 单账号内请求间隔
ACCOUNT_DELAY_MIN/MAX 多账号之间间隔
DEBUG 调试日志,默认 False
"""
import json
import logging
import os
import random
import re
import sys
import time
import traceback
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Generator, List, Optional, Tuple
try:
import requests
requests.packages.urllib3.disable_warnings()
except ImportError:
print("缺少 requests请先安装pip install requests")
sys.exit(1)
# ===================== 固定配置 =====================
APPID = "wx6fcde446296d9588"
BASE = "https://lmf.lvmifo.com"
ACCESS_TOKEN_URL = f"{BASE}/api/5a60c77b79875"
LOGIN_URL = f"{BASE}/api/5e05692405c63"
TASK_URL = f"{BASE}/api/5dca57afa379e"
CASH_URL = f"{BASE}/api/5e12a7e1848ba"
REFERER = f"https://servicewechat.com/{APPID}/284/page-frame.html"
SCRIPT_DIR = Path(__file__).resolve().parent
CACHE_DIR = SCRIPT_DIR / "绿蜜蜂缓存"
TOKEN_CACHE_FILE = CACHE_DIR / "lmf_token_cache.json"
UA_FILE = SCRIPT_DIR / "User_Agent.json"
# 只读取这 4 个外部变量
ENV = os.environ
WX_CLOUD = ENV.get("wx_cloud", "").rstrip("/")
WX_TOKEN = ENV.get("wx_token", "").strip()
if WX_TOKEN and not WX_TOKEN.lower().startswith("bearer "):
WX_TOKEN = f"Bearer {WX_TOKEN}"
SINGLE_TEST_WXID = ENV.get("LMF_TEST_WXID", "").strip()
EXCLUDE_WXIDS_RAW = ENV.get("LMF_EXCLUDE_WXIDS", "")
EXCLUDE_WXIDS = [x.strip() for x in re.split(r"[\n,]+", EXCLUDE_WXIDS_RAW) if x.strip()]
# 脚本内配置
DEBUG = False
VERIFY_SSL = False
TIMEOUT = 20.0
CLOUD_PAGE_SIZE = 1
REQUEST_DELAY_MIN = 3
REQUEST_DELAY_MAX = 5
ACCOUNT_DELAY_MIN = 20
ACCOUNT_DELAY_MAX = 40
ENABLE_WITHDRAW = True
WITHDRAW_THRESHOLD = 3.0
RUN_START_TS = time.time()
DEFAULT_UA_LIST = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF WindowsWechat(0x63090c33) XWEB/14185",
]
ACCESS_TOKEN_PAYLOAD = {
"app_id": "75762944",
"device_id": "17518763397639",
"rand_str": "lv_mi_feng_uni_app",
"timestamp": "1751876339",
"signature": "e2e80ab21122603c622deb6dd7a89ec6",
}
@dataclass
class Account:
remark: str
wxid: str
ua: str = ""
@dataclass
class AccountResult:
remark: str
wxid: str
success: bool = False
login: str = "未执行"
sign: str = "未执行"
nick_name: str = ""
amount: float = 0.0
withdraw: str = "未执行"
message: str = ""
# ===================== 基础工具 =====================
def setup_logging() -> None:
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s\t- %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
handlers=[logging.StreamHandler()],
)
def mask(value: Any, keep_start: int = 6, keep_end: int = 4) -> str:
s = str(value or "")
if not s:
return ""
if len(s) <= keep_start + keep_end:
return "***"
return s[:keep_start] + "***" + s[-keep_end:]
def debug(title: str, data: Any) -> None:
if not DEBUG:
return
try:
logging.info("[调试] %s: %s", title, json.dumps(data, ensure_ascii=False, indent=2))
except Exception:
logging.info("[调试] %s: %s", title, data)
def safe_json(resp: requests.Response) -> Optional[Dict[str, Any]]:
try:
return resp.json()
except Exception:
return None
def load_ua_list() -> List[str]:
if UA_FILE.exists():
try:
uas = [x.strip() for x in UA_FILE.read_text(encoding="utf-8").splitlines() if x.strip() and not x.strip().startswith("#")]
if uas:
return uas
except Exception:
pass
return DEFAULT_UA_LIST
UA_LIST = load_ua_list()
def stable_ua(key: str) -> str:
return UA_LIST[abs(hash(key)) % len(UA_LIST)]
def now_ts() -> int:
return int(time.time())
def ensure_cache_dir() -> None:
CACHE_DIR.mkdir(parents=True, exist_ok=True)
def load_token_cache() -> Dict[str, Any]:
ensure_cache_dir()
if not TOKEN_CACHE_FILE.exists():
return {"_说明": "绿蜜蜂自动维护缓存。按 wxid 保存 user-token、昵称、余额等access_token 为全局短期凭据。", "global": {}, "accounts": {}}
try:
data = json.loads(TOKEN_CACHE_FILE.read_text(encoding="utf-8"))
if not isinstance(data, dict):
raise ValueError("cache root is not dict")
data.setdefault("_说明", "绿蜜蜂自动维护缓存。")
data.setdefault("global", {})
data.setdefault("accounts", {})
return data
except Exception as e:
backup = TOKEN_CACHE_FILE.with_suffix(f".broken.{now_ts()}.json")
try:
TOKEN_CACHE_FILE.rename(backup)
except Exception:
pass
if DEBUG:
logging.warning("[调试] 缓存文件损坏,已重建: %s", e)
return {"_说明": "绿蜜蜂自动维护缓存。", "global": {}, "accounts": {}}
def save_token_cache(cache: Dict[str, Any]) -> None:
ensure_cache_dir()
TOKEN_CACHE_FILE.write_text(json.dumps(cache, ensure_ascii=False, indent=2), encoding="utf-8")
def get_cached_account_token(acc: Account, cache: Dict[str, Any]) -> str:
item = (cache.get("accounts") or {}).get(acc.wxid) or {}
token = str(item.get("user_token") or "")
# 绿蜜蜂 user-token 可能长期有效,但保守起见只复用 7 天内缓存。
updated_at = int(item.get("updated_at") or 0)
if token and now_ts() - updated_at < 7 * 24 * 3600:
return token
return ""
def update_account_cache(acc: Account, cache: Dict[str, Any], *, user_token: str = "", nick_name: str = "", amount: float = 0.0) -> None:
accounts = cache.setdefault("accounts", {})
old = accounts.get(acc.wxid) or {}
old.update({
"remark": acc.remark,
"wxid": acc.wxid,
"user_token": user_token or old.get("user_token", ""),
"nick_name": nick_name or old.get("nick_name", ""),
"amount": amount,
"ua": acc.ua,
"updated_at": now_ts(),
"updated_at_text": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
})
accounts[acc.wxid] = old
save_token_cache(cache)
def get_cached_access_token(cache: Dict[str, Any]) -> str:
g = cache.get("global") or {}
token = str(g.get("access_token") or "")
updated_at = int(g.get("updated_at") or 0)
# access_token 不确定有效期,保守复用 1 小时。
if token and now_ts() - updated_at < 3600:
return token
return ""
def update_access_cache(cache: Dict[str, Any], access_token: str) -> None:
cache["global"] = {
"access_token": access_token,
"updated_at": now_ts(),
"updated_at_text": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
}
save_token_cache(cache)
def sleep_request() -> None:
time.sleep(random.randint(REQUEST_DELAY_MIN, REQUEST_DELAY_MAX))
# ===================== 养鸡场账号/code =====================
def wx_cloud_headers() -> Dict[str, str]:
return {"Authorization": WX_TOKEN, "Content-Type": "application/json"}
def account_from_cloud_row(item: Dict[str, Any]) -> Optional[Account]:
wxid = item.get("wxId") or item.get("wxid") or item.get("Wxid") or ""
if not wxid:
return None
remark = item.get("wxName") or item.get("wxname") or item.get("remark") or item.get("nickName") or mask(wxid)
return Account(remark=str(remark), wxid=str(wxid))
def fetch_cloud_account_page(page_num: int, page_size: int) -> Tuple[List[Account], int]:
if not WX_CLOUD or not WX_TOKEN:
raise RuntimeError("缺少 wx_cloud 或 wx_token")
url = f"{WX_CLOUD}/prod-api/wechat/wechat/list"
resp = requests.get(url, headers=wx_cloud_headers(), params={"pageNum": page_num, "pageSize": page_size}, timeout=TIMEOUT, verify=VERIFY_SSL)
data = safe_json(resp)
debug(f"养鸡场账号列表 page={page_num}", data if data is not None else resp.text[:500])
if data and data.get("code") == 200 and isinstance(data.get("rows"), list):
accounts = [acc for row in data["rows"] if (acc := account_from_cloud_row(row))]
return accounts, int(data.get("total") or len(accounts))
raise RuntimeError((data or {}).get("msg") or (data or {}).get("message") or f"获取账号失败 HTTP {resp.status_code}")
def iter_accounts_from_cloud() -> Generator[Tuple[Account, int, int], None, None]:
page = 1
seen = 0
total = None
while True:
accounts, total_count = fetch_cloud_account_page(page, CLOUD_PAGE_SIZE)
total = total_count if total is None else total
if not accounts:
break
for acc in accounts:
seen += 1
yield acc, seen, total
if seen >= total:
break
page += 1
def should_skip(acc: Account) -> bool:
if acc.wxid in EXCLUDE_WXIDS:
return True
if SINGLE_TEST_WXID and acc.wxid != SINGLE_TEST_WXID:
return True
return False
def iter_selected_accounts() -> Generator[Tuple[Account, int, int], None, None]:
processed = 0
for acc, cloud_index, cloud_total in iter_accounts_from_cloud():
if should_skip(acc):
continue
processed += 1
yield acc, processed if SINGLE_TEST_WXID else cloud_index, 1 if SINGLE_TEST_WXID else cloud_total
if SINGLE_TEST_WXID and acc.wxid == SINGLE_TEST_WXID:
break
def get_mini_program_code(wxid: str) -> str:
url = f"{WX_CLOUD}/prod-api/wechat/api/getMiniProgramCode"
payload = {"wxid": wxid, "appid": APPID}
resp = requests.post(url, headers=wx_cloud_headers(), json=payload, timeout=TIMEOUT, verify=VERIFY_SSL)
data = safe_json(resp)
debug("获取小程序code", data if data is not None else resp.text[:500])
if not data:
raise RuntimeError(f"获取 code 失败: HTTP {resp.status_code}")
if data.get("code") != 200:
raise RuntimeError(data.get("msg") or data.get("message") or f"获取 code 失败: {data}")
code = ((data.get("data") or {}).get("code") or data.get("codeData") or data.get("data"))
if isinstance(code, dict):
code = code.get("code")
if not code:
raise RuntimeError(f"获取 code 失败: 响应中没有 code")
return str(code)
# ===================== 绿蜜蜂接口 =====================
def lmf_headers(access_token: str = "", user_token: str = "", ua: str = "") -> Dict[str, str]:
return {
"User-Agent": ua or DEFAULT_UA_LIST[0],
"lng": "",
"access-token": access_token or "",
"Content-Type": "application/x-www-form-urlencoded",
"xweb_xhr": "1",
"user-token": user_token or "",
"lat": "",
"this-shop-id": "0",
"version": "v1.0.0",
"Sec-Fetch-Site": "cross-site",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Dest": "empty",
"Referer": REFERER,
"Accept-Language": "zh-CN,zh;q=0.9",
}
def lmf_request(
method: str,
url: str,
*,
headers: Dict[str, str],
params: Dict[str, Any] = None,
data_body: Dict[str, Any] = None,
json_body: Dict[str, Any] = None,
) -> Dict[str, Any]:
if method.upper() == "POST":
# 绿蜜蜂接口对 form 表单更稳定access_token 接口用 JSON 会返回“timestamp不能为空”。
if data_body is not None:
resp = requests.post(url, headers=headers, params=params, data=data_body, timeout=TIMEOUT, verify=VERIFY_SSL)
else:
resp = requests.post(url, headers=headers, params=params, json=json_body, timeout=TIMEOUT, verify=VERIFY_SSL)
else:
resp = requests.get(url, headers=headers, params=params, timeout=TIMEOUT, verify=VERIFY_SSL)
data = safe_json(resp)
debug(f"绿蜜蜂接口 {url}", data if data is not None else resp.text[:500])
if isinstance(data, dict):
return data
return {"code": -1, "msg": f"JSON解析失败 HTTP {resp.status_code}: {resp.text[:200]}"}
def get_access_token(ua: str) -> str:
data = lmf_request("POST", ACCESS_TOKEN_URL, headers=lmf_headers(ua=ua), data_body=ACCESS_TOKEN_PAYLOAD)
if data.get("code") == 1:
token = (data.get("data") or {}).get("access_token")
if token:
return str(token)
raise RuntimeError(data.get("msg") or data.get("message") or "获取 access_token 失败")
def login_with_code(code: str, access_token: str, ua: str) -> str:
data = lmf_request("POST", LOGIN_URL, headers=lmf_headers(access_token=access_token, ua=ua), data_body={"code": code})
if data.get("code") == 1:
utoken = (data.get("data") or {}).get("utoken") or (data.get("data") or {}).get("user_token")
if utoken:
return str(utoken)
raise RuntimeError(data.get("msg") or data.get("message") or "code 登录换 user-token 失败")
def token_invalid_message(msg: str) -> bool:
return any(k in (msg or "") for k in ["token", "登录", "授权", "失效", "过期", "无效", "请先"])
def to_sign(access_token: str, user_token: str, ua: str) -> str:
data = lmf_request("GET", TASK_URL, headers=lmf_headers(access_token=access_token, user_token=user_token, ua=ua), params={"m": "toSign"})
msg = data.get("msg") or data.get("message") or ""
if data.get("code") == 1:
return msg or "签到成功"
if token_invalid_message(msg):
raise RuntimeError(f"TOKEN_INVALID: {msg or data.get('code')}")
return msg or f"签到失败: {data.get('code')}"
def get_user_info(access_token: str, user_token: str, ua: str) -> Tuple[str, float]:
data = lmf_request("GET", TASK_URL, headers=lmf_headers(access_token=access_token, user_token=user_token, ua=ua), params={"m": "getUserInfo"})
if data.get("code") == 1 and isinstance(data.get("data"), dict):
d = data["data"]
nick = str(d.get("nick_name") or d.get("nickname") or "")
try:
amount = float(d.get("amount") or 0)
except Exception:
amount = 0.0
return nick, amount
msg = data.get("msg") or data.get("message") or ""
if token_invalid_message(msg):
raise RuntimeError(f"TOKEN_INVALID: {msg or data.get('code')}")
raise RuntimeError(msg or "获取用户信息失败")
def cash_apply(access_token: str, user_token: str, amount: float, ua: str) -> str:
amount_str = f"{amount:.2f}".rstrip("0").rstrip(".")
data = lmf_request("GET", CASH_URL, headers=lmf_headers(access_token=access_token, user_token=user_token, ua=ua), params={"m": "cashApply", "amount": amount_str})
msg = data.get("msg") or data.get("message") or ""
if data.get("code") == 1:
return msg or f"提现成功: {amount_str}"
return msg or f"提现失败: {data.get('code')}"
# ===================== 主流程 =====================
def process_account(acc: Account, index: int, total: int) -> AccountResult:
acc.ua = stable_ua(acc.wxid or acc.remark)
result = AccountResult(remark=acc.remark, wxid=acc.wxid)
logging.info("账号%d/%d %s 开始", index, total, acc.remark)
cache = load_token_cache()
try:
access_token = get_cached_access_token(cache)
if access_token:
logging.info("账号%d/%d %s 读取access_token缓存完成", index, total, acc.remark)
else:
access_token = get_access_token(acc.ua)
update_access_cache(cache, access_token)
logging.info("账号%d/%d %s 获取access_token完成", index, total, acc.remark)
sleep_request()
user_token = get_cached_account_token(acc, cache)
if user_token:
result.login = "使用缓存"
logging.info("账号%d/%d %s 读取user-token缓存完成", index, total, acc.remark)
else:
code = get_mini_program_code(acc.wxid)
logging.info("账号%d/%d %s 获取code完成", index, total, acc.remark)
sleep_request()
user_token = login_with_code(code, access_token, acc.ua)
result.login = "成功"
update_account_cache(acc, cache, user_token=user_token)
logging.info("账号%d/%d %s 登录完成user-token已缓存", index, total, acc.remark)
if DEBUG:
logging.info("[调试] access_token=%s user_token=%s 缓存目录=%s", mask(access_token), mask(user_token), CACHE_DIR)
sleep_request()
try:
result.sign = to_sign(access_token, user_token, acc.ua)
except RuntimeError as e:
if str(e).startswith("TOKEN_INVALID") and result.login == "使用缓存":
logging.info("账号%d/%d %s 缓存user-token失效重新获取code登录", index, total, acc.remark)
code = get_mini_program_code(acc.wxid)
logging.info("账号%d/%d %s 获取code完成", index, total, acc.remark)
sleep_request()
user_token = login_with_code(code, access_token, acc.ua)
result.login = "缓存失效后重登成功"
update_account_cache(acc, cache, user_token=user_token)
logging.info("账号%d/%d %s 重新登录完成user-token已更新", index, total, acc.remark)
sleep_request()
result.sign = to_sign(access_token, user_token, acc.ua)
else:
raise
logging.info("账号%d/%d %s 签到完成 | %s", index, total, acc.remark, result.sign)
sleep_request()
result.nick_name, result.amount = get_user_info(access_token, user_token, acc.ua)
update_account_cache(acc, cache, user_token=user_token, nick_name=result.nick_name, amount=result.amount)
logging.info("账号%d/%d %s 余额查询完成 | 昵称: %s | 余额: %.2f", index, total, acc.remark, result.nick_name or "", result.amount)
sleep_request()
if ENABLE_WITHDRAW:
if result.amount >= WITHDRAW_THRESHOLD:
result.withdraw = cash_apply(access_token, user_token, result.amount, acc.ua)
logging.info("账号%d/%d %s 提现完成 | %s", index, total, acc.remark, result.withdraw)
sleep_request()
else:
result.withdraw = "未达到提现阈值"
logging.info("账号%d/%d %s 提现跳过 | 余额 %.2f元 < %.2f", index, total, acc.remark, result.amount, WITHDRAW_THRESHOLD)
else:
result.withdraw = "未开启自动提现"
logging.info("账号%d/%d %s 提现跳过 | 未开启自动提现", index, total, acc.remark)
result.success = True
except Exception as e:
result.success = False
result.message = str(e).replace("TOKEN_INVALID: ", "")
if DEBUG:
logging.error("账号 %s 执行失败: %s", acc.remark, e)
traceback.print_exc()
if result.success:
logging.info(
"账号%d/%d %s 完成 | 登录: %s | 签到: %s | 昵称: %s | 余额: %.2f元 | 提现: %s",
index, total, acc.remark, result.login, result.sign, result.nick_name or "", result.amount, result.withdraw,
)
else:
logging.info("账号%d/%d %s 跳过/失败 | %s", index, total, acc.remark, result.message or "失败")
return result
def build_report(results: List[AccountResult]) -> str:
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
elapsed = time.time() - RUN_START_TS
total = len(results)
success = sum(1 for r in results if r.success)
sign_ok = sum(1 for r in results if r.success and ("成功" in r.sign or "" in r.sign))
withdraw_ok = sum(1 for r in results if r.success and "成功" in r.withdraw)
total_amount = sum(r.amount for r in results)
lines = [
"",
"==================================================",
"📊 绿蜜蜂执行汇总",
"==================================================",
f"⏱️ 执行时间: {now}",
f"👥 总账号数: {total}",
f"✅ 成功账号: {success}",
f"📝 签到成功/已签: {sign_ok}",
f"💸 自动提现: {'开启' if ENABLE_WITHDRAW else '关闭'}",
f"💸 提现成功: {withdraw_ok}",
f"💰 当前余额合计: {total_amount:.2f}",
f"🧭 UA数量: {len(UA_LIST)}",
f"📁 缓存目录: {CACHE_DIR}",
"",
"📋 账号详情:",
]
for i, r in enumerate(results, 1):
icon = "" if r.success else ""
lines.append(f"{icon} 账号{i}: {r.remark} | 昵称: {r.nick_name or ''} | 余额: {r.amount:.2f}")
lines.append(f" 🔐 登录: {r.login}")
lines.append(f" 📝 签到: {r.sign}")
lines.append(f" 💸 提现: {r.withdraw}")
if r.message:
lines.append(f" ⚠️ 错误: {r.message}")
lines.append(f"=== 执行结束 [{now}] 耗时 {elapsed:.2f} 秒 退出码 0 ===")
return "\n".join(lines)
def main() -> None:
setup_logging()
logging.info("【绿蜜蜂】开始执行任务")
if DEBUG:
logging.info("APPID=%s WX_CLOUD=%s", APPID, WX_CLOUD or "(未设置)")
results: List[AccountResult] = []
handled = 0
for acc, i, total in iter_selected_accounts():
handled += 1
results.append(process_account(acc, i, total))
if not SINGLE_TEST_WXID and i < total:
delay = random.randint(ACCOUNT_DELAY_MIN, ACCOUNT_DELAY_MAX)
if DEBUG:
logging.info("[调试] 账号间隔等待 %d", delay)
time.sleep(delay)
if handled == 0:
raise RuntimeError("没有可处理账号。请检查 wx_cloud、wx_token、LMF_TEST_WXID、LMF_EXCLUDE_WXIDS 配置")
print(build_report(results))
if __name__ == "__main__":
try:
main()
except Exception as e:
setup_logging()
logging.error("主流程错误: %s", e)
if DEBUG:
traceback.print_exc()
sys.exit(1)