How to run a PySpark job in Kubernetes (AWS EKS)
A complete tutorial on deploying an EKS cluster with Terraform and running a PySpark job using the Spark Operator
In this tutorial, we will focus on deploying a Spark application on AWS EKS end to end. We will do the following steps:
- deploy an EKS cluster inside a custom VPC in AWS
- install the Spark Operator
- run a simple PySpark application
TL;DR: Github code repo
Step 1: Deploying the Kubernetes infrastructure
To deploy Kubernetes on AWS we will need, at a minimum, to deploy:
- VPC, subnets and security groups to take care of the networking in the cluster
- the EKS control plane, to run the Kubernetes services such as etcd and the Kubernetes API
- EKS worker nodes, to be able to run pods and, more specifically for our case, Spark jobs
Let’s dive into the Terraform code. First, let’s see the VPC:
module "vpc" {
source = "terraform-aws-modules/vpc/aws"
version = "2.6.0"name = join("-", [var.cluster_name, var.environment, "vpc"])
cidr = var.vpc_cidr
azs = data.aws_availability_zones.available.names
private_subnets = [var.private_subnet_az_1, var.private_subnet_az_2, var.private_subnet_az_3]
public_subnets = [var.public_subnet_az_1, var.public_subnet_az_2, var.public_subnet_az_3]
enable_nat_gateway = true
single_nat_gateway = false
one_nat_gateway_per_az = true
enable_dns_hostnames = true
enable_dns_support = truetags = {
"kubernetes.io/cluster/${var.cluster_name}" = "shared"
}public_subnet_tags = {
"kubernetes.io/cluster/${var.cluster_name}" = "shared"
"kubernetes.io/role/elb" = "1"
}private_subnet_tags = {
"kubernetes.io/cluster/${var.cluster_name}" = "shared"
"kubernetes.io/role/internal-elb" = "1"
}
}
A VPC is an isolated network where we can place different infrastructure components. We can break this network down into smaller blocks, which on AWS are called subnets. Some subnets have access to the internet, and that is why we call them public subnets; those that have no access to the internet are called private subnets. Two other terms we will use in the context of network traffic are egress and ingress. Egress means traffic going from inside the network to the outside world, and ingress means traffic coming from the outside world into the network. As you would expect, these rules can differ depending on the use case. We also use security groups, which are traffic rules inside the VPC that define how the EC2 instances "talk" to each other, basically on which network ports.
For the Spark EKS cluster we will use private subnets for the workers, so all the data processing is done in total isolation. But we still need egress traffic to the internet, to do updates or install open source libraries. To enable traffic to the internet we add NAT gateways to our VPC, and they have to be placed in the public subnets. In the Terraform code, this is done using the flag enable_nat_gateway.
Another thing to notice is that we use three public and three private subnets, deployed in different availability zones in the region, so that the network is fault tolerant.
The tags are created as per the requirements from AWS; they are needed for the control plane to recognize the worker nodes. We could go into more detail about the networking, but it is outside the scope of this tutorial, so if you need more details please have a look at the Github code where you can find the full example.
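The worker security groups referenced later in the EKS module are also defined in the repo. A minimal sketch of one of them could look like the snippet below; the ports and CIDR ranges here are assumptions, the exact rules are in the Github code:
resource "aws_security_group" "all_worker_mgmt" {
  name_prefix = "all_worker_management"
  vpc_id      = module.vpc.vpc_id

  # Allow SSH only from private address space (illustrative rule, not the exact repo config)
  ingress {
    from_port   = 22
    to_port     = 22
    protocol    = "tcp"
    cidr_blocks = ["10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16"]
  }
}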
And let’s see the EKS cluster setup as well:
module "eks" {
source = "terraform-aws-modules/eks/aws"
cluster_name = join("-", [var.cluster_name, var.environment, random_string.suffix.result])
subnets = module.vpc.private_subnetstags = {
Environment = var.environment
}vpc_id = module.vpc.vpc_id
cluster_endpoint_private_access = truecluster_enabled_log_types = ["api", "audit", "authenticator", "controllerManager", "scheduler"]worker_groups = [
{
name = "worker-group-spark"
instance_type = var.cluster_instance_type
additional_userdata = "worker nodes"
asg_desired_capacity = var.cluster_number_of_nodes
additional_security_group_ids = [aws_security_group.all_worker_mgmt.id, aws_security_group.inside_vpc_traffic.id]
}
]workers_group_defaults = {
key_name = "eks_key"
subnets = module.vpc.private_subnets
}
}
Again in this snippet, we can see that we declare the cluster inside the private subnets. We enable CloudWatch logs for all the components of the control plane. The EC2 instance type and the number of nodes come from config variables; as defaults we use m5.xlarge as the instance type and 3 nodes. We also set an EC2 key, eks_key, in case we need to SSH into the worker nodes.
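For reference, a minimal sketch of those config variables; the names match the module code above and the defaults are the values mentioned in the text:
variable "cluster_instance_type" {
  description = "EC2 instance type for the EKS worker nodes"
  default     = "m5.xlarge"
}

variable "cluster_number_of_nodes" {
  description = "Desired number of worker nodes in the autoscaling group"
  default     = 3
}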
To be able to run the code in this tutorial we need to install a couple of tools. On Mac we can use brew:
brew install terraform aws-iam-authenticator kubernetes-cli helm
And to reach AWS we also need to set up our AWS credentials.
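One common way to do that is the interactive aws configure command, or exporting the credentials as environment variables (the values below are placeholders):
aws configure
# or, alternatively, export the credentials directly
export AWS_ACCESS_KEY_ID=<your-access-key-id>
export AWS_SECRET_ACCESS_KEY=<your-secret-access-key>
export AWS_DEFAULT_REGION=<your-region>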
Now we can start to initialize Terraform in order to get all the dependencies needed to deploy the infrastructure:
cd deployment/ && terraform init
If everything runs successfully, Terraform will report that it has been successfully initialized and that the required providers have been downloaded.
We are ready to deploy the infrastructure. To do so run:
terraform apply
It will take some time until the deployment is done, so we can sit back and relax for a bit.
Once done you should see the message Apply complete! Resources: 55 added, 0 changed, 0 destroyed.
and the names of the resources deployed.
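If you want to double-check what was created, one option is to list the resources Terraform is now tracking in its state:
terraform state list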
One additional step we can do to check that the deployment was correct is to see if the worker nodes have been attached to the cluster. For that we set up kubectl:
aws --region your-region eks update-kubeconfig --name your-cluster-name
We should be able to see three nodes when we run the following command:
kubectl get nodes
Step 2: Installing the Spark Operator
Usually we deploy Spark jobs using spark-submit, but in Kubernetes we have a better option, more integrated with the environment, called the Spark Operator. Some of the improvements that it brings are automatic application re-submission, automatic restarts with a custom restart policy, automatic retries of failed submissions, and easy integration with monitoring tools such as Prometheus.
We can install it via helm:
helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
helm install spark-op incubator/sparkoperator
If we run helm list in the terminal, the spark-op chart should be available. Also, we should have a running pod for the Spark operator. We can watch which pods are running in the default namespace with the command kubectl get pods.
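For example (the chart and pod names may differ slightly depending on the chart version you installed):
helm list
kubectl get pods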
Step 3: Running a PySpark app
Now we can finally run Python Spark apps in K8s. The first thing we need to do is to create a spark user, in order to give the Spark jobs access to the Kubernetes resources. We create a service account and a cluster role binding for this purpose:
apiVersion: v1
kind: ServiceAccount
metadata:
  name: spark
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: spark-role
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: edit
subjects:
  - kind: ServiceAccount
    name: spark
    namespace: default
To execute the creation of the role:
kubectl apply -f spark/spark-rbac.yml
You will get notified with serviceaccount/spark created and clusterrolebinding.rbac.authorization.k8s.io/spark-role created.
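To double-check that both resources exist, we can query them directly:
kubectl get serviceaccount spark
kubectl get clusterrolebinding spark-role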
The Spark Operator job definition:
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
name: spark-job
namespace: default
spec:
type: Python
pythonVersion: "3"
mode: cluster
image: "uprush/apache-spark-pyspark:2.4.5"
imagePullPolicy: Always
mainApplicationFile: local:////opt/spark/examples/src/main/python/pi.py
sparkVersion: "2.4.5"
restartPolicy:
type: OnFailure
onFailureRetries: 2
driver:
cores: 1
memory: "1G"
labels:
version: 2.4.5
serviceAccount: spark
executor:
cores: 1
instances: 1
memory: "1G"
labels:
version: 2.4.5
We define our Spark run parameters in a yml file, similar to any other resource declaration on Kubernetes. Basically, we declare that we are running a Python 3 Spark app using the image uprush/apache-spark-pyspark:2.4.5. I recommend this image because it comes with a newer version of Hadoop that handles writes to s3a more efficiently. We set a retry policy: if the job fails, it will be restarted. We also set some resource allocations for the driver and the executors; as the job is very simple, we use just one executor. Another thing to notice is that we use the spark service account that we defined earlier. The code we are running is the classic example of computing the number pi.
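For reference, the bundled pi.py example does roughly the following: a Monte Carlo estimate of pi. This is a simplified sketch, not the exact file shipped in the image:
import random
from operator import add
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PythonPi").getOrCreate()

n = 100000  # number of random samples

def inside(_):
    # draw a random point in the unit square and check if it falls inside the quarter circle
    x, y = random.random(), random.random()
    return 1 if x * x + y * y <= 1 else 0

count = spark.sparkContext.parallelize(range(n)).map(inside).reduce(add)
print("Pi is roughly %f" % (4.0 * count / n))

spark.stop()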
To submit the code we use kubectl again:
kubectl apply -f spark/spark-job.yml
Upon completion, if we inspect the pods again, the driver pod should show a Completed status.
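For example, to inspect the pods and the application status (the driver pod name is derived from the job name in the manifest above):
kubectl get pods
kubectl get sparkapplications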
And if we check the logs by running kubectl logs spark-job-driver, we should find a line giving an approximate value of pi: Pi is roughly 3.142020.
That was all, folks. I hope you enjoyed this tutorial. We've seen how we can create our own AWS EKS cluster using Terraform, so that we can easily re-deploy it in different environments, and how we can submit PySpark jobs using a more Kubernetes-friendly syntax.