1. Introduction: Why Build an Artwork Database?
In the digital age, the digital management and analysis of artworks has become a pressing need for the art world, academia, and industry alike. Whether for museum collection management, art-market price analysis, or training data for AI art generation, a well-structured artwork database plays a critical role.
1.1 The Value of Artwork Data
Artwork data spans many dimensions: creation context, artist biography, technique and materials, physical dimensions, collection history, market value, and more. Beyond its scholarly value for art history, this data supports several practical applications:
- Art market analysis: transaction prices, circulation frequency, and similar signals reveal market trends and patterns
- Art authentication: technique and material data from known works provides reference points for attribution
- Digital exhibitions: foundational data for virtual museums and online shows
- Art education: a systematic learning resource for students and enthusiasts
- AI art generation: training data for generative adversarial networks and other deep learning models
1.2 Limitations of Traditional Data Collection
Historically, artwork information was compiled by hand, an approach with obvious shortcomings:
- Low throughput: manually entering a single work can take several minutes, which is infeasible at the scale of millions of works
- Stale data: information from new exhibitions and sales cannot be captured promptly
- Incomplete coverage: limited staff cannot monitor many institutions and channels at once
- Poor standardization: every source uses its own format, making integration expensive
1.3 The Python Crawler Solution
Python web crawlers offer an efficient, scalable approach to data collection. With automated programs we can:
- Collect at scale: fetch millions of artwork records from multiple art sites concurrently
- Stay current: scheduled jobs keep the data fresh
- Parse intelligently: extract structured information with NLP and image-recognition techniques
- Handle failures: cope with site redesigns, network flakiness, and other real-world conditions
- Clean automatically: deduplicate, convert formats, and handle missing values
2. Project Planning and Technology Selection
Before writing any code, we plan the project as a whole and settle on a technical approach and architecture.
2.1 Data Source Analysis
We will collect artwork data from the following primary channels:
2.1.1 Major Art Platforms
- WikiArt: over 250,000 artworks by thousands of artists
- Artsy: a contemporary-art platform with detailed artist and artwork information
- Art Institute of Chicago: an open API serving high-quality collection data
- The Metropolitan Museum of Art: a complete open-access dataset
- Europeana: aggregated collection data from museums across Europe
2.1.2 Target Data Fields
The structured information we need to collect includes:
python
artwork_schema = {
"basic_info": {
"title": "artwork title",
"artist": "artist name",
"date": "year created",
"medium": "medium / technique",
"dimensions": "dimensions (height x width)",
"collection": "holding institution"
},
"detailed_info": {
"description": "description",
"provenance": "provenance",
"exhibition_history": "exhibition history",
"bibliography": "bibliography",
"signature": "signature location"
},
"market_info": {
"estimated_price": "estimate",
"sale_price": "hammer price",
"auction_house": "auction house",
"sale_date": "sale date"
},
"media_info": {
"image_url": "image URL",
"thumbnail_url": "thumbnail URL",
"image_metadata": "image metadata"
}
}
2.2 Technology Stack
Based on the state of the ecosystem in 2024-2025, we adopt the following stack:
2.2.1 Core Crawler Frameworks
python
# Environment requirements
Python >= 3.11
aiohttp >= 3.9.0 # async HTTP client
httpx >= 0.27.0 # HTTP client with HTTP/2 support
Scrapy >= 2.11 # distributed crawling framework
Playwright >= 1.40 # headless browser automation
Selenium >= 4.15 # web automation / testing tool
2.2.2 Data Parsing and Processing
python
# Parsing libraries
BeautifulSoup4 >= 4.12 # HTML parsing
lxml >= 4.9 # high-performance XML/HTML parsing
parsel >= 1.8 # lxml-based selectors
jsonpath-ng >= 1.6 # JSONPath queries
jmespath >= 1.0 # alternative JSON query language
2.2.3 Data Storage
python
# Database choices
MongoDB >= 7.0 # document store, a good fit for JSON-shaped artwork data
PostgreSQL >= 16 # relational store for structured data
Redis >= 7.2 # cache and task queue
Elasticsearch >= 8.11 # full-text search and analytics
MinIO >= RELEASE.2024 # object storage for images and other media
2.2.4 Data Cleaning and Enrichment
python
# Data processing tools
pandas >= 2.1 # data analysis
numpy >= 1.26 # numerical computing
pillow >= 10.1 # image processing
opencv-python >= 4.9 # computer vision
pytesseract >= 0.3.10 # OCR
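These pins map directly onto the requirements/ files shown in the project layout of section 3.2. A sketch of requirements/base.txt, assuming only the package lists above (the database clients such as motor, asyncpg and the redis/elasticsearch/minio SDKs are added as they appear in later sections):
text
# requirements/base.txt (sketch)
aiohttp>=3.9.0
httpx>=0.27.0
scrapy>=2.11
playwright>=1.40
selenium>=4.15
beautifulsoup4>=4.12
lxml>=4.9
parsel>=1.8
jsonpath-ng>=1.6
jmespath>=1.0
pandas>=2.1
numpy>=1.26
pillow>=10.1
opencv-python>=4.9
pytesseract>=0.3.10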
2.3 System Architecture
We adopt a distributed, microservice-style architecture for scalability and resilience:
text
┌─────────────────────────────────────────────────────┐
│                  Scheduling center                  │
│  ┌────────────┐ ┌──────────────┐ ┌───────────────┐  │
│  │ Cron jobs  │ │Priority queue│ │ Retry manager │  │
│  └────────────┘ └──────────────┘ └───────────────┘  │
└─────────────────────────────────────────────────────┘
│
┌───────────────────┼───────────────────┐
▼ ▼ ▼
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ Crawler node 1│ │ Crawler node 2│ │ Crawler node N│
│ - WikiArt     │ │ - Artsy       │ │ - other sites │
│ - rate limits │ │ - proxy pool  │ │ - distributed │
└───────────────┘ └───────────────┘ └───────────────┘
│ │ │
└───────────────────┼───────────────────┘
▼
┌──────────────────────┐
│ Message queue (Kafka)│
│ - data fan-out       │
│ - load smoothing     │
└──────────────────────┘
│
┌───────────────────┼───────────────────┐
▼ ▼ ▼
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ Clean stage 1 │ │ Clean stage 2 │ │ Clean stage N │
│ - validation  │ │ - conversion  │ │ - enrichment  │
│ - dedup       │ │ - NER         │ │ - image proc. │
└───────────────┘ └───────────────┘ └───────────────┘
│ │ │
└───────────────────┼───────────────────┘
▼
┌─────────────────────┐
│    Storage layer    │
│ ┌───────────────┐   │
│ │ MongoDB       │   │
│ │ PostgreSQL    │   │
│ │ Elasticsearch │   │
│ │ MinIO         │   │
│ └───────────────┘   │
└─────────────────────┘
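The Kafka hop between crawler nodes and the cleaning pipelines can be sketched with the aiokafka client (an assumption — no Kafka client is pinned above; the topic name artworks.raw is likewise illustrative). Producer on the crawler side, consumer on the cleaning side, against the compose stack of section 3.4:
python
# Minimal sketch of the crawler -> Kafka -> cleaner hand-off.
# Assumes aiokafka is installed; "artworks.raw" is an illustrative topic name.
import asyncio
import json
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer

async def publish(artworks):
    producer = AIOKafkaProducer(bootstrap_servers="localhost:9092")
    await producer.start()
    try:
        for artwork in artworks:
            # One JSON message per artwork record
            await producer.send_and_wait(
                "artworks.raw", json.dumps(artwork, default=str).encode()
            )
    finally:
        await producer.stop()

async def consume():
    consumer = AIOKafkaConsumer(
        "artworks.raw",
        bootstrap_servers="localhost:9092",
        group_id="cleaning-pipeline",
    )
    await consumer.start()
    try:
        async for msg in consumer:
            artwork = json.loads(msg.value)
            # Hand off to validation / dedup / enrichment here
            print(artwork.get("title"))
    finally:
        await consumer.stop()

if __name__ == "__main__":
    asyncio.run(publish([{"title": "Demo", "artist": "Anon", "source": "test"}]))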
3. Environment Setup and Base Configuration
3.1 Preparing the Development Environment
First, create an isolated Python environment so dependency versions stay consistent:
bash
# Create the virtual environment
python3.11 -m venv art_crawler_env
# Activate it (Linux/macOS)
source art_crawler_env/bin/activate
# Activate it (Windows)
art_crawler_env\Scripts\activate
# Upgrade pip and setuptools
python -m pip install --upgrade pip setuptools wheel
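With the environment active, install the project dependencies and the Playwright browser binaries (the requirements/ path follows the layout in section 3.2):
bash
# Install project dependencies
pip install -r requirements/base.txt
# Playwright needs browser binaries in addition to the Python package
playwright install chromium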
3.2 Project Layout
A clear layout is essential for a crawler project of this size:
bash
art_crawler_project/
├── docker/ # Docker configuration
│ ├── docker-compose.yml # service orchestration
│ ├── mongodb/ # MongoDB config
│ ├── postgres/ # PostgreSQL config
│ └── redis/ # Redis config
├── src/ # source code
│ ├── crawlers/ # crawler modules
│ │ ├── __init__.py
│ │ ├── base.py # base crawler class
│ │ ├── wikiart.py # WikiArt crawler
│ │ ├── artsy.py # Artsy crawler
│ │ └── museum_api.py # museum API crawler
│ ├── middlewares/ # middleware
│ │ ├── __init__.py
│ │ ├── proxy.py # proxy middleware
│ │ ├── retry.py # retry middleware
│ │ └── rate_limiter.py # rate limiting
│ ├── parsers/ # parsers
│ │ ├── __init__.py
│ │ ├── html_parser.py # HTML parser
│ │ ├── json_parser.py # JSON parser
│ │ └── image_parser.py # image metadata parser
│ ├── pipelines/ # data pipelines
│ │ ├── __init__.py
│ │ ├── validation.py # data validation
│ │ ├── deduplication.py # deduplication
│ │ ├── enrichment.py # data enrichment
│ │ └── storage.py # data storage
│ ├── models/ # data models
│ │ ├── __init__.py
│ │ ├── artwork.py # artwork model
│ │ ├── artist.py # artist model
│ │ └── museum.py # museum model
│ ├── utils/ # utilities
│ │ ├── __init__.py
│ │ ├── logger.py # logging helpers
│ │ ├── validator.py # validation helpers
│ │ └── helpers.py # misc helpers
│ └── config/ # configuration
│ ├── __init__.py
│ ├── settings.py # crawler settings
│ └── constants.py # constants
├── tests/ # tests
│ ├── __init__.py
│ ├── test_crawlers/
│ ├── test_parsers/
│ └── test_pipelines/
├── scripts/ # utility scripts
│ ├── init_db.py # database initialization
│ ├── run_crawler.py # run a crawler
│ └── clean_data.py # data cleanup
├── requirements/ # dependency management
│ ├── base.txt # base dependencies
│ ├── dev.txt # development dependencies
│ └── prod.txt # production dependencies
├── .env.example # example environment variables
├── .gitignore # Git ignore rules
├── README.md # project readme
└── setup.py # install script
3.3 Configuration Files
3.3.1 Base Settings (src/config/settings.py)
python
import os
from pathlib import Path
from typing import Dict, List, Optional
from pydantic_settings import BaseSettings
from pydantic import Field, validator
class CrawlerSettings(BaseSettings):
"""爬虫配置类,使用Pydantic进行配置验证"""
# 项目路径
BASE_DIR: Path = Path(__file__).resolve().parent.parent.parent
LOG_DIR: Path = BASE_DIR / "logs"
DATA_DIR: Path = BASE_DIR / "data"
# 爬虫配置
USER_AGENTS: List[str] = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Safari/605.1.15",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
]
# 请求配置
REQUEST_TIMEOUT: int = 30
MAX_RETRIES: int = 3
RETRY_DELAY: int = 5
CONCURRENT_REQUESTS: int = 10
DOWNLOAD_DELAY: float = 1.0
# 代理配置
USE_PROXY: bool = False
PROXY_POOL: List[str] = []
PROXY_ROTATION_INTERVAL: int = 300 # 5分钟更换一次代理
# 数据库配置
MONGODB_URI: str = Field(default="mongodb://localhost:27017/", env="MONGODB_URI")
MONGODB_DB: str = "art_database"
MONGODB_COLLECTION: str = "artworks"
POSTGRESQL_CONFIG: Dict = {
"host": os.getenv("POSTGRES_HOST", "localhost"),
"port": int(os.getenv("POSTGRES_PORT", 5432)),
"database": os.getenv("POSTGRES_DB", "art_db"),
"user": os.getenv("POSTGRES_USER", "art_user"),
"password": os.getenv("POSTGRES_PASSWORD", "art_password")
}
REDIS_CONFIG: Dict = {
"host": os.getenv("REDIS_HOST", "localhost"),
"port": int(os.getenv("REDIS_PORT", 6379)),
"db": int(os.getenv("REDIS_DB", 0)),
"password": os.getenv("REDIS_PASSWORD", None)
}
ELASTICSEARCH_CONFIG: Dict = {
"hosts": [os.getenv("ELASTICSEARCH_HOST", "http://localhost:9200")],
"index": "artworks"
}
# 速率限制配置
RATE_LIMIT_ENABLED: bool = True
DEFAULT_RATE: int = 5 # 每秒请求数
BURST_RATE: int = 10 # 突发请求数
# 爬虫目标配置
TARGET_SITES: Dict[str, Dict] = {
"wikiart": {
"base_url": "https://www.wikiart.org",
"enabled": True,
"rate_limit": 2, # 每秒2个请求
"max_pages": 1000,
"use_playwright": False
},
"artsy": {
"base_url": "https://www.artsy.net",
"enabled": True,
"rate_limit": 1, # API限制较严格
"max_pages": 500,
"use_playwright": True, # 需要处理JavaScript
"api_key": os.getenv("ARTSY_API_KEY", "")
},
"met_museum": {
"base_url": "https://collectionapi.metmuseum.org",
"enabled": True,
"rate_limit": 10, # 公共API限制较宽松
"max_pages": 2000,
"use_playwright": False,
"api_key": os.getenv("MET_API_KEY", "")
}
}
# 日志配置
LOG_LEVEL: str = "INFO"
LOG_FORMAT: str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
LOG_ROTATION: str = "1 day"
LOG_RETENTION: str = "30 days"
# 数据清洗配置
DEDUPLICATION_ENABLED: bool = True
DEDUPLICATION_FIELDS: List[str] = ["title", "artist", "date"]
SIMILARITY_THRESHOLD: float = 0.85 # 相似度阈值
# 图像处理配置
DOWNLOAD_IMAGES: bool = True
IMAGE_STORAGE_PATH: Path = DATA_DIR / "images"
THUMBNAIL_SIZE: tuple = (300, 300)
ALLOWED_IMAGE_FORMATS: List[str] = ["jpg", "jpeg", "png", "gif", "webp"]
class Config:
env_file = ".env"
case_sensitive = True
@validator("LOG_DIR", "DATA_DIR", "IMAGE_STORAGE_PATH")
def create_directories(cls, v):
"""自动创建必要的目录"""
v.mkdir(parents=True, exist_ok=True)
return v
# 创建全局配置实例
settings = CrawlerSettings()
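The inner Config class reads overrides from a .env file. A sketch of the .env.example referenced in the project layout, with placeholder values for the variables settings.py actually reads:
text
# .env.example (placeholder values)
MONGODB_URI=mongodb://localhost:27017/
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_DB=art_db
POSTGRES_USER=art_user
POSTGRES_PASSWORD=art_password
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB=0
ELASTICSEARCH_HOST=http://localhost:9200
ARTSY_API_KEY=
MET_API_KEY=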
3.3.2 Constants (src/config/constants.py)
python
from enum import Enum, auto
from typing import Dict, Any
class ArtworkMedium(Enum):
"""艺术作品媒介枚举"""
PAINTING = "绘画"
SCULPTURE = "雕塑"
PHOTOGRAPHY = "摄影"
PRINT = "版画"
DRAWING = "素描"
INSTALLATION = "装置"
DIGITAL_ART = "数字艺术"
MIXED_MEDIA = "混合媒介"
CERAMICS = "陶瓷"
TEXTILE = "纺织"
PERFORMANCE = "行为艺术"
VIDEO_ART = "视频艺术"
class ArtworkPeriod(Enum):
"""艺术时期枚举"""
ANCIENT = "古代艺术"
MEDIEVAL = "中世纪艺术"
RENAISSANCE = "文艺复兴"
BAROQUE = "巴洛克"
ROCOCO = "洛可可"
NEOCLASSICISM = "新古典主义"
ROMANTICISM = "浪漫主义"
REALISM = "现实主义"
IMPRESSIONISM = "印象派"
POST_IMPRESSIONISM = "后印象派"
MODERN = "现代艺术"
CONTEMPORARY = "当代艺术"
# 艺术流派映射字典
ART_MOVEMENTS: Dict[str, str] = {
"cubism": "立体主义",
"surrealism": "超现实主义",
"abstract-expressionism": "抽象表现主义",
"pop-art": "波普艺术",
"minimalism": "极简主义",
"conceptual-art": "概念艺术",
"fauvism": "野兽派",
"expressionism": "表现主义",
"art-nouveau": "新艺术运动",
"bauhaus": "包豪斯",
"dadaism": "达达主义",
"futurism": "未来主义"
}
# HTTP状态码含义
HTTP_STATUS_CODES: Dict[int, str] = {
200: "成功",
201: "已创建",
204: "无内容",
301: "永久重定向",
302: "临时重定向",
400: "错误请求",
401: "未授权",
403: "禁止访问",
404: "未找到",
429: "请求过多",
500: "服务器内部错误",
502: "网关错误",
503: "服务不可用",
504: "网关超时"
}
# 数据验证正则表达式
REGEX_PATTERNS: Dict[str, str] = {
"year": r"^(1[0-9]{3}|2[0-9]{3})$", # 1000-2999年
"dimension": r"^\d+(\.\d+)?\s*[xX]\s*\d+(\.\d+)?\s*(cm|mm|in)$", # 尺寸格式
"artist_name": r"^[A-Za-z\s\-'.]+$", # 艺术家姓名(简化版)
"price": r"^\$?\d+(,\d{3})*(\.\d{2})?$" # 价格格式
}
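A quick sanity check of these patterns with illustrative values:
python
import re
from src.config.constants import REGEX_PATTERNS

# Each pattern matched against a typical well-formed value
assert re.match(REGEX_PATTERNS["year"], "1889")
assert re.match(REGEX_PATTERNS["dimension"], "73.7 x 92.1 cm")
assert re.match(REGEX_PATTERNS["artist_name"], "Vincent van Gogh")
assert re.match(REGEX_PATTERNS["price"], "$1,250,000.00")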
3.4 Docker Environment
To keep the development environment reproducible, we run all backing services with Docker:
yaml
# docker/docker-compose.yml
version: '3.8'
services:
mongodb:
image: mongo:7.0
container_name: art_mongodb
restart: always
ports:
– "27017:27017"
environment:
MONGO_INITDB_ROOT_USERNAME: admin
MONGO_INITDB_ROOT_PASSWORD: admin123
MONGO_INITDB_DATABASE: art_database
volumes:
- ./mongodb/data:/data/db
- ./mongodb/init.js:/docker-entrypoint-initdb.d/init.js:ro
networks:
- art_network
healthcheck:
test: ["CMD", "mongosh", "–eval", "db.adminCommand('ping')"]
interval: 30s
timeout: 10s
retries: 3
postgres:
image: postgres:16
container_name: art_postgres
restart: always
ports:
– "5432:5432"
environment:
POSTGRES_DB: art_db
POSTGRES_USER: art_user
POSTGRES_PASSWORD: art_password
volumes:
- ./postgres/data:/var/lib/postgresql/data
- ./postgres/init.sql:/docker-entrypoint-initdb.d/init.sql:ro
networks:
- art_network
healthcheck:
test: ["CMD-SHELL", "pg_isready -U art_user"]
interval: 30s
timeout: 10s
retries: 3
redis:
image: redis:7.2
container_name: art_redis
restart: always
ports:
– "6379:6379"
command: redis-server –requirepass redis_password
volumes:
– ./redis/data:/data
networks:
– art_network
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 30s
timeout: 10s
retries: 3
elasticsearch:
image: elasticsearch:8.11.1
container_name: art_elasticsearch
restart: always
ports:
– "9200:9200"
environment:
– discovery.type=single-node
– xpack.security.enabled=false
– "ES_JAVA_OPTS=-Xms512m -Xmx512m"
volumes:
– ./elasticsearch/data:/usr/share/elasticsearch/data
networks:
– art_network
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9200"]
interval: 30s
timeout: 10s
retries: 3
minio:
image: minio/minio:RELEASE.2024-01-13T07-53-03Z
container_name: art_minio
restart: always
ports:
– "9000:9000"
– "9001:9001"
environment:
MINIO_ROOT_USER: minioadmin
MINIO_ROOT_PASSWORD: minioadmin123
volumes:
- ./minio/data:/data
command: server /data --console-address ":9001"
networks:
- art_network
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 30s
timeout: 10s
retries: 3
kafka:
image: confluentinc/cp-kafka:latest
container_name: art_kafka
restart: always
ports:
– "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
depends_on:
- zookeeper
networks:
- art_network
zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: art_zookeeper
restart: always
ports:
– "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
networks:
– art_network
crawler_ui:
image: apify/actor-web-scraper:latest
container_name: art_crawler_ui
restart: always
ports:
– "8080:8080"
environment:
- APIFY_LOCAL_STORAGE_DIR=/storage
volumes:
- ./crawler_ui/storage:/storage
networks:
- art_network
networks:
art_network:
driver: bridge
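Bring the stack up from the project root; the service names and ports all come from the compose file above:
bash
# Start all backing services in the background
docker compose -f docker/docker-compose.yml up -d
# Check service health
docker compose -f docker/docker-compose.yml ps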
4. Core Crawler Implementation
4.1 Designing the Base Crawler Class
We start with a base crawler class that encapsulates the functionality shared by all crawlers:
python
# src/crawlers/base.py
import asyncio
import logging
import random
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Union
from datetime import datetime
from urllib.parse import urljoin, urlparse
import aiohttp
from aiohttp import ClientTimeout, TCPConnector
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
before_sleep_log
)
import backoff
from fake_useragent import UserAgent
from src.config.settings import settings
from src.utils.logger import setup_logger
from src.utils.validator import DataValidator
from src.middlewares.rate_limiter import RateLimiter
from src.middlewares.proxy import ProxyMiddleware
class BaseCrawler(ABC):
"""
基础爬虫类,提供通用的爬虫功能
所有具体平台的爬虫都应该继承此类
"""
def __init__(self, name: str, config: Dict[str, Any]):
"""
初始化基础爬虫
Args:
name: 爬虫名称
config: 爬虫配置字典
"""
self.name = name
self.config = config
self.logger = setup_logger(f"crawler.{name}")
# 初始化中间件
self.rate_limiter = RateLimiter(
rate=config.get("rate_limit", settings.DEFAULT_RATE),
burst=settings.BURST_RATE
)
self.proxy_middleware = ProxyMiddleware() if settings.USE_PROXY else None
# 初始化会话相关属性
self.session: Optional[aiohttp.ClientSession] = None
self.ua = UserAgent()
# 统计信息
self.stats = {
"requests_made": 0,
"requests_success": 0,
"requests_failed": 0,
"items_extracted": 0,
"start_time": None,
"end_time": None
}
# 创建连接器
self.connector = TCPConnector(
limit=settings.CONCURRENT_REQUESTS,
ttl_dns_cache=300,
ssl=False, # 某些艺术网站SSL证书可能有问题
force_close=True
)
async def __aenter__(self):
"""异步上下文管理器入口"""
await self.start()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""异步上下文管理器出口"""
await self.close()
async def start(self):
"""启动爬虫"""
self.logger.info(f"Starting crawler: {self.name}")
self.stats["start_time"] = datetime.now()
# 创建HTTP会话
self.session = aiohttp.ClientSession(
connector=self.connector,
timeout=ClientTimeout(total=settings.REQUEST_TIMEOUT),
headers=self._get_default_headers()
)
async def close(self):
"""关闭爬虫"""
self.logger.info(f"Closing crawler: {self.name}")
self.stats["end_time"] = datetime.now()
if self.session and not self.session.closed:
await self.session.close()
# 记录最终统计信息
duration = (self.stats["end_time"] - self.stats["start_time"]).total_seconds()
self.logger.info(
f"Crawler stats - "
f"Requests: {self.stats['requests_made']}, "
f"Success: {self.stats['requests_success']}, "
f"Failed: {self.stats['requests_failed']}, "
f"Items: {self.stats['items_extracted']}, "
f"Duration: {duration:.2f}s"
)
def _get_default_headers(self) -> Dict[str, str]:
"""获取默认请求头"""
return {
"User-Agent": self.ua.random,
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
"Accept-Encoding": "gzip, deflate, br",
"DNT": "1",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1",
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "none",
"Sec-Fetch-User": "?1",
"Cache-Control": "max-age=0"
}
async def _get_proxy(self) -> Optional[str]:
"""获取代理"""
if self.proxy_middleware:
return await self.proxy_middleware.get_proxy()
return None
@retry(
stop=stop_after_attempt(settings.MAX_RETRIES),
wait=wait_exponential(multiplier=1, min=4, max=10),
retry=retry_if_exception_type(
(aiohttp.ClientError, asyncio.TimeoutError, ConnectionError)
),
before_sleep=before_sleep_log(logging.getLogger(), logging.WARNING)
)
async def fetch(
self,
url: str,
method: str = "GET",
params: Optional[Dict] = None,
data: Optional[Dict] = None,
headers: Optional[Dict] = None,
cookies: Optional[Dict] = None,
proxy: Optional[str] = None,
timeout: Optional[int] = None
) -> Optional[aiohttp.ClientResponse]:
"""
发送HTTP请求
Args:
url: 请求URL
method: 请求方法
params: URL参数
data: 请求数据
headers: 请求头
cookies: Cookie
proxy: 代理
timeout: 超时时间
Returns:
响应对象
"""
# 速率限制
await self.rate_limiter.wait_if_needed()
# 合并请求头
request_headers = self._get_default_headers()
if headers:
request_headers.update(headers)
# 获取代理
if not proxy and settings.USE_PROXY:
proxy = await self._get_proxy()
self.stats["requests_made"] += 1
try:
# 注意:这里不使用 async with,否则上下文退出时响应体会被释放,
# 调用方就无法再执行 await response.json() / await response.text()
response = await self.session.request(
method=method,
url=url,
params=params,
json=data if method.upper() == "POST" else None,
data=data if method.upper() != "POST" else None,
headers=request_headers,
cookies=cookies,
proxy=proxy,
timeout=timeout or settings.REQUEST_TIMEOUT,
ssl=False
)
# 检查响应状态
if response.status == 200:
self.stats["requests_success"] += 1
return response
elif response.status == 429: # 请求过多
retry_after = response.headers.get("Retry-After", "60")
self.logger.warning(f"Rate limited. Retry after {retry_after}s")
response.close()
await asyncio.sleep(int(retry_after))
raise aiohttp.ClientError("Rate limited")
elif response.status >= 500: # 服务器错误
self.logger.error(f"Server error: {response.status}")
response.close()
raise aiohttp.ClientError(f"Server error: {response.status}")
else:
self.logger.error(f"HTTP error: {response.status} - {url}")
self.stats["requests_failed"] += 1
response.close()
return None
except asyncio.TimeoutError:
self.logger.error(f"Timeout error: {url}")
self.stats["requests_failed"] += 1
raise
except aiohttp.ClientError as e:
self.logger.error(f"Client error: {str(e)} – {url}")
self.stats["requests_failed"] += 1
raise
except Exception as e:
self.logger.error(f"Unexpected error: {str(e)} – {url}")
self.stats["requests_failed"] += 1
raise
@abstractmethod
async def parse(self, response: aiohttp.ClientResponse) -> List[Dict[str, Any]]:
"""
解析响应(抽象方法,子类必须实现)
Args:
response: 响应对象
Returns:
解析后的数据列表
"""
pass
@abstractmethod
async def run(self, *args, **kwargs):
"""
运行爬虫(抽象方法,子类必须实现)
"""
pass
def validate_data(self, data: Dict[str, Any]) -> bool:
"""
验证数据格式
Args:
data: 待验证的数据
Returns:
是否有效
"""
return DataValidator.validate_artwork(data)
async def save_data(self, data: Union[Dict, List[Dict]]):
"""
保存数据到管道
Args:
data: 待保存的数据
"""
# 这里可以发送到消息队列或直接存储
# 后续会在管道中实现
pass
def normalize_url(self, base_url: str, relative_url: str) -> str:
"""规范化URL"""
return urljoin(base_url, relative_url)
def extract_domain(self, url: str) -> str:
"""提取URL域名"""
parsed = urlparse(url)
return parsed.netloc
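Concrete crawlers only implement parse() and run(); the async context manager takes care of session setup and teardown. A minimal sketch with an illustrative subclass and URL:
python
# Illustrative subclass; the "demo" config and URL are placeholders.
import asyncio
from src.crawlers.base import BaseCrawler

class DemoCrawler(BaseCrawler):
    async def parse(self, response):
        # Return one pseudo-item per fetched page
        return [{"url": str(response.url), "status": response.status}]

    async def run(self):
        response = await self.fetch("https://example.org/")
        return await self.parse(response) if response else []

async def main():
    async with DemoCrawler(name="demo", config={"rate_limit": 1}) as crawler:
        print(await crawler.run())

asyncio.run(main())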
4.2 Middleware
4.2.1 Rate Limiter
python
# src/middlewares/rate_limiter.py
import asyncio
import time
from collections import deque
from typing import Optional
import threading
class RateLimiter:
"""
速率限制器,使用令牌桶算法
支持突发流量和平均速率控制
"""
def __init__(self, rate: float, burst: Optional[int] = None):
"""
初始化速率限制器
Args:
rate: 平均速率(请求/秒)
burst: 突发容量,默认等于rate
"""
self.rate = rate
self.burst = burst or int(rate)
self.tokens = self.burst
self.last_update = time.monotonic()
self.lock = asyncio.Lock()
async def wait_if_needed(self):
"""如果需要则等待"""
async with self.lock:
await self._update_tokens()
if self.tokens < 1:
# 计算需要等待的时间
wait_time = (1 - self.tokens) / self.rate
await asyncio.sleep(wait_time)
await self._update_tokens()
self.tokens -= 1
async def _update_tokens(self):
"""更新令牌数"""
now = time.monotonic()
elapsed = now - self.last_update
self.tokens = min(
self.burst,
self.tokens + elapsed * self.rate
)
self.last_update = now
class SlidingWindowRateLimiter:
"""
滑动窗口速率限制器
更精确的速率控制,适合严格的API限制
"""
def __init__(self, max_requests: int, window_size: int = 60):
"""
初始化滑动窗口限制器
Args:
max_requests: 窗口内最大请求数
window_size: 窗口大小(秒)
"""
self.max_requests = max_requests
self.window_size = window_size
self.requests = deque()
self.lock = asyncio.Lock()
async def wait_if_needed(self):
"""检查并等待直到可以发送请求"""
async with self.lock:
now = time.time()
# 移除窗口外的请求记录
while self.requests and self.requests[0] < now - self.window_size:
self.requests.popleft()
# 如果达到限制,等待
if len(self.requests) >= self.max_requests:
wait_time = self.requests[0] + self.window_size - now
if wait_time > 0:
await asyncio.sleep(wait_time)
# 等待后刷新当前时间并重新清理窗口
now = time.time()
while self.requests and self.requests[0] < now - self.window_size:
self.requests.popleft()
# 记录当前请求
self.requests.append(now)
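The token bucket is easy to exercise on its own; with rate=2 the loop below settles at roughly two calls per second after the initial burst:
python
import asyncio
import time
from src.middlewares.rate_limiter import RateLimiter

async def main():
    limiter = RateLimiter(rate=2, burst=2)  # ~2 requests/second
    start = time.monotonic()
    for i in range(6):
        await limiter.wait_if_needed()
        print(f"request {i} at t={time.monotonic() - start:.2f}s")

asyncio.run(main())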
4.2.2 Proxy Middleware
python
# src/middlewares/proxy.py
import random
import asyncio
from typing import List, Optional
from datetime import datetime, timedelta
import aiohttp
from src.config.settings import settings
class ProxyMiddleware:
"""
代理中间件,管理代理IP池
支持代理验证、轮换和健康检查
"""
def __init__(self):
self.proxies: List[dict] = []
self.current_index = 0
self.last_rotation = datetime.now()
self.lock = asyncio.Lock()
async def get_proxy(self) -> Optional[str]:
"""
获取一个可用的代理
Returns:
代理URL或None
"""
async with self.lock:
# 检查是否需要轮换
if self._should_rotate():
await self._rotate_proxy()
if not self.proxies:
# 从配置文件或代理服务获取代理
await self._fetch_proxies()
if self.proxies:
proxy = self.proxies[self.current_index % len(self.proxies)]
self.current_index += 1
return proxy.get("url")
return None
async def report_failure(self, proxy_url: str):
"""报告代理失败"""
async with self.lock:
for proxy in self.proxies:
if proxy["url"] == proxy_url:
proxy["failures"] = proxy.get("failures", 0) + 1
proxy["last_failure"] = datetime.now()
# 如果失败次数过多,暂时禁用
if proxy["failures"] >= 3:
proxy["disabled_until"] = datetime.now() + timedelta(minutes=5)
break
def _should_rotate(self) -> bool:
"""检查是否需要轮换代理"""
return (
datetime.now() - self.last_rotation
).seconds > settings.PROXY_ROTATION_INTERVAL
async def _rotate_proxy(self):
"""轮换代理"""
self.current_index = random.randint(0, max(0, len(self.proxies) - 1))
self.last_rotation = datetime.now()
async def _fetch_proxies(self):
"""从代理服务获取代理列表"""
# 这里可以实现从免费代理网站或付费代理API获取代理
# 例如:ProxyBroker、Scrapy-Proxy-Pool等
# 示例:从配置文件读取代理
if settings.PROXY_POOL:
for proxy_url in settings.PROXY_POOL:
self.proxies.append({
"url": proxy_url,
"failures": 0,
"last_check": None,
"disabled_until": None
})
# 如果配置中没有代理,可以使用免费的代理API
try:
async with aiohttp.ClientSession() as session:
# 示例:从免费代理API获取
async with session.get(
"https://api.proxyscrape.com/v2/?request=getproxies&protocol=http&timeout=10000&country=all&ssl=all&anonymity=all"
) as response:
if response.status == 200:
text = await response.text()
proxy_list = text.strip().split('\r\n')
for proxy in proxy_list[:10]: # 限制数量
self.proxies.append({
"url": f"http://{proxy}",
"failures": 0,
"last_check": datetime.now(),
"disabled_until": None
})
except Exception as e:
print(f"Error fetching proxies: {e}")
async def validate_proxy(self, proxy_url: str) -> bool:
"""验证代理是否可用"""
try:
async with aiohttp.ClientSession() as session:
async with session.get(
"http://httpbin.org/ip",
proxy=proxy_url,
timeout=10
) as response:
if response.status == 200:
return True
except:
pass
return False
4.3 Platform-Specific Crawlers
4.3.1 The WikiArt Crawler
python
# src/crawlers/wikiart.py
import json
import re
from typing import List, Dict, Any, Optional
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import asyncio
from src.crawlers.base import BaseCrawler
from src.config.settings import settings
class WikiArtCrawler(BaseCrawler):
"""
WikiArt网站爬虫
采集艺术家和艺术作品信息
"""
def __init__(self):
super().__init__(
name="wikiart",
config=settings.TARGET_SITES["wikiart"]
)
self.base_url = self.config["base_url"]
self.api_base = "https://www.wikiart.org/en/api/2"
async def run(self, max_pages: int = None):
"""
运行爬虫
Args:
max_pages: 最大爬取页数
"""
self.logger.info("Starting WikiArt crawler")
# 先获取艺术家列表
artists = await self._get_artists(max_pages)
# 并发获取每个艺术家的作品
tasks = []
for artist in artists[:10]: # 先限制数量测试
task = self._get_artist_artworks(artist)
tasks.append(task)
results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理结果
all_artworks = []
for artist_artworks in results:
if isinstance(artist_artworks, Exception):
self.logger.error(f"Error getting artworks: {artist_artworks}")
else:
all_artworks.extend(artist_artworks)
self.logger.info(f"Collected {len(all_artworks)} artworks")
# 保存数据
await self.save_data(all_artworks)
return all_artworks
async def _get_artists(self, max_pages: int = None) -> List[Dict]:
"""
获取艺术家列表
Args:
max_pages: 最大页数
Returns:
艺术家列表
"""
artists = []
page = 1
max_pages = max_pages or self.config.get("max_pages", 100)
while page <= max_pages:
url = f"{self.api_base}/Artists"
params = {
"paginationPage": page,
"paginationLimit": 100,
"imageLimit": 1,
"auth": "" # 可能需要认证
}
self.logger.info(f"Fetching artists page {page}")
response = await self.fetch(url, params=params)
if not response:
break
try:
data = await response.json()
items = data.get("data", [])
if not items:
break
for item in items:
artist = {
"name": item.get("artistName"),
"url": item.get("url"),
"birth_day": item.get("birthDayAsString"),
"death_day": item.get("deathDayAsString"),
"nationality": item.get("nationality"),
"art_movement": item.get("artMovement"),
"image": item.get("image"),
"wikiart_url": f"{self.base_url}{item.get('url')}"
}
artists.append(artist)
self.logger.info(f"Found {len(items)} artists on page {page}")
# 检查是否还有更多页
pagination = data.get("pagination", {})
if page >= pagination.get("pages", 0):
break
page += 1
except json.JSONDecodeError as e:
self.logger.error(f"JSON decode error: {e}")
break
except Exception as e:
self.logger.error(f"Error parsing artists: {e}")
break
# 避免请求过快
await asyncio.sleep(1)
self.logger.info(f"Total artists collected: {len(artists)}")
return artists
async def _get_artist_artworks(self, artist: Dict) -> List[Dict]:
"""
获取艺术家的所有作品
Args:
artist: 艺术家信息
Returns:
作品列表
"""
artworks = []
page = 1
while True:
url = f"{self.api_base}/Artists/{artist['url']}/Paintings"
params = {
"paginationPage": page,
"paginationLimit": 100,
"json": 2
}
self.logger.info(f"Fetching artworks for {artist['name']} – page {page}")
response = await self.fetch(url, params=params)
if not response:
break
try:
data = await response.json()
items = data.get("data", [])
if not items:
break
for item in items:
artwork = {
"title": item.get("title"),
"artist": artist["name"],
"artist_url": artist["url"],
"date": item.get("completitionYear"),
"year": item.get("yearAsString"),
"medium": item.get("technique"),
"dimensions": f"{item.get('width', '')} x {item.get('height', '')}".strip(),
"style": item.get("artStyle"),
"genre": item.get("genre"),
"series": item.get("series"),
"image_url": item.get("image"),
"wikiart_url": f"{self.base_url}{item.get('url')}",
"description": item.get("description"),
"museum": item.get("museum", "Unknown")
}
# 数据验证
if self.validate_data(artwork):
artworks.append(artwork)
self.stats["items_extracted"] += 1
self.logger.info(f"Found {len(items)} artworks for {artist['name']} on page {page}")
# 检查是否还有更多页
pagination = data.get("pagination", {})
if page >= pagination.get("pages", 0):
break
page += 1
except json.JSONDecodeError as e:
self.logger.error(f"JSON decode error: {e}")
break
except Exception as e:
self.logger.error(f"Error parsing artworks: {e}")
break
await asyncio.sleep(1)
return artworks
async def parse(self, response):
"""
解析HTML页面(用于直接页面解析模式)
"""
html = await response.text()
soup = BeautifulSoup(html, 'lxml')
# 示例:解析作品详情页
artwork = {}
# 标题
title_elem = soup.find('h1', class_='painting-page-title')
if title_elem:
artwork['title'] = title_elem.text.strip()
# 艺术家
artist_elem = soup.find('a', class_='artist-name')
if artist_elem:
artwork['artist'] = artist_elem.text.strip()
# 年份
year_elem = soup.find('span', itemprop='dateCreated')
if year_elem:
artwork['year'] = year_elem.text.strip()
# 描述
desc_elem = soup.find('div', class_='description')
if desc_elem:
artwork['description'] = desc_elem.text.strip()
# 图片
img_elem = soup.find('img', class_='painting-image')
if img_elem and img_elem.get('src'):
artwork['image_url'] = self.normalize_url(
response.url,
img_elem['src']
)
return [artwork] if artwork else []
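A sketch of the scripts/run_crawler.py entry point from the project layout, capping the run at two pages as a smoke test:
python
# scripts/run_crawler.py (sketch) - smoke-test the WikiArt crawler
import asyncio
from src.crawlers.wikiart import WikiArtCrawler

async def main():
    async with WikiArtCrawler() as crawler:
        artworks = await crawler.run(max_pages=2)  # small cap for a test run
        print(f"collected {len(artworks)} artworks")

if __name__ == "__main__":
    asyncio.run(main())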
4.3.2 The Artsy Crawler (with Playwright)
python
# src/crawlers/artsy.py
import asyncio
import json
from typing import List, Dict, Any, Optional
from playwright.async_api import async_playwright, Browser, Page
from src.crawlers.base import BaseCrawler
from src.config.settings import settings
class ArtsyCrawler(BaseCrawler):
"""
Artsy网站爬虫
使用Playwright处理JavaScript渲染的页面
"""
def __init__(self):
super().__init__(
name="artsy",
config=settings.TARGET_SITES["artsy"]
)
self.base_url = self.config["base_url"]
self.browser: Optional[Browser] = None
async def start(self):
"""启动爬虫,初始化浏览器"""
await super().start()
# 启动Playwright
self.playwright = await async_playwright().start()
# 启动浏览器
self.browser = await self.playwright.chromium.launch(
headless=True, # 无头模式
args=[
'--disable-blink-features=AutomationControlled',
'--disable-dev-shm-usage',
'--no-sandbox',
'--disable-setuid-sandbox',
'--disable-web-security',
'--disable-features=IsolateOrigins,site-per-process'
]
)
async def close(self):
"""关闭爬虫,关闭浏览器"""
if self.browser:
await self.browser.close()
if hasattr(self, 'playwright'):
await self.playwright.stop()
await super().close()
async def run(self, max_items: int = 100):
"""
运行爬虫
Args:
max_items: 最大采集数量
"""
self.logger.info("Starting Artsy crawler")
# 创建新页面
page = await self.browser.new_page()
try:
# 设置页面视图
await page.set_viewport_size({"width": 1920, "height": 1080})
# 设置用户代理和请求头
await page.set_extra_http_headers({
"Accept-Language": "en-US,en;q=0.9",
"Accept-Encoding": "gzip, deflate, br",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1"
})
# 访问艺术品列表页
url = f"{self.base_url}/collect"
self.logger.info(f"Navigating to {url}")
await page.goto(url, wait_until="networkidle")
# 等待内容加载
await page.wait_for_selector(".artwork-item", timeout=10000)
# 滚动加载更多内容
artworks = []
previous_height = 0
while len(artworks) < max_items:
# 获取当前页面的艺术品
page_artworks = await self._extract_artworks_from_page(page)
artworks.extend(page_artworks)
self.logger.info(f"Collected {len(artworks)} artworks so far")
# 检查是否达到目标
if len(artworks) >= max_items:
break
# 滚动到底部加载更多
previous_height = await page.evaluate("document.body.scrollHeight")
await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
# 等待新内容加载
await page.wait_for_timeout(2000)
# 检查是否还有更多内容
new_height = await page.evaluate("document.body.scrollHeight")
if new_height == previous_height:
self.logger.info("Reached end of page")
break
# 获取详情页信息
detailed_artworks = []
for artwork in artworks[:max_items]:
if artwork.get("href"):
detail = await self._get_artwork_detail(artwork["href"])
if detail:
artwork.update(detail)
detailed_artworks.append(artwork)
self.logger.info(f"Final collected: {len(detailed_artworks)} artworks")
# 保存数据
await self.save_data(detailed_artworks)
return detailed_artworks
finally:
await page.close()
async def _extract_artworks_from_page(self, page: Page) -> List[Dict]:
"""
从页面提取艺术品信息
Args:
page: Playwright页面对象
Returns:
艺术品列表
"""
artworks = []
# 使用JavaScript在页面上下文中提取数据
data = await page.evaluate("""
() => {
const items = document.querySelectorAll('.artwork-item');
const results = [];
items.forEach(item => {
const link = item.querySelector('a');
const img = item.querySelector('img');
const title = item.querySelector('.artwork-title');
const artist = item.querySelector('.artwork-artist');
results.push({
href: link ? link.href : null,
image: img ? img.src : null,
title: title ? title.textContent.trim() : null,
artist: artist ? artist.textContent.trim() : null
});
});
return results;
}
""")
for item in data:
if item.get('href'):
artwork = {
'url': item['href'],
'title': item.get('title', ''),
'artist': item.get('artist', ''),
'image_url': item.get('image', ''),
'source': 'artsy'
}
artworks.append(artwork)
return artworks
async def _get_artwork_detail(self, url: str) -> Dict[str, Any]:
"""
获取艺术品详情
Args:
url: 详情页URL
Returns:
详情信息
"""
page = await self.browser.new_page()
try:
await page.goto(url, wait_until="networkidle")
# 等待主要内容加载
await page.wait_for_selector(".artwork-detail", timeout=10000)
# 提取详细信息
detail = await page.evaluate("""
() => {
const detail = {};
// 标题
const title = document.querySelector('.artwork-title');
if (title) detail.title = title.textContent.trim();
// 艺术家
const artist = document.querySelector('.artwork-artist');
if (artist) detail.artist = artist.textContent.trim();
// 年份
const year = document.querySelector('.artwork-year');
if (year) detail.year = year.textContent.trim();
// 材质
const medium = document.querySelector('.artwork-medium');
if (medium) detail.medium = medium.textContent.trim();
// 尺寸
const dimensions = document.querySelector('.artwork-dimensions');
if (dimensions) detail.dimensions = dimensions.textContent.trim();
// 描述
const description = document.querySelector('.artwork-description');
if (description) detail.description = description.textContent.trim();
// 价格
const price = document.querySelector('.artwork-price');
if (price) detail.price = price.textContent.trim();
return detail;
}
""")
return detail
except Exception as e:
self.logger.error(f"Error getting artwork detail from {url}: {e}")
return {}
finally:
await page.close()
async def parse(self, response):
"""实现抽象方法,但Artsy使用Playwright,此方法不会直接使用"""
return []
5. Data Processing Pipelines
5.1 Data Validation
python
# src/pipelines/validation.py
import re
from typing import Dict, Any, List, Optional
from datetime import datetime
from pydantic import BaseModel, Field, validator, ValidationError
from src.config.constants import REGEX_PATTERNS
class ArtworkModel(BaseModel):
"""
艺术作品数据模型(使用Pydantic v2)
"""
# 基本信息
title: str = Field(..., min_length=1, max_length=500)
artist: str = Field(..., min_length=1, max_length=200)
date: Optional[str] = Field(None, max_length=50)
year: Optional[int] = Field(None, ge=1000, le=2025)
medium: Optional[str] = Field(None, max_length=200)
dimensions: Optional[str] = Field(None, max_length=100)
# 分类信息
style: Optional[str] = Field(None, max_length=100)
genre: Optional[str] = Field(None, max_length=100)
series: Optional[str] = Field(None, max_length=200)
# 来源信息
source: str = Field(..., max_length=50)
url: Optional[str] = Field(None, max_length=500)
image_url: Optional[str] = Field(None, max_length=500)
# 描述信息
description: Optional[str] = Field(None, max_length=5000)
# 元数据
created_at: datetime = Field(default_factory=datetime.now)
updated_at: datetime = Field(default_factory=datetime.now)
@validator('title')
def validate_title(cls, v):
"""验证标题"""
if not v or not v.strip():
raise ValueError('Title cannot be empty')
return v.strip()
@validator('artist')
def validate_artist(cls, v):
"""验证艺术家"""
if not v or not v.strip():
raise ValueError('Artist cannot be empty')
return v.strip()
@validator('year')
def validate_year(cls, v):
"""验证年份"""
if v is not None:
current_year = datetime.now().year
if v < 1000 or v > current_year + 5:
raise ValueError(f'Year must be between 1000 and {current_year + 5}')
return v
@validator('dimensions')
def validate_dimensions(cls, v):
"""验证尺寸格式"""
if v:
# 检查尺寸格式:数字 x 数字 单位
pattern = r'^\d+(\.\d+)?\s*[xX]\s*\d+(\.\d+)?\s*(cm|mm|in|inches)?$'
if not re.match(pattern, v.strip()):
# 如果不是标准格式,记录但不抛出异常
pass
return v
@validator('image_url')
def validate_image_url(cls, v):
"""验证图片URL"""
if v:
url_pattern = r'^https?://.+\.(jpg|jpeg|png|gif|webp)(\?.*)?$'
if not re.match(url_pattern, v.lower()):
# 如果不是图片URL,记录但不抛出异常
pass
return v
class DataValidator:
"""数据验证器"""
@staticmethod
def validate_artwork(data: Dict[str, Any]) -> bool:
"""
验证艺术作品数据
Args:
data: 待验证的数据
Returns:
是否有效
"""
try:
# 使用Pydantic模型验证
artwork = ArtworkModel(**data)
return True
except ValidationError as e:
print(f"Validation error: {e}")
return False
except Exception as e:
print(f"Unexpected error: {e}")
return False
@staticmethod
def validate_batch(artworks: List[Dict]) -> tuple[List[Dict], List[Dict]]:
"""
批量验证
Args:
artworks: 艺术品数据列表
Returns:
(有效数据, 无效数据)
"""
valid = []
invalid = []
for artwork in artworks:
if DataValidator.validate_artwork(artwork):
valid.append(artwork)
else:
invalid.append(artwork)
return valid, invalid
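Exercising the validator with one well-formed and one broken record:
python
from src.pipelines.validation import DataValidator

records = [
    {"title": "Starry Night", "artist": "Vincent van Gogh", "source": "wikiart"},
    {"title": "", "artist": "Unknown", "source": "wikiart"},  # empty title fails min_length
]
valid, invalid = DataValidator.validate_batch(records)
print(len(valid), len(invalid))  # expected: 1 1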
5.2 Deduplication
python
# src/pipelines/deduplication.py
import hashlib
import json
from typing import Dict, List, Set, Optional
from datetime import datetime, timedelta
from difflib import SequenceMatcher
import redis.asyncio as redis
from src.config.settings import settings
from src.utils.logger import setup_logger
class DeduplicationPipeline:
"""
数据去重管道
使用Redis存储指纹,支持多种去重策略
"""
def __init__(self):
self.logger = setup_logger("pipeline.deduplication")
self.redis_client = None
self.seen_keys = set()
self.fingerprint_cache = {}
async def initialize(self):
"""初始化Redis连接"""
self.redis_client = redis.Redis(
host=settings.REDIS_CONFIG['host'],
port=settings.REDIS_CONFIG['port'],
db=settings.REDIS_CONFIG['db'],
password=settings.REDIS_CONFIG.get('password'),
decode_responses=True
)
async def close(self):
"""关闭Redis连接"""
if self.redis_client:
await self.redis_client.close()
def generate_fingerprint(self, artwork: Dict, fields: List[str]) -> str:
"""
生成数据指纹
Args:
artwork: 艺术品数据
fields: 用于生成指纹的字段
Returns:
指纹字符串
"""
# 提取指定字段的值
values = []
for field in fields:
value = artwork.get(field, '')
if value:
# 清理和规范化
value = str(value).lower().strip()
values.append(value)
# 组合并生成哈希
combined = '|'.join(values)
return hashlib.sha256(combined.encode()).hexdigest()
async def is_duplicate(self, artwork: Dict) -> bool:
"""
检查是否重复
Args:
artwork: 艺术品数据
Returns:
是否重复
"""
if not settings.DEDUPLICATION_ENABLED:
return False
# 使用配置的字段生成指纹
fingerprint = self.generate_fingerprint(
artwork,
settings.DEDUPLICATION_FIELDS
)
# 检查Redis中是否存在
if self.redis_client:
exists = await self.redis_client.exists(f"artwork:{fingerprint}")
if exists:
return True
# 检查内存缓存
if fingerprint in self.fingerprint_cache:
cache_time = self.fingerprint_cache[fingerprint]
if datetime.now() - cache_time < timedelta(hours=24):
return True
return False
async def mark_as_seen(self, artwork: Dict):
"""
标记数据为已处理
Args:
artwork: 艺术品数据
"""
fingerprint = self.generate_fingerprint(
artwork,
settings.DEDUPLICATION_FIELDS
)
# 存入Redis
if self.redis_client:
await self.redis_client.setex(
f"artwork:{fingerprint}",
86400 * 30, # 保留30天
json.dumps(artwork, default=str)
)
# 更新内存缓存
self.fingerprint_cache[fingerprint] = datetime.now()
def calculate_similarity(self, artwork1: Dict, artwork2: Dict) -> float:
"""
计算两个艺术品的相似度
Args:
artwork1: 艺术品1
artwork2: 艺术品2
Returns:
相似度分数(0-1)
"""
scores = []
weights = {
'title': 0.3,
'artist': 0.3,
'date': 0.2,
'medium': 0.1,
'dimensions': 0.1
}
for field, weight in weights.items():
value1 = str(artwork1.get(field, '')).lower()
value2 = str(artwork2.get(field, '')).lower()
if value1 and value2:
similarity = SequenceMatcher(None, value1, value2).ratio()
scores.append(similarity * weight)
return sum(scores) if scores else 0
async def find_similar(self, artwork: Dict, threshold: float = None) -> List[Dict]:
"""
查找相似的艺术品
Args:
artwork: 目标艺术品
threshold: 相似度阈值
Returns:
相似艺术品列表
"""
if threshold is None:
threshold = settings.SIMILARITY_THRESHOLD
similar = []
# 这里可以从数据库查询相似数据
# 示例代码省略实际数据库查询
return similar
async def process_batch(self, artworks: List[Dict]) -> tuple[List[Dict], List[Dict]]:
"""
批量处理去重
Args:
artworks: 艺术品列表
Returns:
(新数据, 重复数据)
"""
new_artworks = []
duplicate_artworks = []
for artwork in artworks:
if await self.is_duplicate(artwork):
duplicate_artworks.append(artwork)
else:
await self.mark_as_seen(artwork)
new_artworks.append(artwork)
self.logger.info(
f"Deduplication: {len(new_artworks)} new, "
f"{len(duplicate_artworks)} duplicates"
)
return new_artworks, duplicate_artworks
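Because field values are lower-cased and stripped before hashing, near-duplicate records collapse onto the same fingerprint:
python
from src.pipelines.deduplication import DeduplicationPipeline

pipeline = DeduplicationPipeline()
a = {"title": "Water Lilies", "artist": "Claude Monet", "date": "1906"}
b = {"title": "  water lilies ", "artist": "CLAUDE MONET", "date": "1906"}
fields = ["title", "artist", "date"]
# Normalization makes the two records hash identically
assert pipeline.generate_fingerprint(a, fields) == pipeline.generate_fingerprint(b, fields)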
5.3 Data Enrichment
python
# src/pipelines/enrichment.py
import re
from typing import Any, Dict, List, Optional
from datetime import datetime
import spacy
from PIL import Image
import requests
from io import BytesIO
import exifread
from transformers import pipeline
from src.config.constants import ART_MOVEMENTS
class DataEnrichmentPipeline:
"""
数据增强管道
使用NLP和图像处理技术丰富数据
"""
def __init__(self):
# 加载NLP模型
try:
self.nlp = spacy.load("en_core_web_sm")
except:
# 如果模型不存在,下载
import subprocess
subprocess.run(["python", "-m", "spacy", "download", "en_core_web_sm"])
self.nlp = spacy.load("en_core_web_sm")
# 初始化图像描述模型
self.image_captioner = pipeline(
"image-to-text",
model="nlpconnect/vit-gpt2-image-captioning"
)
# 初始化实体识别模型
self.ner = pipeline(
"ner",
model="dbmdz/bert-large-cased-finetuned-conll03-english"
)
def enrich_artwork(self, artwork: Dict) -> Dict:
"""
增强艺术品数据
Args:
artwork: 原始艺术品数据
Returns:
增强后的数据
"""
enriched = artwork.copy()
# 1. 从描述中提取实体
if artwork.get('description'):
entities = self._extract_entities(artwork['description'])
enriched['extracted_entities'] = entities
# 2. 解析和规范化日期
if artwork.get('date'):
normalized_date = self._normalize_date(artwork['date'])
if normalized_date:
enriched['normalized_date'] = normalized_date
# 3. 分类艺术流派
if artwork.get('style'):
movement = self._classify_movement(artwork['style'])
if movement:
enriched['art_movement'] = movement
# 4. 从图像中提取信息
if artwork.get('image_url'):
image_info = self._extract_image_info(artwork['image_url'])
if image_info:
enriched['image_metadata'] = image_info
# 5. 生成标签
tags = self._generate_tags(artwork)
if tags:
enriched['tags'] = tags
# 6. 添加时间戳
enriched['enriched_at'] = datetime.now().isoformat()
return enriched
def _extract_entities(self, text: str) -> Dict[str, list]:
"""
从文本中提取实体
Args:
text: 文本内容
Returns:
实体字典
"""
doc = self.nlp(text)
entities = {
'persons': [],
'organizations': [],
'locations': [],
'dates': [],
'artworks': []
}
for ent in doc.ents:
if ent.label_ == 'PERSON':
entities['persons'].append(ent.text)
elif ent.label_ == 'ORG':
entities['organizations'].append(ent.text)
elif ent.label_ == 'GPE' or ent.label_ == 'LOC':
entities['locations'].append(ent.text)
elif ent.label_ == 'DATE':
entities['dates'].append(ent.text)
elif ent.label_ == 'WORK_OF_ART':
entities['artworks'].append(ent.text)
return entities
def _normalize_date(self, date_str: str) -> Optional[Dict]:
"""
规范化日期格式
Args:
date_str: 原始日期字符串
Returns:
规范化后的日期信息
"""
date_info = {}
# 匹配年份
year_pattern = r'\b(1[0-9]{3}|2[0-9]{3})\b'
years = re.findall(year_pattern, date_str)
if years:
date_info['years'] = [int(y) for y in years]
# 匹配世纪
century_pattern = r'(\d+)(st|nd|rd|th)\s+century'
century_match = re.search(century_pattern, date_str.lower())
if century_match:
century = int(century_match.group(1))
date_info['century'] = century
date_info['century_range'] = [
(century - 1) * 100,
century * 100 - 1
]
# 匹配范围
range_pattern = r'(\d{4})\s*[-–—]\s*(\d{4})'
range_match = re.search(range_pattern, date_str)
if range_match:
date_info['date_range'] = [
int(range_match.group(1)),
int(range_match.group(2))
]
# 尝试解析具体日期
try:
# 尝试多种日期格式
for fmt in ['%Y', '%Y-%m-%d', '%d %B %Y', '%B %d, %Y']:
try:
parsed_date = datetime.strptime(date_str, fmt)
date_info['iso_date'] = parsed_date.isoformat()
break
except:
continue
except:
pass
return date_info if date_info else None
def _classify_movement(self, style: str) -> Optional[str]:
"""
分类艺术流派
Args:
style: 艺术风格
Returns:
规范化后的流派
"""
style_lower = style.lower()
# 直接匹配
for eng, chn in ART_MOVEMENTS.items():
if eng in style_lower or chn in style_lower:
return chn
# 模糊匹配
for eng, chn in ART_MOVEMENTS.items():
if any(word in style_lower for word in eng.split('-')):
return chn
return None
def _extract_image_info(self, image_url: str) -> Optional[Dict]:
"""
从图像中提取信息
Args:
image_url: 图像URL
Returns:
图像信息
"""
try:
# 下载图像
response = requests.get(image_url, timeout=10)
img = Image.open(BytesIO(response.content))
image_info = {
'format': img.format,
'mode': img.mode,
'size': img.size,
'width': img.width,
'height': img.height,
'aspect_ratio': round(img.width / img.height, 2)
}
# 提取EXIF数据
if hasattr(img, '_getexif') and img._getexif():
exif_data = {}
tags = exifread.process_file(BytesIO(response.content))
for tag, value in tags.items():
exif_data[tag] = str(value)
image_info['exif'] = exif_data
# 生成图像描述
if img.size[0] * img.size[1] < 1000000: # 限制大小避免内存问题
try:
caption = self.image_captioner(img)
if caption:
image_info['ai_description'] = caption[0]['generated_text']
except:
pass
return image_info
except Exception as e:
print(f"Error extracting image info: {e}")
return None
def _generate_tags(self, artwork: Dict) -> List[str]:
"""
生成标签
Args:
artwork: 艺术品数据
Returns:
标签列表
"""
tags = set()
# 基于标题生成标签
if artwork.get('title'):
title_words = re.findall(r'\w+', artwork['title'].lower())
tags.update(title_words[:5]) # 取前5个词
# 基于艺术家
if artwork.get('artist'):
artist_parts = artwork['artist'].lower().split()
tags.update(artist_parts[:3])
# 基于风格
if artwork.get('style'):
style_words = artwork['style'].lower().split()
tags.update(style_words[:3])
# 基于流派
if artwork.get('genre'):
tags.add(artwork['genre'].lower())
# 过滤掉太短的词
tags = {tag for tag in tags if len(tag) > 2}
return list(tags)[:10] # 最多返回10个标签
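Sample outputs of the date normalizer (note that instantiating the pipeline loads the spaCy and transformer models, which are downloaded on first use):
python
from src.pipelines.enrichment import DataEnrichmentPipeline

pipeline = DataEnrichmentPipeline()  # loads NLP / vision models
print(pipeline._normalize_date("1503"))
# -> {'years': [1503], 'iso_date': '1503-01-01T00:00:00'}
print(pipeline._normalize_date("1880-1885"))
# -> {'years': [1880, 1885], 'date_range': [1880, 1885]}
print(pipeline._normalize_date("19th century"))
# -> {'century': 19, 'century_range': [1800, 1899]}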
5.4 Data Storage
python
# src/pipelines/storage.py
import asyncio
import json
from typing import Dict, List, Any, Optional
from datetime import datetime
from motor.motor_asyncio import AsyncIOMotorClient
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk
import asyncpg
from minio import Minio
import io
from src.config.settings import settings
class StoragePipeline:
"""
数据存储管道
支持多数据库存储
"""
def __init__(self):
self.mongo_client = None
self.mongo_db = None
self.pg_pool = None
self.es_client = None
self.minio_client = None
async def initialize(self):
"""初始化所有数据库连接"""
# MongoDB连接
self.mongo_client = AsyncIOMotorClient(settings.MONGODB_URI)
self.mongo_db = self.mongo_client[settings.MONGODB_DB]
# PostgreSQL连接池
self.pg_pool = await asyncpg.create_pool(**settings.POSTGRESQL_CONFIG)
# Elasticsearch连接
self.es_client = AsyncElasticsearch(hosts=settings.ELASTICSEARCH_CONFIG['hosts'])
# MinIO连接
self.minio_client = Minio(
'localhost:9000',
access_key='minioadmin',
secret_key='minioadmin123',
secure=False
)
# 确保bucket存在
if not self.minio_client.bucket_exists('art-images'):
self.minio_client.make_bucket('art-images')
# 创建数据库表
await self._init_postgres_tables()
# 创建Elasticsearch索引
await self._init_elasticsearch_index()
async def close(self):
"""关闭所有数据库连接"""
if self.mongo_client:
self.mongo_client.close()
if self.pg_pool:
await self.pg_pool.close()
if self.es_client:
await self.es_client.close()
async def _init_postgres_tables(self):
"""初始化PostgreSQL表"""
async with self.pg_pool.acquire() as conn:
# 创建艺术家表
await conn.execute('''
CREATE TABLE IF NOT EXISTS artists (
id SERIAL PRIMARY KEY,
name VARCHAR(200) NOT NULL,
birth_year INTEGER,
death_year INTEGER,
nationality VARCHAR(100),
biography TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(name)
)
''')
# 创建艺术品表
await conn.execute('''
CREATE TABLE IF NOT EXISTS artworks (
id SERIAL PRIMARY KEY,
title VARCHAR(500) NOT NULL,
artist_id INTEGER REFERENCES artists(id),
year INTEGER,
medium VARCHAR(200),
dimensions VARCHAR(100),
style VARCHAR(100),
genre VARCHAR(100),
description TEXT,
image_url TEXT,
source VARCHAR(50),
source_url TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(title, artist_id) -- 下文的 ON CONFLICT (title, artist_id) 依赖此唯一约束
)
''')
# 创建索引
await conn.execute('CREATE INDEX IF NOT EXISTS idx_artworks_artist ON artworks(artist_id)')
await conn.execute('CREATE INDEX IF NOT EXISTS idx_artworks_year ON artworks(year)')
await conn.execute('CREATE INDEX IF NOT EXISTS idx_artworks_style ON artworks(style)')
async def _init_elasticsearch_index(self):
"""初始化Elasticsearch索引"""
index_name = settings.ELASTICSEARCH_CONFIG['index']
# 检查索引是否存在
exists = await self.es_client.indices.exists(index=index_name)
if not exists:
# 创建索引映射
mappings = {
"mappings": {
"properties": {
"title": {"type": "text", "analyzer": "standard"},
"artist": {"type": "text", "fields": {"keyword": {"type": "keyword"}}},
"description": {"type": "text", "analyzer": "standard"},
"year": {"type": "integer"},
"style": {"type": "keyword"},
"genre": {"type": "keyword"},
"medium": {"type": "keyword"},
"tags": {"type": "keyword"},
"created_at": {"type": "date"}
}
},
"settings": {
"number_of_shards": 2,
"number_of_replicas": 1,
"analysis": {
"analyzer": {
"art_analyzer": {
"type": "custom",
"tokenizer": "standard",
"filter": ["lowercase", "stop", "snowball"]
}
}
}
}
}
await self.es_client.indices.create(
index=index_name,
body=mappings
)
async def store_artwork(self, artwork: Dict) -> str:
"""
存储单个艺术品
Args:
artwork: 艺术品数据
Returns:
存储ID
"""
artwork_id = None
# 1. 存储到MongoDB
mongo_result = await self.mongo_db.artworks.insert_one(artwork)
artwork_id = str(mongo_result.inserted_id)
# 2. 存储到PostgreSQL
await self._store_to_postgres(artwork)
# 3. 索引到Elasticsearch
await self._index_to_elasticsearch(artwork, artwork_id)
# 4. 存储图片
if artwork.get('image_url'):
await self._store_image(artwork['image_url'], artwork_id)
return artwork_id
async def store_batch(self, artworks: List[Dict]) -> List[str]:
"""
批量存储艺术品
Args:
artworks: 艺术品列表
Returns:
存储ID列表
"""
if not artworks:
return []
# 批量插入MongoDB
result = await self.mongo_db.artworks.insert_many(artworks)
ids = [str(id) for id in result.inserted_ids]
# 批量插入PostgreSQL
await self._store_batch_to_postgres(artworks)
# 批量索引Elasticsearch
await self._index_batch_to_elasticsearch(artworks, ids)
return ids
async def _store_to_postgres(self, artwork: Dict):
"""存储到PostgreSQL"""
async with self.pg_pool.acquire() as conn:
async with conn.transaction():
# 先处理艺术家
artist_id = await self._get_or_create_artist(conn, artwork.get('artist'))
# 插入艺术品
await conn.execute('''
INSERT INTO artworks
(title, artist_id, year, medium, dimensions, style, genre,
description, image_url, source, source_url)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT (title, artist_id) DO NOTHING
''',
artwork.get('title'),
artist_id,
artwork.get('year'),
artwork.get('medium'),
artwork.get('dimensions'),
artwork.get('style'),
artwork.get('genre'),
artwork.get('description'),
artwork.get('image_url'),
artwork.get('source'),
artwork.get('url')
)
async def _get_or_create_artist(self, conn, artist_name: str) -> Optional[int]:
"""获取或创建艺术家"""
if not artist_name:
return None
# 查找现有艺术家
result = await conn.fetchrow(
'SELECT id FROM artists WHERE name = $1',
artist_name
)
if result:
return result['id']
# 创建新艺术家
result = await conn.fetchrow(
'INSERT INTO artists (name) VALUES ($1) RETURNING id',
artist_name
)
return result['id']
async def _store_batch_to_postgres(self, artworks: List[Dict]):
"""批量存储到PostgreSQL"""
async with self.pg_pool.acquire() as conn:
async with conn.transaction():
for artwork in artworks:
artist_id = await self._get_or_create_artist(conn, artwork.get('artist'))
await conn.execute('''
INSERT INTO artworks
(title, artist_id, year, medium, dimensions, style, genre,
description, image_url, source, source_url)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT (title, artist_id) DO NOTHING
''',
artwork.get('title'),
artist_id,
artwork.get('year'),
artwork.get('medium'),
artwork.get('dimensions'),
artwork.get('style'),
artwork.get('genre'),
artwork.get('description'),
artwork.get('image_url'),
artwork.get('source'),
artwork.get('url')
)
async def _index_to_elasticsearch(self, artwork: Dict, artwork_id: str):
"""索引到Elasticsearch"""
doc = {
'title': artwork.get('title'),
'artist': artwork.get('artist'),
'description': artwork.get('description'),
'year': artwork.get('year'),
'style': artwork.get('style'),
'genre': artwork.get('genre'),
'medium': artwork.get('medium'),
'tags': artwork.get('tags', []),
'created_at': datetime.now().isoformat()
}
await self.es_client.index(
index=settings.ELASTICSEARCH_CONFIG['index'],
id=artwork_id,
body=doc,
refresh=True
)
async def _index_batch_to_elasticsearch(self, artworks: List[Dict], ids: List[str]):
"""批量索引到Elasticsearch"""
actions = []
for i, (artwork, artwork_id) in enumerate(zip(artworks, ids)):
doc = {
'title': artwork.get('title'),
'artist': artwork.get('artist'),
'description': artwork.get('description'),
'year': artwork.get('year'),
'style': artwork.get('style'),
'genre': artwork.get('genre'),
'medium': artwork.get('medium'),
'tags': artwork.get('tags', []),
'created_at': datetime.now().isoformat()
}
action = {
'_index': settings.ELASTICSEARCH_CONFIG['index'],
'_id': artwork_id,
'_source': doc
}
actions.append(action)
if actions:
# 使用 helpers.async_bulk:原生 es.bulk 期望 NDJSON 格式的 action 行,
# 而这里的 actions 是 helpers 风格的字典列表
await async_bulk(self.es_client, actions, refresh=True)
async def _store_image(self, image_url: str, artwork_id: str):
"""存储图片到MinIO"""
try:
# 下载图片
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.get(image_url) as response:
if response.status == 200:
data = await response.read()
# 上传到MinIO
self.minio_client.put_object(
'art-images',
f'{artwork_id}.jpg',
io.BytesIO(data),
length=len(data),
content_type='image/jpeg'
)
except Exception as e:
print(f"Error storing image: {e}")
async def search_artworks(self, query: str, size: int = 10) -> List[Dict]:
"""
搜索艺术品
Args:
query: 搜索关键词
size: 返回数量
Returns:
搜索结果
"""
search_body = {
"query": {
"multi_match": {
"query": query,
"fields": ["title^3", "artist^2", "description", "tags"]
}
},
"size": size
}
result = await self.es_client.search(
index=settings.ELASTICSEARCH_CONFIG['index'],
body=search_body
)
return [hit['_source'] for hit in result['hits']['hits']]
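Once data is loaded, full-text search spans title, artist, description, and tags, with the ^3/^2 boosts favoring title and artist matches:
python
import asyncio
from src.pipelines.storage import StoragePipeline

async def main():
    storage = StoragePipeline()
    await storage.initialize()
    try:
        for hit in await storage.search_artworks("water lilies", size=5):
            print(hit["artist"], "-", hit["title"])
    finally:
        await storage.close()

asyncio.run(main())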
6. Scheduling and Monitoring
6.1 Task Scheduler
python
# src/scheduler.py
import asyncio
import signal
from typing import Dict, List, Optional
from datetime import datetime, timedelta
import redis.asyncio as aioredis # aioredis 已并入 redis-py,这里沿用 aioredis 别名
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger
import json
from src.crawlers.wikiart import WikiArtCrawler
from src.crawlers.artsy import ArtsyCrawler
from src.pipelines.storage import StoragePipeline
from src.pipelines.deduplication import DeduplicationPipeline
from src.pipelines.enrichment import DataEnrichmentPipeline
from src.utils.logger import setup_logger
from src.config.settings import settings
class CrawlerScheduler:
"""
爬虫任务调度器
管理所有爬虫的执行、监控和错误处理
"""
def __init__(self):
self.logger = setup_logger("scheduler")
self.scheduler = AsyncIOScheduler()
self.running = False
self.redis = None
# 任务状态
self.task_status = {}
# 爬虫实例
self.crawlers = {
'wikiart': WikiArtCrawler,
'artsy': ArtsyCrawler
}
async def initialize(self):
"""初始化调度器"""
# 连接Redis
self.redis = await aioredis.from_url(
f"redis://{settings.REDIS_CONFIG['host']}:{settings.REDIS_CONFIG['port']}",
password=settings.REDIS_CONFIG.get('password'),
decode_responses=True
)
# 恢复任务状态
await self._load_task_status()
# 设置信号处理
loop = asyncio.get_event_loop()
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(sig, lambda: asyncio.create_task(self.shutdown()))
async def _load_task_status(self):
"""从Redis加载任务状态"""
keys = await self.redis.keys("task:*")
for key in keys:
status = await self.redis.get(key)
if status:
self.task_status[key[5:]] = json.loads(status)
async def save_task_status(self, task_name: str):
"""保存任务状态到Redis"""
if task_name in self.task_status:
await self.redis.setex(
f"task:{task_name}",
86400 * 7, # 保留7天
json.dumps(self.task_status[task_name], default=str)
)
def add_crawler_task(
self,
crawler_name: str,
schedule_type: str = "interval",
**kwargs
):
"""
添加爬虫任务
Args:
crawler_name: 爬虫名称
schedule_type: 调度类型(interval/cron)
**kwargs: 调度参数
"""
if crawler_name not in self.crawlers:
self.logger.error(f"Unknown crawler: {crawler_name}")
return
task_id = f"{crawler_name}_task"
# 根据调度类型创建触发器
if schedule_type == "interval":
trigger = IntervalTrigger(
minutes=kwargs.get('minutes', 60),
hours=kwargs.get('hours', 0),
days=kwargs.get('days', 0)
)
elif schedule_type == "cron":
trigger = CronTrigger(
hour=kwargs.get('hour', '*'),
minute=kwargs.get('minute', '0'),
day=kwargs.get('day', '*'),
month=kwargs.get('month', '*'),
day_of_week=kwargs.get('day_of_week', '*')
)
else:
self.logger.error(f"Unknown schedule type: {schedule_type}")
return
# 添加任务
self.scheduler.add_job(
func=self.run_crawler,
trigger=trigger,
args=[crawler_name],
id=task_id,
name=f"Run {crawler_name} crawler",
replace_existing=True,
misfire_grace_time=300
)
self.logger.info(f"Added crawler task: {crawler_name} ({schedule_type})")
# 初始化任务状态
self.task_status[crawler_name] = {
'last_run': None,
'last_success': None,
'last_error': None,
'total_runs': 0,
'successful_runs': 0,
'failed_runs': 0,
'total_items': 0
}
async def run_crawler(self, crawler_name: str):
"""
运行爬虫
Args:
crawler_name: 爬虫名称
"""
self.logger.info(f"Starting crawler: {crawler_name}")
# 更新状态
self.task_status[crawler_name]['last_run'] = datetime.now()
self.task_status[crawler_name]['total_runs'] += 1
crawler_class = self.crawlers[crawler_name]
try:
# 创建爬虫实例
async with crawler_class() as crawler:
# 运行爬虫
artworks = await crawler.run()
if artworks:
# 去重
dedup_pipeline = DeduplicationPipeline()
await dedup_pipeline.initialize()
new_artworks, duplicates = await dedup_pipeline.process_batch(artworks)
# 数据增强
enrich_pipeline = DataEnrichmentPipeline()
enriched_artworks = [
enrich_pipeline.enrich_artwork(artwork)
for artwork in new_artworks
]
# 存储
storage_pipeline = StoragePipeline()
await storage_pipeline.initialize()
ids = await storage_pipeline.store_batch(enriched_artworks)
# 关闭管道
await dedup_pipeline.close()
await storage_pipeline.close()
# 更新统计
self.task_status[crawler_name]['total_items'] += len(ids)
self.logger.info(
f"Crawler {crawler_name} completed. "
f"Collected: {len(artworks)}, "
f"New: {len(new_artworks)}, "
f"Duplicates: {len(duplicates)}"
)
# 更新成功状态
self.task_status[crawler_name]['last_success'] = datetime.now()
self.task_status[crawler_name]['successful_runs'] += 1
except Exception as e:
self.logger.error(f"Crawler {crawler_name} failed: {e}", exc_info=True)
# 更新失败状态
self.task_status[crawler_name]['last_error'] = {
'time': datetime.now(),
'error': str(e)
}
self.task_status[crawler_name]['failed_runs'] += 1
finally:
# 保存状态
await self.save_task_status(crawler_name)
async def run_all_crawlers(self):
"""运行所有爬虫"""
tasks = []
for crawler_name in self.crawlers:
task = self.run_crawler(crawler_name)
tasks.append(task)
await asyncio.gather(*tasks, return_exceptions=True)
async def get_status(self) -> Dict:
"""获取所有任务状态"""
return {
'running': self.running,
'tasks': self.task_status,
'scheduler_jobs': [
{
'id': job.id,
'name': job.name,
'next_run': job.next_run_time.isoformat() if job.next_run_time else None
}
for job in self.scheduler.get_jobs()
]
}
def start(self):
"""启动调度器"""
self.scheduler.start()
self.running = True
self.logger.info("Scheduler started")
async def shutdown(self):
"""关闭调度器"""
self.logger.info("Shutting down scheduler…")
self.scheduler.shutdown()
self.running = False
if self.redis:
await self.redis.close()
self.logger.info("Scheduler stopped")
async def run_once(self, crawler_name: str):
"""
立即运行一次爬虫
Args:
crawler_name: 爬虫名称
"""
await self.run_crawler(crawler_name)
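An entry point that wires the scheduler up; the schedules are illustrative (WikiArt every 360 minutes, Artsy daily at 03:00):
python
import asyncio
from src.scheduler import CrawlerScheduler

async def main():
    scheduler = CrawlerScheduler()
    await scheduler.initialize()
    # Illustrative schedules
    scheduler.add_crawler_task("wikiart", schedule_type="interval", minutes=360)
    scheduler.add_crawler_task("artsy", schedule_type="cron", hour="3", minute="0")
    scheduler.start()
    await asyncio.Event().wait()  # keep the event loop alive

if __name__ == "__main__":
    asyncio.run(main())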
6.2 Web Monitoring Dashboard
python
# src/web/dashboard.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from starlette.requests import Request
import plotly.graph_objects as go
import plotly.express as px
from datetime import datetime, timedelta
import pandas as pd
from typing import List, Dict
import json
import asyncio
from src.scheduler import CrawlerScheduler
from src.pipelines.storage import StoragePipeline
app = FastAPI(title="Art Crawler Dashboard")
# 模板和静态文件
templates = Jinja2Templates(directory="src/web/templates")
app.mount("/static", StaticFiles(directory="src/web/static"), name="static")
# 全局实例
scheduler = CrawlerScheduler()
storage = StoragePipeline()
class ConnectionManager:
"""WebSocket连接管理器"""
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def broadcast(self, message: dict):
for connection in self.active_connections:
try:
await connection.send_json(message)
except:
pass
manager = ConnectionManager()
@app.on_event("startup")
async def startup_event():
"""启动事件"""
await scheduler.initialize()
await storage.initialize()
@app.on_event("shutdown")
async def shutdown_event():
"""关闭事件"""
await storage.close()
@app.get("/", response_class=HTMLResponse)
async def dashboard(request: Request):
"""仪表盘主页"""
return templates.TemplateResponse(
"dashboard.html",
{"request": request}
)
@app.get("/api/status")
async def get_status():
"""获取系统状态"""
return await scheduler.get_status()
@app.get("/api/stats")
async def get_stats(days: int = 7):
"""获取统计数据"""
end_date = datetime.now()
start_date = end_date - timedelta(days=days)
# 从数据库获取统计
pipeline = [
{
"$match": {
"created_at": {
"$gte": start_date,
"$lte": end_date
}
}
},
{
"$group": {
"_id": {
"year": {"$year": "$created_at"},
"month": {"$month": "$created_at"},
"day": {"$dayOfMonth": "$created_at"}
},
"count": {"$sum": 1},
"source": {"$first": "$source"}
}
},
{"$sort": {"_id": 1}}
]
cursor = storage.mongo_db.artworks.aggregate(pipeline)
results = await cursor.to_list(length=None)
# 转换为DataFrame
df = pd.DataFrame(results)
# 生成图表
charts = {}
if not df.empty:
# 每日采集数量折线图
fig_line = go.Figure()
for source in df['source'].unique():
source_data = df[df['source'] == source]
fig_line.add_trace(go.Scatter(
x=pd.to_datetime(source_data['_id'].apply(
lambda x: f"{x['year']}-{x['month']}-{x['day']}"
)),
y=source_data['count'],
mode='lines+markers',
name=source
))
fig_line.update_layout(
title="每日采集数量趋势",
xaxis_title="日期",
yaxis_title="数量"
)
charts['daily_trend'] = fig_line.to_json()
# 来源分布饼图
source_counts = df.groupby('source')['count'].sum().reset_index()
fig_pie = px.pie(
source_counts,
values='count',
names='source',
title="数据来源分布"
)
charts['source_distribution'] = fig_pie.to_json()
# 艺术家Top 10
pipeline_artists = [
{"$group": {"_id": "$artist", "count": {"$sum": 1}}},
{"$sort": {"count": -1}},
{"$limit": 10}
]
top_artists = await storage.mongo_db.artworks.aggregate(
pipeline_artists
).to_list(length=None)
if top_artists:
fig_bar = go.Figure(data=[
go.Bar(
x=[a['_id'] for a in top_artists],
y=[a['count'] for a in top_artists]
)
])
fig_bar.update_layout(
title="作品最多的艺术家Top 10",
xaxis_title="艺术家",
yaxis_title="作品数量"
)
charts['top_artists'] = fig_bar.to_json()
# 获取总数
total_count = await storage.mongo_db.artworks.count_documents({})
# 获取最新作品
latest = await storage.mongo_db.artworks.find().sort(
"created_at", -1
).limit(10).to_list(length=None)
return {
"total": total_count,
"charts": charts,
"latest": [
{
"title": item.get("title"),
"artist": item.get("artist"),
"date": item.get("created_at").isoformat()
}
for item in latest
]
}
@app.post("/api/crawlers/{crawler_name}/run")
async def run_crawler(crawler_name: str):
"""手动运行爬虫"""
asyncio.create_task(scheduler.run_once(crawler_name))
return {"status": "started", "crawler": crawler_name}
@app.post("/api/crawlers/schedule")
async def update_schedule(config: dict):
"""更新调度配置"""
# 更新调度器配置
pass
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
"""WebSocket实时更新"""
await manager.connect(websocket)
try:
while True:
# 每秒发送一次状态更新
status = await scheduler.get_status()
await websocket.send_json(status)
await asyncio.sleep(1)
except WebSocketDisconnect:
manager.disconnect(websocket)
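Serve the dashboard with uvicorn (host and port are up to you):
bash
uvicorn src.web.dashboard:app --host 0.0.0.0 --port 8000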