Training script:

```python
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import pandas as pd
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import StandardScaler
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000, dropout=0.2):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)  # (max_len, 1, d_model), seq-first layout
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)
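# Example: for seq-first input x of shape (S, N, d_model), pe[:S] has shape
# (S, 1, d_model) and broadcasts across the batch dimension N.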
class TransformerRegressionModel(nn.Module):
    def __init__(self, input_dim, num_heads, num_layers, output_dim, dropout=0.25):
        super(TransformerRegressionModel, self).__init__()
        self.pos_encoder = PositionalEncoding(input_dim, dropout=dropout)
        encoder_layer = nn.TransformerEncoderLayer(input_dim, num_heads, dim_feedforward=input_dim * 4, activation='relu', dropout=dropout)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers)
        decoder_layer = nn.TransformerDecoderLayer(input_dim, num_heads, dim_feedforward=input_dim * 4, activation='relu', dropout=dropout)
        self.transformer_decoder = nn.TransformerDecoder(decoder_layer, num_layers)
        self.output_layer = nn.Linear(input_dim, output_dim)
        self._init_weights()

    def _init_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, (nn.TransformerEncoderLayer, nn.TransformerDecoderLayer)):
                for param in m.parameters():
                    if param.dim() > 1:
                        nn.init.xavier_uniform_(param)

    def forward(self, src, tgt):
        src = self.pos_encoder(src)                 # (seq_len, batch_size, input_dim)
        tgt = self.pos_encoder(tgt)                 # (seq_len, batch_size, input_dim)
        memory = self.transformer_encoder(src)      # (seq_len, batch_size, input_dim)
        out = self.transformer_decoder(tgt, memory) # (seq_len, batch_size, input_dim)
        out = self.output_layer(out)                # project to output_dim
        return out
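# Shape walk-through for one forward call, assuming the seq-first layout that
# the nn.Transformer* modules use by default (batch_first is not set):
#   src: (S, N, 2) --pos_encoder/encoder--> memory: (S, N, 2)
#   tgt: (T, N, 2) --pos_encoder/decoder(tgt, memory)--> (T, N, 2)
#                  --output_layer--> (T, N, 1)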
# Load the data
file_path = "C:\\python\\soc_model\\train_data\\train1"
data = pd.read_csv(file_path)

# Features and target
x = data[['Current_(mA)', 'Voltage_(V)']].values
scaler = StandardScaler()
y_true = data['Capacity_(Ah)'].values
x_normalized = scaler.fit_transform(np.abs(x))  # normalize the feature data

# Convert to PyTorch tensors
x_tensor = torch.tensor(x_normalized, dtype=torch.float32)
y_tensor = torch.tensor(y_true, dtype=torch.float32).view(-1, 1)

# Build the dataset and data loader (shuffle=False keeps the time order)
train_dataset = TensorDataset(x_tensor, y_tensor)
train_dataloader = DataLoader(train_dataset, batch_size=1, shuffle=False)
# Hyper-parameters
input_dim = 2    # number of input features
output_dim = 1   # number of output features
num_heads = 2    # number of attention heads
num_layers = 6
num_epochs = 100

# Check GPU availability
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Initialize the model
model = TransformerRegressionModel(input_dim, num_heads, num_layers, output_dim).to(device)

# Loss function, optimizer, LR scheduler
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.000001)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=5, factor=0.5, mode='min')

# Early stopping
patience = 5
best_loss = float('inf')
patience_counter = 0
# Train the model
model.train()
for epoch in range(num_epochs):
    epoch_loss = 0.0
    previous_y = None  # stores the previous step's prediction
    predicted = []
    for i, (current_x, current_y) in enumerate(train_dataloader):
        current_x, current_y = current_x.to(device), current_y.to(device)
        # Reshape current_x if needed
        current_x = current_x.unsqueeze(0)  # (1, batch_size, input_dim)
        if i == 0:
            # First time step: initialize the target sequence tgt with zeros
            tgt = torch.zeros_like(current_y).unsqueeze(0).to(device)  # (1, batch_size, 1); broadcasts against pe inside pos_encoder
        else:
            # Use the previous step's prediction as the target sequence
            tgt = previous_y.unsqueeze(0)
        # Predict from the current input and the previous output
        predicted_y = model(current_x, tgt)
        print(predicted_y)
        predicted.append(predicted_y)  # collected but not used afterwards
        # Keep the current prediction for the next step; detach() cuts the
        # graph here, so no gradient flows across time steps
        previous_y = predicted_y.squeeze(0).detach()
        # Compute the loss
        loss = criterion(predicted_y, current_y.unsqueeze(0))
        epoch_loss += loss.item()
        # Backpropagation and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    scheduler.step(epoch_loss)
    print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {epoch_loss:.4f}")
    if epoch_loss < best_loss:
        best_loss = epoch_loss
        patience_counter = 0
        torch.save(model.state_dict(), "transformer_model.pth1")
        print("Model saved.")
    else:
        patience_counter += 1
        if patience_counter >= patience:
            print("Early stopping triggered.")
            break
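# Note: every forward call above sees a length-1 src and a length-1 tgt, so the
# self-attention has exactly one position to attend to (its softmax weight is
# trivially 1.0); the only temporal signal is the value fed back through tgt.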
```

Inference script:

```python
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import pandas as pd
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import StandardScaler
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000, dropout=0.2):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)  # (max_len, 1, d_model), seq-first layout
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)
class TransformerRegressionModel(nn.Module):
    def __init__(self, input_dim, num_heads, num_layers, output_dim, dropout=0.25):
        super(TransformerRegressionModel, self).__init__()
        self.pos_encoder = PositionalEncoding(input_dim, dropout=dropout)
        encoder_layer = nn.TransformerEncoderLayer(input_dim, num_heads, dim_feedforward=input_dim * 4, activation='relu', dropout=dropout)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers)
        decoder_layer = nn.TransformerDecoderLayer(input_dim, num_heads, dim_feedforward=input_dim * 4, activation='relu', dropout=dropout)
        self.transformer_decoder = nn.TransformerDecoder(decoder_layer, num_layers)
        self.output_layer = nn.Linear(input_dim, output_dim)
        self._init_weights()

    def _init_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, (nn.TransformerEncoderLayer, nn.TransformerDecoderLayer)):
                for param in m.parameters():
                    if param.dim() > 1:
                        nn.init.xavier_uniform_(param)

    def forward(self, src, tgt):
        src = self.pos_encoder(src)                 # (seq_len, batch_size, input_dim)
        tgt = self.pos_encoder(tgt)                 # (seq_len, batch_size, input_dim)
        memory = self.transformer_encoder(src)      # (seq_len, batch_size, input_dim)
        out = self.transformer_decoder(tgt, memory) # (seq_len, batch_size, input_dim)
        out = self.output_layer(out)                # project to output_dim
        return out
# Load the data
file_path = "C:\\python\\soc_model\\train_data\\train5"
data = pd.read_csv(file_path)

# Features and target
x = data[['Current_(mA)', 'Voltage_(V)']].values
scaler_x = StandardScaler()
x_normalized = scaler_x.fit_transform(x)  # note: fit on the test file itself, without the np.abs used in training
x = np.abs(x)  # computed but never used afterwards
y_true = data['Capacity_(Ah)'].values

# Convert the data to PyTorch tensors
x_tensor = torch.tensor(x_normalized, dtype=torch.float32)

# Build the dataset and data loader
test_dataset = TensorDataset(x_tensor)
test_data = DataLoader(test_dataset, batch_size=1, shuffle=False)
# Hyper-parameters
input_dim = 2    # number of input features
output_dim = 1   # number of output features
num_heads = 2    # number of attention heads
num_layers = 6
num_epochs = 100  # unused at inference time

# Check GPU availability
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Initialize the model and load the trained weights
model = TransformerRegressionModel(input_dim, num_heads, num_layers, output_dim).to(device)
model.load_state_dict(torch.load('transformer_model.pth1', map_location=device))
# Inference
predicted = []
with torch.no_grad():
    for i, (current_x,) in enumerate(test_data):
        current_x = current_x.to(device)
        current_x = current_x.unsqueeze(1)  # add a seq_len dimension
        # Initialize the target sequence tgt
        if i == 0:
            tgt = torch.zeros(current_x.size(0), 1, current_x.size(2)).to(device)
        else:
            tgt = predicted_y.unsqueeze(1)  # previous prediction becomes the next target sequence
        # Run the prediction
        predicted_y = model(current_x, tgt)
        print(current_x)
        print(tgt)
        # Take the prediction at the last time step
        predicted_y = predicted_y[:, -1, :]  # (batch_size, output_dim)
        # Store the result
        predicted.append(predicted_y.cpu().numpy())

# Convert to a NumPy array
predicted = np.array(predicted).squeeze()
```

**Sincerely asking for help: why are the predicted outputs all the same?**
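Two places where training and inference disagree are worth ruling out before blaming the model: `model.eval()` is never called, so dropout stays active at prediction time, and the test scaler is re-fit on the test file without the `np.abs` applied at training time. Below is a minimal sanity-check sketch with both of these aligned; it assumes the `TransformerRegressionModel` class, CSV paths, and `transformer_model.pth1` checkpoint from the scripts above, and the cause it probes is only a guess:

```python
import numpy as np
import pandas as pd
import torch
from sklearn.preprocessing import StandardScaler

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = TransformerRegressionModel(input_dim=2, num_heads=2, num_layers=6, output_dim=1).to(device)
model.load_state_dict(torch.load("transformer_model.pth1", map_location=device))
model.eval()  # disables dropout; without this every forward pass is stochastic

# Fit the scaler on the TRAINING features, with the same np.abs as in training,
# and only transform the test features with it.
train_df = pd.read_csv("C:\\python\\soc_model\\train_data\\train1")
test_df = pd.read_csv("C:\\python\\soc_model\\train_data\\train5")
scaler = StandardScaler().fit(np.abs(train_df[['Current_(mA)', 'Voltage_(V)']].values))
x_test = scaler.transform(np.abs(test_df[['Current_(mA)', 'Voltage_(V)']].values))

preds = []
with torch.no_grad():
    prev = torch.zeros(1, 1, 1, device=device)  # seed tgt for the first step
    for row in torch.tensor(x_test, dtype=torch.float32, device=device):
        src = row.view(1, 1, 2)   # (seq_len=1, batch=1, features=2)
        out = model(src, prev)    # (1, 1, 1), via the same broadcasting as above
        preds.append(out.item())
        prev = out                # feed the prediction back as the next tgt
print("std of predictions:", float(np.std(preds)))  # ~0 -> output really is constant
```

If `preds` still barely varies after these two fixes, the collapse most likely comes from the setup itself: every step feeds the model a length-1 `src` and length-1 `tgt`, so the attention has exactly one position to attend to, and with `d_model=2` the LayerNorm inside each layer leaves very little room for the inputs to influence the output.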