Chapter 5B: Data Processing
Data processing is a crucial step in any machine learning pipeline: it transforms raw data into a clean, structured format suitable for model training. By following the practices introduced in this chapter, you help ensure that your model is trained on high-quality inputs, which directly affects its accuracy, robustness, and maintainability.
Why Data Processing Is Essential
Raw datasets are seldom “model‑ready.” Typical issues include: missing values, outliers, mixed data types, inconsistent feature scales, and temporal leakage. A robust processing pipeline resolves these problems reproducibly, so the exact same transformations can be applied in training, validation, and production inference.
5B.1. Core Processing Techniques
Data Imputation
Data imputation is the process of replacing missing or invalid values in a dataset with plausible estimates so that the dataset can be used by analytical or machine‑learning algorithms that require complete, numerical input. Common strategies include filling with summary statistics (mean, median, mode), using model‑based predictions, copying from similar records, or propagating the last valid observation in time‑series data. Imputation preserves the overall data structure and sample size, which reduces information loss and avoids the errors that many algorithms raise when they encounter NaN or null entries.
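As a minimal sketch of the summary-statistic strategy, the snippet below fills a numeric column with its median and a categorical column with a placeholder. The four-row DataFrame is made up for illustration; only the column names are borrowed from the HDB resale dataset.

```python
import numpy as np
import pandas as pd
from sklearn.impute import SimpleImputer

# Toy data, invented for illustration (not taken from the real dataset)
df = pd.DataFrame(
    {
        "floor_area_sqm": [67.0, np.nan, 93.0, 110.0],
        "flat_type": ["3 ROOM", "4 ROOM", np.nan, "5 ROOM"],
    }
)

# Numeric column: replace NaN with the median of the observed values
num_imputer = SimpleImputer(strategy="median")
df["floor_area_sqm"] = num_imputer.fit_transform(df[["floor_area_sqm"]]).ravel()

# Categorical column: replace missing entries with an explicit placeholder
df["flat_type"] = df["flat_type"].fillna("missing")

print(df)
```

In a real pipeline the imputer would be fitted on the training split only and then reused on validation and test data, so that the fill value does not depend on held-out rows.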
Outlier Detection
Outlier detection is the process of identifying data points whose values deviate markedly from the overall pattern of a dataset. Such points (called outliers, anomalies, or novelties) may arise from measurement errors, data‑entry mistakes, natural rare events, or previously unseen behaviors. Because outliers can skew summary statistics and model fits, detecting them improves model robustness and data quality, and lets practitioners decide whether each one should be investigated, corrected, removed, capped, or modeled separately.
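One simple way to flag and cap extreme values is the interquartile-range (IQR) rule, sketched below. The prices are invented for illustration, and the 1.5 × IQR multiplier is a common convention rather than something this chapter prescribes.

```python
import pandas as pd

# Toy resale prices, invented for illustration; the last value is an obvious outlier
prices = pd.Series(
    [300_000.0, 320_000.0, 310_000.0, 305_000.0, 315_000.0, 2_000_000.0],
    name="resale_price",
)

# Fences at 1.5 * IQR below the first quartile and above the third quartile
q1, q3 = prices.quantile(0.25), prices.quantile(0.75)
iqr = q3 - q1
lower, upper = q1 - 1.5 * iqr, q3 + 1.5 * iqr

# Option 1: flag the points so they can be inspected or dropped
is_outlier = (prices < lower) | (prices > upper)

# Option 2: cap (winsorize) the points at the fences instead of removing them
capped = prices.clip(lower=lower, upper=upper)

print(prices[is_outlier])
print(capped)
```

Whether to drop, cap, or keep a flagged point depends on whether it is an error or a genuine rare observation, which usually requires looking at the record itself.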
One-Hot Encoding of Categorical Variables
One‑hot encoding converts a categorical feature with n possible values into n binary (0/1) indicator columns. For each record, exactly one of these columns is 1 and the rest are 0. This transforms non‑numeric categories into a numerical format that most machine‑learning algorithms can process, without imposing any ordinal relationship among the categories. Columns in the HDB resale price dataset such as town, flat_type, and flat_model are prime candidates for one-hot encoding.
Feature Scaling of Numerical Variables
Feature scaling is the process of transforming numerical variables so they share a common scale—typically by standardizing them to zero‑mean/unit‑variance or rescaling them to a fixed range (e.g., 0 to 1). This prevents variables with larger magnitudes from dominating distance measures or gradient updates, speeds up optimization, and improves convergence and interpretability in many ML algorithms.
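To make the two variants concrete, the sketch below standardizes and min-max rescales a single toy feature. The numbers are invented for illustration; the point to notice is that both scalers are fitted on training data and then reused unchanged on new data.

```python
import numpy as np
from sklearn.preprocessing import MinMaxScaler, StandardScaler

# Toy floor areas, invented for illustration
X_train = np.array([[60.0], [80.0], [100.0]])
X_new = np.array([[120.0]])

# Standardization: subtract the training mean, divide by the training std
standard = StandardScaler().fit(X_train)
X_std = standard.transform(X_train)

# Min-max rescaling: map the training minimum to 0 and the maximum to 1
minmax = MinMaxScaler().fit(X_train)
X_mm = minmax.transform(X_train)

# New data reuses the training statistics, so values can fall outside [0, 1]
X_new_mm = minmax.transform(X_new)

print(X_std.ravel())
print(X_mm.ravel())
print(X_new_mm.ravel())
```

Tree-based models are largely insensitive to feature scale, so this step matters most for distance-based and gradient-based learners.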
Feature Engineering
Feature engineering is the discipline of creating, transforming, or selecting input variables so they capture the most predictive and meaningful information for a ML model. It encompasses deriving new features from raw data (e.g., ratios, interaction terms, time lags), encoding categorical or text attributes, aggregating temporal or spatial patterns, and removing or combining less‑informative variables. Effectively engineered features can significantly improve model performance by highlighting relevant relationships within the data. Domain knowledge is often critical to extracting features that are predictive.
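A few derived features of this kind can be sketched as follows. The rows are invented, and the helper lease_to_years and the storey_mid and sale_quarter columns are hypothetical illustrations; they assume the "NN years MM months" and "NN TO NN" text formats shown in the toy data.

```python
import pandas as pd

# Toy rows, invented for illustration
df = pd.DataFrame(
    {
        "month": ["2024-01", "2024-05"],
        "storey_range": ["01 TO 03", "10 TO 12"],
        "remaining_lease": ["61 years 04 months", "94 years"],
    }
)


def lease_to_years(text: str) -> float:
    """Convert a string such as '61 years 04 months' to fractional years."""
    parts = text.split()
    years = int(parts[0])
    months = int(parts[2]) if len(parts) >= 4 else 0
    return years + months / 12


# Numeric feature parsed from free text
df["remaining_lease_years"] = df["remaining_lease"].apply(lease_to_years)

# Midpoint of the storey range as a single numeric value
bounds = df["storey_range"].str.extract(r"(\d+) TO (\d+)").astype(int)
df["storey_mid"] = bounds.mean(axis=1)

# Calendar feature derived from the transaction month
df["sale_quarter"] = pd.to_datetime(df["month"], format="%Y-%m").dt.quarter

print(df)
```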
Train-Validation-Test Split
Train–validation–test split is the practice of partitioning a dataset into three disjoint subsets:
- A Train set – data the model learns from.
- A Validation set – data used for hyperparameter tuning and early‑stopping decisions during development.
- A Test set – data held out until final evaluation to estimate real‑world generalisation.
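Separating data into train, validation, and test sets does not by itself prevent overfitting, but it makes overfitting detectable: a model that performs well on the train set and poorly on the validation set has memorised rather than generalised. Keeping the test set untouched until the end provides an unbiased measure of model performance.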
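For data without a time dimension, a random three-way split can be sketched by calling train_test_split twice. The 60/20/20 proportions and the random_state value are arbitrary choices for illustration. For time-ordered data such as resale transactions, a chronological split is usually the safer option because it avoids training on information from the future.

```python
import numpy as np
from sklearn.model_selection import train_test_split

# Toy features and labels, invented for illustration
X = np.arange(100).reshape(50, 2)
y = np.arange(50)

# First cut: 60% train, 40% held out
X_train, X_tmp, y_train, y_tmp = train_test_split(
    X, y, test_size=0.4, random_state=42
)

# Second cut: split the held-out part evenly into validation and test
X_val, X_test, y_val, y_test = train_test_split(
    X_tmp, y_tmp, test_size=0.5, random_state=42
)

print(len(y_train), len(y_val), len(y_test))
```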
Overview of Different Data Processing Approaches
| Technique | What It Does | Problems Solved | Popular Python Tools |
|---|---|---|---|
| Data Imputation | Fills in missing values. | NaN, blank strings. | sklearn.impute, pandas.fillna, fancyimpute, miceforest, MissForest |
| Outlier Detection | Flags or caps extreme points. | Skewed stats, noisy sensors. | pyod, scipy.stats, IsolationForest, alibi_detect, PyNomaly |
| One‑Hot Encoding | Converts categoricals to binary vectors. | Algorithms need numeric input; avoids ordinal bias. | sklearn.preprocessing.OneHotEncoder, pandas.get_dummies |
| Feature Scaling | Standardises or normalises numeric features. | Unequal scales slow or bias learning. | sklearn.preprocessing.StandardScaler, sklearn.preprocessing.MinMaxScaler, sklearn.preprocessing.RobustScaler, numpy, pandas |
| Feature Engineering | Creates new informative variables. | Captures domain insight, non‑linear effects. | pandas, numpy |
| Train‑Val‑Test Split | Separates data chronologically or randomly. | Prevents leakage, enables fair evaluation. | sklearn.model_selection.train_test_split |
5B.2. Integrating a Processing Step into a SageMaker Pipeline
Below is the pipeline.py fragment that registers a preprocessing job as a ProcessingStep.
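# pipeline.py
import os

from sagemaker.processing import ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.steps import ProcessingStep

# role, pipeline_session, base_job_prefix, output_s3_path, input_data, BASE_DIR and the
# processing_instance_* parameters are assumed to be defined earlier in pipeline.py.
sklearn_processor = SKLearnProcessor(
    framework_version="1.2-1",
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name=f"{base_job_prefix}/preprocess",
    sagemaker_session=pipeline_session,
    role=role,
)

# With a pipeline session, run() does not start a job; it returns the step arguments.
process_args = sklearn_processor.run(
    outputs=[
        ProcessingOutput(
            output_name="train",
            destination=f"{output_s3_path}/processing",
            source="/opt/ml/processing/train",
        ),
        ProcessingOutput(
            output_name="validation",
            destination=f"{output_s3_path}/processing",
            source="/opt/ml/processing/validation",
        ),
        ProcessingOutput(
            output_name="test",
            destination=f"{output_s3_path}/processing",
            source="/opt/ml/processing/test",
        ),
    ],
    code=os.path.join(BASE_DIR, "preprocess.py"),
    arguments=["--input-data", input_data],
)

step_process = ProcessingStep(
    name="PreprocessData",
    step_args=process_args,
)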
Key Points
- SKLearnProcessor spins up a managed scikit‑learn image, which executes the preprocess.py file.
- ProcessingInput/ProcessingOutput map S3 paths to container folders. This fragment declares only outputs; the input S3 URI is passed through the --input-data argument and downloaded by the script itself. After the processing script finishes, SageMaker uploads all files in /opt/ml/processing/{train,validation,test} back to S3.
- The resulting step_process node then feeds clean datasets to downstream steps (tuning, evaluation, etc.).
Below is the preprocess.py file that contains the data processing code applied to the HDB resale dataset.
# preprocess.py
import argparse
import logging
import os
import pathlib
from urllib.parse import urlparse

import boto3
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder

logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

# Define input (categorical and numerical) and output features
label_column = "resale_price"
input_features = [
    "town",
    "flat_type",
    "storey_range",
    "floor_area_sqm",
    "flat_model",
    "remaining_lease_years",
]
categorical_features = ["town", "flat_type", "storey_range", "flat_model"]
numeric_features = [col for col in input_features if col not in categorical_features]

# Define train, validation, test split conditions
validation_quarter = pd.Period("2024Q2")
test_quarter = pd.Period("2024Q3")
new_test_quarter = pd.Period("2024Q4")  # To simulate data drift over time


def apply_transformation(
    df: pd.DataFrame, preprocess: ColumnTransformer, fit_transform: bool = True
) -> np.ndarray:
    """Apply feature transformation for both numerical and categorical features and
    return the transformed features with the output column as the first column.

    Args:
        df (pd.DataFrame): The unprocessed dataset with both input and output columns.
            The label column is removed from this DataFrame in place.
        preprocess (ColumnTransformer): ColumnTransformer class object
        fit_transform (bool): If True, fit the transformer on df before transforming;
            if False, only apply the already fitted transformer.

    Returns:
        np.ndarray: A 2D array with the label in column 0, followed by the
            transformed features.
    """
    y = df.pop(label_column)
    # .toarray() assumes the ColumnTransformer returned a sparse matrix, which holds
    # here because the one-hot columns make the combined output mostly zeros.
    if fit_transform:
        X_pre = preprocess.fit_transform(df).toarray()
    else:
        X_pre = preprocess.transform(df).toarray()
    y_pre = y.to_numpy().reshape(len(y), 1)
    X = np.concatenate((y_pre, X_pre), axis=1)
    return X


if __name__ == "__main__":
    ##############################
    # Download Data from S3
    ##############################
    logger.info("Starting preprocessing.")
    parser = argparse.ArgumentParser()
    parser.add_argument("--input-data", type=str, required=True)
    args = parser.parse_args()

    base_dir = "/opt/ml/processing"
    pathlib.Path(f"{base_dir}/data").mkdir(parents=True, exist_ok=True)

    # Parse the input S3 path
    input_data = args.input_data
    parsed_s3_url = urlparse(input_data)
    bucket = parsed_s3_url.netloc  # S3 bucket name
    key = parsed_s3_url.path.lstrip("/")  # S3 object key without leading '/'
    logger.info("Downloading data from bucket: %s, key: %s", bucket, key)
    fn = f"{base_dir}/data/resale-dataset.csv"  # Output path

    # Download from S3
    s3 = boto3.resource("s3")
    try:
        s3.Bucket(bucket).download_file(key, fn)
        logger.info("Successfully downloaded the data.")
    except Exception as e:
        logger.error("Failed to download data from S3: %s", e)
        raise

    ##############################
    # Load Data into Pandas
    ##############################
    logger.info("Reading downloaded data.")
    df = pd.read_csv(fn)
    os.unlink(fn)

    ##############################
    # Process Data
    ##############################
    logger.info("Formatting time stamp column.")
    df["time"] = pd.to_datetime(df["month"] + "-01", format="%Y-%m-%d")
    df["quarter"] = df["time"].dt.to_period("Q")
    df.sort_values("time", inplace=True)

    logger.info("Performing feature engineering.")
    # Keep only the whole-year part of strings such as "61 years 04 months"
    df["remaining_lease_years"] = df["remaining_lease"].apply(
        lambda x: int(x.split("years")[0].strip())
    )

    logger.info("Splitting train, validation, and test sets.")
    train_df = df[df["quarter"] < validation_quarter].reset_index(drop=True)
    validation_df = df[df["quarter"] == validation_quarter].reset_index(drop=True)
    test_df = df[df["quarter"] == test_quarter].reset_index(drop=True)

    logger.info("Dropping irrelevant fields.")
    train_df = train_df[input_features + [label_column]]
    validation_df = validation_df[input_features + [label_column]]
    test_df = test_df[input_features + [label_column]]

    logger.info("Defining transformers.")
    numeric_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="median")),
            ("scaler", StandardScaler()),
        ]
    )
    categorical_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
            ("onehot", OneHotEncoder(handle_unknown="ignore")),
        ]
    )
    preprocess = ColumnTransformer(
        transformers=[
            ("num", numeric_transformer, numeric_features),
            ("cat", categorical_transformer, categorical_features),
        ]
    )

    logger.info("Applying transforms.")
    # Fit on the train set only; validation and test reuse the fitted statistics
    train_array = apply_transformation(train_df, preprocess, fit_transform=True)
    validation_array = apply_transformation(
        validation_df, preprocess, fit_transform=False
    )
    test_array = apply_transformation(test_df, preprocess, fit_transform=False)

    ##############################
    # Save Data Locally
    ##############################
    logger.info("Writing out datasets to %s.", base_dir)
    pd.DataFrame(train_array).to_csv(
        f"{base_dir}/train/train.csv", header=False, index=False
    )
    pd.DataFrame(validation_array).to_csv(
        f"{base_dir}/validation/validation.csv", header=False, index=False
    )
    pd.DataFrame(test_array).to_csv(
        f"{base_dir}/test/test.csv", header=False, index=False
    )
- Argument Parsing & Setup: Reads an S3 path (--input-data), configures logging, and prepares local directories under /opt/ml/processing.
- Data Ingestion: Downloads the raw CSV from S3 into the processing container using boto3.
- Initial Load & Cleanup: Loads the CSV into a Pandas DataFrame and removes the temporary file.
- Temporal & Feature Engineering: Parses the month column into a datetime/quarter, sorts chronologically, and derives new features (e.g., remaining_lease_years).
- Chronological Splitting: Splits the data into train, validation, and test sets based on predefined quarter boundaries to avoid future information leakage.
- Preprocessor Definition: Defines numeric and categorical pipelines (imputation, scaling, one‑hot encoding) and combines them in a ColumnTransformer.
- Apply Transformations: Fits the transformer on the train set and applies it to all splits, concatenating the target label with transformed features.
- Export Processed Data: Writes headerless CSVs for train, validation, and test to /opt/ml/processing/{train,validation,test}, ready for downstream pipeline steps.
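The fit-on-train, transform-everywhere pattern and the label-first layout can be tried locally without SageMaker. The sketch below uses invented rows and a hypothetical helper, to_label_first, that mirrors the role of apply_transformation. Unlike the script, it checks whether the transformer output is sparse before densifying, because on tiny toy data ColumnTransformer may already return a dense array.

```python
import numpy as np
import pandas as pd
from scipy import sparse
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder, StandardScaler

# Toy splits, invented for illustration
train = pd.DataFrame(
    {
        "town": ["BEDOK", "YISHUN", "BEDOK"],
        "floor_area_sqm": [60.0, 80.0, 100.0],
        "resale_price": [400_000.0, 450_000.0, 520_000.0],
    }
)
validation = pd.DataFrame(
    {
        "town": ["YISHUN", "PUNGGOL"],  # PUNGGOL never appears in train
        "floor_area_sqm": [80.0, 120.0],
        "resale_price": [460_000.0, 600_000.0],
    }
)

preprocess = ColumnTransformer(
    transformers=[
        ("num", StandardScaler(), ["floor_area_sqm"]),
        ("cat", OneHotEncoder(handle_unknown="ignore"), ["town"]),
    ]
)


def to_label_first(df: pd.DataFrame, fit: bool) -> np.ndarray:
    """Return an array with the label in column 0 and transformed features after it."""
    features = df.drop(columns=["resale_price"])
    X = preprocess.fit_transform(features) if fit else preprocess.transform(features)
    if sparse.issparse(X):  # densify only when the output is actually sparse
        X = X.toarray()
    y = df["resale_price"].to_numpy().reshape(-1, 1)
    return np.concatenate((y, X), axis=1)


# Statistics and category vocabulary come from the train split only
train_array = to_label_first(train, fit=True)
validation_array = to_label_first(validation, fit=False)

print(train_array)
print(validation_array)
```

The validation row for PUNGGOL gets zeros in both town columns, and its scaled floor area is computed with the training mean and standard deviation, which is the behaviour that keeps held-out data from influencing the fitted transformer.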
5B.3. Summary
In this chapter, we covered different data processing approaches, including data imputation, outlier detection, one-hot encoding, feature scaling, feature engineering, and train-validation-test splitting. Each step matters from an MLOps best-practice perspective because it keeps your dataset consistent, high-quality, and ready for model training.
By integrating these data processing steps into your existing codebase, you will have a robust preprocessing pipeline and ensure that subsequent stages—such as hyperparameter tuning and model evaluation—are built on a solid foundation of well-prepared data.