Files
Yangmao_Script/Points_Based/D_SFSY_loader.py

1198 lines
48 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# cron: 11 9,18 * * *
# const $ = new Env("顺丰日常任务版");
"""
顺丰速运日常积分任务
Author: 爱学习的呆子
Version: 1.3.0
Date: 2026-04-04
"""
import hashlib
import json
import os
import random
import time
from datetime import datetime
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from urllib.parse import unquote, urlparse, parse_qs
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
# 禁用SSL警告
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
# ==================== 代理相关配置常量 ====================
PROXY_TIMEOUT = 15 # 代理超时时间(秒)
MAX_PROXY_RETRIES = 5 # 最大代理重试次数
REQUEST_RETRY_COUNT = 3 # 请求重试次数
# ==================== 并发配置常量 ====================
CONCURRENT_NUM = int(os.getenv('SFBF', '1')) # 并发数量默认为1串行最大20
if CONCURRENT_NUM > 20:
CONCURRENT_NUM = 20
print(f'⚠️ 并发数量超过最大值20已自动调整为20')
elif CONCURRENT_NUM < 1:
CONCURRENT_NUM = 1
print(f'⚠️ 并发数量小于1已自动调整为1串行模式')
# 全局线程锁
print_lock = Lock() # 用于保护打印输出
# ==================== 配置类 ====================
@dataclass
class Config:
"""全局配置"""
APP_NAME: str = "顺丰速运"
VERSION: str = "1.2.0"
ENV_NAME: str = "sfsyUrl"
PROXY_API_URL: str = os.getenv('SF_PROXY_API_URL', '')
# 代理相关配置常量
PROXY_TIMEOUT = 15 # 代理时间(秒)
MAX_PROXY_RETRIES = 5 # 最大代理重试次数
REQUEST_RETRY_COUNT = 3 # 请求重试次数
# API签名配置
TOKEN: str = 'wwesldfs29aniversaryvdld29'
SYS_CODE: str = 'MCS-MIMP-CORE'
# 任务跳过列表
SKIP_TASKS: List[str] = None
def __post_init__(self):
if self.SKIP_TASKS is None:
# 尝试直接提交所有任务,看看能否领取奖励
# 原本跳过的任务:'用行业模板寄件下单'、'去新增一个收件偏好'
self.SKIP_TASKS = ['用行业模板寄件下单','用积分兑任意礼品','参与积分活动','每月累计寄件','完成每月任务','去使用AI寄件']
# ==================== 日志系统 ====================
class Logger:
"""
日志管理器 - 实现图片中的日志风格
"""
# 日志图标
ICONS = {
'task_found': '🎯', # 发现任务
'task_skip': '⏭️', # 跳过任务
'task_complete': '', # 任务完成
'reward_get': '🎁', # 奖励领取
'info': '📝', # 普通信息
'success': '', # 成功
'error': '', # 错误
'warning': '⚠️', # 警告
'user': '👤', # 用户信息
'money': '💰', # 积分/金币
'gift': '🎁', # 礼物
'target': '🎯', # 目标
}
def __init__(self):
self.messages: List[str] = []
self.current_account_msg: List[str] = []
self.lock = Lock() # 每个Logger实例独立的锁
def _format_msg(self, icon: str, content: str) -> str:
"""格式化消息"""
return f"{icon} {content}"
def _safe_print(self, msg: str):
"""线程安全的打印"""
with print_lock:
print(msg)
def task_found(self, task_name: str, status: int = 2):
"""发现任务"""
msg = self._format_msg(self.ICONS['task_found'], f"发现任务: {task_name} (状态: {status})")
self._safe_print(msg)
with self.lock:
self.current_account_msg.append(msg)
self.messages.append(msg)
def task_skip(self, task_name: str):
"""跳过任务"""
msg = self._format_msg(self.ICONS['task_skip'], f"[{task_name}] 已跳过")
self._safe_print(msg)
with self.lock:
self.current_account_msg.append(msg)
self.messages.append(msg)
def task_complete(self, task_name: str):
"""任务完成"""
msg = self._format_msg(self.ICONS['task_complete'], f"[{task_name}] 提交成功")
self._safe_print(msg)
with self.lock:
self.current_account_msg.append(msg)
self.messages.append(msg)
def reward_get(self, task_name: str):
"""奖励领取成功"""
msg = self._format_msg(self.ICONS['reward_get'], f"[{task_name}] 奖励领取成功")
self._safe_print(msg)
with self.lock:
self.current_account_msg.append(msg)
self.messages.append(msg)
def info(self, content: str):
"""普通信息"""
msg = self._format_msg(self.ICONS['info'], content)
self._safe_print(msg)
with self.lock:
self.current_account_msg.append(msg)
self.messages.append(msg)
def success(self, content: str):
"""成功信息"""
msg = self._format_msg(self.ICONS['success'], content)
self._safe_print(msg)
with self.lock:
self.current_account_msg.append(msg)
self.messages.append(msg)
def error(self, content: str):
"""错误信息"""
msg = self._format_msg(self.ICONS['error'], content)
self._safe_print(msg)
with self.lock:
self.current_account_msg.append(msg)
self.messages.append(msg)
def warning(self, content: str):
"""警告信息"""
msg = self._format_msg(self.ICONS['warning'], content)
self._safe_print(msg)
with self.lock:
self.current_account_msg.append(msg)
self.messages.append(msg)
def user_info(self, account_index: int, mobile: str):
"""用户信息"""
msg = self._format_msg(self.ICONS['user'], f"账号{account_index}: 【{mobile}】登录成功")
self._safe_print(msg)
with self.lock:
self.current_account_msg.append(msg)
self.messages.append(msg)
def points_info(self, points: int, prefix: str = "当前积分"):
"""积分信息"""
msg = self._format_msg(self.ICONS['money'], f"{prefix}: 【{points}")
self._safe_print(msg)
with self.lock:
self.current_account_msg.append(msg)
self.messages.append(msg)
def reset_account_msg(self):
"""重置当前账号消息"""
self.current_account_msg = []
def get_all_messages(self) -> str:
"""获取所有消息"""
return '\n'.join(self.messages)
def get_account_messages(self) -> str:
"""获取当前账号消息"""
return '\n'.join(self.current_account_msg)
# ==================== 代理管理器 ====================
class ProxyManager:
"""代理管理器"""
def __init__(self, api_url: str):
self.api_url = api_url
self.logger = Logger()
def get_proxy(self) -> Optional[Dict[str, str]]:
"""获取代理
返回格式:{'http': 'http://ip:port', 'https': 'http://ip:port'}
"""
try:
if not self.api_url:
print('⚠️ 未配置代理API地址将不使用代理')
return None
response = requests.get(self.api_url, timeout=10)
if response.status_code == 200:
proxy_text = response.text.strip()
if ':' in proxy_text:
# 构建代理URL
if proxy_text.startswith('http://') or proxy_text.startswith('https://'):
proxy = proxy_text
else:
proxy = f'http://{proxy_text}'
# 隐藏认证信息用于显示(如果有的话)
display_proxy = proxy
if '@' in proxy:
# 格式: http://user:pass@ip:port
parts = proxy.split('@')
if len(parts) == 2:
display_proxy = f"http://***:***@{parts[1]}"
print(f"✅ 成功获取代理: {display_proxy}")
return {'http': proxy, 'https': proxy}
print(f'❌ 获取代理失败: {response.text}')
return None
except Exception as e:
print(f'❌ 获取代理异常: {str(e)}')
return None
# ==================== HTTP客户端 ====================
class SFHttpClient:
"""顺丰HTTP客户端"""
def __init__(self, config: Config, proxy_manager: ProxyManager):
self.config = config
self.proxy_manager = proxy_manager
self.session = requests.Session()
self.session.verify = False
# 设置代理
proxy = self.proxy_manager.get_proxy()
if proxy:
self.session.proxies = proxy
# 默认请求头
self.headers = {
'Host': 'mcs-mimp-web.sf-express.com',
'upgrade-insecure-requests': '1',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36 NetType/WIFI MicroMessenger/7.0.20.1781(0x6700143B) WindowsWechat(0x63090551) XWEB/6945 Flue',
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
'sec-fetch-site': 'none',
'sec-fetch-mode': 'navigate',
'sec-fetch-user': '?1',
'sec-fetch-dest': 'document',
'accept-language': 'zh-CN,zh',
'platform': 'MINI_PROGRAM',
}
def _generate_sign(self) -> Dict[str, str]:
"""生成API签名"""
timestamp = str(int(round(time.time() * 1000)))
data = f'token={self.config.TOKEN}&timestamp={timestamp}&sysCode={self.config.SYS_CODE}'
signature = hashlib.md5(data.encode()).hexdigest()
return {
'sysCode': self.config.SYS_CODE,
'timestamp': timestamp,
'signature': signature
}
def request(
self,
url: str,
method: str = 'POST',
data: Optional[Dict] = None,
max_retries: int = REQUEST_RETRY_COUNT
) -> Optional[Dict[str, Any]]:
"""发送HTTP请求带双层重试机制
Args:
url: 请求URL
method: 请求方法 GET/POST
data: 请求数据
max_retries: 最大重试次数
Returns:
响应JSON数据或None
"""
# 更新签名
sign_data = self._generate_sign()
self.headers.update(sign_data)
retry_count = 0
proxy_retry_count = 0
while proxy_retry_count < MAX_PROXY_RETRIES:
try:
# 如果请求重试次数达到2次尝试切换代理
if retry_count >= 2:
print('请求已失败2次尝试切换代理IP')
new_proxy = self.proxy_manager.get_proxy()
if new_proxy:
self.session.proxies = new_proxy
else:
print('⚠️ 切换代理失败,无可用代理')
retry_count = 0 # 重置请求重试计数
try:
if method.upper() == 'GET':
response = self.session.get(url, headers=self.headers, timeout=PROXY_TIMEOUT)
elif method.upper() == 'POST':
response = self.session.post(url, headers=self.headers, json=data or {}, timeout=PROXY_TIMEOUT)
else:
raise ValueError(f'不支持的请求方法: {method}')
# 检查响应状态码
response.raise_for_status()
try:
res = response.json()
if res is None:
print(f'响应内容为空,正在重试 ({retry_count + 1}/{max_retries})')
retry_count += 1
time.sleep(2)
continue
return res
except (json.JSONDecodeError, ValueError) as e:
print(f'JSON解析失败: {str(e)}, 响应内容: {response.text[:200]}')
retry_count += 1
if retry_count < max_retries:
print(f'正在进行第{retry_count + 1}次重试...')
time.sleep(2)
continue
return None
except requests.exceptions.RequestException as e:
retry_count += 1
print(f'请求失败,正在重试 ({retry_count}/{max_retries}): {str(e)}')
# 如果是代理错误或SSL错误增加代理重试计数
if 'ProxyError' in str(e) or 'SSLError' in str(e):
proxy_retry_count += 1
print(f'代理连接失败,尝试切换代理 ({proxy_retry_count}/{MAX_PROXY_RETRIES})')
if proxy_retry_count < MAX_PROXY_RETRIES:
new_proxy = self.proxy_manager.get_proxy()
if new_proxy:
self.session.proxies = new_proxy
time.sleep(2)
continue
except Exception as e:
print(f'请求发生异常: {str(e)}')
proxy_retry_count += 1
if proxy_retry_count < MAX_PROXY_RETRIES:
print(f'尝试切换代理 ({proxy_retry_count}/{MAX_PROXY_RETRIES})')
time.sleep(2)
continue
else:
print('达到最大代理重试次数返回None')
return None
print('请求最终失败返回None')
return None
def login(self, url: str, timeout: int = PROXY_TIMEOUT) -> tuple[bool, str, str]:
"""
登录兼容URL和CK格式
Args:
url: 登录URL 或 CK字符串(sessionId=xxx;_login_mobile_=xxx;_login_user_id_=xxx)
timeout: 超时时间(秒)
Returns:
tuple: (是否成功, user_id, 手机号)
"""
try:
decoded_input = unquote(url)
if decoded_input.startswith('sessionId=') or '_login_mobile_=' in decoded_input:
cookie_dict = {}
for item in decoded_input.split(';'):
item = item.strip()
if '=' in item:
k, v = item.split('=', 1)
cookie_dict[k] = v
for k, v in cookie_dict.items():
self.session.cookies.set(k, v, domain='mcs-mimp-web.sf-express.com')
user_id = cookie_dict.get('_login_user_id_', '')
phone = cookie_dict.get('_login_mobile_', '')
if phone:
return True, user_id, phone
else:
return False, '', ''
else:
decoded_url = unquote(url)
self.session.get(decoded_url, headers=self.headers, timeout=timeout)
cookies = self.session.cookies.get_dict()
user_id = cookies.get('_login_user_id_', '')
phone = cookies.get('_login_mobile_', '')
if phone:
return True, user_id, phone
else:
return False, '', ''
except Exception as e:
print(f'登录异常: {str(e)}')
return False, '', ''
# ==================== 任务执行器 ====================
class TaskExecutor:
"""任务执行器"""
def __init__(
self,
http_client: SFHttpClient,
logger: Logger,
config: Config,
user_id: str
):
self.http = http_client
self.logger = logger
self.config = config
self.user_id = user_id
self.total_points = 0
# 任务相关属性
self.taskId = ""
self.taskCode = ""
self.strategyId = ""
self.title = ""
@staticmethod
def generate_device_id(characters: str = 'abcdef0123456789') -> str:
"""生成设备ID"""
result = ''
for char in 'xxxxxxxx-xxxx-xxxx':
if char == 'x':
result += random.choice(characters)
else:
result += char
return result
def _extract_task_id_from_url(self, url: str) -> str:
"""从URL中提取taskId"""
try:
from urllib.parse import parse_qs, urlparse, unquote
import json
# 处理_ug_view_param参数
parsed = urlparse(url)
params = parse_qs(parsed.query)
if '_ug_view_param' in params:
ug_params = json.loads(unquote(params['_ug_view_param'][0]))
if 'taskId' in ug_params:
return str(ug_params['taskId']) # 确保返回字符串类型
# 如果URL是JSON格式的尝试解析
if url.startswith('com.sf-express://'):
json_str = url.split('_ug_view_param=')[1]
ug_params = json.loads(unquote(json_str))
if 'taskId' in ug_params:
return str(ug_params['taskId']) # 确保返回字符串类型
except Exception as e:
self.logger.warning(f'从URL提取taskId失败: {e}')
return ''
def _set_task_attrs(self, task: Dict) -> None:
"""设置任务属性"""
self.taskId = str(task.get('taskId', '')) # 确保是字符串类型
self.taskCode = str(task.get('taskCode', '')) # 确保是字符串类型
self.strategyId = int(task.get('strategyId', 0)) # 确保是整数类型
self.title = str(task.get('title', '未知任务'))
self.point = int(task.get('point', 0)) # 确保是整数类型
# 如果taskCode为空尝试从buttonRedirect中提取
if not self.taskCode and 'buttonRedirect' in task:
extracted_task_id = self._extract_task_id_from_url(task['buttonRedirect'])
if extracted_task_id:
self.taskCode = extracted_task_id
self.logger.info(f'从buttonRedirect中提取到taskId: {self.taskCode}')
def app_sign_in(self) -> tuple[bool, str]:
"""APP每日签到使用getUnFetchPointAndDiscount接口触发签到+领取)
Returns:
tuple[bool, str]: (是否成功, 错误信息)
"""
url = 'https://mcs-mimp-web.sf-express.com/mcs-mimp/commonPost/~memberNonactivity~integralTaskSignPlusService~getUnFetchPointAndDiscount'
data = {}
# 保存原有的platform头
original_platform = self.http.headers.get('platform', 'MINI_PROGRAM')
# 临时切换为APP平台
self.http.headers['platform'] = 'SFAPP'
try:
response = self.http.request(url, data=data)
if response and response.get('success'):
obj = response.get('obj', [])
# 响应是一个数组,包含待领取的奖励
if obj and isinstance(obj, list) and len(obj) > 0:
total_points = 0
reward_names = []
for item in obj:
packet_name = item.get('packetName', '未知奖励')
detail_value = item.get('detailValue', '0')
reward_names.append(packet_name)
try:
total_points += int(detail_value)
except:
pass
self.logger.success(f'[APP签到] 签到成功,获得【{", ".join(reward_names)}')
else:
self.logger.info(f'[APP签到] 今日已签到或无可领取奖励')
return True, ''
else:
error_msg = response.get('errorMessage', '未知错误') if response else '请求失败'
# 如果返回"没有待领取礼包"等待1秒后再次调用接口
if '没有待领取礼包' in error_msg:
self.logger.info(f'[APP签到] 检测到需要二次领取等待1秒后重试...')
time.sleep(1)
# 再次调用getUnFetchPointAndDiscount接口
response2 = self.http.request(url, data=data)
if response2 and response2.get('success'):
obj2 = response2.get('obj', [])
if obj2 and isinstance(obj2, list) and len(obj2) > 0:
total_points = 0
reward_names = []
for item in obj2:
packet_name = item.get('packetName', '未知奖励')
detail_value = item.get('detailValue', '0')
reward_names.append(packet_name)
try:
total_points += int(detail_value)
except:
pass
self.logger.success(f'[APP签到] 二次领取成功,获得【{", ".join(reward_names)}')
else:
self.logger.info(f'[APP签到] 二次领取完成,但无可领取奖励')
return True, ''
else:
error_msg2 = response2.get('errorMessage', '未知错误') if response2 else '请求失败'
self.logger.error(f'[APP签到] 二次领取失败: {error_msg2}')
return False, error_msg2
else:
self.logger.error(f'[APP签到] 失败: {error_msg}')
return False, error_msg
finally:
# 恢复原有的platform头
self.http.headers['platform'] = original_platform
def sign_in(self) -> tuple[bool, str]:
"""小程序每日签到
Returns:
tuple[bool, str]: (是否成功, 错误信息)
"""
url = 'https://mcs-mimp-web.sf-express.com/mcs-mimp/commonPost/~memberNonactivity~integralTaskSignPlusService~automaticSignFetchPackage'
data = {"comeFrom": "vioin", "channelFrom": "WEIXIN"}
response = self.http.request(url, data=data)
if response and response.get('success'):
count_day = response.get('obj', {}).get('countDay', 0)
packet_list = response.get('obj', {}).get('integralTaskSignPackageVOList', [])
if packet_list:
packet_name = packet_list[0].get('packetName', '未知奖励')
self.logger.success(f'签到成功,获得【{packet_name}】,本周累计签到【{count_day + 1}】天')
else:
self.logger.info(f'今日已签到,本周累计签到【{count_day + 1}】天')
return True, ''
else:
error_msg = response.get('errorMessage', '未知错误') if response else '请求失败'
self.logger.error(f'签到失败: {error_msg}')
return False, error_msg
def new_sign_in(self) -> tuple[bool, str]:
"""新签到integralSignV2Service
Returns:
tuple[bool, str]: (是否成功, 错误信息)
"""
url = 'https://mcs-mimp-web.sf-express.com/mcs-mimp/commonPost/~memberNonactivity~integralSignV2Service~sign'
data = {}
original_platform = self.http.headers.get('platform', 'MINI_PROGRAM')
self.http.headers['platform'] = 'SFAPP'
try:
response = self.http.request(url, data=data)
if response and response.get('success'):
obj = response.get('obj', {})
signed = obj.get('signed', False)
day_count = obj.get('dayCount', 0)
total_count = obj.get('totalCount', 0)
award = obj.get('award', {})
award_type = obj.get('awardType', '')
award_num = obj.get('awardNum', 0)
if signed and award:
gift_bag_name = award.get('giftBagName', '未知奖励')
self.logger.success(f'[新签到] 签到成功,连续第{day_count}天,获得【{gift_bag_name}')
elif signed:
self.logger.info(f'[新签到] 今日已签到,连续第{day_count}')
else:
self.logger.info(f'[新签到] 签到完成')
return True, ''
else:
error_msg = response.get('errorMessage', '未知错误') if response else '请求失败'
self.logger.error(f'[新签到] 失败: {error_msg}')
return False, error_msg
finally:
self.http.headers['platform'] = original_platform
def get_task_list(self) -> List[Dict]:
"""获取任务列表"""
url = 'https://mcs-mimp-web.sf-express.com/mcs-mimp/commonPost/~memberNonactivity~integralTaskStrategyService~queryPointTaskAndSignFromES'
all_tasks = []
task_codes_seen = set()
for channel_type in ['1', '2', '3', '4','01','02','03','04']:
data = {
'channelType': channel_type,
'deviceId': self.generate_device_id(),
}
response = self.http.request(url, data=data)
if response and response.get('success') and response.get('obj'):
# 只在第一次请求时获取总积分
if channel_type == '1':
self.total_points = response['obj'].get('totalPoint', 0)
tasks = response['obj'].get('taskTitleLevels', [])
# 去重添加任务
for task in tasks:
task_code = task.get('taskCode')
task_title = task.get('title', '未知任务')
# 尝试提取taskId
if 'buttonRedirect' in task:
extracted_id = self._extract_task_id_from_url(task['buttonRedirect'])
if extracted_id and not task_code:
task_code = extracted_id
task['taskCode'] = extracted_id
# 如果taskCode为空但能从buttonRedirect中提取到taskId则使用提取的taskId
if not task_code and 'buttonRedirect' in task:
extracted_id = self._extract_task_id_from_url(task['buttonRedirect'])
if extracted_id:
task['taskCode'] = extracted_id
task_code = extracted_id
# 如果taskCode仍然为空则跳过
if not task_code:
continue
# 检查是否已存在相同taskCode的任务
if task_code not in task_codes_seen:
task_codes_seen.add(task_code)
all_tasks.append(task)
else:
error_msg = response.get('errorMessage', '未知错误') if response else '请求失败'
self.logger.warning(f'获取 channelType={channel_type} 的任务失败: {error_msg}')
return all_tasks
def execute_task(self) -> bool:
"""执行单个任务"""
url = 'https://mcs-mimp-web.sf-express.com/mcs-mimp/commonRoutePost/memberEs/taskRecord/finishTask'
data = {'taskCode': self.taskCode}
response = self.http.request(url, data=data)
if response and response.get('success'):
return True
return False
def _update_points(self):
"""更新积分显示"""
tasks = self.get_task_list()
if tasks:
self.logger.points_info(self.total_points, "当前积分")
def receive_task_reward(self) -> bool:
"""领取任务奖励"""
url = 'https://mcs-mimp-web.sf-express.com/mcs-mimp/commonPost/~memberNonactivity~integralTaskStrategyService~fetchIntegral'
data = {
"strategyId": self.strategyId,
"taskId": self.taskId,
"taskCode": self.taskCode,
"deviceId": self.generate_device_id()
}
response = self.http.request(url, data=data)
if response:
if response.get('success'):
self.logger.success(f'成功领取任务奖励: {self.title}')
return True
return False
def get_welfare_list(self) -> List[Dict]:
"""获取生活特权列表"""
url = 'https://mcs-mimp-web.sf-express.com/mcs-mimp/commonPost/~memberGoods~mallGoodsLifeService~list'
data = {
"memGrade": 3,
"categoryCode": "SHTQ",
"showCode": "SHTQWNTJ"
}
response = self.http.request(url, data=data)
if response and response.get('success'):
obj_list = response.get('obj', [])
# 收集所有可领取的特权
welfare_list = []
for module in obj_list:
goods_list = module.get('goodsList', [])
for goods in goods_list:
# exchangeStatus=1 表示可以领取
if goods.get('exchangeStatus') == 1:
welfare_list.append({
'goodsId': goods.get('goodsId'),
'goodsNo': goods.get('goodsNo'),
'goodsName': goods.get('goodsName'),
'showName': goods.get('showName', ''),
'id': goods.get('id')
})
return welfare_list
return []
def receive_welfare(self, goods_no: str, goods_name: str, task_code: str) -> bool:
"""领取生活特权"""
url = 'https://mcs-mimp-web.sf-express.com/mcs-mimp/commonPost/~memberGoods~pointMallService~createOrder'
data = {
"from": "Point_Mall",
"orderSource": "POINT_MALL_EXCHANGE",
"goodsNo": goods_no,
"quantity": 1,
"taskCode": task_code
}
response = self.http.request(url, data=data)
if response and response.get('success'):
order_no = response.get('obj', {}).get('orderNo', '')
self.logger.success(f'成功领取生活特权: {goods_name} (订单号: {order_no})')
return True
else:
error_msg = response.get('errorMessage', '未知错误') if response else '请求失败'
self.logger.error(f'领取生活特权失败: {goods_name} - {error_msg}')
return False
def handle_welfare_task(self, task_title: str) -> bool:
"""处理领取生活特权任务"""
self.logger.info('正在获取生活特权列表...')
welfare_list = self.get_welfare_list()
if not welfare_list:
self.logger.warning('没有可领取的生活特权')
return False
self.logger.info(f'找到 {len(welfare_list)} 个可领取的生活特权')
# 尝试领取第一个可用的特权
for welfare in welfare_list:
goods_no = welfare.get('goodsNo')
goods_name = welfare.get('goodsName')
show_name = welfare.get('showName')
if not goods_no:
continue
display_name = f"{show_name} - {goods_name}" if show_name else goods_name
# 使用任务的 taskCode
if self.receive_welfare(goods_no, display_name, self.taskCode):
return True
# 如果领取失败,等待一下再尝试下一个
time.sleep(1)
return False
def run_all_tasks(self) -> tuple[int, int]:
"""执行所有任务
Returns:
tuple: (执行前积分, 执行后积分)
"""
print('-'*50)
# 只在这里显示一次任务列表更新信息
self.logger.info('正在获取任务列表...')
tasks = self.get_task_list()
if not tasks:
self.logger.error('获取任务列表失败')
return (0, 0)
points_before = self.total_points
self.logger.points_info(points_before, "执行前积分")
for task in tasks:
task_title = task.get('title', '未知任务')
task_status = task.get('status')
# 状态3表示已完成
if task_status == 3:
self.logger.success(f'{task_title} - 已完成')
continue
# 跳过特定任务
if task_title in self.config.SKIP_TASKS:
self.logger.task_skip(task_title)
continue
# 提取任务属性
self._set_task_attrs(task)
# 检查是否成功提取 taskCode
if not self.taskCode:
# 如果taskCode为空尝试从buttonRedirect中提取
if 'buttonRedirect' in task:
self.logger.info(f'尝试从buttonRedirect中提取taskCode: {task_title}')
extracted_task_id = self._extract_task_id_from_url(task['buttonRedirect'])
if extracted_task_id:
self.taskCode = extracted_task_id
self.logger.info(f'成功从buttonRedirect中提取到taskCode: {self.taskCode}')
else:
self.logger.warning(f'{task_title} - 无法从buttonRedirect提取taskCode跳过')
continue
else:
self.logger.warning(f'{task_title} - 无法提取taskCode跳过')
continue
# 发现任务
self.logger.task_found(task_title, task_status)
# 特殊任务处理 - 需要在状态判断之前处理
if '领任意生活特权福利' in task_title:
# 先处理生活特权领取
if self.handle_welfare_task(task_title):
time.sleep(2)
# 然后执行任务提交
if self.execute_task():
self.logger.task_complete(task_title)
time.sleep(2)
# 领取奖励
if self.receive_task_reward():
self.logger.reward_get(task_title)
self._update_points()
else:
self.logger.warning(f'任务执行失败: {task_title}')
else:
self.logger.warning(f'{task_title} - 无法完成,跳过')
time.sleep(3)
continue
# 状态1表示需要先执行任务
if task_status == 1:
# 特殊处理连签7天任务
if '连签7天' in task_title and 'process' in task:
current, total = map(int, task['process'].split('/'))
if current < total:
self.logger.info(f'{task_title}】进度: {task["process"]},还需{total - current}')
continue
if self.execute_task():
self.logger.task_complete(task_title)
time.sleep(2)
# 执行成功后将状态更新为2可领取奖励
task_status = 2
else:
self.logger.warning(f'任务执行失败: {task_title}')
continue
# 状态2表示可领取奖励
if task_status == 2:
# 先尝试直接领取奖励
if self.receive_task_reward():
self.logger.reward_get(task_title)
# 更新积分
self._update_points()
continue
# 如果直接领取失败,尝试先执行任务再领取
if self.execute_task():
self.logger.task_complete(task_title)
time.sleep(2)
# 再次尝试领取奖励
if self.receive_task_reward():
self.logger.reward_get(task_title)
self._update_points()
else:
self.logger.warning(f'任务执行失败: {task_title}')
continue
time.sleep(3)
# 获取最新积分
tasks = self.get_task_list()
points_after = self.total_points if tasks else points_before
if tasks:
self.logger.points_info(points_after, "执行后积分")
return (points_before, points_after)
# ==================== 账号管理器 ====================
class AccountManager:
"""账号管理器"""
def __init__(self, account_url: str, account_index: int, config: Config):
self.account_url = account_url
self.account_index = account_index + 1
self.config = config
self.logger = Logger()
self.proxy_manager = ProxyManager(config.PROXY_API_URL)
# 登录重试机制(参考顺丰代理.py的实现
self.login_success = False
self.user_id = None
self.phone = None
self.http_client = None
retry_count = 0
while retry_count < MAX_PROXY_RETRIES and not self.login_success:
try:
# 每次重试都重新获取代理和创建HTTP客户端
self.http_client = SFHttpClient(config, self.proxy_manager)
# 尝试登录(带超时)
success, self.user_id, self.phone = self.http_client.login(account_url)
if success:
masked_phone = self.phone[:3] + "*" * 4 + self.phone[7:]
self.logger.user_info(self.account_index, masked_phone)
self.login_success = True
break
else:
if retry_count < MAX_PROXY_RETRIES - 1:
print(f'账号{self.account_index} 登录失败,尝试重新获取代理 ({retry_count + 1}/{MAX_PROXY_RETRIES})')
time.sleep(2)
except Exception as e:
print(f'账号{self.account_index} 登录异常: {str(e)[:100]}')
retry_count += 1
# 如果所有代理重试都失败,记录错误
if not self.login_success:
self.logger.error(f'账号{self.account_index} 登录失败,已重试{MAX_PROXY_RETRIES}次,所有代理均不可用')
def run(self) -> Dict[str, Any]:
"""运行账号任务
Returns:
Dict: 包含账号统计信息的字典
"""
if not self.login_success:
return {
'success': False,
'phone': '',
'points_before': 0,
'points_after': 0,
'points_earned': 0
}
# 随机延迟
wait_time = random.randint(1000, 3000) / 1000.0
time.sleep(wait_time)
# 初始化任务执行器
executor = TaskExecutor(self.http_client, self.logger, self.config, self.user_id)
# 先执行APP签到
app_sign_success, app_error_msg = executor.app_sign_in()
time.sleep(1)
# 执行新签到
new_sign_success, new_sign_error = executor.new_sign_in()
time.sleep(1)
# 再执行小程序签到
sign_success, error_msg = executor.sign_in()
# 如果签到失败且错误信息包含“活动太火爆”,尝试重新登录
if not sign_success and '活动太火爆' in error_msg:
max_retries = 3
for retry in range(max_retries):
self.logger.warning(f'签到失败代理IP问题{2}秒后重新获取代理并重试(第{retry + 1}次)...')
time.sleep(2)
try:
# 重新创建HTTP客户端会自动获取新代理
self.http_client = SFHttpClient(self.config, self.proxy_manager)
# 重新登录
success, self.user_id, self.phone = self.http_client.login(self.account_url)
if success:
# 更新执行器的HTTP客户端
executor.http = self.http_client
executor.user_id = self.user_id
# 重试签到
sign_success, error_msg = executor.sign_in()
if sign_success:
self.logger.success('重新登录后签到成功')
break
elif '活动太火爆' not in error_msg:
# 如果不是代理问题,则不再重试
break
else:
if retry == max_retries - 1:
self.logger.error(f'重新登录失败,已重试{max_retries}')
except Exception as e:
if retry == max_retries - 1:
self.logger.error(f'重新登录异常: {str(e)[:100]},已重试{max_retries}')
# 执行其他任务
points_before, points_after = executor.run_all_tasks()
points_earned = points_after - points_before
# 返回统计信息
return {
'success': True,
'phone': self.phone,
'points_before': points_before,
'points_after': points_after,
'points_earned': points_earned
}
# ==================== 单账号执行函数 ====================
def run_single_account(account_info: str, index: int, config: Config) -> Dict[str, Any]:
"""
执行单个账号的任务(线程安全)
Args:
account_info: 账号信息
index: 账号索引
config: 配置对象
Returns:
Dict: 包含账号统计信息的字典
"""
try:
with print_lock:
print(f"🚀 开始执行账号{index + 1}")
account = AccountManager(account_info, index, config)
result = account.run()
if result['success']:
with print_lock:
print(f"✅ 账号{index + 1}执行完成")
else:
with print_lock:
print(f"❌ 账号{index + 1}执行失败")
result['index'] = index
return result
except Exception as e:
error_msg = f"账号{index + 1}执行异常: {str(e)}"
with print_lock:
print(f"{error_msg}")
return {
'index': index,
'success': False,
'phone': '',
'points_before': 0,
'points_after': 0,
'points_earned': 0,
'error': error_msg
}
# ==================== 主程序 ====================
def main():
"""主函数"""
config = Config()
env_value = os.getenv(config.ENV_NAME)
if not env_value:
print(f"❌ 未找到环境变量 {config.ENV_NAME},请检查配置")
return
account_urls = [url.strip() for url in env_value.split('&') if url.strip()]
if not account_urls:
print(f"❌ 环境变量 {config.ENV_NAME} 为空或格式错误")
return
# 随机打乱账号顺序
random.shuffle(account_urls)
print(f"🔀 已随机打乱账号执行顺序")
print("=" * 50)
print(f"🎉 {config.APP_NAME} v{config.VERSION}")
print(f"👨‍💻 作者: 爱学习的呆子")
print(f"📱 共获取到 {len(account_urls)} 个账号")
print(f"⚙️ 并发数量: {CONCURRENT_NUM}")
print(f"⏰ 执行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 50)
# 收集所有账号的统计信息
all_results = []
if CONCURRENT_NUM <= 1:
# 串行执行模式
print("🔄 使用串行模式执行...")
for index, account_url in enumerate(account_urls):
account = AccountManager(account_url, index, config)
result = account.run()
result['index'] = index
all_results.append(result)
if index < len(account_urls) - 1:
print("=" * 50)
print(f"⏳ 等待 2 秒后执行下一个账号...")
time.sleep(2)
else:
# 并发执行模式
print(f"🔄 使用并发模式执行,并发数: {CONCURRENT_NUM}")
# 使用线程池执行
with ThreadPoolExecutor(max_workers=CONCURRENT_NUM) as executor:
# 提交所有任务
future_to_index = {
executor.submit(run_single_account, account_url, index, config): index
for index, account_url in enumerate(account_urls)
}
# 等待任务完成
for future in as_completed(future_to_index):
result = future.result()
all_results.append(result)
# 按索引排序结果
all_results.sort(key=lambda x: x['index'])
# 统计成功和失败数量
success_count = sum(1 for r in all_results if r['success'])
fail_count = len(all_results) - success_count
total_earned = sum(r['points_earned'] for r in all_results if r['success'])
# 显示汇总统计表格
print(f"\n" + "=" * 80)
print(f"📊 积分统计汇总")
print("=" * 80)
print(f"{'序号':<6} {'手机号':<15} {'今日获得积分':<15} {'总积分':<15} {'状态':<10}")
print("-" * 80)
for result in all_results:
index = result['index'] + 1
phone = result['phone'][:3] + "****" + result['phone'][7:] if result['phone'] else "未登录"
earned = result['points_earned']
total = result['points_after']
status = "✅成功" if result['success'] else "❌失败"
print(f"{index:<6} {phone:<15} {earned:<15} {total:<15} {status:<10}")
print("-" * 80)
print(f"{'汇总':<6} {'账号总数: ' + str(len(all_results)):<15} {'今日总获得: ' + str(total_earned):<15} {'':<15} {'成功: ' + str(success_count):<10}")
print("=" * 80)
print("\n🎊 所有账号任务执行完成!")
if __name__ == '__main__':
main()