Building a Data Engineering Pipeline with Python, Prefect, dbt, Terraform, Google Cloud Storage…
24 December, 2023
TLDR: See the final product here: Dashboard Link
My project aims to extract Formula 1 racing data from the Ergast API, transform it using dbt, store it in Google Cloud Storage and Google BigQuery, and visualize the final results with Looker Data Studio. Let's take a closer look at each stage of the pipeline and the tools involved.
Terraform
To manage the necessary cloud resources, we employ Terraform, an infrastructure as code tool. Terraform allows us to define and provision the infrastructure required for our data engineering pipeline in a declarative manner. Using the GCP Terraform provider, we can provision Google Cloud Storage buckets, Google BigQuery datasets, and other necessary resources with ease. Terraform also ensures that our infrastructure is version-controlled and can be easily replicated across different environments.
Here's a Terraform config file:
terraform {
  required_version = ">= 1.0"
  backend "local" {} # Change "local" to "gcs" (for Google) or "s3" (for AWS) if you would like to preserve your tf-state online
  required_providers {
    google = {
      source = "hashicorp/google"
    }
  }
}

provider "google" {
  project = var.project
  region  = var.region
  // credentials = file(var.credentials) # Use this if you do not want to set the env var GOOGLE_APPLICATION_CREDENTIALS
}

# Data Lake Bucket
# Ref: https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/storage_bucket
resource "google_storage_bucket" "data-lake-bucket" {
  name     = "f1-data-bucket" # Bucket names must be globally unique across GCP
  location = var.region

  # Optional, but recommended settings:
  storage_class               = var.storage_class
  uniform_bucket_level_access = true

  versioning {
    enabled = true
  }

  lifecycle_rule {
    action {
      type = "Delete"
    }
    condition {
      age = 30 // days
    }
  }

  force_destroy = true
}

# DWH
# Ref: https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/bigquery_dataset
resource "google_bigquery_dataset" "dataset" {
  dataset_id                 = var.BQ_DATASET
  project                    = var.project
  location                   = var.region
  delete_contents_on_destroy = true
}
This Terraform configuration file sets up infrastructure on Google Cloud Platform (GCP). It creates a Cloud Storage bucket named "f1-data-bucket" with versioning enabled and a lifecycle rule that deletes objects after 30 days, plus a BigQuery dataset whose name, project, and location come from variables, with automatic deletion of all contents when the dataset is destroyed. After configuring your Terraform project like this, you can run terraform init to initialize it and terraform apply to create the specified resources in your GCP project. Terraform will generate a state file to keep track of the created resources.
Fetching and Preparing the Data
To kickstart our pipeline, we use Python to fetch data from the Ergast F1 API.
Our pipeline will have the following functions (downloadFile() is shown below; the remaining helpers are sketched after it):
- The createDownloadsFolder() task creates a folder called "downloads" if it doesn't already exist. This folder will be used to store the downloaded F1 data.
- The downloadFile() task fetches the F1 data from the Ergast API. It uses the requests library to download the data in chunks and saves it to a specified file path. It also measures the execution time of the download process.
- The unzipFileToCSV() task extracts the downloaded ZIP file containing the F1 data and saves the extracted CSV files to the "downloads" folder.
- The deleteZipFile() task removes the downloaded ZIP file once the CSV files have been extracted.
import os
import time

import requests


def downloadFile(uri: str, file_path: str):
    # get filename from url
    uri_res_name = uri.split("/")[-1]
    # create path where file will be saved
    filepath = os.path.join(file_path, uri_res_name)
    timeNow = time.time()
    r = requests.get(uri, stream=True)
    if r.ok:
        print("saving to:", os.path.abspath(filepath))
        with open(filepath, "wb") as f:
            # stream the response to disk in 8 KB chunks
            for chunk in r.iter_content(chunk_size=1024 * 8):
                if chunk:
                    f.write(chunk)
                    f.flush()
                    os.fsync(f.fileno())
    else:
        print("Download failed, status code {}\n{}".format(r.status_code, r.text))
        return "Error downloading file"
    unzipFileToCSV(filepath)
    deleteZipFile(filepath)
    return f"Total Download Execution Time: {time.time() - timeNow} seconds"
Uploading to Google Cloud Storage
The uploadToStorage() task uploads the cleaned CSV files to Google Cloud Storage. It utilizes the google-cloud-storage library and its Client() class to create a storage client and connect to the desired bucket. It iterates over the CSV files in the "downloads" folder, creates a blob for each file, and uploads it to the specified bucket.
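The task itself is not reproduced in this post; based on the description above, a minimal sketch might look like this (the folder layout and blob naming are assumptions):
import os

from google.cloud.storage import Client


def uploadToStorage(bucket_name: str, folder: str = "downloads"):
    # Connect to the target Cloud Storage bucket
    storage_client = Client()
    bucket = storage_client.bucket(bucket_name)
    # Upload every CSV in the downloads folder as a blob with the same name
    for file_name in os.listdir(folder):
        if file_name.endswith(".csv"):
            blob = bucket.blob(file_name)
            blob.upload_from_filename(os.path.join(folder, file_name))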
Loading Data into Google BigQuery
The makeBigQueryTables() task loads the data from the uploaded CSV files into Google BigQuery. It uses the google-cloud-bigquery library to create a BigQuery client and connect to the desired dataset. It lists the blobs in the storage bucket, retrieves each CSV file's name, and creates a table in BigQuery with a corresponding name. It configures the load job to read the CSV files as the data source, skipping the header row and autodetecting the schema.
import time

# The BigQuery client is imported as QueryClient to match the names used below
from google.cloud.bigquery import Client as QueryClient, LoadJobConfig, SourceFormat
from google.cloud.storage import Client


def makeBigQueryTables(dataset_id: str, bucket_name: str):
    storage_client = Client()
    bigquery_client = QueryClient()
    timeNow = time.time()
    bucket = storage_client.bucket(bucket_name)
    blobs = bucket.list_blobs()
    for blob in blobs:
        file_name = blob.name
        # Derive the BigQuery table name from the CSV file name
        table_id = file_name.replace(".csv", "").replace("/", "_")
        table_ref = bigquery_client.dataset(dataset_id).table(table_id)
        job_config = LoadJobConfig()
        job_config.source_format = SourceFormat.CSV
        job_config.skip_leading_rows = 1
        job_config.autodetect = True
        # Load the CSV from Cloud Storage directly into the BigQuery table
        load_job = bigquery_client.load_table_from_uri(
            f"gs://{bucket_name}/{file_name}", table_ref, job_config=job_config
        )
        load_job.result()
    return f"Total Table Created Time: {time.time() - timeNow} seconds"
Transforming Data with dbt
The trigger_dbt_flow() task triggers a dbt (Data Build Tool) operation. It uses the prefect_dbt library to interact with dbt. Within the task, the DbtCliProfile and DbtCoreOperation classes are utilized to define and execute dbt commands, such as debug, dependency installation, seeding, and building.
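A rough sketch of trigger_dbt_flow() using prefect_dbt, written here as a Prefect flow (the profile block name and the project and profiles directories are assumptions):
from prefect import flow
from prefect_dbt.cli import DbtCliProfile
from prefect_dbt.cli.commands import DbtCoreOperation


@flow
def trigger_dbt_flow():
    # Load a previously saved dbt CLI profile block (block name is hypothetical)
    dbt_cli_profile = DbtCliProfile.load("f1-dbt-profile")
    # Run the dbt commands: debug, install dependencies, seed, and build
    result = DbtCoreOperation(
        commands=["dbt debug", "dbt deps", "dbt seed", "dbt build"],
        project_dir="dbt",       # assumed dbt project directory
        profiles_dir="~/.dbt",   # assumed profiles directory
        dbt_cli_profile=dbt_cli_profile,
        overwrite_profiles=True,
    ).run()
    return result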
Visualization and Reporting with Looker Data Studio
To visualize and report on our transformed data, we integrate Looker Data Studio into our pipeline. With Looker, we can explore and analyze our data, create interactive visualizations, and share insights with stakeholders. Its integration with Google BigQuery ensures real-time access to the latest data, enabling timely decision-making.
Running the Pipeline
1. Set up your Google Cloud environment
2. Install all required dependencies into your environment
pip install -r requirements.txt
3. Install Terraform
4. Run these commands
cd terraform/
terraform init
terraform plan -var="project=<your-gcp-project-id>"
terraform apply -var="project=<your-gcp-project-id>"
5. Run the pipeline
python prefect/web_to_gcs.py
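For context, the flow in prefect/web_to_gcs.py could wire the tasks above together roughly as follows; the download URL, bucket name, and dataset name are assumptions, and the task functions from the earlier snippets are assumed to be in scope:
from prefect import flow

# Assumed values; replace with your own configuration
ERGAST_URL = "http://ergast.com/downloads/f1db_csv.zip"
BUCKET_NAME = "f1-data-bucket"
DATASET_ID = "f1_dataset"


@flow
def web_to_gcs():
    # Ingest: fetch and unpack the raw CSV files
    createDownloadsFolder()
    downloadFile(ERGAST_URL, "downloads")
    # Load: push CSVs to the data lake, then into the warehouse
    uploadToStorage(BUCKET_NAME)
    makeBigQueryTables(DATASET_ID, BUCKET_NAME)
    # Transform: run the dbt models on the warehouse tables
    trigger_dbt_flow()


if __name__ == "__main__":
    web_to_gcs()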
In this blog post, I explored the end-to-end process of building a data engineering pipeline. We leveraged Python for data ingestion, dbt for data transformation, Terraform for infrastructure management, Google Cloud Storage and Google BigQuery for data storage and warehousing, and Looker Data Studio for visualization and reporting. By integrating these powerful tools, we were able to extract, transform, store, and analyze Formula 1 racing data efficiently and effectively.
Embarking on this project allowed me to gain hands-on experience with cutting-edge data engineering technologies and showcase the power of cloud-based data solutions. I hope that this blog post inspires you to explore these tools and techniques further and empowers you to build your own data engineering pipelines.