继续阅读完整内容
支持我们的网站,请点击查看下方广告
以下是一个完整的 Att-BiLSTM 适配驾驶行为数据 的 PyTorch 实现脚本。它包含了从模拟数据生成、模型定义、训练到评估的全流程。代码中已针对驾驶行为时间序列(多维特征)进行了修改,并附有详细注释,方便你替换为真实数据(如 Garmin 手表 + OBD 设备采集的 CSV 文件)。
"""驾驶行为风险预测模型 - Att-BiLSTM 实现数据格式:时间序列,每个样本 shape = (timesteps, features)任务:二分类(高风险/低风险)或回归(风险概率)"""
import numpy as npimport torchimport torch.nn as nnimport torch.optim as optimfrom torch.utils.data import DataLoader, Datasetfrom sklearn.model_selection import train_test_splitfrom sklearn.metrics import accuracy_score, f1_score, roc_auc_scoreimport matplotlib.pyplot as pltimport osimport pandas as pd
# 设置随机种子,保证可重复性def set_seed(seed=42):np.random.seed(seed)torch.manual_seed(seed)if torch.cuda.is_available():torch.cuda.manual_seed_all(seed)
set_seed(42)
# 设备配置device = torch.device("cuda" if torch.cuda.is_available() else "cpu")print(f"Using device: {device}")
# ==================== 1. 数据准备 ====================
def generate_synthetic_driving_data(num_samples=10000, timesteps=60, num_features=20):"""生成模拟驾驶行为数据真实场景应从 CSV 或数据库加载"""X = np.random.randn(num_samples, timesteps, num_features).astype(np.float32)# 生成标签:高风险(1)或低风险(0)# 这里用一个简单规则:某些特征(如急加速次数)的统计量决定风险# 例如:计算每个样本在时间维度上的均值,如果第5个特征的均值>0.5,则为高风险feature_5_mean = X[:, :, 4].mean(axis=1) # 假设第5个特征是“急加速度”y = (feature_5_mean > 0.5).astype(int) # 二分类标签return X, y
def load_real_data(csv_path, timesteps=60, feature_cols=None, label_col='label'):"""从 CSV 加载真实数据(示例,需根据实际格式调整)CSV 应包含时间步索引、样本ID、特征列、标签列"""df = pd.read_csv(csv_path)# 假设 CSV 列:'sample_id', 'time_step', 'feature1', 'feature2', ..., 'label'# 需要按样本ID和时间步整理为 (num_samples, timesteps, num_features)samples = df.groupby('sample_id')X_list, y_list = [], []for name, group in samples:group = group.sort_values('time_step')if len(group) >= timesteps:# 取前 timesteps 个时间步group = group.iloc[:timesteps]features = group[feature_cols].values.astype(np.float32)label = group[label_col].iloc[0]X_list.append(features)y_list.append(label)X = np.stack(X_list)y = np.array(y_list)return X, y
# 生成模拟数据X, y = generate_synthetic_driving_data(num_samples=5000, timesteps=60, num_features=20)print(f"数据形状: X {X.shape}, y {y.shape}")
# 划分训练集、验证集、测试集X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.3, random_state=42)X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)print(f"训练集: {X_train.shape}, 验证集: {X_val.shape}, 测试集: {X_test.shape}")
# PyTorch Datasetclass DrivingDataset(Dataset):def __init__(self, X, y):self.X = torch.tensor(X, dtype=torch.float32)self.y = torch.tensor(y, dtype=torch.long) # 分类用 long,回归用 floatdef __len__(self):return len(self.X)def __getitem__(self, idx):return self.X[idx], self.y[idx]
train_dataset = DrivingDataset(X_train, y_train)val_dataset = DrivingDataset(X_val, y_val)test_dataset = DrivingDataset(X_test, y_test)
batch_size = 64train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
# ==================== 2. 模型定义 ====================
class Attention(nn.Module):"""自定义注意力层"""def __init__(self, hidden_size):super(Attention, self).__init__()self.attention_weights = nn.Linear(hidden_size, 1, bias=False)def forward(self, lstm_outputs):# lstm_outputs: (batch, seq_len, hidden_size)attn_scores = self.attention_weights(lstm_outputs) # (batch, seq_len, 1)attn_weights = torch.softmax(attn_scores, dim=1) # (batch, seq_len, 1)context = torch.sum(attn_weights * lstm_outputs, dim=1) # (batch, hidden_size)return context, attn_weights
class AttBiLSTM(nn.Module):"""Att-BiLSTM 模型,适配驾驶行为时间序列输入: (batch, seq_len, input_size)输出: (batch, num_classes) 或 (batch, 1) 用于回归"""def __init__(self, input_size, hidden_size, num_layers, num_classes, dropout=0.5, bidirectional=True):super(AttBiLSTM, self).__init__()self.hidden_size = hidden_sizeself.num_layers = num_layersself.bidirectional = bidirectional# 移除原始论文中的嵌入层,因为输入已经是数值特征# 可以加一个线性变换来调整维度(可选)self.input_proj = nn.Linear(input_size, hidden_size) # 可选,将特征投影到隐藏空间self.lstm = nn.LSTM(input_size=hidden_size, # 注意:如果用了投影层,这里就是 hidden_sizehidden_size=hidden_size,num_layers=num_layers,batch_first=True,dropout=dropout if num_layers > 1 else 0,bidirectional=bidirectional)lstm_output_size = hidden_size * 2 if bidirectional else hidden_sizeself.attention = Attention(lstm_output_size)# 分类器self.classifier = nn.Sequential(nn.Dropout(dropout),nn.Linear(lstm_output_size, hidden_size),nn.ReLU(),nn.Dropout(dropout),nn.Linear(hidden_size, num_classes))def forward(self, x):# x: (batch, seq_len, input_size)x = self.input_proj(x) # (batch, seq_len, hidden_size)# LSTMlstm_out, (hidden, cell) = self.lstm(x) # lstm_out: (batch, seq_len, hidden_size*num_directions)# 注意力context, attn_weights = self.attention(lstm_out) # context: (batch, hidden_size*num_directions)# 分类logits = self.classifier(context) # (batch, num_classes)return logits
# 模型参数input_size = X.shape[2] # 特征维度,这里是20hidden_size = 64num_layers = 2num_classes = 2 # 二分类dropout = 0.3
model = AttBiLSTM(input_size, hidden_size, num_layers, num_classes, dropout).to(device)print(model)
# 损失函数和优化器criterion = nn.CrossEntropyLoss() # 分类用交叉熵# 如果是回归任务,改为 nn.MSELoss()optimizer = optim.Adam(model.parameters(), lr=0.001)scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5)
# ==================== 3. 训练函数 ====================
def train_epoch(model, loader, optimizer, criterion):model.train()total_loss = 0all_preds = []all_labels = []for X_batch, y_batch in loader:X_batch, y_batch = X_batch.to(device), y_batch.to(device)optimizer.zero_grad()outputs = model(X_batch)loss = criterion(outputs, y_batch)loss.backward()optimizer.step()total_loss += loss.item() * X_batch.size(0)_, preds = torch.max(outputs, 1)all_preds.extend(preds.cpu().numpy())all_labels.extend(y_batch.cpu().numpy())avg_loss = total_loss / len(loader.dataset)acc = accuracy_score(all_labels, all_preds)return avg_loss, acc
def evaluate(model, loader, criterion):model.eval()total_loss = 0all_preds = []all_labels = []all_probs = []with torch.no_grad():for X_batch, y_batch in loader:X_batch, y_batch = X_batch.to(device), y_batch.to(device)outputs = model(X_batch)loss = criterion(outputs, y_batch)total_loss += loss.item() * X_batch.size(0)probs = torch.softmax(outputs, dim=1)_, preds = torch.max(outputs, 1)all_preds.extend(preds.cpu().numpy())all_labels.extend(y_batch.cpu().numpy())all_probs.extend(probs[:, 1].cpu().numpy()) # 取正类的概率avg_loss = total_loss / len(loader.dataset)acc = accuracy_score(all_labels, all_preds)f1 = f1_score(all_labels, all_preds)auc = roc_auc_score(all_labels, all_probs)return avg_loss, acc, f1, auc
# ==================== 4. 训练循环 ====================
num_epochs = 30best_val_loss = float('inf')train_losses, val_losses = [], []train_accs, val_accs = [], []
for epoch in range(num_epochs):train_loss, train_acc = train_epoch(model, train_loader, optimizer, criterion)val_loss, val_acc, val_f1, val_auc = evaluate(model, val_loader, criterion)train_losses.append(train_loss)val_losses.append(val_loss)train_accs.append(train_acc)val_accs.append(val_acc)scheduler.step(val_loss)# 保存最佳模型if val_loss < best_val_loss:best_val_loss = val_losstorch.save(model.state_dict(), 'best_att_bilstm_model.pth')print(f"Epoch {epoch+1}: 保存最佳模型,验证损失 {val_loss:.4f}")print(f"Epoch {epoch+1}/{num_epochs} | 训练损失: {train_loss:.4f} 训练准确率: {train_acc:.4f} | "f"验证损失: {val_loss:.4f} 验证准确率: {val_acc:.4f} F1: {val_f1:.4f} AUC: {val_auc:.4f}")
# ==================== 5. 测试集评估 ====================
model.load_state_dict(torch.load('best_att_bilstm_model.pth'))test_loss, test_acc, test_f1, test_auc = evaluate(model, test_loader, criterion)print(f"\n测试集结果: 损失: {test_loss:.4f} 准确率: {test_acc:.4f} F1: {test_f1:.4f} AUC: {test_auc:.4f}")
# ==================== 6. 可视化训练曲线 ====================
plt.figure(figsize=(12, 4))plt.subplot(1, 2, 1)plt.plot(train_losses, label='Train Loss')plt.plot(val_losses, label='Val Loss')plt.xlabel('Epoch')plt.ylabel('Loss')plt.legend()plt.title('Loss Curves')
plt.subplot(1, 2, 2)plt.plot(train_accs, label='Train Acc')plt.plot(val_accs, label='Val Acc')plt.xlabel('Epoch')plt.ylabel('Accuracy')plt.legend()plt.title('Accuracy Curves')plt.tight_layout()plt.savefig('training_curves.png')plt.show()
print("训练完成,模型已保存为 best_att_bilstm_model.pth")
使用说明
依赖安装:
pip install torch numpy pandas scikit-learn matplotlib
- 数据替换:
- 当前使用 generate_synthetic_driving_data() 生成模拟数据。
- 若要使用真实数据(如 Garmin 手表 + OBD 设备采集的 CSV),请参考 load_real_data() 函数,并调整数据格式。CSV 应包含 sample_id, time_step, 特征列, label。
- 模型调整:
- 若任务为回归(预测连续风险值),修改 num_classes=1,损失函数为 nn.MSELoss(),并调整输出层和评估指标。
- 可根据需要修改 input_size(特征数)、timesteps(时间窗口长度)、hidden_size 等超参数。
- 硬件要求:支持 CPU 和 GPU,自动检测。
- 输出:
- 最佳模型权重保存为 best_att_bilstm_model.pth
- 训练曲线图保存为 training_curves.png
关键修改说明(相比原文本 Att-BiLSTM)
| 原文本模型 | 本驾驶行为模型 |
| 输入为词索引,需嵌入层 | 输入为数值特征,移除了嵌入层,替换为可选的线性投影层 input_proj |
| 嵌入层维度由词表大小决定 | input_size 由特征数量决定(如 20) |
| 输出类别为文本分类 | 输出为二分类(高风险/低风险) |
| 数据处理需分词、填充 | 数据为固定长度时间序列,直接张量化 |
应用测试
python Att-BiLSTM.py
1. 这个应用是在训练模型吗?哪个是模型?
是的,你正在训练一个深度学习模型。
模型就是代码中定义的 AttBiLSTM 类,它是一个带有注意力机制的双向LSTM网络,专门用于驾驶行为时间序列的分类(高风险/低风险)。
模型结构
- 输入层:接受形状为 (batch, timesteps, features) 的数据(你生成的数据是 (5000, 60, 20),即5000个样本,每个样本60个时间步,每个时间步20个特征)。
- 特征投影:input_proj 将20维特征映射到64维隐藏空间。
- 双向LSTM:2层,每层输出维度64,双向后得到128维。
- 注意力层:对时间步加权,提取关键帧。
- 分类器:全连接层输出2类(高风险/低风险)。
训练过程
你看到的每个epoch输出:
- 训练损失和准确率(已接近1.0,说明模型在训练集上几乎完美拟合)。
- 验证损失为0.0000,准确率1.0000,但F1=0.0000、AUC=nan。这是因为验证集中所有样本的真实标签可能只有一类(全是0或全是1),导致二分类指标无法计算。这通常是数据划分不平衡或生成规则过于简单造成的。
警告可以忽略,后面我们会用真实数据解决。
2. 如何用实际数据来训练模型?
你需要将代码中的数据加载部分替换为真实数据(例如Garmin手表 + OBD设备采集的数据)。以下是详细步骤:
2.1 数据格式要求
模型期望的输入是三维张量:(样本数, 时间步数, 特征数)。
你需要准备一个CSV文件(或多个文件),包含以下字段:
- 样本ID:标识每一次行程或每一个驾驶员的一段连续时间窗口。
- 时间步:每个时间点的顺序索引(例如秒级)。
- 特征列:OBD数据和手表数据的融合特征。建议至少包含:
- OBD特征:车速、加速度(x/y/z)、急加速次数、急刹车次数、急转弯次数、发动机转速、油门踏板位置、刹车踏板状态等。
- 手表特征:心率、心率变异性(HRV)、压力指数、身体能量、睡眠评分(如果采集的是上车前状态)等。
- 标签列:本次行程是否有事故发生(0/1)或风险等级(0/1/2)。如果没有真实事故标签,可以用驾驶行为评分(如基于急加减速次数计算的风险分数)作为替代。
2.2 数据预处理脚本
修改代码中的 load_real_data() 函数,使其读取你的CSV文件并转换为模型所需的格式。
以下是一个更完整的真实数据加载示例(假设你的CSV结构如下):
| sample_id | time_step | speed | accel_x | accel_y | ... | heart_rate | hrv | label |
| 1 | 0 | 0 | 0.1 | 0.0 | ... | 72 | 45 | 0 |
| 1 | 1 | 5 | 0.2 | 0.1 | ... | 73 | 46 | 0 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 2 | 0 | 10 | -0.1 | 0.3 | ... | 80 | 30 | 1 |
def load_real_data(csv_path, timesteps=60, feature_cols=None, label_col='label', sample_id_col='sample_id', time_col='time_step'):"""从CSV加载真实驾驶数据- timesteps: 每个样本截取的时间步数(固定窗口)- feature_cols: 要使用的特征列名列表"""df = pd.read_csv(csv_path)samples = df.groupby(sample_id_col)X_list, y_list = [], []for sid, group in samples:group = group.sort_values(time_col)# 如果总时间步数不足 timesteps,跳过或填充if len(group) < timesteps:continue# 均匀采样或截取前 timesteps 个时间步(可根据需要选择)group = group.iloc[:timesteps]features = group[feature_cols].values.astype(np.float32)label = group[label_col].iloc[0] # 假设该样本只有一个标签(比如该次行程是否出险)X_list.append(features)y_list.append(label)X = np.stack(X_list)y = np.array(y_list)return X, y
2.3 使用示例
feature_cols = ['speed', 'accel_x', 'accel_y', 'accel_z', 'heart_rate', 'hrv']X, y = load_real_data('your_data.csv', timesteps=60, feature_cols=feature_cols)print(f"数据形状: {X.shape}, 类别分布: {np.bincount(y)}")
然后继续使用 train_test_split 划分数据。
2.4 注意事项
- 时间窗口长度(timesteps)可根据你的采样频率调整(例如5秒窗口,每秒10条数据,则 timesteps=50)。
- 特征归一化:建议对每个特征进行Z-score归一化(减去均值除以标准差),可用 sklearn.preprocessing.StandardScaler 对训练集拟合后,再转换验证/测试集。
- 类别不平衡:如果真实数据中高风险样本很少,需要采用重采样(过采样少数类或欠采样多数类)或调整损失函数权重。
3. 能否直接对接实时数据来管理车辆?
当然可以! 训练好的模型可以部署到实时流处理管道中,对车辆的实时数据进行风险预测,并及时干预(如向驾驶员发送警报)。
3.1 实时系统架构建议
OBD设备 + 智能手表 → 数据采集终端(如4G/5G盒子) → 消息队列(Kafka/MQTT) → 流处理引擎(Flink/Spark Streaming) → 模型推理服务 → 预警系统
3.2 模型部署方式
- 在线推理API:使用Flask/FastAPI将模型封装成REST API,实时数据通过HTTP POST发送,返回风险概率。
- 边缘计算:将模型量化后部署到车机端或采集终端(如使用TensorFlow Lite或ONNX Runtime),实现本地实时推理,减少网络延迟。
3.3 实时预警逻辑
- 每秒或每帧接收OBD和手表数据,累积到固定窗口长度(如60个时间步)后送入模型。
- 如果预测概率超过阈值(如0.7),触发报警:通过APP推送、语音提示或通知车队管理员。
- 同时可将结果反馈给保险公司,用于动态调整保费。
3.4 代码示例(简单的Flask API)
from flask import Flask, request, jsonifyimport numpy as npimport torch
app = Flask(__name__)model = AttBiLSTM(input_size=20, hidden_size=64, num_layers=2, num_classes=2)model.load_state_dict(torch.load('best_att_bilstm_model.pth'))model.eval()
@app.route('/predict', methods=['POST'])def predict():data = request.get_json()# 假设data包含一个窗口数据,形状为 (60, 20)window = np.array(data['window'], dtype=np.float32).reshape(1, 60, 20)with torch.no_grad():inputs = torch.tensor(window)outputs = model(inputs)probs = torch.softmax(outputs, dim=1)risk = probs[0, 1].item()return jsonify({'risk_probability': risk})
if __name__ == '__main__':app.run(host='0.0.0.0', port=5000)
4. 能否对接车辆监控视频并分析?
可以,但这属于计算机视觉领域,与当前的时间序列模型不同。 你需要引入视频分析模型(如CNN或3D-CNN)来检测驾驶员状态(疲劳、分心等)或路况。
4.1 两种融合方式
- 方式一:独立模型,结果融合
视频模型输出驾驶员疲劳程度、注意力分散等指标,作为新特征输入到你的Att-BiLSTM中(与手表数据类似)。 - 方式二:多模态融合
将视频特征(如CNN提取的帧特征)与时间序列特征拼接,输入到一个更大的融合模型。
4.2 视频数据集成到现有架构
假设你已经有一个视频模型(如使用OpenCV + 预训练模型检测眨眼频率、头部姿态),可以实时输出“疲劳指数”(0-1),然后将其作为一个额外特征(和OBD、手表数据一起)喂给Att-BiLSTM。
4.3 视频分析的简单实现思路(示例)
import cv2import mediapipe as mp # 用于人脸关键点检测
def estimate_drowsiness(frame):# 用MediaPipe检测眼睛纵横比(EAR),判断闭眼程度# 返回疲劳分数(0-1)...return fatigue_score
# 在主循环中,每帧调用此函数,将fatigue_score加入特征向量
4.4 注意事项
- 计算资源:视频分析通常需要GPU,若部署在车机端需考虑算力。
- 实时性:视频处理帧率需与时间序列采样率匹配。
下一步建议
- 收集真实数据:联系Garmin或高新兴获取开发套件,采集少量样本测试数据流程。
- 改进数据划分:确保验证集和测试集中有足够多样的样本(包括正负例)。
- 模型调优:针对真实数据可能存在的类别不平衡,使用 class_weight 或Focal Loss。
- 部署试点:先在小规模车队中测试实时预警系统,验证模型效果和业务价值(降低事故率)。