Data Engineering Essentials #1: Your First Pipeline

October 25, 2023 · Jasper

Introduction to Data Engineering Essentials

Data Engineering can be a complex field to get started in. In addition to having to know languages such as Python and SQL, there are many different tools and frameworks out there. A few examples are Pandas, PyArrow, Airflow, Prefect, Spark, Kafka, dbt, Docker, Kubernetes, Terraform… and the list goes on. And then there are databases, data lakes, data warehouses and cloud platforms!

I find that many of the popular data engineering tutorials and projects focus on intermediate students who already know a bunch of the general principles. I found this very frustrating when I was starting out, which is why I aim to write an extensive series of articles in which we start with the absolute basics: a simple Python script (I guess you could call it a pipeline!) which fetches data from the Web and saves it into a database. And that’s all we will cover.

I will try to follow best practices within the scope of the article, but I will not introduce a ton of data engineering concepts from the get-go. Instead, I will improve on the project by introducing new technologies in the following parts. This way, I hope I can show why we use certain tools, while bringing the project closer to best practice with each step.

What we will cover in this part

In this part we will start with the data engineering basics by fetching some data on the Danish power system. The data shows the Danish power generation on an hourly basis. The dataset can be found at Energinet’s website:

https://www.energidataservice.dk/

The specific API endpoint we will use for now is this one:

https://api.energidataservice.dk/dataset/PowerSystemRightNow
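
If you are curious what this endpoint returns before we write any code, you can peek at it with a few lines of Python (this uses the requests library, which we will install later in this article; treat the exact response keys as an assumption, apart from records, which we rely on below):

import requests

# Fetch a single record to keep the output small and inspect the response shape
response = requests.get('https://api.energidataservice.dk/dataset/PowerSystemRightNow?limit=1')
payload = response.json()
print(payload.keys())         # should include a 'records' key
print(payload['records'][0])  # the first record, as a Python dictionary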

I will try to cover Python basics and best practices while building a basic data engineering pipeline. I will guide you through the process of getting started with Python, including setting up pip, managing project dependencies with a requirements.txt file, and using virtual environments to isolate your projects.

Prerequisites

Python

All right, let’s go! First up, we need to make sure you have Python installed on your system. You can check whether Python is installed by opening a terminal and running the following command:

python --version

If Python is not installed, download and install the latest version from the official website: https://www.python.org/downloads/

Code Editor

To create our scripts we need a code editor. I strongly recommend VS Code, which can be found here: https://code.visualstudio.com/

Git

While not strictly necessary, you will at some point need to learn to work with Git, a source control system. It allows you to save your code changes, work together with other developers, and so much more. I will be committing the starting and ending code for each of the articles in this series. That way you can simply download (clone) my code and continue from there.
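
Once you have Git installed, grabbing my code is a single command:

git clone https://github.com/JAlblas/de-essentials.git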

Ready to go?

Alright, now we should have the bare essentials ready to start coding our first pipeline!

Hello World!

Start up your editor, create a new folder called de-essentials, and create a file called pipeline.py in it. Or, if you prefer the terminal, run the following commands:

mkdir de-essentials

cd de-essentials

touch pipeline.py

code .

Here we create a new directory, move into it, and create an empty Python script. The last line simply opens the directory in VS Code.

Let’s start by testing that everything works. Simply enter the following print statement in your script file:

print("Hello World")

Now, go back to your terminal and run:

python3 pipeline.py

And sure enough, the terminal prints Hello World. Hurray! Now, let’s do some data engineering!

Our first ETL pipeline

Let’s build our first ETL pipeline. ETL stands for Extract, Transform and Load. In other words, we build a process which first fetches data, transforms it, and afterwards loads it into a database.
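
In code, the skeleton of such a pipeline is very simple. Here is a minimal sketch of the structure we will end up with in this article (the function bodies follow later):

def fetch_data():      # Extract: fetch raw data from the API
    ...

def transform(data):   # Transform: clean the data with pandas
    ...

def ingest(df):        # Load: write the result to a database
    ...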

Let’s start our pipeline.py script with a docstring. What is this, I hear you say? It is a documentation string at the top of your file (or function) that other people who read your code (or you yourself, a month from now!) can use to understand it. If you use docstrings in functions, they will also show up when you autocomplete function calls.

We do it like this:

'''
A pipeline which fetches data from an API, transforms the data, and uploads it into a SQLite database through pandas.
'''
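
Python also exposes docstrings at runtime, which is what your editor uses for those autocomplete hints. A quick, hypothetical example:

def greet(name):
    """Returns a friendly greeting for the given name."""
    return f"Hello, {name}!"

print(greet.__doc__)  # prints the docstring
help(greet)           # shows the docstring in Python's built-in help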

Now, we need to fetch some data from our API. For this article I will be using https://api.energidataservice.dk/dataset/PowerSystemRightNow

For this we are going to need the requests library. For data wrangling we will use pandas. And for writing to our SQLite database we use the create_engine function from the sqlalchemy package.

import requests
import pandas as pd
from sqlalchemy import create_engine

But if you run the script now, you will get an error: ModuleNotFoundError: No module named 'requests'.

This is because we need to install these packages first. We can use pip for this, which is a Python package manager. You could simply run pip install package_name, but if you do this straight in the terminal now, you will install the packages into your base Python installation. Before doing that, we should create a virtual environment.

Creating a Virtual Environment

A virtual environment is a self-contained directory where you can install dependencies for a specific project without interfering with the system-wide Python installation or other projects. It keeps your project self-contained, and it becomes especially helpful once different projects need different versions of the same package.

To create a virtual environment we can use the following command:

python3 -m venv <venv_name>

We can then activate the virtual environment:

source <venv_name>/bin/activate
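
On Windows, the activation command is slightly different:

<venv_name>\Scripts\activate (Command Prompt)
.\<venv_name>\Scripts\Activate.ps1 (PowerShell)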

I picked the venv name “de-essentials”.

Your terminal prompt should change to indicate that you’re now in the virtual environment, by showing the environment name between parentheses at the beginning of the command prompt.

Now we are ready to install those packages.

pip

pip is a package manager for Python, which is used to install, upgrade, and manage Python packages. It usually comes pre-installed with Python, but you should ensure it’s up to date. To upgrade pip to the latest version, use the following command:

pip install --upgrade pip

Installing and Importing Packages

Now that you have a virtual environment set up, you can start installing Python packages using pip. You can install packages individually, or you can list all your project’s dependencies in a requirements.txt file and install them in one go.

To install a single package, simply run:

pip install package_name

Replace package_name with the name of the package you want to install.

Another possibility is to use a requirements.txt file, which makes it easy to document and replicate your project’s dependencies. This is especially useful if you share your code with others. When putting your project on GitHub, you can then avoid committing all the packages themselves and simply commit the requirements.txt file; other people can then install all the required packages at once. To install all the packages listed in a requirements.txt file in your virtual environment, use the following command:

pip install -r requirements.txt

After you have installed packages in your virtual environment, you can generate a requirements.txt file with the pip freeze command:

pip freeze > requirements.txt

To conclude, enter the following commands:

pip install requests
pip install pandas
pip install sqlalchemy

pip freeze > requirements.txt
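
Your requirements.txt will now contain the three packages plus their dependencies, each pinned to a version. It will look roughly like this (the exact versions on your machine will differ):

numpy==1.26.0
pandas==2.1.1
requests==2.31.0
SQLAlchemy==2.0.22
...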

We code, at last

Now it is time to start coding! We will do this in three parts, and each part will live in its own function. We will start by fetching the data, then do some transforming, and finally add the data to a database.

Fetching the data

We start by entering the relevant import statements:

import requests
import pandas as pd
from sqlalchemy import create_engine

Let’s continue by saving the url in a variable:

url = 'https://api.energidataservice.dk/dataset/PowerSystemRightNow?limit=1000'

I have put a limit of 1000 on the API call.

We can move this, together with the rest of the code, into its own function (and remember to use a docstring!):

def fetch_data():
    """
    Fetches data from https://api.energidataservice.dk/dataset/PowerSystemRightNow into a variable
    """
    url = 'https://api.energidataservice.dk/dataset/PowerSystemRightNow?limit=1000'
    data = requests.get(url).json()
    return data["records"]

Nothing too difficult here. We fetch the data, parse it as JSON, and then return everything within the records key.
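
One optional improvement, in case you want the script to fail loudly on network problems instead of crashing with a confusing message later (not required for this tutorial): add a timeout and check the HTTP status before parsing. A sketch:

def fetch_data():
    """
    Fetches data from the PowerSystemRightNow endpoint, with basic error handling.
    """
    url = 'https://api.energidataservice.dk/dataset/PowerSystemRightNow?limit=1000'
    response = requests.get(url, timeout=10)  # give up after 10 seconds instead of hanging
    response.raise_for_status()               # raise an exception on 4xx/5xx responses
    return response.json()["records"]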

Now we just need to call this function from a main function, which in turn is called from the if __name__ == "__main__" block at the bottom. This block runs automatically when you execute the script from the terminal.

def main():
    '''
    Calls the different functions
    '''
    print(fetch_data())

if __name__ == "__main__":
    main()

The if __name__ == "__main__" conditional statement is a Python construct that controls the execution of a script. It is basically the starting point of your script: Python sets the special variable __name__ to "__main__" when the file is run directly (rather than imported). We use it to call the main function, which in turn calls everything else.
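
To see why this matters, imagine a second, hypothetical script in the same folder that imports our pipeline:

# other.py (hypothetical)
import pipeline   # __name__ inside pipeline.py is now "pipeline", so main() does NOT run

pipeline.main()   # but we can still call it explicitly whenever we want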

Run the script again, and hopefully you see a bunch of JSON from the API printed out. You can now remove the print statement around fetch_data and save the returned JSON in a variable instead.

def main():
    '''
    Calls the different functions
    '''
    json = fetch_data()

Now let’s start working with the data with Pandas.

Transforming the data

Now it is time to enter the data into a pandas DataFrame.

def transform(data):
    """ Transforms the dataset"""
    df = pd.DataFrame(data)
    print(f"Total number of rows {len(data)}")
    return df

We see 1000 rows being returned, which makes sense since we put a limit of 1000 on the API call.

Now let’s edit the main function:

def main():
    '''
    Calls the different functions
    '''
    json = fetch_data()
    df = transform(json)

If you print the dataframe, you will notice that 6 of the columns contain only None values. Let’s remove all 6 columns:

def transform(data):
    """ Transforms the dataset"""
    df = pd.DataFrame(data)
    print(f"Total number of rows {len(data)}")

    df.drop(columns=['aFRR_ActivatedDK1', 'aFRR_ActivatedDK2',
            'mFRR_ActivatedDK1', 'mFRR_ActivatedDK2', 'ImbalanceDK1', 'ImbalanceDK2'], inplace=True)
    return df

This is a very simple example of some data transformation. Let’s leave it like that for now.
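
As a side note: hard-coding the column names works, but if you would rather have pandas find every all-empty column for you, a small variation (a sketch, not needed for this article) would be:

def transform(data):
    """ Transforms the dataset"""
    df = pd.DataFrame(data)
    print(f"Total number of rows {len(data)}")

    empty_columns = df.columns[df.isna().all()]  # columns where every single value is missing
    df.drop(columns=empty_columns, inplace=True)
    return df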

Now all we need is to load the data into a database.

Loading the data into the database

Now, all that is needed is to load the data into a simple SQLite database.

We use SQLAlchemy’s create_engine function to create an engine, which is used to save the dataframe into SQLite.

def ingest(df):
    """ Ingests data into SQLite database"""
    engine = create_engine('sqlite:///data.db')
    df.to_sql('energy', engine, if_exists='replace')

After adding a call to ingest(df) to the main function and running the script, you should see a data.db file in your project directory. If you want, you can use a GUI tool such as DBeaver to look at it.
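
If you prefer to stay in Python, you can also run a quick sanity check by reading the table straight back with pandas (reusing the same engine URL):

# Quick check: read the first rows back out of the SQLite database
engine = create_engine('sqlite:///data.db')
print(pd.read_sql('SELECT * FROM energy LIMIT 5', engine))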

Final code

That was all! Great job! Here is the complete code:

''' 
A pipeline which fetches data from an API, transforms the data, and uploads it into a SQLite database through pandas. 
'''

import requests
import pandas as pd
from sqlalchemy import create_engine

def fetch_data():
    """
    Fetches data from https://api.energidataservice.dk/dataset/PowerSystemRightNow into a variable
    """
    url = 'https://api.energidataservice.dk/dataset/PowerSystemRightNow?limit=1000'
    data = requests.get(url).json()
    return data["records"]

def transform(data):
    """ Transforms the dataset"""
    df = pd.DataFrame(data)
    print(f"Total number of rows {len(data)}")

    df.drop(columns=['aFRR_ActivatedDK1', 'aFRR_ActivatedDK2',
            'mFRR_ActivatedDK1', 'mFRR_ActivatedDK2', 'ImbalanceDK1', 'ImbalanceDK2'], inplace=True)
    return df

def ingest(df):
    """ Ingests data into sqllite database"""
    engine = create_engine('sqlite:///data.db')
    df.to_sql('energy', engine, if_exists='replace')

def main():
    '''
    Calls the different functions
    '''
    json = fetch_data()
    df = transform(json)
    print(df)
    ingest(df)


if __name__ == "__main__":
    main()


Or you can look at my repository:

https://github.com/JAlblas/de-essentials

Deactivating the Virtual Environment

As a final tip related to virtual environments: when you’re done working on your project, you can deactivate the virtual environment to return to your system’s global Python environment.

deactivate

Conclusion

This was hopefully a useful introduction to some of the most basic tools every data engineer needs. To be continued.
