基于MLP算法实现交通流量预测(Pytorch版)

在海量的城市数据中,交通流量数据无疑是揭示城市运行脉络、洞察出行规律的关键要素之一。实时且精准的交通流量预测不仅能为交通规划者提供科学决策依据,助力提升道路使用效率、缓解交通拥堵,还能为公众出行提供参考,实现个性化导航服务,进而提升整个城市的运行效能。然而,交通流量受到众多因素的交织影响,如天气变化、特殊事件、节假日效应、时段特性等,其动态变化规律呈现出显著的非线性、时变性和不确定性,这无疑给预测工作带来了巨大挑战。

在此背景下,机器学习技术,尤其是深度学习方法,凭借其强大的模型构建能力和对复杂非线性关系的出色捕捉能力,逐渐崭露头角,成为交通流量预测领域的研究热点。其中,多层感知器(MLP)作为一类基础而经典的前馈型人工神经网络,以其简洁的结构、灵活的适应性和良好的泛化性能,在处理高维、非线性问题上展现出独特优势。 MLP通过模拟人脑神经元的工作机制,通过多层非线性变换对输入数据进行深度抽象和特征学习,能够有效挖掘交通流量数据背后的复杂关联与潜在模式,从而实现对未来的流量状态进行精准预测。

模型简介

多层感知器(Multi-Layer Perceptron, MLP)是一种基础且广泛使用的前馈型人工神经网络模型,它是深度学习领域的重要组成部分,尤其在处理非线性关系和模式识别问题中表现出强大的能力。MLP的核心特征在于其多层结构,由多个神经元组成的隐藏层与输入层和输出层相互连接,形成一个分层信息处理系统。下面是对MLP的详细介绍:

基本结构与组件:

  1. 输入层(Input Layer):接收原始数据作为网络的输入。这些数据通常是经过预处理后的数值向量,代表了待解决问题中的各种特征或变量。

  2. 隐藏层(Hidden Layers):位于输入层与输出层之间的中间层。MLP可以有一个或多个隐藏层,每个隐藏层包含多个神经元。隐藏层的主要功能是通过非线性变换对输入数据进行复杂的特征提取和表示学习,从而捕获数据中的潜在关系和模式。神经元之间通常是全连接的,即每个隐藏层神经元接收到前一层所有神经元的加权输出。

  3. 输出层(Output Layer):产生网络的最终输出,根据任务需求可以是一维或多维的。在交通流量预测等回归任务中,输出层通常只有一个神经元,给出连续的预测值;而在分类任务中,输出层神经元数量对应类别数,每个神经元的激活值代表对应类别的概率。

工作原理:

  1. 加权求和与激活函数:每个神经元接收前一层所有神经元的输出(或输入层的原始数据),对这些输入进行加权求和,加上一个偏置项后,通过一个非线性激活函数进行转换。常见的激活函数包括Sigmoid、Tanh、ReLU及其变种等。激活函数引入非线性,使得网络能够表达复杂的非线性关系。

  2. 前向传播(Forward Propagation):信息从输入层依次经过隐藏层直至输出层的过程称为前向传播。在这个过程中,数据通过各层神经元的加权求和和激活函数计算,逐步形成更高级的特征表示,最终在输出层生成预测结果。

  3. 反向传播(Backpropagation):当网络进行预测后,会计算预测结果与实际标签之间的误差(损失函数)。反向传播算法根据这个误差,从输出层开始,逐层反向调整各层神经元的权重和偏置,以最小化总体误差。这是通过链式法则计算梯度并应用梯度下降或其变种算法实现的。反向传播确保了网络在训练过程中能够自我学习并逐步改善其预测性能。

训练与应用:

  1. 训练过程:给定标记好的训练数据集,MLP通过迭代执行前向传播和反向传播,更新模型参数,直到达到预设的停止条件(如达到一定迭代次数、损失函数收敛或验证集性能不再提升等)。训练过程中可能涉及正则化、批量归一化、dropout等技术防止过拟合。

  2. 应用领域:MLP因其灵活性和普适性,被广泛应用于各种领域,如图像识别、语音识别、自然语言处理、时间序列预测(如交通流量预测)、金融风险评估、生物医学信号分析等。在交通流量预测中,MLP可以接收历史流量数据、气象信息、节假日标志等多元输入,学习并建模这些因素与未来流量之间的复杂非线性关系,从而做出准确预测。

综上所述,多层感知器(MLP)作为一种基础的前馈神经网络模型,通过多层非线性变换对输入数据进行抽象和学习,适用于各种非线性预测和分类任务。其训练过程依赖于反向传播算法来优化模型参数,使其能够捕捉数据中的复杂模式,并在诸多实际应用场景中展现出强大的预测性能。在交通流量预测中,MLP能够整合多种影响因素,为交通管理者提供有价值的决策支持。

模型构建

class MLP(nn.Module):
    def __init__(self, input_size, hidden_sizes, output_size):
        super(MLP, self).__init__()
        self.input_size = input_size
        self.hidden_sizes = hidden_sizes
        self.output_size = output_size
        layers = []
        prev_size = input_size
        for hidden_size in hidden_sizes:
            layers.append(nn.Linear(prev_size, hidden_size))
            layers.append(nn.Sigmoid())
            prev_size = hidden_size
        layers.append(nn.Linear(prev_size, output_size))
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        # [Batch, Input_len, Node] --> [Batch, Node, Input_len]
        x = x.permute(0, 2, 1)
        y = self.model(x)
        # [Batch, Node, Output_len] --> [Batch, Output_len, Node]
        y = y.permute(0, 2, 1)
        return y

其中,input_size代表输入层节点数目,hidden_sizes为隐藏层的节点数目,如[24, 36, 24]则代表有3层隐藏层,隐藏层的节点数目分别为24、36、24,output_size则为输出层节点数目。

一个典型的MLP神经网络如下图所示:

在这里插入图片描述

在时间序列任务中,考虑到数据变化的趋势性,认为未来时间窗的特征和当前时刻的前置时间窗特征相关性较大。

在这里插入图片描述

在单变量的预测场景下,假设时间窗为5分钟,用最近12个时间窗的流量预测未来3个时间窗的流量,则可以将最近12个时间窗的流量作为输入特征,此时输入层的节点数目为12,而需要预测的未来的3个时间窗的流量作为输出特征,此时输出层的节点数目为3。

编写个主函数验证下网络shape变化是否符合预期:

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--window_size', type=int, default=12)
    parser.add_argument('--horizon', type=int, default=3)
    parser.add_argument('--hidden_sizes', type=list, default=[24, 36, 24])

    args = parser.parse_args()

    model = DNN(input_size=args.window_size, hidden_sizes=args.hidden_sizes, output_size=args.horizon)

    print(model)

    x = torch.randn(8, 12, 96)
    y = model(x)
    print(y.shape)

输出为:

MLP(
  (model): Sequential(
    (0): Linear(in_features=12, out_features=24, bias=True)
    (1): Sigmoid()
    (2): Linear(in_features=24, out_features=36, bias=True)
    (3): Sigmoid()
    (4): Linear(in_features=36, out_features=24, bias=True)
    (5): Sigmoid()
    (6): Linear(in_features=24, out_features=3, bias=True)
  )
)
torch.Size([8, 3, 96])

输入张量的shape为[batch_size, input_len, node],node代表不同的序列,如果是交通流量预测,代表的是不同路段或交叉口。

输出张量的shape为[batch_size, output_len, node]。

用最近12个时间窗的流量预测未来3个时间窗的流量,输入张量的shape为[8, 12, 96],输出张量的shape为[8, 3, 96],符合预期。

那如果是多变量的预测场景呢?

其实,也很简单,假设时间窗为5分钟,用最近12个时间窗的流量、速度、占有率来预测未来3个时间窗的流量,此时输入特征为最近12个时间窗的流量、速度、占有率,有12*3=36个特征,故输入层的节点数目为36,由于输出还是流量这个单变量,所以输出层的节点数目还是3。

数据输入

本文以PEMS数据集作为算法的训练、验证和测试数据。

PEMS数据集是针对加利福尼亚州不同区域高速公路网络收集的交通数据,数据集可能包含多个传感器站点的数据,每个站点每五分钟记录了特定路段或路口的交通状况,包括但不限于:

  • 流量

单位时间内通过某路段或交叉口的车辆数量,反映道路的使用程度。

  • 速度

车辆在道路上行驶的平均速度,用于评估道路的运行效率和拥堵状况。

  • 占有率

车道被车辆占用的比例,是衡量道路拥挤程度的另一个重要指标。

PEMS03数据,26208*358*3,358个检测器,26208个5分钟时间窗(2012/5/1开始,91天),3个变量分别为流量、速度和占有率。

PEMS04数据,16992*307*3,307个检测器,16992个5分钟时间窗(2017/7/1开始,59天),3个变量分别为流量、速度和占有率。

PEMS07数据,28224*883*3,883个检测器,28224个5分钟时间窗(2017/5/1开始,98天),3个变量分别为流量、速度和占有率。

PEMS08数据,17856*170*3,170个检测器,17856个5分钟时间窗(2012/3/1开始,62天),3个变量分别为流量、速度和占有率。

读取数据后,可以将数据传入自定义的DataSet,以方便后续的训练:

class ForecastDataset(torch_data.Dataset):
    def __init__(self, df, window_size, horizon, normalize_method=None, norm_statistic=None, interval=1):
        self.window_size = window_size # 12
        self.interval = interval  #1
        self.horizon = horizon
        self.normalize_method = normalize_method
        self.norm_statistic = norm_statistic
        df = pd.DataFrame(df)
        df = df.fillna(method='ffill', limit=len(df)).fillna(method='bfill', limit=len(df)).values
        self.data = df
        self.df_length = len(df)
        self.x_end_idx = self.get_x_end_idx()
        if normalize_method:
            self.data, _ = normalized(self.data, normalize_method, norm_statistic)

    def __getitem__(self, index):
        hi = self.x_end_idx[index] #12
        lo = hi - self.window_size #0
        train_data = self.data[lo: hi] #0:12
        target_data = self.data[hi:hi + self.horizon] #12:24
        x = torch.from_numpy(train_data).type(torch.float)
        y = torch.from_numpy(target_data).type(torch.float)
        return x, y

    def __len__(self):
        return len(self.x_end_idx)

    def get_x_end_idx(self):
        # each element `hi` in `x_index_set` is an upper bound for get training data
        # training data range: [lo, hi), lo = hi - window_size
        x_index_set = range(self.window_size, self.df_length - self.horizon + 1)
        x_end_idx = [x_index_set[j * self.interval] for j in range((len(x_index_set)) // self.interval)]
        return x_end_idx
    
def normalized(data, normalize_method, norm_statistic=None):
    if normalize_method == 'min_max':
        if not norm_statistic:
            norm_statistic = dict(max=np.max(data, axis=0), min=np.min(data, axis=0))
        scale = norm_statistic['max'] - norm_statistic['min'] + 1e-5
        data = (data - norm_statistic['min']) / scale
        data = np.clip(data, 0.0, 1.0)
    elif normalize_method == 'z_score':
        if not norm_statistic:
            norm_statistic = dict(mean=np.mean(data, axis=0), std=np.std(data, axis=0))
        mean = norm_statistic['mean']
        std = norm_statistic['std']
        std = [1 if i == 0 else i for i in std]
        data = (data - mean) / std
        norm_statistic['std'] = std
    return data, norm_statistic

def de_normalized(data, normalize_method, norm_statistic):
    if normalize_method == 'min_max':
        if not norm_statistic:
            norm_statistic = dict(max=np.max(data, axis=0), min=np.min(data, axis=0))
        scale = norm_statistic['max'] - norm_statistic['min'] + 1e-8
        data = data * scale + norm_statistic['min']
    elif normalize_method == 'z_score':
        if not norm_statistic:
            norm_statistic = dict(mean=np.mean(data, axis=0), std=np.std(data, axis=0))
        mean = norm_statistic['mean']
        std = norm_statistic['std']
        std = [1 if i == 0 else i for i in std]
        data = data * std + mean
    return data

上述代码定义了一个名为ForecastDataset的类,它是基于torch_data.Dataset的子类,用于处理时间序列预测任务的数据集。同时,还提供了normalizedde_normalized两个函数,分别用于数据的标准化(归一化)和反标准化。

ForecastDataset类

  1. 初始化方法__init__

    • 输入参数:
      • df:原始数据。
      • window_size:滑动窗口大小,用于截取历史数据作为模型训练的输入。
      • horizon:预测步长,即模型需要预测未来多少个时间点或时间窗的数据。
      • normalize_method:数据标准化方法,可选值为'min_max'(最小最大值归一化)或'z_score'(Z-score标准化)。
      • norm_statistic:若已知数据的统计信息(如最大值、最小值、均值、标准差),可直接传入;否则,将根据数据计算这些统计量。
      • interval:采样间隔。
    • 方法内部:
      • 将输入的DataFrame填充缺失值,并转化为NumPy数组。
      • 初始化类的属性:滑动窗口大小、采样间隔、预测步长、标准化方法、统计信息等。
      • 计算数据集长度和训练数据结束索引(x_end_idx),用于后续按索引获取训练数据和目标数据。
      • 如果指定了标准化方法,则对数据进行标准化处理,同时更新统计信息。
  2. __getitem__方法:

    • 输入参数:index,表示数据集中第index个样本的索引。
    • 方法内部:
      • 根据x_end_idx列表计算当前样本的训练数据起始索引lo和结束索引hi
      • 提取训练数据(历史数据)和目标数据(未来数据)。
      • 将提取到的训练数据和目标数据转化为PyTorch张量并设置为浮点类型。
      • 返回包含训练数据和目标数据的元组。
  3. __len__方法:

    • 返回数据集的样本数,即x_end_idx列表的长度。
  4. get_x_end_idx方法:

    • 该方法用于生成一个列表,其中每个元素hi表示一个训练数据结束索引。
    • 计算x_index_set,包含所有可能的训练数据结束索引(满足window_sizedf_length - horizon + 1的范围)。
    • 根据采样间隔intervalx_index_set中选取训练数据结束索引,组成x_end_idx列表并返回。

辅助函数

  1. normalized函数:

    • 输入参数:待标准化数据data、标准化方法normalize_method及统计信息字典norm_statistic(可选)。
    • 函数内部:
      • 根据指定的标准化方法进行数据标准化:
        • 若为'min_max'
          • 若未提供统计信息,计算数据的最大值和最小值,然后进行最小最大值归一化。
          • 确保数据在[0, 1]范围内。
        • 若为'z_score'
          • 若未提供统计信息,计算数据的均值和标准差,然后进行Z-score标准化。
          • 避免除以零错误,当标准差为零时将其置为1。
      • 返回标准化后的数据及更新后的统计信息字典。
  2. de_normalized函数:

    • 输入参数:已标准化数据data、标准化方法normalize_method及统计信息字典norm_statistic
    • 函数内部:
      • 根据指定的标准化方法进行数据反标准化:
        • 若为'min_max'
          • 使用提供的统计信息(最大值、最小值)进行反归一化。
        • 若为'z_score'
          • 使用提供的统计信息(均值、标准差)进行反Z-score标准化。
          • 避免除以零错误,当标准差为零时将其置为1。
      • 返回反标准化后的数据。

综上所述,上述代码实现了一个用于时间序列预测任务的数据集类ForecastDataset,支持滑动窗口、预测步长、数据标准化等功能,并提供了数据标准化与反标准化的辅助函数。

代码中,window_size即为前置时间窗的个数,horizon则为预测时间窗的个数,interval为采样间隔,默认为1。

比如现在我们有100个时间窗的历史数据,window_size为12,horizon为12,interval为1。

则第1个样本为:

x: 第1到第12个时间窗
y: 第13到第24个时间窗

第2个样本为:

x: 第2到第13个时间窗
y: 第14到第25个时间窗

以此类推,总共可构建出(his_num-window_size-horizon+1) // interval=(100-12-12+1)//1=77个样本。

同样的,若interval为2,则第1个样本为:

x: 第1到第12个时间窗
y: 第13到第24个时间窗

第2个样本为:

x: 第3到第14个时间窗
y: 第15到第26个时间窗

其他样本以此类推,可以看出,所谓的interval其实就是相邻样本之间的时间窗个数。

模型训练

为了适配不同的模型和数据集,我们采用类继承的方式来编写模型训练代码。

首先,我们定义一个父类。

class Exp_Basic(object):
    def __init__(self, args):
        self.args = args
        self.device = self._acquire_device()
        self.model = self._build_model()

    def _acquire_device(self):
        if self.args.use_gpu:
            os.environ["CUDA_VISIBLE_DEVICES"] = str(self.args.gpu) if not self.args.use_multi_gpu else self.args.devices
            device = torch.device('cuda:{}'.format(self.args.gpu))
            print('Use GPU: cuda:{}'.format(self.args.gpu))
        else:
            device = torch.device('cpu')
            print('Use CPU')
        return device

    def _get_data(self):
        pass

    # 创建模型
    def _build_model(self):
        raise NotImplementedError
        return None

    def train(self):
        pass
    def valid(self):
        pass

    def test(self):
        pass
    

接着,编写具体的实现类Exp_MLP_PEMS(Exp_Basic)。

数据读取

首先实现_get_data方法来获取数据:

def _get_data(self):
	data_file = os.path.join('../../data/PEMS', self.args.dataset, self.args.dataset+'.npz')
	print('data file:',data_file)
	data = np.load(data_file,allow_pickle=True)
	data = data['data'][:, :, 0]
	train_ratio = self.args.train_length / (self.args.train_length + self.args.valid_length + self.args.test_length)
	valid_ratio = self.args.valid_length / (self.args.train_length + self.args.valid_length + self.args.test_length)
	train_data = data[:int(train_ratio * len(data))]
	valid_data = data[int(train_ratio * len(data)):int((train_ratio + valid_ratio) * len(data))]
	test_data = data[int((train_ratio + valid_ratio) * len(data)):]
	if len(train_data) == 0:
		raise Exception('Cannot organize enough training data')
	if len(valid_data) == 0:
		raise Exception('Cannot organize enough validation data')
	if len(test_data) == 0:
		raise Exception('Cannot organize enough test data')
	if self.args.normtype == 0:
		train_mean = np.mean(train_data, axis=0)
		train_std = np.std(train_data, axis=0)
		train_normalize_statistic = {"mean": train_mean.tolist(), "std": train_std.tolist()}
		val_mean = np.mean(valid_data, axis=0)
		val_std = np.std(valid_data, axis=0)
		val_normalize_statistic = {"mean": val_mean.tolist(), "std": val_std.tolist()}
		test_mean = np.mean(test_data, axis=0)
		test_std = np.std(test_data, axis=0)
		test_normalize_statistic = {"mean": test_mean.tolist(), "std": test_std.tolist()}
	elif self.args.normtype == 1:
		data_mean = np.mean(data, axis=0)
		data_std = np.std(data, axis=0)
		train_normalize_statistic = {"mean": data_mean.tolist(), "std": data_std.tolist()}
		val_normalize_statistic = {"mean": data_mean.tolist(), "std": data_std.tolist()}
		test_normalize_statistic = {"mean": data_mean.tolist(), "std": data_std.tolist()}
	else:
		train_mean = np.mean(train_data, axis=0)
		train_std = np.std(train_data, axis=0)
		train_normalize_statistic = {"mean": train_mean.tolist(), "std": train_std.tolist()}
		val_normalize_statistic = {"mean": train_mean.tolist(), "std": train_std.tolist()}
		test_normalize_statistic = {"mean": train_mean.tolist(), "std": train_std.tolist()}
	train_set = ForecastDataset(train_data, window_size=self.args.window_size, horizon=self.args.horizon,
							normalize_method=self.args.norm_method, norm_statistic=train_normalize_statistic)
	valid_set = ForecastDataset(valid_data, window_size=self.args.window_size, horizon=self.args.horizon,
								normalize_method=self.args.norm_method, norm_statistic=val_normalize_statistic)
	test_set = ForecastDataset(test_data, window_size=self.args.window_size, horizon=self.args.horizon,
								normalize_method=self.args.norm_method, norm_statistic=test_normalize_statistic)
	train_loader = DataLoader(train_set, batch_size=self.args.batch_size, drop_last=False, shuffle=True,
										num_workers=1)
	valid_loader = DataLoader(valid_set, batch_size=self.args.batch_size, shuffle=False, num_workers=1)
	test_loader = DataLoader(test_set, batch_size=self.args.batch_size, shuffle=False, num_workers=1)
	node_cnt = train_data.shape[1]
	return test_loader, train_loader, valid_loader,node_cnt,test_normalize_statistic,val_normalize_statistic

上述Python代码定义了一个名为_get_data的方法,其目的是根据传入的参数配置来读取PEMS 03/04/07/08数据集中的流量数据,进行预处理(如划分训练集、验证集、测试集,标准化),并创建相应的DataLoader对象。下面是代码的详细解读:

  1. 读取数据

    • 使用os.path.join()函数根据传入的args.dataset参数拼接数据文件路径,数据文件位于../../data/PEMS/{dataset}/{dataset}.npz目录下。
    • 使用np.load()函数加载.npz文件,其中数据以字典形式存储,键为'data'。加载后的数据是一个三维数组,第三维代表不同特征(流量、速度、占有率),这里取第一个特征(流量)作为建模数据。
    • 数据现在是一个形状为(len, node)的二维数组,其中len表示时间维度上的观测点数,node表示不同节点(如道路、路口)的数量。
  2. 划分数据集

    • 根据传入的train_lengthvalid_lengthtest_length参数,按照时间维度将数据划分为训练集、验证集和测试集。
    • 计算训练集、验证集、测试集在时间轴上的起始和结束索引,并分别切分数据。
    • 如果划分后任一数据集的长度为0,抛出异常,表示无法组织足够的数据。
  3. 数据标准化

    • 根据normtype参数选择不同的标准化方法:
      • normtype=0:分别计算训练集、验证集、测试集的均值和标准差,进行独立标准化。
      • normtype=1:使用整个数据集(训练集、验证集、测试集组合)的均值和标准差,对所有数据进行统一标准化。
      • normtype=2:仅使用训练集的均值和标准差,对训练集、验证集、测试集进行标准化。
    • 计算所需统计量(均值和标准差),并将结果存储为字典格式,如{"mean": [mean1, mean2, ...], "std": [std1, std2, ...]}
  4. 创建并返回数据集和DataLoader对象

    • 使用ForecastDataset类(未在提供的代码中定义)创建训练集、验证集、测试集对象。传入原始数据、窗口大小(window_size)、预测时域(horizon)、标准化方法(norm_method)和对应的标准化统计量。
    • 为每个数据集对象创建一个DataLoader,设置批次大小(batch_size)、是否丢弃最后一小批(drop_last)、是否打乱数据(shuffle)、并行处理的worker数(num_workers)等参数。
    • 返回测试集、训练集、验证集的DataLoader对象,以及节点数量(node_cnt)和测试集、验证集的标准化统计量。

总结:该方法完成了PEMS数据集的读取、划分、标准化,并为训练、验证、测试准备了相应的DataLoader对象,为后续模型训练和评估提供了数据支持。

构建模型

def _build_model(self):
	model = MLP(input_size=self.args.window_size, hidden_sizes=self.args.hidden_sizes, output_size=self.args.horizon)
	print(model)
	return model

训练

def train(self):
	my_optim = self._select_optimizer()
	my_lr_scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer=my_optim, gamma=self.args.decay_rate)
	test_loader, train_loader, valid_loader, node_cnt, test_normalize_statistic, val_normalize_statistic = self._get_data()
	forecast_loss = nn.L1Loss()
	best_validate_mae = np.inf
	best_test_mae = np.inf
	validate_score_non_decrease_count = 0

	if self.args.resume:
		self.model, lr, epoch_start = load_model(self.model, self.result_file, model_name=self.args.dataset,
												 horizon=self.args.horizon)
	else:
		epoch_start = 0

	for epoch in range(epoch_start, self.args.epoch):
		lr = adjust_learning_rate(my_optim, epoch, self.args)
		epoch_start_time = time.time()
		self.model.train()
		loss_total = 0
		cnt = 0
		for i, (inputs, target) in enumerate(train_loader):
			inputs = inputs
			target = target
			self.model.zero_grad()
			forecast = self.model(inputs)
			loss = forecast_loss(forecast, target)
			cnt += 1
			loss.backward()
			my_optim.step()
			loss_total += float(loss)
		print('| end of epoch {:3d} | time: {:5.2f}s | train_total_loss {:5.4f} '.format(epoch, (
				time.time() - epoch_start_time), loss_total / cnt))

		if (epoch + 1) % self.args.exponential_decay_step == 0:
			my_lr_scheduler.step()
		if (epoch + 1) % self.args.validate_freq == 0:
			is_best_for_now = False
			print('------ validate on data: VALIDATE ------')
			valid_metrics = self.validate(self.model, valid_loader, self.args.norm_method,
												val_normalize_statistic,
												self.args.window_size, self.args.horizon,
												test=False)
			test_metrics = self.validate(self.model, test_loader, self.args.norm_method,
										 test_normalize_statistic,
										 self.args.window_size, self.args.horizon,
										 test=True)
			if best_validate_mae > valid_metrics['mape']:
				best_validate_mae = valid_metrics['mape']
				is_best_for_now = True
				validate_score_non_decrease_count = 0
				print('got best validation result:', valid_metrics, test_metrics)
			else:
				validate_score_non_decrease_count += 1
			if best_test_mae > test_metrics['mape']:
				best_test_mae = test_metrics['mape']
				print('got best test result:', test_metrics)

			# save model
			if is_best_for_now:
				save_model(epoch, lr, model=self.model, model_dir=self.result_file, model_name=self.args.dataset,
						   horizon=self.args.horizon)
				print('saved model!')
		# early stop
		if self.args.early_stop and validate_score_non_decrease_count >= self.args.early_stop_step:
			break

上述代码定义了一个名为train的方法,用于训练一个给定的模型。

  1. 初始化变量与加载模型:

    • 调用_select_optimizer方法选择优化器(optimizer)。
    • 创建指数衰减学习率调度器(learning rate scheduler),使用ExponentialLR类,指定优化器和衰减率(decay rate)。
    • 调用_get_data方法获取训练、验证、测试数据加载器、节点数量、测试数据与验证数据的标准化统计信息。
    • 定义损失函数(loss function)为L1损失(nn.L1Loss)。
    • 设置最佳验证MAE(Mean Absolute Error)和最佳测试MAE初始值为正无穷大。
    • 初始化验证分数非下降计数器(validate score non-decrease count)为0。
  2. 检查是否继续之前训练:

    • 如果args.resume参数为真(即继续之前训练),则加载模型、学习率(lr)和开始的训练轮数(epoch_start)。
    • 否则,设置epoch_start为0,从头开始训练。
  3. 主训练循环:

    • 对于每个训练轮(epoch),从epoch_startargs.epoch
      • 调整学习率(adjust_learning_rate函数)。
      • 记录当前轮开始时间。
      • 将模型设置为训练模式。
      • 初始化累计训练损失(loss_total)和样本计数器(cnt)为0。
      • 遍历训练数据加载器中的样本(enumerate(train_loader)):
        • 输入(inputs)和目标(target)保持不变。
        • 清除模型的梯度。
        • 使用模型对输入进行预测(forecast)。
        • 计算预测与目标之间的损失(loss)。
        • 更新样本计数器和累计训练损失。
        • 反向传播损失并更新模型参数。
      • 打印本训练轮的训练耗时和平均损失。
  4. 学习率调整与验证:

    • 如果当前训练轮数(epoch+1)能被指数衰减步长整除,执行学习率调度器的step方法,降低学习率。
    • 如果当前训练轮数能被验证频率整除,进行验证过程:
      • 验证模型在验证集上的表现,调用validate方法,传入模型、验证数据加载器、标准化方法、验证数据的统计信息、窗口大小、预测步长等参数,并设置test=False
      • 同样地,验证模型在测试集上的表现,此时设置test=True
      • 检查当前验证MAPE(Mean Absolute Percentage Error)是否优于历史最佳验证MAPE:
        • 如果是,则更新最佳验证MAPE、重置验证分数非下降计数器,并记录当前验证和测试结果。
      • 检查当前测试MAPE是否优于历史最佳测试MAPE,如果是,则更新最佳测试MAPE。
      • 若当前验证结果为最佳,保存模型(save_model函数),并打印提示信息。
  5. 早停条件判断:

    • 如果启用了早停(args.early_stop为真)且连续args.early_stop_step个验证周期内验证分数未下降,则跳出训练循环。

对应的_select_optimizer方法为:

def _select_optimizer(self):
	if self.args.optimizer == 'RMSProp':
		my_optim = torch.optim.RMSprop(params=self.model.parameters(), lr=self.args.lr, eps=1e-08)
	else:
		my_optim = torch.optim.Adam(params=self.model.parameters(), lr=self.args.lr, betas=(0.9, 0.999),
									weight_decay=self.args.weight_decay)
	return my_optim

验证

def validate(self, model, dataloader, normalize_method, statistic,
			 window_size, horizon, test=False):
	if test:
		print("===================Test Normal=========================")
	else:
		print("===================Validate Normal=========================")
	forecast_norm, target_norm, input_norm = self.inference(model, dataloader, window_size, horizon)
	if normalize_method and statistic:
		forecast = de_normalized(forecast_norm, normalize_method, statistic)
		target = de_normalized(target_norm, normalize_method, statistic)
	else:
		forecast, target, input = forecast_norm, target_norm, input_norm
	score = evaluate(target, forecast)
	score_final_detail = evaluate(target, forecast, by_step=True)
	print('by each step: MAPE & MAE & RMSE', score_final_detail)
	if test:
		print(f'TEST: RAW : MAE {score[1]:7.2f};MAPE {score[0]:7.2f}; RMSE {score[2]:7.2f}.')
	else:
		print(f'VAL: RAW : MAE {score[1]:7.2f};MAPE {score[0]:7.2f}; RMSE {score[2]:7.2f}.')
	return dict(mae=score[1], mape=score[0], rmse=score[2])

上述代码定义了一个名为validate的方法,用于评估模型在给定数据集上的预测性能。

  1. 判断验证或测试模式:

    • 根据test参数的值(True或False)输出不同的提示信息,表明正在进行的是测试(Test)还是验证(Validate)。
  2. 模型推理与数据标准化恢复:

    • 调用inference方法,传入模型、数据加载器、窗口大小、预测步长,得到模型对数据集的预测结果(forecast_norm)、真实目标值(target_norm)和输入数据(input_norm)。
    • 判断是否进行了数据标准化:
      • 如果指定了标准化方法(normalize_method)且提供了统计信息(statistic):
        • 使用de_normalized函数对预测结果和真实目标值进行反标准化,恢复到原始数值范围。
      • 否则,直接使用标准化后的预测结果、真实目标值和输入数据。
  3. 计算评估指标:

    • 调用evaluate函数,传入真实目标值和模型预测结果,计算各项评估指标(MAE、MAPE、RMSE)。
  4. 输出详细评估结果:

    • 调用evaluate函数,传入额外参数by_step=True,得到按预测步长分解的各项评估指标。
    • 输出按预测步长分解的评估指标(MAPE、MAE、RMSE)。
  5. 打印总体评估结果:

    • 根据test参数的值,打印相应的测试或验证结果标签。
    • 输出总体的MAE、MAPE、RMSE值,保留两位小数。
  6. 返回评估指标字典:

    • 将计算得到的MAE、MAPE、RMSE值封装到一个字典中,以maemapermse为键,对应值为值,返回该字典。

综上,validate方法通过模型推理、数据标准化恢复、计算评估指标、输出结果等步骤,对模型在给定数据集上的预测性能进行评估,并返回评估指标的字典。根据test参数的不同,该方法可用于模型的验证或测试阶段。

测试

def test(self):
	test_loader, train_loader, valid_loader, node_cnt, test_normalize_statistic, val_normalize_statistic = self._get_data()
	model, lr, epoch = load_model(self.model, self.result_file, model_name=self.args.dataset, horizon=self.args.horizon)
	return self.validate(model, test_loader, self.args.norm_method, test_normalize_statistic,
			 self.args.window_size, self.args.horizon, test=True)

实时推理

def inference(self, model, dataloader, window_size, horizon):
	forecast_set = []
	target_set = []
	input_set = []
	self.model.eval()
	with torch.no_grad():
		for i, (inputs, target) in enumerate(dataloader):
			inputs = inputs
			target = target
			input_set.append(inputs.detach().cpu().numpy())
			step = 0
			forecast_steps = np.zeros([inputs.size()[0], horizon, inputs.size()[2]], dtype=np.float64)
			# 适配迭代预测和非迭代预测
			while step < horizon:
				forecast_result = model(inputs)
				len_model_output = forecast_result.size()[1]
				if len_model_output == 0:
					raise Exception('Get blank inference result')
				inputs[:, :window_size - len_model_output, :] = inputs[:, len_model_output:window_size,
																:].clone()
				inputs[:, window_size - len_model_output:, :] = forecast_result.clone()
				forecast_steps[:, step:min(horizon - step, len_model_output) + step, :] = \
					forecast_result[:, :min(horizon - step, len_model_output), :].detach().cpu().numpy()

				step += min(horizon - step, len_model_output)
			forecast_set.append(forecast_steps)
			target_set.append(target.detach().cpu().numpy())

	return np.concatenate(forecast_set, axis=0), np.concatenate(target_set, axis=0), np.concatenate(input_set,
																									axis=0)

上述代码定义了一个名为inference的方法,用于执行模型在给定数据集上的推理(预测)。

  1. 初始化变量:

    • 创建三个空列表forecast_settarget_setinput_set,分别用于存储模型预测结果、真实目标值和输入数据。
  2. 设置模型为评估模式并禁用梯度计算:

    • 将模型设为评估模式(self.model.eval()),避免在推理过程中进行不必要的前向传播计算。
    • 使用with torch.no_grad():语句块,确保在该块内的计算不记录梯度,节省内存并提高推理速度。
  3. 遍历数据加载器中的样本:

    • 使用enumerate(dataloader)遍历数据加载器中的每个样本,包括输入数据(inputs)和目标值(target)。
    • 将当前样本的输入数据和目标值分别添加到input_settarget_set列表中。
  4. 进行模型推理:

    • 初始化变量step为0,用于记录当前已预测的时间步数。
    • 初始化forecast_steps数组,用于存储当前样本的所有预测结果,形状为(batch_size, horizon, input_feature_dim),其中batch_size为样本批次大小,horizon为预测步长,input_feature_dim为输入特征维度。
    • 迭代预测逻辑:
      • 使用模型对当前输入数据进行预测,得到forecast_result
      • 检查模型输出长度(len_model_output),若为0则抛出异常。
      • 更新输入数据(inputs):将较旧的历史数据移至左侧,并将最新预测结果移至右侧,以准备下一轮预测。
      • 将模型当前输出的预测结果按需填充到forecast_steps数组中,确保每个样本的预测结果按时间步正确排列。
      • 更新step变量,累加已预测的时间步数。
  5. 处理完所有时间步后,将当前样本的预测结果、真实目标值和输入数据添加到相应列表中。

  6. 合并所有样本的预测结果、真实目标值和输入数据:

    • 使用np.concatenate函数,将forecast_settarget_setinput_set列表中的数据沿第一个维度(样本维度)拼接成一个完整的数组。
  7. 返回结果:

    • 返回合并后的预测结果数组、真实目标值数组和输入数据数组。

综上,inference方法通过遍历数据加载器、执行模型推理并收集预测结果、真实目标值和输入数据,最终返回这些数据的完整数组。该方法支持迭代预测(递归使用模型输出作为下一轮输入的一部分)和非迭代预测(单次模型预测即可得到全部结果),适用于不同类型的预测模型。

模型效果

最后,将我们构建好的MLP网络在PEMS数据集进行了准确性测试,算法测试的相关配置如下:

torch.manual_seed(4321)  # reproducible
parser = argparse.ArgumentParser(description='MLP on pems datasets')
### -------  dataset settings --------------
parser.add_argument('--dataset', type=str, default='PEMS08',
                    choices=['PEMS03', 'PEMS04', 'PEMS07', 'PEMS08'])  # sometimes use: PeMS08
parser.add_argument('--norm_method', type=str, default='z_score')
parser.add_argument('--normtype', type=int, default=0)
### -------  input/output length settings --------------
parser.add_argument('--window_size', type=int, default=12)
parser.add_argument('--horizon', type=int, default=12)
parser.add_argument('--train_length', type=float, default=6)
parser.add_argument('--valid_length', type=float, default=2)
parser.add_argument('--test_length', type=float, default=2)
### -------  training settings --------------
parser.add_argument('--use_gpu', type=bool, default=False)
parser.add_argument('--train', type=bool, default=True)
parser.add_argument('--resume', type=bool, default=False)
parser.add_argument('--evaluate', type=bool, default=False)
parser.add_argument('--finetune', type=bool, default=False)
parser.add_argument('--validate_freq', type=int, default=1)
parser.add_argument('--epoch', type=int, default=80)
parser.add_argument('--lr', type=float, default=0.001)
parser.add_argument('--batch_size', type=int, default=8)
parser.add_argument('--optimizer', type=str, default='N')  #
parser.add_argument('--early_stop', type=bool, default=True)
parser.add_argument('--early_stop_step', type=int, default=5)
parser.add_argument('--exponential_decay_step', type=int, default=5)
parser.add_argument('--decay_rate', type=float, default=0.5)
parser.add_argument('--lradj', type=int, default=1, help='adjust learning rate')
parser.add_argument('--weight_decay', type=float, default=1e-5)
parser.add_argument('--model_name', type=str, default='MLP')
### -------  model settings --------------
parser.add_argument('--hidden_sizes', type=list, default=[24, 36, 24])
args = parser.parse_args()

结果如下:

数据集MAEMAPERMSE
PEMS0319.72260.18440331.5506
PEMS0427.24220.16668742.5976
PEMS0728.80020.12248444.1366
PEMS0820.85040.12366832.5307

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/567682.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C++ :设计模式实现

文章目录 原则单一职责原则开闭原则依赖倒置原则接口隔离原则里氏替换原则 设计模式单例模式观察者模式策略模式代理模式 原则 单一职责原则 定义&#xff1a; 即一个类只负责一项职责 问题&#xff1a; 类 T 负责两个不同的职责&#xff1a;职责 P1&#xff0c;职责 P2。当…

爆破、批量PoC扫描工具 -- POC-T

免责声明 请勿利用文章内的相关技术从事非法测试&#xff0c;由于传播、利用此文所提供的信息而造成的任何直接或者间接的后果及损失&#xff0c;均由使用者本人负责&#xff0c;作者不为此承担任何责任。工具来自网络&#xff0c;安全性自测&#xff0c;如有侵权请联系删除。…

【java】27:java绘图

坐标体系 - 介绍&#xff1a; 下图说明了Java坐标系。坐标原点位于左上角&#xff0c;以像素为单位。在Java坐标系中&#xff0c;第一个是x坐标&#xff0c;表示当前位置为水平方向&#xff0c;距离坐标原点个像素&#xff1b;第二个是y坐标&#xff0c;表示当前位置为垂直方向…

视频不够清晰怎么办?教你几种有效方法

在我们日常生活中&#xff0c;有时候我们会遇到不清晰的视频&#xff0c;这给我们带来了很多不便。那么&#xff0c;怎么将不清晰的视频变清晰呢&#xff1f;本文将为您介绍一些常用的软件工具&#xff0c;帮助您提升视频的清晰度。 方法一&#xff1a;使用AI技术 AI技术可以通…

springboot-异步、定时、邮件任务

目录 一&#xff0c;前言 二&#xff0c;异步 2.1&#xff0c;案例&#xff1a; 1&#xff0c;首先创建一个service&#xff1a; 2&#xff0c;Controller: ① 想办法告诉spring我们的异步方法是异步的&#xff0c;所以要在方法上添加注解 Async ②去springboot主程序中开…

【Java--数据结构】模拟实现ArrayList

欢迎关注个人主页&#xff1a;逸狼 创造不易&#xff0c;可以点点赞吗~ 如有错误&#xff0c;欢迎指出~ 目录 LIst 顺序表ArrayList 顺序表优点 IList接口 ArrayList中定义要操作的数组 在MyArrayList中 重写接口方法 新增元素 在指定位置插入元素 pos不合法异常 判断和查找元素…

Bentley二次开发教程19-文件及模型管理-参照操作

参照操作 模型参照&#xff08;*.dgn&#xff09; 当我们需要与同专业&#xff0c;或者跨专业协同配合时&#xff0c;总是无可避免的需要参照他人的模型。若想通过编程的方式提前将参照模型与指定场景绑定起来&#xff0c;那么就需要掌握模型参照的方法。关于该方法大致的使用…

python创建线程和结束线程

&#x1f47d;发现宝藏 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。【点击进入巨牛的人工智能学习网站】。 python创建线程和结束线程 在 Python 中&#xff0c;线程是一种轻量级的执行单元&#xff…

C++-DAY1

思维导图 有以下定义&#xff0c;说明哪些量可以改变哪些不可以改变&#xff1f; const char *p; const (char *) p; char *const p; const char* const p; char const *p; (char *) const p; char const* const p; const char *p&#xff1a;指针 p 所指向的内容不可改…

【C++庖丁解牛】C++11---右值引用和移动语义

&#x1f341;你好&#xff0c;我是 RO-BERRY &#x1f4d7; 致力于C、C、数据结构、TCP/IP、数据库等等一系列知识 &#x1f384;感谢你的陪伴与支持 &#xff0c;故事既有了开头&#xff0c;就要画上一个完美的句号&#xff0c;让我们一起加油 目录 1 左值引用和右值引用2 左…

是德软件89600 RFID使用笔记

文章目录 1、进入RFID软件:2、RFID软件解调设置项3、如何查看一段指令数据本文是日常工作的笔记分享。 lauch VSA(矢量频谱分析)后会出现以下界面: 当然这是因为频谱仪的输入有信号才显示如下: 否则就显示频谱仪的噪底 这里的设置过程同一般的频谱仪,比如中心频率、span…

逆向修改app就可以游戏充值到账?

hello ,大家好, 现在市场仍然流行着非常多的传奇类游戏私服或者其他类型的游戏私服,随着私服越来越多(很多并不合法),越来越多的人加入了破解,逆向修改,或者代充的队伍并从中获利。这里我给大家分享一下这些做代充的常规的做法,以及大家作为游戏服务器如何避坑做强校验…

ApiHug 的初心-ApiHug101

视频 秒懂 ApiHug -019 HOPE &#x1f525; H.O.P.E.: Help other people excellent &#x1f49d; 是这个项目最初的初心 &#x1f917; ApiHug {Postman|Swagger|Api...} 快↑ 准√ 省↓ &#x1f3e0; gitee github search ApiHug ApiHug &#x1f917; ApiHug {Post…

数据结构(学习笔记)王道

一、绪论 1.1 数据结构的基本概念 数据&#xff1a;是信息的载体&#xff0c;是描述客观事物属性的数、字符以及所有输入到计算机中并被计算机程序识别和处理的符号的集合。&#xff08;计算机程序加工的原料&#xff09;数据元素&#xff1a;数据的基本单位&#xff0c;由若干…

相关电路整理(工程)相关FOC电路整理

1. 基于STM32G4的FOC电机驱动学习板 1.1 防反接电路 电源正确接入时 电流从 VIN 端流向负载&#xff0c;经由 Q3(NMOS) 通向地&#xff08;GND&#xff09;。在上电瞬间&#xff0c;由于 MOS 管的体二极管效应&#xff0c;地回路通过体二极管接通。接下来&#xff0c;由于 Vgs…

【sping】在logback-spring.xml 获取项目名称

在日志文件中我们想根据spring.application.name 创建出的文件夹。 也不想死在XML文件中。 application.yml spring:application:name: my-demo logback-spring.xml <springProperty name"application_name" scope"context" source"spring.app…

Unity类银河恶魔城学习记录13-4 p145 Save Skill Tree源代码

Alex教程每一P的教程原代码加上我自己的理解初步理解写的注释&#xff0c;可供学习Alex教程的人参考 此代码仅为较上一P有所改变的代码 【Unity教程】从0编程制作类银河恶魔城游戏_哔哩哔哩_bilibili GameData.cs using System.Collections; using System.Collections.Generic…

网络带宽相关

1.tcp重传率计算 watch -n 5 “cat /proc/net/snmp” 如下博客所讲 https://blog.csdn.net/michaelwoshi/article/details/121189743 2.iperf测试网络带宽 #客户端 #tcp iperf -c 服务端ip -P 4 -b 200M #udp iperf -c 服务端ip -u -P 4 -b 1000M -l 10K #服务端 iperf -s

云架构(五)BBF模式

BFF模式&#xff08;Backends for Frontends pattern&#xff09;- https://learn.microsoft.com/en-us/azure/architecture/patterns/backends-for-frontends。 创建单独的后台服务用以提供给特定的前端或者接口。当你希望避免为多个接口定制单独的后台时&#xff0c;此模…

隋总分享:Temu选品师算不算是蓝海项目?

在当今日新月异的互联网经济浪潮中&#xff0c;跨境电商正成为一股不可忽视的力量。最近&#xff0c;网红隋总对Temu选品师这一职业进行了深入介绍&#xff0c;引发了广泛关注。那么&#xff0c;Temu选品师是否真的可以视为一个蓝海项目呢?本文将对此进行一番细致的探讨。 首先…
最新文章