Markov examples

This notebook aims at illustrating the theory presented in Lecture 9 Markov Processes of my class on Macroeconomics II. It presents three examples of Markov chain processes:

  • Unemployment rate
  • Marital status
  • Credit ratings

For each of these examples, we simulate the Markov chain and the evolution of the distribution of states.

Randall Romero-Aguilar, PhD

October 2017

In [1]:
import numpy as np
from warnings import warn
import seaborn
import matplotlib.pyplot as plt

%matplotlib inline

Preparation

We first write up a class to represent the Markov chain, and to define how to plot its simulations and distribution.

In [2]:
class MarkovChain:

    def __init__(self, P, pi0=None, z=None):
        """

        Parameters
        ----------
        P   transition probability matrix, n.n
        pi0 initial distribution, n vector
        z   stochastic outcomes, n vector
        """
        P = np.atleast_2d(P)
        n, n2 = P.shape
        assert n == n2, "P must be a square array"
        assert np.all(P >= 0), "P elements must be nonnegative"
        assert np.allclose(P.sum(1), 1), "The rows of P must sum to 1"

        self.P = P
        self.n = n
        self.cdf = P.cumsum(1)

        if pi0 is None:
            self.pi0 = self.stationary_distribution if self.has_unique_stationary_distribution else np.ones((n))/n
        else:
            pi0 = np.asarray(pi0)
            assert pi0.ndim == 1 and pi0.size == n, "pi0 must be a vector with %d elements" % n
            assert np.all(pi0 >= 0), "pi0 elements must be nonnegative"
            assert np.allclose(pi0.sum(), 1), "The elements of pi0 must sum to 1"
            self.pi0 = pi0

        if z is None:
            self.outcomes = np.arange(n)
        else:
            z = np.asarray(z)
            assert z.ndim == 1 and z.size == n, "z must be a vector with %d elements" % n
            self.outcomes = z

        self.string_outcomes = isinstance(self.outcomes[0], str)

    @property
    def stationary_distribution(self):
        if self.has_unique_stationary_distribution:
            A = np.vstack((np.eye(self.n)-self.P.T, np.ones((1,self.n))))
            b = np.hstack((np.zeros(self.n), np.ones((1))))
            return np.linalg.lstsq(A,b)[0]
        else:
            warn("This Markov process does not have a unique stationary distribution")

    @property
    def has_unique_stationary_distribution(self):  #fixme: ugly implementation
        pn = self.P
        for it in range(50):
            if np.any(pn == 0.0):
                pn = pn @ self.P
            else:
                return True

        return False

    def rand(self, j):
        """
        Draw next state
        Parameters
        ----------
        j current state, integer = 0, 1,...,n-1

        Returns
        -------
        Integer: next state
        """
        return (np.random.rand() > self.cdf[j]).sum()

    def simulate(self, T: int = 120, j: int = 0):
        """
        Simulate the Markov process
        Parameters
        ----------
        T
        j

        Returns
        -------
        Vector: array of simulated values
        """
        assert j in range(self.n), 'Initial state must be a nonnegative integer less than %d' % self.n

        y = np.empty((1 + T))
        y[0] = j if self.string_outcomes else self.outcomes[j]
        for t in range(T):
            j = self.rand(j)
            y[t + 1] = j if self.string_outcomes else self.outcomes[j]

        return y

    def unconditional_distribution(self, T=24):
        prob = np.empty((T+1, self.n))
        prob[0] = self.pi0
        for t in range(T):
            prob[t + 1] = np.dot(prob[t], self.P)
        return prob


    def plot(self, ax, T=120, j=0, *args):
        y = self.simulate(T, j)
        ax.plot(y, '-o', drawstyle='steps', *args)

        if isinstance(self.outcomes[0], str):
            ax.set_ylim([-0.5, self.n - 0.5])
            ax.set_yticks(np.arange(self.n))
            ax.set_yticklabels(self.outcomes)
        else:
            ax.set_yticks(np.sort(self.outcomes))

    def plot_probability(self, ax, T=24):
        p = self.unconditional_distribution(T).T
        lb = np.zeros((T+1))

        current_palette = seaborn.color_palette(n_colors=self.n)

        tt = np.arange(1+T)
        for k in range(self.n):
            ax.bar(tt, p[k], bottom=lb, color=current_palette[k], label=str(self.outcomes[k]))
            lb += p[k]
        ax.set_ylim([0, 1.25])
        ax.legend(loc='upper right', ncol=self.n)

    def print_stationary(self):
        if self.has_unique_stationary_distribution:
            for z, p in zip(self.outcomes, self.stationary_distribution):
                print(f'{z:6s}   {100*p:9.2f}')
    
    def __repr__(self):
        txt = 'A Markov Chain process with transition probability\n'
        txt += self.P.__str__()
        txt += '\n\nand possible outcomes\n'
        txt += self.outcomes.__str__()
        return txt

Set the simulation horizon to $T=48$ periods.

In [3]:
T=48

Example: unemployment

A worker can either be employed or unemployed:

  • If unemployed, she will get a job with probability $p = 45\%$
  • If employed, she will lose her job with probability $q = 5\%$

The worker is unemployed at $t = 0$. Then the Markov chain is:

  • outcomes {unemployed, employed} or $z = \begin{bmatrix}0 \\ 1 \end{bmatrix}$
  • initial probability $\pi_0 = \begin{bmatrix}1 \\ 0 \end{bmatrix}$
  • transition probability $P = \begin{bmatrix}1-p & p \\ q & 1-q \end{bmatrix} = \begin{bmatrix}0.55 & 0.45 \\ 0.05 & 0.95 \end{bmatrix}$
In [4]:
outcomes = ['unemployed', 'employed']

P = [
    [0.55, 0.45],
    [0.05, 0.95],
]

pi0 = [1, 0]


worker = MarkovChain(P, pi0, outcomes)

fig, (ax0,ax1) = plt.subplots(2, 1, sharex=True, figsize=[12,8])
worker.plot(ax0, T, 0)
worker.plot_probability(ax1,T)
worker.print_stationary()
unemployed       10.00
employed       90.00

Example 2: Marital status

In [5]:
outcomes = ['single', 'married', 'divorced']

P = [
    [0.92, 0.08, 0.00],
    [0.00, 0.65, 0.35],
    [0.00, 0.15, 0.85]
]

civil = MarkovChain(P, pi0=None, z=outcomes)

fig, (ax0,ax1) = plt.subplots(2, 1, sharex=True, figsize=[12,8])
civil.plot(ax0, T, 0)
civil.plot_probability(ax1,T)
civil.print_stationary()

Example 3: Credit rating

In [6]:
P = [
    [90.34, 5.62, 0.39, 0.08, 0.03, 0, 0, 0, 3.54],
    [0.64, 88.78, 6.72, 0.47, 0.06, 0.09, 0.02, 0.01, 3.21],
    [0.07, 2.16, 87.94, 4.97, 0.47, 0.19, 0.01, 0.04, 4.15],
    [0.03, 0.24, 4.56, 84.26, 4.19, 0.76, 0.15, 0.22, 5.59],
    [0.03, 0.06, 0.4, 6.09, 76.09, 6.82, 0.96, 0.98, 8.57],
    [0, 0.09, 0.29, 0.41, 5.11, 74.62, 3.43, 5.3, 10.75],
    [0.13, 0, 0.26, 0.77, 1.66, 8.93, 53.19, 21.94, 13.12],
    [0, 0, 0, 0, 1, 3.1, 9.29, 51.29, 35.32],
    [0, 0, 0, 0, 0, 0.1, 8.55, 74.06, 17.29]
]

P = np.array(P)/100
outcomes = ['AAA', 'AA', 'A', 'BBB', 'BB', 'B', 'CCC', 'D', 'N.R.']
initial = [1, 0, 0, 0, 0, 0, 0, 0, 0]

ratings = MarkovChain(P,initial, outcomes)

fig, (ax0,ax1) = plt.subplots(2, 1, sharex=True, figsize=[12,8])
ratings.plot(ax0, T, 0)
ratings.plot_probability(ax1,T)
ratings.print_stationary()
AAA           0.28
AA            0.88
A             2.78
BBB           4.06
BB            5.75
B            11.25
CCC          12.93
D            40.29
N.R.         21.78