求助:Transformer 推理时预测值全部相同
-
"""Train a small Transformer encoder-decoder regressor, one sample per step.

The script reads a CSV, uses the 'Current_(mA)' and 'Voltage_(V)' columns as
features and 'Capacity_(Ah)' as the regression target, and feeds the previous
prediction back into the decoder as the target sequence for the next sample.
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import pandas as pd
from torch.utils.data import DataLoader, TensorDataset


class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to a sequence-first tensor.

    Args:
        d_model: Size of the last (feature) dimension of the input.
        max_len: Number of positions to precompute.
        dropout: Dropout probability applied after the addition.
    """

    def __init__(self, d_model, max_len=5000, dropout=0.2):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model)
        )
        pe[:, 0::2] = torch.sin(position * div_term)
        # For odd d_model there is one fewer cosine column than sine columns,
        # so trim div_term; for even d_model the slice is a no-op.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        # Stored as (max_len, 1, d_model) so it broadcasts over the batch axis.
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return ``dropout(x + pe[:seq_len])`` for ``x`` of shape (seq, batch, *)."""
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)


class TransformerRegressionModel(nn.Module):
    """Encoder-decoder Transformer followed by a linear regression head.

    The model width (d_model) is equal to ``input_dim``.

    NOTE(review): with input_dim == 2 every LayerNorm inside the Transformer
    layers normalises over only two features, which maps almost any input to
    one of two patterns. That is a likely reason for near-constant
    predictions. Projecting the inputs to a wider d_model would address it,
    but it changes the checkpoint layout, so the inference script must be
    changed together with this one and the model retrained.
    """

    def __init__(self, input_dim, num_heads, num_layers, output_dim, dropout=0.25):
        super().__init__()
        self.pos_encoder = PositionalEncoding(input_dim, dropout=dropout)
        encoder_layer = nn.TransformerEncoderLayer(
            input_dim,
            num_heads,
            dim_feedforward=input_dim * 4,
            activation='relu',
            dropout=dropout,
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers)
        decoder_layer = nn.TransformerDecoderLayer(
            input_dim,
            num_heads,
            dim_feedforward=input_dim * 4,
            activation='relu',
            dropout=dropout,
        )
        self.transformer_decoder = nn.TransformerDecoder(decoder_layer, num_layers)
        self.output_layer = nn.Linear(input_dim, output_dim)
        self._init_weights()

    def _init_weights(self):
        """Xavier-initialise Linear weights and all >1-D Transformer parameters."""
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, (nn.TransformerEncoderLayer, nn.TransformerDecoderLayer)):
                for param in m.parameters():
                    if param.dim() > 1:
                        nn.init.xavier_uniform_(param)

    def forward(self, src, tgt):
        """Run encoder and decoder and project to ``output_dim``.

        Both inputs are sequence-first: (sequence_length, batch_size, features).
        ``tgt`` may have a last dimension of 1; adding the positional encoding
        broadcasts it up to the model width.
        """
        src = self.pos_encoder(src)
        tgt = self.pos_encoder(tgt)
        memory = self.transformer_encoder(src)
        out = self.transformer_decoder(tgt, memory)
        return self.output_layer(out)


def train_one_epoch(model, dataloader, criterion, optimizer, device):
    """Run one pass over ``dataloader`` and return the summed loss as a float.

    Each batch is treated as a length-1 sequence. The decoder target for the
    first batch is all zeros; for every later batch it is the detached
    prediction of the previous batch.
    """
    model.train()
    epoch_loss = 0.0
    previous_y = None  # prediction of the previous step, detached from the graph

    for current_x, current_y in dataloader:
        current_x = current_x.to(device).unsqueeze(0)  # (1, batch_size, input_dim)
        current_y = current_y.to(device)

        if previous_y is None:
            tgt = torch.zeros_like(current_y).unsqueeze(0)  # (1, batch_size, output_dim)
        else:
            tgt = previous_y.unsqueeze(0)

        predicted_y = model(current_x, tgt)
        # Only the detached value is kept. Holding on to the graph-attached
        # outputs of every step (as an ever-growing list) leaks memory.
        previous_y = predicted_y.squeeze(0).detach()

        loss = criterion(predicted_y, current_y.unsqueeze(0))
        epoch_loss += loss.item()

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    return epoch_loss


def main():
    """Load the training CSV, train with early stopping and save the best weights."""
    # Imported lazily so the model classes can be imported without scikit-learn.
    from sklearn.preprocessing import StandardScaler

    # Load the data.
    file_path = "C:\\python\\soc_model\\train_data\\train1"
    data = pd.read_csv(file_path)

    # Features and target.
    x = data[['Current_(mA)', 'Voltage_(V)']].values
    y_true = data['Capacity_(Ah)'].values
    scaler = StandardScaler()
    x_normalized = scaler.fit_transform(np.abs(x))  # scale the absolute feature values

    x_tensor = torch.tensor(x_normalized, dtype=torch.float32)
    y_tensor = torch.tensor(y_true, dtype=torch.float32).view(-1, 1)

    train_dataset = TensorDataset(x_tensor, y_tensor)
    train_dataloader = DataLoader(train_dataset, batch_size=1, shuffle=False)

    # Hyper-parameters.
    input_dim = 2   # number of input features
    output_dim = 1  # number of output features
    num_heads = 2   # attention heads
    num_layers = 6
    num_epochs = 100

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = TransformerRegressionModel(input_dim, num_heads, num_layers, output_dim).to(device)

    criterion = nn.MSELoss()
    # NOTE(review): lr=1e-6 is very small for Adam and the target is not
    # scaled; confirm this is intended if the loss barely moves.
    optimizer = torch.optim.Adam(model.parameters(), lr=0.000001)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, patience=5, factor=0.5, mode='min'
    )

    # Early stopping state.
    patience = 5
    best_loss = float('inf')
    patience_counter = 0

    for epoch in range(num_epochs):
        epoch_loss = train_one_epoch(model, train_dataloader, criterion, optimizer, device)
        scheduler.step(epoch_loss)
        print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {epoch_loss:.4f}")

        if epoch_loss < best_loss:
            best_loss = epoch_loss
            patience_counter = 0
            torch.save(model.state_dict(), "transformer_model.pth1")
            print("Model saved.")
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print("Early stopping triggered.")
                break


if __name__ == "__main__":
    main()
code_text
"""Run step-by-step inference with the trained Transformer regressor.

The script rebuilds the model used for training, loads the weights from
'transformer_model.pth1' and predicts one value per CSV row, feeding each
prediction back into the decoder as the target for the next row.
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import pandas as pd
from torch.utils.data import DataLoader, TensorDataset


class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to a sequence-first tensor.

    Args:
        d_model: Size of the last (feature) dimension of the input.
        max_len: Number of positions to precompute.
        dropout: Dropout probability applied after the addition.
    """

    def __init__(self, d_model, max_len=5000, dropout=0.2):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model)
        )
        pe[:, 0::2] = torch.sin(position * div_term)
        # For odd d_model there is one fewer cosine column than sine columns,
        # so trim div_term; for even d_model the slice is a no-op.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        # Stored as (max_len, 1, d_model) so it broadcasts over the batch axis.
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return ``dropout(x + pe[:seq_len])`` for ``x`` of shape (seq, batch, *)."""
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)


class TransformerRegressionModel(nn.Module):
    """Encoder-decoder Transformer followed by a linear regression head.

    The model width (d_model) is equal to ``input_dim``.

    NOTE(review): with input_dim == 2 every LayerNorm inside the Transformer
    layers normalises over only two features, which maps almost any input to
    one of two patterns. That is a likely reason for near-constant
    predictions. Projecting the inputs to a wider d_model would address it,
    but it changes the checkpoint layout, so it requires changing the
    training script as well and retraining.
    """

    def __init__(self, input_dim, num_heads, num_layers, output_dim, dropout=0.25):
        super().__init__()
        self.pos_encoder = PositionalEncoding(input_dim, dropout=dropout)
        encoder_layer = nn.TransformerEncoderLayer(
            input_dim,
            num_heads,
            dim_feedforward=input_dim * 4,
            activation='relu',
            dropout=dropout,
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers)
        decoder_layer = nn.TransformerDecoderLayer(
            input_dim,
            num_heads,
            dim_feedforward=input_dim * 4,
            activation='relu',
            dropout=dropout,
        )
        self.transformer_decoder = nn.TransformerDecoder(decoder_layer, num_layers)
        self.output_layer = nn.Linear(input_dim, output_dim)
        self._init_weights()

    def _init_weights(self):
        """Xavier-initialise Linear weights and all >1-D Transformer parameters."""
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, (nn.TransformerEncoderLayer, nn.TransformerDecoderLayer)):
                for param in m.parameters():
                    if param.dim() > 1:
                        nn.init.xavier_uniform_(param)

    def forward(self, src, tgt):
        """Run encoder and decoder and project to ``output_dim``.

        Both inputs are sequence-first: (sequence_length, batch_size, features).
        ``tgt`` may have a last dimension of 1; adding the positional encoding
        broadcasts it up to the model width.
        """
        src = self.pos_encoder(src)
        tgt = self.pos_encoder(tgt)
        memory = self.transformer_encoder(src)
        out = self.transformer_decoder(tgt, memory)
        return self.output_layer(out)


def predict_sequence(model, dataloader, device):
    """Predict one value per sample, feeding each prediction into the next step.

    The model is switched to evaluation mode first so dropout is disabled and
    repeated calls give the same result. Tensors use the same sequence-first
    layout as training, (1, batch_size, features), instead of relying on
    batch_size == 1 to make a batch-first layout look identical.

    Args:
        model: A ``TransformerRegressionModel``.
        dataloader: Iterable yielding 1-tuples ``(features,)`` of shape
            (batch_size, input_dim).
        device: Device on which the model lives.

    Returns:
        A NumPy array of shape (n_samples,) when the model has a single
        output, otherwise (n_samples, output_dim). Empty input gives shape (0,).
    """
    model.eval()
    output_dim = model.output_layer.out_features
    collected = []
    previous_y = None

    with torch.no_grad():
        for (current_x,) in dataloader:
            current_x = current_x.to(device).unsqueeze(0)  # (1, batch_size, input_dim)

            if previous_y is None:
                # Same start token as in training: an all-zero target.
                tgt = torch.zeros(1, current_x.size(1), output_dim, device=device)
            else:
                tgt = previous_y.unsqueeze(0)

            out = model(current_x, tgt)  # (1, batch_size, output_dim)
            previous_y = out[-1]         # last time step: (batch_size, output_dim)
            collected.append(previous_y.cpu().numpy())

    if not collected:
        return np.empty((0,), dtype=np.float32)

    stacked = np.concatenate(collected, axis=0)
    return stacked[:, 0] if stacked.shape[1] == 1 else stacked


def main():
    """Load the test CSV and the saved weights, then print the predictions."""
    # Imported lazily so the model classes can be imported without scikit-learn.
    from sklearn.preprocessing import StandardScaler

    # Load the data.
    file_path = "C:\\python\\soc_model\\train_data\\train5"
    data = pd.read_csv(file_path)

    # Features and target.
    x = data[['Current_(mA)', 'Voltage_(V)']].values
    y_true = data['Capacity_(Ah)'].values  # ground truth, kept for comparing with the predictions

    # Training scales np.abs(x); apply the same transform here. Previously the
    # abs() was taken after scaling and had no effect.
    # NOTE(review): the scaler is re-fitted on the test file because the
    # training script does not save its scaler; ideally reuse the training
    # statistics so both scripts see identically scaled inputs.
    scaler_x = StandardScaler()
    x_normalized = scaler_x.fit_transform(np.abs(x))

    x_tensor = torch.tensor(x_normalized, dtype=torch.float32)
    test_dataset = TensorDataset(x_tensor)
    test_data = DataLoader(test_dataset, batch_size=1, shuffle=False)

    # Hyper-parameters: must match the training script.
    input_dim = 2   # number of input features
    output_dim = 1  # number of output features
    num_heads = 2   # attention heads
    num_layers = 6

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = TransformerRegressionModel(input_dim, num_heads, num_layers, output_dim).to(device)
    # map_location lets a checkpoint saved on GPU load on a CPU-only machine.
    model.load_state_dict(torch.load('transformer_model.pth1', map_location=device))

    predicted = predict_sequence(model, test_data, device)
    print(predicted)
    return predicted


if __name__ == "__main__":
    main()

# Question from the original post: "Sincerely asking for help: why are the
# outputs identical at prediction time?" See the NOTE(review) on
# TransformerRegressionModel for the most likely cause.