Preprocessor

This module implements the sequential data preprocessing method proposed by Wang et al. (2025).

from pycfrl import preprocessor
class pycfrl.preprocessor.preprocessor.Preprocessor

Bases: object

Base class for preprocessors.

Subclasses must implement the preprocess_single_step and preprocess_multiple_steps methods.

__init__() -> None
abstract preprocess_multiple_steps(zs: list | ndarray, xs: list | ndarray, actions: list | ndarray, rewards: list | ndarray | None = None, **kwargs) -> tuple[ndarray, ndarray] | ndarray

An abstract prototype of methods that preprocess a whole trajectory.

Args:
zs (list or np.ndarray):

The observed sensitive attributes of each individual in the trajectory that is to be preprocessed. It should be a 2D list or array following the Sensitive Attributes Format.

xs (list or np.ndarray):

The state trajectory that is to be preprocessed. It should be a 3D list or array following the Full-trajectory States Format.

actions (list or np.ndarray):

The action trajectory that is to be preprocessed, often generated using a behavior policy. It should be a 2D list or array following the Full-trajectory Actions Format.

rewards (list or np.ndarray, optional):

The reward trajectory that is to be preprocessed. It should be a 2D list or array following the Full-trajectory Rewards Format.

Returns:
xs_tilde (np.ndarray):

The preprocessed states trajectory. It should be a 3D array following the Full-trajectory States Format.

rs_tilde (np.ndarray, optional):

The preprocessed reward trajectory. It should be a 2D array following the Full-trajectory Rewards Format. rs_tilde is not returned if rewards=None in the function input.

abstract preprocess_single_step(z: list | ndarray, xt: list | ndarray, xtm1: list | ndarray | None = None, atm1: list | ndarray | None = None, rtm1: list | ndarray | None = None, **kwargs) -> tuple[ndarray, ndarray] | ndarray

An abstract prototype of methods that preprocess the states at a single time step.

Args:
z (list or np.ndarray):

The observed sensitive attributes of each individual in the trajectory that is to be preprocessed. It should be a 2D list or array following the Sensitive Attributes Format.

xt (list or np.ndarray):

The states at the current time step of each individual in the trajectory that is to be preprocessed. It should be a 2D list or array following the Single-time States Format.

xtm1 (list or np.ndarray, optional):

The states at the previous time step of each individual in the trajectory that is to be preprocessed. It should be a 2D list or array following the Single-time States Format.

atm1 (list or np.ndarray, optional):

The actions at the previous time step of each individual in the trajectory that is to be preprocessed. It should be a 1D list or array following the Single-time Actions Format.

rtm1 (list or np.ndarray, optional):

The rewards at the previous time step of each individual in the trajectory that is to be preprocessed. It should be a 1D list or array following the Single-time Rewards Format.

Returns:
xt_tilde (np.ndarray):

The preprocessed states at the given time step. It should be a 2D array following the Single-time States Format.

rt_tilde (np.ndarray, optional):

The preprocessed rewards at the given time step. It should be a 1D array following the Single-time Rewards Format. rt_tilde is not returned if rtm1=None in the function input.
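The return convention of the two abstract methods can be illustrated with a minimal sketch. `PreprocessorSketch` is a hypothetical stand-in that only mirrors the documented signatures so that the example runs without pycfrl installed; it is not the library's class, and `IdentityPreprocessor` is an illustrative subclass that leaves the data unchanged.

```python
from abc import ABC, abstractmethod

import numpy as np


class PreprocessorSketch(ABC):
    """Hypothetical stand-in mirroring the two documented abstract methods."""

    @abstractmethod
    def preprocess_single_step(self, z, xt, xtm1=None, atm1=None, rtm1=None, **kwargs):
        ...

    @abstractmethod
    def preprocess_multiple_steps(self, zs, xs, actions, rewards=None, **kwargs):
        ...


class IdentityPreprocessor(PreprocessorSketch):
    """Returns states and rewards unchanged; only the return convention matters here."""

    def preprocess_single_step(self, z, xt, xtm1=None, atm1=None, rtm1=None, **kwargs):
        xt_tilde = np.asarray(xt, dtype=float)
        if rtm1 is None:
            # No previous rewards given: return the states only.
            return xt_tilde
        return xt_tilde, np.asarray(rtm1, dtype=float)

    def preprocess_multiple_steps(self, zs, xs, actions, rewards=None, **kwargs):
        xs_tilde = np.asarray(xs, dtype=float)
        if rewards is None:
            # No rewards given: return the state trajectory only.
            return xs_tilde
        return xs_tilde, np.asarray(rewards, dtype=float)
```

A subclass that omits either method cannot be instantiated, which is how the "must implement" requirement is enforced in this sketch.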

class pycfrl.preprocessor.preprocessor.SequentialPreprocessor(z_space: list | ndarray, num_actions: int, cross_folds: int = 1, mode: Literal['single', 'sensitive'] = 'single', reg_model: Literal['lm', 'nn'] = 'nn', hidden_dims: list[int] = [64, 64], epochs: int = 1000, learning_rate: int | float = 0.005, batch_size: int = 512, is_action_onehot: bool = True, is_normalized: bool = False, is_loss_monitored: bool = True, is_early_stopping: bool = False, test_size: int | float = 0.2, loss_monitoring_patience: int = 10, loss_monitoring_min_delta: int | float = 0.01, early_stopping_patience: int = 10, early_stopping_min_delta: int | float = 0.01)

Bases: Preprocessor

Implementation of the sequential data preprocessing method proposed by Wang et al. (2025).

The preprocessor first learns a model \(\mu(x, a, z)\) of the transition dynamics of the MDP underlying the input trajectory. Then, at each time step, it uses \(\mu\) to reconstruct the counterfactual states and concatenates the reconstructed counterfactual states into a new augmented state vector.

That is, let \(z_i\) be the observed sensitive attribute. At \(t=0\) (i.e. the initial time step), for each individual \(i\) and sensitive attribute level \(z\), the preprocessor calculates

\[\hat{x}_{i0}^z = x_{i0} - \hat{\mathbb{E}}(X_0|Z=z_i) + \hat{\mathbb{E}}(X_0|Z=z)\]

and forms \(\tilde{x}_{i0} = [\hat{x}_{i0}^{z^{(1)}}, \dots, \hat{x}_{i0}^{z^{(K)}}]\).
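The initial-step construction above can be sketched with NumPy as follows. This is an illustration of the formula, not the library's implementation: the function name and argument layout are assumptions, the conditional expectations are estimated by plain group means, and every level in `z_space` is assumed to appear at least once in the data.

```python
import numpy as np


def initial_counterfactual_states(z_obs, x_init, z_space):
    """z_obs: (N, zdim), x_init: (N, xdim), z_space: (K, zdim) -> (N, K * xdim)."""
    z_obs = np.asarray(z_obs)
    x_init = np.asarray(x_init, dtype=float)
    z_space = np.asarray(z_space)
    # Empirical mean of the initial state within each sensitive attribute level.
    level_means = np.stack(
        [x_init[(z_obs == level).all(axis=1)].mean(axis=0) for level in z_space]
    )  # shape (K, xdim)
    # Row index in z_space of each individual's observed level.
    obs_idx = np.array(
        [int(np.flatnonzero((z_space == z).all(axis=1))[0]) for z in z_obs]
    )
    # Residual after removing the mean of the observed level.
    resid = x_init - level_means[obs_idx]  # shape (N, xdim)
    # Add back the mean of every level, then concatenate across levels.
    x_hat = resid[:, None, :] + level_means[None, :, :]  # shape (N, K, xdim)
    return x_hat.reshape(len(x_init), -1)
```

Note that the block of the output corresponding to an individual's own observed level equals that individual's original state, since the two mean terms cancel.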

At \(t>0\), for each individual \(i\) and sensitive attribute level \(z\), the preprocessor calculates

\[[\hat{x}_{it}^z, \hat{r}_{i,t-1}^z] = [x_{it}, r_{i,t-1}] - \hat{\mu}(x_{i,t-1}, a_{i,t-1}, z_i) + \hat{\mu}(\hat{x}_{i,t-1}^z, a_{i,t-1}, z)\]

and forms \(\tilde{x}_{it} = [\hat{x}_{it}^{z^{(1)}}, \dots, \hat{x}_{it}^{z^{(K)}}]\) and \(\tilde{r}_{i,t-1} = \sum_{k=1}^K\hat{\mathbb{P}}(Z=z^{(k)})\hat{r}_{i,t-1}^{z^{(k)}}\).
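The update at later time steps can be sketched in the same way. This sketch rests on several assumptions: the residual is taken from the stacked observed vector of current state and previous reward (matching the stacked left-hand side of the update), `mu` is any callable returning the predicted next state and reward stacked along the last axis, and `toy_mu` is a made-up model used only to make the example runnable. None of these names come from the library.

```python
import numpy as np


def counterfactual_step(mu, z_space, z_obs_idx, xt, rtm1, xtm1, atm1, x_hat_prev, p_z):
    """One recursion step.

    mu(x, a, z) -> (N, xdim + 1): predicted [next state, reward] (assumed interface).
    x_hat_prev: (N, K, xdim) counterfactual states from the previous step.
    p_z: (K,) estimated marginal probabilities of the sensitive attribute levels.
    """
    n, xdim = xt.shape
    n_levels = len(z_space)
    # Observed stacked vector and its residual under the observed level.
    observed = np.concatenate([xt, rtm1[:, None]], axis=1)
    resid = observed - mu(xtm1, atm1, z_space[z_obs_idx])
    x_hat = np.empty((n, n_levels, xdim))
    r_hat = np.empty((n, n_levels))
    for k in range(n_levels):
        z_k = np.tile(z_space[k], (n, 1))
        # Add the residual to the prediction made from the level-k counterfactual state.
        pred = resid + mu(x_hat_prev[:, k, :], atm1, z_k)
        x_hat[:, k, :] = pred[:, :xdim]
        r_hat[:, k] = pred[:, xdim]
    x_tilde = x_hat.reshape(n, n_levels * xdim)  # concatenated counterfactual states
    r_tilde = r_hat @ p_z  # probability-weighted counterfactual reward
    return x_tilde, r_tilde, x_hat


def toy_mu(x, a, z):
    """Made-up dynamics for illustration only (assumes zdim == 1)."""
    next_state = x + a[:, None] + z
    reward = x.sum(axis=1) + z[:, 0]
    return np.concatenate([next_state, reward[:, None]], axis=1)


z_space = np.array([[0.0], [1.0]])
x_tilde, r_tilde, x_hat = counterfactual_step(
    toy_mu, z_space, z_obs_idx=np.array([0]),
    xt=np.array([[2.5]]), rtm1=np.array([1.5]),
    xtm1=np.array([[1.0]]), atm1=np.array([1.0]),
    x_hat_prev=np.array([[[1.0], [2.0]]]), p_z=np.array([0.5, 0.5]),
)
```

The third return value is the per-level array that would be carried forward as `x_hat_prev` at the next step.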

References:
__init__(z_space: list | ndarray, num_actions: int, cross_folds: int = 1, mode: Literal['single', 'sensitive'] = 'single', reg_model: Literal['lm', 'nn'] = 'nn', hidden_dims: list[int] = [64, 64], epochs: int = 1000, learning_rate: int | float = 0.005, batch_size: int = 512, is_action_onehot: bool = True, is_normalized: bool = False, is_loss_monitored: bool = True, is_early_stopping: bool = False, test_size: int | float = 0.2, loss_monitoring_patience: int = 10, loss_monitoring_min_delta: int | float = 0.01, early_stopping_patience: int = 10, early_stopping_min_delta: int | float = 0.01) -> None
Args:
z_space (list or np.ndarray):

A 2D list or array of shape (K, zdim), where K is the total number of valid values of the sensitive attribute and zdim is the dimension of the sensitive attribute variable. It contains all valid values of the sensitive attribute, with each value occupying a separate row.

num_actions (int):

The total number of valid actions.

cross_folds (int, optional):

The number of cross folds used during training. When cross_folds=k, the preprocessor learns k models using different subsets of the training data, and the final output of preprocess_single_step and preprocess_multiple_steps is generally the average of the outputs from the k models.

mode (str, optional):

Can either be “single” or “sensitive”. When mode="single", the preprocessor will learn a single model of the transition dynamics where the sensitive attribute is an input to the model. When mode="sensitive", the preprocessor will learn one transition dynamics model for each level of the sensitive attribute, and transitions under each sensitive attribute \(z\) will be estimated using the model corresponding to \(z\).

reg_model (str, optional):

The type of the model used for learning the transition dynamics. Can be “lm” (polynomial regression) or “nn” (neural network). Currently, only ‘nn’ is supported.

hidden_dims (list[int], optional):

The hidden dimensions of the neural network. This argument is not used if reg_model="lm".

epochs (int, optional):

The number of training epochs for the neural network. This argument is not used if reg_model="lm".

learning_rate (int or float, optional):

The learning rate of the neural network. This argument is not used if reg_model="lm".

batch_size (int, optional):

The batch size of the neural network. This argument is not used if reg_model="lm".

is_action_onehot (bool, optional):

When set to True, the actions will be one-hot encoded.

is_normalized (bool, optional):

When set to True, the states will be normalized following the formula x_normalized = (x - mean(x)) / std(x).

is_loss_monitored (bool, optional):

When set to True, will split the training data into a training set and a validation set, and will monitor the validation loss during training. A warning will be raised if the percent absolute change in the validation loss is greater than loss_monitoring_min_delta for at least one of the final \(p\) epochs during neural network training, where \(p\) is specified by the argument loss_monitoring_patience. This argument is not used if reg_model="lm".

is_early_stopping (bool, optional):

When set to True, will split the training data into a training set and a validation set, and will enforce early stopping based on the validation loss during neural network training. That is, neural network training will stop early if the percent decrease in the validation loss is no greater than early_stopping_min_delta for \(q\) consecutive training epochs, where \(q\) is specified by the argument early_stopping_patience. This argument is not used if reg_model="lm".

test_size (int or float, optional):

An int or float between 0 and 1 (inclusive) that specifies the proportion of the full training data that is used as the validation set for loss monitoring and early stopping. This argument is not used if reg_model="lm" or both is_loss_monitored and is_early_stopping are False.

loss_monitoring_patience (int, optional):

The number of consecutive epochs with barely-changing validation loss at the end of training that is needed for loss monitoring to not raise warnings. This argument is not used if reg_model="lm" or is_loss_monitored=False.

loss_monitoring_min_delta (int or float, optional):

The maximum amount of percent absolute change in the validation loss for it to be considered barely-changing by the loss monitoring mechanism. This argument is not used if reg_model="lm" or is_loss_monitored=False.

early_stopping_patience (int, optional):

The number of consecutive epochs with barely-decreasing validation loss during training that is needed for early stopping to be triggered. This argument is not used if reg_model="lm" or is_early_stopping=False.

early_stopping_min_delta (int or float, optional):

The maximum percent decrease in the validation loss for it to be considered barely-decreasing by the early stopping mechanism. This argument is not used if reg_model="lm" or is_early_stopping=False.
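The loss monitoring and early stopping criteria described in the arguments above can be sketched in plain Python. This is an interpretation, not the library's code: "percent change" is assumed to mean the relative change between consecutive epochs expressed as a fraction (so 0.01 means 1%), validation losses are assumed to be positive, and all function names are hypothetical.

```python
def relative_decreases(val_losses):
    """Relative decrease (prev - curr) / |prev| between consecutive epochs."""
    return [
        (prev - curr) / abs(prev)
        for prev, curr in zip(val_losses, val_losses[1:])
    ]


def early_stopping_epoch(val_losses, patience=10, min_delta=0.01):
    """Index of the epoch at which training would stop early, or None."""
    streak = 0
    for epoch, change in enumerate(relative_decreases(val_losses), start=1):
        # Count consecutive epochs whose decrease is no greater than min_delta.
        streak = streak + 1 if change <= min_delta else 0
        if streak >= patience:
            return epoch
    return None


def loss_still_changing(val_losses, patience=10, min_delta=0.01):
    """True if any of the final `patience` epochs changed by more than min_delta."""
    recent = relative_decreases(val_losses)[-patience:]
    return any(abs(change) > min_delta for change in recent)
```

Under this reading, early stopping looks at signed decreases anywhere during training, while loss monitoring looks at absolute changes only over the last few epochs.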

preprocess_multiple_steps(zs: list | ndarray, xs: list | ndarray, actions: list | ndarray, rewards: list | ndarray | None = None) -> tuple[ndarray, ndarray] | ndarray

Preprocess a whole trajectory.

When \(k>1\) cross folds are specified, the final output is the average of the outputs of the \(k\) transition models.

Args:
zs (list or np.ndarray):

The observed sensitive attributes of each individual in the trajectory that is to be preprocessed. It should be a list or array following the Sensitive Attributes Format.

xs (list or np.ndarray):

The state trajectory that is to be preprocessed. It should be a list or array following the Full-trajectory States Format.

actions (list or np.ndarray):

The action trajectory that is to be preprocessed, often generated using a behavior policy. It should be a list or array following the Full-trajectory Actions Format.

rewards (list or np.ndarray, optional):

The reward trajectory that is to be preprocessed. It should be a list or array following the Full-trajectory Rewards Format.

Returns:
xs_tilde (np.ndarray):

The preprocessed states trajectory. It should be a 3D array following the Full-trajectory States Format.

rs_tilde (np.ndarray, optional):

The preprocessed reward trajectory. It should be a 2D array following the Full-trajectory Rewards Format. rs_tilde is not returned if rewards=None in the function input.

preprocess_single_step(z: list | ndarray, xt: list | ndarray, xtm1: list | ndarray | None = None, atm1: list | ndarray | None = None, rtm1: list | ndarray | None = None) -> tuple[ndarray, ndarray] | ndarray

Preprocess one single time step of the trajectory.

When \(k>1\) cross folds are specified, the final output is the average of the outputs of the \(k\) transition models.

Important Note: A SequentialPreprocessor object internally stores the preprocessed counterfactual states from the previous function call in a states buffer and uses them to preprocess the inputs of the current call. Consequently, if preprocess_single_step() is called on a set of transitions at time \(t\) of some trajectory, the next call of preprocess_single_step() on the same SequentialPreprocessor instance must receive the transitions at time \(t+1\) of the same trajectory for the buffer to work correctly. To preprocess another trajectory, either use another instance of SequentialPreprocessor, or pass the initial step of the new trajectory to preprocess_single_step() with xtm1=None and atm1=None to reset the buffer.

In general, unless step-wise preprocessing is necessary, we recommend using preprocess_multiple_steps() to preprocess a whole trajectory to avoid unintended bugs.
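When step-wise preprocessing is needed, the call order described in the note above can be sketched with a small helper. The helper name is hypothetical, and the axis layout is an assumption: states are taken to have shape (N, T+1, xdim) and actions and rewards shape (N, T), with time on the second axis. The helper relies only on the documented preprocess_single_step signature.

```python
import numpy as np


def preprocess_stepwise(preprocessor, zs, xs, actions, rewards):
    """Feed one trajectory to preprocess_single_step in strict time order."""
    zs, xs = np.asarray(zs), np.asarray(xs)
    actions, rewards = np.asarray(actions), np.asarray(rewards)
    n_steps = actions.shape[1]
    # Initial step: xtm1=None and atm1=None (the defaults) reset the buffer.
    xs_tilde = [preprocessor.preprocess_single_step(zs, xs[:, 0])]
    rs_tilde = []
    for t in range(1, n_steps + 1):
        # Each later call must follow the call for time t - 1 on the same instance.
        xt_tilde, rt_tilde = preprocessor.preprocess_single_step(
            zs,
            xs[:, t],
            xtm1=xs[:, t - 1],
            atm1=actions[:, t - 1],
            rtm1=rewards[:, t - 1],
        )
        xs_tilde.append(xt_tilde)
        rs_tilde.append(rt_tilde)
    return np.stack(xs_tilde, axis=1), np.stack(rs_tilde, axis=1)
```

Calling the helper again with a different trajectory is safe under the documented behavior, because its first call always passes the initial step without xtm1 and atm1.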

Args:
z (list or np.ndarray):

The observed sensitive attributes of each individual in the trajectory that is to be preprocessed. It should be a 2D list or array following the Sensitive Attributes Format.

xt (list or np.ndarray):

The states at the current time step of each individual in the trajectory that is to be preprocessed. It should be a 2D list or array following the Single-time States Format.

xtm1 (list or np.ndarray, optional):

The states at the previous time step of each individual in the trajectory that is to be preprocessed. It should be a 2D list or array following the Single-time States Format.

atm1 (list or np.ndarray, optional):

The actions at the previous time step of each individual in the trajectory that is to be preprocessed. It should be a 1D list or array following the Single-time Actions Format. When both xtm1 and atm1 are set to None, the preprocessor will consider the input to be from the initial time step of a new trajectory, and the internal states buffer will be reset.

rtm1 (list or np.ndarray, optional):

The rewards at the previous time step of each individual in the trajectory that is to be preprocessed. It should be a 1D list or array following the Single-time Rewards Format.

Returns:
xt_tilde (np.ndarray):

The preprocessed states at the given time step. It should be a 2D array following the Single-time States Format.

rt_tilde (np.ndarray, optional):

The preprocessed rewards at the given time step. It should be a 1D array following the Single-time Rewards Format. rt_tilde is not returned if rtm1=None in the function input.

train_preprocessor(zs: list | ndarray, xs: list | ndarray, actions: list | ndarray, rewards: list | ndarray) -> tuple[ndarray, ndarray]

Train the sequential preprocessor and preprocess the training trajectory.

When \(k>1\) cross folds are specified, \(k\) transition models are trained, each using all but one of the folds. That is, for each fold in the training trajectory, we train a model using all the other folds, and we preprocess the held-out fold with this model. The detailed preprocessing procedure can be found here.

Args:
zs (list or np.ndarray):

The observed sensitive attributes of each individual in the training data. It should be a 2D list or array following the Sensitive Attributes Format.

xs (list or np.ndarray):

The state trajectory used for training. It should be a 3D list or array following the Full-trajectory States Format.

actions (list or np.ndarray):

The action trajectory used for training, often generated using a behavior policy. It should be a 2D list or array following the Full-trajectory Actions Format.

rewards (list or np.ndarray):

The reward trajectory used for training. It should be a 2D list or array following the Full-trajectory Rewards Format.

Returns:
xs_tilde (np.ndarray):

The preprocessed states trajectory. It should be a 3D array following the Full-trajectory States Format.

rs_tilde (np.ndarray):

The preprocessed reward trajectory. It should be a 2D array following the Full-trajectory Rewards Format.
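The cross-fold training scheme of train_preprocessor, where each fold is preprocessed by a model trained on the other folds, can be sketched as follows. The helper names, the `fit` and `apply` callables, and the contiguous fold assignment are all assumptions made for illustration; how the library actually assigns individuals to folds is not stated here.

```python
def make_folds(n, k):
    """Split range(n) into k contiguous folds of near-equal size."""
    base, extra = divmod(n, k)
    folds, start = [], 0
    for j in range(k):
        size = base + (1 if j < extra else 0)
        folds.append(list(range(start, start + size)))
        start += size
    return folds


def out_of_fold_preprocess(n, k, fit, apply):
    """Preprocess each fold with a model trained on all the other folds.

    fit(train_idx) -> model and apply(model, idx) -> list of outputs are
    hypothetical callables standing in for model training and preprocessing.
    """
    outputs = [None] * n
    models = []
    for held_out in make_folds(n, k):
        held = set(held_out)
        train_idx = [i for i in range(n) if i not in held]
        model = fit(train_idx)
        models.append(model)
        # The held-out individuals are preprocessed by a model that never saw them.
        for i, out in zip(held_out, apply(model, held_out)):
            outputs[i] = out
    return outputs, models
```

The k fitted models are kept because, as noted for preprocess_multiple_steps and preprocess_single_step, later outputs are averaged across them.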