自己回帰モデルとフローベースモデル
このノートでは、深層生成モデルの中でもよく比較される2系統を扱います。
自己回帰モデルは「順番に条件付き分布を掛ける」設計で、フローベースモデルは「可逆変換で分布を写す」設計です。どちらも尤度に基づく学習ができますが、何が得意で何が苦手かはかなり違います。ここでは最小実装を通して、その違いを手で追える形にします。
厳密尤度を持てても、生成の仕方はまったく同じではない
自己回帰モデルもフローベースモデルも尤度で学習できますが、前者は順番に条件づけ、後者は可逆変換で全体を写します。この違いは、学習より推論とサンプリングの性格に強く現れます。
このノートでは、まず 2 変数の自己回帰モデルで「条件付き分布を掛ける」とは何かを見ます。そのあとで affine coupling を持つフローへ移り、逆変換と log-det が簡単になる理由を確認し、最後に速度差と使い分けを整理します。
中心にあるのは、分解の仕方の違いです
自己回帰は確率分布そのものを順番に分解し、フローは簡単な分布からデータ分布へ写像します。どちらも尤度を計算できますが、どこで逐次性が出るかが違います。
最初の山場は、自己回帰がなぜ逐次生成になるかです
条件付き分布を積で書けるのは強みですが、サンプルを出すには前の値が必要になります。ここが、後でフローと比べたときの一番大きな違いになります。
フローの面白さは、逆変換できるように設計している点にある
可逆変換にしておくと、サンプリングも尤度計算も整理しやすくなります。その代わり、変換の形には制約が入り、設計の自由度は別の場所で効いてきます。
後半では、速度差を「構造の差」として読む
逐次生成が遅いのは実装が悪いからではなく、条件付き構造そのものに由来します。一方でフローが速いのは、変換をまとめて適用できるからです。
ここでは大規模フローではなく、構造差が見える最小例に絞る
自己回帰とフローの違いは、高性能実装より先に、逐次性と可逆性の差として現れます。まずそこだけをはっきり見える形にしています。
まずは 2 変数の自己回帰を置く
最初の節では、条件付き分布の積だけで相関を再現できることを確認します。自己回帰の最小例です。
import math
import random
import statistics
import time
まず全体像を式で揃えます。
自己回帰の基本は連鎖律です。
フローベースモデルの基本は変数変換です。可逆写像 と基底分布 を使って、
と書けます。尤度を直接計算できる点は共通ですが、
- 自己回帰: 尤度計算が簡潔、サンプリングは逐次で遅くなりやすい
- フロー: 変換がうまく設計できれば、尤度とサンプリングの両方が扱いやすい
という違いがあります。
まず自己回帰モデルから始めます。ここでは2変数データ (x1, x2) を用意し、
p(x1)p(x2 | x1)
を学習して、連鎖律で同時分布を表現する流れを確認します。
random.seed(25)
def make_ar_dataset(n=1200):
xs = []
for _ in range(n):
x1 = random.gauss(0.7, 1.1)
x2 = 1.35 * x1 - 0.4 + random.gauss(0.0, 0.55)
xs.append((x1, x2))
return xs
def corr(xs):
x1 = [v[0] for v in xs]
x2 = [v[1] for v in xs]
m1 = statistics.mean(x1)
m2 = statistics.mean(x2)
c = sum((a - m1) * (b - m2) for a, b in xs) / len(xs)
s1 = statistics.pstdev(x1)
s2 = statistics.pstdev(x2)
return c / (s1 * s2)
data_ar = make_ar_dataset()
print('dataset size =', len(data_ar))
print('mean x1/x2 =', round(statistics.mean(v[0] for v in data_ar), 4), round(statistics.mean(v[1] for v in data_ar), 4))
print('std x1/x2 =', round(statistics.pstdev(v[0] for v in data_ar), 4), round(statistics.pstdev(v[1] for v in data_ar), 4))
print('corr(x1,x2)=', round(corr(data_ar), 4))
ここで使う最小自己回帰モデルは次です。
x1 ~ N(mu1, sigma1^2)x2 | x1 ~ N(a * x1 + b, sigma2^2)
パラメータは theta = [mu1, log_sigma1, a, b, log_sigma2] とし、負の対数尤度(NLL)を最小化します。log_sigma を直接学習するのは、分散が必ず正になるようにするためです。
def clamp_log_sigma(v):
return max(-4.0, min(3.5, v))
def sigma_from_log(log_sigma):
return math.exp(clamp_log_sigma(log_sigma))
def log_normal(x, mu, sigma):
return -0.5 * math.log(2.0 * math.pi) - math.log(sigma) - 0.5 * ((x - mu) / sigma) ** 2
def nll_ar(theta, dataset):
mu1, log_s1, a, b, log_s2 = theta
s1 = sigma_from_log(log_s1)
s2 = sigma_from_log(log_s2)
total = 0.0
for x1, x2 in dataset:
lp1 = log_normal(x1, mu1, s1)
mu2 = a * x1 + b
lp2 = log_normal(x2, mu2, s2)
total += -(lp1 + lp2)
return total / len(dataset)
def finite_diff_grad(fn, params, h=1e-4):
grads = []
for i in range(len(params)):
p1 = params[:]
p2 = params[:]
p1[i] += h
p2[i] -= h
grads.append((fn(p1) - fn(p2)) / (2.0 * h))
return grads
自己回帰で相関を学習する
ここでは NLL を下げながら、生成分布が元の相関構造へ寄っていくかを見ます。
def train_ar(theta_init, dataset, steps=240, lr=0.04, log_every=30):
theta = theta_init[:]
history = []
for step in range(steps + 1):
grad = finite_diff_grad(lambda th: nll_ar(th, dataset), theta)
theta = [p - lr * g for p, g in zip(theta, grad)]
# log_sigma の発散抑制
theta[1] = clamp_log_sigma(theta[1])
theta[4] = clamp_log_sigma(theta[4])
if step % log_every == 0:
history.append((step, nll_ar(theta, dataset), theta[:]))
return theta, history
def sample_ar(theta, n=1500):
mu1, log_s1, a, b, log_s2 = theta
s1 = sigma_from_log(log_s1)
s2 = sigma_from_log(log_s2)
out = []
for _ in range(n):
x1 = random.gauss(mu1, s1)
x2 = random.gauss(a * x1 + b, s2)
out.append((x1, x2))
return out
theta0 = [0.0, 0.0, 0.2, 0.0, 0.0]
print('initial nll =', round(nll_ar(theta0, data_ar), 5))
trained_ar, ar_hist = train_ar(theta0, data_ar)
for step, value, th in ar_hist:
print(f'step={step:03d}', 'nll=', round(value, 5), 'theta=', [round(v, 4) for v in th])
print('final nll =', round(nll_ar(trained_ar, data_ar), 5))
次に、可逆変換で分布を写す
フロー側では 2 次元の affine coupling を使い、逆写像と log-det がどう簡単になるかを確認します。
random.seed(25)
synthetic_ar = sample_ar(trained_ar)
print('AR data mean x1/x2 =', round(statistics.mean(v[0] for v in data_ar), 4), round(statistics.mean(v[1] for v in data_ar), 4))
print('AR gen mean x1/x2 =', round(statistics.mean(v[0] for v in synthetic_ar), 4), round(statistics.mean(v[1] for v in synthetic_ar), 4))
print('AR data std x1/x2 =', round(statistics.pstdev(v[0] for v in data_ar), 4), round(statistics.pstdev(v[1] for v in data_ar), 4))
print('AR gen std x1/x2 =', round(statistics.pstdev(v[0] for v in synthetic_ar), 4), round(statistics.pstdev(v[1] for v in synthetic_ar), 4))
print('AR data corr =', round(corr(data_ar), 4))
print('AR gen corr =', round(corr(synthetic_ar), 4))
ここまでで、自己回帰モデルが「条件付き分布を積む」ことで相関構造を学べることを確認できました。
次にフローベースモデルへ移ります。フローは「分かりやすい分布 z を可逆変換で x に写す」設計です。RealNVPで中心になるのは affine coupling で、Jacobian の行列式が計算しやすいように変換を組みます。
2次元の最小 affine coupling を使います。
ここで s(z1)=a_s z1 + b_s, t(z1)=a_t z1 + b_t と置くと、逆変換が明示的に書けます。
このとき Jacobian が計算しやすいのが重要です。y1=z1 なので Jacobian は三角構造になり、対角要素は 1 と exp(s(z1)) になります。したがって行列式は exp(s(z1))、log-det は s(z1) だけで計算できます。
これが RealNVP 系で「可逆だが尤度計算も軽い」理由です。
def standard_normal_logpdf(z1, z2):
return -math.log(2.0 * math.pi) - 0.5 * (z1 * z1 + z2 * z2)
def safe_exp(v):
# 数値発散防止
return math.exp(max(-5.0, min(5.0, v)))
def coupling_forward(z1, z2, theta):
a_s, b_s, a_t, b_t = theta
s = a_s * z1 + b_s
t = a_t * z1 + b_t
x1 = z1
x2 = z2 * safe_exp(s) + t
log_det = s
return x1, x2, log_det
def coupling_inverse(x1, x2, theta):
a_s, b_s, a_t, b_t = theta
s = a_s * x1 + b_s
t = a_t * x1 + b_t
z1 = x1
z2 = (x2 - t) / safe_exp(s)
log_det_inv = -s
return z1, z2, log_det_inv
# 可逆性チェック
theta_check = [0.55, -0.18, 1.1, 0.35]
z1, z2 = 0.7, -1.2
x1, x2, ld = coupling_forward(z1, z2, theta_check)
z1_hat, z2_hat, ld_inv = coupling_inverse(x1, x2, theta_check)
print('forward :', round(x1, 6), round(x2, 6), 'logdet=', round(ld, 6))
print('inverse :', round(z1_hat, 6), round(z2_hat, 6), 'logdet_inv=', round(ld_inv, 6))
print('recovery error =', round(abs(z1 - z1_hat) + abs(z2 - z2_hat), 12))
次のセルでは、既知の変換 teacher_theta を使って合成観測データを作ります。意図は、学習後に theta が teacher_theta に近づくかを確認することです。
手順は z ~ N(0,I) をサンプルし、x = f_teacher(z) を作るだけです。実務では x しか観測できませんが、この教材では正解変換を知っている設定にして、フロー学習が「逆写像を通じた尤度最大化」で何をしているかを見やすくしています。
random.seed(25)
# 教師変換で2次元データを作る(実務では観測データに相当)
teacher_theta = [0.8, -0.25, 1.35, 0.4]
def make_flow_dataset(n=1500):
out = []
for _ in range(n):
z1 = random.gauss(0.0, 1.0)
z2 = random.gauss(0.0, 1.0)
x1, x2, _ = coupling_forward(z1, z2, teacher_theta)
out.append((x1, x2))
return out
data_flow = make_flow_dataset()
print('flow dataset size =', len(data_flow))
print('mean x1/x2 =', round(statistics.mean(v[0] for v in data_flow), 4), round(statistics.mean(v[1] for v in data_flow), 4))
print('std x1/x2 =', round(statistics.pstdev(v[0] for v in data_flow), 4), round(statistics.pstdev(v[1] for v in data_flow), 4))
print('corr(x1,x2)=', round(corr(data_flow), 4))
可逆性と尤度補正を確かめる
ここでは recovery error と logdet を見て、変換が可逆であり、密度補正が正しく入っているかを確認します。
def nll_flow(theta, dataset):
total = 0.0
for x1, x2 in dataset:
z1, z2, log_det_inv = coupling_inverse(x1, x2, theta)
lp = standard_normal_logpdf(z1, z2) + log_det_inv
total += -lp
return total / len(dataset)
def train_flow(theta_init, dataset, steps=260, lr=0.03, log_every=40):
theta = theta_init[:]
history = []
for step in range(steps + 1):
grad = finite_diff_grad(lambda th: nll_flow(th, dataset), theta)
theta = [p - lr * g for p, g in zip(theta, grad)]
# sの線形係数が暴れすぎないように軽く制限
theta[0] = max(-2.5, min(2.5, theta[0]))
theta[1] = max(-2.5, min(2.5, theta[1]))
if step % log_every == 0:
history.append((step, nll_flow(theta, dataset), theta[:]))
return theta, history
flow_theta0 = [0.05, 0.0, 0.1, 0.0]
print('initial flow nll =', round(nll_flow(flow_theta0, data_flow), 5))
trained_flow, flow_hist = train_flow(flow_theta0, data_flow)
for step, value, th in flow_hist:
print(f'step={step:03d}', 'nll=', round(value, 5), 'theta=', [round(v, 4) for v in th])
print('final flow nll =', round(nll_flow(trained_flow, data_flow), 5))
マスク付き coupling で変換の設計を広げる
最後は 4 次元の簡易例で、どの成分を固定しどの成分を更新するかによって、フローの設計がどう変わるかを見ます。
def sample_flow(theta, n=1500):
out = []
for _ in range(n):
z1 = random.gauss(0.0, 1.0)
z2 = random.gauss(0.0, 1.0)
x1, x2, _ = coupling_forward(z1, z2, theta)
out.append((x1, x2))
return out
random.seed(25)
synthetic_flow = sample_flow(trained_flow)
print('FLOW data mean x1/x2 =', round(statistics.mean(v[0] for v in data_flow), 4), round(statistics.mean(v[1] for v in data_flow), 4))
print('FLOW gen mean x1/x2 =', round(statistics.mean(v[0] for v in synthetic_flow), 4), round(statistics.mean(v[1] for v in synthetic_flow), 4))
print('FLOW data std x1/x2 =', round(statistics.pstdev(v[0] for v in data_flow), 4), round(statistics.pstdev(v[1] for v in data_flow), 4))
print('FLOW gen std x1/x2 =', round(statistics.pstdev(v[0] for v in synthetic_flow), 4), round(statistics.pstdev(v[1] for v in synthetic_flow), 4))
print('FLOW data corr =', round(corr(data_flow), 4))
print('FLOW gen corr =', round(corr(synthetic_flow), 4))
print('learned theta =', [round(v, 4) for v in trained_flow])
print('teacher theta =', [round(v, 4) for v in teacher_theta])
ここで使った2次元 coupling は、RealNVP の最小断面です。実際の画像タスクでは、
- 次元を分割するマスク(checkerboard / channel-wise)
- 複数の coupling layer をスタック
- squeeze / unsqueeze で空間・チャネルを組み替え
という構成で表現力を上げます。重要なのは、どれだけ複雑にしても「可逆性」と「log-det が計算できる設計」を保つことです。
ここでは RealNVP で使うマスク機構を4次元ベクトルで模擬します。
mask=1 の次元はその層では固定し、mask=0 の次元だけを affine 変換します。固定側の値から cond(条件コンテキスト)を作り、その cond を使って scale/shift を決めるのが coupling の基本です。
mask_a と mask_b を交互に使う理由は、毎層で固定される次元を入れ替え、最終的にすべての次元を更新可能にするためです。
def masked_affine_coupling_forward(x, mask, theta):
# x: 4次元ベクトル
# mask=1 の次元は固定、mask=0 の次元を変換
a_s, b_s, a_t, b_t = theta
x_fixed = [x[i] for i in range(4) if mask[i] == 1]
cond = sum(x_fixed) / len(x_fixed)
s = a_s * cond + b_s
t = a_t * cond + b_t
scale = safe_exp(s)
y = x[:]
log_det = 0.0
for i in range(4):
if mask[i] == 0:
y[i] = x[i] * scale + t
log_det += s
return y, log_det
def masked_affine_coupling_inverse(y, mask, theta):
a_s, b_s, a_t, b_t = theta
y_fixed = [y[i] for i in range(4) if mask[i] == 1]
cond = sum(y_fixed) / len(y_fixed)
s = a_s * cond + b_s
t = a_t * cond + b_t
scale = safe_exp(s)
x = y[:]
log_det_inv = 0.0
for i in range(4):
if mask[i] == 0:
x[i] = (y[i] - t) / scale
log_det_inv -= s
return x, log_det_inv
mask_a = [1, 0, 1, 0]
mask_b = [0, 1, 0, 1]
vec = [0.3, -1.0, 0.8, 1.2]
params = [0.7, -0.1, 0.9, 0.2]
y1, ld1 = masked_affine_coupling_forward(vec, mask_a, params)
y2, ld2 = masked_affine_coupling_forward(y1, mask_b, params)
x1, ild2 = masked_affine_coupling_inverse(y2, mask_b, params)
x0, ild1 = masked_affine_coupling_inverse(x1, mask_a, params)
print('input vec =', [round(v, 6) for v in vec])
print('after 2 flows =', [round(v, 6) for v in y2])
print('recovered vec =', [round(v, 6) for v in x0])
print('recovery err =', round(sum(abs(a - b) for a, b in zip(vec, x0)), 12))
print('logdet fwd/inv=', round(ld1 + ld2, 6), round(ild1 + ild2, 6))
速度差はどこから来るかを見る
最後は sampling time を比べて、自己回帰の逐次性とフローの一括変換の差を実感します。
# ざっくりした計算コスト観察(教材用の粗い比較)
def benchmark_ar(theta, num=2000):
t0 = time.time()
_ = sample_ar(theta, n=num)
t1 = time.time()
return t1 - t0
def benchmark_flow(theta, num=2000):
t0 = time.time()
_ = sample_flow(theta, n=num)
t1 = time.time()
return t1 - t0
ar_sec = benchmark_ar(trained_ar)
flow_sec = benchmark_flow(trained_flow)
print('sampling time (AR) =', round(ar_sec, 5), 'sec')
print('sampling time (Flow) =', round(flow_sec, 5), 'sec')
print('note: この比較は2次元toyなので、実務速度の結論そのものには使わない')
このノートの要点は3つです。
1つ目は、自己回帰モデルは連鎖律で設計が明確で、条件付き関係を素直に書けることです。2つ目は、フローベースモデルは可逆性と log-det 計算可能性を守ることで、尤度とサンプリングの両方を直接扱えることです。3つ目は、どちらが優れているかは問題設定しだいで、モデルの目的(高忠実度サンプリングか、密度推定の安定性か、計算効率か)に合わせて選ぶ必要があることです。
RealNVPやMAF/IAFに進むときは、今回の最小実装を「次元を増やしたときに何がボトルネックになるか」という視点で読み替えると、論文の設計意図がかなり理解しやすくなります。