This article was written by Harold Snyers – Senior ML & Data Engineer Distributed Cloud @ Devoteam Belgium.
Today, businesses rely on many data sources, including custom APIs. However, extracting and transforming data from APIs for analysis can be complex and time-consuming. This is where Fivetran comes in, offering a powerful solution for seamless data integration. While Fivetran supports hundreds of pre-built connectors, it also empowers you to extract data from unsupported sources using Google Cloud Functions as connectors. This tutorial dives deep into setting up a production-ready environment with Terraform for managing infrastructure as code (IaC).
What is Fivetran?
Fivetran is a cloud-based, fully managed ETL (Extract, Transform, Load) service that automates the process of moving data from various sources, including databases, applications, and custom APIs, to your data warehouse. It simplifies data ingestion by handling schema management, data transformations, and scheduling, allowing you to focus on analysing your data and deriving insights.
Leveraging Google Cloud Functions as Fivetran Connectors
Fivetran’s capabilities extend to integrating custom APIs through Google Cloud Functions. These functions act as bridges, fetching data from your API, transforming it as needed for Fivetran processing, and delivering it for loading into BigQuery. This approach brings several benefits:
- Simplified Data Ingestion: Automate data extraction and transformation from your custom API, eliminating manual coding and maintenance. Leverage Fivetran’s built-in monitoring for streamlined oversight.
- Scalability and Flexibility: Google Cloud Functions offer serverless execution, scaling automatically to handle varying data volumes from your API. Fivetran also handles server and request time-outs automatically when a request limit is reached.
- Enhanced Data Quality: Leverage Fivetran’s transformation capabilities within the Cloud Function or in Fivetran itself to ensure data consistency and accuracy before loading into BigQuery, or handle this later in your pipeline.
- Infrastructure as Code (IaC) with Terraform: Terraform allows you to manage your entire infrastructure, including Google Cloud resources and Fivetran configurations. This enables version control, automated deployments, and easier infrastructure management across environments (development, staging, and production).
Setting Up the Solution
Now that we have set the context, let’s explore how you can set up the infrastructure for this data integration pipeline. In my experience, I prefer to start by setting everything up in the UI, making sure the resources can communicate with each other, and then develop the necessary infrastructure code to make it production-ready.
Develop your Google Cloud Function:
First, let’s dive deeper into setting up the Cloud Function. Google Cloud Functions supports multiple languages (Python, Go, and Node.js); choose the one that best suits your requirements. As described in Fivetran’s setup tutorial here, the first task is to develop the Cloud Function code. On Google Cloud, the function receives a request whose body contains a set of built-in parameters plus any custom parameters you define. Here is an example request sent to a Cloud Function; Fivetran fills in these parameters automatically, except for the state.
Python
{
    'state': {
        'current_sync_id': 'bca62c36-*-0693e1230bee',
        'table_state': {
            'CUSTOMER': {
                'current_sync_unix': None,
                'has_more': False,
                'last_status': {'message': None, 'type': 'OK'},
                'next_page_url': None,
                'next_sync_unix': 1727389295,
                'rows_read': 0
            },
        }
    },
    'secrets': {},
    'agent': 'Fivetran Google Cloud Functions Connector/<id>/<name>',
    'sync_id': '93d1e969-*-5e5a68d8f4b0'
}
As you can see above, we receive a state from Fivetran, which we use to keep track of where we are in the sync with our custom API. In the response to Fivetran, we need to respect some conventions on what to return; Fivetran takes this into account for the next requests it sends to sync the data. Here is a basis for the Fivetran response in our function.
Python
fivetran_response = {
    "state": {
        "table_state": {},
        "current_sync_id": req.get("sync_id", req["state"].get("current_sync_id")),
    },
    "insert": {},
    "schema": {},
    "hasMore": False,
}
With that in place, you can build the code that requests the data from your API. As with other Fivetran connectors, we set a schedule for the sync from the source to Fivetran: Fivetran manages when to sync from the API and sends the request to the Cloud Function, which processes the data and returns it to Fivetran in the correct format. You can read everything you need about formatting your response to Fivetran here.
What is important here is to keep track of whether you retrieved all the data from your source API during the sync or whether additional data still needs to be synced. In the latter case, setting the hasMore variable to True tells Fivetran to send another request, as the sync is not done. How you manage your API sync beyond that is up to you. In an example case I developed, some API calls returned more data than others, so it was essential to track, for each table (endpoint), what stage it was at and whether an additional request was needed.
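To make this concrete, here is a minimal sketch of such a function. It is not the exact implementation from this project: the endpoint URL, the payload shape ("results" and "next" fields), the "api_key" secret, and the single "customer" table are all hypothetical placeholders to adapt to your own API and pagination logic.
Python
import functions_framework
import requests

# Hypothetical custom API endpoint; replace with your own URL and auth scheme.
API_URL = "https://api.example.com/v1/customers"


@functions_framework.http
def fivetran_connector(request):
    req = request.get_json(silent=True) or {}
    state = req.get("state", {})
    table_state = state.get("table_state", {})
    secrets = req.get("secrets", {})

    fivetran_response = {
        "state": {
            "table_state": table_state,
            "current_sync_id": req.get("sync_id", state.get("current_sync_id")),
        },
        "insert": {},
        "schema": {},
        "hasMore": False,
    }

    # Resume from the page where the previous request stopped, if any.
    customer_state = table_state.get("CUSTOMER", {})
    page_url = customer_state.get("next_page_url") or API_URL

    resp = requests.get(
        page_url,
        headers={"Authorization": f"Bearer {secrets.get('api_key', '')}"},
        timeout=30,
    )
    resp.raise_for_status()
    payload = resp.json()

    # Rows to insert, keyed by destination table name, plus the primary key.
    fivetran_response["insert"]["customer"] = payload.get("results", [])
    fivetran_response["schema"]["customer"] = {"primary_key": ["id"]}

    # If the API exposes another page, store it in the state and ask
    # Fivetran to send a follow-up request by setting hasMore to True.
    next_page = payload.get("next")
    table_state["CUSTOMER"] = {"next_page_url": next_page, "has_more": bool(next_page)}
    fivetran_response["hasMore"] = bool(next_page)

    return fivetran_response
When deploying a sketch like this, the function’s requirements.txt would need functions-framework and requests.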
Fivetran
Let’s first test the connection with the Fivetran UI.
Create the Fivetran Connector:
Within the Fivetran web interface, navigate to the Connectors section and select “Custom Function.” Provide a name and description for your connector. Configure the connector to point to the deployed URL of your Google Cloud Function. And finally, specify the authentication method used by your Cloud Function to access the custom API (e.g., API key, OAuth).
Set Up the BigQuery Destination:
In the Fivetran interface, define a destination for the extracted data. Select BigQuery as the destination type and configure the connection details for your BigQuery project and dataset.
Infrastructure as Code (IaC) with Terraform:
Best practice when working with the cloud is to use Terraform or another Infrastructure as Code framework, so we will set up the infrastructure with Terraform. It lets us manage the creation and deployment of resources in Google Cloud Platform (GCP) and in Fivetran, including your Cloud Function, BigQuery dataset, and Fivetran connector configuration (using the Fivetran API). Terraform also allows you to version control your infrastructure code and automate deployments across environments.
For this post, I will not go through the whole infrastructure setup. It is recommended to set up CI/CD for your Cloud Function code so that changes are deployed automatically when you merge them. So, let’s dive deeper into the setup of your Fivetran connector and destination.
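One note before the resources themselves: the Fivetran Terraform provider has to be configured with API credentials generated from your Fivetran account. A minimal sketch is shown below; the variable names for the key and secret are placeholders.
Terraform
terraform {
  required_providers {
    fivetran = {
      source = "fivetran/fivetran"
    }
    google = {
      source = "hashicorp/google"
    }
  }
}

provider "fivetran" {
  # Fivetran API credentials; the variable names are placeholders.
  api_key    = var.fivetran_api_key
  api_secret = var.fivetran_api_secret
}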
First, you need to define your Fivetran group to let Fivetran know where to deploy your connectors, destinations, sources, etc. You can do this with the resource fivetran_group.
Terraform
resource "fivetran_group" "data_warehouse_api_ingestion" {
name = "data_warehouse_api_ingestion_${var.env}"
}
To define our destination, we’ll configure BigQuery as the target data warehouse. While Fivetran’s UI offers a convenient way to explore the required parameters for each destination and connector, I recommend treating it as a supplement to the official documentation, as the Terraform setup for a given destination or source is not always clear. I’ve contacted Fivetran to suggest including a small Terraform code sample in their documentation for even greater clarity. In the meantime, a combination of both resources will ensure a smooth setup process.
Terraform
resource "fivetran_destination" "destination_data_warehouse_api_ingestion" {
group_id = fivetran_group.data_warehouse_api_ingestion.id
region = "GCP_EUROPE_WEST3"
service = "big_query"
time_zone_offset = "+2"
run_setup_tests = true
trust_certificates = false
trust_fingerprints = false
config {
project_id = var.project_id
secret_key = base64decode(google_service_account_key.fivetran_sa_key.private_key)
data_set_location = "EU"
}
}
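The secret_key above references a service account key that Fivetran uses to write into BigQuery. Its creation is not shown in this post, but a minimal sketch could look like the following; the account name and the exact roles to grant are assumptions to adapt to your project.
Terraform
resource "google_service_account" "fivetran_sa" {
  account_id   = "fivetran-ingestion"
  display_name = "Fivetran BigQuery ingestion"
}

resource "google_service_account_key" "fivetran_sa_key" {
  service_account_id = google_service_account.fivetran_sa.name
}

# Grant the service account write access to BigQuery (role choice is an assumption).
resource "google_project_iam_member" "fivetran_bq_editor" {
  project = var.project_id
  role    = "roles/bigquery.dataEditor"
  member  = "serviceAccount:${google_service_account.fivetran_sa.email}"
}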
For BigQuery, I defined a module that, based on the source, creates the destination datasets. This module creates a raw dataset, a landing_zone dataset, and a current dataset. The raw dataset takes the name of your connector, as this is how Fivetran works. The landing zone contains an almost exact replica of the raw layer, with some preprocessing applied to respect our naming convention; this layer holds the whole history of our data. Depending on your data platform, you will have different projects or datasets in which to do your transformations.
Terraform
module "bigquery_datalake_datasets" {
source = "./modules/bq-datalake-destination"
for_each = var.connectors
source_name = each.value.name
with_current = each.value.destination_config.with_current
}
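The internals of this module are specific to our platform and not shown here, but a simplified sketch of what ./modules/bq-datalake-destination could contain might look like the following; the dataset naming and location are assumptions.
Terraform
# modules/bq-datalake-destination/main.tf (simplified sketch)
variable "source_name" {
  type = string
}

variable "with_current" {
  type    = bool
  default = true
}

resource "google_bigquery_dataset" "raw" {
  # The raw dataset carries the connector name, matching the Fivetran destination schema.
  dataset_id = var.source_name
  location   = "EU"
}

resource "google_bigquery_dataset" "landing_zone" {
  dataset_id = "${var.source_name}_landing_zone"
  location   = "EU"
}

resource "google_bigquery_dataset" "current" {
  count      = var.with_current ? 1 : 0
  dataset_id = "${var.source_name}_current"
  location   = "EU"
}

output "raw_dataset_name" {
  value = google_bigquery_dataset.raw.dataset_id
}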
Finally, we can proceed with the definition of our connectors. We define the Fivetran group to which the resource belongs and which specific service we want as a connector; in our case, this will be “google_cloud_functions”. Our destination schema is our BigQuery dataset name. Then comes the configuration (“config”) for our given service. We wanted to make the infrastructure as reusable as possible, so each configuration option is defined as optional.
Terraform
resource "fivetran_connector" "connector" {
for_each = var.connectors
group_id = fivetran_group.data_warehouse_api_ingestion.id
service = each.value.service
run_setup_tests = true
trust_certificates = false
trust_fingerprints = false
destination_schema {
name = module.bigquery_datalake_datasets[each.key].raw_dataset_name
}
dynamic "config" {
for_each = each.value.config == null ? [] : [0]
content {
abs_connection_method = each.value.config.abs_connection_method
abs_connection_string = each.value.config.abs_connection_string
...
api_token = each.value.config.api_token == null ? each.value.config.api_token : local.fivetran_passwords[each.value.config.api_token]
api_type = each.value.config.api_type
api_url = each.value.config.api_url
api_usage = each.value.config.api_usage
}
}
depends_on = [
fivetran_destination.destination_data_warehouse_api_ingestion
]
}
For each connector, we also need to define the scheduling. Optionally, you can specify the schema configuration in code if you want it written down; otherwise, you can use the UI to select which tables and columns you want to synchronise.
Terraform
resource "fivetran_connector_schedule" "connector_schedule" {
for_each = { for name, con in var.connectors : name => con.schedule if con.schedule != null }
connector_id = fivetran_connector.connector[each.key].id
sync_frequency = each.value.sync_frequency
daily_sync_time = each.value.daily_sync_time
paused = each.value.paused
pause_after_trial = each.value.pause_after_trial
schedule_type = each.value.schedule_type
}
resource "fivetran_connector_schema_config" "connector_schema_config" {
for_each = { for name, con in var.connectors : name => con.schema_config if con.schema_config != null }
connector_id = fivetran_connector.connector[each.key].id
schema_change_handling = each.value.schema_change_handling
dynamic "schema" {
for_each = each.value.schemas
content {
name = schema.key
enabled = schema.value.enabled
dynamic "table" {
for_each = schema.value.tables
content {
name = table.key
enabled = table.value.enabled
sync_mode = var.destination.service == "aws_msk_wh" ? null : table.value.sync_mode
dynamic "column" {
for_each = table.value.columns
content {
name = column.key
hashed = column.value.hashed
enabled = column.value.enabled
}
}
}
}
}
}
}
Finally, we create the connector variable that we will use to define our different ingestions to BigQuery.
Terraform
variable "connectors" {
type = map(object({
name = string
service = string
config = optional(object({
abs_connection_method = optional(string)
abs_connection_string = optional(string)
...
api_key = optional(string)
api_secret = optional(string)
...
}))
schedule = optional(object({
sync_frequency = string
daily_sync_time = string
paused = bool
pause_after_trial = bool
schedule_type = string
}))
schema_config = optional(object({
schema_change_handling = optional(string, "ALLOW_COLUMNS")
schemas = optional(map(object({
enabled = optional(bool, false)
tables = optional(map(object({
enabled = optional(bool, false)
sync_mode = optional(string, "SOFT_DELETE")
columns = optional(map(object({
enabled = optional(bool, false)
hashed = optional(bool, false)
})), {})
})), {})
})), {})
}))
destination_config = optional(
object({
with_current = optional(bool, true)
}),
{
with_current = true
}
)
}))
}
After configuring your Terraform variables, you are pretty much done. The only thing remaining is to set up the tfvars file with your input values for each connector, source, and destination. In the current setup, with BigQuery as the destination, we define our connector variable with the correct parameters.
Note: As a security practice, we do not want passwords or API keys visible in the configuration. Therefore, we store each key and password in Secret Manager and retrieve it by its secret name when the connector needs it. An example is shown in the Okta connection with the API token.
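One way to implement this, assuming the secrets already exist in Secret Manager under the names used in the tfvars, is to resolve them into the local.fivetran_passwords map referenced in the connector configuration. A minimal sketch:
Terraform
# Resolve connector secrets from Secret Manager into the map used by the connector.
# The secret name "okta_api_key" is only an example.
data "google_secret_manager_secret_version" "okta_api_key" {
  secret = "okta_api_key"
}

locals {
  fivetran_passwords = {
    okta_api_key = data.google_secret_manager_secret_version.okta_api_key.secret_data
  }
}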
Here is an example of the terraform.tfvars for the connector variable:
Terraform
connectors = {
  "fivetran_log" = {
    "name"    = "fivetran_log"
    "service" = "fivetran_log"
    "config"  = {}
    schedule = {
      sync_frequency    = 360
      daily_sync_time   = "03:00"
      paused            = false
      pause_after_trial = false
      schedule_type     = "auto"
    }
    destination_config = {
      with_current = false
    }
  },
  "source1" = {
    "name"    = "source1"
    "service" = "google_cloud_function"
    config    = {}
    schedule = {
      sync_frequency    = "360"
      daily_sync_time   = "06:00"
      paused            = false
      pause_after_trial = false
      schedule_type     = "auto"
    }
  },
  "okta" = {
    name    = "okta"
    service = "okta"
    config = {
      domain     = "domain"
      sub_domain = "subdomain"
      api_token  = "okta_api_key"
    }
    schedule = {
      sync_frequency    = "720"
      daily_sync_time   = "06:00"
      paused            = false
      pause_after_trial = false
      schedule_type     = "auto"
    }
    destination_config = {
      with_current = false
    }
  },
}
Conclusion
By combining Fivetran’s data integration capabilities with Google Cloud Functions as connectors, you can establish a robust and automated pipeline for ingesting data from your custom API into BigQuery. The result is a powerful and easy-to-integrate EL (Extract Load) service that plugs into your data platform, where you keep all your transformation layers and data marts.
Further Exploration:
This blog post provides a foundational overview. To delve deeper, explore the following resources:
Google Cloud Function Documentation: https://cloud.google.com/functions
Fivetran Documentation on Custom Connectors: https://fivetran.com/docs/connectors/functions
The best time to talk about your Cloud journey is now!
Contact our certified experts to guide your Google Cloud projects.