We saw in the previous sections that we can create a Dataproc cluster and submit Spark jobs to it from the Google Cloud console. But at the enterprise level, we need our workflows to run automatically at scheduled times. For example, if we build a batch job with Spark, we may need to run it daily or weekly. To run jobs on a schedule, we need a scheduling service or platform to orchestrate our workflows. On GCP, we can schedule workflows using Cloud Composer or Airflow: we can build our own workflow management platform by installing Airflow on a Compute Engine instance, or we can use Cloud Composer, a fully managed orchestration service available on Google Cloud.
Cloud Composer is a fully managed workflow orchestration service built on Apache Airflow. Understanding Airflow is really important for working with Composer, so let's go over Apache Airflow briefly.
The basic concepts of Airflow are DAGs, tasks, and relationships.
DAG: A DAG (Directed Acyclic Graph) is the core concept of Airflow. It collects tasks together, organized with dependencies and relationships that say how they should run. DAGs are written in Python files, and a single Python file can contain multiple DAGs.
Tasks: A task is the basic unit of execution in Airflow. Tasks are arranged into DAGs and then have upstream and downstream dependencies set between them in order to express the order in which they should run.
There are three basic kinds of tasks:
Operators: predefined task templates that you can string together quickly to build most parts of your DAGs.
Sensors: a special subclass of operators that are entirely about waiting for an external event to happen.
TaskFlow-decorated @task functions: custom Python functions packaged up as tasks.
Relationships: The key part of using tasks is defining how they relate to each other - their dependencies, or, as we say in Airflow, their upstream and downstream tasks. You declare your tasks first, and then you declare the dependencies between them, as the sketch below shows.
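To make these concepts concrete, here is a minimal sketch of a DAG with one Operator-based task, one TaskFlow task, and a dependency between them. The DAG id, schedule, and task logic are illustrative only, not part of the example we build later.

```python
# A minimal illustrative DAG: one Operator, one TaskFlow task, one dependency.
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="example_relationships",      # illustrative DAG id
    schedule_interval="@daily",          # run once per day
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as dag:

    # An Operator-based task: a predefined template that we only configure.
    extract = BashOperator(
        task_id="extract",
        bash_command="echo 'extracting data'",
    )

    # A TaskFlow-decorated task: a plain Python function packaged as a task.
    @task
    def transform():
        print("transforming data")

    # Relationship: extract is upstream, transform is downstream.
    extract >> transform()
```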
We will see how to implement this in a Composer example. Let's say we want to create a workflow in Composer that creates a Dataproc cluster, submits a PySpark job to the cluster, and destroys the cluster once the PySpark job completes. The PySpark job should read the orders-payments data, aggregate the payment value by payment type, and write the output to a BigQuery table.
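As a rough sketch of what such a PySpark job could look like, here is one possible version, assuming the orders-payments data is a headered CSV in a GCS bucket with payment_type and payment_value columns. The bucket, file, dataset, and table names are placeholders, not the actual paths used in this example, and the spark-bigquery connector must be available on the cluster.

```python
# Sketch of a PySpark job: aggregate payment value by payment type, write to BigQuery.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("payments-aggregation").getOrCreate()

# Read the orders-payments data from GCS (path is a placeholder).
payments = spark.read.option("header", True).csv(
    "gs://your-bucket/order_payments.csv"
)

# Aggregate the payment value by payment type.
aggregated = (
    payments
    .groupBy("payment_type")
    .agg(F.sum("payment_value").alias("total_payment_value"))
)

# Write the result to a BigQuery table via the spark-bigquery connector
# (indirect writes need a temporary GCS bucket; names are placeholders).
(
    aggregated.write
    .format("bigquery")
    .option("table", "your_dataset.payments_by_type")
    .option("temporaryGcsBucket", "your-temp-bucket")
    .mode("overwrite")
    .save()
)
```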
Let's create a Composer environment. Go to the Cloud console and create a Composer environment.
Click on Create to create the Composer environment.
We can access the Airflow web UI by clicking the "OPEN AIRFLOW UI" link, and we can access the DAGs folder by clicking "OPEN DAGS FOLDER".
Once we upload a DAG to the bucket, Airflow will automatically pick it up and schedule it as per the schedule defined in the DAG. So let's write the DAG.
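Before looking at the finished DAG, here is a minimal sketch of how such a workflow can be expressed with the Google provider's Dataproc operators. The project, region, cluster name, and job file path below are placeholders rather than the values used in the linked DAG.

```python
# Sketch: create a Dataproc cluster, run a PySpark job, delete the cluster.
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocDeleteClusterOperator,
    DataprocSubmitJobOperator,
)
from airflow.utils.trigger_rule import TriggerRule

PROJECT_ID = "your-project-id"           # placeholder
REGION = "us-central1"                   # placeholder
CLUSTER_NAME = "composer-dataproc-demo"  # placeholder

CLUSTER_CONFIG = {
    "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
    "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
}

PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {"main_python_file_uri": "gs://your-bucket/payments_aggregation.py"},
}

with DAG(
    dag_id="dataproc_pyspark_workflow",
    schedule_interval="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as dag:

    # Task 1: create the Dataproc cluster.
    create_cluster = DataprocCreateClusterOperator(
        task_id="create_cluster",
        project_id=PROJECT_ID,
        region=REGION,
        cluster_name=CLUSTER_NAME,
        cluster_config=CLUSTER_CONFIG,
    )

    # Task 2: submit the PySpark job to the running cluster.
    submit_job = DataprocSubmitJobOperator(
        task_id="submit_pyspark_job",
        project_id=PROJECT_ID,
        region=REGION,
        job=PYSPARK_JOB,
    )

    # Task 3: delete the cluster once the job has finished.
    delete_cluster = DataprocDeleteClusterOperator(
        task_id="delete_cluster",
        project_id=PROJECT_ID,
        region=REGION,
        cluster_name=CLUSTER_NAME,
        trigger_rule=TriggerRule.ALL_DONE,  # delete even if the job fails
    )

    create_cluster >> submit_job >> delete_cluster
```

Setting trigger_rule to ALL_DONE on the delete task means the cluster is torn down even when the PySpark job fails, so we don't keep paying for an idle cluster.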
You can download the DAG from here - https://github.com/dataengineertech/gcpdataproc/blob/main/examples/composer-dags/pyspark-dag-new.py. Now that we have completed our DAG, let's upload it to the Composer DAGs bucket so that Airflow can pick it up.
Once the DAG gets picked up by Airflow, go to the DAG and click Trigger DAG.
Once the first task is successful, the Dataproc cluster has been created.
Once the second task is successful, the PySpark job has been submitted to the running cluster and has completed.
Once the third task is successful, the cluster has been destroyed.