# crawler_81tv/scrapy_proj/pipelines.py
from datetime import datetime
import logging
import json
import requests
import time
import base64
import oss2
from aliyunsdkcore.client import AcsClient
from aliyunsdkvod.request.v20170321.CreateUploadVideoRequest import CreateUploadVideoRequest
from aliyunsdkvod.request.v20170321.GetVideoInfoRequest import GetVideoInfoRequest
from aliyunsdkvod.request.v20170321.UpdateVideoInfoRequest import UpdateVideoInfoRequest
from aliyunsdkvod.request.v20170321.CreateUploadImageRequest import CreateUploadImageRequest
from sqlalchemy import text
from .database import DatabaseManager
from .models import VideoSQLite
logger = logging.getLogger(__name__)


class AliyunVodPipeline:
    """Aliyun VOD (Video on Demand) upload pipeline."""

    def __init__(self, settings):
        """Initialize the Aliyun VOD pipeline.

        Args:
            settings: Scrapy settings object
        """
        self.settings = settings
        self.access_key_id = settings.get('ALIYUN_ACCESS_KEY_ID')
        self.access_key_secret = settings.get('ALIYUN_ACCESS_KEY_SECRET')
        self.template_group_id = settings.get('ALIYUN_TEMPLATE_GROUP_ID')
        self.client = AcsClient(self.access_key_id, self.access_key_secret, 'cn-shanghai')
        self.oss_client = oss2.Auth(self.access_key_id, self.access_key_secret)
        self.db_manager = DatabaseManager(settings)

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler.settings)

    def upload_media_by_url(self, video_url, title, cover_url=None):
        """Upload a video to Aliyun VOD by URL.

        Args:
            video_url: video URL
            title: video title
            cover_url: cover image URL
        Returns:
            str: upload job ID (JobId)
        """
        from aliyunsdkvod.request.v20170321.UploadMediaByURLRequest import UploadMediaByURLRequest

        request = UploadMediaByURLRequest()
        request.set_accept_format('JSON')
        # Set the video URL
        logger.info(f"Uploading video URL: {video_url}")
        request.set_UploadURLs(video_url)
        # The upload metadata must be a JSON array string
        upload_metadata = [{
            'Title': title,
            'SourceURL': video_url,
            'TemplateGroupId': self.template_group_id
        }]
        # Set the cover URL
        # if cover_url:
        #     upload_metadata[0]['CoverURL'] = cover_url
        request.set_UploadMetadatas(json.dumps(upload_metadata))
        response = self.client.do_action_with_exception(request)
        result = json.loads(response)
        # Return the JobId of the first upload job
        upload_jobs = result.get('UploadJobs', [])
        if not upload_jobs:
            raise Exception("No upload job created")
        job = upload_jobs[0]
        # if job.get('Code') != 'Success':
        #     raise Exception(f"Upload job failed: {job}")
        return job.get('JobId')  # return the JobId rather than the VideoId

    def get_upload_job_status(self, job_id):
        """Get the status of an upload job.

        Args:
            job_id: upload job ID
        Returns:
            dict: job status info, including the VideoId once the upload has finished
        """
        from aliyunsdkvod.request.v20170321.GetURLUploadInfosRequest import GetURLUploadInfosRequest

        request = GetURLUploadInfosRequest()
        request.set_accept_format('JSON')
        request.set_JobIds(job_id)
        response = self.client.do_action_with_exception(request)
        result = json.loads(response)
        upload_jobs = result.get('URLUploadInfoList', [])
        if not upload_jobs:
            raise Exception(f"No upload job found with ID: {job_id}")
        job = upload_jobs[0]
        return job

    def wait_for_video_id(self, job_id, max_retries=5, retry_interval=2):
        """Wait for an upload job to finish and return its VideoId.

        Args:
            job_id: upload job ID
            max_retries: maximum number of retries
            retry_interval: interval between retries (seconds)
        Returns:
            str: video ID
        """
        for i in range(max_retries):
            job_status = self.get_upload_job_status(job_id)
            if job_status.get('MediaId'):
                return job_status.get('MediaId')
            # Wait a while before retrying
            time.sleep(retry_interval)
        raise Exception(f"Max retries reached, upload job not completed: {job_id}")

    def upload_image_to_oss(self, image_url, title):
        """Upload an image directly to Aliyun OSS.

        Args:
            image_url: image URL
            title: image title
        Returns:
            str: URL of the image in OSS
        """
        logger.info(f"Uploading image to OSS: {image_url}")
        try:
            # 1. Download the remote image
            image_response = requests.get(image_url, timeout=30)
            image_response.raise_for_status()
            image_content = image_response.content
        except Exception as e:
            logger.error(f"Failed to download image: {str(e)}")
            raise Exception(f"Failed to download image: {str(e)}")
        try:
            # 2. Build the OSS object name from a timestamp and the original title
            timestamp = int(time.time())
            file_ext = image_url.split('.')[-1] if '.' in image_url else 'jpg'
            oss_filename = f"images/{timestamp}_{title[:30]}.{file_ext}"  # truncate the title to keep the object name short
            # 3. Get the OSS bucket (configuration comes from settings)
            bucket_name = self.settings.get('ALIYUN_OSS_BUCKET')
            endpoint = self.settings.get('ALIYUN_OSS_ENDPOINT')
            oss_bucket = oss2.Bucket(self.oss_client, endpoint, bucket_name)
            # 4. Upload the image to OSS
            upload_response = oss_bucket.put_object(oss_filename, image_content)
            if upload_response.status == 200:
                # 5. Return the publicly accessible URL
                oss_url = f"https://{bucket_name}.{endpoint}/{oss_filename}"
                logger.info(f"Image uploaded successfully: {oss_url}")
                return oss_url
            else:
                raise Exception(f"Image upload failed: {upload_response.status}")
        except Exception as e:
            logger.error(f"Failed to upload image to OSS: {str(e)}")
            raise Exception(f"Failed to upload image to OSS: {str(e)}")

    def process_item(self, item, spider):
        """Process an item: upload the video to Aliyun VOD by URL.

        Args:
            item: scraped item
            spider: spider instance
        Returns:
            item: processed item
        """
        # Skip the item if an Aliyun video ID already exists for it
        with self.db_manager.sqlite_session() as session:
            # Check whether a record with the same source_url already exists
            existing_video = session.query(VideoSQLite).filter_by(source_url=item.get('source_url')).first()
            if existing_video and existing_video.aliyun_video_id:
                logger.info(f"Aliyun video ID already exists, skipping: {item.get('title')}")
                return item

        video_url = item.get('video_url')
        if not video_url:
            logger.warning(f"Video URL is empty, skipping: {item.get('source_url')}")
            return item

        try:
            # 1. Upload the cover image to OSS (if there is one)
            oss_url = None
            cover_url = item.get('source_thumbnail_url')
            if cover_url:
                try:
                    oss_url = self.upload_image_to_oss(
                        image_url=cover_url,
                        title=item.get('title', '')
                    )
                    # Replace the item's cover URL with the OSS URL
                    item['thumbnail_url'] = oss_url
                    logger.info(f"Cover image uploaded to OSS: {oss_url}")
                except Exception as e:
                    logger.error(f"Failed to upload cover image to OSS: {str(e)}")
                    # If the cover upload fails, keep processing the video instead of aborting
            # 2. Upload the video by URL and get the JobId
            title = item.get('title', '')
            job_id = self.upload_media_by_url(
                video_url=video_url,
                title=title,
                cover_url=oss_url  # OSS cover URL uploaded above (None if unavailable)
            )
            logger.info(f"Created Aliyun VOD URL upload job: job_id={job_id}, title={title}")
            # 3. Wait for the upload to finish and get the VideoId
            try:
                video_id = self.wait_for_video_id(job_id)
                logger.info(f"Video upload finished: video_id={video_id}, job_id={job_id}")
                # 4. Store the Aliyun video ID and status on the item
                item['aliyun_video_id'] = video_id
                item['aliyun_status'] = 'Success'
            except Exception as e:
                logger.error(f"Waiting for video upload failed: job_id={job_id}, error={str(e)}")
                item['aliyun_video_id'] = ""
                item['aliyun_status'] = 'Uploading'
                raise  # re-raise so the outer error handling deals with it
        except Exception as e:
            logger.error(f"Aliyun VOD URL upload failed: {str(e)}")
            item['aliyun_status'] = 'Failed'
        return item
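
# A minimal sketch of the Scrapy settings AliyunVodPipeline reads. The key names are
# taken from the code above; the values are placeholders, not real credentials or
# endpoints from this project:
#
#   ALIYUN_ACCESS_KEY_ID = 'your-access-key-id'
#   ALIYUN_ACCESS_KEY_SECRET = 'your-access-key-secret'
#   ALIYUN_TEMPLATE_GROUP_ID = 'your-vod-template-group-id'
#   ALIYUN_OSS_BUCKET = 'your-bucket-name'
#   ALIYUN_OSS_ENDPOINT = 'oss-cn-shanghai.aliyuncs.com'  # assumed region, matches the AcsClient region above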


class SQLitePipeline:
    """SQLite database pipeline."""

    def __init__(self, settings):
        """Initialize the SQLite pipeline.

        Args:
            settings: Scrapy settings object
        """
        self.db_manager = DatabaseManager(settings)

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler.settings)

    def process_item(self, item, spider):
        """Process an item: save it to the SQLite database.

        Args:
            item: scraped item
            spider: spider instance
        Returns:
            item: processed item
        """
        now = datetime.now()
        now_str = now.strftime('%Y-%m-%d %H:%M:%S')
        with self.db_manager.sqlite_session() as session:
            # Check whether a record with the same source_url already exists
            existing_video = session.query(VideoSQLite).filter_by(source_url=item.get('source_url')).first()
            if existing_video:
                logger.info(f"Duplicate video found: {item.get('source_url')}")
                # Update the existing record
                existing_video.title = item.get('title', '')
                existing_video.description = item.get('description', '')
                existing_video.publish_time = item.get('publish_time', '')
                existing_video.update_time = now_str
                existing_video.video_url = item.get('video_url', '')
                existing_video.source_thumbnail_url = item.get('source_thumbnail_url', '')
                existing_video.duration = str(item.get('duration', ''))
                existing_video.video_list = str(item.get('video_list', 0))
                # Only overwrite aliyun_video_id, aliyun_status and thumbnail_url when new values exist
                if item.get('aliyun_video_id'):
                    existing_video.aliyun_video_id = item['aliyun_video_id']
                if item.get('aliyun_status'):
                    existing_video.aliyun_status = item['aliyun_status']
                if item.get('thumbnail_url'):
                    existing_video.thumbnail_url = item['thumbnail_url']
                # existing_video.status = 0  # reset status to 0
                # Store the SQLite record ID on the item for downstream pipelines
                item['sqlite_id'] = existing_video.id
            else:
                # Create a new record
                sqlite_data = {
                    'title': item.get('title', ''),
                    'description': item.get('description', ''),
                    'source_url': item.get('source_url', ''),
                    'publish_time': item.get('publish_time', ''),
                    'create_time': now_str,
                    'update_time': now_str,
                    'video_url': item.get('video_url', ''),
                    'source_thumbnail_url': item.get('source_thumbnail_url', ''),
                    'thumbnail_url': item.get('thumbnail_url', ''),
                    'duration': str(item.get('duration', '')),
                    'video_list': item.get('video_list', ''),
                    'aliyun_video_id': item.get('aliyun_video_id', ''),
                    'aliyun_status': item.get('aliyun_status', ''),
                    'status': 0
                }
                new_video = VideoSQLite(**sqlite_data)
                session.add(new_video)
                session.flush()  # obtain the ID of the newly inserted record
                # Store the SQLite record ID on the item for downstream pipelines
                item['sqlite_id'] = new_video.id
        return item


class MariaDBPipeline:
    """Pipeline that migrates data from SQLite to MariaDB."""

    def __init__(self, settings):
        """Initialize the pipeline.

        Args:
            settings: Scrapy settings object
        """
        self.db_manager = DatabaseManager(settings)
        self.logger = logging.getLogger(__name__)

    @classmethod
    def from_crawler(cls, crawler):
        """Create a pipeline instance from the crawler.

        Args:
            crawler: Scrapy crawler object
        Returns:
            MariaDBPipeline: pipeline instance
        """
        return cls(crawler.settings)

    def open_spider(self, spider):
        """Called when the spider is opened."""
        self.logger.info("MariaDB pipeline opened")

    def close_spider(self, spider):
        """Called when the spider is closed."""
        self.logger.info("MariaDB pipeline closed")
        self.migrate_data()

    def process_item(self, item, spider):
        """Process an item.

        Args:
            item: Scrapy item object
            spider: Scrapy spider object
        Returns:
            item: processed item
        """
        # Nothing to do per item here; the data is read back from SQLite in migrate_data()
        return item

    def migrate_data(self):
        """Migrate data from SQLite to MariaDB."""
        try:
            with self.db_manager.sqlite_session() as sqlite_session, \
                    self.db_manager.mysql_session() as mysql_session:
                # 1. Read video records from SQLite that already have an Aliyun video ID
                sqlite_videos = sqlite_session.query(VideoSQLite).where(
                    (VideoSQLite.aliyun_video_id != None) & (VideoSQLite.aliyun_video_id != '')
                ).all()
                # sqlite_videos = sqlite_session.query(VideoSQLite).all()
                # 2. Migrate them to MariaDB in a batch
                for video in sqlite_videos:
                    # Deduplicate by Aliyun video ID
                    existing_video_id = mysql_session.execute(
                        text("SELECT id FROM wz_video WHERE video_remote_id = :video_remote_id LIMIT 1"), {
                            'video_remote_id': video.aliyun_video_id
                        }
                    )
                    if existing_video_id.first():
                        self.logger.info(f"Video already exists in the remote database: {video.title}")
                        continue
                    # Map to the wz_video table
                    wz_video = {
                        'cid': 1,
                        'title': video.title or '',
                        'css': '',
                        'thumb': video.thumbnail_url or '',
                        'keywords': '',
                        'remark': video.description or '',
                        'block': 0,
                        'url': '',
                        'status': 9,
                        'route': 0,
                        'publisher': 'spider',
                        'addtime': int(time.time()),
                        'updatetime': int(time.time()),
                        'area': '1',
                        'category': '1',
                        'theme': 0,
                        'year': '2025',
                        'video_remote_id': video.aliyun_video_id or '',
                        'video_url': '',
                        'video_list': video.video_list or 0,
                        'month': '1'
                    }
                    # Map to the wz_video_data table
                    wz_video_data = {
                        'id': None,  # set after the wz_video insert
                        'content': '',
                        'coin': 0,
                        'groups': '',
                        'pagetype': 0,
                        'maxchars': 0,
                        'template': '',
                        'allowcomment': 1,
                        'relation': ''
                    }
                    # Insert into wz_video and get the new row ID
                    result = mysql_session.execute(
                        text("""INSERT INTO wz_video (
                            cid, title, css, thumb, keywords, remark, block, url,
                            status, route, publisher, addtime, updatetime, area,
                            category, theme, year, video_remote_id, video_url,
                            video_list, month
                        ) VALUES (
                            :cid, :title, :css, :thumb, :keywords, :remark, :block, :url,
                            :status, :route, :publisher, :addtime, :updatetime, :area,
                            :category, :theme, :year, :video_remote_id, :video_url,
                            :video_list, :month
                        )"""),
                        wz_video
                    )
                    video_id = result.lastrowid
                    # Set the wz_video_data id and insert
                    wz_video_data['id'] = video_id
                    mysql_session.execute(
                        text("""INSERT INTO wz_video_data (
                            id, content, coin, groups, pagetype, maxchars,
                            template, allowcomment, relation
                        ) VALUES (
                            :id, :content, :coin, :groups, :pagetype, :maxchars,
                            :template, :allowcomment, :relation
                        )"""),
                        wz_video_data
                    )
                mysql_session.commit()
                self.logger.info(f"Successfully migrated {len(sqlite_videos)} video records to the online database")
        except Exception as e:
            self.logger.error(f"Data migration failed: {str(e)}")
            raise
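

# A minimal sketch of how these pipelines might be enabled in the project's settings.py.
# The module path and priority numbers are assumptions, not taken from this repository;
# the ordering follows the data flow above: AliyunVodPipeline fills in aliyun_video_id /
# thumbnail_url, SQLitePipeline persists the item, and MariaDBPipeline migrates finished
# records to MariaDB when the spider closes:
#
#   ITEM_PIPELINES = {
#       'scrapy_proj.pipelines.AliyunVodPipeline': 300,
#       'scrapy_proj.pipelines.SQLitePipeline': 400,
#       'scrapy_proj.pipelines.MariaDBPipeline': 500,
#   }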