# cron: 2 7 * * * # new Env("旧衣回收_铛铛一下") import hashlib import random import time import requests import os import logging import traceback import base64 import json from Crypto.Cipher import AES from Crypto.Random import get_random_bytes from datetime import datetime # ====================== 养鸡场配置 ====================== WX_TOKEN = os.getenv('wx_token') # 养鸡场 Authorization 令牌 wx_cloud = os.getenv('wx_cloud', 'http://192.168.31.203:666') # 养鸡场服务地址 # ======================================================== remove_wxids1 = ["wxid_11111111111111"] # 需要剔除的多个wxid DEFAULT_WITHDRAW_BALANCE = 0.3 # 默认超过该金额进行提现,需大于等于0.3 MULTI_ACCOUNT_SPLIT = ["\n", "@"] # 分隔符列表 MULTI_ACCOUNT_PROXY = False # 是否使用多账号代理,默认不使用,True则使用多账号代理 # 微信小程序ID(根据实际业务调整) WX_APPID = "wxe378d2d7636c180e" class WXBizDataCryptUtil: """ 微信小程序加解密工具 """ def __init__(self, sessionKey): self.sessionKey = sessionKey def encrypt(self, data, iv=None): """ data: dict或str,若为dict自动转为json字符串 iv: base64字符串,若为None自动生成 返回: (加密数据base64, iv base64) """ if isinstance(data, dict): data = json.dumps(data, separators=(',', ':')) if iv is None: iv_bytes = get_random_bytes(16) iv = base64.b64encode(iv_bytes).decode('utf-8') else: iv_bytes = base64.b64decode(iv) sessionKey = base64.b64decode(self.sessionKey) cipher = AES.new(sessionKey, AES.MODE_CBC, iv_bytes) padded = self._pad(data.encode('utf-8')) encrypted = cipher.encrypt(padded) encrypted_b64 = base64.b64encode(encrypted).decode('utf-8') return encrypted_b64, iv def decrypt(self, encryptedData, iv): """ encryptedData: base64字符串 iv: base64字符串 返回: dict或str """ sessionKey = base64.b64decode(self.sessionKey) encryptedData = base64.b64decode(encryptedData) iv = base64.b64decode(iv) cipher = AES.new(sessionKey, AES.MODE_CBC, iv) decrypted = self._unpad(cipher.decrypt(encryptedData)) try: return json.loads(decrypted) except Exception: return decrypted.decode('utf-8') def _pad(self, s): pad_len = 16 - len(s) % 16 return s + bytes([pad_len] * pad_len) def _unpad(self, s): return s[:-s[-1]] class AutoTask: def __init__(self, site_name): """ 初始化自动任务类 :param site_name: 站点名称,用于日志显示 """ self.site_name = site_name self.proxy_url = os.getenv("PROXY_API_URL") # 代理api,返回一条txt文本,内容为代理ip:端口 self.wx_appid = WX_APPID # 微信小程序id self.host = "vues.dd1x.cn" self.user_agent = "Mozilla/5.0 (Linux; Android 12; M2012K11AC Build/SKQ1.220303.001; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/134.0.6998.136 Mobile Safari/537.36 XWEB/1340129 MMWEBSDK/20240301 MMWEBID/9871 MicroMessenger/8.0.48.2580(0x28003036) WeChat/arm64 Weixin NetType/WIFI Language/zh_CN ABI/arm64 MiniProgramEnv/android" self.setup_logging() def setup_logging(self): """ 配置日志系统 """ logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s\t- %(message)s', datefmt='%Y-%m-%d %H:%M:%S', handlers=[ logging.StreamHandler() ] ) def get_proxy(self): """ 获取代理 :return: 代理 """ if not self.proxy_url: logging.warning("[获取代理]没有找到环境变量PROXY_API_URL,不使用代理") return None url = self.proxy_url response = requests.get(url) proxy = response.text.strip() # 去除首尾空格和换行 logging.info(f"[获取代理]: {proxy}") return proxy def check_proxy(self, proxy, session): """ 检查代理 :param proxy: 代理 :param session: session :return: 是否可用 """ try: url = f"http://{self.host}/api/v2/get_sign_list" session.headers["Token"] = "" response = session.get(url, timeout=5) if response.status_code == 200: logging.info(f"[检查代理]: {proxy} 应该可用") return True else: logging.info(f"[检查代理]: {response.text}") return False except Exception as e: logging.error(f"[检查代理]代理 {proxy} 不可用: {str(e)}") return False def get_wechat_account_list(self): """ 从养鸡场接口获取账号列表 :return: 账号列表/False """ try: if not WX_TOKEN: logging.error("[获取账号列表]未设置环境变量 wx_token") return False url = f"{wx_cloud}/prod-api/wechat/wechat/list?pageNum=1&pageSize=1000" headers = { 'Authorization': WX_TOKEN, 'Content-Type': 'application/json', } response = requests.get(url, headers=headers, timeout=10) response.raise_for_status() res_data = response.json() if res_data and res_data.get('code') == 200 and isinstance(res_data.get('rows'), list): accounts = res_data['rows'] logging.info(f"[获取账号列表]成功获取到 {len(accounts)} 个账号") return accounts else: logging.error(f"[获取账号列表]接口返回异常: {json.dumps(res_data)}") return False except Exception as e: logging.error(f"[获取账号列表]发生错误: {str(e)}\n{traceback.format_exc()}") return False def get_wx_code_yjc(self, wxid): """ 使用养鸡场接口获取微信小程序Code :param wxid: 微信ID :return: code/False """ try: if not WX_TOKEN: logging.error("[获取Code]未设置环境变量 wx_token") return False url = f"{wx_cloud}/prod-api/wechat/api/getMiniProgramCode" headers = { 'Authorization': WX_TOKEN, 'Content-Type': 'application/json' } payload = { "wxid": wxid, "appid": self.wx_appid } response = requests.post(url, json=payload, headers=headers, timeout=10) response.raise_for_status() res_data = response.json() if res_data and res_data.get('code') == 200 and res_data.get('data', {}).get('code'): code = res_data['data']['code'] logging.info(f"[获取Code]成功获取 wxid:{wxid} 的code") return code else: logging.error(f"[获取Code]接口返回异常: {json.dumps(res_data)}") return False except Exception as e: logging.error(f"[获取Code]wxid:{wxid} 发生错误: {str(e)}\n{traceback.format_exc()}") return False def wxlogin(self, session, code): """ 登录 :param session: session :param code: 微信code :return: 登录结果 """ try: url = f"https://{self.host}/wechat/login" params = { "code": code, "channelId": 154 } response = session.get(url, params=params, timeout=10) response.raise_for_status() response_json = response.json() if response_json.get('code') == 0: tel = response_json['data'].get('tel', '') # 号码中间4位*号代替 if tel: tel = tel[:3] + "****" + tel[-4:] logging.info(f"[登录]成功: 当前账号 {tel}") token = response_json['data']['token'] session.headers["Token"] = token return True else: logging.error(f"[登录]发生错误: {response_json.get('msg', '未知错误')}") return False except requests.RequestException as e: logging.error(f"[登录]发生网络错误: {str(e)}\n{traceback.format_exc()}") return False except Exception as e: logging.error(f"[登录]发生错误: {str(e)}\n{traceback.format_exc()}") return False def sign_in(self, session): """ 签到 :param session: session :return: 签到结果 """ try: url = f"https://{self.host}/api/v2/sign_join" response = session.get(url, timeout=10) response.raise_for_status() response_json = response.json() if response_json.get('code') == 0: logging.info(f"[签到]: 成功") return True else: logging.warning(f"[签到]: {response_json.get('msg', '未知错误')}") return False except Exception as e: logging.error(f"[签到]发生错误: {str(e)}\n{traceback.format_exc()}") return False def add_lottery_count(self, session): """ 增加抽奖次数 :param session: session :return: 增加抽奖次数结果 """ try: url = f"https://{self.host}/front/activity/add_lottery_count" response = session.get(url, timeout=10) response.raise_for_status() response_json = response.json() if response_json.get('code') == 0: logging.info(f"[增加抽奖次数]: 成功") return True elif "达到上限" in response_json.get('msg', ''): logging.warning(f"[增加抽奖次数]: {response_json.get('msg')}") return False else: logging.error(f"[增加抽奖次数]发生错误: {response_json.get('msg', '未知错误')}") return False except Exception as e: logging.error(f"[增加抽奖次数]发生错误: {str(e)}\n{traceback.format_exc()}") return False def update_lottery_result(self, session): """ 更新抽奖结果(执行抽奖) :param session: session :return: 抽奖结果 """ try: url = f"https://{self.host}/front/activity/update_lottery_result" params = { "id": 3438615 } response = session.get(url, params=params, timeout=10) response.raise_for_status() response_json = response.json() if response_json.get('code') == 0: good_name = response_json['data'].get('goodName', '未知奖励') logging.info(f"[抽奖]: 获得{good_name}") return True else: logging.warning(f"[抽奖]: {response_json.get('msg', '未知错误')}") return False except Exception as e: logging.error(f"[抽奖]发生错误: {str(e)}\n{traceback.format_exc()}") return False def get_withdrawal_trade_list(self, session): """ 获取提现相关数据 :param session: session :return: (余额, 提现数据)/False """ try: url = f"https://{self.host}/api/h/get_withdrawal_trade_list" response = session.get(url, timeout=10) response.raise_for_status() response_json = response.json() if response_json.get('code') == 0 and isinstance(response_json.get('data'), list) and len(response_json['data']) > 0: balance = float(response_json['data'][0].get('money', 0)) logging.info(f"[余额]: {balance}元") return balance, response_json['data'] else: logging.error(f"[获取提现相关数据]发生错误: {response_json.get('msg', '未知错误')}") return False except Exception as e: logging.error(f"[获取提现相关数据]发生错误: {str(e)}\n{traceback.format_exc()}") return False def withdraw(self, session, balance, withdrawal_trade_list): """ 提现 :param session: session :param balance: 余额 :param withdrawal_trade_list: 提现相关数据 :return: 提现结果 """ try: url = f"https://{self.host}/api/h/withdrawal" payload = { "totalMoney": balance, "type": 1, "withdrawalDetailPojoList": withdrawal_trade_list } response = session.post(url, json=payload, timeout=10) response.raise_for_status() response_json = response.json() if response_json.get('code') == 0: logging.info(f"[提现]: {response_json.get('msg', '提现成功')}") return True else: logging.warning(f"[提现]: {response_json.get('msg', '提现失败')}") return False except Exception as e: logging.error(f"[提现]发生错误: {str(e)}\n{traceback.format_exc()}") return False def run(self): """ 运行任务 """ try: logging.info(f"【{self.site_name}】开始执行任务") # 从养鸡场获取账号列表 accounts = self.get_wechat_account_list() if not accounts: logging.error("【{self.site_name}】获取账号列表失败,任务终止") return # 过滤需要剔除的账号 combined_remove = set(remove_wxids1) filtered_accounts = [acc for acc in accounts if acc.get('wxId') not in combined_remove] logging.info(f"[账号过滤]剔除后剩余 {len(filtered_accounts)} 个账号") # 执行每个账号的任务 for index, account in enumerate(filtered_accounts, 1): wxid = account.get('wxId') wxname = account.get('wxName', '未知昵称') logging.info("") logging.info(f"------ 【账号{index}: {wxname} ({wxid})】开始执行任务 ------") # 初始化session和代理 if MULTI_ACCOUNT_PROXY: proxy = self.get_proxy() session = requests.Session() if proxy: session.proxies.update({ "http": f"http://{proxy}", "https": f"http://{proxy}" }) # 检查代理,不可用则不使用代理 if not self.check_proxy(proxy, session): session.proxies.clear() logging.warning("[代理]使用默认网络环境") else: session = requests.Session() session.headers["User-Agent"] = self.user_agent # 执行微信授权获取code code = self.get_wx_code_yjc(wxid) if code: # 登录 login_result = self.wxlogin(session, code) time.sleep(random.randint(1, 3)) if login_result: # 签到 self.sign_in(session) time.sleep(random.randint(1, 3)) # 抽奖循环 lottery_result = self.update_lottery_result(session) while lottery_result: time.sleep(random.randint(3, 5)) lottery_result = self.update_lottery_result(session) # 增加抽奖次数并继续抽奖 add_count_result = self.add_lottery_count(session) while add_count_result: self.update_lottery_result(session) time.sleep(random.randint(3, 5)) add_count_result = self.add_lottery_count(session) # 提现逻辑 withdraw_data = self.get_withdrawal_trade_list(session) if withdraw_data: balance, withdrawal_list = withdraw_data if balance >= DEFAULT_WITHDRAW_BALANCE: self.withdraw(session, balance, withdrawal_list) time.sleep(random.randint(1, 3)) else: logging.warning(f"[提现]: 余额{balance}元不足{DEFAULT_WITHDRAW_BALANCE}元,不进行提现") else: logging.warning("[提现]: 获取提现数据失败") else: logging.error(f"[授权]wxid:{wxid} 获取Code失败,跳过该账号") logging.info(f"------ 【账号{index}: {wxname} ({wxid})】执行任务完成 ------") # 账号间增加延迟,避免请求过快 if index < len(filtered_accounts): time.sleep(random.randint(2, 5)) except Exception as e: logging.error(f"【{self.site_name}】执行过程中发生错误: {str(e)}\n{traceback.format_exc()}") finally: logging.info(f"【{self.site_name}】所有账号任务执行完毕") if __name__ == "__main__": # 检查必要环境变量 if not WX_TOKEN: print("错误:请先设置环境变量 wx_token") exit(1) auto_task = AutoTask("铛铛一下") auto_task.run()