Source code for crepes.martingales

"""Conformal classifiers, regressors, and predictive systems (crepes) martingales

Functions for generating conformal test martingales.

Author: Henrik Boström (bostromh@kth.se)

Copyright 2025 Henrik Boström

License: BSD 3 clause

"""

import numpy as np
import pandas as pd

class BettingFunction():
    """
    The class contains two sub-classes: :class:`.EpsilonBettingFunction`,
    and :class:`.StepBettingFunction`.
    """
    
    def __init__(self):
        pass

[docs]class EpsilonBettingFunction(BettingFunction): """ An epsilon betting function generates rewards for a sequence of p-values. """ def __init__(self, E = [-1, -0.5, 0, 0.5, 1]): """ Parameters ---------- E : list of floats or integers, default=[-1, -0.5, 0, 0.5, 1] epsilons, where each epsilon is in the range (-2, 2) Examples -------- An epsilon betting function using default values is obtained by: .. code-block:: python bf = EpsilonBettingFunction() An epsilon betting function using the values -1, 0 and 1 is obtained by: .. code-block:: python bf = EpsilonBettingFunction(E = [-1, 0, 1]) Note ---- A runtime error is reported if not all values in E are within the range (-2, 2). """ if not np.all(np.array(E) > -2) or not np.all(np.array(E) < 2): raise RuntimeError(("each value in E must be in the range (-2, 2)")) self.E = np.array(E)
[docs] def apply(self, p_values): """ Obtain rewards from p-values. Parameters ---------- p_values : array-like of shape (n_values,) p-values Returns ------- rewards : ndarray of shape (n_values, n_epsilons) rewards for each p-value and epsilon, where n_epsilons is the number of values in E (provided during initialization) Examples -------- Assuming that ``p_values`` is a vector of p-values, then rewards using the default epsilon values are generated by: .. code-block:: python rewards = EpsilonBettingFunction().apply(p_values) Rewards using the epsilon values [-1, 0 and 1] are obtained by: .. code-block:: python rewards = EpsilonBettingFunction(E = [-1, 0, 1]).apply(p_values) """ E = self.E reward = np.zeros((len(p_values), len(E))) for j in range(len(E)): reward[:, j] = 1 + E[j] * (p_values - 0.5) return reward
[docs]class StepBettingFunction(BettingFunction): """ A step betting function generates rewards for a sequence of p-values. """ def __init__(self, G = 10, Gs = None, A = None, s=None): """ Parameters ---------- G : integer, default=10 grid size, ignored if Gs is not None Gs : list of floats, default=None grid values, set to [1/G, ..., (G-1)/G] if Gs is None A : real or array-like of shape (a_values,), default=None threshold values, set to Gs if A is None s : integer, default=None start index, used for step betting function with drift Examples -------- A step betting function using default values is obtained by: .. code-block:: python bf = StepBettingFunction() A step betting function using the threshold values [0.2, 0.4, 0.6] is obtained by: .. code-block:: python bf = EpsilonBettingFunction(A = [0.2, 0.4, 0.6]) """ if Gs is None: self.Gs = [j/G for j in range(1,G)] else: self.Gs = Gs if A is None: self.A = self.Gs else: self.A = A self.s = s
[docs] def apply(self, p_values, s=None): """ Obtain rewards from p-values. Parameters ---------- p_values : array-like of shape (n_values,) p-values Returns ------- rewards : ndarray of shape (n_values, grid_size) rewards for each p-value and epsilon, where grid_size is the number pairs in the grid (specified during initialization) Examples -------- Assuming that ``p_values`` is a vector of p-values, then rewards using the default step betting function are generated by: .. code-block:: python rewards = StepBettingFunction().apply(p_values) where ``rewards`` will be an ndarray with the same number of rows as the length of ``p_values`` and with 81 columns, which follows from that the default grid size (G) is 10, resulting in a grid that consists of the pairs formed from the grid values [1/G, ..., (G-1)/G]. """ Gs = self.Gs A = self.A if s is None: s = self.s if s is None: if type(A) is list or type(A) is np.ndarray: reward = np.zeros((len(p_values), len(Gs)*len(A))) for i, a in enumerate(A): leq = (p_values <= a) ge = ~leq for j, b in enumerate(Gs): reward[leq, i*len(Gs)+j] = b/a reward[ge, i*len(Gs)+j] = (1-b)/(1-a) else: a = A reward = np.zeros((len(p_values), len(Gs))) leq = (p_values <= a) ge = ~leq for i, b in enumerate(Gs): reward[leq, i] = b/a reward[ge, i] = (1-b)/(1-a) else: # drift with start index (s >= 0) if type(A) is list or type(A) is np.ndarray: reward = np.zeros((len(p_values), len(Gs)*len(A))) weights = np.array([(s+1)/(s+i+1) for i in range(len(p_values))]) for i, a in enumerate(A): for j, b in enumerate(Gs): a_prime = a * weights + b * (1-weights) leq = (p_values <= a_prime) ge = ~leq reward[leq, i*len(Gs)+j] = b/a_prime[leq] reward[ge, i*len(Gs)+j] = (1-b)/(1-a_prime[ge]) else: a = A reward = np.zeros((len(p_values), len(Gs))) weights = np.array([(s+1)/(s+i+1) for i in range(len(p_values))]) for i, b in enumerate(Gs): a_prime = a * weights + b * (1-weights) leq = (p_values <= a_prime) ge = ~leq reward[leq, i] = b/a_prime[leq] reward[ge, i] = (1-b)/(1-a_prime[ge]) return reward
class Martingale: """ The class contains four sub-classes: :class:`.SimpleJumper`, :class:`.SleeperStayer`, :class:`.SleeperDrifter` and :class:`.CompositeMartingale`. """ def __init__(self): pass
[docs]class SimpleJumper(Martingale): """ SimpleJumper computes martingale values given a vector of p-values. """ def __init__(self, J = 0.01, bf = EpsilonBettingFunction()): """ Parameters ---------- J : float, 0 < J < 1 jumping rate bf : :class:`.Martingale` betting function Examples -------- SimpleJumper with default settings can be obtained by: .. code-block:: python m = SimpleJumper() We may initialize SimpleJumper with a specific jumping rate and betting function: .. code-block:: python m = SimpleJumper(J = 0.1, bf = StepBettingFunction()) Note ---- The specified betting function must have a method `apply` that given a vector of p-values (ndarray of shape (n_values,)), returns an ndarray of shape (n_values, n_accounts), where n_accounts >= 1. Note ---- SimpleJumper is a generalization of Alg. 8.1: Simple Jumper betting martingale in ALRW, 2ed, p., 232. """ if J <= 0 or J >= 1: raise RuntimeError(("J must be in the range (0, 1)")) self.J = J self.bf = bf
[docs] def apply(self, p_values, c=None): """ Generate a vector of martingale values, given a vector of p-values. Parameters ---------- p_values : array-like of shape (n_values,) p-values c : integer, default=None threshold value Returns ------- martingales : ndarray of shape (n_values,) martingale values, returned only if c is None index : integer, 0 <= index <= n_values index of the first martingale value that is equal to or greater than c, if any, and index = n_values, otherwise, returned only if c is not None Examples -------- Assuming that ``p_values`` is a vector of p-values, then martingale values can be generated by: .. code-block:: python v = SimpleJumper().apply(p_values) The following gives the index of the first martingale value that is equal to or exceeds 100: .. code-block:: python i = SimpleJumper().apply(p_values, c=100) Note ---- If a threshold (c) is specified, the computation is terminated as soon as a martingale value reaches this, which may save some time compared to computing the full sequence. """ J = self.J bf = self.bf rewards = bf.apply(p_values) C = np.full(rewards.shape[1], 1/rewards.shape[1]) S = np.zeros(len(p_values)) for i in range(len(p_values)): C *= rewards[i] S[i] = np.sum(C) if c is not None: if S[i] >= c: return i C = (1-J)*C + (J/len(C))*S[i] if c is not None: return len(p_values) else: return S
[docs]class SleeperStayer(Martingale): """ SleeperStayer computes martingale values given a vector of p-values. """ def __init__(self, R = 0.001, bf = StepBettingFunction()): """ Parameters ---------- R : float, 0 < R < 1 investment rate bf : :class:`.Martingale` betting function Examples -------- SleeperStayer with default settings can be obtained by: .. code-block:: python m = SleeperStayer() We may initialize SleeperStayer with a specific investment rate and betting function: .. code-block:: python m = SleeperStayer(J = 0.1, bf = EpsilonBettingFunction()) Note ---- The specified betting function must have a method `apply` that given a vector of p-values (ndarray of shape (n_values,)), returns an ndarray of shape (n_values, n_accounts), where n_accounts >= 1. Note ---- SleeperStayer is a generalization of Alg. 9.4: Sleeper/Stayer in ALRW, 2ed, p., 283. """ if R <= 0 or R >= 1: raise RuntimeError(("R must be in the range (0, 1)")) self.R = R self.bf = bf
[docs] def apply(self, p_values, c=None): """ Generate a vector of martingale values, given a vector of p-values. Parameters ---------- p_values : array-like of shape (n_values,) p-values c : integer, default=None threshold value Returns ------- martingales : ndarray of shape (n_values,) martingale values, returned only if c is None index : integer, 0 <= index <= n_values index of the first martingale value that is equal to or greater than c, if any, and index = n_values, otherwise, returned only if c is not None Examples -------- Assuming that ``p_values`` is a vector of p-values, then martingale values can be generated by: .. code-block:: python v = SleeperStayer().apply(p_values) The following gives the index of the first martingale value that is equal to or exceeds 100: .. code-block:: python i = SleeperStayer().apply(p_values, c=100) Note ---- If a threshold (c) is specified, the computation is terminated as soon as a martingale value reaches this, which may save some time compared to computing the full sequence. """ R = self.R bf = self.bf rewards = bf.apply(p_values) C = np.full(rewards.shape[1], R/rewards.shape[1]) S = np.zeros(len(p_values)) S_box = 1-R for i in range(len(p_values)): C *= rewards[i] S[i] = S_box + np.sum(C) if c is not None: if S[i] >= c: return i C += R*S_box/len(C) S_box *= (1-R) if c is not None: return len(p_values) else: return S
[docs]class SleeperDrifter(Martingale): """ SleeperDrifter computes martingale values given a vector of p-values. """ def __init__(self, R = 0.1, M = 100, bf = StepBettingFunction(), drift=True): """ Parameters ---------- R : float, 0 < R < 1 investment rate M : integer, 0 < M, default=100 wake-up frequency bf : :class:`.Martingale` betting function Examples -------- SleeperDrifter with default settings can be obtained by: .. code-block:: python m = SleeperDrifter() We may initialize SleeperDrifter with a specific investment rate, wake-up frequency and betting function: .. code-block:: python m = SleeperStayer(J = 0.1, M = 50, bf = EpsilonBettingFunction()) Note ---- The specified betting function must have a method `apply` that given a vector of p-values (ndarray of shape (n_values,)), returns an ndarray of shape (n_values, n_accounts), where n_accounts >= 1. Note ---- SleeperDrifter is a generalization of Alg. 9.5: Sleeper/Drifter in ALRW, 2ed, p., 284 with a slightly modified initialization, by which rewards greater than 1 may be obtained for the first M p-values. """ if R <= 0 or R >= 1: raise RuntimeError(("R must be in the range (0, 1)")) if M <= 0: raise RuntimeError(("M must greater than 1")) self.R = R self.M = M self.bf = bf self.drift = drift
[docs] def apply(self, p_values, c=None): """ Generate a vector of martingale values, given a vector of p-values. Parameters ---------- p_values : array-like of shape (n_values,) p-values c : integer, default=None threshold value Returns ------- martingales : ndarray of shape (n_values,) martingale values, returned only if c is None index : integer, 0 <= index <= n_values index of the first martingale value that is equal to or greater than c, if any, and index = n_values, otherwise, returned only if c is not None Examples -------- Assuming that ``p_values`` is a vector of p-values, then martingale values can be generated by: .. code-block:: python v = SleeperDrifter().apply(p_values) The following gives the index of the first martingale value that is equal to or exceeds 100: .. code-block:: python i = SleeperDrifter().apply(p_values, c=100) Note ---- If a threshold (c) is specified, the computation is terminated as soon as a martingale value reaches this, which may save some time compared to computing the full sequence. """ R = self.R M = self.M bf = self.bf drift = self.drift if drift and isinstance(bf, StepBettingFunction): rewards = [bf.apply(p_values[s*M:], s=(s+1)*M-1) for s in range(len(p_values)//M+1)] else: all_rewards = bf.apply(p_values) rewards = [all_rewards[s*M:] for s in range(1+len(p_values)//M)] S = np.zeros(len(p_values)) C = np.zeros((1+len(p_values)//M, rewards[0].shape[1])) C[0,:] = R/C.shape[1] S_box = 1-R for i in range(len(p_values)): for j in range(0, 1+i//M): C[j] *= rewards[j][i-j*M] S[i] = S_box + np.sum(C) if c is not None: if S[i] >= c: return i if (i+1) % M == 0: C[(i+1)//M] = R*S_box/C.shape[1] S_box *= (1-R) if c is not None: return len(p_values) else: return S
[docs]class CompositeMartingale(Martingale): """ A composite martingale averages the output of multiple conformal test martingales given at initialization. """ def __init__(self, Martingales): """ Parameters ---------- Martingales : list of conformal test martingales conformal test martingales, used for averaging Examples -------- An composite martingale can be obtained by: .. code-block:: python cm = CompositeMartingale([SimpleJumper(), SleeperStayer()]) Note ---- Each object in the provided list of conformal test martingales is required to have a method `apply` that given a vector of p-values returns an equally long vector of martingale values. """ self.Martingales = Martingales
[docs] def apply(self, p_values, c=None): """ Generate a vector of martingale values, given a vector of p-values. Parameters ---------- p_values : array-like of shape (n_values,) p-values c : integer, default=None threshold value Returns ------- martingales : ndarray of shape (n_values,) martingale values, returned only if c is None index : integer, 0 <= index <= n_values index of the first martingale value that is equal to or greater than c, if any, and index = n_values, otherwise, returned only if c is not None Examples -------- Assuming that ``p_values`` is a vector of p-values, then martingale values can be generated by: .. code-block:: python cm = CompositeMartingale([SimpleJumper(), SleeperStayer()]) v = cm.apply(p_values) The following gives the index of the first martingale value that is equal to or exceeds 100: .. code-block:: python i = cm.apply(p_values, c=100) """ S = np.zeros(len(p_values)) for M in self.Martingales: S += M.apply(p_values) martingales = S/len(self.Martingales) if c is not None: gt = np.argwhere(martingales >= c) if len(gt) > 0: return gt[0,0] else: return len(p_values) else: return martingales
[docs]def semi_online_p_values(alphas, seed=None): """ Computes semi-online p-values from a vector of non-conformity scores. Parameters ---------- alphas : array-like of shape (n_values,) non-conformity scores seed : int, default=None set random seed Returns ------- p_values : array-like of shape (n_values,) p-values Examples -------- Assuming that alphas is a vector of non-conformity scores, p-values are obtained by: .. code-block:: python from crepes.martingales import semi_online_p_values p_values = semi_online_p_values(alphas) The above results in that ``p_values`` is assigned a vector of the same length as ``alphas`` and where the p-value for each non-conformity score is computed with respect to the preceding non-conformity scores only. Note ---- The output p-values are smoothed and generated in the semi-online mode according to the terminology in in ALRW, 2ed, p., 111. """ if seed is not None: random_state = np.random.get_state() np.random.seed(seed) p_values = np.zeros(len(alphas)) thetas = np.random.rand(len(alphas)) for q in range(len(alphas)): p_values[q] = (np.sum(alphas[:q] > alphas[q]) + \ thetas[q]*(1+np.sum(alphas[:q] == alphas[q])))/(q+1) if seed is not None: np.random.set_state(random_state) return p_values