Source code for torch_frame.transforms.cat_to_num_transform
from __future__ import annotations
import copy
import logging
from typing import Any
import pandas as pd
import torch
import torch.nn.functional as F
from torch_frame import NAStrategy, TensorFrame, stype
from torch_frame.data.stats import StatType, compute_col_stats
from torch_frame.transforms import FittableBaseTransform
[docs]class CatToNumTransform(FittableBaseTransform):
r"""Transforms categorical features in :class:`TensorFrame` using target
statistics. The original transform is explained in
`A preprocessing scheme for high-cardinality categorical attributes in
classification and prediction problems
<https://dl.acm.org/doi/10.1145/507533.507538>`_ paper.
Specifically, each categorical feature is transformed into numerical
feature using m-probability estimate, defined by
.. math::
\frac{n_c + p \cdot m}{n + m}
where :math:`n_c` is the count of the category, :math:`n` is the total
count, :math:`p` is the prior probability and :math:`m` is a smoothing
factor.
"""
def _fit(
self,
tf_train: TensorFrame,
col_stats: dict[str, dict[StatType, Any]],
) -> None:
if tf_train.y is None:
raise RuntimeError(
"'{self.__class__.__name__}' cannot be used when target column"
" is None.")
if stype.categorical not in tf_train.col_names_dict:
logging.info(
"The input TensorFrame does not contain any categorical "
"columns. No fitting will be performed.")
self._transformed_stats = col_stats
return
tensor = self._replace_nans(tf_train.feat_dict[stype.categorical],
NAStrategy.MOST_FREQUENT)
self.col_stats = col_stats
columns = []
self.data_size = tensor.size(0)
# Check if it is multiclass classification task.
# If it is multiclass classification task, then it doesn't make sense
# to assume the target mean as the prior. Therefore, we need to expand
# the number of columns to (num_target_classes - 1). More details can
# be found in https://dl.acm.org/doi/10.1145/507533.507538
if not torch.is_floating_point(tf_train.y) and tf_train.y.max() > 1:
self.num_classes = tf_train.y.max() + 1
target = F.one_hot(tf_train.y, self.num_classes)[:, :-1]
self.target_mean = target.float().mean(dim=0)
num_rows, num_cols = tf_train.feat_dict[stype.categorical].shape
transformed_tensor = torch.zeros(num_rows,
num_cols * (self.num_classes - 1),
dtype=torch.float32,
device=tf_train.device)
else:
self.num_classes = 2
target = tf_train.y.unsqueeze(1)
mask = ~torch.isnan(target)
if (~mask).any():
target = target[mask]
if target.numel() == 0:
raise ValueError("Target value contains only nans.")
self.target_mean = torch.mean(target.float())
transformed_tensor = torch.zeros_like(
tf_train.feat_dict[stype.categorical], dtype=torch.float32)
for i in range(len(tf_train.col_names_dict[stype.categorical])):
col_name = tf_train.col_names_dict[stype.categorical][i]
count = torch.tensor(col_stats[col_name][StatType.COUNT][1],
device=tf_train.device)
feat = tensor[:, i]
v = torch.index_select(count, 0, feat).unsqueeze(1).repeat(
1, self.num_classes - 1)
start = i * (self.num_classes - 1)
end = (i + 1) * (self.num_classes - 1)
transformed_tensor[:, start:end] = ((v + self.target_mean) /
(self.data_size + 1))
columns += [f"{col_name}_{i}" for i in range(self.num_classes - 1)]
self.new_columns = columns
transformed_df = pd.DataFrame(transformed_tensor.cpu().numpy(),
columns=columns)
transformed_col_stats = dict()
if stype.numerical in tf_train.col_names_dict:
for col in tf_train.col_names_dict[stype.numerical]:
transformed_col_stats[col] = copy.copy(col_stats[col])
for col in columns:
# TODO: Make col stats computed purely with PyTorch
# (without mapping back to pandas series).
transformed_col_stats[col] = compute_col_stats(
transformed_df[col], stype.numerical)
self._transformed_stats = transformed_col_stats
def _forward(self, tf: TensorFrame) -> TensorFrame:
if stype.categorical not in tf.col_names_dict:
logging.info(
"The input TensorFrame does not contain any categorical "
"columns. The original TensorFrame will be returned.")
return tf
tensor = self._replace_nans(
tf.feat_dict[stype.categorical],
NAStrategy.MOST_FREQUENT,
)
if not torch.is_floating_point(tf.y) and tf.y.max() > 1:
num_rows, num_cols = tf.feat_dict[stype.categorical].shape
transformed_tensor = torch.zeros(
num_rows,
num_cols * (self.num_classes - 1),
dtype=torch.float32,
device=tf.device,
)
else:
transformed_tensor = torch.zeros_like(
tf.feat_dict[stype.categorical],
dtype=torch.float32,
)
target_mean = self.target_mean.to(tf.device)
for i in range(len(tf.col_names_dict[stype.categorical])):
col_name = tf.col_names_dict[stype.categorical][i]
count = torch.tensor(
self.col_stats[col_name][StatType.COUNT][1],
device=tf.device,
)
feat = tensor[:, i]
max_cat = feat.max()
if max_cat >= len(count):
raise RuntimeError(
f"'{col_name}' contains new category '{max_cat}' not seen "
f"during fit stage.")
v = count[feat].unsqueeze(1).repeat(1, self.num_classes - 1)
start = i * (self.num_classes - 1)
end = (i + 1) * (self.num_classes - 1)
transformed_tensor[:, start:end] = ((v + target_mean) /
(self.data_size + 1))
# turn the categorical features into numerical features
if stype.numerical in tf.feat_dict:
tf.feat_dict[stype.numerical] = torch.cat(
(tf.feat_dict[stype.numerical], transformed_tensor),
dim=1).to(torch.float32)
tf.col_names_dict[stype.numerical] = tf.col_names_dict[
stype.numerical] + self.new_columns
else:
tf.feat_dict[stype.numerical] = transformed_tensor
tf.col_names_dict[stype.numerical] = self.new_columns
# delete the categorical features
tf.col_names_dict.pop(stype.categorical)
tf.feat_dict.pop(stype.categorical)
return tf