dataflow-example/dag/composer-dataflow-dag-v1.py
"""Example Airflow DAG that creates a Cloud Dataflow workflow which takes a
text file and adds the rows to a BigQuery table.

This DAG relies on five Airflow variables:
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html

* project_id - Google Cloud Project ID to use for the Cloud Dataflow cluster.
* gce_region - Google Compute Engine region where the Cloud Dataflow cluster should
  be created.
* gce_zone - Google Compute Engine zone where the Cloud Dataflow cluster should
  be created.
  Learn more about the difference between regions and zones here:
  https://cloud.google.com/compute/docs/regions-zones
* bucket_path - Google Cloud Storage bucket where you've stored the User Defined
  Function (.js), the input file (.txt), and the JSON schema (.json).
* output_table - BigQuery table that the transformed rows are written to.
"""
import datetime
from airflow import models
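# Note: this is the Airflow 1 (contrib) import path for DataflowTemplateOperator.
# On Airflow 2, the operator lives in airflow.providers.google.cloud.operators.dataflow.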
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
from airflow.utils.dates import days_ago
bucket_path = models.Variable.get("bucket_path")
project_id = models.Variable.get("project_id")
gce_region = models.Variable.get("gce_region")
gce_zone = models.Variable.get("gce_zone")
output_table = models.Variable.get("output_table")
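
# The five variables above must already exist in your Airflow environment, or the
# Variable.get() calls will fail when the DAG is parsed. As a rough sketch (the keys
# match this file; the values are placeholders you must replace), they can be set
# with the Airflow 2 CLI, or from the Airflow UI under Admin > Variables:
#
#   airflow variables set project_id your-project-id
#   airflow variables set gce_region us-central1
#   airflow variables set gce_zone us-central1-a
#   airflow variables set bucket_path gs://your-bucket
#   airflow variables set output_table your-project:your_dataset.your_table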

default_args = {
    # Tell Airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "dataflow_default_options": {
        "project": project_id,
        # Set to your region
        "region": gce_region,
        # Set to your zone
        "zone": gce_zone,
        # This is a subfolder for storing temporary files, like the staged pipeline job.
        "tempLocation": bucket_path + "/tmp/",
    },
}
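
# Note: dataflow_default_options is not an Airflow-level setting; default_args passes
# it to the DataflowTemplateOperator below, which applies these values to the
# Dataflow job it launches.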

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    # The id you will see in the Airflow web UI's list of DAGs
    "composer_dataflow_dag",
    default_args=default_args,
    # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    start_template_job = DataflowTemplateOperator(
        # The task id of your job
        task_id="dataflow_operator_transform_csv_to_bq",
        # The name of the template that you're using.
        # The link below lists all of the Google-provided templates you can use.
        # In non-production environments, you can use the 'latest' subfolder to pick
        # up the newest template version.
        # https://cloud.google.com/dataflow/docs/guides/templates/provided-batch#gcstexttobigquery
        template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
        # Use the link above to specify the correct parameters for your template.
        parameters={
            "javascriptTextTransformFunctionName": "transformCSVtoJSON",
            "JSONPath": bucket_path + "/jsonSchema.json",
            "javascriptTextTransformGcsPath": bucket_path + "/transformCSVtoJSON.js",
            "inputFilePattern": bucket_path + "/inputFile.txt",
            "outputTable": output_table,
            "bigQueryLoadingTemporaryDirectory": bucket_path + "/tmp/",
        },
    )
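
# The template reads three supporting files from bucket_path. A rough sketch of what
# it expects (file names mirror the parameters above; field names are placeholders,
# see the template documentation linked above for the exact requirements):
#
#   inputFile.txt         - the delimited text file whose rows are loaded.
#   transformCSVtoJSON.js - JavaScript defining a function named transformCSVtoJSON
#                           (the javascriptTextTransformFunctionName value) that takes
#                           one input line and returns a JSON string for one row.
#   jsonSchema.json       - the BigQuery schema for output_table, for example:
#                           {"BigQuery Schema": [{"name": "some_field", "type": "STRING"}]}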