How to Build and Implement a Production-Grade Data Pipeline

Picture this: you do some analytics work in a Jupyter Notebook. You type out a line of code, add another cell, import pandas, read in a CSV, transform some of the values, and then write 400 more cells before you finally get the result you want and save it as a CSV.

The following week, you need to generate exactly the same result, but you have completely forgotten which cells must be run first and which cells must be skipped. You also have to press Enter about 400 times before you get what you are after. Reproducing this one result costs you close to half a day.

Now picture this as well: the input data you need is produced by someone else's Jupyter Notebook. Before you can even start, you have to ask them to generate the dataset for you from their error-prone notebooks. For you, that means another half-day wasted.

You might ask yourself whether there is a tool out there that makes your scripts reproducible, maintainable, and modular; that helps you separate concerns and version your work; and that helps you deliver real-world ML results. It would make your life so much easier.

Introducing Kedro

https://unsplash.com/photos/Skf7HxARcoc

“Creative output depends on creative input” — Robert C. Martin

Kedro (https://github.com/quantumblacklabs/kedro) is an open-source Python framework that makes it easy to create scalable and robust data pipelines by providing uniform project templates, data abstraction, configuration management, and pipeline construction. With kedro, we write machine learning and data analytics code following software engineering principles.

We need to install and set up a few things before using Kedro. We won't dive into all the installation details here; you can find the instructions here.
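For reference, a minimal setup is usually just a pip install followed by a version check (assuming Python and pip are already available on your machine):

$ pip install kedro
$ kedro --version   # verify the installation succeeded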

To start a new project, we run kedro new from the CLI and then enter a project name, a repository name, and a Python package name. Let's use bmi for all three.

$ kedro new
Project Name:
=============
Please enter a human readable name for your new project.
Spaces and punctuation are allowed.
[New Kedro Project]: bmi
Repository Name:
================
Please enter a directory name for your new project repository.
Alphanumeric characters, hyphens and underscores are allowed.
Lowercase is recommended.
[new-kedro-project]: bmi
Python Package Name:
====================
Please enter a valid Python package name for your project package.
Alphanumeric characters and underscores are allowed.
Lowercase is recommended. Package name must start with a letter or underscore.
[new_kedro_project]: bmi
Generate Example Pipeline:
==========================
Do you want to generate an example pipeline in your project?
Good for first-time users. (default=N)
[y/N]: N
Change directory to the project generated in /home/user/bmi


Here’s how a kedro project is typically structured:

bmi                     # Parent directory of the template
├── conf # Project configuration files
├── data # Local project data
├── docs # Project documentation
├── logs # Project output logs
├── notebooks # Project related Jupyter notebooks
├── README.md # Project README
├── setup.cfg # Configuration options for `pytest`
└── src # Project source code

There are a few crucial concepts in kedro, but we will focus on the three most significant ones: DataCatalog, Node, and Pipeline.

DataCatalog

The DataCatalog is a registry of all the data sources the project can use. It is a powerful concept: every data source and sink lives in one place, as opposed to a plain Python script or a Jupyter Notebook, where data definitions are scattered around.

The DataCatalog lives in a YAML file named catalog.yml inside the conf/bmi/ folder. For example, if we have three CSVs as data inputs/outputs, we can define all of them in a single file:

freshman_bmi:
  type: pandas.CSVDataSet
  filepath: data/01_raw/weight_bmi.csv
  load_args:
    sep: ','
  save_args:
    index: False
    decimal: .

freshman_with_height:
  type: pandas.CSVDataSet
  filepath: data/02_primary/weight_bmi_height.csv
  load_args:
    sep: ','
  save_args:
    index: False
    decimal: .

freshman_bmi_summary:
  type: pandas.CSVDataSet
  filepath: data/03_summary/weight_bmi_height_summary.csv
  load_args:
    sep: ','
  save_args:
    index: False
    decimal: .

The topmost YAML key is the catalog name (e.g. freshman_bmi); this name is later used by a Node to reference the input/output data. We also define the type and file path of the data.

In our example we use the pandas.CSVDataSet type, but we can use other types such as SparkDataSet, pandas.ExcelDataSet, pandas.SQLQueryDataSet, and more. You can find the full list of DataSet types here.

Additionally, we can define load/save arguments, such as which CSV separator to use or whether to append to or overwrite the file.
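To make this concrete, for pandas.CSVDataSet the load_args/save_args are forwarded to pandas read_csv/to_csv, and the same pattern applies to other types. A rough sketch of two extra catalog entries (the entry names, file paths, and argument values below are purely illustrative, not part of the project above):

freshman_bmi_xlsx:
  type: pandas.ExcelDataSet
  filepath: data/01_raw/weight_bmi.xlsx   # hypothetical Excel source
  load_args:
    sheet_name: Sheet1                    # passed to pandas.read_excel

freshman_bmi_semicolon:
  type: pandas.CSVDataSet
  filepath: data/01_raw/weight_bmi_semicolon.csv
  load_args:
    sep: ';'                              # read a semicolon-separated file
    nrows: 1000                           # only load the first 1000 rows
  save_args:
    index: False                          # don't write the DataFrame index
    na_rep: 'N/A'                         # how missing values are written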

Node

In kedro, a Node is a wrapper around a Python function that names the inputs and outputs of that function. We can connect one Node to another by using the output of the first node as the input of the second.

As an example, let's say there are two tasks:

  1. Calculate height based on weight & BMI info, and save it to CSV
  2. Calculate average weight, height, and BMI for each gender, and save it to CSV

First, we need to make a function for each:

import pandas as pd
import numpy as np


def calculate_height(df):
    df["height"] = np.sqrt(df["weight"] / df["bmi"])
    return df


def calculate_avg_by_gender(df):
    df = df.groupby("gender").mean()
    return df
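Before wiring these into kedro, they can be sanity-checked on a toy DataFrame (the values below are made-up example numbers, continuing from the definitions above):

import pandas as pd

toy = pd.DataFrame({
    "gender": ["F", "M"],
    "weight": [55.0, 70.0],   # kg
    "bmi": [21.5, 22.9],
})

toy = calculate_height(toy)              # adds "height", e.g. sqrt(70 / 22.9) ≈ 1.75
summary = calculate_avg_by_gender(toy)   # mean weight/bmi/height per gender
print(summary)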

There are several parameters in node (a small sketch follows the list below):

  • func: A function corresponding to node logic.
  • inputs: The name or the list of the names of variables used as inputs to the function. We can put the catalog name that we defined on catalog.yml here.
  • outputs: The name or the list of the names of variables used as outputs to the function. We can put the catalog name that we defined on catalog.yml here.
  • name: Optional node name to be used when displaying the node in logs or any other visualisations.
  • tags: Optional set of tags to be applied to the node.
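For instance, a single node using all of these parameters might look like the following sketch; it reuses the calculate_height function defined earlier, and the tag names here are purely illustrative:

from kedro.pipeline import node

height_node = node(
    func=calculate_height,             # the node logic
    inputs="freshman_bmi",             # catalog entry used as input
    outputs="freshman_with_height",    # catalog entry used as output
    name="calculate_height",           # shown in logs and visualisations
    tags=["bmi", "preprocessing"],     # optional tags, e.g. for partial runs
)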

In this example, we then create two nodes:

import pandas as pd
import numpy as np
from kedro.pipeline import node


def calculate_height(df):
    df["height"] = np.sqrt(df["weight"] / df["bmi"])
    return df


def calculate_avg_by_gender(df):
    df = df.groupby("gender").mean()
    return df


nodes = [
    node(
        func=calculate_height,
        inputs="freshman_bmi",
        outputs="freshman_with_height",
        name="calculate_height",
    ),
    node(
        func=calculate_avg_by_gender,
        inputs="freshman_with_height",
        outputs="freshman_bmi_summary",
        name="calculate_avg_by_gender",
    ),
]

As we can see, the first node, named calculate_height, takes freshman_bmi as the input of the function and saves the output as freshman_with_height.

The variables freshman_bmi and freshman_with_height are both defined in catalog.yml as CSV files with their file paths, so the function will read/write the data according to the defined paths and types.

The second node takes its input from the output of the first node and then saves the result as a CSV to freshman_bmi_summary, as defined in catalog.yml.
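Outside a pipeline run, the same catalog entries can also be loaded programmatically, which is handy for debugging. A minimal sketch, assuming the catalog.yml shown earlier:

import yaml
from kedro.io import DataCatalog

# Parse the catalog definition and build a DataCatalog from it
with open("conf/bmi/catalog.yml") as f:
    conf_catalog = yaml.safe_load(f)

catalog = DataCatalog.from_config(conf_catalog)

# Load one of the registered datasets by its catalog name
df = catalog.load("freshman_bmi")
print(df.head())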

Pipeline

A pipeline organizes the dependencies and execution order of your collection of nodes and connects inputs and outputs while keeping your code modular.

The pipeline determines the node execution order by resolving dependencies; it does not necessarily run the nodes in the order in which they are passed in. A pipeline contains one or more nodes.

import pandas as pd
import numpy as np
from kedro.pipeline import node
from kedro.pipeline import Pipeline


def calculate_height(df):
    df["height"] = np.sqrt(df["weight"] / df["bmi"])
    return df


def calculate_avg_by_gender(df):
    df = df.groupby("gender").mean()
    return df


pipeline = Pipeline(
    [
        node(
            func=calculate_height,
            inputs="freshman_bmi",
            outputs="freshman_with_height",
            name="calculate_height",
        ),
        node(
            func=calculate_avg_by_gender,
            inputs="freshman_with_height",
            outputs="freshman_bmi_summary",
            name="calculate_avg_by_gender",
        ),
    ],
    tags="bmi_pipeline",
)

In this example, we reuse the node definitions above to create a pipeline.
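For completeness, a pipeline like this can also be executed directly from Python rather than through the CLI. A minimal sketch, reusing the pipeline above and a DataCatalog built from conf/bmi/catalog.yml (see the DataCatalog sketch earlier):

from kedro.runner import SequentialRunner

# Runs the pipeline node by node, reading and writing through the catalog
runner = SequentialRunner()
runner.run(pipeline, catalog)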

In terms of best practices, we typically split the nodes and the pipeline into two separate Python files, nodes.py and pipeline.py, under the folder src/bmi/pipelines/:

# src/bmi/pipelines/nodes.py
import pandas as pd
import numpy as np


def calculate_height(df):
    df["height"] = np.sqrt(df["weight"] / df["bmi"])
    return df


def calculate_avg_by_gender(df):
    df = df.groupby("gender").mean()
    return df

# src/bmi/pipelines/pipeline.py
from kedro.pipeline import Pipeline, node

from src.bmi.pipelines.nodes import (
    calculate_height,
    calculate_avg_by_gender,
)


def create_pipeline():
    return Pipeline(
        [
            node(
                func=calculate_height,
                inputs="freshman_bmi",
                outputs="freshman_with_height",
                name="calculate_height",
            ),
            node(
                func=calculate_avg_by_gender,
                inputs="freshman_with_height",
                outputs="freshman_bmi_summary",
                name="calculate_avg_by_gender",
            ),
        ],
        tags="bmi_pipeline",
    )

There are two types of pipelines: the main one and the sub-pipelines. The one we created above is a sub-pipeline. The main pipeline combines all the sub-pipelines in the project, and its file is generated automatically when we create a new kedro project. In this example, the main pipeline lives at src/bmi/pipeline.py.

We must "register" the sub-pipelines we created by importing them and calling their create_pipeline() functions in the main pipeline file, like this:

"""Construction of the master pipeline."""

from typing import Dict

from kedro.pipeline import Pipeline

from src.bmi.pipelines import pipeline as bmi


def create_pipelines(**kwargs) -> Dict[str, Pipeline]:
    """Create the project's pipeline.

    Args:
        kwargs: Ignore any additional arguments added in the future.

    Returns:
        A mapping from a pipeline name to a ``Pipeline`` object.
    """
    bmi_pipeline = bmi.create_pipeline()

    return {
        "__default__": bmi_pipeline,
        "bmi_pipeline": bmi_pipeline,
    }

Executing Kedro Commands

After building a pipeline, we can use the kedro run command to run the entire pipeline:

$ kedro run --env bmi --pipeline bmi_pipeline

2021-01-15 17:06:03,333 - kedro.io.data_catalog - INFO - Loading data from `freshman_bmi` (CSVDataSet)...
2021-01-15 17:06:03,344 - kedro.pipeline.node - INFO - Running node: calculate_height: calculate_height([freshman_bmi]) -> [freshman_with_height]
2021-01-15 17:06:03,365 - numexpr.utils - INFO - NumExpr defaulting to 4 threads.
2021-01-15 17:06:03,374 - kedro.io.data_catalog - INFO - Saving data to `freshman_with_height` (CSVDataSet)...
2021-01-15 17:06:03,390 - kedro.runner.sequential_runner - INFO - Completed 1 out of 2 tasks
2021-01-15 17:06:03,391 - kedro.io.data_catalog - INFO - Loading data from `freshman_with_height` (CSVDataSet)...
2021-01-15 17:06:03,398 - kedro.pipeline.node - INFO - Running node: calculate_avg_by_gender: calculate_avg_by_gender([freshman_with_height]) -> [freshman_bmi_summary]
2021-01-15 17:06:03,409 - kedro.io.data_catalog - INFO - Saving data to `freshman_bmi_summary` (CSVDataSet)...
2021-01-15 17:06:03,417 - kedro.runner.sequential_runner - INFO - Completed 2 out of 2 tasks
2021-01-15 17:06:03,418 - kedro.runner.sequential_runner - INFO - Pipeline execution completed successfully.

The --env flag defines which configuration environment we use. Since we placed our catalog in the conf/bmi folder, we pass bmi as the value. We also specify the pipeline name with the --pipeline argument.
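Concretely, the configuration layout assumed by this command would look roughly like the sketch below; base and local are the default kedro environments, while bmi is the custom environment we select with --env:

conf
├── base          # default configuration shared by all environments
├── bmi           # the environment selected with --env bmi
│   └── catalog.yml
└── local         # machine-specific overrides (not committed)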

Another thing worth mentioning: the pipeline, which contains two nodes and generates two CSV files, runs in less than a second!

Apart from --pipeline, we can also use --node and --tag:

$ kedro run --env bmi --node calculate_height

2021-01-15 17:17:12,367 - kedro.io.data_catalog - INFO - Loading data from `freshman_bmi` (CSVDataSet)...
2021-01-15 17:17:12,375 - kedro.pipeline.node - INFO - Running node: calculate_height: calculate_height([freshman_bmi]) -> [freshman_with_height]
2021-01-15 17:17:12,399 - numexpr.utils - INFO - NumExpr defaulting to 4 threads.
2021-01-15 17:17:12,404 - kedro.io.data_catalog - INFO - Saving data to `freshman_with_height` (CSVDataSet)...
2021-01-15 17:17:12,419 - kedro.runner.sequential_runner - INFO - Completed 1 out of 1 tasks
2021-01-15 17:17:12,420 - kedro.runner.sequential_runner - INFO - Pipeline execution completed successfully.
$ kedro run --env bmi --tag bmi_pipeline

2021-01-15 17:18:37,251 - kedro.io.data_catalog - INFO - Loading data from `freshman_bmi` (CSVDataSet)...
2021-01-15 17:18:37,257 - kedro.pipeline.node - INFO - Running node: calculate_height: calculate_height([freshman_bmi]) -> [freshman_with_height]
2021-01-15 17:18:37,277 - numexpr.utils - INFO - NumExpr defaulting to 4 threads.
2021-01-15 17:18:37,285 - kedro.io.data_catalog - INFO - Saving data to `freshman_with_height` (CSVDataSet)...
2021-01-15 17:18:37,307 - kedro.runner.sequential_runner - INFO - Completed 1 out of 2 tasks
2021-01-15 17:18:37,308 - kedro.io.data_catalog - INFO - Loading data from `freshman_with_height` (CSVDataSet)...
2021-01-15 17:18:37,315 - kedro.pipeline.node - INFO - Running node: calculate_avg_by_gender: calculate_avg_by_gender([freshman_with_height]) -> [freshman_bmi_summary]
2021-01-15 17:18:37,324 - kedro.io.data_catalog - INFO - Saving data to `freshman_bmi_summary` (CSVDataSet)...
2021-01-15 17:18:37,333 - kedro.runner.sequential_runner - INFO - Completed 2 out of 2 tasks
2021-01-15 17:18:37,334 - kedro.runner.sequential_runner - INFO - Pipeline execution completed successfully.

Kedro Viz

Another cool feature of kedro is Kedro-Viz. It shows you how your data pipelines are structured. With Kedro-Viz you can:

  • See how your datasets and Python functions (nodes) are resolved in Kedro so that you can understand how your data pipeline is built
  • Get a clear picture when you have lots of datasets and nodes by using tags to visualise sub-pipelines
  • Search for nodes and datasets
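Kedro-Viz ships as a separate package. A typical way to install it and launch the visualisation from the project root (this starts a local web server showing the pipeline graph):

$ pip install kedro-viz
$ kedro viz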

This is how our BMI data pipeline looks. As we can see clearly in the viz, we have three data catalog entries and two nodes, as well as the connections between them:

kedro-viz for bmi_pipeline

Kedro In Telkomsel

We use kedro heavily in several data-related projects at Telkomsel. It runs in our production environment, which consumes tens of TBs of data, runs hundreds of feature engineering tasks, and serves dozens of machine learning models.

Thanks to kedro, things are running more smoothly than ever before. Some of the benefits include:

  • Collaboration between data science and data engineering runs smoothly now.
  • There is a single source of truth for our feature logic, data sources and sinks, and configuration.
  • A complex end-to-end data pipeline runs with only a couple of commands.
  • The data pipeline visualizations help us debug and explain the pipeline to our business end users.
  • The data science code is high quality, which is assured by integration tests and unit tests (a minimal example follows this list).
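As an illustration of that last point, node functions such as calculate_height are plain Python functions, so they can be unit-tested with pytest without any kedro machinery. A minimal sketch (the test file path is hypothetical):

# tests/test_nodes.py (hypothetical path)
import numpy as np
import pandas as pd

from src.bmi.pipelines.nodes import calculate_height


def test_calculate_height():
    df = pd.DataFrame({"weight": [70.0], "bmi": [22.9]})
    result = calculate_height(df)
    # height = sqrt(weight / bmi)
    assert np.isclose(result.loc[0, "height"], np.sqrt(70.0 / 22.9))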

If you're curious, here's how one of our kedro pipelines looks:
