自己回帰モデルとフローベースモデル

このノートでは、深層生成モデルの中でもよく比較される2系統を扱います。

自己回帰モデルは「順番に条件付き分布を掛ける」設計で、フローベースモデルは「可逆変換で分布を写す」設計です。どちらも尤度に基づく学習ができますが、何が得意で何が苦手かはかなり違います。ここでは最小実装を通して、その違いを手で追える形にします。

厳密尤度を持てても、生成の仕方はまったく同じではない

自己回帰モデルもフローベースモデルも尤度で学習できますが、前者は順番に条件づけ、後者は可逆変換で全体を写します。この違いは、学習より推論とサンプリングの性格に強く現れます。

このノートでは、まず 2 変数の自己回帰モデルで「条件付き分布を掛ける」とは何かを見ます。そのあとで affine coupling を持つフローへ移り、逆変換と log-det が簡単になる理由を確認し、最後に速度差と使い分けを整理します。

中心にあるのは、分解の仕方の違いです

自己回帰は確率分布そのものを順番に分解し、フローは簡単な分布からデータ分布へ写像します。どちらも尤度を計算できますが、どこで逐次性が出るかが違います。

最初の山場は、自己回帰がなぜ逐次生成になるかです

条件付き分布を積で書けるのは強みですが、サンプルを出すには前の値が必要になります。ここが、後でフローと比べたときの一番大きな違いになります。

フローの面白さは、逆変換できるように設計している点にある

可逆変換にしておくと、サンプリングも尤度計算も整理しやすくなります。その代わり、変換の形には制約が入り、設計の自由度は別の場所で効いてきます。

後半では、速度差を「構造の差」として読む

逐次生成が遅いのは実装が悪いからではなく、条件付き構造そのものに由来します。一方でフローが速いのは、変換をまとめて適用できるからです。

ここでは大規模フローではなく、構造差が見える最小例に絞る

自己回帰とフローの違いは、高性能実装より先に、逐次性と可逆性の差として現れます。まずそこだけをはっきり見える形にしています。

まずは 2 変数の自己回帰を置く

最初の節では、条件付き分布の積だけで相関を再現できることを確認します。自己回帰の最小例です。

import math
import random
import statistics
import time

まず全体像を式で揃えます。

自己回帰の基本は連鎖律です。

p(x1:D)=d=1Dp(xdx<d)p(x_{1:D}) = \prod_{d=1}^{D} p(x_d \mid x_{<d})

フローベースモデルの基本は変数変換です。可逆写像 x=f(z)x=f(z) と基底分布 pZp_Z を使って、

logpX(x)=logpZ(f1(x))+logdetf1(x)x\log p_X(x) = \log p_Z(f^{-1}(x)) + \log \left|\det \frac{\partial f^{-1}(x)}{\partial x}\right|

と書けます。尤度を直接計算できる点は共通ですが、

という違いがあります。

まず自己回帰モデルから始めます。ここでは2変数データ (x1, x2) を用意し、

を学習して、連鎖律で同時分布を表現する流れを確認します。

random.seed(25)

def make_ar_dataset(n=1200):
    xs = []
    for _ in range(n):
        x1 = random.gauss(0.7, 1.1)
        x2 = 1.35 * x1 - 0.4 + random.gauss(0.0, 0.55)
        xs.append((x1, x2))
    return xs


def corr(xs):
    x1 = [v[0] for v in xs]
    x2 = [v[1] for v in xs]
    m1 = statistics.mean(x1)
    m2 = statistics.mean(x2)
    c = sum((a - m1) * (b - m2) for a, b in xs) / len(xs)
    s1 = statistics.pstdev(x1)
    s2 = statistics.pstdev(x2)
    return c / (s1 * s2)


data_ar = make_ar_dataset()
print('dataset size =', len(data_ar))
print('mean x1/x2 =', round(statistics.mean(v[0] for v in data_ar), 4), round(statistics.mean(v[1] for v in data_ar), 4))
print('std  x1/x2 =', round(statistics.pstdev(v[0] for v in data_ar), 4), round(statistics.pstdev(v[1] for v in data_ar), 4))
print('corr(x1,x2)=', round(corr(data_ar), 4))

ここで使う最小自己回帰モデルは次です。

パラメータは theta = [mu1, log_sigma1, a, b, log_sigma2] とし、負の対数尤度(NLL)を最小化します。log_sigma を直接学習するのは、分散が必ず正になるようにするためです。

def clamp_log_sigma(v):
    return max(-4.0, min(3.5, v))


def sigma_from_log(log_sigma):
    return math.exp(clamp_log_sigma(log_sigma))


def log_normal(x, mu, sigma):
    return -0.5 * math.log(2.0 * math.pi) - math.log(sigma) - 0.5 * ((x - mu) / sigma) ** 2


def nll_ar(theta, dataset):
    mu1, log_s1, a, b, log_s2 = theta
    s1 = sigma_from_log(log_s1)
    s2 = sigma_from_log(log_s2)

    total = 0.0
    for x1, x2 in dataset:
        lp1 = log_normal(x1, mu1, s1)
        mu2 = a * x1 + b
        lp2 = log_normal(x2, mu2, s2)
        total += -(lp1 + lp2)
    return total / len(dataset)


def finite_diff_grad(fn, params, h=1e-4):
    grads = []
    for i in range(len(params)):
        p1 = params[:]
        p2 = params[:]
        p1[i] += h
        p2[i] -= h
        grads.append((fn(p1) - fn(p2)) / (2.0 * h))
    return grads

自己回帰で相関を学習する

ここでは NLL を下げながら、生成分布が元の相関構造へ寄っていくかを見ます。

def train_ar(theta_init, dataset, steps=240, lr=0.04, log_every=30):
    theta = theta_init[:]
    history = []

    for step in range(steps + 1):
        grad = finite_diff_grad(lambda th: nll_ar(th, dataset), theta)
        theta = [p - lr * g for p, g in zip(theta, grad)]

        # log_sigma の発散抑制
        theta[1] = clamp_log_sigma(theta[1])
        theta[4] = clamp_log_sigma(theta[4])

        if step % log_every == 0:
            history.append((step, nll_ar(theta, dataset), theta[:]))

    return theta, history


def sample_ar(theta, n=1500):
    mu1, log_s1, a, b, log_s2 = theta
    s1 = sigma_from_log(log_s1)
    s2 = sigma_from_log(log_s2)
    out = []
    for _ in range(n):
        x1 = random.gauss(mu1, s1)
        x2 = random.gauss(a * x1 + b, s2)
        out.append((x1, x2))
    return out


theta0 = [0.0, 0.0, 0.2, 0.0, 0.0]
print('initial nll =', round(nll_ar(theta0, data_ar), 5))

trained_ar, ar_hist = train_ar(theta0, data_ar)
for step, value, th in ar_hist:
    print(f'step={step:03d}', 'nll=', round(value, 5), 'theta=', [round(v, 4) for v in th])

print('final nll =', round(nll_ar(trained_ar, data_ar), 5))

次に、可逆変換で分布を写す

フロー側では 2 次元の affine coupling を使い、逆写像と log-det がどう簡単になるかを確認します。

random.seed(25)
synthetic_ar = sample_ar(trained_ar)

print('AR data mean x1/x2 =', round(statistics.mean(v[0] for v in data_ar), 4), round(statistics.mean(v[1] for v in data_ar), 4))
print('AR gen  mean x1/x2 =', round(statistics.mean(v[0] for v in synthetic_ar), 4), round(statistics.mean(v[1] for v in synthetic_ar), 4))
print('AR data std  x1/x2 =', round(statistics.pstdev(v[0] for v in data_ar), 4), round(statistics.pstdev(v[1] for v in data_ar), 4))
print('AR gen  std  x1/x2 =', round(statistics.pstdev(v[0] for v in synthetic_ar), 4), round(statistics.pstdev(v[1] for v in synthetic_ar), 4))
print('AR data corr       =', round(corr(data_ar), 4))
print('AR gen  corr       =', round(corr(synthetic_ar), 4))

ここまでで、自己回帰モデルが「条件付き分布を積む」ことで相関構造を学べることを確認できました。

次にフローベースモデルへ移ります。フローは「分かりやすい分布 z を可逆変換で x に写す」設計です。RealNVPで中心になるのは affine coupling で、Jacobian の行列式が計算しやすいように変換を組みます。

2次元の最小 affine coupling を使います。

y1=z1y_1 = z_1 y2=z2exp(s(z1))+t(z1)y_2 = z_2 \cdot \exp(s(z_1)) + t(z_1)

ここで s(z1)=a_s z1 + b_s, t(z1)=a_t z1 + b_t と置くと、逆変換が明示的に書けます。

z1=y1,z2=(y2t(y1))exp(s(y1))z_1 = y_1, \quad z_2 = (y_2 - t(y_1)) \exp(-s(y_1))

このとき Jacobian が計算しやすいのが重要です。y1=z1 なので Jacobian は三角構造になり、対角要素は 1exp(s(z1)) になります。したがって行列式は exp(s(z1))、log-det は s(z1) だけで計算できます。

これが RealNVP 系で「可逆だが尤度計算も軽い」理由です。

def standard_normal_logpdf(z1, z2):
    return -math.log(2.0 * math.pi) - 0.5 * (z1 * z1 + z2 * z2)


def safe_exp(v):
    # 数値発散防止
    return math.exp(max(-5.0, min(5.0, v)))


def coupling_forward(z1, z2, theta):
    a_s, b_s, a_t, b_t = theta
    s = a_s * z1 + b_s
    t = a_t * z1 + b_t
    x1 = z1
    x2 = z2 * safe_exp(s) + t
    log_det = s
    return x1, x2, log_det


def coupling_inverse(x1, x2, theta):
    a_s, b_s, a_t, b_t = theta
    s = a_s * x1 + b_s
    t = a_t * x1 + b_t
    z1 = x1
    z2 = (x2 - t) / safe_exp(s)
    log_det_inv = -s
    return z1, z2, log_det_inv


# 可逆性チェック
theta_check = [0.55, -0.18, 1.1, 0.35]
z1, z2 = 0.7, -1.2
x1, x2, ld = coupling_forward(z1, z2, theta_check)
z1_hat, z2_hat, ld_inv = coupling_inverse(x1, x2, theta_check)

print('forward  :', round(x1, 6), round(x2, 6), 'logdet=', round(ld, 6))
print('inverse  :', round(z1_hat, 6), round(z2_hat, 6), 'logdet_inv=', round(ld_inv, 6))
print('recovery error =', round(abs(z1 - z1_hat) + abs(z2 - z2_hat), 12))

次のセルでは、既知の変換 teacher_theta を使って合成観測データを作ります。意図は、学習後に thetateacher_theta に近づくかを確認することです。

手順は z ~ N(0,I) をサンプルし、x = f_teacher(z) を作るだけです。実務では x しか観測できませんが、この教材では正解変換を知っている設定にして、フロー学習が「逆写像を通じた尤度最大化」で何をしているかを見やすくしています。

random.seed(25)

# 教師変換で2次元データを作る(実務では観測データに相当)
teacher_theta = [0.8, -0.25, 1.35, 0.4]

def make_flow_dataset(n=1500):
    out = []
    for _ in range(n):
        z1 = random.gauss(0.0, 1.0)
        z2 = random.gauss(0.0, 1.0)
        x1, x2, _ = coupling_forward(z1, z2, teacher_theta)
        out.append((x1, x2))
    return out


data_flow = make_flow_dataset()
print('flow dataset size =', len(data_flow))
print('mean x1/x2 =', round(statistics.mean(v[0] for v in data_flow), 4), round(statistics.mean(v[1] for v in data_flow), 4))
print('std  x1/x2 =', round(statistics.pstdev(v[0] for v in data_flow), 4), round(statistics.pstdev(v[1] for v in data_flow), 4))
print('corr(x1,x2)=', round(corr(data_flow), 4))

可逆性と尤度補正を確かめる

ここでは recovery error と logdet を見て、変換が可逆であり、密度補正が正しく入っているかを確認します。

def nll_flow(theta, dataset):
    total = 0.0
    for x1, x2 in dataset:
        z1, z2, log_det_inv = coupling_inverse(x1, x2, theta)
        lp = standard_normal_logpdf(z1, z2) + log_det_inv
        total += -lp
    return total / len(dataset)


def train_flow(theta_init, dataset, steps=260, lr=0.03, log_every=40):
    theta = theta_init[:]
    history = []

    for step in range(steps + 1):
        grad = finite_diff_grad(lambda th: nll_flow(th, dataset), theta)
        theta = [p - lr * g for p, g in zip(theta, grad)]

        # sの線形係数が暴れすぎないように軽く制限
        theta[0] = max(-2.5, min(2.5, theta[0]))
        theta[1] = max(-2.5, min(2.5, theta[1]))

        if step % log_every == 0:
            history.append((step, nll_flow(theta, dataset), theta[:]))

    return theta, history


flow_theta0 = [0.05, 0.0, 0.1, 0.0]
print('initial flow nll =', round(nll_flow(flow_theta0, data_flow), 5))

trained_flow, flow_hist = train_flow(flow_theta0, data_flow)
for step, value, th in flow_hist:
    print(f'step={step:03d}', 'nll=', round(value, 5), 'theta=', [round(v, 4) for v in th])

print('final flow nll =', round(nll_flow(trained_flow, data_flow), 5))

マスク付き coupling で変換の設計を広げる

最後は 4 次元の簡易例で、どの成分を固定しどの成分を更新するかによって、フローの設計がどう変わるかを見ます。

def sample_flow(theta, n=1500):
    out = []
    for _ in range(n):
        z1 = random.gauss(0.0, 1.0)
        z2 = random.gauss(0.0, 1.0)
        x1, x2, _ = coupling_forward(z1, z2, theta)
        out.append((x1, x2))
    return out


random.seed(25)
synthetic_flow = sample_flow(trained_flow)

print('FLOW data mean x1/x2 =', round(statistics.mean(v[0] for v in data_flow), 4), round(statistics.mean(v[1] for v in data_flow), 4))
print('FLOW gen  mean x1/x2 =', round(statistics.mean(v[0] for v in synthetic_flow), 4), round(statistics.mean(v[1] for v in synthetic_flow), 4))
print('FLOW data std  x1/x2 =', round(statistics.pstdev(v[0] for v in data_flow), 4), round(statistics.pstdev(v[1] for v in data_flow), 4))
print('FLOW gen  std  x1/x2 =', round(statistics.pstdev(v[0] for v in synthetic_flow), 4), round(statistics.pstdev(v[1] for v in synthetic_flow), 4))
print('FLOW data corr       =', round(corr(data_flow), 4))
print('FLOW gen  corr       =', round(corr(synthetic_flow), 4))
print('learned theta        =', [round(v, 4) for v in trained_flow])
print('teacher theta        =', [round(v, 4) for v in teacher_theta])

ここで使った2次元 coupling は、RealNVP の最小断面です。実際の画像タスクでは、

という構成で表現力を上げます。重要なのは、どれだけ複雑にしても「可逆性」と「log-det が計算できる設計」を保つことです。

ここでは RealNVP で使うマスク機構を4次元ベクトルで模擬します。

mask=1 の次元はその層では固定し、mask=0 の次元だけを affine 変換します。固定側の値から cond(条件コンテキスト)を作り、その cond を使って scale/shift を決めるのが coupling の基本です。

mask_amask_b を交互に使う理由は、毎層で固定される次元を入れ替え、最終的にすべての次元を更新可能にするためです。

def masked_affine_coupling_forward(x, mask, theta):
    # x: 4次元ベクトル
    # mask=1 の次元は固定、mask=0 の次元を変換
    a_s, b_s, a_t, b_t = theta
    x_fixed = [x[i] for i in range(4) if mask[i] == 1]
    cond = sum(x_fixed) / len(x_fixed)

    s = a_s * cond + b_s
    t = a_t * cond + b_t
    scale = safe_exp(s)

    y = x[:]
    log_det = 0.0
    for i in range(4):
        if mask[i] == 0:
            y[i] = x[i] * scale + t
            log_det += s
    return y, log_det


def masked_affine_coupling_inverse(y, mask, theta):
    a_s, b_s, a_t, b_t = theta
    y_fixed = [y[i] for i in range(4) if mask[i] == 1]
    cond = sum(y_fixed) / len(y_fixed)

    s = a_s * cond + b_s
    t = a_t * cond + b_t
    scale = safe_exp(s)

    x = y[:]
    log_det_inv = 0.0
    for i in range(4):
        if mask[i] == 0:
            x[i] = (y[i] - t) / scale
            log_det_inv -= s
    return x, log_det_inv


mask_a = [1, 0, 1, 0]
mask_b = [0, 1, 0, 1]
vec = [0.3, -1.0, 0.8, 1.2]
params = [0.7, -0.1, 0.9, 0.2]

y1, ld1 = masked_affine_coupling_forward(vec, mask_a, params)
y2, ld2 = masked_affine_coupling_forward(y1, mask_b, params)

x1, ild2 = masked_affine_coupling_inverse(y2, mask_b, params)
x0, ild1 = masked_affine_coupling_inverse(x1, mask_a, params)

print('input vec     =', [round(v, 6) for v in vec])
print('after 2 flows =', [round(v, 6) for v in y2])
print('recovered vec =', [round(v, 6) for v in x0])
print('recovery err  =', round(sum(abs(a - b) for a, b in zip(vec, x0)), 12))
print('logdet fwd/inv=', round(ld1 + ld2, 6), round(ild1 + ild2, 6))

速度差はどこから来るかを見る

最後は sampling time を比べて、自己回帰の逐次性とフローの一括変換の差を実感します。

# ざっくりした計算コスト観察(教材用の粗い比較)
def benchmark_ar(theta, num=2000):
    t0 = time.time()
    _ = sample_ar(theta, n=num)
    t1 = time.time()
    return t1 - t0


def benchmark_flow(theta, num=2000):
    t0 = time.time()
    _ = sample_flow(theta, n=num)
    t1 = time.time()
    return t1 - t0


ar_sec = benchmark_ar(trained_ar)
flow_sec = benchmark_flow(trained_flow)
print('sampling time (AR)   =', round(ar_sec, 5), 'sec')
print('sampling time (Flow) =', round(flow_sec, 5), 'sec')
print('note: この比較は2次元toyなので、実務速度の結論そのものには使わない')

このノートの要点は3つです。

1つ目は、自己回帰モデルは連鎖律で設計が明確で、条件付き関係を素直に書けることです。2つ目は、フローベースモデルは可逆性と log-det 計算可能性を守ることで、尤度とサンプリングの両方を直接扱えることです。3つ目は、どちらが優れているかは問題設定しだいで、モデルの目的(高忠実度サンプリングか、密度推定の安定性か、計算効率か)に合わせて選ぶ必要があることです。

RealNVPやMAF/IAFに進むときは、今回の最小実装を「次元を増やしたときに何がボトルネックになるか」という視点で読み替えると、論文の設計意図がかなり理解しやすくなります。