Cloud Dataproc provides a managed Apache Spark and Apache Hadoop service. Hadoop is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models, most commonly via the Map-Reduce pattern. Cloud Dataproc obviates the need for users to configure and manage Hadoop itself. As shown by the figure below, this entails a significant number of software components.

We will use the gcloud CLI to deploy the resources for the lab. An alternative way of doing so is via the web console, as shown in this codelab.

In this lab, we'll be computing the value of π using massively parallel dart throwing. Workers will be tasked with randomly throwing darts at a square containing a circle (map) and the results will be collected (reduce) to estimate π.

Consider a square with sides of length 1 and area 1, centered around the origin of an x-y graph. A circle drawn within it will have diameter 1 and radius ½. The area of the circle will then be π·(½)², or π/4.

When one randomly throws darts into the square, the ratio of the darts landing in the circle to the total number of darts thrown should approach the ratio of the two areas, π/4.

Solving for π, we then have the following formula:

π ≈ 4 × (darts landing in the circle / total darts thrown)

If a dart is thrown, how do we determine if it falls in the circle? From geometry, a dart at point (x, y) lies inside a circle of radius r centered at the origin when x² + y² < r².
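To make the estimate concrete, here is a minimal single-machine sketch in plain Python (our own illustration, not the lab's code). Like the Spark example that follows, it samples from the positive quadrant, where checking against radius 1 gives the same π/4 ratio; the sample count is arbitrary:

import random

NUM_SAMPLES = 1000000  # arbitrary number of darts for this illustration

inside_count = 0
for _ in range(NUM_SAMPLES):
    # throw a dart uniformly at random into the positive unit square
    x, y = random.random(), random.random()
    if x * x + y * y < 1:  # inside the quarter of the unit circle
        inside_count += 1

print("Pi is roughly %f" % (4.0 * inside_count / NUM_SAMPLES))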

Our code for computing π will spawn 1000 dart-throwers in the map and collect dart counts in the reduce. The computation will be modified slightly to focus on the positive quadrant of the prior graph.

A Python version of the code is shown below. The function inside() randomly picks two values for x and y uniformly between 0 and 1 and checks whether the point resides within the unit circle (orange area).

import random

def inside(p):
  # the argument p (the sample index) is ignored; each call throws one dart
  x, y = random.random(), random.random()
  return x*x + y*y < 1

To perform the computation, we use an Apache Spark context (sc) to parallelize NUM_SAMPLES dart throws, filter for those that fall inside the circle, and count them. The count is then plugged into the formula to output an estimate for π.

count = sc.parallelize(range(0, NUM_SAMPLES)).filter(inside).count()
print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))

Visit Cloud Shell and enable the Dataproc API:

gcloud services enable dataproc.googleapis.com

Set the default zone and region for Compute Engine and Dataproc.

gcloud config set compute/zone us-west1-b
gcloud config set compute/region us-west1
gcloud config set dataproc/region us-west1

Set an environment variable (CLUSTERNAME) to <OdinID>-dplab that will be used to name our cluster of Compute Engine VMs for processing our Dataproc jobs.

CLUSTERNAME=${USER}-dplab

We'll first create a cluster with the tag "codelab" in us-west1-b.

gcloud dataproc clusters create ${CLUSTERNAME} \
  --scopes=cloud-platform \
  --tags codelab \
  --region=us-west1 \
  --zone=us-west1-b

If you get quota errors, your default machine type may be set to a high-end machine. You may add the following flags to the command above to directly specify smaller machine types and boot disk sizes:

  --master-machine-type=n1-standard-2 \
  --worker-machine-type=n1-standard-2 \
  --master-boot-disk-size=10GB \
  --worker-boot-disk-size=10GB

View the cluster in the web console of Dataproc.

View the nodes of the cluster in the web console of Compute Engine:

Note the current time, then submit the job, specifying 1000 parallel tasks. We'll run the Java version of the program that comes included with the Apache Spark distribution. For this computation, the program's stdout and stderr are sent to output.txt via the >& shell redirection syntax. In addition, the command is placed in the background with the & operator at the end.

date

gcloud dataproc jobs submit spark --cluster ${CLUSTERNAME} \
  --class org.apache.spark.examples.SparkPi \
  --jars file:///usr/lib/spark/examples/jars/spark-examples.jar -- 1000 \
  >& output.txt &

After launching the job, you can periodically list its status and print the time:

gcloud dataproc jobs list --cluster ${CLUSTERNAME}

date

When the computation completes, note the time.

For your lab notebook:

List the cluster to find the numInstances used for the master and the workers.

gcloud dataproc clusters describe ${CLUSTERNAME}

Allocate two additional preemptible (secondary) worker machines to the cluster. Such machines are significantly cheaper, but can be reclaimed by Compute Engine if demand spikes.

gcloud dataproc clusters update ${CLUSTERNAME} --num-secondary-workers=2

Repeat the listing to see that they show up in the Config section.

gcloud dataproc clusters describe ${CLUSTERNAME}

Then, visit Compute Engine to see the new nodes in the cluster.

Note the current time, then submit the job again, redirecting the output to a different file output2.txt.

date

gcloud dataproc jobs submit spark --cluster ${CLUSTERNAME} \
  --class org.apache.spark.examples.SparkPi \
  --jars file:///usr/lib/spark/examples/jars/spark-examples.jar -- 1000 \
  >& output2.txt &

List its status periodically and print the time:

gcloud dataproc jobs list --cluster ${CLUSTERNAME}

date

When the computation completes, note the time.

For your lab notebook:

Leave the cluster up for the next lab.

In Cloud Shell, delete the original cluster:

gcloud dataproc clusters delete $CLUSTERNAME

Dataflow is a managed runner that supports Apache Beam workloads. Beam is an open-source project, spun out of Google, for large-scale processing of both stream and batch workloads. Beam uses a transform-based processing approach with a programming paradigm similar to functional programming, in which functions do not maintain state. Beam computations are expressed in graph-like form: input flows into the graph, is transformed via computations within nodes of the graph, and is then output via nodes that serve as sinks for the results. The abstraction is useful for applications such as log ingestion and analysis from a web site, or sensor data ingestion from IoT devices. One of the features of Dataflow is that it is 'serverless': processing capacity is dynamically brought up and down as the computation within the graph proceeds. This is in contrast to our prior Dataproc labs, in which clusters must be explicitly allocated and deallocated by the operator.

In this lab, we'll demonstrate the programming model in action using a simple program that determines the top packages included in a Java code base.

First, clone the repository containing the code and change into its directory:

git clone https://github.com/GoogleCloudPlatform/training-data-analyst.git

cd training-data-analyst/courses/machine_learning/deepdive/04_features/dataflow/python/

Then, create a Python virtual environment and install the Apache Beam package that is configured for execution on Google Cloud Platform. In addition, since we will eventually deploy our pipelines onto GCP, an OAuth client package must be installed so our program can use the credentials we supply to authorize its access to our project.

sudo apt-get install python3-venv
python3 -m venv env
source env/bin/activate
pip install -U pip
pip install 'apache-beam[gcp]' oauth2client==3.0.0

Dataflow transforms can be mapped onto their own compute nodes for execution. In Python, this is done with special syntax. Consider the code below for a pipeline that performs a string search (e.g. grep). The code instantiates p, a Beam pipeline, and configures variables specifying its input sources, output sinks, and a search term. It then specifies an expression for the computation that takes file names from input and reads lines from them (beam.io.ReadFromText(input)), checks to see if the lines start with the searchTerm (beam.FlatMap(lambda line: my_grep(line, searchTerm))), and then writes the matching lines out to a directory (beam.io.WriteToText(output_prefix)). The pipeline is then evaluated via p.run(), and the program waits for it to complete with wait_until_finish().

grep.py

import apache_beam as beam
import sys

def my_grep(line, term):
   if line.startswith(term):
      yield line

p = beam.Pipeline(argv=sys.argv)
input = '../*.java'
output_prefix = '/tmp/...'
searchTerm = 'import'

(p
   | 'GetJava' >> beam.io.ReadFromText(input)
   | 'Grep' >> beam.FlatMap(lambda line: my_grep(line, searchTerm) )
   | 'write' >> beam.io.WriteToText(output_prefix)
)

p.run().wait_until_finish()

Bring up is_popular.py in the code editor.

edit is_popular.py

Reverse-engineer the code and make note of where the input is taken from and where the output goes to by default.

Answer the following questions for your lab notebook.

The operations in the pipeline mimic a Map-Reduce pattern, demonstrating Beam's ability to support it.
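For reference, a hedged sketch of how Beam can express this kind of Map-Reduce computation (counting package prefixes and keeping the top 5) is shown below. The helper function, stage names, input glob, and output path are illustrative and do not necessarily match is_popular.py:

import sys
import apache_beam as beam

def package_use(line, keyword):
   # "map": for each import line, emit every prefix of the imported package
   if line.startswith(keyword):
      fields = line.split()
      if len(fields) > 1:
         parts = fields[1].rstrip(';').split('.')
         for i in range(len(parts)):
            yield '.'.join(parts[:i + 1])

p = beam.Pipeline(argv=sys.argv)
input = '../*.java'             # illustrative input glob
output_prefix = '/tmp/output'   # illustrative output prefix

(p
   | 'GetJava' >> beam.io.ReadFromText(input)
   | 'PackageUse' >> beam.FlatMap(lambda line: package_use(line, 'import'))
   | 'TotalUse' >> beam.combiners.Count.PerElement()           # "reduce": group and count
   | 'Top5' >> beam.combiners.Top.Of(5, key=lambda kv: kv[1])  # keep the 5 most-used packages
   | 'write' >> beam.io.WriteToText(output_prefix)
)

p.run().wait_until_finish()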

Answer the following question for your lab notebook.

Go back to Cloud Shell and run the pipeline.

python is_popular.py

Go to where the output file is written out and cat the file.

In this codelab, we'll be running a word counting example directly from the Apache Beam package. The code can be run locally as well as executed on nodes in GCP's Dataflow service, a managed runner that supports Apache Beam pipelines. Ensure you have activated the Python environment used in the previous lab.

When using Python virtual environments, packages are installed within the environment's directory. In the directory in which you created the Python virtual environment, bring up the code for the wordcount example that comes with the Python package.

edit env/lib/python3.7/site-packages/apache_beam/examples/wordcount.py

When the code is invoked as a module, it executes the run() function. Within the function, a pipeline p is incrementally constructed using the Beam syntax (lines, counts, output) before the entire pipeline is executed via p.run(). Examine the code that implements the function and answer the following questions for your lab notebook:

We'll first run the pipeline locally in Cloud Shell. In this case, we use the -m flag to have Python execute the module directly, specifying the output prefix as outputs.

python -m apache_beam.examples.wordcount \
  --output outputs

After running the script, perform the following and show a screenshot of the results in your lab notebook:

The pipeline as specified treats every word as-is. As a result, it is case-sensitive, leading to 'King' and 'KING' having distinct counts. Go back to the code editor and edit wordcount.py. Find a place in the pipeline where you can insert a stage that transforms all of the characters it receives into lowercase. The snippet below can be used:

      | 'lowercase' >> beam.Map(str.lower)
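For context, a minimal stand-alone wordcount-style pipeline with such a stage inserted might look like the sketch below. The stage names, input file, and word-splitting regular expression are illustrative and differ from the packaged wordcount.py; the point is simply that the case-normalizing stage sits between splitting and counting:

import re
import sys
import apache_beam as beam

p = beam.Pipeline(argv=sys.argv)

(p
   | 'read' >> beam.io.ReadFromText('kinglear.txt')                         # illustrative input file
   | 'split' >> beam.FlatMap(lambda line: re.findall(r"[A-Za-z']+", line))
   | 'lowercase' >> beam.Map(str.lower)                                     # the inserted stage
   | 'count' >> beam.combiners.Count.PerElement()
   | 'format' >> beam.MapTuple(lambda word, n: '%s: %d' % (word, n))
   | 'write' >> beam.io.WriteToText('outputs')
)

p.run().wait_until_finish()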

Perform the following and show a screenshot of the results in your lab notebook:

Cloud Dataflow can be used to execute the pipeline in parallel in an on-demand, "serverless" fashion. To use Dataflow, we must enable the APIs for the lab. Dataflow requires Compute Engine instances to execute the code and storage buckets to store the results, so those components must also be enabled.

gcloud services enable dataflow compute_component storage_component storage_api

Throughout the codelab, we'll be referencing our storage bucket and our desired region via environment variables. The commands below do so, setting the bucket name to the name of our project and the region to us-west1.

export BUCKET=${GOOGLE_CLOUD_PROJECT}
export REGION=us-west1

Then, create the storage bucket if it hasn't been created already.

gsutil mb gs://${BUCKET}

We will need to create a service account to run our workload with. Change into your home directory and use gcloud to create a service account named df-lab.

cd

gcloud iam service-accounts create df-lab

To run a Dataflow pipeline, we require permissions across several IAM roles. Specifically, roles/dataflow.admin allows us to create and manage Dataflow jobs. Compute Engine VMs (workers) are spun up on demand and used to run the Dataflow pipeline. Dataflow also requires a controller service account with the roles/dataflow.worker role, which has the necessary permissions for a Compute Engine service account to execute work units in a Dataflow pipeline; the permissions granted are shown in the documentation. In order to create Compute Engine VMs that use a specific service account, we also require the role roles/iam.serviceAccountUser. Finally, we grant roles/storage.admin so the pipeline can read input from and write results to our storage bucket.

gcloud projects add-iam-policy-binding ${GOOGLE_CLOUD_PROJECT} \
  --member serviceAccount:df-lab@${GOOGLE_CLOUD_PROJECT}.iam.gserviceaccount.com \
  --role roles/dataflow.admin

gcloud projects add-iam-policy-binding ${GOOGLE_CLOUD_PROJECT} \
  --member serviceAccount:df-lab@${GOOGLE_CLOUD_PROJECT}.iam.gserviceaccount.com \
  --role roles/dataflow.worker

gcloud projects add-iam-policy-binding ${GOOGLE_CLOUD_PROJECT} \
  --member serviceAccount:df-lab@${GOOGLE_CLOUD_PROJECT}.iam.gserviceaccount.com \
  --role roles/iam.serviceAccountUser

gcloud projects add-iam-policy-binding ${GOOGLE_CLOUD_PROJECT} \
  --member serviceAccount:df-lab@${GOOGLE_CLOUD_PROJECT}.iam.gserviceaccount.com \
  --role roles/storage.admin

Once the service account has the necessary policies attached, we will create a service account key (df-lab.json) that will allow our programs to authenticate as the df-lab account.

gcloud iam service-accounts keys create df-lab.json --iam-account df-lab@${GOOGLE_CLOUD_PROJECT}.iam.gserviceaccount.com

We must also set the GOOGLE_APPLICATION_CREDENTIALS environment variable so that our Python environment can locate and use the service account key.

export GOOGLE_APPLICATION_CREDENTIALS=$PWD/df-lab.json
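As an optional sanity check (our own addition, not part of the lab steps), you can confirm from Python that these credentials resolve correctly; this assumes the google-auth library, which apache-beam[gcp] pulls in as a dependency, is available:

# verify that Application Default Credentials resolve to the df-lab key
import google.auth

credentials, project = google.auth.default()
print("Project:", project)
print("Service account:", getattr(credentials, "service_account_email", "n/a"))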

We can now repeat our execution, but rather than use a local runner, we specify the DataflowRunner along with locations for the pipeline to take input from and store results to.

python -m apache_beam.examples.wordcount \
  --region ${REGION} \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output gs://$BUCKET/results/outputs \
  --runner DataflowRunner \
  --project ${GOOGLE_CLOUD_PROJECT} \
  --temp_location gs://${BUCKET}/tmp/
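For reference, these flags map onto Beam's PipelineOptions. A hedged sketch of launching a trivial pipeline on Dataflow directly from Python code (the project ID, bucket, and stage names below are placeholders, not values from this lab) might look like:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# placeholder values; substitute your own project, region, and bucket
options = PipelineOptions(
    runner='DataflowRunner',
    project='my-project-id',
    region='us-west1',
    temp_location='gs://my-bucket/tmp/')

# the with-block runs the pipeline and waits for completion on exit
with beam.Pipeline(options=options) as p:
   (p
      | 'read' >> beam.io.ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
      | 'lengths' >> beam.Map(len)                 # trivial per-line transform
      | 'write' >> beam.io.WriteToText('gs://my-bucket/results/lengths'))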

After executing the program, which takes around 5 minutes to complete, visit Dataflow in the web console and click on the Dataflow job that was executed. Examine both "Job Graph" and "Job Metrics".

Include the following in your lab notebook:

Delete the IAM policies and the service account.

gcloud projects remove-iam-policy-binding ${GOOGLE_CLOUD_PROJECT} \
  --member serviceAccount:df-lab@${GOOGLE_CLOUD_PROJECT}.iam.gserviceaccount.com \
  --role roles/dataflow.admin

gcloud projects remove-iam-policy-binding ${GOOGLE_CLOUD_PROJECT} \
  --member serviceAccount:df-lab@${GOOGLE_CLOUD_PROJECT}.iam.gserviceaccount.com \
  --role roles/dataflow.worker

gcloud projects remove-iam-policy-binding ${GOOGLE_CLOUD_PROJECT} \
  --member serviceAccount:df-lab@${GOOGLE_CLOUD_PROJECT}.iam.gserviceaccount.com \
  --role roles/iam.serviceAccountUser

gcloud projects remove-iam-policy-binding ${GOOGLE_CLOUD_PROJECT} \
  --member serviceAccount:df-lab@${GOOGLE_CLOUD_PROJECT}.iam.gserviceaccount.com \
  --role roles/storage.admin

gcloud iam service-accounts delete df-lab@${GOOGLE_CLOUD_PROJECT}.iam.gserviceaccount.com

Delete the storage bucket:

gsutil -m rm -r gs://${BUCKET}