简介¶

贝叶斯理论认为任意末知量都可以被看作一个随机变量,同时对该变量的描述也可以用一个概率分 布来概括,这就是贝叶斯学派最基本的观点。当这个概率分布事先被人为确定时被称作先验概率分 布,在结合样本 $X$ 后得到的概率分布被称作它的后验概率分布。 经典的贝叶斯公式表达如下: $$ P(\theta \mid X)=\frac{P(X \mid \theta) P(\theta)}{\int P(X \mid \theta) P(\theta) d \theta} $$ 其中, $P(\theta), P(\theta \mid X), P(X \mid \theta)$ 分别称作随机变量 $\theta$ 的先验分布、后验分布和样本 $X$ 的似然函 数。 对于末知量或者隐变量的求解任务,通常情况下都可以使用这个公式来进行后验分布推断,但上式 往往是很难计算的。因为在直观上看,它不仅需要考虑所有的 $\theta$ ,还要保证 $P(X \mid \theta) P(\theta)$ 可以被积分,同时在 $\theta$ 维度较高的情况下它还存在进行多重积分的可能。

后验分布推断任务主要有三种求解方法,它们分别是解析法、采样法和变分法。

  • 解析法就是直接套用上述公式计算后验分布,但这往往会导致计算量过大,一般而言,只有在先验分布是似然函数的共轭先验的情况下,我们才会去考虑使用解析法;
  • 采样法是尝试对后验分布进行采样,使用大量的采样结果来近似结果,但对于高维情况下这种方法通常会更加耗时;
  • 变分法为我们提供了另外的一种思路,它是直接去近似后验分布,通过优化方法来不断地去提高近似程度,相比于采样法,它的 效率会更快。

PyMC3 是一个用 Python 编写的开源的概率编程框架。通过 PyMC3,我们可以灵活地去创建自定义概率模型并进行贝叶斯推断,实现在数据中完成洞悉和学习,同时由于它是基于贝叶斯方法 的,因此在使用过程中常常是需要去指定先验分布来约束我们的模型,从而得到末知量的后验分布的不确定性估计。

采样法¶

使用马尔科夫链蒙特卡洛的 metropolis-Hastings 方法,对目标采样分布 $P(z)$ 进行采样的步骤如下:

  • 第一步:随机选定一个起始点 $z$ ,指定燃烧期 $m$ 和稳定期 $N$ 。
  • 第二步:开始采样,每一轮采样都以上一轮的采样值 $z$ 为均值,方差为 1 ,生成一个正态分布, 然后在这个正态分布中依概率随机选取一个值 $z^*$ 。
  • 第三步:在 $[0,1]$ 的均匀分布中随机生成一 个数 $U$ ,并指定接收概率 $\alpha=\min \left\{1, \frac{P\left(z^*\right)}{P(z)}\right\}$ ,如果 $U<\alpha$ ,则本轮新的采样值为 $z=z^*$ ,否则本轮新的采样值仍为上一轮的 $z$ 。 重复第二步 第三步采样过程 $m+N$ 次,结束后,保 留后 $N$ 次采样结果作为目标分布的近似采样。 为什么使用这个方法来求解后验分布是行得通的?

在本文中,这里的目标采样分布 $P(z)$ 可以被看成 $P(\theta \mid X)$ ,而整个采样过程最核心的是接受概率 $\alpha$ ,从直观上来看,当新采样的 $\theta^*$ 的后验概率大于原来的 $\theta$ 的后验概率时,我们是一定接受这个新的采样值 $\theta^*$ ,而当前者小于后者时,我们则会再根据均匀分布采样值来考虑是否接受 $\theta^*$ ,因此可以看出我们是更倾向于后验概率较大的 $\theta$ ,这也符合常理。 同时,由于使用后验分布相畭的操作,这使得原先在解析法所遇到的积分困难得到了避开。 $$ \begin{aligned} \frac{P\left(\theta^* \mid X\right)}{P(\theta \mid X)} &=\frac{\frac{P\left(X \mid \theta^*\right) P\left(\theta^*\right)}{\int P(X \mid \theta) P(\theta) d \theta}}{\frac{P(X \mid \theta) P(\theta)}{\int P(X \mid \theta) P(\theta) d \theta}} \\ &=\frac{P\left(X \mid \theta^*\right) P\left(\theta^*\right)}{P(X \mid \theta) P(\theta)} \end{aligned} $$ 可以看到,原来需要进行积分的项因为相除操作而被消除。

马尔科夫链蒙特卡洛方法的优势是存在数学证明,当采样数量足够多时最终的采样分布将会与真实的后验分布是一致的。

参考资料

  • https://zhuanlan.zhihu.com/p/515432470

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats

np.random.seed(123)

import warnings
warnings.filterwarnings('ignore')

class sampling_method(object):
    
    def __init__(self):
        pass
    
    def fit(self, data, sigma_true, mu_prior_mu, sigma_prior_mu, mu_init, samples, proposal_width, plot, x=None):
        mu_current = mu_init
        posterior = [mu_current]
        for i in range(samples):

            mu_proposal = stats.norm(mu_current, proposal_width).rvs()

            likelihood_current = stats.norm(mu_current, 1).pdf(data).prod()
            likelihood_proposal = stats.norm(mu_proposal, 1).pdf(data).prod()

            prior_current = stats.norm(mu_prior_mu, sigma_prior_mu).pdf(mu_current)
            prior_proposal = stats.norm(mu_prior_mu, sigma_prior_mu).pdf(mu_proposal)

            p_current = likelihood_current * prior_current
            p_proposal = likelihood_proposal * prior_proposal

            p_accept = p_proposal / p_current

            accept = np.random.rand() < p_accept

            if plot:
                self.plot_proposal(mu_current, mu_proposal, mu_prior_mu, sigma_prior_mu, sigma_true, data, accept, posterior, i, x)

            if accept:
                mu_current = mu_proposal

            posterior.append(mu_current)

        return posterior

    def plot_proposal(self, mu_current, mu_proposal, mu_prior_mu, sigma_prior_mu, sigma_true, data, accepted, trace, i, x):
        from copy import copy
        trace = copy(trace)
        
        fig, (ax1, ax2, ax3, ax4) = plt.subplots(ncols=4, figsize=(16, 4))
        fig.suptitle('Iteration %i' % (i + 1))
        color = 'g' if accepted else 'r'

        prior_current = stats.norm(mu_prior_mu, sigma_prior_mu).pdf(mu_current)
        prior_proposal = stats.norm(mu_prior_mu, sigma_prior_mu).pdf(mu_proposal)
        prior = stats.norm(mu_prior_mu, sigma_prior_mu).pdf(x) # 为了求它,因此需要传这些参数
        ax1.plot(x, prior)
        ax1.plot([mu_current] * 2, [0, prior_current], marker='o', color='b')
        ax1.plot([mu_proposal] * 2, [0, prior_proposal], marker='o', color=color)
        ax1.annotate("", xy=(mu_proposal, 0.2), xytext=(mu_current, 0.2),
                     arrowprops=dict(arrowstyle="->", lw=2.))
        ax1.set(ylabel='Probability Density', title='current: prior(mu=%.2f) = %.2f\nproposal: prior(mu=%.2f) = %.2f' % (mu_current, prior_current, mu_proposal, prior_proposal))

        likelihood_current = stats.norm(mu_current, sigma_true).pdf(data).prod()
        likelihood_proposal = stats.norm(mu_proposal, sigma_true).pdf(data).prod()
        y = stats.norm(loc=mu_proposal, scale=sigma_true).pdf(x)
        sns.distplot(data, kde=False, norm_hist=True, ax=ax2)
        ax2.plot(x, y, color=color)
        ax2.axvline(mu_current, color='b', linestyle='--', label='mu_current')
        ax2.axvline(mu_proposal, color=color, linestyle='--', label='mu_proposal')
        ax2.annotate("", xy=(mu_proposal, 0.2), xytext=(mu_current, 0.2),
                     arrowprops=dict(arrowstyle="->", lw=2.))
        ax2.set(title='likelihood(mu=%.2f) = %.2f\nlikelihood(mu=%.2f) = %.2f' % (mu_current, 1e14*likelihood_current, mu_proposal, 1e14*likelihood_proposal))

        posterior_analytical = self.calc_posterior_analytical(data, x, mu_prior_mu, sigma_prior_mu)
        ax3.plot(x, posterior_analytical)
        posterior_current = self.calc_posterior_analytical(data, mu_current, mu_prior_mu, sigma_prior_mu)
        posterior_proposal = self.calc_posterior_analytical(data, mu_proposal, mu_prior_mu, sigma_prior_mu)
        ax3.plot([mu_current] * 2, [0, posterior_current], marker='o', color='b')
        ax3.plot([mu_proposal] * 2, [0, posterior_proposal], marker='o', color=color)
        ax3.annotate("", xy=(mu_proposal, 0.2), xytext=(mu_current, 0.2),
                     arrowprops=dict(arrowstyle="->", lw=2.))
        ax3.set(title='posterior(mu=%.2f) = %.5f\nposterior(mu=%.2f) = %.5f' % (mu_current, posterior_current, mu_proposal, posterior_proposal))

        if accepted:
            trace.append(mu_proposal)
        else:
            trace.append(mu_current)
            
        ax4.plot(trace)
        ax4.set(xlabel='iteration', ylabel='mu', title='trace')
        plt.tight_layout()

    def calc_posterior_analytical(self, data, x, mu_0, sigma_0):
        sigma = 1.
        n = len(data)
        mu_post = (mu_0 / sigma_0**2 + data.sum() / sigma**2) / (1. / sigma_0**2 + n / sigma**2)
        sigma_post = (1. / sigma_0**2 + n / sigma**2)**-1
        return stats.norm(mu_post, np.sqrt(sigma_post)).pdf(x)

### 样本分布的假设 正态
mu_true = 1.5
sigma_true = 2
data = stats.norm.rvs(mu_true, sigma_true, size=200)

### mu的先验分布设置 正态
mu_prior_mu = 0.5
sigma_prior_mu = 1

### mu更新的初始值设置
mu_init = 0  

### 采样法的其它参数
samples = 120  ### 被采样的次数
proposal_width = 0.5
plot = True
x = np.linspace(-5, 5, 5000)
In [2]:
sm = sampling_method()
sm.fit(data, sigma_true, mu_prior_mu, sigma_prior_mu, mu_init, samples, proposal_width, plot, x);

上面的每一行分别表示 Metropolis 采样的一次迭代过程: 第一列是 $\mu$ 的先验分布 $P(\mu)$ ,可以看到该分布是静态的,我们只是在其中画出了每次提出跳转 的 $\mu$ ,其中蓝线表示原先的 $\mu$ ,红线和绿线均表示本次迭代新采样的 $\mu$ ,只是前者表示拒绝而后 者是被接受;
第二列是似然函数 $P(X \mid \theta)$ ,蓝色的直方图表示样本数据,用来衡量模型对数据的解释能力,可 以看到似然函数随着 $\mu$ 的变化而变化。直观上看,似然函数与样本数据的直方图的重叠度越大, 模型对数据的解释能力越好; 第三列是后验分布 $P(\theta \mid X)$ ,这里是直接画出了正态分布的后验,也就是使用解析法来确定后验 分布,其实我们可以直接先验乘上似然来得到没有归一化的后验分布。
第四列表示的是迹,这里保存了所有的采样 $\mu$ ,如果采样值被拒绝的话,就保留上一个采样值, 从图中看起来就是一条水平的线。

In [ ]: