import cmdstanpy
import arviz as az

import numpy as np

import iqplot
import bebi103

import bokeh.io
import bokeh.plotting
bokeh.io.output_notebook()
We have posited that we can use MCMC to sample out of an arbitrary distribution so long as we can write down its PDF. We will put this to use in Bayesian inference contexts soon, but for now, let us generate samples out of a nontrivial distribution: that of the arrival times of a nonhomogeneous Poisson process. In doing so, we will expose some neat features of Stan.
You will recall from ?sec-variable-spiking that we can use thinning to sample out of a nonhomogeneous Poisson process. We repeat the sample_nhpp() function below in a (folded) code cell.
def sample_nhpp(beta, beta_j, t_j, beta_args=()):
    """Draw arrival times of a nonhomogeneous Poisson process.

    Parameters
    ----------
    beta : function, call signature beta(t, *beta_args)
        The function of the rate of arrivals as a function of time.
    beta_j : scalar or array_like
        If scalar, a value of beta that is greater than beta(t) for
        all time. If array_like, then beta_j[j] > beta(t) for all
        times between t_j[j-1] and t_j[j].
    t_j : scalar or array_like
        If scalar, the maximum time of observation. If array_like,
        must be the same length as `beta_j`. beta_j[j] is the value
        of the upper bound of the rate for the interval between
        t[j-1] and t[j].
    beta_args : tuple, default ()
        Any other arguments passed to beta.

    Returns
    -------
    output : Numpy array
        Times for arrivals of the nonhomogeneous Poisson process.

    Notes
    -----
    .. This is an implementation of the algorithm on page 86 of
       Simulation, 5th Ed., by Sheldon Ross, Academic Press, 2013.
    """
    # Convert scalar inputs to arrays
    if np.isscalar(beta_j):
        beta_j = np.array([beta_j])
    if np.isscalar(t_j):
        t_j = np.array([t_j])

    # Make sure dimensions match
    if len(beta_j) != len(t_j):
        raise RuntimeError(
            f'`beta_j` is length {len(beta_j)} '
            f'and `t_j` is length {len(t_j)}. '
            'They must have the same length.'
        )

    return _sample_nhpp(beta, beta_j, t_j, beta_args)


def _sample_nhpp(beta, beta_j, t_j, beta_args=()):
    """
    Parameters
    ----------
    beta : function, call signature beta(t, *beta_args)
        The function of the rate of arrivals as a function of time.
    beta_j : Numpy array
        beta_j[j] > beta(t) for all times between t_j[j-1] and t_j[j].
    t_j : Numpy array
        Must be the same length as `beta_j`. beta_j[j] is the value
        of the upper bound of the rate for the interval between
        t[j-1] and t[j].
    beta_args : tuple, default ()
        Any other arguments passed to beta.

    Returns
    -------
    output : Numpy array
        Times for arrivals of the nonhomogeneous Poisson process.

    Notes
    -----
    .. This is an implementation of the algorithm on page 86 of
       Simulation, 5th Ed., by Sheldon Ross, Academic Press, 2013.
    """
    # Number of samples to take before concatenating arrays
    n_samples = 1000

    # Initializations
    t = 0.0                  # time
    j = 0                    # Index in beta_j and t_j arrays
    i = 0                    # index in sample array
    n = 0                    # total number of samples drawn
    x_from_boundary = False  # If we've hit a boundary of a rate interval

    # Array for storing subtrajectory
    samples = np.empty(n_samples, dtype=float)

    # Output array for all samples
    samples_output = np.array([], dtype=float)

    # Loop until done (we exceed final time point)
    not_done = True
    while not_done:
        # Take samples until we fill array
        # We do it this way for speed to avoid list append operations
        while not_done and i < n_samples:
            if x_from_boundary:
                # Carry leftover exponential waiting time into the next interval
                x = (x - t_j[j] + t) * beta_j[j] / beta_j[j+1]
                t = t_j[j]
                j += 1
            else:
                x = np.random.exponential(1 / beta_j[j])

            if t + x > t_j[j]:
                # If we got here, we went past the edge of this interval
                if j == len(t_j) - 1:
                    not_done = False
                else:
                    x_from_boundary = True
            else:
                t += x
                x_from_boundary = False
                if np.random.uniform() <= beta(t, *beta_args) / beta_j[j]:
                    samples[i] = t
                    i += 1
                    n += 1

        samples_output = np.concatenate((samples_output, samples))
        i = 0

    return np.array(samples_output[:n])
We can again sample out of this distribution using the thinning technique so that we have samples to compare against. As before, the arrival rate varies sinusoidally in time.
def beta(t, offset, tau):
    """Poisson process arrival rate as a function of t"""
    return offset + np.sin(t / tau)


# Parameters for arrival rate function
offset = 1.15
tau = 10
beta_args = (offset, tau)

# Sample arrival times
arrival_times = sample_nhpp(beta, 2.15, 500, beta_args=beta_args)

# Plot the arrivals
p = iqplot.strip(arrival_times, marker='dash', marker_kwargs=dict(alpha=0.3))

bokeh.io.show(p)
We can write down the probability density function for the arrival times, given by Equation 16.11. Barring any chronic issues with the sampler, we can use Markov chain Monte Carlo to do the sampling.
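As a reminder, with time-varying arrival rate \(\beta(t)\) and ordered arrival times \(t_1 < t_2 < \cdots < t_n\), that density is

\[
f(t_1, \ldots, t_n) = \left(\prod_{i=1}^{n} \beta(t_i)\right)\exp\left(-\int_0^{t_n} \beta(t)\,\mathrm{d}t\right).
\]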
Here, I present the Stan code for doing so, and then discuss its contents.
functions {
  real beta_(real t, real offset_, real tau) {
    return offset_ + sin(t / tau);
  }

  vector beta_(vector t, real offset_, real tau) {
    return offset_ + sin(t / tau);
  }

  real integral_beta(real tn, real offset_, real tau) {
    return tau + offset_ * tn - tau * cos(tn / tau);
  }
}


data {
  int n;
  real offset_;
  real tau;
}


parameters {
  positive_ordered[n] arrival_times;
}


model {
  target += -integral_beta(arrival_times[n], offset_, tau);
  target += sum(log(beta_(arrival_times, offset_, tau)));
}
First, we see a demonstration of how functions in the functions block work. Each function is declared with its return type. The first version of beta_, for example, returns a real, so it is declared as real beta_(...). (We name the function beta_ instead of beta because beta is part of the Stan language, standing for the Beta distribution.) Note that beta_ is defined twice: Stan allows overloading of user-defined functions, and the second signature, which takes and returns a vector, lets us evaluate the rate at all arrival times at once in the model block. The type of each argument must also be specified, separated by commas. Each function must also have a return statement, with the value being returned following return.
Next, note how we used the data block to enable passing in parameters. The data block is used to pass in anything that does not change or get sampled in a Stan program.
In the parameters block, we define a positive_ordered data type that we use for the arrival times. This is a convenient data type in Stan. It comprises a vector where all entries are positive, and they are ordered from smallest to largest in magnitude. Since the arrival times are indeed positive and ordered, this data type is perfect for the present application.
In the model block, we add values to target. target is a special Stan variable that holds the log probability density of the distribution being sampled out of, the so-called target distribution. Under the hood, sampling statements, such as x ~ std_normal(), add the appropriate quantities to target. When a sampling statement is not available for a given distribution, as is the case here, we directly add the appropriate log probability density to target.
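Concretely, the two target += statements together add

\[
\ln f(t_1, \ldots, t_n) = -\int_0^{t_n} \beta(t)\,\mathrm{d}t + \sum_{i=1}^{n} \ln \beta(t_i),
\]

the log of the arrival-time density above. For the sinusoidal rate \(\beta(t) = \text{offset} + \sin(t/\tau)\), the integral evaluates to

\[
\int_0^{t_n} \beta(t)\,\mathrm{d}t = \text{offset}\,t_n + \tau\left(1 - \cos(t_n/\tau)\right),
\]

which is exactly what the integral_beta() function in the functions block returns.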
Now that we have our code, let’s compile it and give it a whirl! Since we aim to draw a single set of spike times, we will use only one chain and take just one sample (a single sample here is a complete set of n arrival times). We will still take the default 1000 warm-up samples (though this is excessive; we do not need that many).
# Compile Stan code
sm = cmdstanpy.CmdStanModel(stan_file='sinusoid_nhpp.stan')

# Parameters for our toy sinusoid and number of spikes to generate
data = dict(n=1000, offset_=1.5, tau=10.0)

# Sample using Stan
samples = sm.sample(
    data=data,
    chains=1,
    iter_sampling=1,
)

# Convert to ArviZ InferenceData instance
samples = az.from_cmdstanpy(posterior=samples)
19:17:29 - cmdstanpy - INFO - compiling stan file /Users/bois/Dropbox/git/datasai/2025/content/content/lessons/sampling/sinusoid_nhpp.stan to exe file /Users/bois/Dropbox/git/datasai/2025/content/content/lessons/sampling/sinusoid_nhpp
19:17:33 - cmdstanpy - INFO - compiled model executable: /Users/bois/Dropbox/git/datasai/2025/content/content/lessons/sampling/sinusoid_nhpp
19:17:33 - cmdstanpy - INFO - CmdStan start processing
19:17:44 - cmdstanpy - INFO - CmdStan done processing.
Let’s plot to see if we get the same kind of set of spikes as in the sampling by thinning.
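One way to do this is to pull the single draw of arrival times out of the ArviZ InferenceData object and make a strip plot like the one above; a minimal sketch:

# Extract the single draw of arrival times as a 1D Numpy array
spikes = samples.posterior['arrival_times'].values.flatten()

# Strip plot of the spike times, as for the thinned samples
p = iqplot.strip(spikes, marker='dash', marker_kwargs=dict(alpha=0.3))

bokeh.io.show(p)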
To demonstrate that we are actually sampling out of the correct distribution, we can approximate the arrival rate by counting how many spikes arrive within each one-time-unit bin. To do that, we will draw many more samples and then, for each bin, average the number of spikes that arrived in that bin across samples.
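This works because the expected number of arrivals in a bin of unit width is the integral of the rate over that bin, which for a slowly varying rate is approximately the rate at the bin’s midpoint:

\[
\mathrm{E}\left[\text{number of arrivals in } [t, t+1)\right] = \int_t^{t+1} \beta(s)\,\mathrm{d}s \approx \beta(t + 1/2).
\]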
First, the samples….
samples = sm.sample(data=data)

# Convert to ArviZ InferenceData instance
samples = az.from_cmdstanpy(posterior=samples)
19:17:44 - cmdstanpy - INFO - CmdStan start processing
19:18:07 - cmdstanpy - INFO - CmdStan done processing.
Now we can reshape the data, stacking all of the draws together in a single Numpy array.
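One way to do this, assuming the default four chains of 1,000 draws each, is sketched below; note that it reuses the name arrival_times, which the binning code that follows iterates over.

# Posterior draws of arrival times; shape (n_chains, n_draws, n)
arrival_times = samples.posterior['arrival_times'].values

# Stack chains and draws into a single (n_chains * n_draws, n) Numpy array
arrival_times = arrival_times.reshape((-1, data['n']))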
Now we can compute the mean number of spikes per unit time in each one-time-unit bin.
# Bins of unit width spanning the observation window
bins = np.arange(501)

# Histogram of spike times for each draw
hists = []
for spikes in arrival_times:
    hist, _ = np.histogram(spikes, bins=bins)
    hists.append(hist)

# Compute average bin counts
approx_rate = np.array(hists).mean(axis=0)
To make the plot, it is useful to have a function that converts bin edges and histogram values to \(x\) and \(y\) coordinates for plotting the histogram.
def binshist_to_xy(bins, hist):
    """Convert bins/histogram data generated by np.histogram
    to x-y values for plotting.

    Parameters
    ----------
    bins : Numpy array
        Edges of bins for histogram
    hist : Numpy array
        Values of histogram; len(hist) = len(bins) - 1.

    Returns
    -------
    x : Numpy array, shape (2 * len(hist),)
        x-values for plotting
    y : Numpy array, shape (2 * len(hist),)
        y-values for plotting
    """
    # Quick check of input
    if len(bins.shape) != 1 or len(hist.shape) != 1:
        raise RuntimeError('`bins` and `hist` must both be 1D arrays.')
    if len(bins) != len(hist) + 1:
        raise RuntimeError('Must have `len(bins) = len(hist) + 1`.')

    # x-values are achieved by interleaving interior bin edges
    # and then putting in the end bins
    x = np.empty(2 * (len(bins) - 1))
    x[1:-1:2] = bins[1:-1]
    x[2:-1:2] = bins[1:-1]
    x[0] = bins[0]
    x[-1] = bins[-1]

    # y-values are achieved by interleaving histogram values
    y = np.empty(2 * len(hist))
    y[::2] = hist
    y[1::2] = hist

    return x, y
With this function in hand, we can make our plot!
p = bokeh.plotting.figure(
    frame_width=500,
    frame_height=200,
    x_axis_label='time',
    y_axis_label='beta',
)

p.line(*binshist_to_xy(bins, approx_rate))

bokeh.io.show(p)
Beautiful! Indeed we are sampling arrival times out of the correct nonhomogeneous Poisson process.
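As an extra check, we could overlay the rate function evaluated with the parameters we passed to Stan; a minimal sketch, reusing the beta() function and the figure p from above:

# Overlay the true rate beta(t) using the parameters passed to Stan
t = np.linspace(0, 500, 1000)
p.line(t, beta(t, data['offset_'], data['tau']), color='orange', line_width=2)

bokeh.io.show(p)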