Preprocessor
This module implements the sequential data preprocessing method proposed by Wang et al. (2025).
from pycfrl import preprocessor
- class pycfrl.preprocessor.preprocessor.Preprocessor
Bases: object
Base class for preprocessors.
Subclasses must implement the preprocess_single_step and preprocess_multiple_steps methods.
- __init__() None
- abstract preprocess_multiple_steps(zs: list | ndarray, xs: list | ndarray, actions: list | ndarray, rewards: list | ndarray | None = None, **kwargs) tuple[ndarray, ndarray] | ndarray
An abstract prototype of methods that preprocess a whole trajectory.
- Args:
- zs (list or np.ndarray):
The observed sensitive attributes of each individual in the trajectory that is to be preprocessed. It should be a 2D list or array following the Sensitive Attributes Format.
- xs (list or np.ndarray):
The state trajectory that is to be preprocessed. It should be a 3D list or array following the Full-trajectory States Format.
- actions (list or np.ndarray):
The action trajectory that is to be preprocessed, often generated using a behavior policy. It should be a 2D list or array following the Full-trajectory Actions Format.
- rewards (list or np.ndarray, optional):
The reward trajectory that is to be preprocessed. It should be a 2D list or array following the Full-trajectory Rewards Format.
- Returns:
- xs_tilde (np.ndarray):
The preprocessed states trajectory. It should be a 3D array following the Full-trajectory States Format.
- rs_tilde (np.ndarray, optional):
The preprocessed reward trajectory. It should be a 2D array following the Full-trajectory Rewards Format.
rs_tilde is not returned if rewards=None in the function input.
- abstract preprocess_single_step(z: list | ndarray, xt: list | ndarray, xtm1: list | ndarray | None = None, atm1: list | ndarray | None = None, rtm1: list | ndarray | None = None, **kwargs) tuple[ndarray, ndarray] | ndarray
An abstract prototype of methods that preprocess the states at a single time step.
- Args:
- z (list or np.ndarray):
The observed sensitive attributes of each individual in the trajectory that is to be preprocessed. It should be a 2D list or array following the Sensitive Attributes Format.
- xt (list or np.ndarray):
The states at the current time step of each individual in the trajectory that is to be preprocessed. It should be a 2D list or array following the Single-time States Format.
- xtm1 (list or np.ndarray, optional):
The states at the previous time step of each individual in the trajectory that is to be preprocessed. It should be a 2D list or array following the Single-time States Format.
- atm1 (list or np.ndarray, optional):
The actions at the previous time step of each individual in the trajectory that is to be preprocessed. It should be a 1D list or array following the Single-time Actions Format.
- rtm1 (list or np.ndarray, optional):
The rewards at the previous time step of each individual in the trajectory that is to be preprocessed. It should be a 1D list or array following the Single-time Rewards Format.
- Returns:
- xt_tilde (np.ndarray):
The preprocessed states at the given time step. It should be a 2D array following the Single-time States Format.
- rt_tilde (np.ndarray, optional):
The preprocessed rewards at the given time step. It should be a 1D array following the Single-time Rewards Format.
rt_tilde is not returned if rtm1=None in the function input.
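The subclassing contract above can be sketched as follows. This is a minimal illustration, not the library's code: the `Preprocessor` class below is a local stand-in that only mirrors the two documented abstract methods, `IdentityPreprocessor` is a hypothetical subclass, and the array shapes are assumptions about the data formats.

```python
from abc import ABC, abstractmethod

import numpy as np


class Preprocessor(ABC):
    """Local stand-in mirroring the documented interface (not the pycfrl class)."""

    @abstractmethod
    def preprocess_single_step(self, z, xt, xtm1=None, atm1=None, rtm1=None, **kwargs):
        ...

    @abstractmethod
    def preprocess_multiple_steps(self, zs, xs, actions, rewards=None, **kwargs):
        ...


class IdentityPreprocessor(Preprocessor):
    """Hypothetical subclass that returns its inputs unchanged."""

    def preprocess_single_step(self, z, xt, xtm1=None, atm1=None, rtm1=None, **kwargs):
        xt_tilde = np.asarray(xt, dtype=float)
        if rtm1 is None:
            return xt_tilde                      # states only
        return xt_tilde, np.asarray(rtm1, dtype=float)

    def preprocess_multiple_steps(self, zs, xs, actions, rewards=None, **kwargs):
        xs_tilde = np.asarray(xs, dtype=float)
        if rewards is None:
            return xs_tilde                      # states only
        return xs_tilde, np.asarray(rewards, dtype=float)


pre = IdentityPreprocessor()
zs = [[0], [1]]
xs = np.zeros((2, 3, 4))      # assumed layout: (individuals, time steps, state dim)
actions = np.zeros((2, 2))    # assumed layout: (individuals, transitions)
rewards = np.ones((2, 2))     # assumed layout: (individuals, transitions)

states_only = pre.preprocess_multiple_steps(zs, xs, actions)
states, rs = pre.preprocess_multiple_steps(zs, xs, actions, rewards)
```

The return value switches between a single array and a tuple depending on whether rewards are supplied, which matches the documented return annotation `tuple[ndarray, ndarray] | ndarray`.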
- class pycfrl.preprocessor.preprocessor.SequentialPreprocessor(z_space: list | ndarray, num_actions: int, cross_folds: int = 1, mode: Literal['single', 'sensitive'] = 'single', reg_model: Literal['lm', 'nn'] = 'nn', hidden_dims: list[int] = [64, 64], epochs: int = 1000, learning_rate: int | float = 0.005, batch_size: int = 512, is_action_onehot: bool = True, is_normalized: bool = False, is_loss_monitored: bool = True, is_early_stopping: bool = False, test_size: int | float = 0.2, loss_monitoring_patience: int = 10, loss_monitoring_min_delta: int | float = 0.01, early_stopping_patience: int = 10, early_stopping_min_delta: int | float = 0.01)
Bases: Preprocessor
Implementation of the sequential data preprocessing method proposed by Wang et al. (2025).
The preprocessor first learns a model \(\mu(s, a, z)\) of the transition dynamics of the MDP underlying the input trajectory. Then, at each time step, it uses \(\mu\) to reconstruct the counterfactual states and concatenates the reconstructed counterfactual states into a new augmented state vector.
That is, let \(z_i\) be the observed sensitive attribute of individual \(i\), and let \(z^{(1)}, \dots, z^{(K)}\) be the levels of the sensitive attribute. At \(t=0\) (i.e. the initial time step), for each individual \(i\) and sensitive attribute level \(z\), the preprocessor calculates
\[\hat{x}_{i0}^z = x_{i0} - \hat{\mathbb{E}}(X_0|Z=z_i) + \hat{\mathbb{E}}(X_0|Z=z)\]
and forms \(\tilde{x}_{i0} = [\hat{x}_{i0}^{z^{(1)}}, \dots, \hat{x}_{i0}^{z^{(K)}}]\).
At \(t>0\), for each individual \(i\) and sensitive attribute level \(z\), the preprocessor calculates
\[[\hat{x}_{it}^z, \hat{r}_{i,t-1}^z] = [x_{it}, r_{i,t-1}] - \hat{\mu}(x_{i,t-1}, a_{i,t-1}, z_i) + \hat{\mu}(\hat{x}_{i,t-1}^z, a_{i,t-1}, z)\]
and forms \(\tilde{x}_{it} = [\hat{x}_{it}^{z^{(1)}}, \dots, \hat{x}_{it}^{z^{(K)}}]\) and \(\tilde{r}_{i,t-1} = \sum_{k=1}^K \hat{\mathbb{P}}(Z=z^{(k)})\,\hat{r}_{i,t-1}^{z^{(k)}}\).
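The two update rules can be traced on toy numbers. The sketch below is only an illustration under stated assumptions: the data are made up, `mu_hat` is a hypothetical hand-written transition model standing in for the fitted \(\hat{\mu}\), and the group means and level frequencies are plain empirical estimates. It is not the library's implementation.

```python
import numpy as np

levels = [0, 1]                                # the K = 2 sensitive attribute levels
z = np.array([0, 0, 1, 1])                     # observed z_i for 4 individuals
x0 = np.array([[1.0], [3.0], [6.0], [8.0]])    # observed initial states (1-D state)

# Initial step: move each state from its own group mean to the target group mean.
group_mean = {k: x0[z == k].mean(axis=0) for k in levels}
own_mean = np.stack([group_mean[int(zi)] for zi in z])
x0_hat = {k: x0 - own_mean + group_mean[k] for k in levels}
x0_tilde = np.concatenate([x0_hat[k] for k in levels], axis=1)


def mu_hat(x, a, z_value):
    """Hypothetical fitted model returning [next state, reward] for each row."""
    next_x = 0.5 * x + a[:, None] + z_value[:, None]
    return np.concatenate([next_x, -np.abs(next_x)], axis=1)


# Later step: add the observed residual to the counterfactual prediction.
a0 = np.array([1.0, 0.0, 1.0, 0.0])            # actions taken at the initial step
x1 = np.array([[1.7], [1.4], [5.2], [4.9]])    # observed next states
r0 = np.array([-1.6, -1.5, -5.0, -5.1])        # observed rewards
observed = np.concatenate([x1, r0[:, None]], axis=1)
residual = observed - mu_hat(x0, a0, z.astype(float))

x1_hat, r0_hat = {}, {}
for k in levels:
    z_k = np.full(len(z), float(k))
    pred = residual + mu_hat(x0_hat[k], a0, z_k)
    x1_hat[k], r0_hat[k] = pred[:, :1], pred[:, 1]

p_z = {k: float(np.mean(z == k)) for k in levels}   # empirical P(Z = level)
x1_tilde = np.concatenate([x1_hat[k] for k in levels], axis=1)
r0_tilde = sum(p_z[k] * r0_hat[k] for k in levels)
```

A useful sanity check is that the component of the augmented state belonging to an individual's own observed level reproduces the observed state, because the two model terms cancel when \(z = z_i\).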
- References:
- __init__(z_space: list | ndarray, num_actions: int, cross_folds: int = 1, mode: Literal['single', 'sensitive'] = 'single', reg_model: Literal['lm', 'nn'] = 'nn', hidden_dims: list[int] = [64, 64], epochs: int = 1000, learning_rate: int | float = 0.005, batch_size: int = 512, is_action_onehot: bool = True, is_normalized: bool = False, is_loss_monitored: bool = True, is_early_stopping: bool = False, test_size: int | float = 0.2, loss_monitoring_patience: int = 10, loss_monitoring_min_delta: int | float = 0.01, early_stopping_patience: int = 10, early_stopping_min_delta: int | float = 0.01) None
- Args:
- z_space (list or np.ndarray):
A 2D list or array of shape (K, zdim), where K is the total number of valid values of the sensitive attribute and zdim is the dimension of the sensitive attribute variable. It contains all valid values of the sensitive attribute. Each valid value should occupy a separate row.
- num_actions (int):
The total number of valid actions.
- cross_folds (int, optional):
The number of cross folds used during training. When cross_folds=k, the preprocessor will learn k models using different subsets of the training data, and the final output of preprocess_single_step and preprocess_multiple_steps will generally be the average of the outputs from each of the k models.
- mode (str, optional):
Can be either "single" or "sensitive". When mode="single", the preprocessor will learn a single model of the transition dynamics where the sensitive attribute is an input to the model. When mode="sensitive", the preprocessor will learn one transition dynamics model for each level of the sensitive attribute, and transitions under each sensitive attribute \(z\) will be estimated using the model corresponding to \(z\).
- reg_model (str, optional):
The type of the model used for learning the transition dynamics. Can be "lm" (polynomial regression) or "nn" (neural network). Currently, only "nn" is supported.
- hidden_dims (list[int], optional):
The hidden dimensions of the neural network. This argument is not used if reg_model="lm".
- epochs (int, optional):
The number of training epochs for the neural network. This argument is not used if reg_model="lm".
- learning_rate (int or float, optional):
The learning rate of the neural network. This argument is not used if reg_model="lm".
- batch_size (int, optional):
The batch size of the neural network. This argument is not used if reg_model="lm".
- is_action_onehot (bool, optional):
When set to True, the actions will be one-hot encoded.
- is_normalized (bool, optional):
When set to True, the states will be normalized following the formula x_normalized = (x - mean(x)) / std(x).
- is_loss_monitored (bool, optional):
When set to True, will split the training data into a training set and a validation set, and will monitor the validation loss during training. A warning will be raised if the percent absolute change in the validation loss is greater than loss_monitoring_min_delta for at least one of the final \(p\) epochs during neural network training, where \(p\) is specified by the argument loss_monitoring_patience. This argument is not used if reg_model="lm".
- is_early_stopping (bool, optional):
When set to True, will split the training data into a training set and a validation set, and will enforce early stopping based on the validation loss during neural network training. That is, neural network training will stop early if the percent decrease in the validation loss is no greater than early_stopping_min_delta for \(q\) consecutive training epochs, where \(q\) is specified by the argument early_stopping_patience. This argument is not used if reg_model="lm".
- test_size (int or float, optional):
An int or float between 0 and 1 (inclusive) that specifies the proportion of the full training data that is used as the validation set for loss monitoring and early stopping. This argument is not used if reg_model="lm" or if both is_loss_monitored and is_early_stopping are False.
- loss_monitoring_patience (int, optional):
The number of consecutive epochs with barely-changing validation loss at the end of training that is needed for loss monitoring to not raise warnings. This argument is not used if reg_model="lm" or is_loss_monitored=False.
- loss_monitoring_min_delta (int or float, optional):
The maximum percent absolute change in the validation loss for it to be considered barely-changing by the loss monitoring mechanism. This argument is not used if reg_model="lm" or is_loss_monitored=False.
- early_stopping_patience (int, optional):
The number of consecutive epochs with barely-decreasing validation loss during training that is needed for early stopping to be triggered. This argument is not used if reg_model="lm" or is_early_stopping=False.
- early_stopping_min_delta (int or float, optional):
The maximum percent decrease in the validation loss for it to be considered barely-decreasing by the early stopping mechanism. This argument is not used if reg_model="lm" or is_early_stopping=False.
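The early stopping and loss monitoring rules described in the arguments above can be sketched in plain Python. This is an illustration of the stated rules only, under two assumptions that the documentation does not confirm: the change is measured against the previous epoch's validation loss, and the `min_delta` values are fractions (so 0.01 means 1%). The helper names are hypothetical and are not part of the library.

```python
def percent_change(prev, curr):
    """Relative change from prev to curr; assumes prev is nonzero."""
    return (curr - prev) / abs(prev)


def early_stopping_epoch(val_losses, patience=10, min_delta=0.01):
    """Return the epoch index at which training would stop, or None."""
    stalled = 0
    for epoch in range(1, len(val_losses)):
        decrease = -percent_change(val_losses[epoch - 1], val_losses[epoch])
        # Count consecutive epochs whose decrease is no greater than min_delta.
        stalled = stalled + 1 if decrease <= min_delta else 0
        if stalled >= patience:
            return epoch
    return None


def loss_still_changing(val_losses, patience=10, min_delta=0.01):
    """True if any of the final `patience` epoch-to-epoch changes exceeds min_delta."""
    tail = val_losses[-(patience + 1):]
    return any(abs(percent_change(p, c)) > min_delta for p, c in zip(tail, tail[1:]))


losses = [1.0, 0.8, 0.7, 0.699, 0.698, 0.6975]
stop_epoch = early_stopping_epoch(losses, patience=3, min_delta=0.01)
warn_short = loss_still_changing(losses, patience=3, min_delta=0.01)
warn_long = loss_still_changing(losses, patience=4, min_delta=0.01)
```

With these toy losses, the last three epoch-to-epoch decreases are all below 1%, so a patience of 3 triggers early stopping at the final epoch and raises no monitoring warning, whereas a patience of 4 reaches back to a 12.5% drop and would warn.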
- preprocess_multiple_steps(zs: list | ndarray, xs: list | ndarray, actions: list | ndarray, rewards: list | ndarray | None = None) tuple[ndarray, ndarray] | ndarray
Preprocess a whole trajectory.
When some \(k>1\) cross folds are specified, the final output will be the average of the outputs of each of the \(k\) transition models.
- Args:
- zs (list or np.ndarray):
The observed sensitive attributes of each individual in the trajectory that is to be preprocessed. It should be a list or array following the Sensitive Attributes Format.
- xs (list or np.ndarray):
The state trajectory that is to be preprocessed. It should be a list or array following the Full-trajectory States Format.
- actions (list or np.ndarray):
The action trajectory that is to be preprocessed, often generated using a behavior policy. It should be a list or array following the Full-trajectory Actions Format.
- rewards (list or np.ndarray, optional):
The reward trajectory that is to be preprocessed. It should be a list or array following the Full-trajectory Rewards Format.
- Returns:
- xs_tilde (np.ndarray):
The preprocessed states trajectory. It should be a 3D array following the Full-trajectory States Format.
- rs_tilde (np.ndarray, optional):
The preprocessed reward trajectory. It should be a 2D array following the Full-trajectory Rewards Format.
rs_tilde is not returned if rewards=None in the function input.
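The averaging over cross-fold models mentioned above amounts to an element-wise mean of same-shaped outputs. A minimal sketch, assuming each of the \(k\) models yields an array of identical shape; `average_fold_outputs` is a hypothetical helper and the constant arrays stand in for real model outputs.

```python
import numpy as np


def average_fold_outputs(fold_outputs):
    """Element-wise mean of same-shaped arrays, one per fold model."""
    return np.mean(np.stack(fold_outputs, axis=0), axis=0)


# Stand-ins for the preprocessed states produced by k = 3 transition models,
# with an assumed layout of (individuals, time steps, augmented state dim).
outputs = [np.full((2, 3, 4), value) for value in (1.0, 2.0, 3.0)]
xs_tilde = average_fold_outputs(outputs)
```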
- preprocess_single_step(z: list | ndarray, xt: list | ndarray, xtm1: list | ndarray | None = None, atm1: list | ndarray | None = None, rtm1: list | ndarray | None = None) tuple[ndarray, ndarray] | ndarray
Preprocess one single time step of the trajectory.
When some \(k>1\) cross folds are specified, the final output will be the average of the outputs of each of the \(k\) transition models.
Important Note: A SequentialPreprocessor object internally stores the preprocessed counterfactual states from the previous function call in a states buffer, and the stored counterfactual states are used to preprocess the inputs of the current function call. Suppose preprocess_single_step() is called on a set of transitions at time \(t\) in some trajectory. Then, at the next call of preprocess_single_step() for this instance of SequentialPreprocessor, the transitions passed to the function must be from time \(t+1\) of the same trajectory to ensure that the buffer works correctly. To preprocess another trajectory, either use another instance of SequentialPreprocessor, or pass the initial step of the trajectory to preprocess_single_step() with xtm1=None and atm1=None to reset the buffer.
In general, unless step-wise preprocessing is necessary, we recommend using preprocess_multiple_steps() to preprocess a whole trajectory to avoid unintended bugs.
- Args:
- z (list or np.ndarray):
The observed sensitive attributes of each individual in the trajectory that is to be preprocessed. It should be a 2D list or array following the Sensitive Attributes Format.
- xt (list or np.ndarray):
The states at the current time step of each individual in the trajectory that is to be preprocessed. It should be a 2D list or array following the Single-time States Format.
- xtm1 (list or np.ndarray, optional):
The states at the previous time step of each individual in the trajectory that is to be preprocessed. It should be a 2D list or array following the Single-time States Format.
- atm1 (list or np.ndarray, optional):
The actions at the previous time step of each individual in the trajectory that is to be preprocessed. It should be a 1D list or array following the Single-time Actions Format. When both xtm1 and atm1 are set to None, the preprocessor will consider the input to be from the initial time step of a new trajectory, and the internal states buffer will be reset.
- rtm1 (list or np.ndarray, optional):
The rewards at the previous time step of each individual in the trajectory that is to be preprocessed. It should be a 1D list or array following the Single-time Rewards Format.
- Returns:
- xt_tilde (np.ndarray):
The preprocessed states at the given time step. It should be a 2D array following the Single-time States Format.
- rt_tilde (np.ndarray, optional):
The preprocessed rewards at the given time step. It should be a 1D array following the Single-time Rewards Format.
rt_tilde is not returned if rtm1=None in the function input.
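The buffer behavior described in the note above can be illustrated with a toy class. `BufferedStepper` is a hypothetical stand-in that mimics only the buffer contract (reset when both xtm1 and atm1 are None, otherwise reuse the previous output); its update rule is a placeholder and is not the library's preprocessing formula.

```python
import numpy as np


class BufferedStepper:
    """Toy stand-in that mimics only the buffer contract, not the real update."""

    def __init__(self):
        self._buffer = None  # output of the previous call

    def preprocess_single_step(self, z, xt, xtm1=None, atm1=None):
        xt = np.asarray(xt, dtype=float)
        if xtm1 is None and atm1 is None:
            # Initial step of a new trajectory: discard whatever was buffered.
            xt_tilde = xt.copy()
        else:
            if self._buffer is None:
                raise RuntimeError("start with xtm1=None and atm1=None")
            # Placeholder update: previous output plus the observed change.
            xt_tilde = self._buffer + (xt - np.asarray(xtm1, dtype=float))
        self._buffer = xt_tilde
        return xt_tilde


z = [[0]]
stepper = BufferedStepper()
stepper.preprocess_single_step(z, [[1.0]])                     # trajectory A, t = 0
a1 = stepper.preprocess_single_step(z, [[2.0]], [[1.0]], [0])  # trajectory A, t = 1

# Misuse: passing trajectory B's t = 1 without a reset reuses A's buffer.
stale = stepper.preprocess_single_step(z, [[11.0]], [[10.0]], [0])

# Correct: reset with the initial step of B, then continue with t = 1.
stepper.preprocess_single_step(z, [[10.0]])
b1 = stepper.preprocess_single_step(z, [[11.0]], [[10.0]], [0])
```

In this toy, the misuse silently produces a value derived from trajectory A instead of raising an error, which is the kind of unintended bug the recommendation to prefer preprocess_multiple_steps() is meant to avoid.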
- train_preprocessor(zs: list | ndarray, xs: list | ndarray, actions: list | ndarray, rewards: list | ndarray) tuple[ndarray, ndarray]
Train the sequential preprocessor and preprocess the training trajectory.
When some \(k>1\) cross folds are specified, \(k\) transition models will be trained, each using all but one of the folds. That is, for each fold in the training trajectory, we train a model using all the other folds, and we preprocess the current fold with this model. The detailed preprocessing procedure is given in the SequentialPreprocessor class description.
- Args:
- zs (list or np.ndarray):
The observed sensitive attributes of each individual in the training data. It should be a 2D list or array following the Sensitive Attributes Format.
- xs (list or np.ndarray):
The state trajectory used for training. It should be a 3D list or array following the Full-trajectory States Format.
- actions (list or np.ndarray):
The action trajectory used for training, often generated using a behavior policy. It should be a 2D list or array following the Full-trajectory Actions Format.
- rewards (list or np.ndarray):
The reward trajectory used for training. It should be a 2D list or array following the Full-trajectory Rewards Format.
- Returns:
- xs_tilde (np.ndarray):
The preprocessed states trajectory. It should be a 3D array following the Full-trajectory States Format.
- rs_tilde (np.ndarray):
The preprocessed reward trajectory. It should be a 2D array following the Full-trajectory Rewards Format.
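The fold-wise training scheme described for train_preprocessor can be sketched as follows. This is a minimal illustration under assumptions: folds are formed by splitting individuals in index order, and the "model" is simply the mean of the training folds. How the library actually forms folds and fits its transition models is not specified here.

```python
import numpy as np

x = np.arange(6, dtype=float)                  # toy per-individual feature
k = 3
folds = np.array_split(np.arange(len(x)), k)   # [0, 1], [2, 3], [4, 5]

x_tilde = np.empty_like(x)
for held_out in folds:
    # Fit on every fold except the held-out one.
    train_idx = np.setdiff1d(np.arange(len(x)), held_out)
    model = x[train_idx].mean()                # hypothetical "model": training mean
    # Preprocess only the held-out fold with the model that never saw it.
    x_tilde[held_out] = x[held_out] - model
```

Each individual is therefore preprocessed by a model trained without that individual's data, which is the usual motivation for cross-fitting.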