潜在変数モデルと混合モデル

このノートの主題は、見えていない原因(潜在変数)を導入すると、単純モデルでは説明しにくいデータをどう扱えるようになるか、です。分類ではなく「分布そのもの」を学ぶ生成モデルの視点で、EMアルゴリズムまでを一気につなげます。

1 種類に見えないデータを、見えない担当者に分けて考える

潜在変数モデルの直感は単純で、観測だけでは説明しにくいデータを「裏で複数の原因が混ざっている」と考えることです。混合モデルはその最も分かりやすい形で、どの成分がどれだけ担当したかを確率で持ちます。

このノートでは、まず 2 コインの例で潜在変数の必要性を見て、次に責任率と EM の流れを追います。そのあとで多次元混合ベルヌーイと GMM に広げて、見えない割当を推論しながら分布を学ぶ感覚を固めます。

中心にあるのは、硬いクラスタ割当ではなく「あいまいな担当率」です

潜在変数が見えない以上、最初から 0 か 1 に決め打ちはできません。そこで E-step では責任率を置き、M-step ではその重みでパラメータを更新します。この往復が EM の核です。

最初の 2 コイン例は、小さいが本質を全部含んでいる

どちらのコインで投げたかは見えないが、表裏の出方には違いがある。この設定だけで、潜在変数、責任率、同定不能性、EM の考え方が一通り現れます。

後半では、離散の混合から連続の混合へ広げる

混合ベルヌーイが分かると、GMM もだいぶ読みやすくなります。違いは観測分布がベルヌーイかガウスかであって、見えない割当を推論する骨格は同じです。

ここで押さえたいのは、EM が「見えない変数を埋めながら学ぶ」手続きだということ

数式の詳細より、E-step と M-step が役割分担された反復だと理解できれば十分です。その視点で後の変分推論や VAE にもつながります。

ここでの例は小さいが、潜在変数モデルの骨格はそのまま入っている

混合ベルヌーイも最小 GMM も、見えない割当を推論しながら分布を学ぶという意味では同じ構造を持っています。複雑な変分推論へ進む前の土台として読んでください。

まずは 2 コインから始める

最初の節では、1 ビット観測しか見えない状況で、なぜ潜在変数が欲しくなるのかを確かめます。

import math
import random
from statistics import mean

random.seed(21)

まず「なぜ潜在変数が必要か」を直感で押さえます。

同じコイン投げの観測列に見えても、実は2種類のコインが混ざっている場合があります。観測は表裏だけで、どのコインを使ったかは記録されていない。この「見えていないラベル」が潜在変数です。

def generate_coin_mixture(n=40, pi=0.6, p_a=0.8, p_b=0.3):
    data = []
    latent = []
    for _ in range(n):
        z = 0 if random.random() < pi else 1  # 0: coin A, 1: coin B
        x = 1 if random.random() < (p_a if z == 0 else p_b) else 0
        data.append(x)
        latent.append(z)
    return data, latent


data, latent = generate_coin_mixture(n=50, pi=0.65, p_a=0.82, p_b=0.28)
print('observed heads ratio =', round(sum(data) / len(data), 3))
print('true latent count A/B =', latent.count(0), latent.count(1))
print('first 20 observations =', data[:20])

もし潜在変数 z を無視すると、1種類のコイン確率しか推定できません。すると「2種類が混ざっている」という構造情報を失います。混合モデルは、まさにこの構造を扱うためのモデルです。

1. 混合モデルの数式イメージ

混合モデルは「まず潜在クラス z を引いて、次に x を生成する」という2段階で書けます。

問題は、観測時には z が見えないことです。ここでEMが効きます。

def bernoulli(x, p):
    p = min(max(p, 1e-9), 1 - 1e-9)
    return p if x == 1 else (1 - p)


def responsibility_2comp(x, pi, p0, p1):
    # gamma0 = p(z=0|x)
    num0 = pi * bernoulli(x, p0)
    num1 = (1 - pi) * bernoulli(x, p1)
    den = num0 + num1 + 1e-12
    return num0 / den, num1 / den


example_x = [0, 1, 1, 0, 1]
for x in example_x:
    g0, g1 = responsibility_2comp(x, pi=0.5, p0=0.8, p1=0.2)
    print(f'x={x} -> gamma(z=0|x)={g0:.3f}, gamma(z=1|x)={g1:.3f}')

gamma(z=k|x) を負担率(responsibility)と呼びます。硬いクラスタ割当(0か1か)ではなく、確率として割り当てるのがポイントです。これにより境界点も自然に扱えます。

2. EMアルゴリズム(混合ベルヌーイ)

EMは次を反復します。

この流れは「見えない z を埋める」→「埋めたと仮定して最尤更新する」の往復です。

def e_step_binary(data, pi, p0, p1):
    gammas = []
    for x in data:
        g0, _ = responsibility_2comp(x, pi, p0, p1)
        gammas.append(g0)
    return gammas


def m_step_binary(data, gammas):
    n = len(data)
    sum_g = sum(gammas)
    sum_1mg = n - sum_g

    pi_new = sum_g / n
    p0_new = sum(g * x for g, x in zip(gammas, data)) / max(sum_g, 1e-9)
    p1_new = sum((1 - g) * x for g, x in zip(gammas, data)) / max(sum_1mg, 1e-9)

    # 数値安定
    p0_new = min(max(p0_new, 1e-6), 1 - 1e-6)
    p1_new = min(max(p1_new, 1e-6), 1 - 1e-6)
    pi_new = min(max(pi_new, 1e-6), 1 - 1e-6)
    return pi_new, p0_new, p1_new


def loglik_binary(data, pi, p0, p1):
    ll = 0.0
    for x in data:
        prob = pi * bernoulli(x, p0) + (1 - pi) * bernoulli(x, p1)
        ll += math.log(max(prob, 1e-12))
    return ll


# 初期値はわざとずらす
pi, p0, p1 = 0.5, 0.55, 0.45
trace = []
for t in range(20):
    g = e_step_binary(data, pi, p0, p1)
    pi, p0, p1 = m_step_binary(data, g)
    ll = loglik_binary(data, pi, p0, p1)
    trace.append(ll)
    print(f'iter={t:02d} pi={pi:.3f} p0={p0:.3f} p1={p1:.3f} ll={ll:.3f}')

print('log-likelihood monotonic non-decrease check =', all(trace[i] <= trace[i+1] + 1e-9 for i in range(len(trace)-1)))

EMは局所解に落ちる可能性があるため、初期値を変えて複数回走らせるのが実務では基本です。

def run_em_binary(data, n_iter=30):
    pi = random.uniform(0.2, 0.8)
    p0 = random.uniform(0.1, 0.9)
    p1 = random.uniform(0.1, 0.9)

    for _ in range(n_iter):
        g = e_step_binary(data, pi, p0, p1)
        pi, p0, p1 = m_step_binary(data, g)

    ll = loglik_binary(data, pi, p0, p1)
    return ll, (pi, p0, p1)


trials = []
for _ in range(8):
    trials.append(run_em_binary(data))

trials.sort(key=lambda x: x[0], reverse=True)
print('best run ll =', round(trials[0][0], 4), 'params =', tuple(round(v, 4) for v in trials[0][1]))
print('worst run ll =', round(trials[-1][0], 4), 'params =', tuple(round(v, 4) for v in trials[-1][1]))

ここで重要な注意があります。上の2コイン例は「1サンプル=1ビット観測」なので、
P(x=1)=πp0+(1π)p1P(x=1)=\pi p_0 + (1-\pi)p_1 の1本しか観測制約がなく、(π,p0,p1)(\pi,p_0,p_1) は一般に一意に同定できません。

そのため、同じ尤度でも異なるパラメータ組が出ます。これはEMのバグではなく、観測情報の不足による同定不能性です。
実務では多次元特徴や時系列観測を使って情報量を増やし、同定性を改善します。

3. 多次元の混合ベルヌーイ

画像の2値化データのように、観測が多次元になると各次元の確率パラメータを成分ごとに持ちます。ここでは長さ6の2値ベクトルで、MNIST前の練習を行います。

def make_binary_vector_data(n=120):
    # 成分0: 前半が1になりやすい, 成分1: 後半が1になりやすい
    mu0 = [0.85, 0.75, 0.7, 0.2, 0.15, 0.1]
    mu1 = [0.2, 0.25, 0.3, 0.8, 0.75, 0.7]
    pi = 0.55

    xs = []
    zs = []
    for _ in range(n):
        z = 0 if random.random() < pi else 1
        mu = mu0 if z == 0 else mu1
        x = [1 if random.random() < p else 0 for p in mu]
        xs.append(x)
        zs.append(z)
    return xs, zs


vec_data, vec_latent = make_binary_vector_data(n=160)
print('sample x[0:3] =', vec_data[:3])
print('latent count =', vec_latent.count(0), vec_latent.count(1))

責任率で重み付けしながら更新する

ここでは EM を実際に回し、尤度とパラメータがどう動くかを見ます。E-step の確率が M-step の重みに変わるところが肝です。

def bernoulli_vec_prob(x, mu):
    prob = 1.0
    for xi, p in zip(x, mu):
        p = min(max(p, 1e-8), 1 - 1e-8)
        prob *= p if xi == 1 else (1 - p)
    return prob


def e_step_vec(data, pi, mu0, mu1):
    g = []
    for x in data:
        a = pi * bernoulli_vec_prob(x, mu0)
        b = (1 - pi) * bernoulli_vec_prob(x, mu1)
        g.append(a / max(a + b, 1e-12))
    return g


def m_step_vec(data, g):
    n = len(data)
    d = len(data[0])
    sum_g = sum(g)
    sum_1mg = n - sum_g

    pi_new = sum_g / n
    mu0 = []
    mu1 = []
    for j in range(d):
        num0 = sum(g[i] * data[i][j] for i in range(n))
        num1 = sum((1 - g[i]) * data[i][j] for i in range(n))
        mu0.append(min(max(num0 / max(sum_g, 1e-9), 1e-6), 1 - 1e-6))
        mu1.append(min(max(num1 / max(sum_1mg, 1e-9), 1e-6), 1 - 1e-6))
    return min(max(pi_new, 1e-6), 1 - 1e-6), mu0, mu1


def loglik_vec(data, pi, mu0, mu1):
    ll = 0.0
    for x in data:
        p = pi * bernoulli_vec_prob(x, mu0) + (1 - pi) * bernoulli_vec_prob(x, mu1)
        ll += math.log(max(p, 1e-12))
    return ll


pi = 0.5
mu0 = [0.6] * 6
mu1 = [0.4] * 6
for t in range(25):
    g = e_step_vec(vec_data, pi, mu0, mu1)
    pi, mu0, mu1 = m_step_vec(vec_data, g)

print('estimated pi =', round(pi, 3))
print('estimated mu0 =', [round(v, 2) for v in mu0])
print('estimated mu1 =', [round(v, 2) for v in mu1])
print('final ll =', round(loglik_vec(vec_data, pi, mu0, mu1), 3))

4. 混合ガウス分布(GMM)とEM

連続値データでは混合ガウスが基本になります。E-stepで負担率を計算し、M-stepで混合比・平均・共分散を更新します。DGM第2回で扱う中心テーマです。

def sample_gmm_2d(n=240):
    # 2成分の簡易データ
    params = [
        {'pi': 0.5, 'mu': (-1.8, -1.2), 'sigma': (0.55, 0.45)},
        {'pi': 0.5, 'mu': (2.2, 1.8), 'sigma': (0.6, 0.5)},
    ]

    xs = []
    zs = []
    for _ in range(n):
        z = 0 if random.random() < params[0]['pi'] else 1
        p = params[z]
        x = (
            random.gauss(p['mu'][0], p['sigma'][0]),
            random.gauss(p['mu'][1], p['sigma'][1]),
        )
        xs.append(x)
        zs.append(z)
    return xs, zs


gmm_data, gmm_true_z = sample_gmm_2d(n=300)
print('first 3 points =', [tuple(round(v, 3) for v in x) for x in gmm_data[:3]])
print('true cluster counts =', gmm_true_z.count(0), gmm_true_z.count(1))

連続データでも同じ発想が残る

GMM へ移ると数式は増えますが、やっていることは同じです。どのガウスがどれだけ担当したかを推定し、その重みで平均や分散を更新します。

def gaussian_pdf_diag(x, mu, var):
    # 対角共分散のみ(教育用に簡略化)
    v0 = max(var[0], 1e-6)
    v1 = max(var[1], 1e-6)
    z0 = (x[0] - mu[0]) ** 2 / v0
    z1 = (x[1] - mu[1]) ** 2 / v1
    coef = 1.0 / (2 * math.pi * math.sqrt(v0 * v1))
    return coef * math.exp(-0.5 * (z0 + z1))


def e_step_gmm_diag(data, pis, mus, vars_):
    gamma = []
    for x in data:
        probs = [pis[k] * gaussian_pdf_diag(x, mus[k], vars_[k]) for k in range(2)]
        s = sum(probs) + 1e-12
        gamma.append([p / s for p in probs])
    return gamma


def m_step_gmm_diag(data, gamma):
    n = len(data)
    nk = [sum(g[k] for g in gamma) for k in range(2)]
    pis = [nk[k] / n for k in range(2)]

    mus = []
    vars_ = []
    for k in range(2):
        mx = sum(gamma[i][k] * data[i][0] for i in range(n)) / max(nk[k], 1e-12)
        my = sum(gamma[i][k] * data[i][1] for i in range(n)) / max(nk[k], 1e-12)
        mus.append((mx, my))

        vx = sum(gamma[i][k] * (data[i][0] - mx) ** 2 for i in range(n)) / max(nk[k], 1e-12)
        vy = sum(gamma[i][k] * (data[i][1] - my) ** 2 for i in range(n)) / max(nk[k], 1e-12)
        vars_.append((max(vx, 1e-4), max(vy, 1e-4)))

    return pis, mus, vars_


def loglik_gmm_diag(data, pis, mus, vars_):
    ll = 0.0
    for x in data:
        p = 0.0
        for k in range(2):
            p += pis[k] * gaussian_pdf_diag(x, mus[k], vars_[k])
        ll += math.log(max(p, 1e-12))
    return ll


pis = [0.5, 0.5]
mus = [(-0.5, -2.5), (1.0, 2.5)]
vars_ = [(1.2, 1.0), (1.0, 1.3)]
ll_trace = []

for t in range(30):
    gamma = e_step_gmm_diag(gmm_data, pis, mus, vars_)
    pis, mus, vars_ = m_step_gmm_diag(gmm_data, gamma)
    ll = loglik_gmm_diag(gmm_data, pis, mus, vars_)
    ll_trace.append(ll)
    if t % 5 == 0 or t == 29:
        print(f'iter={t:02d} pis={[round(v,3) for v in pis]} mus={[tuple(round(u,2) for u in m) for m in mus]} ll={ll:.2f}')

print('monotonic ll =', all(ll_trace[i] <= ll_trace[i+1] + 1e-8 for i in range(len(ll_trace)-1)))

尤度の上がり方と成分分離を見る

最後は EM の反復が本当に分布を分けられているかを、推定された成分パラメータと尤度推移で確認します。

# ソフト割当の例: 境界付近では責任率が0/1に張り付きにくい
mid = ((mus[0][0] + mus[1][0]) / 2, (mus[0][1] + mus[1][1]) / 2)
probe_points = [
    (mid[0], mid[1]),
    (mid[0] + 0.4, mid[1] + 0.2),
    (mid[0] - 0.4, mid[1] - 0.2),
    (mus[0][0], mus[0][1]),
    (mus[1][0], mus[1][1]),
]
for x in probe_points:
    g = e_step_gmm_diag([x], pis, mus, vars_)[0]
    print('x=', tuple(round(v,2) for v in x), 'responsibility=', [round(v, 3) for v in g])

責任率が0/1に張り付かない点が重要です。これにより、境界データを無理にハード割当せずに学習できます。K-meansが苦手な場面でGMMが効く理由の1つです。

5. 実務での注意点

EMは強力ですが、次の点で失敗しやすいです。

AIC/BICや検証データ尤度を使って K を決めるのが基本です。

def bic(loglik, n_samples, n_params):
    return -2 * loglik + n_params * math.log(max(n_samples, 2))


n = len(gmm_data)
# 2成分・2次元・対角分散の粗いパラメータ数:
# pi(1自由度) + mu(2*2) + var(2*2) = 9
bic_score = bic(ll_trace[-1], n_samples=n, n_params=9)
print('final loglik =', round(ll_trace[-1], 2))
print('BIC (rough)  =', round(bic_score, 2))

潜在変数モデルと混合モデルの核は、「見えない変数を推論しながら分布を学習する」ことです。EMはこの目的に対する最も重要な基本アルゴリズムです。

このあとVAEや拡散モデルを学ぶときも、実は同じ構図が続きます。観測されない中間変数をうまく扱うことで、生成の表現力と安定性を上げていく、という流れです。