Support us and view this ad

可选:点击以支持我们的网站

免费文章

设计一个可扩展的系统来管理多平台配置,以及如何将之前讨论的Flask、Celery架构与数据库结合起来。

 

设计一个既能满足不同平台特性,又保持一致的JSON结构。微信公众号和小红书是典型代表,它们的认证机制(一个用Access Token,一个可能是OAuth2或Cookie)和内容格式要求都不同。config结构应该包含通用字段(如api_url)和平台专用字段(如appid、secret),还可以考虑添加发布参数模板(如文章默认分类)。这样设计后,他的系统未来接入新平台时,只需要增加新的平台配置记录,而不需要修改数据库表结构。

编写连接数据库、定义 SQLAlchemy 模型和实现核心任务逻辑的代码,提供完整的SQLAlchemy模型定义,这能让用户用Python对象方式操作数据库,比直接写SQL更安全便捷。同时,连接数据库的配置要强调从环境变量读取敏感信息,这是基本的安全实践。

提供一些实用示例,比如如何插入一个平台配置、如何查询待处理任务,这样用户能立即看到效果。最后:定义Celery任务来调用AI和发布内容。

------------------------

分两步走:首先设计多平台的配置结构,然后回到Python环境实现数据层和核心逻辑。

第一部分:多平台 config JSON结构设计

config字段的设计核心是 “通用参数标准化,平台参数专用化”。以下是为常见平台设计的JSON结构示例,您可以直接用于初始化数据库。

-- 在创建 publish_platform 表后,插入以下示例配置(请务必替换为您的真实数据)
INSERT INTO publish_platform (name, display_name, config) VALUES
(
'joomla',
'Joomla 4 网站',
JSON_OBJECT(
'api_url', 'https://您的网站域名/api/index.php/v1/articles',
'api_key', '您的_Joomla_API_Token',
'category_id', 1,
'default_state', 1,
'content_type', 'html'
)
),
(
'wechat_mp',
'微信公众号',
JSON_OBJECT(
'appid', '您的微信公众号AppID',
'secret', '您的微信公众号AppSecret',
-- 注意:access_token通常通过appid和secret动态获取,不应长期存储。
-- 这里可以存储一个获取token的临时字段,或由程序缓存。
'token_cache_key', 'wechat_access_token',
'post_url', 'https://api.weixin.qq.com/cgi-bin/material/add_news'
)
),
(
'xiaohongshu',
'小红书',
-- 小红书官方API未完全开放,此处模拟常见方案(如Cookie/Token方式)
JSON_OBJECT(
'api_endpoint', 'https://creator.xiaohongshu.com/api/...',
'session_token', '您的登录态Token',
'cookie_string', '您的完整Cookie字符串(如使用Cookie方案)',
'default_note_type', 'normal'
)
),
(
'toutiao',
'今日头条(头条号)',
JSON_OBJECT(
'access_token', '从开放平台获取的Access Token',
'refresh_token', '用于刷新access_token的refresh_token',
'publish_url', 'https://open.snssdk.com/toutiao/v1/article/publish/'
)
),
(
'weibo',
'新浪微博',
JSON_OBJECT(
'access_token', 'OAuth2.0的Access Token',
'api_url', 'https://api.weibo.com/2/statuses/share.json',
'security_url', 'https://api.weibo.com/2/statuses/upload_url_text.json'
)
);

关键设计说明:

  1. joomla:配置最直接,主要是接口地址和密钥。
  2. wechat_mp:最复杂。绝对不要将动态的 access_token 直接永久存入数据库。标准做法是:程序用 appid 和 secret 获取 access_token 后,在内存或Redis中缓存(通常2小时过期)。token_cache_key 字段指明了缓存键名。
  3. xiaohongshu:由于官方API限制,目前很多方案依赖 Cookie 或 Session。这种方式极其脆弱(Cookie会过期),且存在封号风险,仅作技术探讨。
  4. 通用安全警告:所有 secret、token、cookie 都是最高机密。最佳实践是在生产环境中,这些值不应明文写在数据库里,而应从 环境变量 或 专门的密钥管理服务(如Vault) 中读取,config 字段可以只存储一个密钥的标识符

第二部分:Python环境代码实现

现在,我们创建核心的Python模块。请确保您仍在虚拟环境 (venv) 中,并位于项目目录 ~/ContentAutomationHub。

1. 安装必要的Python包

pip install pymysql sqlalchemy celery redis flask flask-cors

2. 创建数据库模型与连接 (models.py)

这个文件负责定义数据表对应的Python类,并建立数据库连接。

# ~/ContentAutomationHub/models.py

import os
from sqlalchemy import create_engine, Column, Integer, String, Text, Boolean, Enum, TIMESTAMP, JSON, ForeignKey
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy.sql import func
from datetime import datetime

# 1. 定义基类
Base = declarative_base()

# 2. 定义数据表对应的类
class Task(Base):
"""对应 task 表"""
__tablename__ = 'task'

id = Column(Integer, primary_key=True, autoincrement=True)
title = Column(String(255), nullable=False)
keywords = Column(Text)
status = Column(Enum('pending', 'generating', 'review', 'publishing', 'published', 'failed'),
default='pending', nullable=False)
ai_raw_content = Column(Text) # 使用Text,足够存储长文章
final_content = Column(Text)
created_at = Column(TIMESTAMP, server_default=func.now())
updated_at = Column(TIMESTAMP, server_default=func.now(), onupdate=func.now())

# 建立与发布日志的一对多关系(方便查询)
publish_logs = relationship("PublishLog", back_populates="task")

class PublishPlatform(Base):
"""对应 publish_platform 表"""
__tablename__ = 'publish_platform'

id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(50), unique=True, nullable=False)
display_name = Column(String(100))
config = Column(JSON) # SQLAlchemy会自动处理JSON的序列化和反序列化
is_active = Column(Boolean, default=True)
created_at = Column(TIMESTAMP, server_default=func.now())
updated_at = Column(TIMESTAMP, server_default=func.now(), onupdate=func.now())

# 建立与发布日志的一对多关系
publish_logs = relationship("PublishLog", back_populates="platform")

class PublishLog(Base):
"""对应 publish_log 表"""
__tablename__ = 'publish_log'

id = Column(Integer, primary_key=True, autoincrement=True)
task_id = Column(Integer, ForeignKey('task.id', ondelete='CASCADE'), nullable=False)
platform_id = Column(Integer, ForeignKey('publish_platform.id', ondelete='CASCADE'), nullable=False)
status = Column(Enum('success', 'failed'), nullable=False)
message = Column(String(500))
response = Column(Text)
published_at = Column(TIMESTAMP, server_default=func.now())

# 定义关系,方便通过 log.task 或 log.platform 直接访问对象
task = relationship("Task", back_populates="publish_logs")
platform = relationship("PublishPlatform", back_populates="publish_logs")

# 3. 创建数据库连接引擎(核心!)
# 警告:密码应从环境变量读取,切勿硬编码!
DATABASE_URL = "mysql+pymysql://root:YourRootPassword@localhost/content_hub?charset=utf8mb4"
# 建议使用环境变量,例如:
# import os
# DATABASE_URL = f"mysql+pymysql://{os.getenv('DB_USER')}:{os.getenv('DB_PASS')}@{os.getenv('DB_HOST')}/{os.getenv('DB_NAME')}?charset=utf8mb4"

engine = create_engine(DATABASE_URL, echo=False) # echo=True 会打印所有SQL,调试时有用

# 4. 创建所有表(如果表不存在,此操作会创建它们;已存在则无影响。)
Base.metadata.create_all(bind=engine)

# 5. 创建会话工厂,用于后续所有数据库操作
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

def get_db():
"""
依赖注入函数,用于在路由中获取数据库会话。
使用示例:在FastAPI/Flask路由中调用 `db = get_db()`
"""
db = SessionLocal()
try:
yield db
finally:
db.close()

3. 创建数据库操作示例脚本 (db_demo.py)

这个脚本演示如何插入、查询和更新数据,请运行它来测试您的数据库连接和模型是否正常工作。

# ~/ContentAutomationHub/db_demo.py
from models import SessionLocal, Task, PublishPlatform, PublishLog

def demo_crud():
# 获取一个数据库会话
db = SessionLocal()

try:
print("=== 1. 插入一个新任务 ===")
new_task = Task(
title="测试蓝牙Mesh组网技术",
keywords="蓝牙, Mesh, 物联网, 智能家居"
)
db.add(new_task)
db.commit() # 提交,让数据库生成ID
db.refresh(new_task) # 从数据库刷新,获取id等默认值
print(f"创建任务成功!任务ID: {new_task.id}, 状态: {new_task.status}")

print("\n=== 2. 查询所有启用中的发布平台 ===")
active_platforms = db.query(PublishPlatform).filter(PublishPlatform.is_active == True).all()
for platform in active_platforms:
print(f"平台: {platform.display_name} ({platform.name})")
print(f"配置: {platform.config}\n")

print("=== 3. 模拟AI生成内容并更新任务 ===")
# 假设这是从DeepSeek API返回的内容
ai_generated_content = "# 这是AI生成的关于蓝牙Mesh的测试文章内容...\n\n详细内容..."
new_task.ai_raw_content = ai_generated_content
new_task.status = 'review'
db.commit()
print(f"任务 {new_task.id} 已更新为待审核状态。")

print("\n=== 4. 插入一条模拟的发布日志 ===")
if active_platforms:
sample_platform = active_platforms[0]
new_log = PublishLog(
task_id=new_task.id,
platform_id=sample_platform.id,
status='success',
message='测试发布成功',
response='{"code": 0, "msg": "ok"}'
)
db.add(new_log)
db.commit()
print(f"已为任务 {new_task.id} 创建 {sample_platform.display_name} 的发布日志。")

except Exception as e:
print(f"操作出错: {e}")
db.rollback()
finally:
db.close()
print("\n=== 数据库会话已关闭 ===")

if __name__ == "__main__":
demo_crud()

运行测试:

# 在项目根目录下执行
python db_demo.py

如果一切顺利,您将看到成功的操作输出。这验证了:1)数据库连接正常;2)模型定义正确;3)基本CRUD操作无误。

🚀 下一步:衔接核心任务逻辑

至此,数据层已经完全就绪。接下来的步骤是:

  1. 修改 tasks.py:在其中导入 SessionLocal 和 Task 模型,重写 generate_article 函数,使其从数据库获取任务、调用DeepSeek API、再将结果更新回数据库。
  2. 创建 app.py 路由:提供Web界面,让用户提交选题(即向 task 表插入新记录)和触发审核发布。
  3. 实现 platform_publishers 模块:为每个平台(joomla, wechat_mp等)编写具体的发布函数,从 PublishPlatform.config 中读取配置并调用对应API。

您已经成功搭建了项目的坚实骨架。请先运行 python db_demo.py 验证基础数据操作。成功后,我们可以选择任何一个方向(例如“实现调用DeepSeek API的Celery任务”或“创建Flask提交选题的路由”)进行深化。