设计一个可扩展的系统来管理多平台配置,以及如何将之前讨论的Flask、Celery架构与数据库结合起来。
设计一个既能满足不同平台特性,又保持一致的JSON结构。微信公众号和小红书是典型代表,它们的认证机制(一个用Access Token,一个可能是OAuth2或Cookie)和内容格式要求都不同。config结构应该包含通用字段(如api_url)和平台专用字段(如appid、secret),还可以考虑添加发布参数模板(如文章默认分类)。这样设计后,他的系统未来接入新平台时,只需要增加新的平台配置记录,而不需要修改数据库表结构。
编写连接数据库、定义 SQLAlchemy 模型和实现核心任务逻辑的代码,提供完整的SQLAlchemy模型定义,这能让用户用Python对象方式操作数据库,比直接写SQL更安全便捷。同时,连接数据库的配置要强调从环境变量读取敏感信息,这是基本的安全实践。
提供一些实用示例,比如如何插入一个平台配置、如何查询待处理任务,这样用户能立即看到效果。最后:定义Celery任务来调用AI和发布内容。
------------------------
分两步走:首先设计多平台的配置结构,然后回到Python环境实现数据层和核心逻辑。
第一部分:多平台 config JSON结构设计
config字段的设计核心是 “通用参数标准化,平台参数专用化”。以下是为常见平台设计的JSON结构示例,您可以直接用于初始化数据库。
-- 在创建 publish_platform 表后,插入以下示例配置(请务必替换为您的真实数据)INSERT INTO publish_platform (name, display_name, config) VALUES( 'joomla', 'Joomla 4 网站', JSON_OBJECT( 'api_url', 'https://您的网站域名/api/index.php/v1/articles', 'api_key', '您的_Joomla_API_Token', 'category_id', 1, 'default_state', 1, 'content_type', 'html' )),( 'wechat_mp', '微信公众号', JSON_OBJECT( 'appid', '您的微信公众号AppID', 'secret', '您的微信公众号AppSecret', -- 注意:access_token通常通过appid和secret动态获取,不应长期存储。 -- 这里可以存储一个获取token的临时字段,或由程序缓存。 'token_cache_key', 'wechat_access_token', 'post_url', 'https://api.weixin.qq.com/cgi-bin/material/add_news' )),( 'xiaohongshu', '小红书', -- 小红书官方API未完全开放,此处模拟常见方案(如Cookie/Token方式) JSON_OBJECT( 'api_endpoint', 'https://creator.xiaohongshu.com/api/...', 'session_token', '您的登录态Token', 'cookie_string', '您的完整Cookie字符串(如使用Cookie方案)', 'default_note_type', 'normal' )),( 'toutiao', '今日头条(头条号)', JSON_OBJECT( 'access_token', '从开放平台获取的Access Token', 'refresh_token', '用于刷新access_token的refresh_token', 'publish_url', 'https://open.snssdk.com/toutiao/v1/article/publish/' )),( 'weibo', '新浪微博', JSON_OBJECT( 'access_token', 'OAuth2.0的Access Token', 'api_url', 'https://api.weibo.com/2/statuses/share.json', 'security_url', 'https://api.weibo.com/2/statuses/upload_url_text.json' ));
关键设计说明:
- joomla:配置最直接,主要是接口地址和密钥。
- wechat_mp:最复杂。绝对不要将动态的 access_token 直接永久存入数据库。标准做法是:程序用 appid 和 secret 获取 access_token 后,在内存或Redis中缓存(通常2小时过期)。token_cache_key 字段指明了缓存键名。
- xiaohongshu:由于官方API限制,目前很多方案依赖 Cookie 或 Session。这种方式极其脆弱(Cookie会过期),且存在封号风险,仅作技术探讨。
- 通用安全警告:所有 secret、token、cookie 都是最高机密。最佳实践是在生产环境中,这些值不应明文写在数据库里,而应从 环境变量 或 专门的密钥管理服务(如Vault) 中读取,config 字段可以只存储一个密钥的标识符。
第二部分:Python环境代码实现
现在,我们创建核心的Python模块。请确保您仍在虚拟环境 (venv) 中,并位于项目目录 ~/ContentAutomationHub。
1. 安装必要的Python包
pip install pymysql sqlalchemy celery redis flask flask-cors
2. 创建数据库模型与连接 (models.py)
这个文件负责定义数据表对应的Python类,并建立数据库连接。
# ~/ContentAutomationHub/models.py
import osfrom sqlalchemy import create_engine, Column, Integer, String, Text, Boolean, Enum, TIMESTAMP, JSON, ForeignKeyfrom sqlalchemy.orm import declarative_basefrom sqlalchemy.orm import sessionmaker, relationshipfrom sqlalchemy.sql import funcfrom datetime import datetime
# 1. 定义基类Base = declarative_base()
# 2. 定义数据表对应的类class Task(Base): """对应 task 表""" __tablename__ = 'task'
id = Column(Integer, primary_key=True, autoincrement=True) title = Column(String(255), nullable=False) keywords = Column(Text) status = Column(Enum('pending', 'generating', 'review', 'publishing', 'published', 'failed'), default='pending', nullable=False) ai_raw_content = Column(Text) # 使用Text,足够存储长文章 final_content = Column(Text) created_at = Column(TIMESTAMP, server_default=func.now()) updated_at = Column(TIMESTAMP, server_default=func.now(), onupdate=func.now())
# 建立与发布日志的一对多关系(方便查询) publish_logs = relationship("PublishLog", back_populates="task")
class PublishPlatform(Base): """对应 publish_platform 表""" __tablename__ = 'publish_platform'
id = Column(Integer, primary_key=True, autoincrement=True) name = Column(String(50), unique=True, nullable=False) display_name = Column(String(100)) config = Column(JSON) # SQLAlchemy会自动处理JSON的序列化和反序列化 is_active = Column(Boolean, default=True) created_at = Column(TIMESTAMP, server_default=func.now()) updated_at = Column(TIMESTAMP, server_default=func.now(), onupdate=func.now())
# 建立与发布日志的一对多关系 publish_logs = relationship("PublishLog", back_populates="platform")
class PublishLog(Base): """对应 publish_log 表""" __tablename__ = 'publish_log'
id = Column(Integer, primary_key=True, autoincrement=True) task_id = Column(Integer, ForeignKey('task.id', ondelete='CASCADE'), nullable=False) platform_id = Column(Integer, ForeignKey('publish_platform.id', ondelete='CASCADE'), nullable=False) status = Column(Enum('success', 'failed'), nullable=False) message = Column(String(500)) response = Column(Text) published_at = Column(TIMESTAMP, server_default=func.now())
# 定义关系,方便通过 log.task 或 log.platform 直接访问对象 task = relationship("Task", back_populates="publish_logs") platform = relationship("PublishPlatform", back_populates="publish_logs")
# 3. 创建数据库连接引擎(核心!)# 警告:密码应从环境变量读取,切勿硬编码!DATABASE_URL = "mysql+pymysql://root:YourRootPassword@localhost/content_hub?charset=utf8mb4"# 建议使用环境变量,例如:# import os# DATABASE_URL = f"mysql+pymysql://{os.getenv('DB_USER')}:{os.getenv('DB_PASS')}@{os.getenv('DB_HOST')}/{os.getenv('DB_NAME')}?charset=utf8mb4"
engine = create_engine(DATABASE_URL, echo=False) # echo=True 会打印所有SQL,调试时有用
# 4. 创建所有表(如果表不存在,此操作会创建它们;已存在则无影响。)Base.metadata.create_all(bind=engine)
# 5. 创建会话工厂,用于后续所有数据库操作SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def get_db(): """ 依赖注入函数,用于在路由中获取数据库会话。 使用示例:在FastAPI/Flask路由中调用 `db = get_db()` """ db = SessionLocal() try: yield db finally: db.close()
3. 创建数据库操作示例脚本 (db_demo.py)
这个脚本演示如何插入、查询和更新数据,请运行它来测试您的数据库连接和模型是否正常工作。
# ~/ContentAutomationHub/db_demo.py
from models import SessionLocal, Task, PublishPlatform, PublishLog
def demo_crud():
# 获取一个数据库会话
db = SessionLocal()
try:
print("=== 1. 插入一个新任务 ===")
new_task = Task(
title="测试蓝牙Mesh组网技术",
keywords="蓝牙, Mesh, 物联网, 智能家居"
)
db.add(new_task)
db.commit() # 提交,让数据库生成ID
db.refresh(new_task) # 从数据库刷新,获取id等默认值
print(f"创建任务成功!任务ID: {new_task.id}, 状态: {new_task.status}")
print("\n=== 2. 查询所有启用中的发布平台 ===")
active_platforms = db.query(PublishPlatform).filter(PublishPlatform.is_active == True).all()
for platform in active_platforms:
print(f"平台: {platform.display_name} ({platform.name})")
print(f"配置: {platform.config}\n")
print("=== 3. 模拟AI生成内容并更新任务 ===")
# 假设这是从DeepSeek API返回的内容
ai_generated_content = "# 这是AI生成的关于蓝牙Mesh的测试文章内容...\n\n详细内容..."
new_task.ai_raw_content = ai_generated_content
new_task.status = 'review'
db.commit()
print(f"任务 {new_task.id} 已更新为待审核状态。")
print("\n=== 4. 插入一条模拟的发布日志 ===")
if active_platforms:
sample_platform = active_platforms[0]
new_log = PublishLog(
task_id=new_task.id,
platform_id=sample_platform.id,
status='success',
message='测试发布成功',
response='{"code": 0, "msg": "ok"}'
)
db.add(new_log)
db.commit()
print(f"已为任务 {new_task.id} 创建 {sample_platform.display_name} 的发布日志。")
except Exception as e:
print(f"操作出错: {e}")
db.rollback()
finally:
db.close()
print("\n=== 数据库会话已关闭 ===")
if __name__ == "__main__":
demo_crud()
运行测试:
# 在项目根目录下执行python db_demo.py
如果一切顺利,您将看到成功的操作输出。这验证了:1)数据库连接正常;2)模型定义正确;3)基本CRUD操作无误。
🚀 下一步:衔接核心任务逻辑
至此,数据层已经完全就绪。接下来的步骤是:
- 修改 tasks.py:在其中导入 SessionLocal 和 Task 模型,重写 generate_article 函数,使其从数据库获取任务、调用DeepSeek API、再将结果更新回数据库。
- 创建 app.py 路由:提供Web界面,让用户提交选题(即向 task 表插入新记录)和触发审核发布。
- 实现 platform_publishers 模块:为每个平台(joomla, wechat_mp等)编写具体的发布函数,从 PublishPlatform.config 中读取配置并调用对应API。
您已经成功搭建了项目的坚实骨架。请先运行 python db_demo.py 验证基础数据操作。成功后,我们可以选择任何一个方向(例如“实现调用DeepSeek API的Celery任务”或“创建Flask提交选题的路由”)进行深化。