crawler_81tv/main.py

import os
import sys
import uuid
import logging
import time
from typing import Dict, List, Optional, Literal
from datetime import datetime
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from pydantic import BaseModel, HttpUrl
import uvicorn
from multiprocessing import Process
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Add the Scrapy project to the Python path
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), "scrapy_proj"))

# Import spiders and models
from scrapy_proj.spiders.zgjs import ZGJSSpider
from scrapy_proj.models import ScheduledTask

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Pydantic models for scheduled tasks
class ScheduledTaskCreate(BaseModel):
    """Request model for creating a scheduled task."""
    name: str
    cron_expression: str
    spider_name: str
    url: str
    video_list: int
    enabled: bool = True


class ScheduledTaskUpdate(BaseModel):
    """Request model for updating a scheduled task."""
    name: Optional[str] = None
    cron_expression: Optional[str] = None
    spider_name: Optional[str] = None
    url: Optional[str] = None
    video_list: Optional[int] = None
    enabled: Optional[bool] = None


class TaskStatus(BaseModel):
    """Response model for a task's execution status."""
    status: Literal['pending', 'running', 'completed', 'failed']
    message: Optional[str] = None
    start_time: Optional[str] = None
    end_time: Optional[str] = None


class ScheduledTaskResponse(BaseModel):
    """Response model for a scheduled task."""
    id: int
    name: str
    cron_expression: str
    spider_name: str
    url: str
    video_list: int
    enabled: bool
    create_time: str
    update_time: str


# In-memory store for scheduled-task execution status
task_status_store: Dict[str, Dict] = {}

# Create the database engine and session factory
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///data/videos.db")
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Create and start the scheduler
scheduler = BackgroundScheduler()
scheduler.start()
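
# Note: BackgroundScheduler runs its jobs in a thread pool inside this API
# process, and task_status_store above / spider_tasks below are plain
# in-process dicts, so all scheduling and status state lives in memory.
# A minimal sketch of re-registering enabled jobs from the database
# (e.g. at startup), assuming the add_job_to_scheduler helper defined
# later in this file:
#
#     db = SessionLocal()
#     try:
#         for task in db.query(ScheduledTask).filter(ScheduledTask.enabled == True).all():
#             add_job_to_scheduler(task)
#     finally:
#         db.close()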

# Create the FastAPI application
app = FastAPI(
    title="Scrapy API",
    description="Film and TV resource crawler API",
    version="0.1.0",
)

# In-memory store for spider task status
spider_tasks = {}

# Get a database session. Callers are responsible for closing it
# (every endpoint below does so in a finally block).
def get_db():
    return SessionLocal()
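
# A sketch of the usual FastAPI dependency form of the same helper, in case
# the endpoints are ever switched over to Depends(); the generator keeps the
# session open for the duration of the request and closes it afterwards
# (get_db_dependency is an illustrative name, not used elsewhere in this file):
#
#     def get_db_dependency():
#         db = SessionLocal()
#         try:
#             yield db
#         finally:
#             db.close()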

class SpiderRequest(BaseModel):
    """Spider run request model."""
    url: Optional[HttpUrl] = None
    spider_name: str = "example"
    settings: Optional[Dict] = None
    video_list: int


class SpiderResponse(BaseModel):
    """Spider run response model."""
    task_id: str
    status: str
    spider_name: str
    task_name: Optional[str] = None  # Name of the scheduled task that launched this crawl, if any
    message: str
    start_time: Optional[str] = None
    end_time: Optional[str] = None


class PaginatedSpiderResponse(BaseModel):
    """Paginated spider task list response model."""
    items: List[SpiderResponse]
    total: int
    page: int
    page_size: int
    total_pages: int

def _run_spider_process(spider_name: str, url: Optional[str], video_list: int, settings: Dict):
    """Process entry point that actually runs the spider."""
    try:
        # Load the Scrapy project settings
        crawler_settings = get_project_settings()
        # Apply any custom settings passed in by the caller
        if settings:
            for key, value in settings.items():
                crawler_settings.set(key, value)
        # Create the crawler process
        process = CrawlerProcess(settings=crawler_settings)
        # Select the spider to run
        if spider_name == "zgjs":
            logger.info(f"Starting spider {spider_name}, URL: {url}")
            process.crawl(ZGJSSpider, url=url, video_list=video_list)
        else:
            raise ValueError(f"Unknown spider: {spider_name}")
        # Start crawling (blocks until the crawl finishes)
        process.start()
        # Make sure the crawler process shuts down cleanly
        try:
            if hasattr(process, '_active') and not process._active:
                logger.info("Crawler process finished")
            elif hasattr(process, 'bootstrap_stopped') and process.bootstrap_stopped:  # type: ignore
                logger.info("Crawler process stopped normally")
            else:
                logger.warning("Crawler process did not stop normally, forcing shutdown")
                process.stop()
                # Make sure all reactor threads are stopped
                from twisted.internet import reactor
                if reactor.running:  # type: ignore
                    logger.info("Stopping reactor")
                    reactor.stop()  # type: ignore
        except Exception as e:
            logger.error(f"Error while shutting down crawler process: {str(e)}")
            sys.exit(1)  # Abnormal exit
        # Exit the worker process explicitly
        logger.info("Crawler process exiting normally")
        sys.exit(0)
    except Exception as e:
        logger.error(f"Crawler process error: {str(e)}", exc_info=True)
        sys.exit(1)  # Abnormal exit
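
# Each crawl runs in a fresh child process (see run_spider below) because
# Twisted's reactor cannot be restarted once it has stopped, so a single
# interpreter can only call CrawlerProcess.start() once. Running the crawl in
# its own process also keeps the FastAPI event loop untouched and lets the
# parent read the child's exit code to decide success or failure.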

def run_spider(task_id: str, spider_name: str, task_name: Optional[str] = None, url: Optional[str] = None, video_list: int = 0, settings: Optional[Dict] = None):
    """Run a spider in the background.

    Args:
        task_id: Task ID
        spider_name: Spider name
        task_name: Scheduled task name, if triggered by one
        url: Start URL
        video_list: Number of video lists
        settings: Custom Scrapy settings
    """
    try:
        # Make sure settings is a dict
        settings = settings or {}
        # Initialise the task status entry
        if task_id not in spider_tasks:
            spider_tasks[task_id] = {
                "status": "pending",
                "spider_name": spider_name,
                "task_name": task_name,
                "message": "Spider task initialising",
                "started_at": time.time(),
                "finished_at": None
            }
        # Create and start a new process for the crawl
        p = Process(
            target=_run_spider_process,
            args=(spider_name, url, video_list, settings),
            daemon=True  # Daemon process, so it is killed if the main process exits
        )
        p.start()
        # Mark the task as running
        spider_tasks[task_id].update({
            "status": "running",
            "message": "Spider task is running",
            "process": p,
            "started_at": time.time()
        })

        # Monitor the child process from a separate thread
        def monitor_process(process, task_id):
            process.join()  # Wait for the process to finish
            if task_id in spider_tasks:
                finish_time = time.time()
                spider_tasks[task_id]["finished_at"] = finish_time
                if process.exitcode == 0:
                    spider_tasks[task_id].update({
                        "status": "completed",
                        "message": "Spider task completed successfully"
                    })
                else:
                    spider_tasks[task_id].update({
                        "status": "failed",
                        "message": f"Spider task failed, exit code: {process.exitcode}"
                    })
                # Sync the status back to the scheduled task, if there is one
                for scheduled_task_id, status_info in task_status_store.items():
                    if status_info.get("spider_task_id") == task_id:
                        status_info.update({
                            "status": spider_tasks[task_id]["status"],
                            "message": spider_tasks[task_id]["message"],
                            "end_time": datetime.fromtimestamp(finish_time).strftime("%Y-%m-%d %H:%M:%S")
                        })

        from threading import Thread
        monitor_thread = Thread(target=monitor_process, args=(p, task_id))
        monitor_thread.daemon = True
        monitor_thread.start()
    except Exception as e:
        error_time = time.time()
        # Mark the task as failed
        spider_tasks[task_id].update({
            "status": "failed",
            "message": f"Error starting spider process: {str(e)}",
            "finished_at": error_time
        })
        # Sync the failure back to the scheduled task, if there is one
        for scheduled_task_id, status_info in task_status_store.items():
            if status_info.get("spider_task_id") == task_id:
                status_info.update({
                    "status": "failed",
                    "message": f"Error starting spider process: {str(e)}",
                    "end_time": datetime.fromtimestamp(error_time).strftime("%Y-%m-%d %H:%M:%S")
                })
        logger.error(f"Error starting spider process: {str(e)}")

def cleanup_finished_processes():
    """Clean up finished crawl processes and update their task status."""
    for task_id, task_info in list(spider_tasks.items()):
        if "process" in task_info:
            process = task_info["process"]
            if not process.is_alive():
                # Read the exit code of the finished process
                exitcode = process.exitcode
                # Release the process resources
                process.join()
                process.close()
                # Current time
                current_time = time.time()
                # Update the status based on the exit code
                if exitcode == 0:
                    task_info["status"] = "completed"
                    task_info["message"] = "Spider task completed successfully"
                else:
                    task_info["status"] = "failed"
                    task_info["message"] = f"Spider task failed, exit code: {exitcode}"
                # Record the finish time if it has not been set yet
                if not task_info.get("finished_at"):
                    task_info["finished_at"] = current_time
                # Make sure there is a start time as well
                if not task_info.get("started_at"):
                    task_info["started_at"] = task_info.get("finished_at", current_time)
                del spider_tasks[task_id]["process"]
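
# cleanup_finished_processes() is invoked lazily from the status endpoints
# below rather than on a timer, so a finished crawl is reconciled the next
# time someone asks about it. If background reconciliation were wanted, one
# option (a sketch, not used here) would be registering it with the existing
# scheduler, e.g. scheduler.add_job(cleanup_finished_processes, 'interval', minutes=5).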

@app.post("/api/spiders/run", response_model=SpiderResponse)
async def start_spider(spider_request: SpiderRequest, background_tasks: BackgroundTasks):
    """Start a spider task.

    Args:
        spider_request: Spider request parameters
        background_tasks: FastAPI background tasks

    Returns:
        SpiderResponse: Spider response
    """
    # Generate a task ID
    task_id = str(uuid.uuid4())
    if spider_request.url is None:
        raise HTTPException(status_code=400, detail="Missing url parameter")
    if spider_request.video_list is None:
        raise HTTPException(status_code=400, detail="Missing video_list parameter")
    # Current timestamp
    current_time = time.time()
    # Record the task information
    spider_tasks[task_id] = {
        "status": "pending",
        "spider_name": spider_request.spider_name,
        "message": "Spider task created, waiting to run",
        "started_at": current_time,  # Start time
        "finished_at": None          # Finish time is unknown until the crawl ends
    }
    # Run the spider in the background
    background_tasks.add_task(
        run_spider,
        task_id=task_id,
        spider_name=spider_request.spider_name,
        url=str(spider_request.url) if spider_request.url else None,
        video_list=spider_request.video_list,
        settings=spider_request.settings
    )
    # Format the start time as ISO 8601
    start_time = datetime.fromtimestamp(current_time).isoformat()
    return SpiderResponse(
        task_id=task_id,
        status="pending",
        spider_name=spider_request.spider_name,
        message="Spider task created, waiting to run",
        start_time=start_time,
        end_time=None
    )
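
# Example request (illustrative values; "zgjs" is the only spider wired up above):
#
#   curl -X POST http://localhost:8000/api/spiders/run \
#        -H "Content-Type: application/json" \
#        -d '{"spider_name": "zgjs", "url": "https://example.com/list", "video_list": 1}'
#
# The response contains a task_id that can be polled via
# GET /api/spiders/status/{task_id}.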

@app.get("/api/spiders/status/{task_id}", response_model=SpiderResponse)
async def get_spider_status(task_id: str):
    """Get the status of a spider task.

    Args:
        task_id: Task ID

    Returns:
        SpiderResponse: Spider response
    """
    # Reconcile finished processes first
    cleanup_finished_processes()
    if task_id not in spider_tasks:
        raise HTTPException(status_code=404, detail="Task not found")
    task_info = spider_tasks[task_id]
    return SpiderResponse(
        task_id=task_id,
        status=task_info["status"],
        spider_name=task_info["spider_name"],
        task_name=task_info.get("task_name"),
        message=task_info["message"],
        start_time=datetime.fromtimestamp(task_info["started_at"]).strftime("%Y-%m-%d %H:%M:%S") if task_info.get("started_at") else None,
        end_time=datetime.fromtimestamp(task_info["finished_at"]).strftime("%Y-%m-%d %H:%M:%S") if task_info.get("finished_at") else None
    )

@app.post("/api/spiders/cleanup")
async def cleanup_spiders():
    """Clean up finished task processes.

    Returns:
        Dict: Cleanup result
    """
    cleanup_finished_processes()
    return {"message": "Finished processes cleaned up"}

@app.get("/api/spiders/list", response_model=PaginatedSpiderResponse)
async def list_spiders(page: int = 1, page_size: int = 10):
    """List spider tasks (paginated).

    Args:
        page: Page number, starting from 1
        page_size: Number of items per page

    Returns:
        PaginatedSpiderResponse: Paginated list of spider tasks
    """
    # Reconcile finished processes first so the statuses are up to date
    cleanup_finished_processes()
    # Work out the pagination parameters
    all_tasks = list(spider_tasks.items())
    all_tasks.reverse()  # Newest first
    total = len(all_tasks)
    total_pages = (total + page_size - 1) // page_size
    page = max(1, min(page, total_pages))
    # Slice out the current page
    start = (page - 1) * page_size
    end = start + page_size
    paginated_tasks = all_tasks[start:end]
    return PaginatedSpiderResponse(
        items=[
            SpiderResponse(
                task_id=str(task_id),
                status=task_info["status"],
                spider_name=task_info["spider_name"],
                task_name=task_info.get("task_name"),
                message=task_info["message"],
                start_time=datetime.fromtimestamp(task_info["started_at"]).strftime("%Y-%m-%d %H:%M:%S") if task_info.get("started_at") else None,
                end_time=datetime.fromtimestamp(task_info["finished_at"]).strftime("%Y-%m-%d %H:%M:%S") if task_info.get("finished_at") else None
            )
            for task_id, task_info in paginated_tasks
        ],
        total=total,
        page=page,
        page_size=page_size,
        total_pages=total_pages
    )

def add_job_to_scheduler(task: ScheduledTask):
    """Register a scheduled task with the scheduler."""
    if task.enabled:  # type: ignore
        # Wrapper that records status updates when the job fires
        def scheduled_spider_run():
            # Generate a unique spider task ID
            spider_task_id = str(uuid.uuid4())
            task_id_str = str(task.id)
            current_time_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            # Record the scheduled task status
            task_status_store[task_id_str] = {
                "status": "running",
                "message": "Scheduled task running automatically",
                "start_time": current_time_str,
                "end_time": None,
                "spider_task_id": spider_task_id
            }
            # Run the spider
            run_spider(
                task_id=spider_task_id,
                spider_name=str(task.spider_name),
                task_name=str(task.name),
                url=str(task.url),
                video_list=task.video_list  # type: ignore
            )
            # Log the automatic run
            logger.info(f"Scheduled task {task.name} (ID: {task.id}) ran automatically, spider task ID: {spider_task_id}")

        # Register the job with the scheduler
        scheduler.add_job(
            scheduled_spider_run,
            CronTrigger.from_crontab(task.cron_expression),
            id=str(task.id),
            replace_existing=True
        )
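
# cron_expression uses standard five-field crontab syntax via
# CronTrigger.from_crontab(); for example, "30 3 * * *" fires every day at
# 03:30 and "0 */6 * * *" fires every six hours (illustrative values).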

@app.post("/api/scheduled-tasks", response_model=ScheduledTaskResponse)
async def create_scheduled_task(task: ScheduledTaskCreate):
    """Create a scheduled task."""
    db = get_db()
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    db_task = ScheduledTask(
        name=task.name,
        cron_expression=task.cron_expression,
        spider_name=task.spider_name,
        url=task.url,
        video_list=task.video_list,
        enabled=task.enabled,
        create_time=current_time,
        update_time=current_time
    )
    try:
        db.add(db_task)
        db.commit()
        db.refresh(db_task)
        # If the task is enabled, register it with the scheduler
        if task.enabled:
            add_job_to_scheduler(db_task)
        return db_task
    except Exception as e:
        db.rollback()
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()

@app.get("/api/scheduled-tasks", response_model=List[ScheduledTaskResponse])
async def list_scheduled_tasks():
    """List all scheduled tasks."""
    db = get_db()
    try:
        tasks = db.query(ScheduledTask).all()
        return tasks
    finally:
        db.close()

@app.get("/api/scheduled-tasks/{task_id}", response_model=ScheduledTaskResponse)
async def get_scheduled_task(task_id: int):
    """Get a single scheduled task."""
    db = get_db()
    try:
        task = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first()
        if task is None:
            raise HTTPException(status_code=404, detail="Task not found")
        return task
    finally:
        db.close()

@app.put("/api/scheduled-tasks/{task_id}", response_model=ScheduledTaskResponse)
async def update_scheduled_task(task_id: int, task_update: ScheduledTaskUpdate):
    """Update a scheduled task."""
    db = get_db()
    try:
        db_task = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first()
        if db_task is None:
            raise HTTPException(status_code=404, detail="Task not found")
        update_data = task_update.dict(exclude_unset=True)
        for key, value in update_data.items():
            setattr(db_task, key, value)
        db_task.update_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # type: ignore
        db.commit()
        db.refresh(db_task)
        # Re-register the job with the scheduler
        job_id = str(db_task.id)
        if scheduler.get_job(job_id):
            scheduler.remove_job(job_id)
        if db_task.enabled:  # type: ignore
            add_job_to_scheduler(db_task)
        return db_task
    finally:
        db.close()

@app.delete("/api/scheduled-tasks/{task_id}")
async def delete_scheduled_task(task_id: int):
    """Delete a scheduled task."""
    db = get_db()
    try:
        db_task = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first()
        if db_task is None:
            raise HTTPException(status_code=404, detail="Task not found")
        # Remove the job from the scheduler
        job_id = str(db_task.id)
        if scheduler.get_job(job_id):
            scheduler.remove_job(job_id)
        db.delete(db_task)
        db.commit()
        return {"message": "Task deleted successfully"}
    finally:
        db.close()

@app.post("/api/scheduled-tasks/{task_id}/toggle")
async def toggle_scheduled_task(task_id: int):
    """Enable or disable a scheduled task."""
    db = get_db()
    try:
        db_task = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first()
        if db_task is None:
            raise HTTPException(status_code=404, detail="Task not found")
        db_task.enabled = not db_task.enabled  # type: ignore
        db_task.update_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # type: ignore
        # Update the job in the scheduler
        job_id = str(db_task.id)
        if scheduler.get_job(job_id):
            scheduler.remove_job(job_id)
        if db_task.enabled:  # type: ignore
            add_job_to_scheduler(db_task)
        db.commit()
        db.refresh(db_task)
        return {"message": f"Task {'enabled' if db_task.enabled else 'disabled'} successfully"}  # type: ignore
    finally:
        db.close()

@app.post("/api/scheduled-tasks/{task_id}/run")
async def run_scheduled_task(task_id: int, background_tasks: BackgroundTasks):
    """Run a scheduled task manually."""
    db = get_db()
    try:
        db_task = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first()
        if db_task is None:
            raise HTTPException(status_code=404, detail="Task not found")
        # Generate a unique spider task ID
        spider_task_id = str(uuid.uuid4())
        # Current timestamp
        current_time = time.time()
        current_time_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        # Record the spider task information
        spider_tasks[spider_task_id] = {
            "status": "pending",
            "spider_name": db_task.spider_name,
            "task_name": db_task.name,
            "message": "Spider task created, waiting to run",
            "started_at": current_time,
            "finished_at": None
        }
        # Record the scheduled task status
        task_status_store[str(task_id)] = {
            "status": "running",
            "message": "Scheduled task is running",
            "start_time": current_time_str,
            "end_time": None,
            "spider_task_id": spider_task_id,  # Link to the spider task
            "task_name": db_task.name
        }
        # Run the spider in the background
        background_tasks.add_task(
            run_spider,
            task_id=spider_task_id,
            spider_name=str(db_task.spider_name),
            task_name=str(db_task.name),
            url=str(db_task.url),
            video_list=db_task.video_list  # type: ignore
        )
        return {
            "message": "Task started",
            "task_id": task_id,
            "spider_task_id": spider_task_id,
            "task_name": db_task.name  # Include the task name in the response
        }
    finally:
        db.close()

@app.get("/api/task-status/{task_id}", response_model=TaskStatus)
async def get_task_status(task_id: int):
    """Get the execution status of a scheduled task."""
    task_id_str = str(task_id)
    # Reconcile finished crawl processes first so the status is up to date
    cleanup_finished_processes()
    if task_id_str not in task_status_store:
        return TaskStatus(status="pending", message="Task has not run yet")
    status_info = task_status_store[task_id_str]
    # If the task is linked to a spider task, check the spider's status
    if "spider_task_id" in status_info:
        spider_task_id = status_info["spider_task_id"]
        if spider_task_id in spider_tasks:
            spider_info = spider_tasks[spider_task_id]
            spider_status = spider_info["status"]
            spider_message = spider_info["message"]
            # Sync the status across
            if spider_status != status_info["status"]:
                status_info["status"] = spider_status
                status_info["message"] = spider_message
                # If the spider task completed or failed, record the end time
                if spider_status in ["completed", "failed"]:
                    status_info["end_time"] = datetime.fromtimestamp(
                        spider_info.get("finished_at") or time.time()
                    ).strftime("%Y-%m-%d %H:%M:%S")
        else:
            # The spider task no longer exists; if it was still marked as
            # running, it most likely terminated abnormally
            if status_info["status"] == "running":
                status_info["status"] = "failed"
                status_info["message"] = "Spider task terminated abnormally"
                status_info["end_time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    return TaskStatus(
        status=status_info["status"],
        message=status_info["message"],
        start_time=status_info.get("start_time"),
        end_time=status_info.get("end_time")
    )

# Serve the static front end
app.mount("/static", StaticFiles(directory="static"), name="static")


@app.get("/")
async def read_index():
    return FileResponse("static/index.html")

if __name__ == "__main__":
    host = os.getenv("HOST", "0.0.0.0")
    port = int(os.getenv("PORT", "8000"))
    log_level = os.getenv("LOG_LEVEL", "info")
    # Start the server
    uvicorn.run(
        app if not os.getenv("RELOAD") else "main:app",
        host=host,
        port=port,
        log_level=log_level,
        reload=bool(os.getenv("RELOAD"))
    )
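
# Configuration is read from the environment (via the .env file loaded above).
# An illustrative .env, assuming only the variables referenced in this file:
#
#   DATABASE_URL=sqlite:///data/videos.db
#   HOST=0.0.0.0
#   PORT=8000
#   LOG_LEVEL=info
#   # RELOAD=1   # any non-empty value switches uvicorn to auto-reload mode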