"""Conformal classifiers, regressors, and predictive systems (crepes) martingales
Functions for generating conformal test martingales.
Author: Henrik Boström (bostromh@kth.se)
Copyright 2025 Henrik Boström
License: BSD 3 clause
"""
import numpy as np
import pandas as pd
class BettingFunction():
"""
The class contains two sub-classes: :class:`.EpsilonBettingFunction`,
and :class:`.StepBettingFunction`.
"""
def __init__(self):
pass
[docs]class EpsilonBettingFunction(BettingFunction):
"""
An epsilon betting function generates rewards for a sequence of p-values.
"""
def __init__(self, E = [-1, -0.5, 0, 0.5, 1]):
"""
Parameters
----------
E : list of floats or integers, default=[-1, -0.5, 0, 0.5, 1]
epsilons, where each epsilon is in the range (-2, 2)
Examples
--------
An epsilon betting function using default values is obtained by:
.. code-block:: python
bf = EpsilonBettingFunction()
An epsilon betting function using the values -1, 0 and 1 is obtained by:
.. code-block:: python
bf = EpsilonBettingFunction(E = [-1, 0, 1])
Note
----
A runtime error is reported if not all values in E are within the range
(-2, 2).
"""
if not np.all(np.array(E) > -2) or not np.all(np.array(E) < 2):
raise RuntimeError(("each value in E must be in the range (-2, 2)"))
self.E = np.array(E)
[docs] def apply(self, p_values):
"""
Obtain rewards from p-values.
Parameters
----------
p_values : array-like of shape (n_values,)
p-values
Returns
-------
rewards : ndarray of shape (n_values, n_epsilons)
rewards for each p-value and epsilon, where n_epsilons is the
number of values in E (provided during initialization)
Examples
--------
Assuming that ``p_values`` is a vector of p-values, then rewards using
the default epsilon values are generated by:
.. code-block:: python
rewards = EpsilonBettingFunction().apply(p_values)
Rewards using the epsilon values [-1, 0 and 1] are obtained by:
.. code-block:: python
rewards = EpsilonBettingFunction(E = [-1, 0, 1]).apply(p_values)
"""
E = self.E
reward = np.zeros((len(p_values), len(E)))
for j in range(len(E)):
reward[:, j] = 1 + E[j] * (p_values - 0.5)
return reward
[docs]class StepBettingFunction(BettingFunction):
"""
A step betting function generates rewards for a sequence of p-values.
"""
def __init__(self, G = 10, Gs = None, A = None, s=None):
"""
Parameters
----------
G : integer, default=10
grid size, ignored if Gs is not None
Gs : list of floats, default=None
grid values, set to [1/G, ..., (G-1)/G] if Gs is None
A : real or array-like of shape (a_values,), default=None
threshold values, set to Gs if A is None
s : integer, default=None
start index, used for step betting function with drift
Examples
--------
A step betting function using default values is obtained by:
.. code-block:: python
bf = StepBettingFunction()
A step betting function using the threshold values [0.2, 0.4, 0.6]
is obtained by:
.. code-block:: python
bf = EpsilonBettingFunction(A = [0.2, 0.4, 0.6])
"""
if Gs is None:
self.Gs = [j/G for j in range(1,G)]
else:
self.Gs = Gs
if A is None:
self.A = self.Gs
else:
self.A = A
self.s = s
[docs] def apply(self, p_values, s=None):
"""
Obtain rewards from p-values.
Parameters
----------
p_values : array-like of shape (n_values,)
p-values
Returns
-------
rewards : ndarray of shape (n_values, grid_size)
rewards for each p-value and epsilon, where grid_size is the
number pairs in the grid (specified during initialization)
Examples
--------
Assuming that ``p_values`` is a vector of p-values, then rewards using
the default step betting function are generated by:
.. code-block:: python
rewards = StepBettingFunction().apply(p_values)
where ``rewards`` will be an ndarray with the same number of rows as
the length of ``p_values`` and with 81 columns, which follows from that
the default grid size (G) is 10, resulting in a grid that consists of
the pairs formed from the grid values [1/G, ..., (G-1)/G].
"""
Gs = self.Gs
A = self.A
if s is None:
s = self.s
if s is None:
if type(A) is list or type(A) is np.ndarray:
reward = np.zeros((len(p_values), len(Gs)*len(A)))
for i, a in enumerate(A):
leq = (p_values <= a)
ge = ~leq
for j, b in enumerate(Gs):
reward[leq, i*len(Gs)+j] = b/a
reward[ge, i*len(Gs)+j] = (1-b)/(1-a)
else:
a = A
reward = np.zeros((len(p_values), len(Gs)))
leq = (p_values <= a)
ge = ~leq
for i, b in enumerate(Gs):
reward[leq, i] = b/a
reward[ge, i] = (1-b)/(1-a)
else: # drift with start index (s >= 0)
if type(A) is list or type(A) is np.ndarray:
reward = np.zeros((len(p_values), len(Gs)*len(A)))
weights = np.array([(s+1)/(s+i+1)
for i in range(len(p_values))])
for i, a in enumerate(A):
for j, b in enumerate(Gs):
a_prime = a * weights + b * (1-weights)
leq = (p_values <= a_prime)
ge = ~leq
reward[leq, i*len(Gs)+j] = b/a_prime[leq]
reward[ge, i*len(Gs)+j] = (1-b)/(1-a_prime[ge])
else:
a = A
reward = np.zeros((len(p_values), len(Gs)))
weights = np.array([(s+1)/(s+i+1)
for i in range(len(p_values))])
for i, b in enumerate(Gs):
a_prime = a * weights + b * (1-weights)
leq = (p_values <= a_prime)
ge = ~leq
reward[leq, i] = b/a_prime[leq]
reward[ge, i] = (1-b)/(1-a_prime[ge])
return reward
class Martingale:
"""
The class contains four sub-classes: :class:`.SimpleJumper`,
:class:`.SleeperStayer`, :class:`.SleeperDrifter` and
:class:`.CompositeMartingale`.
"""
def __init__(self):
pass
[docs]class SimpleJumper(Martingale):
"""
SimpleJumper computes martingale values given a vector of p-values.
"""
def __init__(self, J = 0.01, bf = EpsilonBettingFunction()):
"""
Parameters
----------
J : float, 0 < J < 1
jumping rate
bf : :class:`.Martingale`
betting function
Examples
--------
SimpleJumper with default settings can be obtained by:
.. code-block:: python
m = SimpleJumper()
We may initialize SimpleJumper with a specific jumping rate and betting
function:
.. code-block:: python
m = SimpleJumper(J = 0.1, bf = StepBettingFunction())
Note
----
The specified betting function must have a method `apply` that given a
vector of p-values (ndarray of shape (n_values,)), returns an ndarray
of shape (n_values, n_accounts), where n_accounts >= 1.
Note
----
SimpleJumper is a generalization of Alg. 8.1: Simple Jumper betting
martingale in ALRW, 2ed, p., 232.
"""
if J <= 0 or J >= 1:
raise RuntimeError(("J must be in the range (0, 1)"))
self.J = J
self.bf = bf
[docs] def apply(self, p_values, c=None):
"""
Generate a vector of martingale values, given a vector of p-values.
Parameters
----------
p_values : array-like of shape (n_values,)
p-values
c : integer, default=None
threshold value
Returns
-------
martingales : ndarray of shape (n_values,)
martingale values, returned only if c is None
index : integer, 0 <= index <= n_values
index of the first martingale value that is equal to or greater
than c, if any, and index = n_values, otherwise, returned only if
c is not None
Examples
--------
Assuming that ``p_values`` is a vector of p-values, then martingale
values can be generated by:
.. code-block:: python
v = SimpleJumper().apply(p_values)
The following gives the index of the first martingale value that
is equal to or exceeds 100:
.. code-block:: python
i = SimpleJumper().apply(p_values, c=100)
Note
----
If a threshold (c) is specified, the computation is terminated as soon
as a martingale value reaches this, which may save some time compared to
computing the full sequence.
"""
J = self.J
bf = self.bf
rewards = bf.apply(p_values)
C = np.full(rewards.shape[1], 1/rewards.shape[1])
S = np.zeros(len(p_values))
for i in range(len(p_values)):
C *= rewards[i]
S[i] = np.sum(C)
if c is not None:
if S[i] >= c:
return i
C = (1-J)*C + (J/len(C))*S[i]
if c is not None:
return len(p_values)
else:
return S
[docs]class SleeperStayer(Martingale):
"""
SleeperStayer computes martingale values given a vector of p-values.
"""
def __init__(self, R = 0.001, bf = StepBettingFunction()):
"""
Parameters
----------
R : float, 0 < R < 1
investment rate
bf : :class:`.Martingale`
betting function
Examples
--------
SleeperStayer with default settings can be obtained by:
.. code-block:: python
m = SleeperStayer()
We may initialize SleeperStayer with a specific investment rate and
betting function:
.. code-block:: python
m = SleeperStayer(J = 0.1, bf = EpsilonBettingFunction())
Note
----
The specified betting function must have a method `apply` that given a
vector of p-values (ndarray of shape (n_values,)), returns an ndarray
of shape (n_values, n_accounts), where n_accounts >= 1.
Note
----
SleeperStayer is a generalization of Alg. 9.4: Sleeper/Stayer in ALRW,
2ed, p., 283.
"""
if R <= 0 or R >= 1:
raise RuntimeError(("R must be in the range (0, 1)"))
self.R = R
self.bf = bf
[docs] def apply(self, p_values, c=None):
"""
Generate a vector of martingale values, given a vector of p-values.
Parameters
----------
p_values : array-like of shape (n_values,)
p-values
c : integer, default=None
threshold value
Returns
-------
martingales : ndarray of shape (n_values,)
martingale values, returned only if c is None
index : integer, 0 <= index <= n_values
index of the first martingale value that is equal to or greater
than c, if any, and index = n_values, otherwise, returned only if
c is not None
Examples
--------
Assuming that ``p_values`` is a vector of p-values, then martingale
values can be generated by:
.. code-block:: python
v = SleeperStayer().apply(p_values)
The following gives the index of the first martingale value that
is equal to or exceeds 100:
.. code-block:: python
i = SleeperStayer().apply(p_values, c=100)
Note
----
If a threshold (c) is specified, the computation is terminated as soon
as a martingale value reaches this, which may save some time compared to
computing the full sequence.
"""
R = self.R
bf = self.bf
rewards = bf.apply(p_values)
C = np.full(rewards.shape[1], R/rewards.shape[1])
S = np.zeros(len(p_values))
S_box = 1-R
for i in range(len(p_values)):
C *= rewards[i]
S[i] = S_box + np.sum(C)
if c is not None:
if S[i] >= c:
return i
C += R*S_box/len(C)
S_box *= (1-R)
if c is not None:
return len(p_values)
else:
return S
[docs]class SleeperDrifter(Martingale):
"""
SleeperDrifter computes martingale values given a vector of p-values.
"""
def __init__(self, R = 0.1, M = 100, bf = StepBettingFunction(),
drift=True):
"""
Parameters
----------
R : float, 0 < R < 1
investment rate
M : integer, 0 < M, default=100
wake-up frequency
bf : :class:`.Martingale`
betting function
Examples
--------
SleeperDrifter with default settings can be obtained by:
.. code-block:: python
m = SleeperDrifter()
We may initialize SleeperDrifter with a specific investment rate,
wake-up frequency and betting function:
.. code-block:: python
m = SleeperStayer(J = 0.1, M = 50, bf = EpsilonBettingFunction())
Note
----
The specified betting function must have a method `apply` that given a
vector of p-values (ndarray of shape (n_values,)), returns an ndarray
of shape (n_values, n_accounts), where n_accounts >= 1.
Note
----
SleeperDrifter is a generalization of Alg. 9.5: Sleeper/Drifter in ALRW, 2ed,
p., 284 with a slightly modified initialization, by which rewards greater than
1 may be obtained for the first M p-values.
"""
if R <= 0 or R >= 1:
raise RuntimeError(("R must be in the range (0, 1)"))
if M <= 0:
raise RuntimeError(("M must greater than 1"))
self.R = R
self.M = M
self.bf = bf
self.drift = drift
[docs] def apply(self, p_values, c=None):
"""
Generate a vector of martingale values, given a vector of p-values.
Parameters
----------
p_values : array-like of shape (n_values,)
p-values
c : integer, default=None
threshold value
Returns
-------
martingales : ndarray of shape (n_values,)
martingale values, returned only if c is None
index : integer, 0 <= index <= n_values
index of the first martingale value that is equal to or greater
than c, if any, and index = n_values, otherwise, returned only if
c is not None
Examples
--------
Assuming that ``p_values`` is a vector of p-values, then martingale
values can be generated by:
.. code-block:: python
v = SleeperDrifter().apply(p_values)
The following gives the index of the first martingale value that
is equal to or exceeds 100:
.. code-block:: python
i = SleeperDrifter().apply(p_values, c=100)
Note
----
If a threshold (c) is specified, the computation is terminated as soon
as a martingale value reaches this, which may save some time compared to
computing the full sequence.
"""
R = self.R
M = self.M
bf = self.bf
drift = self.drift
if drift and isinstance(bf, StepBettingFunction):
rewards = [bf.apply(p_values[s*M:], s=(s+1)*M-1) for s in range(len(p_values)//M+1)]
else:
all_rewards = bf.apply(p_values)
rewards = [all_rewards[s*M:] for s in range(1+len(p_values)//M)]
S = np.zeros(len(p_values))
C = np.zeros((1+len(p_values)//M, rewards[0].shape[1]))
C[0,:] = R/C.shape[1]
S_box = 1-R
for i in range(len(p_values)):
for j in range(0, 1+i//M):
C[j] *= rewards[j][i-j*M]
S[i] = S_box + np.sum(C)
if c is not None:
if S[i] >= c:
return i
if (i+1) % M == 0:
C[(i+1)//M] = R*S_box/C.shape[1]
S_box *= (1-R)
if c is not None:
return len(p_values)
else:
return S
[docs]class CompositeMartingale(Martingale):
"""
A composite martingale averages the output of multiple conformal test
martingales given at initialization.
"""
def __init__(self, Martingales):
"""
Parameters
----------
Martingales : list of conformal test martingales
conformal test martingales, used for averaging
Examples
--------
An composite martingale can be obtained by:
.. code-block:: python
cm = CompositeMartingale([SimpleJumper(), SleeperStayer()])
Note
----
Each object in the provided list of conformal test martingales
is required to have a method `apply` that given a vector of
p-values returns an equally long vector of martingale values.
"""
self.Martingales = Martingales
[docs] def apply(self, p_values, c=None):
"""
Generate a vector of martingale values, given a vector of p-values.
Parameters
----------
p_values : array-like of shape (n_values,)
p-values
c : integer, default=None
threshold value
Returns
-------
martingales : ndarray of shape (n_values,)
martingale values, returned only if c is None
index : integer, 0 <= index <= n_values
index of the first martingale value that is equal to or greater
than c, if any, and index = n_values, otherwise, returned only if
c is not None
Examples
--------
Assuming that ``p_values`` is a vector of p-values, then martingale
values can be generated by:
.. code-block:: python
cm = CompositeMartingale([SimpleJumper(), SleeperStayer()])
v = cm.apply(p_values)
The following gives the index of the first martingale value that
is equal to or exceeds 100:
.. code-block:: python
i = cm.apply(p_values, c=100)
"""
S = np.zeros(len(p_values))
for M in self.Martingales:
S += M.apply(p_values)
martingales = S/len(self.Martingales)
if c is not None:
gt = np.argwhere(martingales >= c)
if len(gt) > 0:
return gt[0,0]
else:
return len(p_values)
else:
return martingales
[docs]def semi_online_p_values(alphas, seed=None):
"""
Computes semi-online p-values from a vector of non-conformity scores.
Parameters
----------
alphas : array-like of shape (n_values,)
non-conformity scores
seed : int, default=None
set random seed
Returns
-------
p_values : array-like of shape (n_values,)
p-values
Examples
--------
Assuming that alphas is a vector of non-conformity scores, p-values are
obtained by:
.. code-block:: python
from crepes.martingales import semi_online_p_values
p_values = semi_online_p_values(alphas)
The above results in that ``p_values`` is assigned a vector of the same length
as ``alphas`` and where the p-value for each non-conformity score is computed
with respect to the preceding non-conformity scores only.
Note
----
The output p-values are smoothed and generated in the semi-online mode
according to the terminology in in ALRW, 2ed, p., 111.
"""
if seed is not None:
random_state = np.random.get_state()
np.random.seed(seed)
p_values = np.zeros(len(alphas))
thetas = np.random.rand(len(alphas))
for q in range(len(alphas)):
p_values[q] = (np.sum(alphas[:q] > alphas[q]) + \
thetas[q]*(1+np.sum(alphas[:q] == alphas[q])))/(q+1)
if seed is not None:
np.random.set_state(random_state)
return p_values