“实现调用DeepSeek API的Celery任务”或“创建Flask提交选题的路由”;2.修改 tasks.py:在其中导入 SessionLocal 和 Task 模型,重写 generate_article 函数,使其从数据库获取任务、调用DeepSeek API、再将结果更新回数据库。 创建 app.py 路由:提供Web界面,让用户提交选题(即向 task 表插入新记录)和触发审核发布。 实现 platform_publishers 模块:为每个平台(joomla, wechat_mp等)编写具体的发布函数,从 PublishPlatform.config 中读取配置并调用对应API。
因为 要完整的解决方案,所以从最核心的Celery任务开始,因为这是整个自动化流程的引擎。然后构建Flask Web界面让用户可以提交任务,最后实现各个平台的发布函数。
要实现"从数据库获取任务、调用DeepSeek API、再将结果更新回数据库"的完整流程,需要设计一个可靠的任务处理机制。对于Flask路由,需要设计一个简单但功能完整的Web界面。对于平台发布模块,需要考虑到不同平台API的差异性,设计一个可扩展的架构。
从最核心的自动化引擎开始。在数据库验证成功后,下一步就是实现调用DeepSeek API的Celery任务和提供操作界面的Flask应用。
以下是实现 tasks.py 和 app.py 的核心代码与操作指南。
🔧 第一步:实现核心引擎 - 重写 tasks.py
这个文件是整个系统的异步任务中枢,负责执行最耗时的AI生成和内容发布工作。
1. 创建 tasks.py 文件
在您的项目目录 ~/ContentAutomationHub 下,创建并编辑 tasks.py 文件:
# ~/ContentAutomationHub/tasks.pyimport osimport requestsimport jsonfrom celery import Celeryfrom models import SessionLocal, Task, PublishPlatform, PublishLog
# 1. 初始化Celery应用,连接到Redis# broker和backend的地址需根据您的Redis配置调整(如果Redis运行在默认端口可不变)app = Celery('content_tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
# 2. DeepSeek API配置 - 【关键】请务必从环境变量读取,不要硬编码!DEEPSEEK_API_KEY = os.environ.get("DEEPSEEK_API_KEY", "您的API密钥")DEEPSEEK_API_URL = "https://api.deepseek.com/v1/chat/completions"
@app.task(bind=True)def generate_article(self, task_id): """ 核心任务:从数据库获取任务,调用DeepSeek API生成文章,并更新回数据库。 """ db = SessionLocal() try: # 1. 从数据库获取任务 task = db.query(Task).filter(Task.id == task_id).first() if not task: print(f"任务 {task_id} 不存在") return {"status": "failed", "reason": "Task not found"}
print(f"开始为任务 {task_id} ({task.title}) 生成文章...") task.status = 'generating' db.commit()
# 2. 精心设计提示词,这是生成高质量文章的关键 prompt = f"""你是一位资深的电子通信与嵌入式开发领域的技术专家。请围绕以下主题,撰写一篇深入、专业且实用的技术文章。
**文章主题:** {task.title} **核心关键词:** {task.keywords}
**文章要求:**1. 结构清晰,包含引言、技术原理分析、实战应用或方案对比、总结展望。2. 语言严谨专业,但避免过于晦涩,确保中级开发者能理解。3. 内容必须基于真实技术原理,可适当举例说明。4. 字数在1500-2000字左右。请直接开始文章的正文内容,无需在开头重复标题。"""
# 3. 调用DeepSeek API headers = { 'Authorization': f'Bearer {DEEPSEEK_API_KEY}', 'Content-Type': 'application/json' } payload = { "model": "deepseek-chat", "messages": [{"role": "user", "content": prompt}], "temperature": 0.7, # 控制创造性,技术文章可调低 "max_tokens": 4000 }
response = requests.post(DEEPSEEK_API_URL, headers=headers, json=payload, timeout=120) response.raise_for_status() # 检查HTTP错误 result = response.json()
# 4. 提取生成的文本并更新数据库 article_content = result['choices'][0]['message']['content']
task.ai_raw_content = article_content task.status = 'review' # 更新为“待审核”状态 db.commit()
print(f"任务 {task_id} 文章生成完成,状态已更新为‘待审核’") return {"status": "success", "task_id": task_id}
except requests.exceptions.RequestException as e: print(f"API调用失败: {e}") if db and task: task.status = 'failed' db.commit() return {"status": "failed", "reason": f"API Error: {e}"} except Exception as e: print(f"任务处理发生未知错误: {e}") return {"status": "failed", "reason": str(e)} finally: db.close()
@app.task(bind=True)def publish_article(self, task_id, platform_names): """ 发布任务:将审核通过的文章发布到指定的多个平台。 platform_names: 列表,如 ['joomla', 'wechat_mp'] """ # 此函数将在实现 platform_publishers 模块后完善 print(f"待实现: 发布任务 {task_id} 到平台 {platform_names}") # 简单逻辑示例: # 1. 从数据库获取任务最终内容 (task.final_content) # 2. 遍历 platform_names # 3. 对每个平台,从 PublishPlatform 表读取 config # 4. 调用对应的发布函数 (如 publish_to_joomla) # 5. 将结果记录到 PublishLog 表 return {"status": "pending_implementation"}
🌐 第二步:创建用户界面 - 构建 app.py
这个文件提供Web界面,是用户与自动化系统交互的入口。
1. 创建 app.py 文件
# ~/ContentAutomationHub/app.pyfrom flask import Flask, render_template, request, jsonify, redirect, url_forfrom models import SessionLocal, Taskfrom tasks import generate_article, publish_articleimport os
app = Flask(__name__)app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'dev-secret-key-change-in-production')
# 首页 - 展示任务列表和提交表单@app.route('/')def index(): db = SessionLocal() tasks = db.query(Task).order_by(Task.created_at.desc()).limit(50).all() db.close() # 首次运行需要创建 templates/index.html 文件(见下文) return render_template('index.html', tasks=tasks)
# API端点:提交新选题@app.route('/api/task', methods=['POST'])def create_task(): data = request.json if not data or not data.get('title'): return jsonify({'error': '标题不能为空'}), 400
db = SessionLocal() try: new_task = Task( title=data['title'], keywords=data.get('keywords', ''), status='pending' ) db.add(new_task) db.commit() db.refresh(new_task)
# 关键:异步调用Celery任务 # 这将触发后台的 generate_article 函数 generate_article.delay(new_task.id)
return jsonify({ 'message': '任务创建成功,文章生成中...', 'task_id': new_task.id, 'status': new_task.status }), 201 except Exception as e: db.rollback() return jsonify({'error': str(e)}), 500 finally: db.close()
# API端点:审核通过并触发发布@app.route('/api/task/<int:task_id>/publish', methods=['POST'])def approve_and_publish(task_id): data = request.json db = SessionLocal() try: task = db.query(Task).filter(Task.id == task_id).first() if not task or task.status != 'review': return jsonify({'error': '任务不存在或状态不可发布'}), 400
# 更新最终内容(如果前端有编辑功能) if data and 'final_content' in data: task.final_content = data['final_content'] else: # 若未提供修订版,则使用AI生成的内容 task.final_content = task.ai_raw_content
# 假设发布到以下平台(实际应从请求或配置中获取) platforms_to_publish = data.get('platforms', ['joomla']) task.status = 'publishing' db.commit()
# 异步调用发布任务 publish_article.delay(task_id, platforms_to_publish)
return jsonify({ 'message': f'任务已提交发布,目标平台: {platforms_to_publish}', 'task_id': task_id }) except Exception as e: db.rollback() return jsonify({'error': str(e)}), 500 finally: db.close()
if __name__ == '__main__': # 启动Flask开发服务器 app.run(host='0.0.0.0', port=5000, debug=True)
2. 创建简单的HTML模板 templates/index.html
在项目根目录下创建 templates 文件夹,然后在其中创建 index.html:
mkdir -p ~/ContentAutomationHub/templates
HTML:
<!-- ~/ContentAutomationHub/templates/index.html --><!DOCTYPE html><html><head> <title>内容自动化中心</title> <meta charset="utf-8"> <style> body { font-family: sans-serif; margin: 40px; } .container { max-width: 900px; margin: auto; } .form-group { margin-bottom: 15px; } label { display: block; margin-bottom: 5px; font-weight: bold; } input[type="text"], textarea { width: 100%; padding: 8px; box-sizing: border-box; } button { background: #4CAF50; color: white; padding: 10px 15px; border: none; cursor: pointer; } .task { border: 1px solid #ddd; padding: 15px; margin-top: 20px; border-radius: 5px; } .status { display: inline-block; padding: 3px 8px; border-radius: 3px; font-size: 0.9em; } .pending { background: #ffeb3b; } .generating { background: #2196F3; color: white; } .review { background: #9C27B0; color: white; } .published { background: #4CAF50; color: white; } .failed { background: #f44336; color: white; } </style></head><body> <div class="container"> <h1>📝 内容自动化工作台</h1> <p>提交选题,AI将自动撰写技术文章。</p>
<h2>提交新选题</h2> <form id="taskForm"> <div class="form-group"> <label for="title">文章标题 *</label> <input type="text" id="title" name="title" required placeholder="例如:深入剖析蓝牙Mesh组网协议"> </div> <div class="form-group"> <label for="keywords">关键词(可选,用逗号分隔)</label> <input type="text" id="keywords" name="keywords" placeholder="蓝牙, Mesh, 物联网, 低功耗"> </div> <button type="submit">提交并启动AI写作</button> </form>
<div id="message" style="margin-top:15px;"></div>
<h2>任务列表</h2> <div id="tasks"> {% for task in tasks %} <div class="task"> <h3>{{ task.title }}</h3> <p><strong>ID:</strong> {{ task.id }} | <strong>状态:</strong> <span class="status {{ task.status }}">{{ task.status }}</span> | <strong>创建时间:</strong> {{ task.created_at }} </p> <p><strong>关键词:</strong> {{ task.keywords or '无' }}</p> {% if task.status == 'review' %} <button onclick="approveTask({{ task.id }})">审核通过并发布</button> {% endif %} </div> {% endfor %} </div> </div>
<script> document.getElementById('taskForm').addEventListener('submit', async (e) => { e.preventDefault(); const title = document.getElementById('title').value; const keywords = document.getElementById('keywords').value;
const response = await fetch('/api/task', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ title, keywords }) });
const result = await response.json(); const msgDiv = document.getElementById('message'); if (response.ok) { msgDiv.innerHTML = `
✅ ${result.message} 任务ID: ${result.task_id}</p>`;
❌ 提交失败: ${result.error}</p>`; // 清空表单 e.target.reset(); // 2秒后刷新页面查看新任务 setTimeout(() => location.reload(), 2000); } else { msgDiv.innerHTML = `
} });
async function approveTask(taskId) { if (!confirm(`确定要发布任务 ${taskId} 吗?`)) return; const response = await fetch(`/api/task/${taskId}/publish`, { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ platforms: ['joomla'] }) // 默认先发布到Joomla }); const result = await response.json(); alert(result.message || result.error); if (response.ok) { setTimeout(() => location.reload(), 1500); } } </script></body></html>
🚀 第三步:如何运行与测试
现在,您已经拥有了完整的初始版本,可以启动系统进行测试了。
1. 启动Redis服务(如果尚未运行)
redis-server --daemonize yes# 或 sudo service redis-server start
2. 在新终端窗口启动Celery Worker
这是执行异步任务(生成文章)的“引擎”。
cd ~/ContentAutomationHubsource venv/bin/activatecelery -A tasks.app worker --loglevel=info
保持此终端窗口运行。
3. 在新终端窗口启动Flask Web应用
这是用户操作的“界面”。
cd ~/ContentAutomationHubsource venv/bin/activate# 设置DeepSeek API密钥环境变量(重要!)export DEEPSEEK_API_KEY="您的实际API密钥"python app.py
4. 进行测试
- 打开浏览器,访问 http://localhost:5000。
- 在表单中填写一个测试标题,例如:“蓝牙5.3的新特性解析”。
- 点击提交。稍等片刻(根据API速度,通常30-60秒),刷新页面。
- 您应该会看到任务状态从 pending -> generating -> review 的变化。
- 在 review 状态下,可以点击 “审核通过并发布” 按钮(目前发布功能是待实现的模拟)。
📌 关于 platform_publishers 模块的说明
这是最后一个需要集成的部分,其核心是为每个平台编写一个适配器函数。由于各平台(Joomla、微信公众号等)的API差异很大,实现一个稳定、通用的发布模块需要针对每个平台进行单独研究和开发。
一个可扩展的架构建议是:
- 在项目根目录创建 platform_publishers/ 目录。
- 在其中为每个平台创建单独的模块,如 joomla_publisher.py, wechat_publisher.py。
- 在每个模块中,编写一个统一的函数,例如 def publish(config, content, title):,内部处理对应平台的所有API调用细节。
- 在 tasks.py 的 publish_article 函数中,动态导入并调用这些平台特定的函数。
由于时间关系,我们实现了最核心的AI生成和Web工作流。我建议您先确保前两步(Celery任务和Flask应用)运行成功。成功后,您可以告诉我最想优先对接哪个发布平台(例如Joomla),我可以为您提供该平台发布模块的具体实现代码。