HMM_Baum_Welch

HMM_Baum_Welch

算法原理简述:

Baum-Welch算法是在给定观测序列而不知道状态序列的情况下,估计HMM模型中A、B、PI参数的算法。因为状态序列是隐藏的(即隐变量),所以我们可以利用EM算法来对参数进行估计。EM算法此处不再赘述;其中算法的迭代过程主要用到了先前博客中提到的前向函数(alpha_func)和后向函数(beta_func),以及根据这两个函数计算得到的gamma_func和kesi_func。具体的参数迭代过程和公式可以参考李航老师书籍中的隐马尔科夫模型一章。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @File : HMM_Baum_Welch.py
# @Author: mfzhu
# @Date : 2017/5/5
# @Desc :
import numpy as np
import copy
class HMM_Baum_Welch(object):
    """Estimate HMM parameters via the Baum-Welch (EM) algorithm.

    Given only an observation sequence (no hidden state sequence), iteratively
    re-estimates the state-transition matrix A, the emission matrix B and the
    initial state distribution pi, using the forward (alpha) and backward
    (beta) variables and the derived gamma / kesi quantities.

    NOTE: the constructor currently hard-codes the 3-state / 2-symbol example
    from Li Hang's "Statistical Learning Methods" so the output can be checked
    against the book; the commented-out uniform initialization is the general
    version.
    """

    def __init__(self, input_data, num_state, num_observation, iteration=5):
        """
        :param input_data: observation sequence (0-indexed symbol ids)
        :param num_state: number of hidden states N
        :param num_observation: number of observable symbols M
        :param iteration: number of EM iterations to run (default 5,
            matching the original behavior)
        """
        # Length T of the observation sequence.
        self.time_length = len(input_data)
        # Number of hidden states.
        self.num_state = num_state
        # Number of distinct observation symbols.
        self.num_observation = num_observation
        # The observed sequence itself.
        self.data = input_data
        # Number of EM iterations.
        self.iteration = iteration
        # Original (uniform) initialization, kept for reference:
        # self.state2state = np.ones([self.num_state, self.num_state]) / self.num_state
        # self.state2observation = np.ones([self.num_state, self.num_observation]) / self.num_observation
        # self.pi = np.ones([1, self.num_state])[0] / self.num_state
        # Fixed values reproducing the textbook example (assumes
        # num_state == 3 and num_observation == 2 — TODO confirm callers).
        self.state2state = np.array([[0.5, 0.2, 0.3], [0.3, 0.5, 0.2], [0.2, 0.3, 0.5]])
        self.state2observation = np.array([[0.5, 0.5], [0.4, 0.6], [0.7, 0.3]])
        self.pi = np.array([0.2, 0.4, 0.4])

    def alpha_func(self, time, sequence):
        """Return the forward variable alpha_t for 1-indexed time `time`.

        alpha_t(i) = P(o_1..o_t, q_t = i | current model parameters).
        """
        alpha = self.pi * self.state2observation[:, sequence[0]]
        for t in range(1, time):
            next_alpha = []
            for j in range(self.num_state):
                # Mass flowing into state j from every previous state.
                per_state = alpha * self.state2state[:, j]
                next_alpha.append(np.sum(per_state) * self.state2observation[j, sequence[t]])
            # np.array copy replaces the original copy.deepcopy of a float list.
            alpha = np.array(next_alpha)
        return np.array(alpha)

    def beta_func(self, time, sequence):
        """Return the backward variable beta_t for 1-indexed time `time`.

        beta_t(i) = P(o_{t+1}..o_T | q_t = i, current model parameters).
        """
        beta = np.ones([1, self.num_state])[0]
        for i in range(len(sequence), time, -1):
            prebeta = []
            if i == 1:
                # Only reachable when time == 0: folds pi and the first
                # emission into beta (kept for compatibility; train() never
                # calls beta_func with time < 1).
                beta = beta * self.pi * self.state2observation[:, sequence[0]]
                break
            for j in range(self.num_state):
                prebeta.append(np.sum(self.state2state[j, :]
                                      * self.state2observation[:, sequence[i - 1]]
                                      * beta))
            beta = np.array(prebeta)
        return np.array(beta)

    def gamma_func(self, time, sequence):
        """Return gamma_t: posterior state probabilities at 1-indexed `time`."""
        alpha = self.alpha_func(time, sequence)
        beta = self.beta_func(time, sequence)
        return (alpha * beta) / np.sum(alpha * beta)

    def kesi_func(self, time, sequence):
        """Return the N x N matrix kesi_t.

        kesi_t(i, j) is the posterior probability of being in state i at
        `time` and state j at `time` + 1 (both 1-indexed).
        """
        alpha = self.alpha_func(time, sequence)
        beta = self.beta_func(time + 1, sequence)
        kesi = np.zeros([self.num_state, self.num_state])
        for i in range(self.num_state):
            for j in range(self.num_state):
                # sequence[time] is o_{time+1} because sequence is 0-indexed.
                kesi[i, j] = alpha[i] * self.state2state[i, j] \
                    * self.state2observation[j, sequence[time]] * beta[j]
        # The normalizer is exactly the sum of the unnormalized entries, so
        # there is no need for a separate accumulation loop.
        kesi /= np.sum(kesi)
        return kesi

    def train(self):
        """Run `self.iteration` Baum-Welch updates.

        :return: (state2state, state2observation, pi) after the final update.
        """
        # `_` avoids shadowing by the state-index loops below (the original
        # reused `i` for both the EM loop and the inner loops).
        for _ in range(self.iteration):
            # New initial distribution: gamma at t = 1 under current params.
            new_pi = self.gamma_func(1, self.data)
            # --- Transition matrix update ---
            new_state2state = np.zeros([self.num_state, self.num_state])
            # Denominator: sum of gamma_t over t = 1..T-1.
            denom = np.zeros([1, self.num_state])[0]
            # Numerator: sum of kesi_t over t = 1..T-1.
            numer = np.zeros([self.num_state, self.num_state])
            for t in range(1, self.time_length):
                denom += self.gamma_func(t, self.data)
            for t in range(1, self.time_length):
                numer += self.kesi_func(t, self.data)
            for i in range(self.num_state):
                for j in range(self.num_state):
                    # denom[i] is a scalar; the original wrapped it in np.sum.
                    new_state2state[i][j] = numer[i, j] / denom[i]
            # --- Emission matrix update ---
            new_state2observation = np.zeros([self.num_state, self.num_observation])
            # Denominator: sum of gamma_t over the full range t = 1..T.
            denom = np.zeros([1, self.num_state])[0]
            for t in range(1, self.time_length + 1):
                denom += self.gamma_func(t, self.data)
            for k in range(self.num_observation):
                # Numerator for symbol k: gamma_t restricted to times where
                # symbol k was actually observed.
                numer = np.zeros([1, self.num_state])[0]
                for t in range(1, self.time_length + 1):
                    if k == self.data[t - 1]:
                        numer += self.gamma_func(t, self.data)
                new_state2observation[:, k] = numer / denom
            # Commit all three parameter updates simultaneously (standard EM:
            # every quantity above was computed with the OLD parameters).
            self.pi = new_pi
            self.state2state = new_state2state
            self.state2observation = new_state2observation
        return self.state2state, self.state2observation, self.pi
if __name__ == '__main__':
    # Observation sequence from the textbook example (0 = red, 1 = white
    # — presumably; verify against the book's example 10.2).
    test_data = np.array([0, 1, 0])
    # 3 hidden states, 2 observable symbols: must match the matrices
    # hard-coded in HMM_Baum_Welch.__init__.
    hmm_bm = HMM_Baum_Welch(test_data, 3, 2)
    s2s, s2o, pi = hmm_bm.train()
    # Fixed typo in the original output string ("probality").
    print("the initial probability is:")
    print(pi)
    print("the state matrix is:")
    print(s2s)
    print("the observation matrix is:")
    print(s2o)

运行结果:

在程序中,为了便于测试,主要利用了李航老师课本中的原始数据来验证程序的正确性。在进行5轮迭代后得到如下图的结果;通过和他人博客中的结果进行比较,可以验证程序输出的正确性。
img