19  Nonhomogeneous Poisson process arrival times with Stan

Open in Google Colab | Download notebook


Code
# Colab setup ------------------
import os, shutil, sys, subprocess, urllib.request
if "google.colab" in sys.modules:
    cmd = "pip install --upgrade iqplot bebi103 arviz cmdstanpy watermark"
    process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = process.communicate()
    from cmdstanpy.install_cmdstan import latest_version
    cmdstan_version = latest_version()
    cmdstan_url = f"https://github.com/stan-dev/cmdstan/releases/download/v{cmdstan_version}/"
    fname = f"colab-cmdstan-{cmdstan_version}.tgz"
    urllib.request.urlretrieve(cmdstan_url + fname, fname)
    shutil.unpack_archive(fname)
    os.environ["CMDSTAN"] = f"./cmdstan-{cmdstan_version}"
    data_path = "https://s3.amazonaws.com/bebi103.caltech.edu/data/"
else:
    data_path = "../data/"
# ------------------------------
import cmdstanpy
import arviz as az

import numpy as np

import iqplot

import bebi103

import bokeh.io
import bokeh.plotting
bokeh.io.output_notebook()
Loading BokehJS ...

We have posited that we can use MCMC to sample out of an arbitrary distribution so long as we can write down the PDF. We will put this to use in Bayesian inference contexts soon, but for now, let us generate samples out of a nontrivial distribution, that for the arrival times of a nonhomogeneous Poisson process. In doing so, we will expose some neat features of Stan.

You will recall from ?sec-variable-spiking that we can use thinning to sample out of a nonhomogeneous Poisson process. We repeat the sample_nhpp() function below in a (folded) code cell.

Code
def sample_nhpp(beta, beta_j, t_j, beta_args=()):
    """Draw arrival times of a nonhomogeneous Poisson process.

    Parameters
    ----------
    beta : function, call signature beta(t, *beta_args)
        The function of the rate of arrivals as a function of time.
    beta_j : scalar or array_like
        If scalar, a value of beta that is greater than beta(t)
        for all time. If array_like, then beta_j[j] > beta(t) for
        all times between t_j[j-1] and t_j[j].
    t_j : scalar or array_like
        If scalar, the maximum time of observation. If array_like, must
        be the same length of `beta_j`. beta_j[j] is the value 
        of the the upper bound of the rate for the interval between
        t[j-1] and t[j].

    beta_args : tuple, default ()
        Any other arguments passed to beta.

    Returns
    -------
    output : Numpy array
        Times for arrivals of the nonhomogeneous Poisson process.

    Notes
    -----
    .. This is an implementation of the algorithm on page 86 of 
       Simulation, 5th Ed., by Sheldon Ross, Academic Press, 2013.
    """
    # Convert scalar inputs to arrays
    if np.isscalar(beta_j):
        beta_j = np.array([beta_j])
    if np.isscalar(t_j):
        t_j = np.array([t_j])

    # Make sure dimensions match
    if len(beta_j) != len(t_j):
        raise RuntimeError(
            f'`beta_j` is length {len(beta_j)} '
            f'and `t_j` is length {len(t_j)}. '
            'They must have the same length.'
        )

    return _sample_nhpp(beta, beta_j, t_j, beta_args)

                                                  
def _sample_nhpp(beta, beta_j, t_j, beta_args=()):
    """

    Parameters
    ----------
    beta : function, call signature beta(t, *beta_args)
        The function of the rate of arrivals as a function of time.
    beta_j : Numpy array
        If scalar, a value of beta that is greater than beta(t)
        for all time. If array_like, then beta_j[j] > beta(t) for
        all times between t_j[j-1] and t_j[j].
    t_j : Numpy array
        Must be the same length of `beta_j`. beta_j[j] is the value 
        of the the upper bound of the rate for the interval between
        t[j-1] and t[j].

    beta_args : tuple, default ()
        Any other arguments passed to beta.

    Returns
    -------
    output : Numpy array
        Times for arrivals of the nonhomogeneous Poisson process.

    Notes
    -----
    .. This is an implementation of the algorithm on page 86 of 
       Simulation, 5th Ed., by Sheldon Ross, Academic Press, 2013.
    """
    # Number of samples to take before concatenating arrays
    n_samples = 1000

    # Initializations
    t = 0.0  # time
    j = 0    # Index in beta_j and t_j arrays
    i = 0    # index in sample array
    n = 0    # total number of samples drawn
    x_from_boundary = False  # If we've hit a boundary of

    # Array for storing subtrajectory
    samples = np.empty(n_samples, dtype=float)

    # Output array for all samples
    samples_output = np.array([], dtype=float)

    # Loop until done (we exceed final time point)
    not_done = True
    while not_done:
        # Take samples until we fill array
        # We do it this way for speed to avoid list append operations
        while not_done and i < n_samples:
            if x_from_boundary:
                x = (x - t_j[j] + t) * beta_j[j] / beta_j[j+1]
                t = t_j[j]
                j += 1
            else:
                x = np.random.exponential(1 / beta_j[j])

            if t + x > t_j[j]:
                # If we got here, we went past the edge of this interval
                if j == len(t_j) - 1:
                    not_done = False
                else:
                    x_from_boundary = True
            else:
                t += x
                x_from_bounday = False
                if np.random.uniform() <= beta(t, *beta_args) / beta_j[j]:
                    samples[i] = t
                    i += 1
                    n += 1
        samples_output = np.concatenate((samples_output, samples))
        i = 0

    return np.array(samples_output[:n])

We can again sample out of this distribution using the thinning technique so we have some samples to compare. We will again have a sinusoidal variation in arrival rate.

def beta(t, offset, tau):
    """Poisson process arrival rate as a function of t"""
    return offset + np.sin(t / tau)

# Parameters for arrival rate function
offset = 1.15
tau = 10
beta_args = (offset, tau)

# Sample arrival times
arrival_times = sample_nhpp(beta, 2.15, 500, beta_args=beta_args)

# Plot the arrivals
p = iqplot.strip(arrival_times, marker='dash', marker_kwargs=dict(alpha=0.3))
bokeh.io.show(p)

We can write down the probability density function for the arrival times, given by {Equation 16.11}. Barring any chronic issues with the sampler, we can use Markov chain Monte Carlo to do the sampling.

Here, I present the Stan code for doing so, and then discuss its contents.

functions {
    real beta_(real t, real offset_, real tau) {
        return offset_ + sin(t / tau);
    }

    vector beta_(vector t, real offset_, real tau) {
        return offset_ + sin(t / tau);
    }

    real integral_beta(real tn, real offset_, real tau) {
        return tau + offset_ * tn - tau * cos(tn / tau);
    }
}


data {
    int n;
    real offset_;
    real tau;
}


parameters {
    positive_ordered[n] arrival_times;
}


model {
    target += -integral_beta(arrival_times[n], offset_, tau);
    target += sum(log(beta_(arrival_times, offset_, tau)));
}

First, we see a demonstration of how functions in the functions block work. Each function is declared with its return type. The function beta_, for example, returns a real, so it is declared as real beta_(...). (We name the function beta_ instead of beta because beta is part of the Stan language, standing for the Beta distribution.) The type of each argument must also be specified, separated by commas. Each function must also have a return statement, with the value being returned following return.

Next, not how we used the data block to enable passing in parameters. The data block is used to pass anything that does not change or get sampled in a Stan program.

In the parameters block, we define a positive_ordered data type that we use for the arrival times. This is a convenient data type in Stan. It comprises a vector where all entries are positive, and they are ordered from smallest to largest in magnitude. Since the arrival times are indeed positive and ordered, this data type is perfect for the present application.

In the model block, we add values to target. target is a special Stan variable that is the log probability density of the distribution being sampled out of, the so-called target distribution. Under the hood, sampling statements, such as x ~ std_normal(), add the appropriate quantities to target. When a sampling statement is not available available for a given distribution, as is the case here, we directly add the appropriate log probability density to target.

Now that we have our code, let’s compile it and give it a whirl! Since we aim to draw a single set of spike times, we will only use one chain and take one sample (since a single sample is 500 spikes). We will still take 1000 warm up samples (though this is excessive; we do not need that many).

# Compile Stan code
sm = cmdstanpy.CmdStanModel(stan_file='sinusoid_nhpp.stan')

# Parameters for our toy sinusoid and number of spikes to generate.
data = dict(n=1000, offset_=1.5, tau=10.0)

# Sample using Stan
samples = sm.sample(
    data=data,
    chains=1,
    iter_sampling=1,
)

# Convert to ArviZ InferenceData instance
samples = az.from_cmdstanpy(posterior=samples)
19:17:29 - cmdstanpy - INFO - compiling stan file /Users/bois/Dropbox/git/datasai/2025/content/content/lessons/sampling/sinusoid_nhpp.stan to exe file /Users/bois/Dropbox/git/datasai/2025/content/content/lessons/sampling/sinusoid_nhpp
19:17:33 - cmdstanpy - INFO - compiled model executable: /Users/bois/Dropbox/git/datasai/2025/content/content/lessons/sampling/sinusoid_nhpp
19:17:33 - cmdstanpy - INFO - CmdStan start processing
                                                                                
19:17:44 - cmdstanpy - INFO - CmdStan done processing.

Let’s plot to see if we get the same kind of set of spikes as in the sampling by thinning.

# Plot the arrivals
p = iqplot.strip(
    samples.posterior.arrival_times.squeeze(), 
    marker='dash', 
    marker_kwargs=dict(alpha=0.3),
    x_range=[0, 500],
)
bokeh.io.show(p)

Indeed, we do!

To demonstrate that we are actually sampling out of the correct distribution, we can approximate the arrival rate by counting how many spikes arrive within each one-time-unit bin. To do that, we will get many more samples and then perform the binning and average the number of spikes that arrived in each bin for each sample.

First, the samples….

samples = sm.sample(data=data)

# Convert to ArviZ InferenceData instance
samples = az.from_cmdstanpy(posterior=samples)
19:17:44 - cmdstanpy - INFO - CmdStan start processing
                                                                                                                                                                                                                                                                                                                                
19:18:07 - cmdstanpy - INFO - CmdStan done processing.

Now we can reshape the data, stacking all of the draws together in a single Numpy array.

arrival_times = samples.posterior.arrival_times.stack(
        {'draws': ('chain', 'draw')}
).values.transpose()

Now we can compute the mean number of spikes per unit time in each one-time-unit bin.

# Compute bins and summing
bins = np.arange(501)
hists = []
for spikes in arrival_times:
    hist, _ = np.histogram(spikes, bins=bins)
    hists.append(hist)

# Compute average bin counts
approx_rate = np.array(hists).mean(axis=0)

To make the plot, it is useful to have a function that can convert bins-histogram values to \(x\) and \(y\) coordinates to plot a histogram.

def binshist_to_xy(bins, hist):
    """Convert bins/histogram data generated by np.histogram
    to x-y values for plotting.

    Parameters
    ----------
    bins : Numpy array
        Edges of bins for histogram
    hist : Numpy array
        Values of histogram; len(hist) = len(bins) - 1.

    Returns
    -------
    x : Numpy array, shape (2 * len(hist),)
        x-values for plotting
    y : Numpy array, shape (2 * len(hist),)
        y-values for plotting
    """
    # Quick check of input
    if len(bins.shape) != 1 or len(hist.shape) != 1:
        raise RuntimeError('`bins` and `hist` must both be 1D arrays.')
    if len(bins) != len(hist) + 1:
        raise RuntimeError('Must have `len(bins) = len(hist) + 1`.')
    
    # x-values are achieved by interleaving interior bin edges
    # and then putting in the end binds
    x = np.empty(2 * (len(bins) - 1))
    x[1:-1:2] = bins[1:-1]
    x[2:-1:2] = bins[1:-1]
    x[0] = bins[0]
    x[-1] = bins[-1]

    # y-values are achieved by interleaving histogram values
    y = np.empty(2 * len(hist))
    y[::2] = hist
    y[1::2] = hist

    return x, y

This this function in hand, we can make out plot!

p = bokeh.plotting.figure(
    frame_width=500,
    frame_height=200,
    x_axis_label='time',
    y_axis_label='beta',
)
p.line(*binshist_to_xy(bins, approx_rate))
bokeh.io.show(p)

Beautiful! Indeed we are sampling arrival times out of the correct nonhomogeneous Poisson process.

bebi103.stan.clean_cmdstan()

19.1 Computing environment

%load_ext watermark
%watermark -v -p numpy,cmdstanpy,arviz,iqplot,bebi103,bokeh,jupyterlab
print("cmdstan   :", bebi103.stan.cmdstan_version())
Python implementation: CPython
Python version       : 3.12.11
IPython version      : 9.1.0

numpy     : 2.1.3
cmdstanpy : 1.2.5
arviz     : 0.21.0
iqplot    : 0.3.7
bebi103   : 0.1.27
bokeh     : 3.6.2
jupyterlab: 4.3.7

cmdstan   : 2.36.0