以下是一个完整的 Att-BiLSTM 适配驾驶行为数据 的 PyTorch 实现脚本。它包含了从模拟数据生成、模型定义、训练到评估的全流程。代码中已针对驾驶行为时间序列(多维特征)进行了修改,并附有详细注释,方便你替换为真实数据(如 Garmin 手表 + OBD 设备采集的 CSV 文件)。

 

"""
驾驶行为风险预测模型 - Att-BiLSTM 实现
数据格式:时间序列,每个样本 shape = (timesteps, features)
任务:二分类(高风险/低风险)或回归(风险概率)
"""

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score
import matplotlib.pyplot as plt
import os
import pandas as pd

# 设置随机种子,保证可重复性
def set_seed(seed=42):
np.random.seed(seed)
torch.manual_seed(seed)
if torch.cuda.is_available():
torch.cuda.manual_seed_all(seed)

set_seed(42)

# 设备配置
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# ==================== 1. 数据准备 ====================

def generate_synthetic_driving_data(num_samples=10000, timesteps=60, num_features=20):
"""
生成模拟驾驶行为数据
真实场景应从 CSV 或数据库加载
"""
X = np.random.randn(num_samples, timesteps, num_features).astype(np.float32)

# 生成标签:高风险(1)或低风险(0)
# 这里用一个简单规则:某些特征(如急加速次数)的统计量决定风险
# 例如:计算每个样本在时间维度上的均值,如果第5个特征的均值>0.5,则为高风险
feature_5_mean = X[:, :, 4].mean(axis=1) # 假设第5个特征是“急加速度”
y = (feature_5_mean > 0.5).astype(int) # 二分类标签

return X, y

def load_real_data(csv_path, timesteps=60, feature_cols=None, label_col='label'):
"""
从 CSV 加载真实数据(示例,需根据实际格式调整)
CSV 应包含时间步索引、样本ID、特征列、标签列
"""
df = pd.read_csv(csv_path)
# 假设 CSV 列:'sample_id', 'time_step', 'feature1', 'feature2', ..., 'label'
# 需要按样本ID和时间步整理为 (num_samples, timesteps, num_features)
samples = df.groupby('sample_id')
X_list, y_list = [], []
for name, group in samples:
group = group.sort_values('time_step')
if len(group) >= timesteps:
# 取前 timesteps 个时间步
group = group.iloc[:timesteps]
features = group[feature_cols].values.astype(np.float32)
label = group[label_col].iloc[0]
X_list.append(features)
y_list.append(label)
X = np.stack(X_list)
y = np.array(y_list)
return X, y

# 生成模拟数据
X, y = generate_synthetic_driving_data(num_samples=5000, timesteps=60, num_features=20)
print(f"数据形状: X {X.shape}, y {y.shape}")

# 划分训练集、验证集、测试集
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.3, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)
print(f"训练集: {X_train.shape}, 验证集: {X_val.shape}, 测试集: {X_test.shape}")

# PyTorch Dataset
class DrivingDataset(Dataset):
def __init__(self, X, y):
self.X = torch.tensor(X, dtype=torch.float32)
self.y = torch.tensor(y, dtype=torch.long) # 分类用 long,回归用 float

def __len__(self):
return len(self.X)

def __getitem__(self, idx):
return self.X[idx], self.y[idx]

train_dataset = DrivingDataset(X_train, y_train)
val_dataset = DrivingDataset(X_val, y_val)
test_dataset = DrivingDataset(X_test, y_test)

batch_size = 64
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# ==================== 2. 模型定义 ====================

class Attention(nn.Module):
"""自定义注意力层"""
def __init__(self, hidden_size):
super(Attention, self).__init__()
self.attention_weights = nn.Linear(hidden_size, 1, bias=False)

def forward(self, lstm_outputs):
# lstm_outputs: (batch, seq_len, hidden_size)
attn_scores = self.attention_weights(lstm_outputs) # (batch, seq_len, 1)
attn_weights = torch.softmax(attn_scores, dim=1) # (batch, seq_len, 1)
context = torch.sum(attn_weights * lstm_outputs, dim=1) # (batch, hidden_size)
return context, attn_weights

class AttBiLSTM(nn.Module):
"""
Att-BiLSTM 模型,适配驾驶行为时间序列
输入: (batch, seq_len, input_size)
输出: (batch, num_classes) 或 (batch, 1) 用于回归
"""
def __init__(self, input_size, hidden_size, num_layers, num_classes, dropout=0.5, bidirectional=True):
super(AttBiLSTM, self).__init__()
self.hidden_size = hidden_size
self.num_layers = num_layers
self.bidirectional = bidirectional

# 移除原始论文中的嵌入层,因为输入已经是数值特征
# 可以加一个线性变换来调整维度(可选)
self.input_proj = nn.Linear(input_size, hidden_size) # 可选,将特征投影到隐藏空间

self.lstm = nn.LSTM(
input_size=hidden_size, # 注意:如果用了投影层,这里就是 hidden_size
hidden_size=hidden_size,
num_layers=num_layers,
batch_first=True,
dropout=dropout if num_layers > 1 else 0,
bidirectional=bidirectional
)

lstm_output_size = hidden_size * 2 if bidirectional else hidden_size
self.attention = Attention(lstm_output_size)

# 分类器
self.classifier = nn.Sequential(
nn.Dropout(dropout),
nn.Linear(lstm_output_size, hidden_size),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(hidden_size, num_classes)
)

def forward(self, x):
# x: (batch, seq_len, input_size)
x = self.input_proj(x) # (batch, seq_len, hidden_size)

# LSTM
lstm_out, (hidden, cell) = self.lstm(x) # lstm_out: (batch, seq_len, hidden_size*num_directions)

# 注意力
context, attn_weights = self.attention(lstm_out) # context: (batch, hidden_size*num_directions)

# 分类
logits = self.classifier(context) # (batch, num_classes)
return logits

# 模型参数
input_size = X.shape[2] # 特征维度,这里是20
hidden_size = 64
num_layers = 2
num_classes = 2 # 二分类
dropout = 0.3

model = AttBiLSTM(input_size, hidden_size, num_layers, num_classes, dropout).to(device)
print(model)

# 损失函数和优化器
criterion = nn.CrossEntropyLoss() # 分类用交叉熵
# 如果是回归任务,改为 nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5)

# ==================== 3. 训练函数 ====================

def train_epoch(model, loader, optimizer, criterion):
model.train()
total_loss = 0
all_preds = []
all_labels = []

for X_batch, y_batch in loader:
X_batch, y_batch = X_batch.to(device), y_batch.to(device)

optimizer.zero_grad()
outputs = model(X_batch)
loss = criterion(outputs, y_batch)
loss.backward()
optimizer.step()

total_loss += loss.item() * X_batch.size(0)

_, preds = torch.max(outputs, 1)
all_preds.extend(preds.cpu().numpy())
all_labels.extend(y_batch.cpu().numpy())

avg_loss = total_loss / len(loader.dataset)
acc = accuracy_score(all_labels, all_preds)
return avg_loss, acc

def evaluate(model, loader, criterion):
model.eval()
total_loss = 0
all_preds = []
all_labels = []
all_probs = []

with torch.no_grad():
for X_batch, y_batch in loader:
X_batch, y_batch = X_batch.to(device), y_batch.to(device)
outputs = model(X_batch)
loss = criterion(outputs, y_batch)
total_loss += loss.item() * X_batch.size(0)

probs = torch.softmax(outputs, dim=1)
_, preds = torch.max(outputs, 1)

all_preds.extend(preds.cpu().numpy())
all_labels.extend(y_batch.cpu().numpy())
all_probs.extend(probs[:, 1].cpu().numpy()) # 取正类的概率

avg_loss = total_loss / len(loader.dataset)
acc = accuracy_score(all_labels, all_preds)
f1 = f1_score(all_labels, all_preds)
auc = roc_auc_score(all_labels, all_probs)
return avg_loss, acc, f1, auc

# ==================== 4. 训练循环 ====================

num_epochs = 30
best_val_loss = float('inf')
train_losses, val_losses = [], []
train_accs, val_accs = [], []

for epoch in range(num_epochs):
train_loss, train_acc = train_epoch(model, train_loader, optimizer, criterion)
val_loss, val_acc, val_f1, val_auc = evaluate(model, val_loader, criterion)

train_losses.append(train_loss)
val_losses.append(val_loss)
train_accs.append(train_acc)
val_accs.append(val_acc)

scheduler.step(val_loss)

# 保存最佳模型
if val_loss < best_val_loss:
best_val_loss = val_loss
torch.save(model.state_dict(), 'best_att_bilstm_model.pth')
print(f"Epoch {epoch+1}: 保存最佳模型,验证损失 {val_loss:.4f}")

print(f"Epoch {epoch+1}/{num_epochs} | 训练损失: {train_loss:.4f} 训练准确率: {train_acc:.4f} | "
f"验证损失: {val_loss:.4f} 验证准确率: {val_acc:.4f} F1: {val_f1:.4f} AUC: {val_auc:.4f}")

# ==================== 5. 测试集评估 ====================

model.load_state_dict(torch.load('best_att_bilstm_model.pth'))
test_loss, test_acc, test_f1, test_auc = evaluate(model, test_loader, criterion)
print(f"\n测试集结果: 损失: {test_loss:.4f} 准确率: {test_acc:.4f} F1: {test_f1:.4f} AUC: {test_auc:.4f}")

# ==================== 6. 可视化训练曲线 ====================

plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(train_losses, label='Train Loss')
plt.plot(val_losses, label='Val Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.title('Loss Curves')

plt.subplot(1, 2, 2)
plt.plot(train_accs, label='Train Acc')
plt.plot(val_accs, label='Val Acc')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.title('Accuracy Curves')
plt.tight_layout()
plt.savefig('training_curves.png')
plt.show()

print("训练完成,模型已保存为 best_att_bilstm_model.pth")

使用说明

依赖安装

pip install torch numpy pandas scikit-learn matplotlib

  1. 数据替换
    • 当前使用 generate_synthetic_driving_data() 生成模拟数据。
    • 若要使用真实数据(如 Garmin 手表 + OBD 设备采集的 CSV),请参考 load_real_data() 函数,并调整数据格式。CSV 应包含 sample_id, time_step, 特征列, label。
  2. 模型调整
    • 若任务为回归(预测连续风险值),修改 num_classes=1,损失函数为 nn.MSELoss(),并调整输出层和评估指标。
    • 可根据需要修改 input_size(特征数)、timesteps(时间窗口长度)、hidden_size 等超参数。
  3. 硬件要求:支持 CPU 和 GPU,自动检测。
  4. 输出
    • 最佳模型权重保存为 best_att_bilstm_model.pth
    • 训练曲线图保存为 training_curves.png

关键修改说明(相比原文本 Att-BiLSTM)

原文本模型 本驾驶行为模型
输入为词索引,需嵌入层 输入为数值特征,移除了嵌入层,替换为可选的线性投影层 input_proj
嵌入层维度由词表大小决定 input_size 由特征数量决定(如 20)
输出类别为文本分类 输出为二分类(高风险/低风险)
数据处理需分词、填充 数据为固定长度时间序列,直接张量化

应用测试

python Att-BiLSTM.py

1. 这个应用是在训练模型吗?哪个是模型?

是的,你正在训练一个深度学习模型。
模型就是代码中定义的 AttBiLSTM 类,它是一个带有注意力机制的双向LSTM网络,专门用于驾驶行为时间序列的分类(高风险/低风险)。

模型结构

  • 输入层:接受形状为 (batch, timesteps, features) 的数据(你生成的数据是 (5000, 60, 20),即5000个样本,每个样本60个时间步,每个时间步20个特征)。
  • 特征投影:input_proj 将20维特征映射到64维隐藏空间。
  • 双向LSTM:2层,每层输出维度64,双向后得到128维。
  • 注意力层:对时间步加权,提取关键帧。
  • 分类器:全连接层输出2类(高风险/低风险)。

训练过程

你看到的每个epoch输出:

  • 训练损失和准确率(已接近1.0,说明模型在训练集上几乎完美拟合)。
  • 验证损失为0.0000,准确率1.0000,但F1=0.0000、AUC=nan。这是因为验证集中所有样本的真实标签可能只有一类(全是0或全是1),导致二分类指标无法计算。这通常是数据划分不平衡或生成规则过于简单造成的。

警告可以忽略,后面我们会用真实数据解决。

2. 如何用实际数据来训练模型?

你需要将代码中的数据加载部分替换为真实数据(例如Garmin手表 + OBD设备采集的数据)。以下是详细步骤:

2.1 数据格式要求

模型期望的输入是三维张量:(样本数, 时间步数, 特征数)。
你需要准备一个CSV文件(或多个文件),包含以下字段:

  • 样本ID:标识每一次行程或每一个驾驶员的一段连续时间窗口。
  • 时间步:每个时间点的顺序索引(例如秒级)。
  • 特征列:OBD数据和手表数据的融合特征。建议至少包含:
    • OBD特征:车速、加速度(x/y/z)、急加速次数、急刹车次数、急转弯次数、发动机转速、油门踏板位置、刹车踏板状态等。
    • 手表特征:心率、心率变异性(HRV)、压力指数、身体能量、睡眠评分(如果采集的是上车前状态)等。
  • 标签列:本次行程是否有事故发生(0/1)或风险等级(0/1/2)。如果没有真实事故标签,可以用驾驶行为评分(如基于急加减速次数计算的风险分数)作为替代。

2.2 数据预处理脚本

修改代码中的 load_real_data() 函数,使其读取你的CSV文件并转换为模型所需的格式。

以下是一个更完整的真实数据加载示例(假设你的CSV结构如下):

sample_id time_step speed accel_x accel_y ... heart_rate hrv label
1 0 0 0.1 0.0 ... 72 45 0
1 1 5 0.2 0.1 ... 73 46 0
... ... ... ... ... ... ... ... ...
2 0 10 -0.1 0.3 ... 80 30 1

def load_real_data(csv_path, timesteps=60, feature_cols=None, label_col='label', sample_id_col='sample_id', time_col='time_step'):
"""
从CSV加载真实驾驶数据
- timesteps: 每个样本截取的时间步数(固定窗口)
- feature_cols: 要使用的特征列名列表
"""
df = pd.read_csv(csv_path)
samples = df.groupby(sample_id_col)
X_list, y_list = [], []

for sid, group in samples:
group = group.sort_values(time_col)
# 如果总时间步数不足 timesteps,跳过或填充
if len(group) < timesteps:
continue
# 均匀采样或截取前 timesteps 个时间步(可根据需要选择)
group = group.iloc[:timesteps]
features = group[feature_cols].values.astype(np.float32)
label = group[label_col].iloc[0] # 假设该样本只有一个标签(比如该次行程是否出险)
X_list.append(features)
y_list.append(label)

X = np.stack(X_list)
y = np.array(y_list)
return X, y

2.3 使用示例

feature_cols = ['speed', 'accel_x', 'accel_y', 'accel_z', 'heart_rate', 'hrv']
X, y = load_real_data('your_data.csv', timesteps=60, feature_cols=feature_cols)
print(f"数据形状: {X.shape}, 类别分布: {np.bincount(y)}")

然后继续使用 train_test_split 划分数据。

2.4 注意事项

  • 时间窗口长度(timesteps)可根据你的采样频率调整(例如5秒窗口,每秒10条数据,则 timesteps=50)。
  • 特征归一化:建议对每个特征进行Z-score归一化(减去均值除以标准差),可用 sklearn.preprocessing.StandardScaler 对训练集拟合后,再转换验证/测试集。
  • 类别不平衡:如果真实数据中高风险样本很少,需要采用重采样(过采样少数类或欠采样多数类)或调整损失函数权重。

3. 能否直接对接实时数据来管理车辆?

当然可以! 训练好的模型可以部署到实时流处理管道中,对车辆的实时数据进行风险预测,并及时干预(如向驾驶员发送警报)。

3.1 实时系统架构建议

OBD设备 + 智能手表 → 数据采集终端(如4G/5G盒子) → 消息队列(Kafka/MQTT) → 流处理引擎(Flink/Spark Streaming) → 模型推理服务 → 预警系统

3.2 模型部署方式

  • 在线推理API:使用Flask/FastAPI将模型封装成REST API,实时数据通过HTTP POST发送,返回风险概率。
  • 边缘计算:将模型量化后部署到车机端或采集终端(如使用TensorFlow Lite或ONNX Runtime),实现本地实时推理,减少网络延迟。

3.3 实时预警逻辑

  • 每秒或每帧接收OBD和手表数据,累积到固定窗口长度(如60个时间步)后送入模型。
  • 如果预测概率超过阈值(如0.7),触发报警:通过APP推送、语音提示或通知车队管理员。
  • 同时可将结果反馈给保险公司,用于动态调整保费。

3.4 代码示例(简单的Flask API)

from flask import Flask, request, jsonify
import numpy as np
import torch

app = Flask(__name__)
model = AttBiLSTM(input_size=20, hidden_size=64, num_layers=2, num_classes=2)
model.load_state_dict(torch.load('best_att_bilstm_model.pth'))
model.eval()

@app.route('/predict', methods=['POST'])
def predict():
data = request.get_json()
# 假设data包含一个窗口数据,形状为 (60, 20)
window = np.array(data['window'], dtype=np.float32).reshape(1, 60, 20)
with torch.no_grad():
inputs = torch.tensor(window)
outputs = model(inputs)
probs = torch.softmax(outputs, dim=1)
risk = probs[0, 1].item()
return jsonify({'risk_probability': risk})

if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)

4. 能否对接车辆监控视频并分析?

可以,但这属于计算机视觉领域,与当前的时间序列模型不同。 你需要引入视频分析模型(如CNN或3D-CNN)来检测驾驶员状态(疲劳、分心等)或路况。

4.1 两种融合方式

  • 方式一:独立模型,结果融合
    视频模型输出驾驶员疲劳程度、注意力分散等指标,作为新特征输入到你的Att-BiLSTM中(与手表数据类似)。
  • 方式二:多模态融合
    将视频特征(如CNN提取的帧特征)与时间序列特征拼接,输入到一个更大的融合模型。

4.2 视频数据集成到现有架构

假设你已经有一个视频模型(如使用OpenCV + 预训练模型检测眨眼频率、头部姿态),可以实时输出“疲劳指数”(0-1),然后将其作为一个额外特征(和OBD、手表数据一起)喂给Att-BiLSTM。

4.3 视频分析的简单实现思路(示例)

import cv2
import mediapipe as mp # 用于人脸关键点检测

def estimate_drowsiness(frame):
# 用MediaPipe检测眼睛纵横比(EAR),判断闭眼程度
# 返回疲劳分数(0-1)
...
return fatigue_score

# 在主循环中,每帧调用此函数,将fatigue_score加入特征向量

4.4 注意事项

  • 计算资源:视频分析通常需要GPU,若部署在车机端需考虑算力。
  • 实时性:视频处理帧率需与时间序列采样率匹配。

下一步建议

  1. 收集真实数据:联系Garmin或高新兴获取开发套件,采集少量样本测试数据流程。
  2. 改进数据划分:确保验证集和测试集中有足够多样的样本(包括正负例)。
  3. 模型调优:针对真实数据可能存在的类别不平衡,使用 class_weight 或Focal Loss。
  4. 部署试点:先在小规模车队中测试实时预警系统,验证模型效果和业务价值(降低事故率)。

 


登陆