ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [GluonTS 3탄] GluonTS로 시계열 데이터 예측해보기 - 심화편
    PROGRAMMING/python 2024. 9. 20. 17:31

    ※ 모든 내용은 GluonTS 홈페이지에서 번역하여 가져온 내용입니다. 출처는 GluonTS 홈페이지에 있음을 알려드립니다. 번역은 gpt가 했고, 추가로 필요한 부분은 공부해서 채워넣었습니다:)

     

    https://ts.gluon.ai/stable/index.html

     

    GluonTS documentation

    Next Installation

    ts.gluon.ai

    %matplotlib inline
    import mxnet as mx
    from mxnet import gluon
    import numpy as np
    import pandas as pd
    import matplotlib.pyplot as plt
    import json
    import os
    from itertools import islice
    from pathlib import Path
    mx.random.seed(0)
    np.random.seed(0)

     

    Datasets

    GluonTS는 사용자에게 다음 세 가지 옵션의 데이터셋을 제공한다.

    • GluonTS에서 제공하는 데이터셋 사용
    • GluonTS를 사용해 인공 데이터셋 생성
    • 사용자의 데이터셋을 GluonTS 형식으로 변환

    여기서 데이터셋반복 가능한 시간 시리즈 항목들의 모음으로 target 필드start 필드를 반드시 가지고 있어야 한다.

    • 반복 가능(iterable) : 반복적으로 데이터 추출이 가능해야 함
    • 각 항목에 target start field가 있어야 함 : target 예측하려는 시간 시리즈의 값이고, start 그 시간 시리즈가 시작하는 시점을 의미한다. 
    from gluonts.dataset.repository import get_dataset, dataset_names
    from gluonts.dataset.util import to_pandas
    print(f"Available datasets: {dataset_names}")

    기존에 존재하는 데이터셋을 다운받기 위해서는 get_dataset을 호출해 데이터셋을 다운 받으면 된다.

    dataset = get_dataset("m4_hourly")

     

    What is in a dataset?

    GluonTS의 데이터셋 세 가지 주요 구성 요소로 이루어져 있다.

    1. train : 모델 학습에 사용되는 데이터셋으로 각 항목은 하나의 시간 시리즈를 나타낸다.
    2. test : 추론에 사용되는 데이터셋으로 학습 데이터셋의 끝 부분에 훈련 중에 보지 못한 예측창(window) 포함되어 있다.
    3. metadata : 데이터셋의 주파수(시간간격), 예측에 권장되는 창의 길이 등 메타데이터를 포함한다. 

    우선 훈련 데이터 셋의 첫번째 항목을 살펴보자. 각 항목에는 최소한 target 필드와 start 필드가 포함되어야 하며, 테스트 항목의 target에는 예측 창 길이동일한 추가 구간이 포함되어 있어야 한다.

    # 훈련 데이터셋에서 첫 번째 시간 시리즈 가져오기
    train_entry = next(iter(dataset.train))
    train_entry.keys()
    >>
    dict_keys(['target', 'start', 'feat_static_cat', 'item_id'])

     

    테스트 데이터 셋의 첫번째 항목도 훈련 데이터 셋의 첫번째 항목과 비슷한 방식임을 확인할 수 있다.

    # 테스트 데이터셋에서 첫 번째 시간 시리즈 가져오기
    test_entry = next(iter(dataset.test))
    test_entry.keys()
    >>
    dict_keys(['target', 'start', 'feat_static_cat', 'item_id'])

    또한 target 필드에는 예측 창(prediction_length)동일한 길이의 추가 구간마지막에 포함되어 있을 것으로 예상된다. 이를 아래와 같이 시각화할 수 있다. 

    test_series = to_pandas(test_entry)
    train_series = to_pandas(train_entry)
    
    fig, ax = plt.subplots(2, 1, sharex=True, sharey=True, figsize=(10, 7))
    
    train_series.plot(ax=ax[0])
    ax[0].grid(which="both")
    ax[0].legend(["train series"], loc="upper left")
    
    test_series.plot(ax=ax[1])
    ax[1].axvline(train_series.index[-1], color="r")  # end of train dataset
    ax[1].grid(which="both")
    ax[1].legend(["test series", "end of train series"], loc="upper left")
    
    plt.show()

    print(f"Length of forecasting window in test dataset: {len(test_series) - len(train_series)}")
    print(f"Recommended prediction horizon: {dataset.metadata.prediction_length}")
    print(f"Frequency of the time series: {dataset.metadata.freq}")

     

    Create artificial datasets

    복잡한 임의의 시계열 데이터셋은 ComplexSeasonalTimeSeries 모듈을 통해 만들 수 있다. 

    from gluonts.dataset.artificial import ComplexSeasonalTimeSeries
    from gluonts.dataset.common import ListDataset
    artificial_dataset = ComplexSeasonalTimeSeries(
        num_series=10,
        prediction_length=21,
        freq_str="H",
        length_low=30,
        length_high=200,
        min_val=-10000,
        max_val=10000,
        is_integer=False,
        proportion_missing_values=0,
        is_noise=True,
        is_scale=True,
        percentage_unique_timestamps=1,
        is_out_of_bounds_date=True,
    )
    print(f"prediction length: {artificial_dataset.metadata.prediction_length}")
    print(f"frequency: {artificial_dataset.metadata.freq}")
    >>
    prediction length: 21
    frequency: H

    여기서 생성된 인공 데이터셋딕셔너리의 리스트로 구성되어 있다. 각 딕셔너리하나의 시간 시리즈에 해당하며, 필수 필드를 포함해야 한다. 

    print(f"type of train dataset: {type(artificial_dataset.train)}")
    print(f"train dataset fields: {artificial_dataset.train[0].keys()}")
    print(f"type of test dataset: {type(artificial_dataset.test)}")
    print(f"test dataset fields: {artificial_dataset.test[0].keys()}")
    >>
    type of train dataset: <class 'list'>
    train dataset fields: dict_keys(['start', 'target', 'item_id'])
    type of test dataset: <class 'list'>
    test dataset fields: dict_keys(['start', 'target', 'item_id'])

    인공적으로 생성된 데이터 셋(list of dictonaries)를 사용하려면, 이를 ListDataset 객체로 변환해야 한다. ListDataset은 GluonTS에서 사용되는 형식으로 시간 시리즈 데이터를 다루는 데 적합한 데이터 구조다.

    train_ds = ListDataset(artificial_dataset.train, freq=artificial_dataset.metadata.freq)
    test_ds = ListDataset(artificial_dataset.test, freq=artificial_dataset.metadata.freq)
    train_entry = next(iter(train_ds))
    train_entry.keys()
    >> 
    dict_keys(['start', 'target', 'item_id'])
    test_entry = next(iter(test_ds))
    test_entry.keys()
    >> 
    dict_keys(['start', 'target', 'item_id'])

     

    test_series = to_pandas(test_entry)
    train_series = to_pandas(train_entry)
    
    fig, ax = plt.subplots(2, 1, sharex=True, sharey=True, figsize=(10, 7))
    
    train_series.plot(ax=ax[0])
    ax[0].grid(which="both")
    ax[0].legend(["train series"], loc="upper left")
    
    test_series.plot(ax=ax[1])
    ax[1].axvline(train_series.index[-1], color="r")  # end of train dataset
    ax[1].grid(which="both")
    ax[1].legend(["test series", "end of train series"], loc="upper left")
    
    plt.show()

     

    Use your time series and features

    이제 사용자 정의 데이터셋을 GluonTS에 적합한 형식으로 변환하는 방법을 살펴보자. 앞서 언급한 것처럼, 데이터셋에는 최소한 target 필드(예측할 값) start 필드(시작 날짜)가 있어야 한다. 하지만, 그 외에도 다양한 필드를 추가할 수 있다.

     

    from gluonts.dataset.field_names import FieldName
    [
        f"FieldName.{k} = '{v}'"
        for k, v in FieldName.__dict__.items()
        if not k.startswith("_")
    ]
    >>
    ["FieldName.ITEM_ID = 'item_id'",
     "FieldName.INFO = 'info'",
     "FieldName.START = 'start'",
     "FieldName.TARGET = 'target'",
     "FieldName.FEAT_STATIC_CAT = 'feat_static_cat'",
     "FieldName.FEAT_STATIC_REAL = 'feat_static_real'",
     "FieldName.FEAT_DYNAMIC_CAT = 'feat_dynamic_cat'",
     "FieldName.FEAT_DYNAMIC_REAL = 'feat_dynamic_real'",
     "FieldName.PAST_FEAT_DYNAMIC_CAT = 'past_feat_dynamic_cat'",
     "FieldName.PAST_FEAT_DYNAMIC_REAL = 'past_feat_dynamic_real'",
     "FieldName.FEAT_DYNAMIC_REAL_LEGACY = 'dynamic_feat'",
     "FieldName.FEAT_DYNAMIC = 'feat_dynamic'",
     "FieldName.PAST_FEAT_DYNAMIC = 'past_feat_dynamic'",
     "FieldName.FEAT_TIME = 'time_feat'",
     "FieldName.FEAT_CONST = 'feat_dynamic_const'",
     "FieldName.FEAT_AGE = 'feat_dynamic_age'",
     "FieldName.OBSERVED_VALUES = 'observed_values'",
     "FieldName.IS_PAD = 'is_pad'",
     "FieldName.FORECAST_START = 'forecast_start'",
     "FieldName.TARGET_DIM_INDICATOR = 'target_dimension_indicator'"]

     

    필드는 필수 필드, 선택적 필드, Transformtaion에 의해 추가될 수 있는 필드 이렇게 총 3가지로 나뉜다. 

     

    필수(Required) 필드

    • start : 시간 시리즈의 시작 날짜
    • target : 시간 시리즈의 실제 값들

    선택적(Optional) 필드

    • feat_static_cat : 시간에 따라 변하지 않는 정적 범주형 특징. 피처의 개수와 동일한 차원의 리스트로 제공
    • feat_static_real : 시간에 따라 변하지 않는 정적 실수형 특징. 피처의 개수와 동일한 차원의 리스트로 제공
    • feat_dynamic_cat : 시간에 따라 변하는 동적 범주형 특징. (피처 개수, target 길이)와 같은 형태의 배열로 제공
    • feat_dynamic_real: 시간에 따라 변하는 동적 실수형 특징. (피처 개수, target 길이)와 같은 형태의 배열로 제공

    Transformation에 의해 추가되는 필드(Added by Transformation)

    • time_feat : 월이나 일과 같은 시간 관련 특징
    • feat_dynamic_const : 시간 축을 따라 일정한 값을 가지는 특징을 확장
    • feat_dynamic_age : 과거의 타임스탬프에서 현재 타임스탬프로 다가올수록 값이 커지는 연령(feature) 특징
    • observed_values : 관측된 값에 대한 indicator. 값이 관측된 경우 1, 누락된 경우 0
    • is_pad : 시계열 길이가 충분하지 않으면 패딩된 타임스텝을 표시
    • forecast_start : 예측이 시작된 날짜

    * is_pad 추가 설명 : 시간 시리즈의 길이가 모델이 필요로 하는 최소 길이에 미치지 못할 때, 모델에서 패딩추가하는데, 이 때 패딩된 부분을 is_pad 필드로 구분한다. 

     

    (예시) 

    target, feat_dynamic_real(the target value one period earlier),feat_static_cat(sinusoid type)을 사용하여 사용자 정의 데이터 셋을 만들 수 있다. 

    def create_dataset(num_series, num_steps, period=24, mu=1, sigma=0.3):
        # create target: noise + pattern
        # noise
        noise = np.random.normal(mu, sigma, size=(num_series, num_steps))
    
        # pattern - sinusoid with different phase
        sin_minusPi_Pi = np.sin(
            np.tile(np.linspace(-np.pi, np.pi, period), int(num_steps / period))
        )
        sin_Zero_2Pi = np.sin(
            np.tile(np.linspace(0, 2 * np.pi, 24), int(num_steps / period))
        )
    
        pattern = np.concatenate(
            (
                np.tile(sin_minusPi_Pi.reshape(1, -1), (int(np.ceil(num_series / 2)), 1)),
                np.tile(sin_Zero_2Pi.reshape(1, -1), (int(np.floor(num_series / 2)), 1)),
            ),
            axis=0,
        )
    
        target = noise + pattern
    
        # create time features: use target one period earlier, append with zeros
        feat_dynamic_real = np.concatenate(
            (np.zeros((num_series, period)), target[:, :-period]), axis=1
        )
    
        # create categorical static feats: use the sinusoid type as a categorical feature
        feat_static_cat = np.concatenate(
            (
                np.zeros(int(np.ceil(num_series / 2))),
                np.ones(int(np.floor(num_series / 2))),
            ),
            axis=0,
        )
    
        return target, feat_dynamic_real, feat_static_cat

    np.tile에 대한 설명⬇️

    더보기

    np.tile : 주어진 배열을 여러번 복사하여 타일처럼 쌓는 역할

    np.tile(A, reps)

     

    • A: 반복할 배열.
    • reps: 각 축(axis)마다 배열을 반복할 횟수를 지정하는 정수 또는 정수 배열.
    a = np.array([[1, 2], [3, 4]])
    tiled_array = np.tile(a, (2, 3))  # 첫 번째 축은 2번, 두 번째 축은 3번 반복
    print(tiled_array)
    >>
    [[1 2 1 2 1 2]
     [3 4 3 4 3 4]
     [1 2 1 2 1 2]
     [3 4 3 4 3 4]]

     

    # define the parameters of the dataset
    custom_ds_metadata = {
        "num_series": 100,
        "num_steps": 24 * 7,
        "prediction_length": 24,
        "freq": "1H",
        "start": [pd.Period("01-01-2019", freq="1H") for _ in range(100)], 
        # 시계열 데이터의 시작 시간을 정의하는 코드
        # 이 부분은 100개의 시계열 데이터 각각에 대해 동일한 시작 시간을 할당
    }
    data_out = create_dataset(
        custom_ds_metadata["num_series"],
        custom_ds_metadata["num_steps"],
        custom_ds_metadata["prediction_length"],
    )
    
    target, feat_dynamic_real, feat_static_cat = data_out

    이제 필수 필드를 채워 훈련 데이터셋테스트 데이터셋을 쉽게 생성할 수 있다.

     

    중요한 점은 훈련 데이터셋을 만들 때는 마지막 예측 창(window)을 제외해야 한다는 점이다. 이는 모델이 학습하는 동안 미래 데이터를 보지 않도록 하기 위함이다.

    train_ds = ListDataset(
        [
            {	# 딕셔너리 생성
                FieldName.TARGET: target,
                FieldName.START: start,
                FieldName.FEAT_DYNAMIC_REAL: [fdr],
                FieldName.FEAT_STATIC_CAT: [fsc],
            }
            for (target, start, fdr, fsc) in zip( # 반복문에서 값을 하나씩 가져옴
                target[:, : -custom_ds_metadata["prediction_length"]],
                custom_ds_metadata["start"],
                feat_dynamic_real[:, : -custom_ds_metadata["prediction_length"]],
                feat_static_cat,
            )
        ],
        freq=custom_ds_metadata["freq"],
    )

    * FiledName : GluonTS에서 필드 이름을 정의하는데 사용하는 표준적인 방식으로 FieldName 객체시간 시리즈 데이터에서 자주 사용되는 필드의 이름을 미리 정의해주고, 이 필드 이름을 사용하여 코드의 가독성과 일관성을 높인다. 

    더보기

    ☑️ FEAT_DYNAMIC_REAL : [fdr]로 []안에 표현하는 이유

    FieldName.TARGET : target - 단일 배열로 나타나는 타임 시리즈 데이터

    FiledName.START : start - 2차원 배열이기 때문에 list로 감싸서 표현 (여러 개의 피처가 있을 수 있기 때문)

     

    ☑️  {} for (target, start, fdr, fsc) in zip(~) 

    : 리스트 컴프리헨션을 사용하여 파이썬 딕셔너리를 반복적으로 생성하는 방식. 여러 개의 딕셔너리를 만들어 하나의 리스트로 반환하는 작업을 수행하며, 각 딕셔너리는 target, start, feat_dynamic_real, feat_static_cat 필드를 가지고 있다. 

     

    ☑️ zip() 함수 : 여러 개의 반복 가능한 객체들을 동시에 묶어서 하나의 튜플 형태로 반환하는 함수

    예시) 
    target = np.array([[1, 2, 3], [4, 5, 6]])
    start = ["2020-01-01", "2020-01-02"]
    feat_dynamic_real = np.array([[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]])
    feat_static_cat = [0, 1]
    custom_ds_metadata = {"prediction_length": 1, "freq": "D"}
    
    코드 실행 시 아래와 같은 리스트 생성
    [
        {
            FieldName.TARGET: [1, 2],  # 마지막 값을 제외한 target
            FieldName.START: "2020-01-01",  # 시작 날짜
            FieldName.FEAT_DYNAMIC_REAL: [[0.1, 0.2]],  # 마지막 값을 제외한 동적 피처
            FieldName.FEAT_STATIC_CAT: [0],  # 정적 범주형 피처
        },
        {
            FieldName.TARGET: [4, 5],
            FieldName.START: "2020-01-02",
            FieldName.FEAT_DYNAMIC_REAL: [[0.4, 0.5]],
            FieldName.FEAT_STATIC_CAT: [1],
        }
    ]
    test_ds = ListDataset(
        [
            {
                FieldName.TARGET: target,
                FieldName.START: start,
                FieldName.FEAT_DYNAMIC_REAL: [fdr],
                FieldName.FEAT_STATIC_CAT: [fsc],
            }
            for (target, start, fdr, fsc) in zip(
                target, custom_ds_metadata["start"], feat_dynamic_real, feat_static_cat
            )
        ],
        freq=custom_ds_metadata["freq"],
    )

    이제 훈련 데이터셋테스트 데이터 셋에서 target, start, feat_dynamic_real, feat_static_cat의 필드들이 포함되어 있는 것을 확인할 수 있다.

    train_entry = next(iter(train_ds))
    train_entry.keys()
    >>dict_keys(['target', 'start', 'feat_dynamic_real', 'feat_static_cat'])
    test_entry = next(iter(test_ds))
    test_entry.keys()
    >>dict_keys(['target', 'start', 'feat_dynamic_real', 'feat_static_cat'])
    test_series = to_pandas(test_entry)
    train_series = to_pandas(train_entry)
    
    fig, ax = plt.subplots(2, 1, sharex=True, sharey=True, figsize=(10, 7))
    
    train_series.plot(ax=ax[0])
    ax[0].grid(which="both")
    ax[0].legend(["train series"], loc="upper left")
    
    test_series.plot(ax=ax[1])
    ax[1].axvline(train_series.index[-1], color="r")  # end of train dataset
    ax[1].grid(which="both")
    ax[1].legend(["test series", "end of train series"], loc="upper left")
    
    plt.show()

     

     

    아래 튜토리얼에서는 custom dataset을 이용해 실습할 예정이다.

    Transformations

    Define a transformation

    Transformation(변환)의 주요 사용 사례피처 처리다. 예를 들어, 휴일 피처를 추가하거나, 훈련 및 추론 중에 데이터셋을 적절한 윈도우로 나누는 방식 등을 정의하는데 사용된다.

     

    일반적으로, 변환 은 데이터셋의 항목들을 반복 가능한 형태로 받아서 또 다른 반복 가능한 컬렉션으로 변환한다. 이 과정에서 더 많은 필드를 추가할 수 있다. 변환은 데이터셋에 대해 모델에 유용한 여러 "action(액션)"을 정의하는 방식으로 이루어진다. 이러한 액션들은 주로 추가적인 피처생성하거나 기존 피처변환한다. 다음은 변환의 예시다.

    •  AddobservedValuesIndicator : 데이터셋에 observed_values 필드를 생성한다. 즉, 관측된 값은 1, 누락된 값은 0으로 표시하는 피처를 추가한다. 
    • AddAgeFeature : 데이터셋에 feat_dynamic_age 필드를 생성한다. 이 피처는 과거로 갈수록 작은 값을 가지며, 현재 시점에 다가올수록 값이 점차 증가하는 특성을 가진다.
    • InstanceSplitter : 데이터셋을 훈련, 검증, 예측 시 사용되는 윈도우로 나누는 방법 정의한다(is_pad_field, train_sampler, time_series_fields로 구성)

    * is_pad_field : 시계열이 패딩되었는지를 나타내는 표시자. 길이가 충분하지 않으면 패딩이 추가되었는지 여부를 나타냄

    * train_sampler : 훈련 중 윈도우가 어떻게 잘리고 샘플링되는지 정의

    * time_series_fields : 타겟과 동일한 방식으로 나눠야 하는 시간 의존 피처들 포함

    from gluonts.transform import (
        AddAgeFeature,
        AddObservedValuesIndicator,
        Chain,
        ExpectedNumInstanceSampler,
        InstanceSplitter,
        SetFieldIfNotPresent,
    )

    ExpectedNumInstanceSampler, SetFieldIfNotPresent에 대한 설명 ⬇️

    더보기

    * ExpectedNumInstanceSampler : 샘플링을 처리하는 클래스로 시계열 데이터를 모델에 훈련시키기 전에, 데이터를 적절한 크기의 윈도우로 나누어야 할 때 샘플을 어떻게 추출할지를 결정한다. 이 클래스는 각 데이터 포인트에서 기대되는 샘플링 횟수를 기반으로 샘플을 추출한다. 주로 InstanceSplitter와 함께 사용된다.

     

    역할 : 주어진 시계열에서 훈련 윈도우(training window)를 샘플링하여, 네트워크에 입력될 데이터 인스턴스를 생성한다. 이 샘플러는 샘플링의 기대 횟수를 정의하며, 샘플링할 인스턴스의 수를 조절한다.

    * num_instances : 샘플링할 인스턴스의 기대 수

    * min_feature : 샘플링할 때 필요한 최소한의 미래 값

    sampler = ExpectedNumInstanceSampler(num_instances=1, min_future=24)

    >> 한 번의 샘플을 추출하고, 미래에 필요한 데이터가 최소 24개 있어야 한다는 제약을 준다. 

     

    * SetFieldIfNotPresent : 시계열 데이터에 필드가 없을 경우, 기본값을 설정해주는 변환 클래스다. 데이터셋이 충분히 준비되지 않을 경우 필드가 누락될 수 있는데, 이 클래스는 해당 필드가 없을 경우 기본값을 설정해준다.

     

    역할 : 필드가 존재하지 않을 때 해당 필드를 추가하고 기본 값을 설정해준다. 이를 통해 모델이 데이터셋에서 필드가 없더라도 동작할 수 있도록 한다.

    * field_name : 추가하거나 확인할 필드의 이름

    * value : 필드가 없을 경우 설정할 기본값

    transform = SetFieldIfNotPresent(field_name="feat_dynamic_real", value=[0.0])

    >> feat_dynamic_real 필드가 없을 경우, 해당 필드를 0.0으로 채워넣는 기본값을 설정한다.

    def create_transformation(freq, context_length, prediction_length):
        return Chain(
            [
            	# 관측된 값 표시 - observed_values 필드 추가
                AddObservedValuesIndicator(
                    target_field=FieldName.TARGET,
                    output_field=FieldName.OBSERVED_VALUES,
                ),
                # 나이 피처 추가 
                AddAgeFeature(
                    target_field=FieldName.TARGET,
                    output_field=FieldName.FEAT_AGE,
                    pred_length=prediction_length,
                    log_scale=True,
                ),
                # 데이터 분할 - 타임 시리즈 데이터를 훈련 및 예측에 맞게
                # 과거 구간(past_length)와 미래 구간(future_length)로 분할
                # 예측할 타임 스텝(prediction_length)이 포함되도록 샘플링 진행
                # 패딩 여부와 동적 피처들도 함게 분할
                InstanceSplitter(
                    target_field=FieldName.TARGET,
                    is_pad_field=FieldName.IS_PAD,
                    start_field=FieldName.START,
                    forecast_start_field=FieldName.FORECAST_START,
                    instance_sampler=ExpectedNumInstanceSampler(
                        num_instances=1,
                        min_future=prediction_length,
                    ),
                    past_length=context_length,
                    future_length=prediction_length,
                    time_series_fields=[
                        FieldName.FEAT_AGE,
                        FieldName.FEAT_DYNAMIC_REAL,
                        FieldName.OBSERVED_VALUES,
                    ],
                ),
            ]
        )
    더보기

    creat_transformation : 데이터셋을 변환하는 파이프라인을 구성하는 함수

    GluonTS에서는 데이터를 전처리하거나 훈련 및 추론을 위한 데이터 윈도우를 생성할 때 이러한 변환을 사용

     

    * Chain : 여러 개의 변환 작업을 연결(chain)하여 순차적으로 실행하는 구조. 리스트 안에 정의된 변환들이 차례대로 적용된다.

     

    * AddobservedValuesIndicator : 목표 필드(target_field)가 관측되었는지 여부를 나타내는 observed_values라는 새로운 필드를 생성한다. target_field 값이 관측되었으면 1, 누락되었으면 0으로 설정된다. 

    - FieldName.TARGET이 시간 시리즈의 실제 목표값(target 필드)을 나타내고, 그에 따라 FieldName.OBSERVED_VALUES 필드를 새로 추가하게 된다. 

     

    * AddAgeFeature : 시간시리즈에 age feature를 추가한다. 이 피처는 시간이 지날수록 값이 증가하는 특징을 가지며, 모델이 시계열의 과거와 현재 상태를 구별할 수 있도록 한다.

    - FieldName.TARGET을 기준으로 계산되며, 새로운 필드로 FieldName.FEAT_AGE가 추가된다.

    - prediction_length예측할 구간의 길이피처 생성 시 미래구간의 값을 고려하는데 사용된다.

    - log_scale=True 피처 값을 로그 스케일로 변환한다는 의미다. 

    ✔️ Age feature를 추가하는 이유는 (1) 각 데이터 포인트가 현재 시간으로부터 얼마나 떨어져 있는지를 알기 위해 (2) 데이터의 최신성을 반영하기 위해 (3) 변화 패턴을 감지하기 위해 (4) 미래 예측을 위한 시간적 종속성을 제공하기 위함이다.

     

    * InstanceSplitter : 데이터를 윈도우로 분할하여 훈련, 검증 또는 예측 시 모델에 입력될 수 있도록 준비한다.

    - target_field = FieldName.TARGET은 예측하려는 목표 시간 시리즈이다.

    - is_pad_field=FieldName.IS_PAD는 데이터가 패딩된 경우를 나타내는 필드다.

    - start_field=FieldName.START는 시계열이 시작되는 날짜를, forecast_start_field=FieldName.FORECAST_START는 예측이 시작되는 시점을 지정하는 필드다.

    - instance_sampler는 샘플링 방식으로 ExpectedNumInstanceSampler를 사용하여 각 시계열에서 최소 하나의 샘플을 보장하며, 미래 값이 최소 prediction_length만큼 포함되도록 샘플링한다.

    - past_length=context_length는 모델이 입력으로 사용할 과거 데이터의 길이다.

    - future_length=prediction_length는 모델이 예측할 미래 데이터의 길이다.

    - time_series_fields는 목표 시간 시리즈와 함께 동일하게 분할되어야 하는 시간 의존 피처들이다.

    - FieldName.FEAT_AGE, FieldName.FEAT_DYNAMIC_REAL, FieldName.OBSERVED_VAULES 필드를 동일한 방식으로 분할한다. 

    Transform a dataset

    위에서 정의한 creat_transformation을 사용해 custom dataset변환해보자.

    transformation = create_transformation(
        custom_ds_metadata["freq"],
        2 * custom_ds_metadata["prediction_length"],  # can be any appropriate value
        custom_ds_metadata["prediction_length"],
    )

    이제 transformation에는 Observed Values, Age Feature, Instance Splitter이 포함된다. 

    train_tf = transformation(iter(train_ds), is_train=True)

    train_tf훈련 데이터셋 train_dstransformation을 적용한 결과다. is_train=True는 훈련 모드에서는 시계열 데이터의 일부를 과거와 미래로 나누어 윈도우를 생성한다.

    type(train_tf)
    >> generator

    train_tf 역시 반복 가능한 generator 객체다. InstanceSplitter는 변환된 데이터셋을 반복하며, 시계열 데이터 중 하나와 그 시계열의 시작 지점을 무작위로(random) 선택하여 윈도우를 잘아낸다. 이 randomness는 instance_sampler에 의해 정의된다. 

    train_tf_entry = next(iter(train_tf))
    [k for k in train_tf_entry.keys()]
    >>
    ['start',
     'feat_static_cat',
     'past_feat_dynamic_age',
     'future_feat_dynamic_age',
     'past_feat_dynamic_real',
     'future_feat_dynamic_real',
     'past_observed_values',
     'future_observed_values',
     'past_target',
     'future_target',
     'past_is_pad',
     'forecast_start']

    변환에 의해 새로운 필드들(observed_values, feat_dynamic_age, past_is_pad, forecast_start)이 추가되었으며, past_와 future_로 시작하는 필드는 과거와 미래 데이터를 구분하며, 이를 통해 과거 데이터를 학습에 사용하고, 미래 데이터를 예측에 사용한다.

    * train_tf와 train_tf_entry의 차이 : 반복 가능한 객체와 그 객체에서 추출된 단일 항목

     

    또한 윈도우를 과거와 미래로 분할하고, 시간 의존 필드에 해당하는 모든 필드에 적절한 접두사를 추가했다. 예를 들면 past_target 필드를 입력으로 사용하고, future_target 필드를 이용해 예측의 오차를 계산할 수 있다. 물론 과거 데이터의 길이는 context_length와 같고, 미래 데이터의 길이는 prediction_length와 동일하다. 

    print(f"past target shape: {train_tf_entry['past_target'].shape}")
    print(f"future target shape: {train_tf_entry['future_target'].shape}")
    print(f"past observed values shape: {train_tf_entry['past_observed_values'].shape}")
    print(f"future observed values shape: {train_tf_entry['future_observed_values'].shape}")
    print(f"past age feature shape: {train_tf_entry['past_feat_dynamic_age'].shape}")
    print(f"future age feature shape: {train_tf_entry['future_feat_dynamic_age'].shape}")
    print(train_tf_entry["feat_static_cat"])
    >>
    past target shape: (48,)
    future target shape: (24,)
    past observed values shape: (48,)
    future observed values shape: (24,)
    past age feature shape: (48, 1)
    future age feature shape: (24, 1)
    [0]

    참고로 transformation 이전의 original datasetfield는 다음과 같다.

    [k for k in next(iter(train_ds)).keys()]
    >>
    ['target', 'start', 'feat_dynamic_real', 'feat_static_cat']

     

    이번에는 테스트 데이터셋이 어떻게 분할되는지 살펴보자. 앞서 보았듯이 변환 과정에서는 윈도우를 과거와 미래로 분할한다. 하지만 추론시에는 is_train=False이므로 splitter는 항상 데이터셋의 마지막 윈도우(길이 context_length)만큼을 데이터셋에서 잘라내어, 그 윈도우를 사용해 이후의 알려지지 않은 값(길이 prediction_length)을 예측할 수 있게 한다. 

     

    그렇다면 미래 타겟 값을 알 수 없는 상태에서 테스트 데이터셋이 어떻게 과거와 미래로 분할될 수 있을까? 그리고 시간 의존 피처들은 어떻게 처리될까?

    test_tf = transformation(iter(test_ds), is_train=False)
    test_tf_entry = next(iter(test_tf))
    [k for k in test_tf_entry.keys()]
    >>
    ['start',
     'feat_static_cat',
     'past_feat_dynamic_age',
     'future_feat_dynamic_age',
     'past_feat_dynamic_real',
     'future_feat_dynamic_real',
     'past_observed_values',
     'future_observed_values',
     'past_target',
     'future_target',
     'past_is_pad',
     'forecast_start']
    print(f"past target shape: {test_tf_entry['past_target'].shape}")
    print(f"future target shape: {test_tf_entry['future_target'].shape}")
    print(f"past observed values shape: {test_tf_entry['past_observed_values'].shape}")
    print(f"future observed values shape: {test_tf_entry['future_observed_values'].shape}")
    print(f"past age feature shape: {test_tf_entry['past_feat_dynamic_age'].shape}")
    print(f"future age feature shape: {test_tf_entry['future_feat_dynamic_age'].shape}")
    print(test_tf_entry["feat_static_cat"])
    >>
    past target shape: (48,)
    future target shape: (24,)
    past observed values shape: (48,)
    future observed values shape: (24,)
    past age feature shape: (48, 1)
    future age feature shape: (24, 1)
    [0]

    미래 타겟 값은 비어있지만, 피처는 비어 있지 않다. 우리는 항상 미래 피처를 알고 있다고 가정한다. 여기서 수동으로 수행한 모든 작업은 DataLoader라는 내부 블록에 의해 처리된다. 이 블록은 적절한 형식의 원시 데이터셋과 변환 객체를 입력으로 받아서, 변환된 반복 가능한 데이터셋을 배치(batch) 단위로 출력한다. 

     

    결국 우리가 신경써야 할 유일한 것은 변환 필드를 올바르게 설정하는 것 뿐이다!

    Training an existing model

    Configuring an estimator

    GluonTS는 여러 개의 사전 구축된 모델을 제공하므로 사용자는 몇 가지 하이퍼파라미터만 구성하면 된다. 이 예시에서는 Simple Feed Forward Estimator모델을 사용해보자. 

     

    GluonTS의 Simple Feed Forward Estimator은 context_length 길이의 입력 창(window)를 받아 이후 prediction_length 값의 분포를 예측한다. GluonTS에서는 Feed forward network model이 Estimator의 한 예시로, Estimator 객체는 예측 모델을 나타내며, 그 안에 모델의 계수, 가중치 등 세부 정보가 포함된다. 

     

    Estimator 여러 하이퍼파라미터로 구성되며, 모든 Estimator에 공통적으로 사용할 수 있는 하이퍼파라미터도 있지만(예 : prediction_length), 특정 Estimator에서만 사용하는 파라미터 일 수도 있다. 

     

     Estimator Trainer로 구성되며, 이는 모델이 에포크 수, 학습률 등 모델이 어떻게 학습될지를 정의한다. 

    from gluonts.mx import SimpleFeedForwardEstimator, Trainer
    estimator = SimpleFeedForwardEstimator(
        num_hidden_dimensions=[10],
        prediction_length=custom_ds_metadata["prediction_length"],
        context_length=2 * custom_ds_metadata["prediction_length"],
        trainer=Trainer(
            ctx="cpu",
            epochs=5,
            learning_rate=1e-3,
            hybridize=False,
            num_batches_per_epoch=100,
        ),
    )

    Getting a predictor

    필요한 하이퍼파라미터를 모두 지정한 후, Estimatortrain 매서드를 사용하여 훈련 데이터셋으로 모델을 학습시킬 수 있다. 학습 알고리즘은 훈련된 모델(Predictor)을 반환하여 이를 사용해 예측을 생성할 수 있다.

     

    여기서 중요한 점은 위에서 정의한 단일 모델 훈련 데이터셋(train_ds)에 포함된 모든 시간 시리즈에 대해 학습된다는 것이다. 이로 인해 전역 모델이 생성되며, 이 모델은 train_ds의 모든 시간 시리즈에 대한 예측에 적합할 뿐만 아니라, 다른 관련된 새로운 시간 시리즈에 대해서도 예측할 수 있다. 

    predictor = estimator.train(train_ds)

     

    Saving/Loading an exisiting model

    훈련된 모델인 Predictor쉽게 저장하고 불러올 수 있다.

    # save the trained model in tmp/
    from pathlib import Path
    
    predictor.serialize(Path("/tmp/"))
    # loads it back
    from gluonts.model.predictor import Predictor
    
    predictor_deserialized = Predictor.deserialize(Path("/tmp/"))

     

    Evaluation 

    Getting the forecasts

    predictor가 준비되면, 이제 dataset.test의 마지막 윈도우를 예측하고, 모델의 성능을 평가할 수 있다. GluonTS는 예측 및 모델 평가 과정을 자동화하는 make_evaluation_predictions 함수를 제공한다. 이 함수는 아래와 같이 작동한다. 

    1. 예측하려는 구간의 길이만큼 dataset.test의 마지막 부분을 제거한다.
    2. Estimator는 남은 데이터를 사용해, 방금 제거된 "미래" 구간을 샘플 경로 형태로 예측한다.
    3. 이 모듈은 예측 샘플 경로와 dataset.test를 Python generator객체로 출력한다.
    from gluonts.evaluation import make_evaluation_predictions
    forecast_it, ts_it = make_evaluation_predictions(
        dataset=test_ds,  # test dataset
        predictor=predictor,  # predictor
        num_samples=100,  # number of sample paths we want for evaluation
    )

    이후의 계산을 쉽게 하기 위해 제너레이터를 리스트로 반환한다.

    forecasts = list(forecast_it)
    tss = list(ts_it)

    우선 tss의 요소를 확인해보자. tss의 첫번째 항목은 dataset.test의 첫 번째 시간 시리즈의 목표값(target)을 포함할 것이다.

    # first entry of the time series list
    ts_entry = tss[0]
    # first 5 values of the time series (convert from pandas to numpy)
    np.array(ts_entry[:5]).reshape(
        -1,
    )
    >>
    array([1.5292157 , 0.85025036, 0.7740374 , 0.941432  , 0.6723822 ],
          dtype=float32)
    # first entry of test_ds
    test_ds_entry = next(iter(test_ds))
    # first 5 values
    test_ds_entry["target"][:5]
    >>
    array([1.5292157 , 0.85025036, 0.7740374 , 0.941432  , 0.6723822 ],
          dtype=float32)

    forecast 리스트 항목은 tss보다 조금 더 복잡하다. 이들은 numpy.ndarray 형태로 모든 샘플 경로를 포함하고 있으며, 샘플 경로의 차원은 (num_samples, prediciton_length)이다. 이 외에도 예측의 시작 날짜, 시간 시리즈의 주기(frequency)등을 포함하고 있다. 이러한 정보는 해당 예측 객체의 속성으로 접근할 수 있다.

    # first entry of the forecast list
    forecast_entry = forecasts[0]
    print(f"Number of sample paths: {forecast_entry.num_samples}")
    print(f"Dimension of samples: {forecast_entry.samples.shape}")
    print(f"Start date of the forecast window: {forecast_entry.start_date}")
    print(f"Frequency of the time series: {forecast_entry.freq}")
    >>
    Number of sample paths: 100
    Dimension of samples: (100, 24)
    Start date of the forecast window: 2019-01-07 00:00
    Frequency of the time series: <Hour>

    예측 윈도우의 24개 시간 단계 각각에 대해 평균이나 분위수(quantile)를 계산하는 등의 작업을 통해 샘플 경로를 요약할 수 있다.

    print(f"Mean of the future window:\n {forecast_entry.mean}")
    print(f"0.5-quantile (median) of the future window:\n {forecast_entry.quantile(0.5)}")
    >>
    Mean of the future window:
     [ 0.90765196  0.5002642   0.58970827  0.39372978  0.09580009  0.05871432
     -0.09855336  0.21287599  0.06411848  0.36389527  0.6184984   0.82686234
      1.1362944   1.4422988   1.5806047   1.885066    1.6728904   1.8457131
      2.0097268   1.7956799   1.6775795   1.587245    1.2241242   1.0475011 ]
    0.5-quantile (median) of the future window:
     [ 9.2138731e-01  5.3099197e-01  5.4954773e-01  3.5585621e-01
      1.3284978e-01  5.4421678e-02 -5.3116154e-02  1.7734313e-01
     -1.0389383e-03  3.8981181e-01  6.4071041e-01  8.7102258e-01
      1.1050647e+00  1.2608457e+00  1.5636476e+00  1.9202025e+00
      1.7487336e+00  1.8408449e+00  2.0478199e+00  1.7664530e+00
      1.7128807e+00  1.6053847e+00  1.2772603e+00  1.0452789e+00]

    예측 객체에는 예측 경로를 평균, 예측 구간(신뢰 구간)등으로 요약하는 plot 매서드가 있다. 예측 구간은 서로 다른 색상으로 음영 처리되는 팬 차트(fan chart)로 나타난다.

    plt.plot(ts_entry[-150:].to_timestamp())
    forecast_entry.plot(show_label=True)
    plt.legend()

    Compute metrics

    GluonTS에서는 Evaluator 클래스를 사용하여 전체 성능 지표를 계산할 수 있으며, 개별 시간 시리즈별 성능 지표도 계산할 수 있다. 이는 서로 다른 특성을 가진 시계열 간의 성능을 분석하는데 유용하다.

    from gluonts.evaluation import Evaluator
    evaluator = Evaluator(quantiles=[0.1, 0.5, 0.9])
    agg_metrics, item_metrics = evaluator(tss, forecasts)

     

    종합 성능 지표(aggregate metrics)는 시간 단계(time-steps)와 시간 시리즈(time series)를 모두 종합하여 계산된 값을 나타낸다. 

    print(json.dumps(agg_metrics, indent=4))
    >>
    {
        "MSE": 0.11030595362186432,
        "abs_error": 631.219958782196,
        "abs_target_sum": 2505.765546798706,
        "abs_target_mean": 1.044068977832794,
        "seasonal_error": 0.3378558193842571,
        "MASE": 0.7856021114063411,
        "MAPE": 2.2423562081654866,
        "sMAPE": 0.5366955331961314,
        "MSIS": 5.561884554254895,
        "num_masked_target_values": 0.0,
        "QuantileLoss[0.1]": 274.70157045163216,
        "Coverage[0.1]": 0.09208333333333334,
        "QuantileLoss[0.5]": 631.2199574247934,
        "Coverage[0.5]": 0.4904166666666667,
        "QuantileLoss[0.9]": 295.76119372472164,
        "Coverage[0.9]": 0.8820833333333331,
        "RMSE": 0.332123401195797,
        "NRMSE": 0.3181048457978281,
        "ND": 0.2519070308028716,
        "wQuantileLoss[0.1]": 0.10962780249037385,
        "wQuantileLoss[0.5]": 0.25190703026115985,
        "wQuantileLoss[0.9]": 0.11803226926101591,
        "mean_absolute_QuantileLoss": 400.5609072003824,
        "mean_wQuantileLoss": 0.15985570067084987,
        "MAE_Coverage": 0.39402777777777764,
        "OWA": NaN
    }

     

    개별 성능 지표(individual metrixs)는 시간 단계(time steps)에 대해서만 종합하여 계산된다.

    item_metrics.head()
    item_metrics.plot(x="MSIS", y="MASE", kind="scatter")
    plt.grid(which="both")
    plt.show()

     

    Create your own model

    예측 모델을 만들기 위해서는 아래와 같은 작업이 필요하다.

    • 훈련 예측 네트워크 정의
    • 데이터 처리 및 네트워크를 사용하는 새로운 추정기(Estimator) 정의

    네트워크는 임의로 복잡하게 설계할 수 있지만, 몇 가지 기본 규칙을 따라야 한다.

    • 훈련 및 예측 네트워크 모두 hybrid_forward 메서드를 가지고 있어야 하며, 네트워크가 호출될 때 어떤 작업이 수행될지를 정의해야 한다.
    • 훈련 네트워크의 hybrid_forward는 예측 값과 실제 값을 기반으로 손실(Loss)를 반환해야 한다.
    • 예측 네트워크의 hybrid_forward 예측 값을 반환해야 한다. 

    추정기(Estimator)에 포함되어야 할 메서드들

    • create_transformation : 데이터를 전처리하는 변환을 정의한다.(ex) 피처 추가
    • create_training_data_loader : 주어진 데이터셋을 기반으로 훈련 중 사용할 배치를 제공하는 DataLoader를 구성한다.
    • create_training_network : 필요한 하이퍼파라미터로 구성된 훈련 네트워크를 반환한다
    • create_predictor : 예측 네트워크를 생성하고 Predictor 객체를 반환한다.

    검증 데이터셋을 사용하는 경우 추가로 정의해야 할 메서드

    • create_validation_data_loader : 검증 데이터셋을 사용하여 검증 지표를 계산하는데 필요한 DataLoader를 생성한다.

    Predictor 객체예측 네트워크를 감싸는 객체로 predict 메서드를 정의한다. 이 메서드는 테스트 데이터셋을 받아 예측 네트워크를 통과시켜 예측 값을 생성하고 반환한다.

     

    Point forecasts with a simple feedforward network

    간단한 피드포워드 네트워크를 사용하여 포인트 예측을 해보고자 한다. 이 네트워크는 context_length 길이의 입력 윈도우를 받아 prediction_length 길이의 예측 윈도우를 출력한다. 훈련 네트워크의 hybrid_forwardL1 손실의 평균을 반환한다. 예측 네트워크는 훈련 네트워크와 동일한 구조를 가지며, 예측 값을 반환한다. 

    class MyNetwork(gluon.HybridBlock):
        def __init__(self, prediction_length, num_cells, **kwargs):
            super().__init__(**kwargs)
            self.prediction_length = prediction_length
            self.num_cells = num_cells
    
            with self.name_scope():
                # Set up a 3 layer neural network that directly predicts the target values
                self.nn = mx.gluon.nn.HybridSequential()
                self.nn.add(mx.gluon.nn.Dense(units=self.num_cells, activation="relu"))
                self.nn.add(mx.gluon.nn.Dense(units=self.num_cells, activation="relu"))
                self.nn.add(
                    mx.gluon.nn.Dense(units=self.prediction_length, activation="softrelu")
                )
    
    
    class MyTrainNetwork(MyNetwork):
        def hybrid_forward(self, F, past_target, future_target):
            prediction = self.nn(past_target)
            # calculate L1 loss with the future_target to learn the median
            return (prediction - future_target).abs().mean(axis=-1)
    
    
    class MyPredNetwork(MyTrainNetwork):
        # The prediction network only receives past_target and returns predictions
        def hybrid_forward(self, F, past_target):
            prediction = self.nn(past_target)
            return prediction.expand_dims(axis=1)

    Estimator 클래스는 몇 가지 하이퍼파라미터로 구성되며, 필요한 매서드를 구현한다.

    from functools import partial
    from mxnet.gluon import HybridBlock
    from gluonts.core.component import validated
    from gluonts.dataset.loader import TrainDataLoader
    from gluonts.model.predictor import Predictor
    from gluonts.mx import (
        batchify,
        copy_parameters,
        get_hybrid_forward_input_names,
        GluonEstimator,
        RepresentableBlockPredictor,
        Trainer,
    )
    from gluonts.transform import (
        ExpectedNumInstanceSampler,
        Transformation,
        InstanceSplitter,
        TestSplitSampler,
        SelectFields,
        Chain,
    )
    class MyEstimator(GluonEstimator):
        @validated()
        def __init__(
            self,
            prediction_length: int,
            context_length: int,
            num_cells: int,
            batch_size: int = 32,
            trainer: Trainer = Trainer(),
        ) -> None:
            super().__init__(trainer=trainer, batch_size=batch_size)
            self.prediction_length = prediction_length
            self.context_length = context_length
            self.num_cells = num_cells
    
        def create_transformation(self):
            return Chain([])
    
        def create_training_data_loader(self, dataset, **kwargs):
            instance_splitter = InstanceSplitter(
                target_field=FieldName.TARGET,
                is_pad_field=FieldName.IS_PAD,
                start_field=FieldName.START,
                forecast_start_field=FieldName.FORECAST_START,
                instance_sampler=ExpectedNumInstanceSampler(
                    num_instances=1, min_future=self.prediction_length
                ),
                past_length=self.context_length,
                future_length=self.prediction_length,
            )
            input_names = get_hybrid_forward_input_names(MyTrainNetwork)
            return TrainDataLoader(
                dataset=dataset,
                transform=instance_splitter + SelectFields(input_names),
                batch_size=self.batch_size,
                stack_fn=partial(batchify, ctx=self.trainer.ctx, dtype=self.dtype),
                **kwargs,
            )
    
        def create_training_network(self) -> MyTrainNetwork:
            return MyTrainNetwork(
                prediction_length=self.prediction_length, num_cells=self.num_cells
            )
    
        def create_predictor(
            self, transformation: Transformation, trained_network: HybridBlock
        ) -> Predictor:
            prediction_splitter = InstanceSplitter(
                target_field=FieldName.TARGET,
                is_pad_field=FieldName.IS_PAD,
                start_field=FieldName.START,
                forecast_start_field=FieldName.FORECAST_START,
                instance_sampler=TestSplitSampler(),
                past_length=self.context_length,
                future_length=self.prediction_length,
            )
    
            prediction_network = MyPredNetwork(
                prediction_length=self.prediction_length, num_cells=self.num_cells
            )
    
            copy_parameters(trained_network, prediction_network)
    
            return RepresentableBlockPredictor(
                input_transform=transformation + prediction_splitter,
                prediction_net=prediction_network,
                batch_size=self.batch_size,
                prediction_length=self.prediction_length,
                ctx=self.trainer.ctx,
            )

    훈련 및 예측 네트워크추정기(Estimator) 클래스를 정의한 후, 기존 모델들과 동일한 단계를 따를 수 있다. 즉, 필요한 하이퍼파라미터를 추정기 클래스에 전달하여 추정기를 설정하고, train 메서드를 호출하여 예측기(Predictor)를 생성한 후, 마지막으로 make_evaluation_predictions 함수를 사용하여 예측 결과를 생성할 수 있다. 

    estimator = MyEstimator(
        prediction_length=custom_ds_metadata["prediction_length"],
        context_length=2 * custom_ds_metadata["prediction_length"],
        num_cells=40,
        trainer=Trainer(
            ctx="cpu",
            epochs=5,
            learning_rate=1e-3,
            hybridize=False,
            num_batches_per_epoch=100,
        ),
    )

    추정기는 trian_ds 훈련 데이터셋을 사용해 train 메서드를 호출하는 것만으로 학습할 수 있다. 학습이 완료되면 예측기(Predictor)가 반환되며, 이를 사용해 예측을 수행할 수 있다.

    predictor = estimator.train(train_ds)
    forecast_it, ts_it = make_evaluation_predictions(
        dataset=test_ds,  # test dataset
        predictor=predictor,  # predictor
        num_samples=100,  # number of sample paths we want for evaluation
    )
    forecasts = list(forecast_it)
    tss = list(ts_it)
    plt.plot(tss[0][-150:].to_timestamp())
    forecasts[0].plot(show_label=True)
    plt.legend()

    우리는 점 추정(point estimates)만 제공하므로 예측에서 예측 구간이 보이지 않는다. make_evaluation_predictions에서 100개의 샘플 경로를 요구했을 때, 이러한 네트워크에서는 100번 동일한 결과가 출력된다.

    Probabilistic forecasting

    How does a model learn a distribution?

    확률적 예측에서는 시계열의 미래 값들의 분포를 학습해야 하며, 이를 위해 미래 값들이 따르는 분포의 유형을 지정해야 한다. GluonTS는 다양한 상황에서 적용할 수 있는 여러 분포(Gaussian, Student-t, Uniform)를 제공하다.

     

    분포를 학습하려면 그 분포의 매개변수를 학습해야 한다. 예를 들어 Gaussian을 가정하는 경우 평균분산을 학습해야 한다.

     

    GluonTS에서 제공되는 각 분포는 해당 분포 클래스에 의해 정의된다.(예 : Gaussian) 이 클래스는 분포의 매개변수, 로그 가능도(log-likelihood), 매개변수를 기반으로 하는 샘플링 방법 등을 정의한다.

     

    그러나 모델을 이러한 분포와 연결하고 그 매개변수를 학습하는 방법은 단순하지 않다. 이를 해결하기 위해 각 분포에는 DistributionOutput클래스(예 : GaussianOutput)가 제공된다. 이 클래스의 역할은 모델과 분포를 연결하는 것으로 모델의 출력을 받아 이를 분포의 매개변수로 매핑하는 것이다. 이를 모델 위에 추가되는 투사(projection) 층으로 생각할 수 있다. 이 층의 매개변수도 네트워크의 나머지 부분과 함께 최적화된다.

     

     이 투사 층을 포함함으로써, 모델은 각 시간 단계에서 선택된 분포의 매개변수를 학습한다. 이런 모델은 보통 선택된 분포의 음의 로그 가능도(negative log-likelihood)를 손실함수로 사용하여 최적화된다. 모델을 최적화하고 매개변수를 학습한 후에는 학습된 분포에서 샘플을 추출하거나 유용한 통계를 도출할 수 있다.

    Feedforward network for probabilistic forecasting

    이전 모델을 확률적으로 만들기 위해 어떤 변경사항이 필요한지 살펴보자.

    • 네트워크 출력 변경 : 점 예측 네트워크에서는 출력이 prediction_length 길이의 벡터로, 직접적인 점 추정을 제공한다. 이제 DistributionOutput이 분포 매개변수로 투사하는 데 사용할 피처 세트를 출력해야 한다. 이 피처는 예측 구간의 각 시간 단계마다 달라야 하므로, prediction_length * num_features 값의 출력을 만들어야 한다.
    • DistributionOutput은 텐서를 입력 받아 마지막 차원의 분포 매개변수를 투사 : 여기서 각 시간 단계마다 분포 객체가 필요하다. 즉, prediction_length 만큼의 분포 객체가 필요하다. 네트워크의 출력이 prediction_length * num_features 값을 가지면, 이를 (prediction_length, num_features) 형태로 리셰이프하여 필요한 분포를 얻을 수 있으며, 마지막 축의 num_features가 분포 매개변수로 투사된다.
    • 예측 네트워크가 각 시계열에 대해 여러 샘플 경로를 출력 : 이를 위해 각 시계열을 샘플 경로의 수만큼 반복하고, 각 샘플에 대해 표준 예측을 수행
    • 다루는 모든 텐션에는 배치에 해당하는 첫 번째 차원이 있음. 예를 들어 네트워크 출력의 차원은(batch_size, prediction_length * num_features)이다.
    from gluonts.mx import DistributionOutput, GaussianOutput
    class MyProbNetwork(gluon.HybridBlock):
        def __init__(
            self, prediction_length, distr_output, num_cells, num_sample_paths=100, **kwargs
        ) -> None:
            super().__init__(**kwargs)
            self.prediction_length = prediction_length
            self.distr_output = distr_output
            self.num_cells = num_cells
            self.num_sample_paths = num_sample_paths
            self.proj_distr_args = distr_output.get_args_proj()
    
            with self.name_scope():
                # Set up a 2 layer neural network that its ouput will be projected to the distribution parameters
                self.nn = mx.gluon.nn.HybridSequential()
                self.nn.add(mx.gluon.nn.Dense(units=self.num_cells, activation="relu"))
                self.nn.add(
                    mx.gluon.nn.Dense(
                        units=self.prediction_length * self.num_cells, activation="relu"
                    )
                )
    
    
    class MyProbTrainNetwork(MyProbNetwork):
        def hybrid_forward(self, F, past_target, future_target):
            # compute network output
            net_output = self.nn(past_target)
    
            # (batch, prediction_length * nn_features)  ->  (batch, prediction_length, nn_features)
            net_output = net_output.reshape(0, self.prediction_length, -1)
    
            # project network output to distribution parameters domain
            distr_args = self.proj_distr_args(net_output)
    
            # compute distribution
            distr = self.distr_output.distribution(distr_args)
    
            # negative log-likelihood
            loss = distr.loss(future_target)
            return loss
    
    
    class MyProbPredNetwork(MyProbTrainNetwork):
        # The prediction network only receives past_target and returns predictions
        def hybrid_forward(self, F, past_target):
            # repeat past target: from (batch_size, past_target_length) to
            # (batch_size * num_sample_paths, past_target_length)
            repeated_past_target = past_target.repeat(repeats=self.num_sample_paths, axis=0)
    
            # compute network output
            net_output = self.nn(repeated_past_target)
    
            # (batch * num_sample_paths, prediction_length * nn_features)  ->  (batch * num_sample_paths, prediction_length, nn_features)
            net_output = net_output.reshape(0, self.prediction_length, -1)
    
            # project network output to distribution parameters domain
            distr_args = self.proj_distr_args(net_output)
    
            # compute distribution
            distr = self.distr_output.distribution(distr_args)
    
            # get (batch_size * num_sample_paths, prediction_length) samples
            samples = distr.sample()
    
            # reshape from (batch_size * num_sample_paths, prediction_length) to
            # (batch_size, num_sample_paths, prediction_length)
            return samples.reshape(
                shape=(-1, self.num_sample_paths, self.prediction_length)
            )

    추정기(Estimator)에서 필요한 변경 사항은 사소한 것들이며, 주로 네트워크에 사용하는 추가적인 distrt_output 매개변수를 반영하는 것이다.

    class MyProbEstimator(GluonEstimator):
        @validated()
        def __init__(
            self,
            prediction_length: int,
            context_length: int,
            distr_output: DistributionOutput,
            num_cells: int,
            num_sample_paths: int = 100,
            batch_size: int = 32,
            trainer: Trainer = Trainer(),
        ) -> None:
            super().__init__(trainer=trainer, batch_size=batch_size)
            self.prediction_length = prediction_length
            self.context_length = context_length
            self.distr_output = distr_output
            self.num_cells = num_cells
            self.num_sample_paths = num_sample_paths
    
        def create_transformation(self):
            return Chain([])
    
        def create_training_data_loader(self, dataset, **kwargs):
            instance_splitter = InstanceSplitter(
                target_field=FieldName.TARGET,
                is_pad_field=FieldName.IS_PAD,
                start_field=FieldName.START,
                forecast_start_field=FieldName.FORECAST_START,
                instance_sampler=ExpectedNumInstanceSampler(
                    num_instances=1, min_future=self.prediction_length
                ),
                past_length=self.context_length,
                future_length=self.prediction_length,
            )
            input_names = get_hybrid_forward_input_names(MyProbTrainNetwork)
            return TrainDataLoader(
                dataset=dataset,
                transform=instance_splitter + SelectFields(input_names),
                batch_size=self.batch_size,
                stack_fn=partial(batchify, ctx=self.trainer.ctx, dtype=self.dtype),
                **kwargs,
            )
    
        def create_training_network(self) -> MyProbTrainNetwork:
            return MyProbTrainNetwork(
                prediction_length=self.prediction_length,
                distr_output=self.distr_output,
                num_cells=self.num_cells,
                num_sample_paths=self.num_sample_paths,
            )
    
        def create_predictor(
            self, transformation: Transformation, trained_network: HybridBlock
        ) -> Predictor:
            prediction_splitter = InstanceSplitter(
                target_field=FieldName.TARGET,
                is_pad_field=FieldName.IS_PAD,
                start_field=FieldName.START,
                forecast_start_field=FieldName.FORECAST_START,
                instance_sampler=TestSplitSampler(),
                past_length=self.context_length,
                future_length=self.prediction_length,
            )
    
            prediction_network = MyProbPredNetwork(
                prediction_length=self.prediction_length,
                distr_output=self.distr_output,
                num_cells=self.num_cells,
                num_sample_paths=self.num_sample_paths,
            )
    
            copy_parameters(trained_network, prediction_network)
    
            return RepresentableBlockPredictor(
                input_transform=transformation + prediction_splitter,
                prediction_net=prediction_network,
                batch_size=self.batch_size,
                prediction_length=self.prediction_length,
                ctx=self.trainer.ctx,
            )
    estimator = MyProbEstimator(
        prediction_length=custom_ds_metadata["prediction_length"],
        context_length=2 * custom_ds_metadata["prediction_length"],
        distr_output=GaussianOutput(),
        num_cells=40,
        trainer=Trainer(
            ctx="cpu",
            epochs=5,
            learning_rate=1e-3,
            hybridize=False,
            num_batches_per_epoch=100,
        ),
    )
    predictor = estimator.train(train_ds)
    forecast_it, ts_it = make_evaluation_predictions(
        dataset=test_ds,  # test dataset
        predictor=predictor,  # predictor
        num_samples=100,  # number of sample paths we want for evaluation
    )
    forecasts = list(forecast_it)
    tss = list(ts_it)
    plt.plot(tss[0][-150:].to_timestamp())
    forecasts[0].plot(show_label=True)
    plt.legend()

    Add features and scaling

    이전 네트워크에서는 타겟 값만 사용하고 데이터셋의 특징(features)을 활용하지 않았다. 여기에서는 feat_dynamic_real 필드를 포함시켜 확률적 네트워크를 확장하여 모델의 예측 성능을 향상시킨다. 이를 위해 타겟 값과 특징 값을 결합하여 네트워크의 새로운 입력을 구성하는 향상된 벡터를 만든다.

     

    데이터셋에 있는 모든 특징은 잠재적으로 모델의 입력으로 사용할 수 있다. 그러나 이 예에서는 하나의 특징만 사용하고자 한다.

     

    또한 실무에서 자주 발생하는 중요한 문제는 데이터셋 내 시계열 값들의 크기 차이이다. 모델이 비슷한 값 범위에서 훈련되고 예측할 수 있는 것이 매우 유리하다. 이 문제를 해결하기 위해 스케일러(Scaler)를 모델에 추가하여 각 시계열의 스케일을 계산한다. 그런 다음 시계열 값이나 관련 특징들을 적절히 스케일링하여 네트워크의 입력으로 사용할 수 있다.

    from gluonts.mx import MeanScaler, NOPScaler
    class MyProbNetwork(gluon.HybridBlock):
        def __init__(
            self,
            prediction_length,
            context_length,
            distr_output,
            num_cells,
            num_sample_paths=100,
            scaling=True,
            **kwargs
        ) -> None:
            super().__init__(**kwargs)
            self.prediction_length = prediction_length
            self.context_length = context_length
            self.distr_output = distr_output
            self.num_cells = num_cells
            self.num_sample_paths = num_sample_paths
            self.proj_distr_args = distr_output.get_args_proj()
            self.scaling = scaling
    
            with self.name_scope():
                # Set up a 2 layer neural network that its ouput will be projected to the distribution parameters
                self.nn = mx.gluon.nn.HybridSequential()
                self.nn.add(mx.gluon.nn.Dense(units=self.num_cells, activation="relu"))
                self.nn.add(
                    mx.gluon.nn.Dense(
                        units=self.prediction_length * self.num_cells, activation="relu"
                    )
                )
    
                if scaling:
                    self.scaler = MeanScaler(keepdims=True)
                else:
                    self.scaler = NOPScaler(keepdims=True)
    
        def compute_scale(self, past_target, past_observed_values):
            # scale shape is (batch_size, 1)
            _, scale = self.scaler(
                past_target.slice_axis(axis=1, begin=-self.context_length, end=None),
                past_observed_values.slice_axis(
                    axis=1, begin=-self.context_length, end=None
                ),
            )
    
            return scale
    
    
    class MyProbTrainNetwork(MyProbNetwork):
        def hybrid_forward(
            self,
            F,
            past_target,
            future_target,
            past_observed_values,
            past_feat_dynamic_real,
        ):
            # compute scale
            scale = self.compute_scale(past_target, past_observed_values)
    
            # scale target and time features
            past_target_scale = F.broadcast_div(past_target, scale)
            past_feat_dynamic_real_scale = F.broadcast_div(
                past_feat_dynamic_real.squeeze(axis=-1), scale
            )
    
            # concatenate target and time features to use them as input to the network
            net_input = F.concat(past_target_scale, past_feat_dynamic_real_scale, dim=-1)
    
            # compute network output
            net_output = self.nn(net_input)
    
            # (batch, prediction_length * nn_features)  ->  (batch, prediction_length, nn_features)
            net_output = net_output.reshape(0, self.prediction_length, -1)
    
            # project network output to distribution parameters domain
            distr_args = self.proj_distr_args(net_output)
    
            # compute distribution
            distr = self.distr_output.distribution(distr_args, scale=scale)
    
            # negative log-likelihood
            loss = distr.loss(future_target)
            return loss
    
    
    class MyProbPredNetwork(MyProbTrainNetwork):
        # The prediction network only receives past_target and returns predictions
        def hybrid_forward(
            self, F, past_target, past_observed_values, past_feat_dynamic_real
        ):
            # repeat fields: from (batch_size, past_target_length) to
            # (batch_size * num_sample_paths, past_target_length)
            repeated_past_target = past_target.repeat(repeats=self.num_sample_paths, axis=0)
            repeated_past_observed_values = past_observed_values.repeat(
                repeats=self.num_sample_paths, axis=0
            )
            repeated_past_feat_dynamic_real = past_feat_dynamic_real.repeat(
                repeats=self.num_sample_paths, axis=0
            )
    
            # compute scale
            scale = self.compute_scale(repeated_past_target, repeated_past_observed_values)
    
            # scale repeated target and time features
            repeated_past_target_scale = F.broadcast_div(repeated_past_target, scale)
            repeated_past_feat_dynamic_real_scale = F.broadcast_div(
                repeated_past_feat_dynamic_real.squeeze(axis=-1), scale
            )
    
            # concatenate target and time features to use them as input to the network
            net_input = F.concat(
                repeated_past_target_scale, repeated_past_feat_dynamic_real_scale, dim=-1
            )
    
            # compute network oputput
            net_output = self.nn(net_input)
    
            # (batch * num_sample_paths, prediction_length * nn_features)  ->  (batch * num_sample_paths, prediction_length, nn_features)
            net_output = net_output.reshape(0, self.prediction_length, -1)
    
            # project network output to distribution parameters domain
            distr_args = self.proj_distr_args(net_output)
    
            # compute distribution
            distr = self.distr_output.distribution(distr_args, scale=scale)
    
            # get (batch_size * num_sample_paths, prediction_length) samples
            samples = distr.sample()
    
            # reshape from (batch_size * num_sample_paths, prediction_length) to
            # (batch_size, num_sample_paths, prediction_length)
            return samples.reshape(
                shape=(-1, self.num_sample_paths, self.prediction_length)
            )
    class MyProbEstimator(GluonEstimator):
        @validated()
        def __init__(
            self,
            prediction_length: int,
            context_length: int,
            distr_output: DistributionOutput,
            num_cells: int,
            num_sample_paths: int = 100,
            scaling: bool = True,
            batch_size: int = 32,
            trainer: Trainer = Trainer(),
        ) -> None:
            super().__init__(trainer=trainer, batch_size=batch_size)
            self.prediction_length = prediction_length
            self.context_length = context_length
            self.distr_output = distr_output
            self.num_cells = num_cells
            self.num_sample_paths = num_sample_paths
            self.scaling = scaling
    
        def create_transformation(self):
            # Feature transformation that the model uses for input.
            return AddObservedValuesIndicator(
                target_field=FieldName.TARGET,
                output_field=FieldName.OBSERVED_VALUES,
            )
    
        def create_training_data_loader(self, dataset, **kwargs):
            instance_splitter = InstanceSplitter(
                target_field=FieldName.TARGET,
                is_pad_field=FieldName.IS_PAD,
                start_field=FieldName.START,
                forecast_start_field=FieldName.FORECAST_START,
                instance_sampler=ExpectedNumInstanceSampler(
                    num_instances=1, min_future=self.prediction_length
                ),
                past_length=self.context_length,
                future_length=self.prediction_length,
                time_series_fields=[
                    FieldName.FEAT_DYNAMIC_REAL,
                    FieldName.OBSERVED_VALUES,
                ],
            )
            input_names = get_hybrid_forward_input_names(MyProbTrainNetwork)
            return TrainDataLoader(
                dataset=dataset,
                transform=instance_splitter + SelectFields(input_names),
                batch_size=self.batch_size,
                stack_fn=partial(batchify, ctx=self.trainer.ctx, dtype=self.dtype),
                **kwargs,
            )
    
        def create_training_network(self) -> MyProbTrainNetwork:
            return MyProbTrainNetwork(
                prediction_length=self.prediction_length,
                context_length=self.context_length,
                distr_output=self.distr_output,
                num_cells=self.num_cells,
                num_sample_paths=self.num_sample_paths,
                scaling=self.scaling,
            )
    
        def create_predictor(
            self, transformation: Transformation, trained_network: HybridBlock
        ) -> Predictor:
            prediction_splitter = InstanceSplitter(
                target_field=FieldName.TARGET,
                is_pad_field=FieldName.IS_PAD,
                start_field=FieldName.START,
                forecast_start_field=FieldName.FORECAST_START,
                instance_sampler=TestSplitSampler(),
                past_length=self.context_length,
                future_length=self.prediction_length,
                time_series_fields=[
                    FieldName.FEAT_DYNAMIC_REAL,
                    FieldName.OBSERVED_VALUES,
                ],
            )
    
            prediction_network = MyProbPredNetwork(
                prediction_length=self.prediction_length,
                context_length=self.context_length,
                distr_output=self.distr_output,
                num_cells=self.num_cells,
                num_sample_paths=self.num_sample_paths,
                scaling=self.scaling,
            )
    
            copy_parameters(trained_network, prediction_network)
    
            return RepresentableBlockPredictor(
                input_transform=transformation + prediction_splitter,
                prediction_net=prediction_network,
                batch_size=self.batch_size,
                prediction_length=self.prediction_length,
                ctx=self.trainer.ctx,
            )
    estimator = MyProbEstimator(
        prediction_length=custom_ds_metadata["prediction_length"],
        context_length=2 * custom_ds_metadata["prediction_length"],
        distr_output=GaussianOutput(),
        num_cells=40,
        trainer=Trainer(
            ctx="cpu",
            epochs=5,
            learning_rate=1e-3,
            hybridize=False,
            num_batches_per_epoch=100,
        ),
    )
    predictor = estimator.train(train_ds)
    forecast_it, ts_it = make_evaluation_predictions(
        dataset=test_ds,  # test dataset
        predictor=predictor,  # predictor
        num_samples=100,  # number of sample paths we want for evaluation
    )
    forecasts = list(forecast_it)
    tss = list(ts_it)
    plt.plot(tss[0][-150:].to_timestamp())
    forecasts[0].plot(show_label=True)
    plt.legend()

    From feedforward to RNN

    이전 예제에서는 피드포워드 신경망 기반의 예측모델로 네트워크 입력으로 시간 시계열의 일정 구간(window)(길이 : context_length)을 사용하고, 네트워크를 훈련시켜 다음 구간(길이 : prediction_length)을 예측하도록 하는 것이였다.

     

    이번에는 피드포워드 신경망순환 신경망(RNN)으로 대체하고자 한다.

     

    Training

    RNN의 기본 아이디어는 피드포워드 네트워크와 동일하다. RNN을 시간 단계별로 풀어갈 때, 이전의 시계열 값을 입력으로 사용하고, 다음 값을 예측한다. 입력을 강화하기 위해 여러 과거 값(예 : 계절 패턴에 따른 특정 지연값)이나 사용할 수 있는 특징을 추가할 수 있다. 하지만 이번 예제에서는 간단하게 시계열의 마지막 값만 사용한다. 네트워크의 출력은 각 시간 단계에서 다음 값의 분포이며, RNN의 상태분포 매개변수의 투사(feature vector)로 사용한다.

     

    RNN의 순차적인 특성 때문에 시계열에서 구간을 자를 때 past_와 future_의 구분은 필요하지 않다. 따라서 past_target과 future_target을 연결하여 예측하고자 하는 하나의 타겟 구간으로 처리할 수 있다. 즉 RNN의 입력 target[-(context_length + prediction_length + 1):-1](예측하려는 구간의 바로 이전 시간 단계)이다. 이 결과, 각 구간마다 context_length + prediction_length + 1개의 값이 있어야 한다. 이를 InstanceSplitter에서 정의할 수 있다. 

     

    훈련의 과정은 아래와 같다.

    • RNN에 시계열 값을 순차적으로 전달(target[-context_length + prediction_length + 1) : -1])
    • 각 시간 단계에서 RNN의 상태피처 벡터로 사용하고, 이를 분포 매개변수 영역으로 투사
    • 각 시간 단계에서의 출력다음 시간 단계 값의 분포이며, 이 전체가 예측하려는 구간의 분포가 된다. target[-(context_length+prediction_length):]

    위 과정은 unroll_encoder 메서드에 구현되어 있다.

     

    Inference

    추론 시에는 past_target값만 알고 있으므로, 훈련과 동일한 과정을 따를 수 없다. 하지만 기본 아이디어는 유사하다.

    • past_target[-(context_length + 1):] 값을 순차적으로 RNN에 전달하여 RNN의 상태를 업데이트한다.
    • 마지막 시간 단계에서 RNN의 출력은 다음 값의 분포이며, 이 값은 알 수 없다. 따라서 이 분포에서 샘플을 추천(예 : num_sample_paths번)하여 다음 시간 단계의 RNN 입력으로 사용한다.
    • 이 과정을 prediction_length번 반복한다. 

    첫 번째 단계는 unroll_encoder에, 마지막 단계는 sample_decoder 메서드에 구현되어 있다.

    class MyProbRNN(gluon.HybridBlock):
        def __init__(
            self,
            prediction_length,
            context_length,
            distr_output,
            num_cells,
            num_layers,
            num_sample_paths=100,
            scaling=True,
            **kwargs
        ) -> None:
            super().__init__(**kwargs)
            self.prediction_length = prediction_length
            self.context_length = context_length
            self.distr_output = distr_output
            self.num_cells = num_cells
            self.num_layers = num_layers
            self.num_sample_paths = num_sample_paths
            self.proj_distr_args = distr_output.get_args_proj()
            self.scaling = scaling
    
            with self.name_scope():
                self.rnn = mx.gluon.rnn.HybridSequentialRNNCell()
                for k in range(self.num_layers):
                    cell = mx.gluon.rnn.LSTMCell(hidden_size=self.num_cells)
                    cell = mx.gluon.rnn.ResidualCell(cell) if k > 0 else cell
                    self.rnn.add(cell)
    
                if scaling:
                    self.scaler = MeanScaler(keepdims=True)
                else:
                    self.scaler = NOPScaler(keepdims=True)
    
        def compute_scale(self, past_target, past_observed_values):
            # scale is computed on the context length last units of the past target
            # scale shape is (batch_size, 1, *target_shape)
            _, scale = self.scaler(
                past_target.slice_axis(axis=1, begin=-self.context_length, end=None),
                past_observed_values.slice_axis(
                    axis=1, begin=-self.context_length, end=None
                ),
            )
    
            return scale
    
        def unroll_encoder(
            self,
            F,
            past_target,
            past_observed_values,
            future_target=None,
            future_observed_values=None,
        ):
            # overall target field
            # input target from -(context_length + prediction_length + 1) to -1
            if future_target is not None:  # during training
                target_in = F.concat(past_target, future_target, dim=-1).slice_axis(
                    axis=1,
                    begin=-(self.context_length + self.prediction_length + 1),
                    end=-1,
                )
    
                # overall observed_values field
                # input observed_values corresponding to target_in
                observed_values_in = F.concat(
                    past_observed_values, future_observed_values, dim=-1
                ).slice_axis(
                    axis=1,
                    begin=-(self.context_length + self.prediction_length + 1),
                    end=-1,
                )
    
                rnn_length = self.context_length + self.prediction_length
            else:  # during inference
                target_in = past_target.slice_axis(
                    axis=1, begin=-(self.context_length + 1), end=-1
                )
    
                # overall observed_values field
                # input observed_values corresponding to target_in
                observed_values_in = past_observed_values.slice_axis(
                    axis=1, begin=-(self.context_length + 1), end=-1
                )
    
                rnn_length = self.context_length
    
            # compute scale
            scale = self.compute_scale(target_in, observed_values_in)
    
            # scale target_in
            target_in_scale = F.broadcast_div(target_in, scale)
    
            # compute network output
            net_output, states = self.rnn.unroll(
                inputs=target_in_scale,
                length=rnn_length,
                layout="NTC",
                merge_outputs=True,
            )
    
            return net_output, states, scale
    
    
    class MyProbTrainRNN(MyProbRNN):
        def hybrid_forward(
            self,
            F,
            past_target,
            future_target,
            past_observed_values,
            future_observed_values,
        ):
            net_output, _, scale = self.unroll_encoder(
                F, past_target, past_observed_values, future_target, future_observed_values
            )
    
            # output target from -(context_length + prediction_length) to end
            target_out = F.concat(past_target, future_target, dim=-1).slice_axis(
                axis=1, begin=-(self.context_length + self.prediction_length), end=None
            )
    
            # project network output to distribution parameters domain
            distr_args = self.proj_distr_args(net_output)
    
            # compute distribution
            distr = self.distr_output.distribution(distr_args, scale=scale)
    
            # negative log-likelihood
            loss = distr.loss(target_out)
            return loss
    
    
    class MyProbPredRNN(MyProbTrainRNN):
        def sample_decoder(self, F, past_target, states, scale):
            # repeat fields: from (batch_size, past_target_length) to
            # (batch_size * num_sample_paths, past_target_length)
            repeated_states = [
                s.repeat(repeats=self.num_sample_paths, axis=0) for s in states
            ]
            repeated_scale = scale.repeat(repeats=self.num_sample_paths, axis=0)
    
            # first decoder input is the last value of the past_target, i.e.,
            # the previous value of the first time step we want to forecast
            decoder_input = past_target.slice_axis(axis=1, begin=-1, end=None).repeat(
                repeats=self.num_sample_paths, axis=0
            )
    
            # list with samples at each time step
            future_samples = []
    
            # for each future time step we draw new samples for this time step and update the state
            # the drawn samples are the inputs to the rnn at the next time step
            for k in range(self.prediction_length):
                rnn_outputs, repeated_states = self.rnn.unroll(
                    inputs=decoder_input,
                    length=1,
                    begin_state=repeated_states,
                    layout="NTC",
                    merge_outputs=True,
                )
    
                # project network output to distribution parameters domain
                distr_args = self.proj_distr_args(rnn_outputs)
    
                # compute distribution
                distr = self.distr_output.distribution(distr_args, scale=repeated_scale)
    
                # draw samples (batch_size * num_samples, 1)
                new_samples = distr.sample()
    
                # append the samples of the current time step
                future_samples.append(new_samples)
    
                # update decoder input for the next time step
                decoder_input = new_samples
    
            samples = F.concat(*future_samples, dim=1)
    
            # (batch_size, num_samples, prediction_length)
            return samples.reshape(
                shape=(-1, self.num_sample_paths, self.prediction_length)
            )
    
        def hybrid_forward(self, F, past_target, past_observed_values):
            # unroll encoder over context_length
            net_output, states, scale = self.unroll_encoder(
                F, past_target, past_observed_values
            )
    
            samples = self.sample_decoder(F, past_target, states, scale)
    
            return samples
    class MyProbRNNEstimator(GluonEstimator):
        @validated()
        def __init__(
            self,
            prediction_length: int,
            context_length: int,
            distr_output: DistributionOutput,
            num_cells: int,
            num_layers: int,
            num_sample_paths: int = 100,
            scaling: bool = True,
            batch_size: int = 32,
            trainer: Trainer = Trainer(),
        ) -> None:
            super().__init__(trainer=trainer, batch_size=batch_size)
            self.prediction_length = prediction_length
            self.context_length = context_length
            self.distr_output = distr_output
            self.num_cells = num_cells
            self.num_layers = num_layers
            self.num_sample_paths = num_sample_paths
            self.scaling = scaling
    
        def create_transformation(self):
            # Feature transformation that the model uses for input.
            return AddObservedValuesIndicator(
                target_field=FieldName.TARGET,
                output_field=FieldName.OBSERVED_VALUES,
            )
    
        def create_training_data_loader(self, dataset, **kwargs):
            instance_splitter = InstanceSplitter(
                target_field=FieldName.TARGET,
                is_pad_field=FieldName.IS_PAD,
                start_field=FieldName.START,
                forecast_start_field=FieldName.FORECAST_START,
                instance_sampler=ExpectedNumInstanceSampler(
                    num_instances=1,
                    min_future=self.prediction_length,
                ),
                past_length=self.context_length + 1,
                future_length=self.prediction_length,
                time_series_fields=[
                    FieldName.FEAT_DYNAMIC_REAL,
                    FieldName.OBSERVED_VALUES,
                ],
            )
            input_names = get_hybrid_forward_input_names(MyProbTrainRNN)
            return TrainDataLoader(
                dataset=dataset,
                transform=instance_splitter + SelectFields(input_names),
                batch_size=self.batch_size,
                stack_fn=partial(batchify, ctx=self.trainer.ctx, dtype=self.dtype),
                **kwargs,
            )
    
        def create_training_network(self) -> MyProbTrainRNN:
            return MyProbTrainRNN(
                prediction_length=self.prediction_length,
                context_length=self.context_length,
                distr_output=self.distr_output,
                num_cells=self.num_cells,
                num_layers=self.num_layers,
                num_sample_paths=self.num_sample_paths,
                scaling=self.scaling,
            )
    
        def create_predictor(
            self, transformation: Transformation, trained_network: HybridBlock
        ) -> Predictor:
            prediction_splitter = InstanceSplitter(
                target_field=FieldName.TARGET,
                is_pad_field=FieldName.IS_PAD,
                start_field=FieldName.START,
                forecast_start_field=FieldName.FORECAST_START,
                instance_sampler=TestSplitSampler(),
                past_length=self.context_length + 1,
                future_length=self.prediction_length,
                time_series_fields=[
                    FieldName.FEAT_DYNAMIC_REAL,
                    FieldName.OBSERVED_VALUES,
                ],
            )
            prediction_network = MyProbPredRNN(
                prediction_length=self.prediction_length,
                context_length=self.context_length,
                distr_output=self.distr_output,
                num_cells=self.num_cells,
                num_layers=self.num_layers,
                num_sample_paths=self.num_sample_paths,
                scaling=self.scaling,
            )
    
            copy_parameters(trained_network, prediction_network)
    
            return RepresentableBlockPredictor(
                input_transform=transformation + prediction_splitter,
                prediction_net=prediction_network,
                batch_size=self.batch_size,
                prediction_length=self.prediction_length,
                ctx=self.trainer.ctx,
            )
    estimator = MyProbRNNEstimator(
        prediction_length=24,
        context_length=48,
        num_cells=40,
        num_layers=2,
        distr_output=GaussianOutput(),
        trainer=Trainer(
            ctx="cpu",
            epochs=5,
            learning_rate=1e-3,
            hybridize=False,
            num_batches_per_epoch=100,
        ),
    )
    predictor = estimator.train(train_ds)
    forecast_it, ts_it = make_evaluation_predictions(
        dataset=test_ds,  # test dataset
        predictor=predictor,  # predictor
        num_samples=100,  # number of sample paths we want for evaluation
    )
    forecasts = list(forecast_it)
    tss = list(ts_it)
    plt.plot(tss[0][-150:].to_timestamp())
    forecasts[0].plot(show_label=True)
    plt.legend()

    댓글

Designed by Tistory.