Ben's Blog


Coding the Viterbi Algorithm in Numpy

A demo of how to code the Viterbi algorithm in Numpy.

Mar 15, 2020

This tutorial explains how to code the Viterbi algorithm in Numpy, and gives a minor explanation. I’m using Numpy version 1.18.1 and Python 3.7, although this should work for any future Python or Numpy versions.

Resources

The Viterbi algorithm has been widely covered in many areas. One of my favorite explanations is from the Youtuber Mathematical Monk. This tutorial expects some background familiarity with hidden Markov models; if you’ve never heard of them, check out those videos! The Wikipedia page is also pretty good. Here’s a diagram of an unfolded hidden Markov model:

Emission Probabilities Transition Probabilities Time

Problem Statement

The goal of the Viterbi algorithm is to compute the most probable sequence of hidden states zz^* for a Hidden Markov Model defined by an observed sequence xx and a set of possible sequences of hidden states zz:

z=argminz[p(z,x)]z^* = \underset{z}{\mathrm{argmin}}[p(z,x)]

We assume that we know the observed sequence x=(x1,...,xn)x = (x_1, ..., x_n), the set of initial hidden probabilities z1=(z11,...,z1L)z_1 = (z_1^1, ..., z_1^L), the emission probabilities p(xizj)p(x_i \| z_j) and the transition probabilities p(zizi1)p(z_i \| z_{i-1}). In Numpy, these can be represented as arrays:

import numpy as np
from typing import List, Optional, Tuple

num_hidden_states = 3
num_observed_states = 2
num_time_steps = 4

# Initializes the transition probability matrix.
transition_probs = np.array([
    [0.1, 0.2, 0.7],
    [0.1, 0.1, 0.8],
    [0.5, 0.4, 0.1],
])
assert transition_probs.shape == (num_hidden_states, num_hidden_states)
assert transition_probs.sum(1).mean() == 1

# Initializes the emission probability matrix.
emission_probs = np.array([
    [0.1, 0.9],
    [0.3, 0.7],
    [0.5, 0.5],
])
assert emission_probs.shape == (num_hidden_states, num_observed_states)
assert emission_probs.sum(1).mean()

# Initalizes the initial hidden probabilities.
init_hidden_probs = np.array([0.1, 0.3, 0.6])
assert init_hidden_probs.shape == (num_hidden_states,)

# Defines the sequence of observed states.
observed_states = [1, 1, 0, 1]
assert len(observed_states) == num_time_steps

# Placeholder defining how we'll call the Viterbi algorithm.
max_seq, seq_prob = viterbi(
    emission_probs,
    transition_probs,
    init_hidden_probs,
    observed_states,
)
# max_seq: [2, 0, 2, 0]
# seq_prob: 0.0212625

Note that these matrices represent the probability distributions in column-major format, meaning that the column dimension sums to 1. Note also that if we represent the hidden state probabilities as a vector, then doing vector-matrix multiplication into the transition_probs matrix will give us the new hidden states, and doing vector-matrix multiplication into the emission_probs matrix will give us the observed states.

Maximizing Value

The maximum value of p(z,x)p(z, x) for all the preceding timesteps up to the current timestep 1,...,k1, ..., k is

μk(zk)=maxz1:k1p(z1:k,x1:k)=maxz1:k1p(xkzk)p(zkzk1)p(z1:k1,x1:k1)=maxzk1p(xkzk)p(zkzk1)maxz1:k2p(z1:k1,x1:k1) \begin{aligned} \mu_k(z_k) & = \max_{z_{1:k-1}} p(z_{1:k}, x_{1:k}) \\ & = \max_{z_{1:k-1}} p(x_k|z_k) p(z_k|z_{k-1}) p(z_{1:k-1}, x_{1:k-1}) \\ & = \max_{z_{k-1}} p(x_k|z_k) p(z_k|z_{k-1}) \max_{z_{1:k-2}} p(z_{1:k-1},x_{1:k-1}) \end{aligned}

Since the first two terms aren’t dependent on anything besides zk1z_{k-1}, you can take them out of the whole max operation.

μk(zk)=p(xkzk)maxzk1p(zkzk1)μk1(zk1)\mu_k(z_k) = p(x_k|z_k) \max_{z_{k-1}} p(z_k|z_{k-1}) \mu_{k-1}(z_{k-1})

For the recursion base case, we get

μ1(z1)=p(z1,x1)=p(z1)p(x1z1) \begin{aligned} \mu_1(z_1) & = p(z_1, x_1) \\ & = p(z_1) p(x_1 | z_1) \end{aligned}

Knowing the maximizing value, we can find the argmax by using backpointers - for some max value μk(zk)\mu_k(z_k) we get the incoming index as the argmaxzk1p(zkzk1)μk1(zk1)arg \max_{z_{k-1}} p(z_k \| z_{k-1}) \mu_{k-1}(z_{k-1}). By doing this recursively, we can get the (reversed) sequence of indices that led to the final maximum value.

def step(mu_prev: np.ndarray,
         emission_probs: np.ndarray,
         transition_probs: np.ndarray,
         observed_state: int) -> Tuple[np.ndarray, np.ndarray]:
    """Runs one step of the Viterbi algorithm.

    Args:
        mu_prev: probability distribution with shape (num_hidden),
            the previous mu
        emission_probs: the emission probability matrix (num_hidden,
            num_observed)
        transition_probs: the transition probability matrix, with
            shape (num_hidden, num_hidden)
        observed_state: the observed state at the current step

    Returns:
        - the mu for the next step
        - the maximizing previous state, before the current state,
          as an int array with shape (num_hidden)
    """

    pre_max = mu_prev * transition_probs.T
    max_prev_states = np.argmax(pre_max, axis=1)
    max_vals = pre_max[np.arange(len(max_prev_states)), max_prev_states]
    mu_new = max_vals * emission_probs[:, observed_state]

    return mu_new, max_prev_states


def viterbi(emission_probs: np.ndarray,
            transition_probs: np.ndarray,
            start_probs: np.ndarray,
            observed_states: List[int]) -> Tuple[List[int], float]:
    """Runs the Viterbi algorithm to get the most likely state sequence.

    Args:
        emission_probs: the emission probability matrix (num_hidden,
            num_observed)
        transition_probs: the transition probability matrix, with
            shape (num_hidden, num_hidden)
        start_probs: the initial probabilies for each state, with shape
            (num_hidden)
        observed_states: the observed states at each step

    Returns:
        - the most likely series of states
        - the joint probability of that series of states and the observed
    """

    # Runs the forward pass, storing the most likely previous state.
    mu = start_probs * emission_probs[:, observed_states[0]]
    all_prev_states = []
    for observed_state in observed_states[1:]:
        mu, prevs = step(mu, emission_probs, transition_probs, observed_state)
        all_prev_states.append(prevs)

    # Traces backwards to get the maximum likelihood sequence.
    state = np.argmax(mu)
    sequence_prob = mu[state]
    state_sequence = [state]
    for prev_states in all_prev_states[::-1]:
        state = prev_states[state]
        state_sequence.append(state)

    return state_sequence[::-1], sequence_prob

Complexity

The time complexity of the whole inference is O(NM2)O(N M^2) for NN time steps and MM observed states (although the M2M^2 term represents a matrix-matrix multiplication, which Numpy helps accelerate using BLAS). The space complexity is O(NM)O(N M), since we have to store the most likely previous state for each time step.