Source: How to transfer BigQuery tables between locations with Cloud Composer from Google Cloud
BigQuery is a fast, highly scalable, cost-effective, and fully managed enterprise data warehouse for analytics at any scale. As BigQuery has grown in popularity, one question that often arises is how to copy tables across locations in an efficient and scalable manner. BigQuery has some limitations for data management, one being that the destination dataset must reside in the same location as the source dataset that contains the table being copied. For example, you cannot copy a table from a US-based dataset and write it to an EU-based dataset. Luckily, you can use Cloud Composer to implement a data pipeline that transfers a list of tables in an efficient and scalable way.
Cloud Composer is a fully managed workflow orchestration service that empowers you to author, schedule, and monitor pipelines that span across clouds and on-premises data centers. Built on the open source Apache Airflow and operated using the Python programming language, Cloud Composer is free from lock-in and easy to use. Plus, it’s deeply integrated within Google Cloud Platform, giving users the ability to orchestrate their full pipeline. Cloud Composer has robust, built-in integration with many products, including Google BigQuery, Cloud Dataflow, Cloud Dataproc, Cloud Datastore, Cloud Storage, Cloud Pub/Sub, and Cloud ML Engine.
In this post, you will learn how to create and run an Apache Airflow workflow in Cloud Composer that completes the following tasks:
How to access your Cloud Composer environment through the Google Cloud Platform Console, Cloud SDK, and Airflow web interface.
How to use the Stackdriver Logging client library for Python for logging
How to dynamically generate DAGs (directed acyclic graphs) based on a config file
Create a Cloud Composer environment: enable the Cloud Composer API if asked to do so, set the following parameters for your environment, then click Create and wait until the environment creation step completes.
Leave all other settings as default.
The environment creation process is completed when the green checkmark displays to the left of the environment name on the Environments page in the GCP Console.
It can take up to 20 minutes for the environment to complete the setup process. Move on to the next sections to create your Cloud Storage buckets and a new BigQuery dataset.
Create two multi-regional Cloud Storage buckets in your project, one located in the US as the source and the other in the EU as the destination. These buckets will be used to copy the exported tables across locations, i.e., from US to EU.
Note: You could instead choose regional buckets in matching locations to minimize cost and unnecessary replication; for simplicity, this post uses multi-regional buckets so it can keep referring to the two locations (US and EU).
Go to Navigation menu > Storage > Browser and then click Create bucket.
Give your two buckets globally unique names that include the location as a suffix (e.g., 6552634-us, 6552634-eu).
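Assuming hypothetical placeholder names like the ones above, the bucket-creation step can be sketched from Cloud Shell. The commands are assembled and printed here rather than executed, so you can review them before pasting them into Cloud Shell:

```shell
# Placeholder names; bucket names must be globally unique, so substitute your own.
SRC_BUCKET="6552634-us"
DST_BUCKET="6552634-eu"

# gsutil mb creates a bucket; -l sets its location (US and EU are multi-regions).
MAKE_SRC="gsutil mb -l US gs://${SRC_BUCKET}"
MAKE_DST="gsutil mb -l EU gs://${DST_BUCKET}"
echo "${MAKE_SRC}"
echo "${MAKE_DST}"
```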
Create the destination BigQuery Dataset in EU from the BigQuery new web UI:
Navigation menu > Big Data > BigQuery
Then select your project ID.
Finally, click Create dataset, use the name "nyc_tlc_EU", and set the Data location to EU.
Confirm the dataset has been created and is empty.
Cloud Composer workflows are composed of DAGs (directed acyclic graphs). The code in bq_copy_across_locations.py is the workflow code, also referred to as the DAG. You can find the sample code here.
To orchestrate all the workflow tasks, the DAG imports the following operators:
DummyOperator: Creates Start and End dummy tasks for better visual representation of the DAG.
BigQueryToCloudStorageOperator: Exports BigQuery tables to Cloud Storage buckets using the Avro format.
GoogleCloudStorageToGoogleCloudStorageOperator: Copies files across Cloud Storage buckets.
GoogleCloudStorageToBigQueryOperator: Imports tables into BigQuery from Avro files in the destination Cloud Storage bucket.
We first define the function read_table_list() to read the config file and build the list of tables to copy.
Go back to Composer to check on the status of your environment.
Once your environment has been created, click the name of the environment to see its details.
The Environment details page provides information such as the Airflow web UI URL, the Kubernetes Engine cluster ID, and the name of the Cloud Storage bucket that contains the DAGs folder.
Note: Cloud Composer uses Cloud Storage to store Apache Airflow DAGs, also known as workflows. Each environment has an associated Cloud Storage bucket. Cloud Composer schedules only the DAGs in the Cloud Storage bucket.
Set a variable in Cloud Shell to refer to the DAGs Cloud Storage bucket; we will be using this variable a few times during the post.
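For example, with a hypothetical bucket name copied from the environment details page:

```shell
# Hypothetical bucket name; copy the real one from your environment's details page.
DAGS_BUCKET="us-central1-composer-env-1234abcd-bucket"
# Cloud Composer watches the /dags prefix of this bucket for workflows.
DAGS_FOLDER="gs://${DAGS_BUCKET}/dags"
echo "${DAGS_FOLDER}"
```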
Set the Airflow variables using gcloud commands in Cloud Shell; alternatively, they can be set from the Airflow UI. To set the three variables, run the gcloud composer command once for each row of the table above. The gcloud composer command executes the Airflow CLI sub-command variables, passing the remaining arguments through to the Airflow command-line tool.
For example, the gcs_source_bucket variable would be set like this:
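A sketch of that command, using a hypothetical environment name, region, and bucket; the command string is assembled and printed so you can copy it into Cloud Shell:

```shell
# Hypothetical values; substitute your environment name, region, and source bucket.
ENVIRONMENT_NAME="composer-env"
LOCATION="us-central1"
GCS_SOURCE_BUCKET="6552634-us"

# Everything after " -- " is handed to the Airflow CLI's variables sub-command.
CMD="gcloud composer environments run ${ENVIRONMENT_NAME} --location ${LOCATION} variables -- --set gcs_source_bucket ${GCS_SOURCE_BUCKET}"
echo "${CMD}"
```

Repeat the same pattern for the gcs_dest_bucket and table_list_file_path variables.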
ENVIRONMENT_NAME is the name of the environment.
LOCATION is the Compute Engine region where the environment is located. The gcloud composer command requires including the --location flag or setting the default location before running it.
VALUE specifies the variable and its value to set. Include a space, two dashes, and a space ( -- ) between the left-side gcloud-related arguments and the right-side Airflow sub-command-related arguments, and include a space between the variable name and its value. Run the gcloud composer environments run command with the variables sub-command once for each of the three variables.
To upload the DAG and its dependencies:
1. Clone the GCP Python docs samples repository in your Cloud Shell.
2. Upload a copy of the third-party hook and operator to the plugins folder of your Composer DAGs Cloud Storage bucket, e.g.:
3. Next upload the DAG and config file to the DAGs Cloud Storage bucket of your environment, e.g.:
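Both upload steps use gsutil cp. The bucket name and file paths below are illustrative (the actual locations depend on your environment and on where the samples repository places the files); the commands are assembled and printed for copy/paste:

```shell
# Hypothetical bucket and file paths; adjust to your environment and repo layout.
DAGS_BUCKET="us-central1-composer-env-1234abcd-bucket"
PLUGIN_CMD="gsutil cp third_party_plugin.py gs://${DAGS_BUCKET}/plugins/"
DAG_CMD="gsutil cp bq_copy_across_locations.py table_list.csv gs://${DAGS_BUCKET}/dags/"
echo "${PLUGIN_CMD}"
echo "${DAG_CMD}"
```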
Cloud Composer registers the DAG in your Airflow environment automatically; DAG changes propagate within 3-5 minutes. You can see task status in the Airflow web interface and confirm that, per the settings, the DAG is not scheduled to run automatically.
To access the Airflow web interface using the GCP Console:
Go back to the Environments page.
In the Airflow webserver column for the environment, click the new window icon. The Airflow web UI opens in a new browser window.
Log in with your GCP credentials.
You’ll now be in the Airflow UI.
For information about the Airflow UI, see Accessing the web interface.
The variables you set earlier persist in your environment. You can view the variables by selecting Admin > Variables from the Airflow menu bar.
When you upload your DAG file to the dags folder in Cloud Storage, Cloud Composer parses the file. If no errors are found, the name of the workflow appears in the DAG listing, and the workflow is queued to run immediately if its schedule conditions are met; in our case, the schedule is set to None per the settings above, so it does not run automatically.
To trigger the DAG manually, click the play button; then click bq_copy_us_to_eu_01 to open the DAG details page. This page includes a graphical representation of workflow tasks and dependencies.
To run the workflow again from the Graph View:
In the Airflow UI Graph View, click the start graphic.
Click Clear to reset all the tasks and then click OK to confirm.
Finally, check the status and results of the bq_copy_us_to_eu_01 workflow by going to the following Console pages:
Cloud Storage Browser to see the intermediate Avro files in the source and destination buckets.
You have successfully copied two tables programmatically from the US region to the EU region!
Feel free to reuse this DAG to copy as many tables as you require across your locations.
Sign up for the Apache Airflow dev and commits mailing lists.
Sign up for an Apache JIRA account and re-open any issues that you care about in the Apache Airflow JIRA project