GitHub中文论坛


Help: transformer predictions are all identical

Technical Discussion · Machine Learning
1 post · 1 poster · 6.1k views
ffffffkkkk  #1
    ```python
    import torch
    import torch.nn as nn
    import torch.nn.functional as F
    import numpy as np
    import pandas as pd
    from torch.utils.data import DataLoader, TensorDataset
    from sklearn.preprocessing import StandardScaler
    
    
    class PositionalEncoding(nn.Module):
        def __init__(self, d_model, max_len=5000, dropout=0.2):
            super(PositionalEncoding, self).__init__()
            self.dropout = nn.Dropout(p=dropout)
    
            pe = torch.zeros(max_len, d_model)
            position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
            div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
            pe[:, 0::2] = torch.sin(position * div_term)
            pe[:, 1::2] = torch.cos(position * div_term)
            pe = pe.unsqueeze(0).transpose(0, 1)
            self.register_buffer('pe', pe)
    
        def forward(self, x):
            x = x + self.pe[:x.size(0), :]
            return self.dropout(x)
    
    
    class TransformerRegressionModel(nn.Module):
        def __init__(self, input_dim, num_heads, num_layers, output_dim, dropout=0.25):
            super(TransformerRegressionModel, self).__init__()
            self.pos_encoder = PositionalEncoding(input_dim, dropout=dropout)
            encoder_layer = nn.TransformerEncoderLayer(input_dim, num_heads, dim_feedforward=input_dim * 4, activation='relu', dropout=dropout)
            self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers)
            decoder_layer = nn.TransformerDecoderLayer(input_dim, num_heads, dim_feedforward=input_dim * 4, activation='relu', dropout=dropout)
            self.transformer_decoder = nn.TransformerDecoder(decoder_layer, num_layers)
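            # Note: batch_first is left at its default (False), so the encoder and
            # decoder below interpret their inputs as (seq_len, batch_size, d_model).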
            self.output_layer = nn.Linear(input_dim, output_dim)
            self._init_weights()
    
        def _init_weights(self):
            for m in self.modules():
                if isinstance(m, nn.Linear):
                    nn.init.xavier_uniform_(m.weight)
                    if m.bias is not None:
                        nn.init.constant_(m.bias, 0)
                elif isinstance(m, nn.TransformerEncoderLayer) or isinstance(m, nn.TransformerDecoderLayer):
                    for param in m.parameters():
                        if param.dim() > 1:
                            nn.init.xavier_uniform_(param)
    
        def forward(self, src, tgt):
            src = self.pos_encoder(src)  # (sequence_length, batch_size, input_dim)
            tgt = self.pos_encoder(tgt)  # (sequence_length, batch_size, input_dim)
    
            memory = self.transformer_encoder(src)  # (sequence_length, batch_size, input_dim)
            out = self.transformer_decoder(tgt, memory)  # (sequence_length, batch_size, input_dim)
            out = self.output_layer(out)  # output layer
            return out
    
    
    # Load the data
    file_path = "C:\\python\\soc_model\\train_data\\train1"
    data = pd.read_csv(file_path)
    
    # Features and target
    x = data[['Current_(mA)', 'Voltage_(V)']].values
    scaler = StandardScaler()
    y_true = data['Capacity_(Ah)'].values
    x_normalized = scaler.fit_transform(np.abs(x))  # Normalize the feature data
    
    # Convert to PyTorch tensors
    x_tensor = torch.tensor(x_normalized, dtype=torch.float32)
    y_tensor = torch.tensor(y_true, dtype=torch.float32).view(-1, 1)
    
    # Build the dataset
    train_dataset = TensorDataset(x_tensor, y_tensor)
    
    # Build the data loader
    train_dataloader = DataLoader(train_dataset, batch_size=1, shuffle=False)
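    # Note: with batch_size=1 and shuffle=False, each iteration yields a single row
    # in file order, so every forward pass below sees a sequence of length 1.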
    
    # Hyperparameters
    input_dim = 2  # number of input features
    output_dim = 1  # number of output features
    num_heads = 2  # number of attention heads
    num_layers = 6
    num_epochs = 100
    
    # Check GPU availability
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
    # Initialize the model
    model = TransformerRegressionModel(input_dim, num_heads, num_layers, output_dim).to(device)
    
    # Loss function and optimizer
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.000001)
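    # Note: the learning rate is 1e-6, which is very small; combined with the early
    # stopping below, the weights may change very little over the run.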
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=5, factor=0.5, mode='min')
    
    # Early-stopping settings
    patience = 5
    best_loss = float('inf')
    patience_counter = 0
    
    # Train the model
    model.train()
    for epoch in range(num_epochs):
        epoch_loss = 0.0
        previous_y = None  # holds the previous step's prediction
        predicted = []
        for i, (current_x, current_y) in enumerate(train_dataloader):
    
            current_x, current_y = current_x.to(device), current_y.to(device)
    
            # Adjust the dimensions of current_x if needed
            current_x = current_x.unsqueeze(0)  # (1, batch_size, input_dim)
    
            if i == 0:
                # First time step: initialize the target sequence tgt with zeros
                tgt = torch.zeros_like(current_y).unsqueeze(0).to(device)  # (1, batch_size, output_dim)
            else:
                # Use the previous step's prediction as the target sequence
                tgt = previous_y.unsqueeze(0)
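            # Note: tgt built from current_y / previous_y has a last dimension of 1
            # (output_dim), while the decoder's d_model is input_dim=2; the addition
            # inside PositionalEncoding broadcasts it up to size 2.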
    
    
            # Predict using the current input and the previous step's output
            predicted_y = model(current_x, tgt)
            print(predicted_y)
            predicted.append(predicted_y)
    
    
    
            # Keep the current prediction for the next time step
            previous_y = predicted_y.squeeze(0).detach()
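            # Note: detach() stops gradients from flowing through previous_y, so each
            # step is optimized independently of earlier time steps.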
    
            # Compute the loss
            loss = criterion(predicted_y, current_y.unsqueeze(0))
            epoch_loss += loss.item()
    
            # Backpropagation and optimizer step
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
    
        scheduler.step(epoch_loss)
    
        print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {epoch_loss:.4f}")
    
        if epoch_loss < best_loss:
            best_loss = epoch_loss
            patience_counter = 0
            torch.save(model.state_dict(), "transformer_model.pth1")
            print("Model saved.")
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print("Early stopping triggered.")
                break
    

    ```

    ```python
    import torch
    import torch.nn as nn
    import torch.nn.functional as F
    import numpy as np
    import pandas as pd
    from torch.utils.data import DataLoader, TensorDataset
    from sklearn.preprocessing import StandardScaler
    
    
    class PositionalEncoding(nn.Module):
        def __init__(self, d_model, max_len=5000, dropout=0.2):
            super(PositionalEncoding, self).__init__()
            self.dropout = nn.Dropout(p=dropout)
    
            pe = torch.zeros(max_len, d_model)
            position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
            div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
            pe[:, 0::2] = torch.sin(position * div_term)
            pe[:, 1::2] = torch.cos(position * div_term)
            pe = pe.unsqueeze(0).transpose(0, 1)
            self.register_buffer('pe', pe)
    
        def forward(self, x):
            x = x + self.pe[:x.size(0), :]
            return self.dropout(x)
    
    
    class TransformerRegressionModel(nn.Module):
        def __init__(self, input_dim, num_heads, num_layers, output_dim, dropout=0.25):
            super(TransformerRegressionModel, self).__init__()
            self.pos_encoder = PositionalEncoding(input_dim, dropout=dropout)
            encoder_layer = nn.TransformerEncoderLayer(input_dim, num_heads, dim_feedforward=input_dim * 4, activation='relu', dropout=dropout)
            self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers)
            decoder_layer = nn.TransformerDecoderLayer(input_dim, num_heads, dim_feedforward=input_dim * 4, activation='relu', dropout=dropout)
            self.transformer_decoder = nn.TransformerDecoder(decoder_layer, num_layers)
            self.output_layer = nn.Linear(input_dim, output_dim)
            self._init_weights()
    
        def _init_weights(self):
            for m in self.modules():
                if isinstance(m, nn.Linear):
                    nn.init.xavier_uniform_(m.weight)
                    if m.bias is not None:
                        nn.init.constant_(m.bias, 0)
                elif isinstance(m, nn.TransformerEncoderLayer) or isinstance(m, nn.TransformerDecoderLayer):
                    for param in m.parameters():
                        if param.dim() > 1:
                            nn.init.xavier_uniform_(param)
    
        def forward(self, src, tgt):
            src = self.pos_encoder(src)  # (sequence_length, batch_size, input_dim)
            tgt = self.pos_encoder(tgt)  # (sequence_length, batch_size, input_dim)
    
            memory = self.transformer_encoder(src)  # (sequence_length, batch_size, input_dim)
            out = self.transformer_decoder(tgt, memory)  # (sequence_length, batch_size, input_dim)
            out = self.output_layer(out)  # output layer
            return out
    
    
    # Load the data
    file_path = "C:\\python\\soc_model\\train_data\\train5"
    data = pd.read_csv(file_path)
    
    # Features and target
    x = data[['Current_(mA)', 'Voltage_(V)']].values
    scaler_x = StandardScaler()
    x_normalized = scaler_x.fit_transform(x)
    
    x = np.abs(x)
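    # Note: unlike the training script, the scaler here is re-fitted on this data and
    # np.abs() is not applied before scaling; the np.abs(x) above is not used afterwards.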
    
    y_true = data['Capacity_(Ah)'].values
    
    # Convert to PyTorch tensors
    x_tensor = torch.tensor(x_normalized, dtype=torch.float32)
    
    
    # Build the dataset
    test_dataset = TensorDataset(x_tensor)
    
    # Build the data loader
    test_data = DataLoader(test_dataset, batch_size=1, shuffle=False)
    
    # Hyperparameters
    input_dim = 2  # number of input features
    output_dim = 1  # number of output features
    num_heads = 2  # number of attention heads
    num_layers = 6
    num_epochs = 100
    
    # Check GPU availability
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
    # Initialize the model
    model = TransformerRegressionModel(input_dim, num_heads, num_layers, output_dim).to(device)
    model.load_state_dict(torch.load('transformer_model.pth1'))
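    # Note: model.eval() is not called here, so the dropout layers remain active
    # during the inference loop below even though gradients are disabled.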
    
    
    
    
    
    # Inference loop
    predicted = []
    
    with torch.no_grad():
        for i, (current_x,) in enumerate(test_data):
            current_x = current_x.to(device)
            current_x = current_x.unsqueeze(1)  # add a seq_len dimension
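            # Note: this gives (batch_size, 1, input_dim); the model's layers expect
            # (seq_len, batch_size, input_dim) by default, which only coincides here
            # because batch_size is 1.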
    
            # Initialize the target sequence tgt
            if i == 0:
                tgt = torch.zeros(current_x.size(0), 1, current_x.size(2)).to(device)
            else:
                tgt = predicted_y.unsqueeze(1)  # use as the next step's target sequence
    
            # Run the prediction
            predicted_y = model(current_x, tgt)
            print(current_x)
            print(tgt)
    
    
            # Take the prediction at the last time step
            predicted_y = predicted_y[:, -1, :]  # (batch_size, output_dim)
    
            # Store the prediction
            predicted.append(predicted_y.cpu().numpy())
    
    # Convert to a NumPy array
    predicted = np.array(predicted).squeeze()
    
    
    
    ```

**Sincere request for help: why does the model output the same value for every prediction?**
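A minimal diagnostic sketch (assuming the trained `model`, the `device`, and the fitted `scaler_x` from the inference script above are in scope; the two probe rows below are made-up values): it feeds two clearly different inputs through the network in a single forward pass, to check whether the collapse happens inside the model itself rather than in the autoregressive decoding loop.

```python
import numpy as np
import torch

# Sketch only: `model`, `device` and `scaler_x` are assumed to come from the
# inference script above; the probe (current, voltage) values are invented.
model.eval()  # deterministic forward pass (disables dropout)
with torch.no_grad():
    probe = scaler_x.transform(np.array([[100.0, 3.2], [2000.0, 4.1]]))
    src = torch.tensor(probe, dtype=torch.float32).unsqueeze(1).to(device)  # (seq_len=2, batch=1, input_dim=2)
    tgt = torch.zeros(2, 1, 2, device=device)                               # zero target sequence of the same length
    out = model(src, tgt).squeeze()                                          # two predicted capacities
    print(out)  # near-identical values would point at the model, not the decoding loop
```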

