Thursday, July 16, 2020

How to run a PySpark job in Kubernetes (AWS EKS)

A complete tutorial on deploying an EKS cluster with Terraform and running a PySpark job using the Spark Operator

Bogdan Cojocar

In this tutorial, we will focus on deploying a Spark application on AWS EKS end to end. We will do the following steps:

  • deploy an EKS cluster inside a custom VPC in AWS
  • install the Spark Operator
  • run a simple PySpark application

TL;DR: GitHub code repo

Step 1: Deploying the Kubernetes infrastructure

To deploy Kubernetes on AWS we need, at a minimum:

  • a VPC, subnets and security groups to handle the networking in the cluster
  • the EKS control plane, which runs Kubernetes services such as etcd and the Kubernetes API server
  • EKS worker nodes to run pods and, more specifically for our case, Spark jobs

Let’s dive into the Terraform code. First, let’s see the VPC:

module "vpc" {
  source  = "terraform-aws-modules/vpc/aws"
  version = "2.6.0"

  name = join("-", [var.cluster_name, var.environment, "vpc"])
  cidr = var.vpc_cidr
  azs  = data.aws_availability_zones.available.names

  private_subnets = [var.private_subnet_az_1, var.private_subnet_az_2, var.private_subnet_az_3]
  public_subnets  = [var.public_subnet_az_1, var.public_subnet_az_2, var.public_subnet_az_3]

  enable_nat_gateway     = true
  single_nat_gateway     = false
  one_nat_gateway_per_az = true
  enable_dns_hostnames   = true
  enable_dns_support     = true

  tags = {
    "kubernetes.io/cluster/${var.cluster_name}" = "shared"
  }

  public_subnet_tags = {
    "kubernetes.io/cluster/${var.cluster_name}" = "shared"
    "kubernetes.io/role/elb"                    = "1"
  }

  private_subnet_tags = {
    "kubernetes.io/cluster/${var.cluster_name}" = "shared"
    "kubernetes.io/role/internal-elb"           = "1"
  }
}

A VPC is an isolated network that hosts our infrastructure components. We can break this network down into smaller blocks, which AWS calls subnets. Subnets that can be reached directly from the internet are called public subnets, and those that cannot are called private subnets. Two more terms we will use for network traffic are egress and ingress: egress is traffic from inside the network to the outside world, and ingress is traffic from the outside world into the network. As you can expect, these rules differ depending on the use case. We also use security groups, which are traffic rules inside the VPC that define how the EC2 instances “talk” to each other, that is, on which network ports.

For the Spark EKS cluster we will use private subnets for the workers, so all the data processing is isolated from inbound internet traffic. We still need egress traffic to the internet to apply updates or install open source libraries. To enable it we add NAT gateways to our VPC; they have to be placed in public subnets. In the Terraform code, this is done using the flag enable_nat_gateway.

Another thing we can notice is that we are using three public and three private subnets. This gives us network fault tolerance, because the subnets are deployed in different availability zones in a region.
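
To make the subnet layout concrete, here is a small Python sketch that carves a VPC CIDR block into one private and one public subnet per availability zone. The 10.0.0.0/16 range, the /20 subnet size, the zone names, and the plan_subnets helper are all assumptions chosen for illustration; the real values come from the Terraform variables in the repo.

```python
import ipaddress


def plan_subnets(vpc_cidr, azs, new_prefix=20):
    """Split vpc_cidr into one private and one public subnet per AZ.

    Hypothetical helper: the first len(azs) blocks become private
    subnets, the next len(azs) blocks become public subnets.
    """
    vpc = ipaddress.ip_network(vpc_cidr)
    blocks = list(vpc.subnets(new_prefix=new_prefix))
    if len(blocks) < 2 * len(azs):
        raise ValueError("VPC CIDR is too small for the requested layout")
    private = blocks[:len(azs)]
    public = blocks[len(azs):2 * len(azs)]
    return {
        az: {"private": str(priv), "public": str(pub)}
        for az, priv, pub in zip(azs, private, public)
    }


# Assumed example values, not taken from the tutorial's repo.
layout = plan_subnets("10.0.0.0/16", ["eu-west-1a", "eu-west-1b", "eu-west-1c"])
for az, nets in layout.items():
    print(az, nets)
```

Each zone ends up with its own pair of non-overlapping ranges, which is the property that lets the cluster survive the loss of a single zone.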

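The tags follow the requirements from AWS. The kubernetes.io/cluster/<cluster-name> tag lets Kubernetes discover the subnets that belong to the cluster, and the kubernetes.io/role/elb and kubernetes.io/role/internal-elb tags mark the subnets where public and internal load balancers can be placed. We could go into more detail about the networking, but that is outside the scope of this tutorial; if you need more details, please have a look at the GitHub code, where you can find the full example.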

And let’s see the EKS cluster setup as well:

module "eks" {
  source       = "terraform-aws-modules/eks/aws"
  cluster_name = join("-", [var.cluster_name, var.environment, random_string.suffix.result])
  subnets      = module.vpc.private_subnets

  tags = {
    Environment = var.environment
  }

  vpc_id                          = module.vpc.vpc_id
  cluster_endpoint_private_access = true
  cluster_enabled_log_types       = ["api", "audit", "authenticator", "controllerManager", "scheduler"]

  worker_groups = [
    {
      name                          = "worker-group-spark"
      instance_type                 = var.cluster_instance_type
      additional_userdata           = "worker nodes"
      asg_desired_capacity          = var.cluster_number_of_nodes
      additional_security_group_ids = [aws_security_group.all_worker_mgmt.id, aws_security_group.inside_vpc_traffic.id]
    }
  ]

  workers_group_defaults = {
    key_name = "eks_key"
    subnets  = module.vpc.private_subnets
  }
}

Again in this snippet, we can see that we declare the cluster inside private subnets. We enable the CloudWatch logs for all the components of the control plane. The EC2 instance type and the number of nodes come from Terraform variables; as defaults, we use m5.xlarge as the instance type and 3 nodes. We set an EC2 key, eks_key, in case we need to ssh into the worker nodes.

To be able to run the code in this tutorial we need to install a couple of tools. On Mac we can use brew:

brew install terraform aws-iam-authenticator kubernetes-cli helm

And to reach AWS we need to also set up our AWS credentials.

Now we can start to initialize Terraform in order to get all the dependencies needed to deploy the infrastructure:

cd deployment/ && terraform init

If everything runs successfully, the output should include the message Terraform has been successfully initialized!

We are ready to deploy the infrastructure. To do so run:

terraform apply

It will take some time until the deployment is done, so we can sit back and relax for a bit.

Once done you should see the message Apply complete! Resources: 55 added, 0 changed, 0 destroyed. and the names of the resources deployed.

One additional check we can do is to see whether the worker nodes have been attached to the cluster. For that we set up kubectl:

aws --region your-region eks update-kubeconfig --name your-cluster-name

We should be able to see three nodes when we run the following command:

kubectl get nodes
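
The same check can be scripted. The sketch below counts the nodes whose Ready condition is True in the JSON that kubectl get nodes -o json returns. The ready_nodes and fetch_nodes helpers and the sample node names are hypothetical; fetch_nodes assumes kubectl is on the PATH and already configured as above, and it is not called here so the snippet runs without a cluster.

```python
import json
import subprocess


def ready_nodes(node_list):
    """Return the names of nodes whose Ready condition has status True."""
    names = []
    for node in node_list.get("items", []):
        conditions = node.get("status", {}).get("conditions", [])
        if any(c.get("type") == "Ready" and c.get("status") == "True"
               for c in conditions):
            names.append(node["metadata"]["name"])
    return names


def fetch_nodes():
    """Ask kubectl for the node list as JSON (assumes a configured kubectl)."""
    result = subprocess.run(
        ["kubectl", "get", "nodes", "-o", "json"],
        check=True, capture_output=True, text=True,
    )
    return json.loads(result.stdout)


# Hand-written sample in the shape kubectl returns; node names are made up.
sample = {
    "items": [
        {"metadata": {"name": "node-a"},
         "status": {"conditions": [{"type": "Ready", "status": "True"}]}},
        {"metadata": {"name": "node-b"},
         "status": {"conditions": [{"type": "Ready", "status": "False"}]}},
    ]
}
print(ready_nodes(sample))
```

Against a live cluster, ready_nodes(fetch_nodes()) should list three names once all workers have joined.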

Step 2: Installing the Spark Operator

Usually, we deploy Spark jobs using spark-submit, but in Kubernetes we have a better option that is more integrated with the environment: the Spark Operator. Some of the improvements it brings are automatic application re-submission, automatic restarts with a custom restart policy, automatic retries of failed submissions, and easy integration with monitoring tools such as Prometheus.

We can install it via helm:

helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
helm install spark-op incubator/sparkoperator

If we run helm list in the terminal the spark-op chart should be available. Also, we should have a running pod for the spark operator. We can watch what pods are running in the default namespace with the command kubectl get pods.

Step 3: Running a PySpark app

Now we can finally run Python Spark apps in K8s. The first thing we need to do is create a spark user to give the Spark jobs access to the Kubernetes resources. We create a service account and a cluster role binding for this purpose:

apiVersion: v1
kind: ServiceAccount
metadata:
  name: spark
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: spark-role
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: edit
subjects:
  - kind: ServiceAccount
    name: spark
    namespace: default

To create the service account and the role binding:

kubectl apply -f spark/spark-rbac.yml

You will get notified with serviceaccount/spark created and clusterrolebinding.rbac.authorization.k8s.io/spark-role created.

The Spark Operator job definition:

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-job
  namespace: default
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: "uprush/apache-spark-pyspark:2.4.5"
  imagePullPolicy: Always
  mainApplicationFile: local:////opt/spark/examples/src/main/python/pi.py
  sparkVersion: "2.4.5"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 2
  driver:
    cores: 1
    memory: "1G"
    labels:
      version: 2.4.5
    serviceAccount: spark
  executor:
    cores: 1
    instances: 1
    memory: "1G"
    labels:
      version: 2.4.5

We define our Spark run parameters in a yml file, similar to any other resource declaration on Kubernetes. Basically, we declare that we are running a Python 3 Spark app using the image uprush/apache-spark-pyspark:2.4.5. I recommend this image because it comes with a newer version of Hadoop that handles writes to s3a more efficiently. We have a restart policy: if the job fails, it will be restarted up to two times. We also set resource allocations for the driver and the executors; as the job is very simple, we use just one executor. Another thing we can notice is that the driver uses the spark service account that we defined earlier. The code we are running is the classic example of approximating the number pi.
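
For readers who have not seen the pi example, the idea behind it is a Monte Carlo estimate: sample random points in the square from -1 to 1 and count how many land inside the unit circle. The sketch below shows that idea in plain Python without Spark; the estimate_pi function, the sample count, and the seed are assumptions for illustration, whereas the bundled pi.py spreads the sampling across the executors.

```python
import random


def estimate_pi(samples, seed=None):
    """Estimate pi from the fraction of random points inside the unit circle."""
    rng = random.Random(seed)
    inside = 0
    for _ in range(samples):
        x = rng.uniform(-1.0, 1.0)
        y = rng.uniform(-1.0, 1.0)
        if x * x + y * y <= 1.0:
            inside += 1
    # The circle covers pi/4 of the square, so scale the hit ratio by 4.
    return 4.0 * inside / samples


print("Pi is roughly %f" % estimate_pi(200_000, seed=42))
```

The estimate gets tighter as the number of samples grows, which is why the Spark version benefits from splitting the work over several partitions.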

To submit the code we are using kubectl again:

kubectl apply -f spark/spark-job.yml
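
Because the job definition is plain data, it can also be generated from Python, for example to vary the number of executors between runs. This is only a sketch: the spark_job_manifest helper is hypothetical, and the field values are copied from the yml above. It relies on the fact that kubectl apply -f also accepts JSON files.

```python
import json


def spark_job_manifest(name, executors=1,
                       image="uprush/apache-spark-pyspark:2.4.5"):
    """Build the SparkApplication definition from this tutorial as a dict."""
    return {
        "apiVersion": "sparkoperator.k8s.io/v1beta2",
        "kind": "SparkApplication",
        "metadata": {"name": name, "namespace": "default"},
        "spec": {
            "type": "Python",
            "pythonVersion": "3",
            "mode": "cluster",
            "image": image,
            "imagePullPolicy": "Always",
            "mainApplicationFile":
                "local:////opt/spark/examples/src/main/python/pi.py",
            "sparkVersion": "2.4.5",
            "restartPolicy": {"type": "OnFailure", "onFailureRetries": 2},
            "driver": {
                "cores": 1,
                "memory": "1G",
                "labels": {"version": "2.4.5"},
                "serviceAccount": "spark",
            },
            "executor": {
                "cores": 1,
                "instances": executors,  # the only value varied in this sketch
                "memory": "1G",
                "labels": {"version": "2.4.5"},
            },
        },
    }


manifest = spark_job_manifest("spark-job", executors=2)
print(json.dumps(manifest, indent=2))
```

Saving the printed JSON to a file and passing it to kubectl apply -f would submit the same job with two executors.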

Upon completion, if we inspect the pods again, the spark-job-driver pod should be in the Completed state.

And if we check the logs by running kubectl logs spark-job-driver, we should find one line giving an approximate value of pi: Pi is roughly 3.142020.
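
If we want to check the result automatically, the value can be pulled out of the log text with a regular expression. The sketch below is a minimal example; the extract_pi helper is hypothetical, and the surrounding lines in sample_log are made-up placeholders, with only the result line taken from the run above.

```python
import re

# Matches the result line printed by the pi example.
PI_LINE = re.compile(r"Pi is roughly (\d+\.\d+)")


def extract_pi(log_text):
    """Return the estimated value of pi found in the log text, or None."""
    match = PI_LINE.search(log_text)
    return float(match.group(1)) if match else None


# Placeholder log: the first and last lines are invented for illustration.
sample_log = "\n".join([
    "(earlier driver output)",
    "Pi is roughly 3.142020",
    "(later driver output)",
])
print(extract_pi(sample_log))
```

The input could be the captured output of kubectl logs spark-job-driver; a return value of None would indicate that the job did not reach the final print.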

That's all, folks. I hope you enjoyed this tutorial. We’ve seen how to create our own AWS EKS cluster using Terraform, so that it can easily be re-deployed in different environments, and how to submit PySpark jobs using a more Kubernetes-friendly syntax.
