This notebook aims at illustrating the theory presented in Lecture 9 Markov Processes of my class on Macroeconomics II. It presents three examples of Markov chain processes:
For each of these examples, we simulate the Markov chain and the evolution of the distribution of states.
Randall Romero-Aguilar, PhD
October 2017
import numpy as np
from warnings import warn
import seaborn
import matplotlib.pyplot as plt
%matplotlib inline
We first write up a class to represent the Markov chain, and to define how to plot its simulations and distribution.
class MarkovChain:
def __init__(self, P, pi0=None, z=None):
"""
Parameters
----------
P transition probability matrix, n.n
pi0 initial distribution, n vector
z stochastic outcomes, n vector
"""
P = np.atleast_2d(P)
n, n2 = P.shape
assert n == n2, "P must be a square array"
assert np.all(P >= 0), "P elements must be nonnegative"
assert np.allclose(P.sum(1), 1), "The rows of P must sum to 1"
self.P = P
self.n = n
self.cdf = P.cumsum(1)
if pi0 is None:
self.pi0 = self.stationary_distribution if self.has_unique_stationary_distribution else np.ones((n))/n
else:
pi0 = np.asarray(pi0)
assert pi0.ndim == 1 and pi0.size == n, "pi0 must be a vector with %d elements" % n
assert np.all(pi0 >= 0), "pi0 elements must be nonnegative"
assert np.allclose(pi0.sum(), 1), "The elements of pi0 must sum to 1"
self.pi0 = pi0
if z is None:
self.outcomes = np.arange(n)
else:
z = np.asarray(z)
assert z.ndim == 1 and z.size == n, "z must be a vector with %d elements" % n
self.outcomes = z
self.string_outcomes = isinstance(self.outcomes[0], str)
@property
def stationary_distribution(self):
if self.has_unique_stationary_distribution:
A = np.vstack((np.eye(self.n)-self.P.T, np.ones((1,self.n))))
b = np.hstack((np.zeros(self.n), np.ones((1))))
return np.linalg.lstsq(A,b)[0]
else:
warn("This Markov process does not have a unique stationary distribution")
@property
def has_unique_stationary_distribution(self): #fixme: ugly implementation
pn = self.P
for it in range(50):
if np.any(pn == 0.0):
pn = pn @ self.P
else:
return True
return False
def rand(self, j):
"""
Draw next state
Parameters
----------
j current state, integer = 0, 1,...,n-1
Returns
-------
Integer: next state
"""
return (np.random.rand() > self.cdf[j]).sum()
def simulate(self, T: int = 120, j: int = 0):
"""
Simulate the Markov process
Parameters
----------
T
j
Returns
-------
Vector: array of simulated values
"""
assert j in range(self.n), 'Initial state must be a nonnegative integer less than %d' % self.n
y = np.empty((1 + T))
y[0] = j if self.string_outcomes else self.outcomes[j]
for t in range(T):
j = self.rand(j)
y[t + 1] = j if self.string_outcomes else self.outcomes[j]
return y
def unconditional_distribution(self, T=24):
prob = np.empty((T+1, self.n))
prob[0] = self.pi0
for t in range(T):
prob[t + 1] = np.dot(prob[t], self.P)
return prob
def plot(self, ax, T=120, j=0, *args):
y = self.simulate(T, j)
ax.plot(y, '-o', drawstyle='steps', *args)
if isinstance(self.outcomes[0], str):
ax.set_ylim([-0.5, self.n - 0.5])
ax.set_yticks(np.arange(self.n))
ax.set_yticklabels(self.outcomes)
else:
ax.set_yticks(np.sort(self.outcomes))
def plot_probability(self, ax, T=24):
p = self.unconditional_distribution(T).T
lb = np.zeros((T+1))
current_palette = seaborn.color_palette(n_colors=self.n)
tt = np.arange(1+T)
for k in range(self.n):
ax.bar(tt, p[k], bottom=lb, color=current_palette[k], label=str(self.outcomes[k]))
lb += p[k]
ax.set_ylim([0, 1.25])
ax.legend(loc='upper right', ncol=self.n)
def print_stationary(self):
if self.has_unique_stationary_distribution:
for z, p in zip(self.outcomes, self.stationary_distribution):
print(f'{z:6s} {100*p:9.2f}')
def __repr__(self):
txt = 'A Markov Chain process with transition probability\n'
txt += self.P.__str__()
txt += '\n\nand possible outcomes\n'
txt += self.outcomes.__str__()
return txt
Set the simulation horizon to $T=48$ periods.
T=48
A worker can either be employed or unemployed:
The worker is unemployed at $t = 0$. Then the Markov chain is:
outcomes = ['unemployed', 'employed']
P = [
[0.55, 0.45],
[0.05, 0.95],
]
pi0 = [1, 0]
worker = MarkovChain(P, pi0, outcomes)
fig, (ax0,ax1) = plt.subplots(2, 1, sharex=True, figsize=[12,8])
worker.plot(ax0, T, 0)
worker.plot_probability(ax1,T)
worker.print_stationary()
outcomes = ['single', 'married', 'divorced']
P = [
[0.92, 0.08, 0.00],
[0.00, 0.65, 0.35],
[0.00, 0.15, 0.85]
]
civil = MarkovChain(P, pi0=None, z=outcomes)
fig, (ax0,ax1) = plt.subplots(2, 1, sharex=True, figsize=[12,8])
civil.plot(ax0, T, 0)
civil.plot_probability(ax1,T)
civil.print_stationary()
P = [
[90.34, 5.62, 0.39, 0.08, 0.03, 0, 0, 0, 3.54],
[0.64, 88.78, 6.72, 0.47, 0.06, 0.09, 0.02, 0.01, 3.21],
[0.07, 2.16, 87.94, 4.97, 0.47, 0.19, 0.01, 0.04, 4.15],
[0.03, 0.24, 4.56, 84.26, 4.19, 0.76, 0.15, 0.22, 5.59],
[0.03, 0.06, 0.4, 6.09, 76.09, 6.82, 0.96, 0.98, 8.57],
[0, 0.09, 0.29, 0.41, 5.11, 74.62, 3.43, 5.3, 10.75],
[0.13, 0, 0.26, 0.77, 1.66, 8.93, 53.19, 21.94, 13.12],
[0, 0, 0, 0, 1, 3.1, 9.29, 51.29, 35.32],
[0, 0, 0, 0, 0, 0.1, 8.55, 74.06, 17.29]
]
P = np.array(P)/100
outcomes = ['AAA', 'AA', 'A', 'BBB', 'BB', 'B', 'CCC', 'D', 'N.R.']
initial = [1, 0, 0, 0, 0, 0, 0, 0, 0]
ratings = MarkovChain(P,initial, outcomes)
fig, (ax0,ax1) = plt.subplots(2, 1, sharex=True, figsize=[12,8])
ratings.plot(ax0, T, 0)
ratings.plot_probability(ax1,T)
ratings.print_stationary()