2744 lines
118 KiB
Python
2744 lines
118 KiB
Python
"""
|
||
闲鱼订单详情获取工具
|
||
基于Playwright实现订单详情页面访问和数据提取
|
||
"""
|
||
|
||
import asyncio
|
||
import time
|
||
import sys
|
||
import os
|
||
from typing import Optional, Dict, Any, Tuple, List
|
||
from playwright.async_api import async_playwright, Browser, BrowserContext, Page
|
||
from loguru import logger
|
||
import re
|
||
import json
|
||
from threading import Lock
|
||
from collections import defaultdict
|
||
from utils.time_utils import parse_local_datetime_text_to_db_utc
|
||
|
||
# 修复Docker环境中的asyncio事件循环策略问题
|
||
if sys.platform.startswith('linux') or os.getenv('DOCKER_ENV'):
|
||
try:
|
||
# 在Linux/Docker环境中设置事件循环策略
|
||
asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
|
||
except Exception as e:
|
||
logger.warning(f"设置事件循环策略失败: {e}")
|
||
|
||
# 确保在Docker环境中使用正确的事件循环
|
||
if os.getenv('DOCKER_ENV'):
|
||
try:
|
||
# 强制使用SelectorEventLoop(在Docker中更稳定)
|
||
if hasattr(asyncio, 'SelectorEventLoop'):
|
||
loop = asyncio.SelectorEventLoop()
|
||
asyncio.set_event_loop(loop)
|
||
except Exception as e:
|
||
logger.warning(f"设置SelectorEventLoop失败: {e}")
|
||
|
||
|
||
def _normalize_cached_amount(amount: Any) -> Optional[float]:
|
||
if amount in (None, ''):
|
||
return None
|
||
|
||
amount_clean = str(amount).replace('¥', '').replace('¥', '').replace('$', '').strip()
|
||
try:
|
||
return float(amount_clean)
|
||
except (ValueError, TypeError):
|
||
return None
|
||
|
||
|
||
def _is_coin_deduction_item_config(item_config: Dict[str, Any]) -> bool:
|
||
if not item_config:
|
||
return False
|
||
|
||
detail_text = str(item_config.get('item_detail') or '').strip()
|
||
return '闲鱼币抵扣' in detail_text
|
||
|
||
|
||
def _should_use_cached_order(existing_order: Dict[str, Any], item_config: Dict[str, Any] = None) -> bool:
|
||
if not existing_order:
|
||
return False
|
||
|
||
amount_value = _normalize_cached_amount(existing_order.get('amount'))
|
||
amount_valid = amount_value is not None and amount_value > 0
|
||
has_valid_spec = bool((existing_order.get('spec_name') or '').strip() and (existing_order.get('spec_value') or '').strip())
|
||
status_value = str(existing_order.get('order_status') or '').strip().lower()
|
||
status_valid = bool(status_value and status_value not in ('unknown', 'processing'))
|
||
|
||
if _is_coin_deduction_item_config(item_config):
|
||
configured_amount = _normalize_cached_amount(item_config.get('item_price'))
|
||
if configured_amount is not None and amount_value is not None and abs(amount_value - configured_amount) <= 0.0009:
|
||
return False
|
||
|
||
if item_config and item_config.get('is_multi_spec'):
|
||
return amount_valid and status_valid and has_valid_spec
|
||
|
||
return amount_valid and (status_valid or has_valid_spec)
|
||
|
||
|
||
class OrderDetailFetcher:
|
||
"""闲鱼订单详情获取器"""
|
||
|
||
# 类级别的锁字典,为每个order_id维护一个锁
|
||
_order_locks = defaultdict(lambda: asyncio.Lock())
|
||
|
||
def __init__(self, cookie_string: str = None, headless: bool = True, cookie_id_for_log: str = "unknown"):
|
||
self.browser: Optional[Browser] = None
|
||
self.context: Optional[BrowserContext] = None
|
||
self.page: Optional[Page] = None
|
||
self.headless = headless # 保存headless设置
|
||
self.cookie_id_for_log = cookie_id_for_log or "unknown"
|
||
self._last_order_status_source = 'unknown'
|
||
self._active_order_id = ''
|
||
self._captured_amount_candidates: List[Dict[str, Any]] = []
|
||
self._captured_sku_candidates: List[Dict[str, Any]] = []
|
||
self._pending_response_tasks = set()
|
||
self._response_handler = None
|
||
|
||
# 请求头配置
|
||
self.headers = {
|
||
"accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
|
||
"accept-language": "en,zh-CN;q=0.9,zh;q=0.8,ru;q=0.7",
|
||
"cache-control": "no-cache",
|
||
"pragma": "no-cache",
|
||
"priority": "u=0, i",
|
||
"sec-ch-ua": "\"Not)A;Brand\";v=\"8\", \"Chromium\";v=\"138\", \"Google Chrome\";v=\"138\"",
|
||
"sec-ch-ua-mobile": "?0",
|
||
"sec-ch-ua-platform": "\"Windows\"",
|
||
"sec-fetch-dest": "document",
|
||
"sec-fetch-mode": "navigate",
|
||
"sec-fetch-site": "same-origin",
|
||
"sec-fetch-user": "?1",
|
||
"upgrade-insecure-requests": "1"
|
||
}
|
||
|
||
# Cookie配置 - 支持动态传入
|
||
self.cookie = cookie_string
|
||
|
||
async def init_browser(self, headless: bool = None):
|
||
"""初始化浏览器"""
|
||
try:
|
||
# 如果没有传入headless参数,使用实例的设置
|
||
if headless is None:
|
||
headless = self.headless
|
||
|
||
logger.info(f"开始初始化浏览器,headless模式: {headless}")
|
||
|
||
playwright = await async_playwright().start()
|
||
|
||
# 启动浏览器(Docker环境优化)
|
||
browser_args = [
|
||
'--no-sandbox',
|
||
'--disable-setuid-sandbox',
|
||
'--disable-dev-shm-usage',
|
||
'--disable-accelerated-2d-canvas',
|
||
'--no-first-run',
|
||
'--no-zygote',
|
||
'--disable-gpu',
|
||
'--disable-background-timer-throttling',
|
||
'--disable-backgrounding-occluded-windows',
|
||
'--disable-renderer-backgrounding',
|
||
'--disable-features=TranslateUI',
|
||
'--disable-ipc-flooding-protection',
|
||
'--disable-extensions',
|
||
'--disable-default-apps',
|
||
'--disable-sync',
|
||
'--disable-translate',
|
||
'--hide-scrollbars',
|
||
'--mute-audio',
|
||
'--no-default-browser-check',
|
||
'--no-pings'
|
||
]
|
||
|
||
# 移除--single-process参数,使用多进程模式提高稳定性
|
||
# if os.getenv('DOCKER_ENV'):
|
||
# browser_args.append('--single-process') # 注释掉,避免崩溃
|
||
|
||
# 在Docker环境中添加额外参数
|
||
if os.getenv('DOCKER_ENV'):
|
||
browser_args.extend([
|
||
'--disable-background-networking',
|
||
'--disable-background-timer-throttling',
|
||
'--disable-client-side-phishing-detection',
|
||
'--disable-default-apps',
|
||
'--disable-hang-monitor',
|
||
'--disable-popup-blocking',
|
||
'--disable-prompt-on-repost',
|
||
'--disable-sync',
|
||
'--disable-web-resources',
|
||
'--metrics-recording-only',
|
||
'--no-first-run',
|
||
'--safebrowsing-disable-auto-update',
|
||
'--enable-automation',
|
||
'--password-store=basic',
|
||
'--use-mock-keychain',
|
||
# 添加内存优化和稳定性参数
|
||
'--memory-pressure-off',
|
||
'--max_old_space_size=512',
|
||
'--disable-ipc-flooding-protection',
|
||
'--disable-component-extensions-with-background-pages',
|
||
'--disable-features=TranslateUI,BlinkGenPropertyTrees',
|
||
'--disable-logging',
|
||
'--disable-permissions-api',
|
||
'--disable-notifications',
|
||
'--no-pings',
|
||
'--no-zygote'
|
||
])
|
||
|
||
logger.info(f"启动浏览器,参数: {browser_args}")
|
||
self.browser = await playwright.chromium.launch(
|
||
headless=headless,
|
||
args=browser_args
|
||
)
|
||
|
||
logger.info("浏览器启动成功,创建上下文...")
|
||
|
||
# 创建浏览器上下文
|
||
self.context = await self.browser.new_context(
|
||
viewport={'width': 1920, 'height': 1080},
|
||
user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36'
|
||
)
|
||
|
||
logger.info("浏览器上下文创建成功,设置HTTP头...")
|
||
|
||
# 设置额外的HTTP头
|
||
await self.context.set_extra_http_headers(self.headers)
|
||
|
||
logger.info("创建页面...")
|
||
|
||
# 创建页面
|
||
self.page = await self.context.new_page()
|
||
|
||
logger.info("页面创建成功,设置Cookie...")
|
||
|
||
# 设置Cookie
|
||
await self._set_cookies()
|
||
|
||
# 等待一段时间确保浏览器完全初始化
|
||
await asyncio.sleep(1)
|
||
|
||
logger.info("浏览器初始化成功")
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"浏览器初始化失败: {e}")
|
||
return False
|
||
|
||
async def _set_cookies(self):
|
||
"""设置Cookie"""
|
||
try:
|
||
# 解析Cookie字符串
|
||
cookies = []
|
||
for cookie_pair in self.cookie.split('; '):
|
||
if '=' in cookie_pair:
|
||
name, value = cookie_pair.split('=', 1)
|
||
cookies.append({
|
||
'name': name.strip(),
|
||
'value': value.strip(),
|
||
'domain': '.goofish.com',
|
||
'path': '/'
|
||
})
|
||
|
||
# 添加Cookie到上下文
|
||
await self.context.add_cookies(cookies)
|
||
logger.info(f"已设置 {len(cookies)} 个Cookie")
|
||
|
||
except Exception as e:
|
||
logger.error(f"设置Cookie失败: {e}")
|
||
|
||
async def fetch_order_detail(self, order_id: str, timeout: int = 30, force_refresh: bool = False) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
获取订单详情(带锁机制和数据库缓存)
|
||
|
||
Args:
|
||
order_id: 订单ID
|
||
timeout: 超时时间(秒)
|
||
force_refresh: 是否强制刷新(跳过缓存直接从闲鱼获取)
|
||
|
||
Returns:
|
||
包含订单详情的字典,失败时返回None
|
||
"""
|
||
# 获取该订单ID的锁
|
||
order_lock = self._order_locks[order_id]
|
||
|
||
async with order_lock:
|
||
logger.info(f"🔒 获取订单 {order_id} 的锁,开始处理...")
|
||
|
||
try:
|
||
# 如果不是强制刷新,先查询数据库缓存
|
||
if not force_refresh:
|
||
from db_manager import db_manager
|
||
existing_order = db_manager.get_order_by_id(order_id)
|
||
|
||
if existing_order:
|
||
amount = existing_order.get('amount', '')
|
||
item_config = None
|
||
if existing_order.get('item_id') and existing_order.get('cookie_id'):
|
||
item_config = db_manager.get_item_info(existing_order.get('cookie_id'), existing_order.get('item_id'))
|
||
|
||
if _should_use_cached_order(existing_order, item_config=item_config):
|
||
logger.info(f"📋 订单 {order_id} 已存在于数据库中且金额有效({amount}),直接返回缓存数据")
|
||
print(f"✅ 订单 {order_id} 使用缓存数据,跳过浏览器获取")
|
||
|
||
# 构建返回格式,与浏览器获取的格式保持一致
|
||
result = {
|
||
'order_id': existing_order['order_id'],
|
||
'url': f"https://www.goofish.com/order-detail?orderId={order_id}&role=seller",
|
||
'title': f"订单详情 - {order_id}",
|
||
'sku_info': {
|
||
'spec_name': existing_order.get('spec_name', ''),
|
||
'spec_value': existing_order.get('spec_value', ''),
|
||
'spec_name_2': existing_order.get('spec_name_2', ''),
|
||
'spec_value_2': existing_order.get('spec_value_2', ''),
|
||
'quantity': existing_order.get('quantity', ''),
|
||
'amount': existing_order.get('amount', ''),
|
||
'amount_source': 'cache',
|
||
},
|
||
'spec_name': existing_order.get('spec_name', ''),
|
||
'spec_value': existing_order.get('spec_value', ''),
|
||
'spec_name_2': existing_order.get('spec_name_2', ''),
|
||
'spec_value_2': existing_order.get('spec_value_2', ''),
|
||
'quantity': existing_order.get('quantity', ''),
|
||
'amount': existing_order.get('amount', ''),
|
||
'amount_source': 'cache',
|
||
'platform_created_at': existing_order.get('platform_created_at'),
|
||
'platform_paid_at': existing_order.get('platform_paid_at'),
|
||
'platform_completed_at': existing_order.get('platform_completed_at'),
|
||
'timestamp': time.time(),
|
||
'from_cache': True # 标记数据来源
|
||
}
|
||
return result
|
||
else:
|
||
logger.info(f"📋 订单 {order_id} 缓存字段不完整或状态无效,重新获取详情: amount={amount}, status={existing_order.get('order_status')}")
|
||
print(f"⚠️ 订单 {order_id} 缓存不满足复用条件,重新获取详情...")
|
||
else:
|
||
logger.info(f"🔄 订单 {order_id} 强制刷新模式,跳过缓存检查")
|
||
|
||
# 只有在数据库中没有有效数据时才初始化浏览器
|
||
logger.info(f"🌐 订单 {order_id} 需要浏览器获取,开始初始化浏览器...")
|
||
print(f"🔍 订单 {order_id} 开始浏览器获取详情...")
|
||
|
||
# 确保浏览器准备就绪
|
||
if not await self._ensure_browser_ready():
|
||
logger.error("浏览器初始化失败,无法获取订单详情")
|
||
return None
|
||
|
||
self._register_response_capture_handler(order_id)
|
||
try:
|
||
# 构建订单详情URL
|
||
url = f"https://www.goofish.com/order-detail?orderId={order_id}&role=seller"
|
||
logger.info(f"开始访问订单详情页面: {url}")
|
||
|
||
# 访问页面(带重试机制)
|
||
max_retries = 2
|
||
response = None
|
||
|
||
for retry in range(max_retries + 1):
|
||
try:
|
||
response = await self.page.goto(url, wait_until='networkidle', timeout=timeout * 1000)
|
||
|
||
if response and response.status == 200:
|
||
break
|
||
else:
|
||
logger.warning(f"页面访问失败,状态码: {response.status if response else 'None'},重试 {retry + 1}/{max_retries + 1}")
|
||
|
||
except Exception as e:
|
||
logger.warning(f"页面访问异常: {e},重试 {retry + 1}/{max_retries + 1}")
|
||
|
||
# 如果是浏览器连接问题,尝试重新初始化
|
||
if "Target page, context or browser has been closed" in str(e):
|
||
logger.info("检测到浏览器连接断开,尝试重新初始化...")
|
||
if await self._ensure_browser_ready():
|
||
logger.info("浏览器重新初始化成功,继续重试...")
|
||
self._register_response_capture_handler(order_id)
|
||
continue
|
||
else:
|
||
logger.error("浏览器重新初始化失败")
|
||
return None
|
||
|
||
if retry == max_retries:
|
||
logger.error(f"页面访问最终失败: {e}")
|
||
return None
|
||
|
||
await asyncio.sleep(1) # 重试前等待1秒
|
||
|
||
if not response or response.status != 200:
|
||
logger.error(f"页面访问最终失败,状态码: {response.status if response else 'None'}")
|
||
return None
|
||
|
||
logger.info("页面加载成功,等待内容渲染...")
|
||
|
||
# 等待页面完全加载
|
||
try:
|
||
await self.page.wait_for_load_state('networkidle')
|
||
except Exception as e:
|
||
logger.warning(f"等待页面加载状态失败: {e}")
|
||
# 继续执行,不中断流程
|
||
|
||
# 额外等待确保动态内容加载完成
|
||
await asyncio.sleep(3)
|
||
|
||
# 获取并解析SKU信息
|
||
sku_info = await self._get_sku_content()
|
||
|
||
# 获取订单状态
|
||
order_status = await self._get_order_status()
|
||
logger.info(f"订单 {order_id} 状态: {order_status}")
|
||
|
||
# 解析失败时,刷新页面后重试一次,降低偶发结构变化/异步渲染导致的漏解析概率
|
||
if not self._is_order_detail_parse_success(sku_info, order_status):
|
||
self._log_order_detail_parse_event(
|
||
event_name="ORDER_DETAIL_PARSE_ALERT",
|
||
order_id=order_id,
|
||
url=url,
|
||
attempt="first",
|
||
sku_info=sku_info,
|
||
order_status=order_status,
|
||
level="warning"
|
||
)
|
||
logger.warning(
|
||
f"订单 {order_id} 首次解析结果不完整,准备刷新页面重试: "
|
||
f"sku_info={sku_info}, order_status={order_status}"
|
||
)
|
||
try:
|
||
await self.page.reload(wait_until='networkidle', timeout=timeout * 1000)
|
||
await asyncio.sleep(2)
|
||
retry_sku_info = await self._get_sku_content()
|
||
retry_order_status = await self._get_order_status()
|
||
logger.info(
|
||
f"订单 {order_id} 重试解析结果: sku_info={retry_sku_info}, "
|
||
f"order_status={retry_order_status}"
|
||
)
|
||
|
||
if self._is_order_detail_parse_success(retry_sku_info, retry_order_status):
|
||
sku_info = retry_sku_info
|
||
order_status = retry_order_status
|
||
logger.info(f"订单 {order_id} 刷新重试后解析成功")
|
||
self._log_order_detail_parse_event(
|
||
event_name="ORDER_DETAIL_PARSE_RECOVERED",
|
||
order_id=order_id,
|
||
url=url,
|
||
attempt="retry",
|
||
sku_info=sku_info,
|
||
order_status=order_status,
|
||
level="info"
|
||
)
|
||
else:
|
||
logger.warning(f"订单 {order_id} 刷新重试后仍未解析到完整详情")
|
||
self._log_order_detail_parse_event(
|
||
event_name="ORDER_DETAIL_PARSE_ALERT",
|
||
order_id=order_id,
|
||
url=url,
|
||
attempt="retry_final",
|
||
sku_info=retry_sku_info,
|
||
order_status=retry_order_status,
|
||
level="warning"
|
||
)
|
||
except Exception as retry_e:
|
||
logger.warning(f"订单 {order_id} 刷新重试解析异常: {retry_e}")
|
||
self._log_order_detail_parse_event(
|
||
event_name="ORDER_DETAIL_PARSE_ALERT",
|
||
order_id=order_id,
|
||
url=url,
|
||
attempt="retry_exception",
|
||
sku_info=sku_info,
|
||
order_status=order_status,
|
||
level="warning",
|
||
error=str(retry_e)
|
||
)
|
||
|
||
# 获取页面标题
|
||
try:
|
||
title = await self.page.title()
|
||
except Exception as e:
|
||
logger.warning(f"获取页面标题失败: {e}")
|
||
title = f"订单详情 - {order_id}"
|
||
|
||
order_time_fields = await self._get_order_time_fields()
|
||
|
||
result = {
|
||
'order_id': order_id,
|
||
'url': url,
|
||
'title': title,
|
||
'sku_info': sku_info, # 包含解析后的规格信息
|
||
'spec_name': sku_info.get('spec_name', '') if sku_info else '',
|
||
'spec_value': sku_info.get('spec_value', '') if sku_info else '',
|
||
'spec_name_2': sku_info.get('spec_name_2', '') if sku_info else '', # 规格2名称
|
||
'spec_value_2': sku_info.get('spec_value_2', '') if sku_info else '', # 规格2值
|
||
'quantity': sku_info.get('quantity', '') if sku_info else '', # 数量
|
||
'amount': sku_info.get('amount', '') if sku_info else '', # 金额
|
||
'amount_source': sku_info.get('amount_source', '') if sku_info else '',
|
||
'spec_parse_mode': self._classify_spec_parse_mode(sku_info),
|
||
'order_status': order_status, # 订单状态
|
||
'order_status_source': self._last_order_status_source,
|
||
'platform_created_at': order_time_fields.get('platform_created_at'),
|
||
'platform_paid_at': order_time_fields.get('platform_paid_at'),
|
||
'platform_completed_at': order_time_fields.get('platform_completed_at'),
|
||
'timestamp': time.time(),
|
||
'from_cache': False # 标记数据来源
|
||
}
|
||
|
||
logger.info(f"订单详情获取成功: {order_id}")
|
||
if sku_info:
|
||
logger.info(f"规格信息 - 名称: {result['spec_name']}, 值: {result['spec_value']}")
|
||
logger.info(f"数量: {result['quantity']}, 金额: {result['amount']}")
|
||
return result
|
||
finally:
|
||
await self._wait_for_response_capture_tasks(timeout=0.5)
|
||
self._clear_response_capture_handler()
|
||
|
||
except Exception as e:
|
||
logger.error(f"获取订单详情失败: {e}")
|
||
return None
|
||
|
||
def _parse_sku_content(self, sku_content: str) -> Dict[str, str]:
|
||
"""
|
||
解析SKU内容,根据冒号分割规格名称和规格值
|
||
支持双规格格式:例如 "版本选择:mac 版 - 单文件;远程:自行安装"
|
||
|
||
Args:
|
||
sku_content: 原始SKU内容字符串
|
||
|
||
Returns:
|
||
包含规格名称和规格值的字典,如果解析失败则返回空字典
|
||
对于双规格,会额外包含 spec_name_2 和 spec_value_2
|
||
"""
|
||
try:
|
||
if not sku_content or ':' not in sku_content:
|
||
logger.warning(f"SKU内容格式无效或不包含冒号: {sku_content}")
|
||
return {}
|
||
|
||
# 检查是否包含双规格(通过分号分隔,且分号后有冒号)
|
||
# 格式如:版本选择:mac 版 - 单文件;远程:自行安装
|
||
if ';' in sku_content:
|
||
# 查找分号位置,检查分号后面是否有冒号(表示有第二个规格)
|
||
semicolon_idx = sku_content.find(';')
|
||
second_part = sku_content[semicolon_idx + 1:].strip()
|
||
|
||
if ':' in second_part:
|
||
# 这是双规格格式
|
||
first_part = sku_content[:semicolon_idx].strip()
|
||
|
||
# 解析第一个规格
|
||
first_spec_parts = first_part.split(':', 1)
|
||
if len(first_spec_parts) == 2:
|
||
spec_name = first_spec_parts[0].strip()
|
||
spec_value = first_spec_parts[1].strip()
|
||
else:
|
||
logger.warning(f"第一个规格解析失败: {first_part}")
|
||
spec_name = ''
|
||
spec_value = first_part
|
||
|
||
# 解析第二个规格
|
||
second_spec_parts = second_part.split(':', 1)
|
||
spec_name_2 = second_spec_parts[0].strip()
|
||
spec_value_2 = second_spec_parts[1].strip() if len(second_spec_parts) > 1 else ''
|
||
|
||
result = {
|
||
'spec_name': spec_name,
|
||
'spec_value': spec_value
|
||
}
|
||
|
||
if spec_name_2 and spec_value_2:
|
||
result['spec_name_2'] = spec_name_2
|
||
result['spec_value_2'] = spec_value_2
|
||
logger.info(f"双规格解析成功 - 规格1: {spec_name}:{spec_value}, 规格2: {spec_name_2}:{spec_value_2}")
|
||
else:
|
||
logger.info(f"SKU解析成功(单规格)- 规格名称: {spec_name}, 规格值: {spec_value}")
|
||
|
||
return result
|
||
|
||
# 单规格处理(原有逻辑)
|
||
parts = sku_content.split(':', 1) # 只分割第一个冒号
|
||
|
||
if len(parts) == 2:
|
||
spec_name = parts[0].strip()
|
||
spec_value = parts[1].strip()
|
||
|
||
if spec_name and spec_value:
|
||
result = {
|
||
'spec_name': spec_name,
|
||
'spec_value': spec_value
|
||
}
|
||
logger.info(f"SKU解析成功 - 规格名称: {spec_name}, 规格值: {spec_value}")
|
||
return result
|
||
else:
|
||
logger.warning(f"SKU解析失败,规格名称或值为空: 名称='{spec_name}', 值='{spec_value}'")
|
||
return {}
|
||
else:
|
||
logger.warning(f"SKU内容分割失败: {sku_content}")
|
||
return {}
|
||
|
||
except Exception as e:
|
||
logger.error(f"解析SKU内容异常: {e}")
|
||
return {}
|
||
|
||
def _normalize_amount_text(self, amount_text: str) -> Optional[str]:
|
||
"""标准化金额文本,返回纯数字字符串(如 29.90)"""
|
||
try:
|
||
if amount_text is None:
|
||
return None
|
||
text = str(amount_text).strip()
|
||
if not text:
|
||
return None
|
||
|
||
# 优先提取货币格式
|
||
money_match = re.search(r'[¥¥$]\s*([0-9]+(?:\.[0-9]{1,2})?)', text)
|
||
if money_match:
|
||
return money_match.group(1)
|
||
|
||
# 兜底提取纯数字
|
||
number_match = re.search(r'([0-9]+(?:\.[0-9]{1,2})?)', text)
|
||
if number_match:
|
||
return number_match.group(1)
|
||
|
||
return None
|
||
except Exception:
|
||
return None
|
||
|
||
def _has_valid_amount(self, amount_text: Any) -> bool:
|
||
"""判断金额是否可解析为数字(0 也视为有效)"""
|
||
normalized = self._normalize_amount_text(str(amount_text) if amount_text is not None else '')
|
||
if normalized is None:
|
||
return False
|
||
try:
|
||
float(normalized)
|
||
return True
|
||
except (ValueError, TypeError):
|
||
return False
|
||
|
||
def _parse_amount_value(self, amount_text: Any) -> Optional[float]:
|
||
normalized = self._normalize_amount_text(str(amount_text) if amount_text is not None else '')
|
||
if normalized is None:
|
||
return None
|
||
try:
|
||
return float(normalized)
|
||
except (ValueError, TypeError):
|
||
return None
|
||
|
||
def _reset_amount_capture(self, order_id: str) -> None:
|
||
self._active_order_id = str(order_id or '').strip()
|
||
self._captured_amount_candidates = []
|
||
self._captured_sku_candidates = []
|
||
self._pending_response_tasks = set()
|
||
|
||
def _clear_response_capture_handler(self) -> None:
|
||
if not self._response_handler:
|
||
return
|
||
|
||
try:
|
||
if self.page and hasattr(self.page, 'remove_listener'):
|
||
self.page.remove_listener('response', self._response_handler)
|
||
elif self.page and hasattr(self.page, 'off'):
|
||
self.page.off('response', self._response_handler)
|
||
except Exception as e:
|
||
logger.debug(f"移除订单详情响应监听失败: {e}")
|
||
finally:
|
||
self._response_handler = None
|
||
|
||
def _register_response_capture_handler(self, order_id: str) -> None:
|
||
self._clear_response_capture_handler()
|
||
self._reset_amount_capture(order_id)
|
||
|
||
if not self.page:
|
||
return
|
||
|
||
current_order_id = self._active_order_id
|
||
|
||
def _on_task_done(task: asyncio.Task) -> None:
|
||
self._pending_response_tasks.discard(task)
|
||
try:
|
||
task.result()
|
||
except asyncio.CancelledError:
|
||
pass
|
||
except Exception as task_error:
|
||
logger.debug(f"订单详情响应解析任务异常: {task_error}")
|
||
|
||
def _response_handler(response) -> None:
|
||
try:
|
||
task = asyncio.create_task(self._process_order_detail_response(response, current_order_id))
|
||
except Exception as e:
|
||
logger.debug(f"创建订单详情响应解析任务失败: {e}")
|
||
return
|
||
|
||
self._pending_response_tasks.add(task)
|
||
task.add_done_callback(_on_task_done)
|
||
|
||
self._response_handler = _response_handler
|
||
self.page.on('response', _response_handler)
|
||
|
||
async def _wait_for_response_capture_tasks(self, timeout: float = 1.5) -> None:
|
||
if not self._pending_response_tasks:
|
||
return
|
||
|
||
try:
|
||
await asyncio.wait(list(self._pending_response_tasks), timeout=timeout)
|
||
except Exception as e:
|
||
logger.debug(f"等待订单详情响应解析任务失败: {e}")
|
||
|
||
def _try_parse_json_text(self, text: str) -> Optional[Any]:
|
||
if not text:
|
||
return None
|
||
|
||
stripped = str(text).strip()
|
||
if not stripped or stripped[0] not in '{[':
|
||
return None
|
||
|
||
try:
|
||
return json.loads(stripped)
|
||
except Exception:
|
||
return None
|
||
|
||
def _is_trusted_order_detail_response_url(self, url: str) -> bool:
|
||
lowered_url = str(url or '').lower()
|
||
trusted_tokens = (
|
||
'mtop.idle.web.trade.order.detail',
|
||
'trade.order.detail',
|
||
)
|
||
return any(token in lowered_url for token in trusted_tokens)
|
||
|
||
def _normalize_minor_amount_value(self, amount_value: Any) -> Any:
|
||
text = str(amount_value).strip() if amount_value is not None else ''
|
||
if not re.fullmatch(r'\d+', text):
|
||
return amount_value
|
||
|
||
try:
|
||
minor_value = int(text)
|
||
except (TypeError, ValueError):
|
||
return amount_value
|
||
|
||
if minor_value <= 0:
|
||
return amount_value
|
||
|
||
return f"{minor_value / 100:.2f}"
|
||
|
||
def _payload_references_order(self, payload: Any, order_id: str, url: str = '') -> bool:
|
||
order_id_text = str(order_id or '').strip()
|
||
url_text = str(url or '')
|
||
lowered_url = url_text.lower()
|
||
|
||
if order_id_text and order_id_text in url_text:
|
||
return True
|
||
|
||
try:
|
||
payload_text = json.dumps(payload, ensure_ascii=False)
|
||
except Exception:
|
||
payload_text = str(payload)
|
||
|
||
if order_id_text and order_id_text in payload_text:
|
||
return True
|
||
|
||
return self._is_trusted_order_detail_response_url(lowered_url)
|
||
|
||
def _normalize_quantity_text(self, quantity_value: Any) -> Optional[str]:
|
||
text = str(quantity_value or '').strip()
|
||
if not text:
|
||
return None
|
||
|
||
match = re.search(r'(\d+)', text)
|
||
if not match:
|
||
return None
|
||
|
||
try:
|
||
normalized = str(int(match.group(1)))
|
||
except (TypeError, ValueError):
|
||
return None
|
||
|
||
if normalized == '0':
|
||
return None
|
||
return normalized
|
||
|
||
def _normalize_sku_candidate_text(self, sku_text: Any) -> str:
|
||
if sku_text is None:
|
||
return ''
|
||
return re.sub(r'\s+', ' ', str(sku_text).replace(':', ':')).strip()
|
||
|
||
def _is_numeric_index_spec_name_like(self, spec_name: str, spec_value: str) -> bool:
|
||
normalized_name = re.sub(r'\s+', '', (spec_name or '').strip())
|
||
normalized_value = re.sub(r'\s+', ' ', (spec_value or '').strip())
|
||
if not normalized_name or not normalized_value:
|
||
return False
|
||
|
||
if not re.fullmatch(r'(?:第)?\d{1,2}(?:项|号|档)?', normalized_name):
|
||
return False
|
||
|
||
if len(normalized_value) < 2 or len(normalized_value) > 40:
|
||
return False
|
||
|
||
if self._is_datetime_like(normalized_value):
|
||
return False
|
||
|
||
if re.fullmatch(r'[¥¥]?\d+(?:\.\d{1,2})?', normalized_value):
|
||
return False
|
||
|
||
if normalized_value.lower().startswith(('http://', 'https://', 'fleamarket://')):
|
||
return False
|
||
|
||
if not re.search(r'[\u4e00-\u9fffA-Za-z]', normalized_value):
|
||
return False
|
||
|
||
return True
|
||
|
||
def _score_sku_text_candidate(
|
||
self,
|
||
normalized_key: str,
|
||
*,
|
||
path: str = '',
|
||
context: str = '',
|
||
sku_text: str = '',
|
||
from_pair: bool = False
|
||
) -> int:
|
||
key = str(normalized_key or '').lower()
|
||
path_lower = str(path or '').lower()
|
||
normalized_context = re.sub(r'\s+', ' ', str(context or '')).strip()
|
||
normalized_sku_text = self._normalize_sku_candidate_text(sku_text)
|
||
|
||
if not normalized_sku_text or len(normalized_sku_text) > 120 or ':' not in normalized_sku_text:
|
||
return 0
|
||
|
||
score = 0
|
||
strong_keys = {
|
||
'skuinfo', 'sku_info', 'skutext', 'sku_text', 'skudesc', 'sku_desc',
|
||
'skucontent', 'sku_content', 'specinfo', 'spec_info', 'spectext',
|
||
'spec_text', 'specdesc', 'spec_desc', 'itemsku', 'item_sku',
|
||
'itemspec', 'item_spec'
|
||
}
|
||
medium_key_tokens = ('sku', 'spec', 'attr', 'property', 'option', 'variant', 'model')
|
||
|
||
if key in strong_keys:
|
||
score = 220
|
||
elif any(token in key for token in medium_key_tokens):
|
||
score = 170
|
||
elif from_pair:
|
||
score = 135
|
||
elif any(token in path_lower for token in ('.sku', '.spec', '.attr', '.property', '.option', '.variant', '.model')):
|
||
score = 120
|
||
else:
|
||
return 0
|
||
|
||
if '.iteminfo.' in path_lower:
|
||
score += 70
|
||
elif '.components[' in path_lower:
|
||
score += 20
|
||
|
||
if any(token in normalized_context for token in ('规格', '型号', '版本', '选项', '属性', '套餐')):
|
||
score += 35
|
||
|
||
if ';' in normalized_sku_text:
|
||
score += 10
|
||
|
||
return score
|
||
|
||
def _append_sku_candidate(
|
||
self,
|
||
candidates: List[Dict[str, Any]],
|
||
sku_text: Any,
|
||
*,
|
||
quantity: Optional[str] = None,
|
||
path: str = '',
|
||
score: int = 0
|
||
) -> None:
|
||
normalized_sku_text = self._normalize_sku_candidate_text(sku_text)
|
||
if score <= 0 or not normalized_sku_text or len(normalized_sku_text) > 120 or ':' not in normalized_sku_text:
|
||
return
|
||
|
||
candidates.append({
|
||
'sku_text': normalized_sku_text,
|
||
'quantity': quantity,
|
||
'path': path,
|
||
'score': score,
|
||
})
|
||
|
||
def _extract_sku_candidates_from_payload(self, payload: Any, path: str = 'root', depth: int = 0) -> List[Dict[str, Any]]:
|
||
if payload is None or depth > 8:
|
||
return []
|
||
|
||
candidates: List[Dict[str, Any]] = []
|
||
|
||
if isinstance(payload, dict):
|
||
quantity_context = None
|
||
for quantity_key in ('buyAmount', 'buy_amount', 'quantity', 'itemCount', 'count', 'num'):
|
||
if quantity_key in payload:
|
||
quantity_context = self._normalize_quantity_text(payload.get(quantity_key))
|
||
if quantity_context:
|
||
break
|
||
|
||
context_fields = []
|
||
for context_key in ('title', 'label', 'name', 'preText', 'subTitle', 'displayText', 'content', 'desc', 'text'):
|
||
context_value = payload.get(context_key)
|
||
if isinstance(context_value, (str, int, float)):
|
||
normalized_context_value = self._normalize_sku_candidate_text(context_value)
|
||
if normalized_context_value:
|
||
context_fields.append(normalized_context_value)
|
||
dict_context = ' | '.join(context_fields)[:240]
|
||
|
||
title_text = ''
|
||
title_key = ''
|
||
for candidate_key in ('title', 'label', 'name', 'preText', 'subTitle', 'displayText', 'key', 'attrName', 'specName', 'skuName'):
|
||
candidate_value = payload.get(candidate_key)
|
||
if isinstance(candidate_value, (str, int, float)):
|
||
normalized_title = self._normalize_sku_candidate_text(candidate_value)
|
||
if normalized_title:
|
||
title_text = normalized_title
|
||
title_key = candidate_key
|
||
break
|
||
|
||
value_text = ''
|
||
value_key = ''
|
||
for candidate_key in ('value', 'text', 'content', 'displayText', 'attrValue', 'specValue', 'skuValue'):
|
||
candidate_value = payload.get(candidate_key)
|
||
if isinstance(candidate_value, (str, int, float)):
|
||
normalized_value = self._normalize_sku_candidate_text(candidate_value)
|
||
if normalized_value:
|
||
value_text = normalized_value
|
||
value_key = candidate_key
|
||
break
|
||
|
||
if not quantity_context and title_text and value_text and any(token in title_text for token in ('数量', '购买数量', '件数')):
|
||
quantity_context = self._normalize_quantity_text(value_text)
|
||
|
||
if (
|
||
title_text and value_text and
|
||
':' not in title_text and ':' not in value_text and
|
||
(
|
||
self._is_text_fallback_spec_name_like(title_text) or
|
||
self._is_numeric_index_spec_name_like(title_text, value_text)
|
||
)
|
||
):
|
||
pair_path = f"{path}.{title_key}+{value_key}" if title_key and value_key else path
|
||
pair_sku_text = f"{title_text}:{value_text}"
|
||
pair_score = self._score_sku_text_candidate(
|
||
f"{title_key}_{value_key}",
|
||
path=pair_path,
|
||
context=dict_context,
|
||
sku_text=pair_sku_text,
|
||
from_pair=True
|
||
)
|
||
self._append_sku_candidate(
|
||
candidates,
|
||
pair_sku_text,
|
||
quantity=quantity_context,
|
||
path=pair_path,
|
||
score=pair_score
|
||
)
|
||
|
||
for key, value in payload.items():
|
||
key_text = str(key)
|
||
normalized_key = re.sub(r'[^0-9A-Za-z\u4e00-\u9fff]', '', key_text).lower()
|
||
key_path = f"{path}.{key_text}"
|
||
|
||
if isinstance(value, str):
|
||
nested_payload = self._try_parse_json_text(value)
|
||
if nested_payload is not None:
|
||
candidates.extend(
|
||
self._extract_sku_candidates_from_payload(
|
||
nested_payload,
|
||
path=f"{key_path}.json",
|
||
depth=depth + 1
|
||
)
|
||
)
|
||
|
||
score = self._score_sku_text_candidate(
|
||
normalized_key,
|
||
path=key_path,
|
||
context=dict_context,
|
||
sku_text=value
|
||
)
|
||
self._append_sku_candidate(
|
||
candidates,
|
||
value,
|
||
quantity=quantity_context,
|
||
path=key_path,
|
||
score=score
|
||
)
|
||
|
||
candidates.extend(self._extract_sku_candidates_from_payload(value, path=key_path, depth=depth + 1))
|
||
|
||
elif isinstance(payload, list):
|
||
for index, item in enumerate(payload[:50]):
|
||
candidates.extend(self._extract_sku_candidates_from_payload(item, path=f"{path}[{index}]", depth=depth + 1))
|
||
|
||
return candidates
|
||
|
||
def _score_amount_key_candidate(self, normalized_key: str, *, context: str = '', path: str = '') -> int:
|
||
key = str(normalized_key or '').lower()
|
||
if not key:
|
||
return 0
|
||
|
||
ignored_key_tokens = [
|
||
'coupon', 'discount', 'freight', 'postage', 'shipping', 'delivery',
|
||
'deduction', 'coin', 'hongbao', 'voucher', 'reduce', 'cut',
|
||
'original', 'origin', 'raw', 'list', 'market', 'crossed', 'strike',
|
||
'buyamount'
|
||
]
|
||
if any(token in key for token in ignored_key_tokens):
|
||
return 0
|
||
|
||
strong_key_tokens = [
|
||
'actualpay', 'payamount', 'realpay', 'orderamount', 'paymentamount',
|
||
'paidamount', 'finalamount', 'tradeamount', 'dealprice', 'buyerpayamount',
|
||
'buyeractualpay', 'sellerrealamount', 'selleractualamount'
|
||
]
|
||
medium_key_tokens = [
|
||
'currentprice', 'realamount', 'finalprice', 'settleamount', 'settleprice',
|
||
'payprice', 'buyerpay', 'orderprice'
|
||
]
|
||
|
||
matched_strong_key = any(token in key for token in strong_key_tokens)
|
||
matched_medium_key = any(token in key for token in medium_key_tokens)
|
||
|
||
score = 0
|
||
if matched_strong_key:
|
||
score = 220
|
||
elif matched_medium_key:
|
||
score = 170
|
||
elif key in {'price', 'amount', 'money'} or key.endswith('price') or key.endswith('amount'):
|
||
score = 80
|
||
else:
|
||
return 0
|
||
|
||
normalized_context = re.sub(r'\s+', ' ', str(context or '')).strip()
|
||
path_lower = str(path or '').lower()
|
||
high_context_tokens = ['实付款', '订单金额', '应付金额', '应付', '实收金额', '实收', '付款金额', '支付金额', '实付']
|
||
medium_context_tokens = ['改价后', '优惠后', '成交价', '支付价', '最终价', '待发货', '去发货', '小刀']
|
||
low_context_tokens = ['合计', '总价', '商品总价']
|
||
negative_context_tokens = ['闲鱼币抵扣', '优惠', '立减', '折扣', '运费', '邮费', '红包', '券']
|
||
|
||
if key == 'price' and any(token in path_lower for token in ('.iteminfo.price', '.priceinfo.price', '.paymentinfo.price')):
|
||
score = max(score, 210)
|
||
|
||
if any(token in normalized_context for token in high_context_tokens):
|
||
score += 180
|
||
elif any(token in normalized_context for token in medium_context_tokens):
|
||
score += 120
|
||
elif any(token in normalized_context for token in low_context_tokens):
|
||
score += 70
|
||
|
||
if 'priceinfo' in path_lower:
|
||
score += 20
|
||
|
||
if any(token in normalized_context for token in negative_context_tokens) and not any(
|
||
token in normalized_context for token in high_context_tokens + medium_context_tokens
|
||
):
|
||
score -= 110
|
||
|
||
trusted_price_path = any(token in path_lower for token in ('.iteminfo.price', '.priceinfo.price', '.paymentinfo.price'))
|
||
|
||
if (
|
||
not matched_strong_key and
|
||
not matched_medium_key and
|
||
not trusted_price_path and
|
||
(key in {'price', 'amount', 'money'} or key.endswith('price') or key.endswith('amount'))
|
||
) and not any(token in normalized_context for token in high_context_tokens + medium_context_tokens + low_context_tokens):
|
||
return 0
|
||
|
||
if score < 100 and not normalized_context:
|
||
return 0
|
||
|
||
return max(score, 0)
|
||
|
||
def _append_amount_candidate(
|
||
self,
|
||
candidates: List[Dict[str, Any]],
|
||
amount_value: Any,
|
||
source: str,
|
||
score: int,
|
||
*,
|
||
path: str = '',
|
||
context: str = ''
|
||
) -> None:
|
||
if score <= 0:
|
||
return
|
||
|
||
normalized_amount = self._normalize_amount_text(str(amount_value) if amount_value is not None else '')
|
||
parsed_amount = self._parse_amount_value(normalized_amount)
|
||
if normalized_amount is None or parsed_amount is None or parsed_amount <= 0 or parsed_amount > 100000:
|
||
return
|
||
|
||
candidates.append({
|
||
'amount': normalized_amount,
|
||
'source': source,
|
||
'score': score,
|
||
'path': path,
|
||
'context': re.sub(r'\s+', ' ', str(context or '')).strip()[:240],
|
||
})
|
||
|
||
def _score_amount_title_candidate(self, title_text: str) -> int:
|
||
normalized_title = re.sub(r'\s+', ' ', str(title_text or '')).strip()
|
||
if not normalized_title:
|
||
return 0
|
||
|
||
ignored_title_tokens = ['闲鱼币抵扣', '智能抵扣', '待收闲鱼币', '优惠', '立减', '折扣', '运费', '邮费', '红包', '券']
|
||
if any(token in normalized_title for token in ignored_title_tokens):
|
||
return 0
|
||
|
||
high_title_tokens = ['实付款', '订单金额', '应付金额', '应付', '实收金额', '实收', '付款金额', '支付金额', '实付', '成交价', '支付价', '最终价']
|
||
medium_title_tokens = ['改价后', '优惠后', '合计', '总价', '商品总价']
|
||
|
||
if any(token in normalized_title for token in high_title_tokens):
|
||
return 280
|
||
if any(token in normalized_title for token in medium_title_tokens):
|
||
return 170
|
||
return 0
|
||
|
||
def _extract_amount_candidates_from_payload(
|
||
self,
|
||
payload: Any,
|
||
*,
|
||
path: str = 'payload',
|
||
depth: int = 0
|
||
) -> List[Dict[str, Any]]:
|
||
if payload is None or depth > 6:
|
||
return []
|
||
|
||
candidates: List[Dict[str, Any]] = []
|
||
|
||
if isinstance(payload, dict):
|
||
context_fields = []
|
||
for context_key in ('title', 'desc', 'text', 'label', 'name', 'preText', 'subTitle', 'displayText', 'content'):
|
||
context_value = payload.get(context_key)
|
||
if isinstance(context_value, (str, int, float)):
|
||
normalized_context_value = re.sub(r'\s+', ' ', str(context_value)).strip()
|
||
if normalized_context_value:
|
||
context_fields.append(normalized_context_value)
|
||
dict_context = ' | '.join(context_fields)[:240]
|
||
|
||
title_candidate = None
|
||
for title_key in ('title', 'label', 'name', 'preText', 'subTitle', 'displayText'):
|
||
title_value = payload.get(title_key)
|
||
if isinstance(title_value, (str, int, float)):
|
||
normalized_title_value = re.sub(r'\s+', ' ', str(title_value)).strip()
|
||
if normalized_title_value:
|
||
title_candidate = normalized_title_value
|
||
break
|
||
|
||
raw_value_candidate = payload.get('value')
|
||
title_score = self._score_amount_title_candidate(title_candidate)
|
||
if title_score > 0 and isinstance(raw_value_candidate, (str, int, float)):
|
||
self._append_amount_candidate(
|
||
candidates,
|
||
raw_value_candidate,
|
||
'payload_title_value',
|
||
title_score,
|
||
path=f'{path}.value',
|
||
context=title_candidate,
|
||
)
|
||
|
||
for key, value in payload.items():
|
||
key_text = str(key)
|
||
key_path = f"{path}.{key_text}"
|
||
normalized_key = re.sub(r'[^0-9A-Za-z\u4e00-\u9fff]', '', key_text).lower()
|
||
|
||
if isinstance(value, (dict, list)):
|
||
candidates.extend(self._extract_amount_candidates_from_payload(value, path=key_path, depth=depth + 1))
|
||
continue
|
||
|
||
if isinstance(value, str):
|
||
nested_payload = self._try_parse_json_text(value)
|
||
if nested_payload is not None:
|
||
candidates.extend(
|
||
self._extract_amount_candidates_from_payload(
|
||
nested_payload,
|
||
path=f"{key_path}.json",
|
||
depth=depth + 1
|
||
)
|
||
)
|
||
|
||
semantic_amount, semantic_source = self._extract_preferred_amount_from_text(value)
|
||
if semantic_amount:
|
||
semantic_score = 0
|
||
if semantic_source == 'keyword_high':
|
||
semantic_score = 260
|
||
elif semantic_source == 'keyword_low':
|
||
semantic_score = 180
|
||
elif semantic_source == 'currency' and any(token in normalized_key for token in ('price', 'amount', 'money', 'pay', 'text', 'desc', 'label')):
|
||
semantic_score = 120
|
||
|
||
self._append_amount_candidate(
|
||
candidates,
|
||
semantic_amount,
|
||
f'payload_text_{semantic_source}',
|
||
semantic_score,
|
||
path=key_path,
|
||
context=value
|
||
)
|
||
|
||
if isinstance(value, (str, int, float)):
|
||
key_score = self._score_amount_key_candidate(normalized_key, context=dict_context, path=key_path)
|
||
self._append_amount_candidate(
|
||
candidates,
|
||
value,
|
||
f'payload_key_{normalized_key or "unknown"}',
|
||
key_score,
|
||
path=key_path,
|
||
context=dict_context
|
||
)
|
||
|
||
return candidates
|
||
|
||
if isinstance(payload, list):
|
||
for index, item in enumerate(payload[:50]):
|
||
candidates.extend(self._extract_amount_candidates_from_payload(item, path=f"{path}[{index}]", depth=depth + 1))
|
||
|
||
return candidates
|
||
|
||
async def _process_order_detail_response(self, response, order_id: str) -> None:
|
||
try:
|
||
if not response or response.status != 200:
|
||
return
|
||
|
||
url = str(response.url or '')
|
||
lowered_url = url.lower()
|
||
if not any(domain in lowered_url for domain in ('goofish.com', 'idlefish.com', 'taobao.com', 'mtop')):
|
||
return
|
||
|
||
if not self._is_trusted_order_detail_response_url(lowered_url):
|
||
return
|
||
|
||
headers = response.headers or {}
|
||
content_type = (headers.get('content-type') or headers.get('Content-Type') or '').lower()
|
||
resource_type = getattr(getattr(response, 'request', None), 'resource_type', '')
|
||
if resource_type not in ('fetch', 'xhr', 'document') and 'json' not in content_type and 'mtop' not in lowered_url:
|
||
return
|
||
|
||
payload = None
|
||
try:
|
||
payload = await response.json()
|
||
except Exception:
|
||
try:
|
||
response_text = await response.text()
|
||
except Exception:
|
||
response_text = ''
|
||
payload = self._try_parse_json_text(response_text)
|
||
|
||
if payload is None or not self._payload_references_order(payload, order_id, url):
|
||
return
|
||
|
||
response_candidates = self._extract_amount_candidates_from_payload(payload, path=f"response[{url.split('?')[0]}]")
|
||
for candidate in response_candidates:
|
||
candidate_copy = dict(candidate)
|
||
candidate_copy['source'] = f"structured_response::{candidate['source']}"
|
||
candidate_copy['response_url'] = url
|
||
self._captured_amount_candidates.append(candidate_copy)
|
||
|
||
if response_candidates:
|
||
best_candidate = max(response_candidates, key=lambda item: item.get('score', 0))
|
||
logger.info(
|
||
f"捕获订单金额候选: order_id={order_id}, amount={best_candidate.get('amount')}, "
|
||
f"score={best_candidate.get('score')}, source={best_candidate.get('source')}, url={url}"
|
||
)
|
||
|
||
sku_candidates = self._extract_sku_candidates_from_payload(payload, path=f"response[{url.split('?')[0]}]")
|
||
self._captured_sku_candidates.extend(sku_candidates)
|
||
if sku_candidates:
|
||
best_sku_candidate = max(sku_candidates, key=lambda item: item.get('score', 0))
|
||
logger.info(
|
||
f"捕获订单规格候选: order_id={order_id}, sku={best_sku_candidate.get('sku_text')}, "
|
||
f"quantity={best_sku_candidate.get('quantity') or ''}, path={best_sku_candidate.get('path')}"
|
||
)
|
||
except Exception as e:
|
||
logger.debug(f"解析订单详情响应失败: {e}")
|
||
|
||
def _get_best_captured_amount_candidate(self) -> Optional[Dict[str, Any]]:
|
||
if not self._captured_amount_candidates:
|
||
return None
|
||
|
||
deduped: Dict[Tuple[str, str, str], Dict[str, Any]] = {}
|
||
for candidate in self._captured_amount_candidates:
|
||
dedupe_key = (
|
||
str(candidate.get('amount', '')),
|
||
str(candidate.get('source', '')),
|
||
str(candidate.get('path', '')),
|
||
)
|
||
existing = deduped.get(dedupe_key)
|
||
if existing is None or candidate.get('score', 0) > existing.get('score', 0):
|
||
deduped[dedupe_key] = candidate
|
||
|
||
ranked_candidates = sorted(
|
||
deduped.values(),
|
||
key=lambda item: (item.get('score', 0), item.get('amount', '')),
|
||
reverse=True
|
||
)
|
||
return ranked_candidates[0] if ranked_candidates else None
|
||
|
||
def _get_best_captured_sku_candidate(self) -> Optional[Dict[str, Any]]:
|
||
if not self._captured_sku_candidates:
|
||
return None
|
||
|
||
deduped: Dict[Tuple[str, str, str], Dict[str, Any]] = {}
|
||
for candidate in self._captured_sku_candidates:
|
||
dedupe_key = (
|
||
str(candidate.get('sku_text', '')),
|
||
str(candidate.get('quantity', '')),
|
||
str(candidate.get('path', '')),
|
||
)
|
||
existing = deduped.get(dedupe_key)
|
||
if existing is None or candidate.get('score', 0) > existing.get('score', 0):
|
||
deduped[dedupe_key] = candidate
|
||
|
||
ranked_candidates = sorted(
|
||
deduped.values(),
|
||
key=lambda item: (item.get('score', 0), len(str(item.get('sku_text', '')))),
|
||
reverse=True,
|
||
)
|
||
return ranked_candidates[0] if ranked_candidates else None
|
||
|
||
def _get_ranked_captured_sku_candidates(self) -> List[Dict[str, Any]]:
|
||
if not self._captured_sku_candidates:
|
||
return []
|
||
|
||
deduped: Dict[Tuple[str, str, str], Dict[str, Any]] = {}
|
||
for candidate in self._captured_sku_candidates:
|
||
dedupe_key = (
|
||
str(candidate.get('sku_text', '')),
|
||
str(candidate.get('quantity', '')),
|
||
str(candidate.get('path', '')),
|
||
)
|
||
existing = deduped.get(dedupe_key)
|
||
if existing is None or candidate.get('score', 0) > existing.get('score', 0):
|
||
deduped[dedupe_key] = candidate
|
||
|
||
return sorted(
|
||
deduped.values(),
|
||
key=lambda item: (item.get('score', 0), len(str(item.get('sku_text', '')))),
|
||
reverse=True,
|
||
)
|
||
|
||
async def _extract_amount_from_structured_content(self) -> Tuple[Optional[str], str]:
|
||
await self._wait_for_response_capture_tasks(timeout=1.5)
|
||
|
||
best_candidate = self._get_best_captured_amount_candidate()
|
||
if best_candidate:
|
||
logger.info(
|
||
f"采用结构化响应金额候选: amount={best_candidate.get('amount')}, "
|
||
f"score={best_candidate.get('score')}, source={best_candidate.get('source')}, "
|
||
f"path={best_candidate.get('path')}"
|
||
)
|
||
return best_candidate.get('amount'), best_candidate.get('source', 'unknown')
|
||
|
||
try:
|
||
html_content = await self.page.content()
|
||
except Exception as e:
|
||
logger.debug(f"获取页面HTML失败,无法解析结构化金额: {e}")
|
||
return None, 'unknown'
|
||
|
||
if not html_content:
|
||
return None, 'unknown'
|
||
|
||
pattern_specs = [
|
||
(
|
||
'structured_html_priceinfo',
|
||
re.compile(r'"preText"\s*:\s*"[^"]*(实付款|订单金额|应付金额|改价后|优惠后|成交价|支付金额|支付价)[^"]*".{0,240}?"price"\s*:\s*"([0-9]+(?:\.[0-9]{1,2})?)"', re.IGNORECASE | re.DOTALL),
|
||
2,
|
||
),
|
||
(
|
||
'structured_html_priceinfo',
|
||
re.compile(r'"price"\s*:\s*"([0-9]+(?:\.[0-9]{1,2})?)".{0,240}?"preText"\s*:\s*"[^"]*(实付款|订单金额|应付金额|改价后|优惠后|成交价|支付金额|支付价)[^"]*"', re.IGNORECASE | re.DOTALL),
|
||
1,
|
||
),
|
||
(
|
||
'structured_html_key',
|
||
re.compile(r'"(?:actualPay|payAmount|realPay|orderAmount|paymentAmount|finalAmount|buyerPayAmount|dealPrice|paidAmount|tradeAmount)"\s*:\s*"?([0-9]+(?:\.[0-9]{1,2})?)"?', re.IGNORECASE),
|
||
1,
|
||
),
|
||
(
|
||
'structured_html_text',
|
||
re.compile(r'(?:实付款|订单金额|应付金额|改价后|优惠后|成交价|支付金额|支付价)[^0-9¥¥$]{0,20}[¥¥$]?\s*([0-9]+(?:\.[0-9]{1,2})?)', re.IGNORECASE),
|
||
1,
|
||
),
|
||
]
|
||
|
||
for source, pattern, group_index in pattern_specs:
|
||
match = pattern.search(html_content)
|
||
if not match:
|
||
continue
|
||
|
||
normalized_amount = self._normalize_amount_text(match.group(group_index))
|
||
if normalized_amount is None:
|
||
continue
|
||
|
||
logger.info(f"通过页面结构化内容找到金额: {normalized_amount} (source={source})")
|
||
return normalized_amount, source
|
||
|
||
return None, 'unknown'
|
||
|
||
async def _extract_sku_from_structured_content(self) -> Dict[str, str]:
|
||
await self._wait_for_response_capture_tasks(timeout=1.5)
|
||
|
||
for candidate in self._get_ranked_captured_sku_candidates():
|
||
sku_text = str(candidate.get('sku_text') or '').strip()
|
||
if not sku_text:
|
||
continue
|
||
|
||
parsed = self._parse_sku_content(sku_text)
|
||
if not parsed:
|
||
continue
|
||
|
||
sanitized = self._sanitize_sku_result(parsed, source='structured_response_candidate')
|
||
if not (sanitized.get('spec_name') and sanitized.get('spec_value')):
|
||
continue
|
||
|
||
quantity = self._normalize_quantity_text(candidate.get('quantity'))
|
||
if quantity:
|
||
sanitized['quantity'] = quantity
|
||
|
||
logger.info(
|
||
f"采用结构化响应规格候选: sku={sku_text}, quantity={quantity or ''}, "
|
||
f"path={candidate.get('path')}"
|
||
)
|
||
return sanitized
|
||
|
||
return {}
|
||
|
||
async def _extract_amount_from_semantic_blocks(self) -> Tuple[Optional[str], str]:
|
||
semantic_keywords = [
|
||
'实付款', '订单金额', '应付金额', '应付', '实收', '付款金额', '支付金额', '实付',
|
||
'改价后', '优惠后', '成交价', '支付价', '最终价', '闲鱼币抵扣'
|
||
]
|
||
|
||
try:
|
||
text_blocks = await self.page.evaluate(
|
||
"""(keywords) => {
|
||
const nodes = Array.from(document.querySelectorAll('div, span, p, section, article, li'));
|
||
const results = [];
|
||
const seen = new Set();
|
||
for (const node of nodes) {
|
||
const text = String(node.innerText || node.textContent || '')
|
||
.replace(/\\s+/g, ' ')
|
||
.trim();
|
||
if (!text || text.length < 4 || text.length > 180) {
|
||
continue;
|
||
}
|
||
if (!keywords.some(keyword => text.includes(keyword))) {
|
||
continue;
|
||
}
|
||
if (!/\\d/.test(text)) {
|
||
continue;
|
||
}
|
||
if (seen.has(text)) {
|
||
continue;
|
||
}
|
||
seen.add(text);
|
||
results.push(text);
|
||
if (results.length >= 24) {
|
||
break;
|
||
}
|
||
}
|
||
return results;
|
||
}""",
|
||
semantic_keywords,
|
||
)
|
||
except Exception as e:
|
||
logger.debug(f"提取语义金额块失败: {e}")
|
||
return None, 'unknown'
|
||
|
||
high_signal_tokens = {'实付款', '订单金额', '应付金额', '应付', '实收', '付款金额', '支付金额', '实付', '改价后', '优惠后', '成交价', '支付价', '最终价'}
|
||
|
||
for block in text_blocks or []:
|
||
amount, source = self._extract_preferred_amount_from_text(block)
|
||
if amount is None or source == 'unknown':
|
||
continue
|
||
|
||
if source == 'currency' and not any(token in block for token in high_signal_tokens):
|
||
continue
|
||
|
||
semantic_source = f'semantic_{source}'
|
||
logger.info(f"通过语义金额块找到金额: {amount} (source={semantic_source}, block={block[:80]})")
|
||
return amount, semantic_source
|
||
|
||
return None, 'unknown'
|
||
|
||
def _extract_preferred_amount_from_text(self, text: str) -> Tuple[Optional[str], str]:
|
||
"""从文本中提取更可信的金额,优先识别实付款等语义化字段。"""
|
||
if not text:
|
||
return None, 'unknown'
|
||
|
||
normalized_text = re.sub(r'\s+', ' ', str(text)).strip()
|
||
if not normalized_text:
|
||
return None, 'unknown'
|
||
|
||
keyword_groups = [
|
||
('keyword_high', ['实付款', '订单金额', '应付金额', '应付', '实收金额', '实收', '付款金额', '支付金额', '实付']),
|
||
('keyword_low', ['改价后', '优惠后', '成交价', '支付价', '最终价', '合计', '总价', '商品总价']),
|
||
]
|
||
|
||
for source, keywords in keyword_groups:
|
||
for keyword in keywords:
|
||
escaped_keyword = re.escape(keyword)
|
||
patterns = [
|
||
rf'{escaped_keyword}\s*[::]?\s*[¥¥$]?\s*([0-9]+(?:\.[0-9]{{1,2}})?)',
|
||
rf'([0-9]+(?:\.[0-9]{{1,2}})?)\s*(?:元|块)?\s*{escaped_keyword}',
|
||
rf'[¥¥$]\s*([0-9]+(?:\.[0-9]{{1,2}})?)\s*{escaped_keyword}',
|
||
]
|
||
for pattern in patterns:
|
||
matches = re.findall(pattern, normalized_text)
|
||
if matches:
|
||
normalized_amount = self._normalize_amount_text(matches[-1])
|
||
if normalized_amount is not None:
|
||
return normalized_amount, source
|
||
|
||
currency_matches = re.findall(r'[¥¥$]\s*([0-9]+(?:\.[0-9]{1,2})?)', normalized_text)
|
||
if len(currency_matches) == 1:
|
||
normalized_amount = self._normalize_amount_text(currency_matches[0])
|
||
if normalized_amount is not None:
|
||
return normalized_amount, 'currency'
|
||
|
||
return None, 'unknown'
|
||
|
||
def _extract_coin_deduction_value_from_text(self, text: str) -> Optional[str]:
|
||
if not text:
|
||
return None
|
||
|
||
normalized_text = re.sub(r'\s+', ' ', str(text)).strip()
|
||
if not normalized_text or '闲鱼币抵扣' not in normalized_text:
|
||
return None
|
||
|
||
patterns = [
|
||
r'闲鱼币抵扣[^0-9¥¥$]{0,20}[¥¥$]?\s*([0-9]+(?:\.[0-9]{1,2})?)',
|
||
r'([0-9]+(?:\.[0-9]{1,2})?)\s*(?:元|块)?\s*闲鱼币抵扣',
|
||
]
|
||
for pattern in patterns:
|
||
matches = re.findall(pattern, normalized_text)
|
||
if matches:
|
||
normalized_amount = self._normalize_amount_text(matches[-1])
|
||
if normalized_amount is not None:
|
||
return normalized_amount
|
||
|
||
return None
|
||
|
||
def _resolve_coin_deduction_amount(
|
||
self,
|
||
primary_amount: Optional[str],
|
||
primary_source: str,
|
||
fallback_result: Dict[str, str],
|
||
page_text: str,
|
||
) -> Tuple[Optional[str], Optional[str]]:
|
||
if not primary_amount or not page_text or '闲鱼币抵扣' not in page_text:
|
||
return None, None
|
||
|
||
primary_amount_value = self._parse_amount_value(primary_amount)
|
||
if primary_amount_value is None or primary_amount_value <= 0:
|
||
return None, None
|
||
|
||
deduction_amount = self._extract_coin_deduction_value_from_text(page_text)
|
||
deduction_amount_value = self._parse_amount_value(deduction_amount)
|
||
if deduction_amount_value is not None and 0 < deduction_amount_value < primary_amount_value:
|
||
adjusted_amount = self._normalize_amount_text(f"{primary_amount_value - deduction_amount_value:.2f}")
|
||
adjusted_amount_value = self._parse_amount_value(adjusted_amount)
|
||
if adjusted_amount and adjusted_amount_value is not None and 0 < adjusted_amount_value < primary_amount_value:
|
||
logger.info(
|
||
f"检测到闲鱼币抵扣,使用实付金额覆盖原价: primary={primary_amount}, deduction={deduction_amount}, "
|
||
f"adjusted={adjusted_amount}, source={primary_source}"
|
||
)
|
||
return adjusted_amount, 'coin_deduction_adjusted'
|
||
|
||
fallback_amount = fallback_result.get('amount')
|
||
fallback_source = fallback_result.get('amount_source') or ''
|
||
fallback_amount_value = self._parse_amount_value(fallback_amount)
|
||
trusted_fallback_sources = {
|
||
'text_keyword_high',
|
||
'text_keyword_low',
|
||
'semantic_keyword_high',
|
||
'semantic_keyword_low',
|
||
}
|
||
|
||
if (
|
||
fallback_amount_value is not None and
|
||
0 < fallback_amount_value < primary_amount_value and
|
||
fallback_source in trusted_fallback_sources
|
||
):
|
||
logger.info(
|
||
f"检测到闲鱼币抵扣,使用文本实付金额覆盖原价: primary={primary_amount}, "
|
||
f"fallback={fallback_amount}, fallback_source={fallback_source}, source={primary_source}"
|
||
)
|
||
return fallback_amount, f'coin_deduction_{fallback_source}'
|
||
|
||
return None, None
|
||
|
||
async def _get_element_amount_context(self, element) -> str:
|
||
"""获取金额元素的局部上下文,用于判断当前数字是否真的是订单金额。"""
|
||
try:
|
||
return await element.evaluate(
|
||
"""(el) => {
|
||
const texts = [];
|
||
let current = el;
|
||
for (let i = 0; current && i < 4; i += 1, current = current.parentElement) {
|
||
const text = String(current.innerText || current.textContent || '')
|
||
.replace(/\\s+/g, ' ')
|
||
.trim();
|
||
if (!text) {
|
||
continue;
|
||
}
|
||
texts.push(text);
|
||
if (text.length >= 24) {
|
||
break;
|
||
}
|
||
}
|
||
return texts.join(' | ').slice(0, 240);
|
||
}"""
|
||
)
|
||
except Exception as e:
|
||
logger.debug(f"获取金额元素上下文失败: {e}")
|
||
return ''
|
||
|
||
async def _extract_amount_from_selectors(self) -> Tuple[Optional[str], str]:
|
||
amount_selectors = [
|
||
'.boldNum--JgEOXfA3',
|
||
'[class*="boldNum"]',
|
||
'[class*="pay"] [class*="num"]',
|
||
'[class*="amount"] [class*="num"]',
|
||
'[class*="price"] [class*="num"]',
|
||
]
|
||
|
||
for amount_selector in amount_selectors:
|
||
try:
|
||
amount_elements = await self.page.query_selector_all(amount_selector)
|
||
except Exception as selector_e:
|
||
logger.debug(f"金额选择器 {amount_selector} 解析失败: {selector_e}")
|
||
continue
|
||
|
||
for amount_element in amount_elements:
|
||
try:
|
||
amount_text = await amount_element.text_content()
|
||
except Exception as text_error:
|
||
logger.debug(f"读取金额元素文本失败 {amount_selector}: {text_error}")
|
||
continue
|
||
|
||
normalized_amount = self._normalize_amount_text(amount_text or '')
|
||
if normalized_amount is None:
|
||
continue
|
||
|
||
context_text = await self._get_element_amount_context(amount_element)
|
||
context_amount, context_source = self._extract_preferred_amount_from_text(context_text)
|
||
selector_lower = amount_selector.lower()
|
||
is_generic_selector = (
|
||
'price' in selector_lower and
|
||
'pay' not in selector_lower and
|
||
'amount' not in selector_lower and
|
||
'boldnum' not in selector_lower
|
||
)
|
||
|
||
if context_amount and context_amount != normalized_amount:
|
||
logger.info(
|
||
f"金额候选与上下文主金额不一致,跳过: selector={amount_selector}, "
|
||
f"element={normalized_amount}, context={context_amount}, context_source={context_source}"
|
||
)
|
||
continue
|
||
|
||
if is_generic_selector and not context_amount:
|
||
logger.info(
|
||
f"通用价格选择器缺少可信上下文,跳过金额候选: "
|
||
f"selector={amount_selector}, element={normalized_amount}"
|
||
)
|
||
continue
|
||
|
||
if context_amount:
|
||
amount_source = f'selector_{context_source}'
|
||
else:
|
||
amount_source = 'selector_direct'
|
||
|
||
logger.info(f"通过选择器 {amount_selector} 找到金额: {normalized_amount} (source={amount_source})")
|
||
return normalized_amount, amount_source
|
||
|
||
return None, 'unknown'
|
||
|
||
def _is_datetime_like(self, text: str) -> bool:
|
||
"""判断文本是否明显像时间/日期,而非规格。"""
|
||
if not text:
|
||
return False
|
||
normalized = str(text).strip()
|
||
if not normalized:
|
||
return False
|
||
|
||
datetime_patterns = [
|
||
r'^\d{4}[-/]\d{1,2}[-/]\d{1,2}$',
|
||
r'^\d{1,2}:\d{2}(:\d{2})?$',
|
||
r'^\d{4}[-/]\d{1,2}[-/]\d{1,2}\s+\d{1,2}:\d{2}(:\d{2})?$',
|
||
r'^\d{10,13}$',
|
||
]
|
||
return any(re.match(pattern, normalized) for pattern in datetime_patterns)
|
||
|
||
def _is_text_fallback_spec_name_like(self, spec_name: str) -> bool:
|
||
"""校验纯文本兜底中的规格名称是否像真实SKU字段。"""
|
||
normalized = re.sub(r'\s+', '', (spec_name or '').strip())
|
||
if not normalized:
|
||
return False
|
||
|
||
strict_patterns = [
|
||
r'^(?:商品)?类型\d*$',
|
||
r'^(?:商品)?规格\d*$',
|
||
r'^版本(?:选择)?\d*$',
|
||
r'^(?:商品)?分类$',
|
||
r'^选区$',
|
||
r'^区服$',
|
||
r'^服区$',
|
||
r'^分区$',
|
||
r'^平台$',
|
||
r'^系统$',
|
||
r'^颜色$',
|
||
r'^尺码$',
|
||
r'^尺寸$',
|
||
r'^套餐(?:类型)?$',
|
||
r'^型号(?:选择)?$',
|
||
r'^配置$',
|
||
r'^容量$',
|
||
r'^时长$',
|
||
r'^面额$',
|
||
r'^账号(?:类型)?$',
|
||
r'^远程$',
|
||
r'^语言$',
|
||
r'^发货方式$',
|
||
r'^安装方式$',
|
||
r'^接口$',
|
||
r'^地区$',
|
||
r'^区域$',
|
||
r'^省份$',
|
||
r'^城市$',
|
||
r'^选项\d*$',
|
||
r'^属性\d*$',
|
||
r'^服务器$',
|
||
r'^角色$',
|
||
r'^职业$',
|
||
r'^档位$',
|
||
]
|
||
return any(re.match(pattern, normalized, re.IGNORECASE) for pattern in strict_patterns)
|
||
|
||
def _is_valid_spec_candidate(self, spec_name: str, spec_value: str, *, strict: bool = False) -> bool:
|
||
"""校验规格候选是否可信,过滤备案信息/时间等误命中。"""
|
||
name = (spec_name or '').strip()
|
||
value = (spec_value or '').strip()
|
||
|
||
if not name or not value:
|
||
return False
|
||
|
||
# 键名过长通常是正文信息,不是规格名称
|
||
if len(name) > 20:
|
||
return False
|
||
|
||
# 时间戳/日期误识别
|
||
if self._is_datetime_like(name) or self._is_datetime_like(value):
|
||
return False
|
||
|
||
# URL/协议字段不是规格
|
||
invalid_protocol_tokens = ['http://', 'https://', 'fleamarket://']
|
||
if any(token in name.lower() for token in invalid_protocol_tokens):
|
||
return False
|
||
if any(token in value.lower() for token in invalid_protocol_tokens):
|
||
return False
|
||
|
||
# 过滤常见平台资质、订单流程字段
|
||
invalid_tokens = [
|
||
'统一社会信用代码', '许可证', '备案', '经营', '广播电视节目',
|
||
'营业性演出', '集邮市场', '增值电信', 'app备案号',
|
||
'订单号', '付款', '交易', '退款', '发货', '收货',
|
||
'买家', '卖家', '地址', '电话', '手机号', '快递', '物流',
|
||
'创建时间', '付款时间', '成交时间', '下单时间'
|
||
]
|
||
lower_name = name.lower()
|
||
lower_value = value.lower()
|
||
if any(token in lower_name for token in invalid_tokens):
|
||
return False
|
||
if any(token in lower_value for token in invalid_tokens):
|
||
return False
|
||
|
||
if strict and not (
|
||
self._is_text_fallback_spec_name_like(name) or
|
||
self._is_numeric_index_spec_name_like(name, value)
|
||
):
|
||
return False
|
||
|
||
return True
|
||
|
||
def _sanitize_sku_result(self, sku_info: Dict[str, str], source: str = "unknown") -> Dict[str, str]:
|
||
"""清洗SKU结果中的可疑规格字段,避免误发。"""
|
||
if not sku_info:
|
||
return sku_info
|
||
|
||
result = dict(sku_info)
|
||
|
||
spec_name = (result.get('spec_name') or '').strip()
|
||
spec_value = (result.get('spec_value') or '').strip()
|
||
spec_name_2 = (result.get('spec_name_2') or '').strip()
|
||
spec_value_2 = (result.get('spec_value_2') or '').strip()
|
||
|
||
strict_validation = source.startswith('text_fallback')
|
||
primary_valid = self._is_valid_spec_candidate(spec_name, spec_value, strict=strict_validation)
|
||
secondary_valid = self._is_valid_spec_candidate(spec_name_2, spec_value_2, strict=strict_validation) if (spec_name_2 or spec_value_2) else False
|
||
|
||
if not primary_valid and (spec_name or spec_value):
|
||
logger.warning(
|
||
f"过滤疑似误识别规格(primary, source={source}): {spec_name}:{spec_value}"
|
||
)
|
||
result.pop('spec_name', None)
|
||
result.pop('spec_value', None)
|
||
|
||
if not secondary_valid and (spec_name_2 or spec_value_2):
|
||
logger.warning(
|
||
f"过滤疑似误识别规格(secondary, source={source}): {spec_name_2}:{spec_value_2}"
|
||
)
|
||
result.pop('spec_name_2', None)
|
||
result.pop('spec_value_2', None)
|
||
|
||
# 如果主规格被清掉而次规格有效,则提升次规格为主规格
|
||
if ('spec_name' not in result or not result.get('spec_name')) and result.get('spec_name_2') and result.get('spec_value_2'):
|
||
result['spec_name'] = result.pop('spec_name_2')
|
||
result['spec_value'] = result.pop('spec_value_2')
|
||
logger.info(f"规格清洗后提升次规格为主规格(source={source})")
|
||
|
||
return result
|
||
|
||
def _get_status_priority(self, status: str) -> int:
|
||
priority_map = {
|
||
'unknown': 0,
|
||
'pending_payment': 10,
|
||
'pending_ship': 20,
|
||
'shipped': 30,
|
||
'completed': 40,
|
||
'refunding': 50,
|
||
'cancelled': 60,
|
||
}
|
||
return priority_map.get(status or 'unknown', 0)
|
||
|
||
def _extract_status_matches_from_text(self, text: str, *, source: str = 'generic') -> Dict[str, list]:
|
||
"""从文本中提取状态命中详情,便于按来源做更保守的判定。"""
|
||
if not text:
|
||
return {}
|
||
|
||
normalized_text = re.sub(r'\s+', ' ', str(text)).strip()
|
||
if not normalized_text:
|
||
return {}
|
||
|
||
status_patterns = [
|
||
('cancelled', ['交易关闭', '已关闭', '钱款已原路退返', '订单关闭']),
|
||
('refunding', ['退款中', '退货退款', '退款关闭']),
|
||
('completed', ['买家确认收货', '已确认收货,交易成功', '交易成功', '已完成']),
|
||
('shipped', ['等待买家收货', '待收货', '已发货', '查看物流', '确认收货']),
|
||
('pending_ship', ['待发货', '等待你发货', '等待卖家发货', '去发货', '付款完成待发货', '记得及时发货']),
|
||
('pending_payment', ['待付款', '等待买家付款']),
|
||
]
|
||
|
||
if source == 'button':
|
||
status_patterns = [
|
||
('cancelled', ['关闭订单', '订单关闭']),
|
||
('refunding', ['退款中', '退款详情']),
|
||
('completed', ['交易成功', '已完成']),
|
||
('shipped', ['提醒收货', '延长收货', '查看物流', '已发货', '确认收货']),
|
||
('pending_ship', ['去发货', '立即发货', '待发货']),
|
||
('pending_payment', ['修改价格', '等待付款']),
|
||
]
|
||
|
||
if source == 'body':
|
||
status_patterns = [
|
||
('cancelled', ['交易关闭', '已关闭', '钱款已原路退返', '订单关闭']),
|
||
('refunding', ['退款中', '退货退款', '退款关闭']),
|
||
('completed', ['买家已确认收货', '买家确认收货,交易成功', '已确认收货,交易成功']),
|
||
('shipped', ['等待买家收货', '提醒收货', '延长收货']),
|
||
('pending_ship', ['待发货', '等待你发货', '等待卖家发货', '去发货', '付款完成待发货', '记得及时发货']),
|
||
('pending_payment', ['待付款', '等待买家付款']),
|
||
]
|
||
|
||
if source == 'button_group':
|
||
status_patterns = [
|
||
('cancelled', ['关闭订单', '订单关闭']),
|
||
('refunding', ['退款中', '退款详情']),
|
||
('completed', ['交易成功', '已完成']),
|
||
('shipped', ['提醒收货', '延长收货', '查看物流', '已发货', '确认收货']),
|
||
('pending_ship', ['去发货', '立即发货', '待发货']),
|
||
('pending_payment', ['修改价格', '等待付款']),
|
||
]
|
||
|
||
matched_statuses: Dict[str, list] = {}
|
||
for status, patterns in status_patterns:
|
||
matched_patterns = [pattern for pattern in patterns if pattern in normalized_text]
|
||
if matched_patterns:
|
||
matched_statuses[status] = matched_patterns
|
||
|
||
if source == 'button_group':
|
||
completed_signals = []
|
||
if '去评价' in normalized_text:
|
||
completed_signals.append('去评价')
|
||
if '查看钱款' in normalized_text:
|
||
completed_signals.append('查看钱款')
|
||
if '删除订单' in normalized_text:
|
||
completed_signals.append('删除订单')
|
||
|
||
if {'去评价', '查看钱款'}.issubset(set(completed_signals)):
|
||
matched_statuses['completed'] = completed_signals
|
||
|
||
if source == 'body':
|
||
completed_signals = []
|
||
if '快给ta一个评价吧~' in normalized_text or '快给ta一个评价吧~' in normalized_text:
|
||
completed_signals.append('快给ta一个评价吧')
|
||
if '查看钱款' in normalized_text:
|
||
completed_signals.append('查看钱款')
|
||
if '去评价' in normalized_text:
|
||
completed_signals.append('去评价')
|
||
|
||
if '快给ta一个评价吧' in ''.join(completed_signals) and ('查看钱款' in completed_signals or '去评价' in completed_signals):
|
||
matched_statuses['completed'] = completed_signals
|
||
|
||
return matched_statuses
|
||
|
||
def _extract_status_from_text(self, text: str, *, source: str = 'generic') -> str:
|
||
"""从任意文本中提取订单状态,优先返回更可靠/更后置的状态。"""
|
||
matched_status_map = self._extract_status_matches_from_text(text, source=source)
|
||
if not matched_status_map:
|
||
return 'unknown'
|
||
|
||
if source == 'body':
|
||
if 'completed' in matched_status_map and 'shipped' in matched_status_map:
|
||
logger.warning(
|
||
f"订单状态全文兜底同时命中已发货/已完成信号,优先采用shipped: "
|
||
f"completed={matched_status_map.get('completed')}, "
|
||
f"shipped={matched_status_map.get('shipped')}"
|
||
)
|
||
return 'shipped'
|
||
|
||
if 'pending_ship' in matched_status_map and 'shipped' in matched_status_map:
|
||
logger.warning(
|
||
f"订单状态全文兜底出现冲突信号,保守返回unknown: "
|
||
f"pending_ship={matched_status_map.get('pending_ship')}, "
|
||
f"shipped={matched_status_map.get('shipped')}"
|
||
)
|
||
return 'unknown'
|
||
|
||
if 'pending_ship' in matched_status_map and 'pending_payment' in matched_status_map:
|
||
logger.info(
|
||
f"订单状态全文兜底检测到待付款/待发货混合信号,优先采用pending_ship: "
|
||
f"pending_ship={matched_status_map.get('pending_ship')}, "
|
||
f"pending_payment={matched_status_map.get('pending_payment')}"
|
||
)
|
||
return 'pending_ship'
|
||
|
||
matched_statuses = list(matched_status_map.keys())
|
||
|
||
matched_statuses.sort(key=self._get_status_priority, reverse=True)
|
||
return matched_statuses[0]
|
||
|
||
async def _collect_texts_by_selectors(self, selectors, *, max_length: int = 40, max_items: int = 12) -> list:
|
||
"""按选择器批量采集文本,自动去重。"""
|
||
collected = []
|
||
seen = set()
|
||
|
||
for selector in selectors:
|
||
try:
|
||
elements = await self.page.query_selector_all(selector)
|
||
except Exception as e:
|
||
logger.debug(f"批量采集选择器失败 {selector}: {e}")
|
||
continue
|
||
|
||
for element in elements:
|
||
try:
|
||
text = await element.text_content()
|
||
except Exception as text_error:
|
||
logger.debug(f"读取元素文本失败 {selector}: {text_error}")
|
||
continue
|
||
|
||
normalized_text = re.sub(r'\s+', ' ', str(text or '')).strip()
|
||
if not normalized_text:
|
||
continue
|
||
if max_length and len(normalized_text) > max_length:
|
||
continue
|
||
if normalized_text in seen:
|
||
continue
|
||
|
||
seen.add(normalized_text)
|
||
collected.append(normalized_text)
|
||
if len(collected) >= max_items:
|
||
return collected
|
||
|
||
return collected
|
||
|
||
async def _get_page_text(self) -> str:
|
||
"""获取页面可读文本,失败时返回空字符串"""
|
||
try:
|
||
return (await self.page.inner_text('body')).strip()
|
||
except Exception:
|
||
try:
|
||
html_content = await self.page.content()
|
||
return re.sub(r'\s+', ' ', re.sub(r'<[^>]+>', ' ', html_content)).strip()
|
||
except Exception:
|
||
return ''
|
||
|
||
def _build_spec_candidate_identity(self, candidate: Dict[str, str]) -> Tuple[str, str, str, str]:
|
||
"""构建规格候选去重键,避免同一候选重复进入兜底流程。"""
|
||
return (
|
||
(candidate.get('spec_name') or '').strip(),
|
||
(candidate.get('spec_value') or '').strip(),
|
||
(candidate.get('spec_name_2') or '').strip(),
|
||
(candidate.get('spec_value_2') or '').strip(),
|
||
)
|
||
|
||
def _classify_spec_parse_mode(self, sku_info: Optional[Dict[str, str]]) -> str:
|
||
"""根据当前SKU结果判断规格解析模式。"""
|
||
info = sku_info or {}
|
||
has_primary = bool((info.get('spec_name') or '').strip() and (info.get('spec_value') or '').strip())
|
||
has_secondary = bool((info.get('spec_name_2') or '').strip() and (info.get('spec_value_2') or '').strip())
|
||
|
||
if has_primary and has_secondary:
|
||
return 'two_spec'
|
||
if has_primary:
|
||
return 'one_spec'
|
||
return 'no_spec'
|
||
|
||
def _extract_sku_from_text(self, text: str) -> Dict[str, str]:
|
||
"""从页面纯文本中兜底提取金额/规格/数量"""
|
||
result: Dict[str, str] = {}
|
||
if not text:
|
||
return result
|
||
|
||
lines = [line.strip() for line in text.splitlines() if line and line.strip()]
|
||
|
||
# 优先从金额关键词行提取金额
|
||
amount_keywords = ['实付款', '订单金额', '实收', '合计', '总价', '应付', '支付金额', '实付']
|
||
for line in lines:
|
||
if any(keyword in line for keyword in amount_keywords):
|
||
normalized_amount, amount_source = self._extract_preferred_amount_from_text(line)
|
||
if normalized_amount:
|
||
result['amount'] = normalized_amount
|
||
result['amount_source'] = f'text_{amount_source}'
|
||
break
|
||
|
||
# 兜底:从全文提取货币数字
|
||
if 'amount' not in result:
|
||
normalized_amount, amount_source = self._extract_preferred_amount_from_text(text)
|
||
if normalized_amount:
|
||
result['amount'] = normalized_amount
|
||
result['amount_source'] = f'text_{amount_source}'
|
||
|
||
# 数量提取
|
||
quantity_patterns = [
|
||
r'数量\s*[::]?\s*x?\s*(\d+)',
|
||
r'\bx\s*(\d{1,3})\b',
|
||
]
|
||
for pattern in quantity_patterns:
|
||
quantity_match = re.search(pattern, text, re.IGNORECASE)
|
||
if quantity_match:
|
||
result['quantity'] = quantity_match.group(1)
|
||
break
|
||
|
||
# 规格提取:过滤明显非规格行
|
||
spec_candidates = []
|
||
spec_candidate_keys = set()
|
||
ignore_tokens = [
|
||
'http://', 'https://', 'fleamarket://', '订单', '买家', '卖家', '地址',
|
||
'手机', '电话', '时间', '发货', '付款', '交易', '退款', '去发货', '修改价格',
|
||
'等待你发货', '等待买家', '已发货', '待收货', '待发货',
|
||
'统一社会信用代码', '许可证', '备案', '经营', '广播电视节目',
|
||
'营业性演出', '集邮市场', '增值电信', 'app备案号'
|
||
]
|
||
|
||
for line in lines:
|
||
normalized_line = line.replace(':', ':')
|
||
if ':' not in normalized_line:
|
||
continue
|
||
if any(token in normalized_line for token in ignore_tokens):
|
||
continue
|
||
|
||
left, right = normalized_line.split(':', 1)
|
||
left = left.strip()
|
||
right = right.strip()
|
||
if not left or not right:
|
||
continue
|
||
if len(left) > 16:
|
||
continue
|
||
|
||
parsed = self._parse_sku_content(f"{left}:{right}")
|
||
if parsed:
|
||
sanitized_candidate = self._sanitize_sku_result(parsed, source="text_fallback_candidate")
|
||
if sanitized_candidate.get('spec_name') and sanitized_candidate.get('spec_value'):
|
||
candidate_key = self._build_spec_candidate_identity(sanitized_candidate)
|
||
if candidate_key not in spec_candidate_keys:
|
||
spec_candidate_keys.add(candidate_key)
|
||
spec_candidates.append(sanitized_candidate)
|
||
|
||
if spec_candidates:
|
||
explicit_multi_spec_candidates = [
|
||
candidate for candidate in spec_candidates
|
||
if candidate.get('spec_name_2') and candidate.get('spec_value_2')
|
||
]
|
||
|
||
selected_candidate = None
|
||
if len(explicit_multi_spec_candidates) == 1:
|
||
selected_candidate = explicit_multi_spec_candidates[0]
|
||
elif len(spec_candidates) == 1:
|
||
selected_candidate = spec_candidates[0]
|
||
else:
|
||
logger.warning(
|
||
"SKU文本兜底检测到多个规格候选,判定为歧义并跳过规格字段: "
|
||
f"{[self._build_spec_candidate_identity(candidate) for candidate in spec_candidates]}"
|
||
)
|
||
|
||
if selected_candidate:
|
||
if selected_candidate.get('spec_name') and selected_candidate.get('spec_value'):
|
||
result['spec_name'] = selected_candidate['spec_name']
|
||
result['spec_value'] = selected_candidate['spec_value']
|
||
if selected_candidate.get('spec_name_2') and selected_candidate.get('spec_value_2'):
|
||
result['spec_name_2'] = selected_candidate['spec_name_2']
|
||
result['spec_value_2'] = selected_candidate['spec_value_2']
|
||
|
||
return self._sanitize_sku_result(result, source="text_fallback_result")
|
||
|
||
def _is_order_detail_parse_success(self, sku_info: Optional[Dict[str, str]], order_status: str) -> bool:
|
||
"""判定订单详情解析是否成功(金额/规格/状态任一有效即可)"""
|
||
info = sku_info or {}
|
||
has_valid_amount = self._has_valid_amount(info.get('amount'))
|
||
has_valid_spec = bool(info.get('spec_name') and info.get('spec_value'))
|
||
has_valid_status = bool(order_status and order_status != 'unknown')
|
||
return has_valid_amount or has_valid_spec or has_valid_status
|
||
|
||
def _build_parse_field_flags(self, sku_info: Optional[Dict[str, str]], order_status: str) -> Dict[str, Any]:
|
||
"""构建解析字段完整性标记,便于统一告警日志检索。"""
|
||
info = sku_info or {}
|
||
return {
|
||
'has_amount': self._has_valid_amount(info.get('amount')),
|
||
'has_spec': bool(info.get('spec_name') and info.get('spec_value')),
|
||
'has_status': bool(order_status and order_status != 'unknown'),
|
||
'amount': info.get('amount', ''),
|
||
'spec_name': info.get('spec_name', ''),
|
||
'spec_value': info.get('spec_value', ''),
|
||
'quantity': info.get('quantity', ''),
|
||
'order_status': order_status or ''
|
||
}
|
||
|
||
def _log_order_detail_parse_event(
|
||
self,
|
||
event_name: str,
|
||
order_id: str,
|
||
url: str,
|
||
attempt: str,
|
||
sku_info: Optional[Dict[str, str]],
|
||
order_status: str,
|
||
level: str = "warning",
|
||
error: str = None
|
||
) -> None:
|
||
"""输出结构化的订单详情解析告警/恢复日志。"""
|
||
try:
|
||
field_flags = self._build_parse_field_flags(sku_info, order_status)
|
||
payload = {
|
||
'event': event_name,
|
||
'cookie_id': self.cookie_id_for_log,
|
||
'order_id': order_id,
|
||
'attempt': attempt,
|
||
'url': url,
|
||
'field_flags': field_flags
|
||
}
|
||
if error:
|
||
payload['error'] = error
|
||
|
||
log_msg = f"{event_name} {json.dumps(payload, ensure_ascii=False, sort_keys=True)}"
|
||
if level == "info":
|
||
logger.info(log_msg)
|
||
else:
|
||
logger.warning(log_msg)
|
||
except Exception as log_error:
|
||
logger.warning(f"订单解析事件日志输出失败: {log_error}")
|
||
|
||
async def _get_order_status(self) -> str:
|
||
"""
|
||
从订单详情页面获取订单状态
|
||
|
||
Returns:
|
||
订单状态字符串,可能的值:
|
||
- 'pending_payment': 待付款
|
||
- 'pending_ship': 待发货
|
||
- 'shipped': 已发货/待收货
|
||
- 'completed': 交易成功
|
||
- 'refunding': 退款中
|
||
- 'cancelled': 交易关闭
|
||
- 'unknown': 未知状态
|
||
"""
|
||
try:
|
||
self._last_order_status_source = 'unknown'
|
||
if not await self._check_browser_status():
|
||
logger.error("浏览器状态异常,无法获取订单状态")
|
||
return 'unknown'
|
||
|
||
# 尝试多种选择器获取订单状态
|
||
status_selectors = [
|
||
'.orderStatusText--F6eoVcHD', # 常见的订单状态选择器
|
||
'.order-status',
|
||
'.status-text',
|
||
'[class*="orderStatus"]',
|
||
'[class*="StatusText"]',
|
||
'[class*="status"]',
|
||
]
|
||
|
||
status_text = ''
|
||
for selector in status_selectors:
|
||
try:
|
||
element = await self.page.query_selector(selector)
|
||
if element:
|
||
text = await element.text_content()
|
||
if text:
|
||
status_text = text.strip()
|
||
logger.info(f"通过选择器 {selector} 获取到订单状态: {status_text}")
|
||
break
|
||
except Exception as e:
|
||
logger.debug(f"选择器 {selector} 获取失败: {e}")
|
||
continue
|
||
|
||
button_selectors = [
|
||
'button',
|
||
'[role="button"]',
|
||
'[class*="button"]',
|
||
'[class*="Button"]',
|
||
'[class*="btn"]',
|
||
]
|
||
|
||
parsed_from_selector = 'unknown'
|
||
button_texts = await self._collect_texts_by_selectors(button_selectors, max_length=24, max_items=16)
|
||
button_status = 'unknown'
|
||
for button_text in button_texts:
|
||
candidate_status = self._extract_status_from_text(button_text, source='button')
|
||
if self._get_status_priority(candidate_status) > self._get_status_priority(button_status):
|
||
button_status = candidate_status
|
||
|
||
button_group_status = 'unknown'
|
||
if button_texts:
|
||
button_group_status = self._extract_status_from_text(' | '.join(button_texts), source='button_group')
|
||
if self._get_status_priority(button_group_status) > self._get_status_priority(button_status):
|
||
button_status = button_group_status
|
||
|
||
# 先解析选择器结果
|
||
if status_text:
|
||
parsed_from_selector = self._extract_status_from_text(status_text, source='selector')
|
||
if parsed_from_selector == 'unknown':
|
||
logger.warning(f"未知的订单状态文本: {status_text}")
|
||
|
||
preferred_status = parsed_from_selector
|
||
preferred_source = 'selector' if parsed_from_selector != 'unknown' else 'unknown'
|
||
if self._get_status_priority(button_status) > self._get_status_priority(preferred_status):
|
||
preferred_status = button_status
|
||
preferred_source = 'button'
|
||
|
||
logger.info(
|
||
f"订单状态解析候选: selector={parsed_from_selector} ({status_text or 'empty'}), "
|
||
f"button={button_status} ({button_texts or []}), button_group={button_group_status}"
|
||
)
|
||
|
||
if preferred_status != 'unknown':
|
||
self._last_order_status_source = preferred_source
|
||
logger.info(f"订单状态解析最终采用结构化结果: {preferred_status} (source={preferred_source})")
|
||
return preferred_status
|
||
|
||
# 如果选择器/按钮都没有有效结果,尝试从页面文本中提取
|
||
body_text = await self._get_page_text()
|
||
body_status = self._extract_status_from_text(body_text, source='body')
|
||
logger.info(f"订单状态解析候选: body={body_status}")
|
||
if body_status != 'unknown':
|
||
self._last_order_status_source = 'body'
|
||
logger.info(f"从页面文本中检测到订单状态 -> {body_status}")
|
||
return body_status
|
||
|
||
logger.warning("无法获取订单状态")
|
||
return 'unknown'
|
||
|
||
except Exception as e:
|
||
logger.error(f"获取订单状态异常: {e}")
|
||
return 'unknown'
|
||
|
||
def _extract_labeled_datetime_from_text(self, text: str, labels: List[str]) -> Optional[str]:
|
||
if not text:
|
||
return None
|
||
|
||
normalized_text = str(text).replace('\u3000', ' ')
|
||
datetime_pattern = (
|
||
r'(\d{4}\s*(?:年|[-/.])\s*\d{1,2}\s*(?:月|[-/.])\s*\d{1,2}'
|
||
r'\s*(?:日)?\s*(?:T|\s+)\s*\d{1,2}\s*:\s*\d{1,2}(?:\s*:\s*\d{1,2})?)'
|
||
)
|
||
for label in labels:
|
||
for pattern in (
|
||
rf'{re.escape(label)}\s*[::]?\s*{datetime_pattern}',
|
||
rf'{re.escape(label)}[^\d]{{0,8}}{datetime_pattern}',
|
||
):
|
||
match = re.search(pattern, normalized_text, re.IGNORECASE | re.S)
|
||
if not match:
|
||
continue
|
||
parsed = parse_local_datetime_text_to_db_utc(match.group(1))
|
||
if parsed:
|
||
return parsed
|
||
return None
|
||
|
||
def _extract_order_time_fields_from_text(self, text: str) -> Dict[str, str]:
|
||
if not text:
|
||
return {}
|
||
|
||
result: Dict[str, str] = {}
|
||
field_label_map = {
|
||
'platform_created_at': ['创建时间', '下单时间'],
|
||
'platform_paid_at': ['付款时间', '支付时间'],
|
||
'platform_completed_at': ['成交时间', '完成时间', '确认收货时间'],
|
||
}
|
||
|
||
for field_name, labels in field_label_map.items():
|
||
parsed_value = self._extract_labeled_datetime_from_text(text, labels)
|
||
if parsed_value:
|
||
result[field_name] = parsed_value
|
||
|
||
return result
|
||
|
||
async def _get_order_time_fields(self) -> Dict[str, str]:
|
||
labels = ['创建时间', '下单时间', '付款时间', '支付时间', '成交时间', '完成时间', '确认收货时间']
|
||
candidate_texts: List[str] = []
|
||
|
||
page_text = await self._get_page_text()
|
||
if page_text:
|
||
candidate_texts.append(page_text)
|
||
|
||
try:
|
||
text_blocks = await self.page.evaluate(
|
||
"""(labels) => {
|
||
const nodes = Array.from(document.querySelectorAll('div, span, p, li, section, article'));
|
||
const results = [];
|
||
const seen = new Set();
|
||
for (const node of nodes) {
|
||
const text = String(node.innerText || node.textContent || '')
|
||
.replace(/\\s+/g, ' ')
|
||
.trim();
|
||
if (!text || text.length < 8 || text.length > 120) {
|
||
continue;
|
||
}
|
||
if (!/\\d{4}/.test(text)) {
|
||
continue;
|
||
}
|
||
if (!labels.some((label) => text.includes(label))) {
|
||
continue;
|
||
}
|
||
if (seen.has(text)) {
|
||
continue;
|
||
}
|
||
seen.add(text);
|
||
results.push(text);
|
||
if (results.length >= 24) {
|
||
break;
|
||
}
|
||
}
|
||
return results;
|
||
}""",
|
||
labels,
|
||
)
|
||
candidate_texts.extend(text_blocks or [])
|
||
except Exception as e:
|
||
logger.debug(f"提取订单时间文本块失败: {e}")
|
||
|
||
merged_result: Dict[str, str] = {}
|
||
for candidate_text in candidate_texts:
|
||
extracted_fields = self._extract_order_time_fields_from_text(candidate_text)
|
||
for field_name, field_value in extracted_fields.items():
|
||
if field_value and field_name not in merged_result:
|
||
merged_result[field_name] = field_value
|
||
|
||
if merged_result:
|
||
logger.info(f"提取到订单平台时间字段: {merged_result}")
|
||
|
||
return merged_result
|
||
|
||
async def _get_sku_content(self) -> Optional[Dict[str, str]]:
|
||
"""获取并解析SKU内容,包括规格、数量和金额,支持双规格"""
|
||
try:
|
||
# 检查浏览器状态
|
||
if not await self._check_browser_status():
|
||
logger.error("浏览器状态异常,无法获取SKU内容")
|
||
return {}
|
||
|
||
result: Dict[str, str] = {}
|
||
page_text = await self._get_page_text()
|
||
fallback_result = self._extract_sku_from_text(page_text) if page_text else {}
|
||
|
||
# 获取规格元素(主通道)
|
||
sku_selector = '.sku--u_ddZval'
|
||
sku_elements = await self.page.query_selector_all(sku_selector)
|
||
logger.info(f"找到 {len(sku_elements)} 个 sku--u_ddZval 元素")
|
||
|
||
# 获取金额:优先结构化响应/结构化页面内容,再尝试语义块,最后才走选择器兜底
|
||
amount, amount_source = await self._extract_amount_from_structured_content()
|
||
if amount is None:
|
||
amount, amount_source = await self._extract_amount_from_semantic_blocks()
|
||
if amount is None:
|
||
amount, amount_source = await self._extract_amount_from_selectors()
|
||
if amount is not None:
|
||
result['amount'] = amount
|
||
result['amount_source'] = amount_source
|
||
|
||
adjusted_coin_amount, adjusted_coin_source = self._resolve_coin_deduction_amount(
|
||
result.get('amount'),
|
||
result.get('amount_source', ''),
|
||
fallback_result,
|
||
page_text,
|
||
)
|
||
if adjusted_coin_amount is not None:
|
||
result['amount'] = adjusted_coin_amount
|
||
result['amount_source'] = adjusted_coin_source
|
||
|
||
structured_sku_result = await self._extract_sku_from_structured_content()
|
||
if structured_sku_result:
|
||
for key in ['spec_name', 'spec_value', 'spec_name_2', 'spec_value_2', 'quantity']:
|
||
if structured_sku_result.get(key):
|
||
result[key] = structured_sku_result[key]
|
||
|
||
# 收集所有元素的内容
|
||
all_contents = []
|
||
for i, element in enumerate(sku_elements):
|
||
content = await element.text_content()
|
||
if content:
|
||
content = content.strip()
|
||
all_contents.append(content)
|
||
logger.info(f"元素 {i+1} 原始内容: {content}")
|
||
|
||
# 分类:规格 vs 数量
|
||
specs = []
|
||
quantity_content = None
|
||
|
||
for content in all_contents:
|
||
if '数量' in content:
|
||
# 这是数量
|
||
quantity_content = content
|
||
elif ':' in content:
|
||
# 这是规格(包含冒号的)
|
||
specs.append(content)
|
||
else:
|
||
# 没有冒号也没有"数量",可能是纯数字(如 x1)
|
||
if content.startswith('x') or content.isdigit():
|
||
quantity_content = content
|
||
else:
|
||
# 其他情况当作规格处理
|
||
specs.append(content)
|
||
|
||
# 解析规格1(主通道)
|
||
if len(specs) >= 1:
|
||
parsed_spec = self._parse_sku_content(specs[0])
|
||
if parsed_spec:
|
||
result['spec_name'] = parsed_spec['spec_name']
|
||
result['spec_value'] = parsed_spec['spec_value']
|
||
|
||
# 检查第一个规格是否已包含双规格(分号分隔的情况)
|
||
if 'spec_name_2' in parsed_spec and 'spec_value_2' in parsed_spec:
|
||
result['spec_name_2'] = parsed_spec['spec_name_2']
|
||
result['spec_value_2'] = parsed_spec['spec_value_2']
|
||
|
||
# 解析规格2(如果存在且尚未从分号分隔中获取)
|
||
if len(specs) >= 2 and 'spec_name_2' not in result:
|
||
parsed_spec2 = self._parse_sku_content(specs[1])
|
||
if parsed_spec2:
|
||
result['spec_name_2'] = parsed_spec2['spec_name']
|
||
result['spec_value_2'] = parsed_spec2['spec_value']
|
||
|
||
# 如果有更多规格,记录日志(目前只支持双规格)
|
||
if len(specs) > 2:
|
||
logger.warning(f"检测到 {len(specs)} 个规格,目前只支持双规格,多余的规格将被忽略")
|
||
|
||
# 解析数量
|
||
if quantity_content:
|
||
logger.info(f"数量原始内容: {quantity_content}")
|
||
|
||
if ':' in quantity_content:
|
||
quantity_value = quantity_content.split(':', 1)[1].strip()
|
||
else:
|
||
quantity_value = quantity_content
|
||
|
||
# 去掉数量值前面的 'x' 符号(如 "x2" -> "2")
|
||
if quantity_value.startswith('x'):
|
||
quantity_value = quantity_value[1:]
|
||
|
||
result['quantity'] = quantity_value
|
||
logger.info(f"提取到数量: {quantity_value}")
|
||
|
||
# 如果核心字段缺失,使用页面文本兜底;规格字段仅在主通道缺失主规格时才整体补齐
|
||
fallback_used = False
|
||
if 'amount' not in result and fallback_result.get('amount'):
|
||
result['amount'] = fallback_result['amount']
|
||
fallback_used = True
|
||
if 'amount_source' not in result and fallback_result.get('amount_source'):
|
||
result['amount_source'] = fallback_result['amount_source']
|
||
fallback_used = True
|
||
|
||
has_primary_spec = bool(result.get('spec_name') and result.get('spec_value'))
|
||
if not has_primary_spec and fallback_result.get('spec_name') and fallback_result.get('spec_value'):
|
||
result['spec_name'] = fallback_result['spec_name']
|
||
result['spec_value'] = fallback_result['spec_value']
|
||
fallback_used = True
|
||
|
||
if fallback_result.get('spec_name_2') and fallback_result.get('spec_value_2'):
|
||
result['spec_name_2'] = fallback_result['spec_name_2']
|
||
result['spec_value_2'] = fallback_result['spec_value_2']
|
||
elif has_primary_spec and fallback_result.get('spec_name_2') and fallback_result.get('spec_value_2'):
|
||
same_primary_spec = (
|
||
(result.get('spec_name') or '').strip() == (fallback_result.get('spec_name') or '').strip()
|
||
and (result.get('spec_value') or '').strip() == (fallback_result.get('spec_value') or '').strip()
|
||
)
|
||
|
||
if same_primary_spec:
|
||
result['spec_name_2'] = fallback_result['spec_name_2']
|
||
result['spec_value_2'] = fallback_result['spec_value_2']
|
||
fallback_used = True
|
||
logger.info(
|
||
"主通道与文本兜底主规格一致,补齐第二规格: "
|
||
f"{fallback_result.get('spec_name_2')}:{fallback_result.get('spec_value_2')}"
|
||
)
|
||
else:
|
||
logger.warning(
|
||
"主通道已获取主规格,忽略文本兜底补入的不一致第二规格,避免单规格订单被误判为双规格: "
|
||
f"primary={result.get('spec_name')}:{result.get('spec_value')}, "
|
||
f"fallback={fallback_result.get('spec_name')}:{fallback_result.get('spec_value')}, "
|
||
f"secondary={fallback_result.get('spec_name_2')}:{fallback_result.get('spec_value_2')}"
|
||
)
|
||
|
||
if 'quantity' not in result and fallback_result.get('quantity'):
|
||
result['quantity'] = fallback_result['quantity']
|
||
fallback_used = True
|
||
|
||
if fallback_result and fallback_used:
|
||
logger.info(f"SKU文本兜底解析结果: {fallback_result}")
|
||
|
||
# 确保数量字段存在,如果不存在则设置为1
|
||
if 'quantity' not in result:
|
||
result['quantity'] = '1'
|
||
logger.info("未获取到数量信息,默认设置为1")
|
||
|
||
# 对最终规格做二次清洗,防止主通道/兜底误识别正文字段
|
||
cleaned_result = self._sanitize_sku_result(result, source="sku_final")
|
||
if cleaned_result != result:
|
||
logger.warning(f"SKU结果已清洗: before={result}, after={cleaned_result}")
|
||
result = cleaned_result
|
||
|
||
# 打印最终结果
|
||
if result:
|
||
logger.info(f"最终解析结果: {result}")
|
||
return result
|
||
else:
|
||
logger.warning("未能解析到任何有效信息")
|
||
# 即使没有其他信息,也要返回默认数量
|
||
return {'quantity': '0'}
|
||
|
||
except Exception as e:
|
||
logger.error(f"获取SKU内容失败: {e}")
|
||
return {}
|
||
|
||
async def _check_browser_status(self) -> bool:
|
||
"""检查浏览器状态是否正常"""
|
||
try:
|
||
if not self.browser or not self.context or not self.page:
|
||
logger.warning("浏览器组件不完整")
|
||
return False
|
||
|
||
# 检查浏览器是否已连接
|
||
if self.browser.is_connected():
|
||
# 尝试获取页面标题来验证页面是否可用
|
||
await self.page.title()
|
||
return True
|
||
else:
|
||
logger.warning("浏览器连接已断开")
|
||
return False
|
||
except Exception as e:
|
||
logger.warning(f"浏览器状态检查失败: {e}")
|
||
return False
|
||
|
||
async def _ensure_browser_ready(self) -> bool:
|
||
"""确保浏览器准备就绪,如果不可用则重新初始化"""
|
||
try:
|
||
if await self._check_browser_status():
|
||
return True
|
||
|
||
logger.info("浏览器状态异常,尝试重新初始化...")
|
||
|
||
# 先尝试关闭现有的浏览器实例
|
||
await self._force_close_browser()
|
||
|
||
# 重新初始化浏览器
|
||
await self.init_browser()
|
||
|
||
# 等待更长时间确保浏览器完全就绪
|
||
await asyncio.sleep(2)
|
||
|
||
# 再次检查状态
|
||
if await self._check_browser_status():
|
||
logger.info("浏览器重新初始化成功")
|
||
return True
|
||
else:
|
||
logger.error("浏览器重新初始化失败")
|
||
return False
|
||
|
||
except Exception as e:
|
||
logger.error(f"确保浏览器就绪失败: {e}")
|
||
return False
|
||
|
||
async def _force_close_browser(self):
|
||
"""强制关闭浏览器,忽略所有错误"""
|
||
try:
|
||
self._clear_response_capture_handler()
|
||
if self.page:
|
||
try:
|
||
await self.page.close()
|
||
except:
|
||
pass
|
||
self.page = None
|
||
|
||
if self.context:
|
||
try:
|
||
await self.context.close()
|
||
except:
|
||
pass
|
||
self.context = None
|
||
|
||
if self.browser:
|
||
try:
|
||
await self.browser.close()
|
||
except:
|
||
pass
|
||
self.browser = None
|
||
|
||
self._active_order_id = ''
|
||
|
||
except Exception as e:
|
||
logger.debug(f"强制关闭浏览器过程中的异常(可忽略): {e}")
|
||
|
||
async def close(self):
|
||
"""关闭浏览器"""
|
||
try:
|
||
await self._wait_for_response_capture_tasks(timeout=0.2)
|
||
self._clear_response_capture_handler()
|
||
if self.page:
|
||
await self.page.close()
|
||
if self.context:
|
||
await self.context.close()
|
||
if self.browser:
|
||
await self.browser.close()
|
||
self._active_order_id = ''
|
||
logger.info("浏览器已关闭")
|
||
except Exception as e:
|
||
logger.error(f"关闭浏览器失败: {e}")
|
||
# 如果正常关闭失败,尝试强制关闭
|
||
await self._force_close_browser()
|
||
|
||
async def __aenter__(self):
|
||
"""异步上下文管理器入口"""
|
||
await self.init_browser()
|
||
return self
|
||
|
||
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
||
"""异步上下文管理器出口"""
|
||
await self.close()
|
||
|
||
|
||
# 便捷函数
|
||
async def fetch_order_detail_simple(
|
||
order_id: str,
|
||
cookie_string: str = None,
|
||
headless: bool = True,
|
||
force_refresh: bool = False,
|
||
cookie_id_for_log: str = "unknown"
|
||
) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
简单的订单详情获取函数(优化版:先检查数据库,再初始化浏览器)
|
||
|
||
Args:
|
||
order_id: 订单ID
|
||
cookie_string: Cookie字符串,如果不提供则使用默认值
|
||
headless: 是否无头模式
|
||
force_refresh: 是否强制刷新(跳过缓存直接从闲鱼获取)
|
||
cookie_id_for_log: 日志上下文中的账号ID,用于定位异常账号
|
||
|
||
Returns:
|
||
订单详情字典,包含以下字段:
|
||
- order_id: 订单ID
|
||
- url: 订单详情页面URL
|
||
- title: 页面标题
|
||
- sku_info: 完整的SKU信息字典
|
||
- spec_name: 规格名称
|
||
- spec_value: 规格值
|
||
- quantity: 数量
|
||
- amount: 金额
|
||
- order_status: 订单状态
|
||
- timestamp: 获取时间戳
|
||
失败时返回None
|
||
"""
|
||
# 如果不是强制刷新,先检查数据库中是否有有效数据
|
||
if not force_refresh:
|
||
try:
|
||
from db_manager import db_manager
|
||
existing_order = db_manager.get_order_by_id(order_id)
|
||
|
||
if existing_order:
|
||
amount = existing_order.get('amount', '')
|
||
item_config = None
|
||
if existing_order.get('item_id') and existing_order.get('cookie_id'):
|
||
item_config = db_manager.get_item_info(existing_order.get('cookie_id'), existing_order.get('item_id'))
|
||
|
||
if _should_use_cached_order(existing_order, item_config=item_config):
|
||
logger.info(f"📋 订单 {order_id} 已存在于数据库中且金额有效({amount}),直接返回缓存数据")
|
||
print(f"✅ 订单 {order_id} 使用缓存数据,跳过浏览器获取")
|
||
|
||
# 构建返回格式
|
||
result = {
|
||
'order_id': existing_order['order_id'],
|
||
'url': f"https://www.goofish.com/order-detail?orderId={order_id}&role=seller",
|
||
'title': f"订单详情 - {order_id}",
|
||
'sku_info': {
|
||
'spec_name': existing_order.get('spec_name', ''),
|
||
'spec_value': existing_order.get('spec_value', ''),
|
||
'spec_name_2': existing_order.get('spec_name_2', ''),
|
||
'spec_value_2': existing_order.get('spec_value_2', ''),
|
||
'quantity': existing_order.get('quantity', ''),
|
||
'amount': existing_order.get('amount', ''),
|
||
'amount_source': 'cache'
|
||
},
|
||
'spec_name': existing_order.get('spec_name', ''),
|
||
'spec_value': existing_order.get('spec_value', ''),
|
||
'spec_name_2': existing_order.get('spec_name_2', ''),
|
||
'spec_value_2': existing_order.get('spec_value_2', ''),
|
||
'quantity': existing_order.get('quantity', ''),
|
||
'amount': existing_order.get('amount', ''),
|
||
'amount_source': 'cache',
|
||
'order_status': existing_order.get('order_status', 'unknown'), # 添加订单状态
|
||
'order_status_source': 'cache',
|
||
'timestamp': time.time(),
|
||
'from_cache': True
|
||
}
|
||
return result
|
||
else:
|
||
logger.info(f"📋 订单 {order_id} 缓存字段不完整或状态无效,重新获取详情: amount={amount}, status={existing_order.get('order_status')}")
|
||
print(f"⚠️ 订单 {order_id} 缓存不满足复用条件,重新获取详情...")
|
||
except Exception as e:
|
||
logger.warning(f"检查数据库缓存失败: {e}")
|
||
else:
|
||
logger.info(f"🔄 订单 {order_id} 强制刷新,跳过缓存检查")
|
||
print(f"🔄 订单 {order_id} 强制刷新模式...")
|
||
|
||
# 数据库中没有有效数据,使用浏览器获取
|
||
logger.info(f"🌐 订单 {order_id} 需要浏览器获取,开始初始化浏览器...")
|
||
print(f"🔍 订单 {order_id} 开始浏览器获取详情...")
|
||
|
||
fetcher = OrderDetailFetcher(cookie_string, headless, cookie_id_for_log=cookie_id_for_log)
|
||
try:
|
||
if await fetcher.init_browser(headless=headless):
|
||
return await fetcher.fetch_order_detail(order_id, force_refresh=force_refresh)
|
||
finally:
|
||
await fetcher.close()
|
||
return None
|
||
|
||
|
||
# 测试代码
|
||
if __name__ == "__main__":
|
||
async def test():
|
||
# 测试订单ID
|
||
test_order_id = "2856024697612814489"
|
||
|
||
print(f"🔍 开始获取订单详情: {test_order_id}")
|
||
|
||
result = await fetch_order_detail_simple(test_order_id, headless=False)
|
||
|
||
if result:
|
||
print("✅ 订单详情获取成功:")
|
||
print(f"📋 订单ID: {result['order_id']}")
|
||
print(f"🌐 URL: {result['url']}")
|
||
print(f"📄 页面标题: {result['title']}")
|
||
print(f"🛍️ 规格名称: {result.get('spec_name', '未获取到')}")
|
||
print(f"📝 规格值: {result.get('spec_value', '未获取到')}")
|
||
print(f"🔢 数量: {result.get('quantity', '未获取到')}")
|
||
print(f"💰 金额: {result.get('amount', '未获取到')}")
|
||
else:
|
||
print("❌ 订单详情获取失败")
|
||
|
||
# 运行测试
|
||
asyncio.run(test())
|