時系列データの扱い

時系列データでは、各行が独立ではありません。昨日と今日、先月と今月は連続しており、その順序を壊した瞬間に評価は簡単に過大になります。

重要なのは複雑なモデル名よりも、予測時点で使える情報だけを守って表と評価を作ることです。

未来を見ないことが最優先

時系列予測の原則は、予測時点より後の情報を特徴量へ入れないことです。ランダム分割が危険なのも、この原則を破って未来情報を暗黙に混ぜやすいからです。

トレンドと季節性を持つ疑似系列を作り、過去の値からラグ特徴量を立て、ナイーブ予測と学習モデルを比べ、複数ステップ先予測まで進めます。順序を守ると、時系列特有の難しさがどこで生まれるかが見えます。

基本語彙

要点はすべて、過去から未来へしか進まないという一点に集約されます。

モデル差より評価設計差

複数モデルを比べても、時系列では評価設計が間違っていれば比較全体が崩れます。まず疑うべきなのはモデルの弱さではなく、未来を見ていないかどうかです。

よくある誤解

shift(1)rolling は便利ですが、書き方を少し誤るだけで未来情報を混ぜてしまいます。また、1 ステップ先でうまくいくことと、12 ステップ先でうまくいくことは別問題です。長期予測では誤差が次の入力へ連鎖します。

扱う範囲

焦点は、時系列の基本的な表現と評価の作法にあります。外生変数の厳密な取り扱いや専用モデルの理論全体には踏み込まず、運用に耐える基準線の作り方を押さえます。

外生変数という例外

promo は、将来の予定があらかじめ分かっている外生変数として置きます。広告出稿や価格改定のように、未来の計画値だけは利用できる状況を表しています。

構造を持つ時系列

トレンド、季節性、キャンペーン効果を持つ疑似データを使います。構造を自分で埋め込んだ系列は、後でラグや自己相関が何を拾っているかを読みやすくします。

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.base import clone
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import Ridge
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import TimeSeriesSplit, cross_val_score, train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

上昇トレンド、季節性、キャンペーン効果が入った系列を見ると、単なる数字の列ではなく、複数の要因が時間に沿って重なっていることが分かります。

plt.style.use("seaborn-v0_8-whitegrid")
rng = np.random.default_rng(7)

dates = pd.date_range("2018-01-01", periods=96, freq="MS")
trend = np.linspace(0, 95, len(dates))
seasonal = 18 * np.sin(2 * np.pi * np.arange(len(dates)) / 12)
promo = (np.arange(len(dates)) % 6 == 0).astype(int)
noise = rng.normal(loc=0.0, scale=4.5, size=len(dates))

sales = 220 + trend + seasonal + 16 * promo + noise

df = pd.DataFrame({
    "date": dates,
    "sales": sales,
    "promo": promo,
})

df.head()

過去だけで特徴量を作る

売上の過去値をラグ特徴量や移動平均へ変えると、時系列を表形式の学習問題へ写せます。各列は必ず、その時点より前の情報だけで作られていなければなりません。

fig, ax = plt.subplots(figsize=(11, 4.4))
ax.plot(df["date"], df["sales"], color="#2b6cb0", linewidth=2, label="sales")
ax.scatter(
    df.loc[df["promo"] == 1, "date"],
    df.loc[df["promo"] == 1, "sales"],
    color="#c05621",
    s=28,
    label="promo month",
)
ax.plot(df["date"], df["sales"].rolling(12).mean(), color="#4a5568", linewidth=1.6, label="12-month moving average")
ax.set_title("Monthly Sales")
ax.set_xlabel("date")
ax.set_ylabel("sales")
ax.legend(loc="upper left")
plt.tight_layout()
plt.show()

lag_12 を使うなら、最初の 12 か月に値が入らないのは自然です。未来を見ないという条件を守った結果として欠損が出ているのであって、実装の失敗ではありません。

def make_time_features(frame: pd.DataFrame, lag_steps=(1, 2, 3, 6, 12), roll_windows=(3, 6)) -> pd.DataFrame:
    out = frame.sort_values("date").reset_index(drop=True).copy()

    for lag in lag_steps:
        out[f"lag_{lag}"] = out["sales"].shift(lag)

    for window in roll_windows:
        out[f"roll_mean_{window}"] = out["sales"].shift(1).rolling(window=window).mean()

    month = out["date"].dt.month
    out["month_sin"] = np.sin(2 * np.pi * month / 12)
    out["month_cos"] = np.cos(2 * np.pi * month / 12)

    return out.dropna().reset_index(drop=True)

feature_df = make_time_features(df)
feature_df.head()

1 ステップ先予測

最後の 12 か月をテストに固定し、1 ステップ先でモデルを比べます。複雑なモデルの前に、1 か月前の値をそのまま使うナイーブ予測を基準線として置くことが重要です。

feature_cols = [
    "promo",
    "month_sin",
    "month_cos",
    "lag_1",
    "lag_2",
    "lag_3",
    "lag_6",
    "lag_12",
    "roll_mean_3",
    "roll_mean_6",
]

print("raw period :", df["date"].min().date(), "to", df["date"].max().date(), "rows:", len(df))
print("feat period:", feature_df["date"].min().date(), "to", feature_df["date"].max().date(), "rows:", len(feature_df))
print("features:", feature_cols)

自己相関は、どのくらい前の値が今と似た動きをしているかを示します。ラグの候補を考えるときの手がかりになります。

def autocorr_at_lag(values: np.ndarray, lag: int) -> float:
    x = values[lag:]
    y = values[:-lag]
    if x.std() == 0 or y.std() == 0:
        return 0.0
    return float(np.corrcoef(x, y)[0, 1])

series = feature_df["sales"].to_numpy()
lags = np.arange(1, 25)
autocorr_values = np.array([autocorr_at_lag(series, int(lag)) for lag in lags])

fig, ax = plt.subplots(figsize=(10, 3.6))
ax.bar(lags, autocorr_values, color="#2f855a")
ax.axhline(0, color="#1a202c", linewidth=1)
ax.set_title("Autocorrelation by lag")
ax.set_xlabel("lag")
ax.set_ylabel("corr")
plt.tight_layout()
plt.show()

時系列では、素朴な基準線が意外に強いことがあります。複雑なモデルがそれを上回れないなら、特徴量設計か問題設定の見直しが先です。

test_horizon = 12
train_df = feature_df.iloc[:-test_horizon].copy()
test_df = feature_df.iloc[-test_horizon:].copy()

X_train = train_df[feature_cols]
y_train = train_df["sales"]
X_test = test_df[feature_cols]
y_test = test_df["sales"]

naive_pred = test_df["lag_1"].to_numpy()
naive_mae = mean_absolute_error(y_test, naive_pred)
print(f"naive MAE: {naive_mae:.3f}")

ランダム分割でスコアが良く見えたなら、それはモデルの強さではなく未来情報の混入を疑うべき場面です。

X_all = feature_df[feature_cols]
y_all = feature_df["sales"]

X_tr_rand, X_te_rand, y_tr_rand, y_te_rand = train_test_split(
    X_all, y_all, test_size=0.2, random_state=42, shuffle=True
)

rand_pipeline = Pipeline([
    ("scaler", StandardScaler()),
    ("ridge", Ridge(alpha=1.0)),
])
rand_pipeline.fit(X_tr_rand, y_tr_rand)
rand_pred = rand_pipeline.predict(X_te_rand)
rand_mae = mean_absolute_error(y_te_rand, rand_pred)

print(f"random split ridge MAE (invalid for TS): {rand_mae:.3f}")

線形モデルは解釈しやすく、木モデルは非線形な関係を拾いやすい、という違いがあります。ただし、どちらも時間順を守った評価の上でしか意味を持ちません。

ridge_model = Pipeline([
    ("scaler", StandardScaler()),
    ("ridge", Ridge(alpha=1.0)),
])

rf_model = RandomForestRegressor(
    n_estimators=350,
    max_depth=8,
    min_samples_leaf=2,
    random_state=42,
    n_jobs=-1,
)

ridge_model.fit(X_train, y_train)
rf_model.fit(X_train, y_train)

ridge_pred = ridge_model.predict(X_test)
rf_pred = rf_model.predict(X_test)

ridge_mae = mean_absolute_error(y_test, ridge_pred)
rf_mae = mean_absolute_error(y_test, rf_pred)

print(f"naive MAE                : {naive_mae:.3f}")
print(f"chronological ridge MAE  : {ridge_mae:.3f}")
print(f"chronological rf MAE     : {rf_mae:.3f}")
print(f"random split ridge MAE   : {rand_mae:.3f} (invalid setup)")

複数ステップ先で難しくなる理由

1 ステップ先では問題がなくても、複数ステップ先では予測値を次の入力へ戻すため、誤差が前へ伝播します。遠い未来ほど不確実になるのはそのためです。

one_step_df = test_df[["date", "sales"]].copy()
one_step_df["naive"] = naive_pred
one_step_df["ridge"] = ridge_pred
one_step_df["rf"] = rf_pred

fig, ax = plt.subplots(figsize=(11, 4.4))
ax.plot(one_step_df["date"], one_step_df["sales"], marker="o", linewidth=2, label="actual")
ax.plot(one_step_df["date"], one_step_df["naive"], marker="o", linewidth=1.5, label="naive")
ax.plot(one_step_df["date"], one_step_df["ridge"], marker="o", linewidth=1.5, label="ridge")
ax.plot(one_step_df["date"], one_step_df["rf"], marker="o", linewidth=1.5, label="random forest")
ax.set_title("One-Step Forecast (Observed Updates)")
ax.set_xlabel("date")
ax.set_ylabel("sales")
ax.legend()
plt.tight_layout()
plt.show()

固定起点で 12 か月先まで読むには、逐次予測のバックテストが必要です。再帰予測は、運用時に起こる誤差の蓄積をかなり素直に表します。

def predict_next_months(history: pd.DataFrame, model, steps: int = 12) -> pd.DataFrame:
    working = history.sort_values("date").reset_index(drop=True).copy()
    preds = []

    for _ in range(steps):
        next_date = (working["date"].iloc[-1] + pd.offsets.MonthBegin(1)).normalize()
        month_index = len(working)
        next_promo = int(month_index % 6 == 0)

        next_row = {
            "date": next_date,
            "promo": next_promo,
            "lag_1": float(working["sales"].iloc[-1]),
            "lag_2": float(working["sales"].iloc[-2]),
            "lag_3": float(working["sales"].iloc[-3]),
            "lag_6": float(working["sales"].iloc[-6]),
            "lag_12": float(working["sales"].iloc[-12]),
            "roll_mean_3": float(working["sales"].iloc[-3:].mean()),
            "roll_mean_6": float(working["sales"].iloc[-6:].mean()),
            "month_sin": float(np.sin(2 * np.pi * next_date.month / 12)),
            "month_cos": float(np.cos(2 * np.pi * next_date.month / 12)),
        }

        row_df = pd.DataFrame([next_row])
        y_hat = float(model.predict(row_df[feature_cols])[0])

        preds.append({"date": next_date, "forecast": y_hat})
        working = pd.concat(
            [working, pd.DataFrame([{"date": next_date, "sales": y_hat, "promo": next_promo}])],
            ignore_index=True,
        )

    return pd.DataFrame(preds)

train_raw = df.iloc[:-test_horizon].copy()
test_raw = df.iloc[-test_horizon:].copy()
train_raw_feat = make_time_features(train_raw)

ridge_recursive = clone(ridge_model)
rf_recursive = clone(rf_model)
ridge_recursive.fit(train_raw_feat[feature_cols], train_raw_feat["sales"])
rf_recursive.fit(train_raw_feat[feature_cols], train_raw_feat["sales"])

ridge_backtest = predict_next_months(train_raw, ridge_recursive, steps=test_horizon)
rf_backtest = predict_next_months(train_raw, rf_recursive, steps=test_horizon)

ridge_recursive_mae = mean_absolute_error(test_raw["sales"], ridge_backtest["forecast"])
rf_recursive_mae = mean_absolute_error(test_raw["sales"], rf_backtest["forecast"])

print(f"recursive ridge MAE (12-step): {ridge_recursive_mae:.3f}")
print(f"recursive rf MAE (12-step)   : {rf_recursive_mae:.3f}")

1 回の分割では足りない

単発の holdout だけでは偶然の当たり外れが残ります。TimeSeriesSplit によって、複数の時点で同じ予測手順を繰り返し、評価のぶれを確かめます。

recursive_plot = test_raw[["date", "sales"]].copy()
recursive_plot["ridge_recursive"] = ridge_backtest["forecast"].to_numpy()
recursive_plot["rf_recursive"] = rf_backtest["forecast"].to_numpy()

fig, ax = plt.subplots(figsize=(11, 4.4))
ax.plot(recursive_plot["date"], recursive_plot["sales"], marker="o", linewidth=2, label="actual")
ax.plot(recursive_plot["date"], recursive_plot["ridge_recursive"], marker="o", linewidth=1.5, label="ridge recursive")
ax.plot(recursive_plot["date"], recursive_plot["rf_recursive"], marker="o", linewidth=1.5, label="rf recursive")
ax.set_title("Recursive 12-Step Backtest")
ax.set_xlabel("date")
ax.set_ylabel("sales")
ax.legend()
plt.tight_layout()
plt.show()

ウォークフォワード評価では、常に過去で学習し未来を検証するという順序が守られます。この形が本番の運用にもっとも近い評価です。

tscv = TimeSeriesSplit(n_splits=5)
cv_scores = cross_val_score(
    ridge_model,
    X_all,
    y_all,
    cv=tscv,
    scoring="neg_mean_absolute_error",
)

cv_mae = -cv_scores
print("TimeSeriesSplit MAE:", np.round(cv_mae, 3))
print("mean:", round(cv_mae.mean(), 3), "std:", round(cv_mae.std(), 3))

fold ごとの MAE を見ると、平均値だけではなく、最近の期間だけ急に悪化していないかまで確認できます。構造変化の兆候は、こうした揺れ方に表れます。

全期間で再学習して未来 12 か月を予測すると、現在時点で持っている情報だけから先を読むという、本番に近い視点へ戻れます。

ridge_final = clone(ridge_model)
ridge_final.fit(X_all, y_all)

future_pred = predict_next_months(df, ridge_final, steps=12)
future_pred.head()

将来予測の図では、当たり外れそのものだけでなく、どの時点からずれ始めるかを見ることが重要です。誤差の連鎖が強まる場所は、モデル改善の入口になります。

recent_hist = df.tail(24)
fig, ax = plt.subplots(figsize=(11, 4.4))
ax.plot(recent_hist["date"], recent_hist["sales"], marker="o", label="history (last 24 months)")
ax.plot(future_pred["date"], future_pred["forecast"], marker="o", label="future forecast (ridge)")
ax.set_title("History and 12-Month Forecast")
ax.set_xlabel("date")
ax.set_ylabel("sales")
ax.legend()
plt.tight_layout()
plt.show()

時系列モデリングでは、複雑なモデルより先に時間順を守ることが優先されます。評価設計が正しければ、ラグ特徴と移動平均だけでも十分に強い基準線になります。