Deploying models with Apache Spark

In this workshop, we’ll use an XGBoost model to score data with Apache Spark on AWS EMR.

Plan:

  • Prepare docker images
  • Set up an EMR cluster
  • Connect to it with Jupyter
  • Use spark-submit to submit spark jobs

Prerequisites

  • Python 3.7 or 3.8. The easiest way to install it is Anaconda (https://www.anaconda.com/products/individual)
  • XGBoost and pyarrow for generating the data and training the model (install with pip; see the command after this list)
  • Docker
  • AWS account and CLI installed and configured
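For example (assuming pip points at the Python 3.7/3.8 environment you’ll be using):

pip install xgboost pyarrow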

Let’s start!

Create a bucket with data and model

  • Create a bucket “spark-workshop-data”
  • Upload the data there, e.g. to s3://spark-workshop-data/data-sessions/ (a CLI sketch follows this list)
  • Upload the model to the root of the bucket, e.g. to s3://spark-workshop-data/model.pkl
  • Script for generating the data and training the model: generate_data_model.ipynb
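If you prefer the AWS CLI, the same steps look roughly like this (a sketch, assuming the generated data sits in a local data-sessions/ folder and the trained model is saved locally as model.pkl):

aws s3 mb s3://spark-workshop-data
aws s3 cp data-sessions/ s3://spark-workshop-data/data-sessions/ --recursive
aws s3 cp model.pkl s3://spark-workshop-data/model.pkl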

Preparing Docker images

Create a Dockerfile pyspark-base.dockerfile:

FROM amazoncorretto:8

RUN yum -y update
RUN yum -y install yum-utils
RUN yum -y groupinstall development
RUN yum -y install python3 python3-dev python3-pip python3-virtualenv

ENV PYSPARK_DRIVER_PYTHON python3
ENV PYSPARK_PYTHON python3

RUN pip3 install --upgrade pip
RUN pip3 install numpy pandas boto3 --no-cache-dir

This will be our base image, which we will extend with other libraries. For our example, we want to use XGBoost, so let’s extend the image and install it. Create another Dockerfile, pyspark-xgboost.dockerfile:

FROM pyspark-base

RUN pip3 install xgboost --no-cache-dir

Build them:

docker build -t pyspark-base -f pyspark-base.dockerfile .

docker build -t pyspark-xgboost -f pyspark-xgboost.dockerfile .

Create an ECR repo:

aws ecr create-repository --repository-name pyspark-images

The output:

{
    "repository": {
        "repositoryArn": "arn:aws:ecr:eu-west-1:XXXXXXXXXXXX:repository/pyspark-images",
        "registryId": "XXXXXXXXXXXX",
        "repositoryName": "pyspark-images",
        "repositoryUri": "XXXXXXXXXXXX.dkr.ecr.eu-west-1.amazonaws.com/pyspark-images",
        "createdAt": 1605303665.0,
        "imageTagMutability": "MUTABLE",
        "imageScanningConfiguration": {
            "scanOnPush": false
        },
        "encryptionConfiguration": {
            "encryptionType": "AES256"
        }
    }
}

Where “XXXXXXXXXXXX” is your AWS account number.

From this output, we’re interested in the repositoryUri.

Tag the images:

ACCOUNT=XXXXXXXXXXXX

docker tag pyspark-base ${ACCOUNT}.dkr.ecr.eu-west-1.amazonaws.com/pyspark-images:pyspark-base
docker tag pyspark-xgboost ${ACCOUNT}.dkr.ecr.eu-west-1.amazonaws.com/pyspark-images:pyspark-xgboost

Log in to ECR

$(aws ecr get-login --no-include-email)
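The command above works with AWS CLI v1. If you’re on AWS CLI v2, where “aws ecr get-login” was removed, log in like this instead:

aws ecr get-login-password --region eu-west-1 | \
    docker login --username AWS --password-stdin ${ACCOUNT}.dkr.ecr.eu-west-1.amazonaws.com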

And push the images to ECR

docker push ${ACCOUNT}.dkr.ecr.eu-west-1.amazonaws.com/pyspark-images:pyspark-base

docker push ${ACCOUNT}.dkr.ecr.eu-west-1.amazonaws.com/pyspark-images:pyspark-xgboost

Note: if you use a different region, be sure to replace “eu-west-1” with it.

Create key pair

Now we need to create a key pair to be able to SSH to the EC2 machines of the cluster.

  • Go to services ⇒ “EC2”
  • Select “Key Pairs” under “Network & Security”
  • Click “Create new key pair”, call it “emr_deploy_workshop”

Put the key in the “.aws” directory (or any other location).
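Alternatively, you can create the key pair from the CLI and save the private key in one step (a sketch, writing the key to the ~/.aws location used below):

aws ec2 create-key-pair \
    --key-name emr_deploy_workshop \
    --query 'KeyMaterial' \
    --output text > ~/.aws/emr_deploy_workshop.pem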

Set permissions:

chmod 600 emr_deploy_workshop.pem

Create roles

We need to create an EC2 instance profile for the instances of the cluster.

  • Go to Services ⇒ IAM
  • Select “roles”, click “Create role”
  • Select “AWS service”, choose “EMR” from the list

  • Select “EMR Role for EC2”

  • Use the default list of attached policies — it should contain only one item “AmazonElasticMapReduceforEC2Role”

  • Skip tags
  • Name it “EMR_workshop_ec2_profile”
  • Click “Create role”
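If you prefer the CLI, roughly the same can be done like this (a sketch; note that the IAM console creates the instance profile for you automatically, while the CLI needs the explicit create-instance-profile and add-role-to-instance-profile calls):

aws iam create-role \
    --role-name EMR_workshop_ec2_profile \
    --assume-role-policy-document '{"Version": "2012-10-17", "Statement": [{"Effect": "Allow", "Principal": {"Service": "ec2.amazonaws.com"}, "Action": "sts:AssumeRole"}]}'

aws iam attach-role-policy \
    --role-name EMR_workshop_ec2_profile \
    --policy-arn arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceforEC2Role

aws iam create-instance-profile \
    --instance-profile-name EMR_workshop_ec2_profile

aws iam add-role-to-instance-profile \
    --instance-profile-name EMR_workshop_ec2_profile \
    --role-name EMR_workshop_ec2_profile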


Prepare configuration

Now we need to prepare a config for the EMR cluster.

  • We need to add our ECR to the list of trusted repositories
  • We also need to set the base image as the default docker image for all spark jobs

[
   {
      "Classification": "container-executor",
      "Properties": {},
      "Configurations": [
         {
            "Classification": "docker",
            "Properties": {
               "docker.privileged-containers.registries": "local,centos,<ACCOUNT_NUMBER>.dkr.ecr.<REGION>.amazonaws.com",
               "docker.trusted.registries": "local,centos,<ACCOUNT_NUMBER>.dkr.ecr.<REGION>.amazonaws.com"
            }
         }
      ]
   },
   {
      "Classification": "livy-conf",
      "Properties": {
         "livy.spark.master": "yarn",
         "livy.spark.deploy-mode": "cluster",
         "livy.server.session.timeout": "16h"
      }
   },
   {
      "Classification": "hive-site",
      "Properties": {
         "hive.execution.mode": "container"
      }
   },
   {
      "Classification": "spark-defaults",
      "Properties": {
         "spark.executorEnv.YARN_CONTAINER_RUNTIME_TYPE": "docker",
         "spark.yarn.am.waitTime": "300s",
         "spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_TYPE": "docker",
         "spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE": "<ACCOUNT_NUMBER>.dkr.ecr.eu-west-1.amazonaws.com/pyspark-images:pyspark-base",
         "spark.executor.instances": "2",
         "spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE": "<ACCOUNT_NUMBER>.dkr.ecr.eu-west-1.amazonaws.com/pyspark-images:pyspark-base"
      }
   }
]

Save it as config.json and upload it to our bucket:

aws s3 cp config.json s3://spark-workshop-data/config.json

Create EMR cluster

Now we’re ready to create an EMR cluster:

  • Go to services ⇒ EMR

  • Click “Create cluster”, select “Go to advanced options”

  • Choose “emr-6.2.0” from the dropdown list of releases (or any EMR of 6+ version — older releases don’t support Docker)
  • Add “Spark” and “JupyterEnterpriseGateway” there
  • Load the config file from S3: Use “s3://spark-workshop-data/config.json”

  • Click next, keep the default settings.
  • Click next, put a name like “EMR-deployment-workshop”.

  • Click next.
  • For the EC2 key pair, select the key we created previously.
  • Choose “Custom” permissions and select “EMR_workshop_ec2_profile” for EC2 instance profile — the instance profile we created earlier.

  • Click “Create cluster”
  • Wait till the cluster changes the status from “Starting” to “Waiting”
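For reference, a rough CLI equivalent of these console steps could look like this (a sketch with a few assumptions: m5.xlarge instances, config.json available locally, and the default EMR service role, which you can create once with “aws emr create-default-roles”):

aws emr create-cluster \
    --name "EMR-deployment-workshop" \
    --release-label emr-6.2.0 \
    --applications Name=Spark Name=JupyterEnterpriseGateway \
    --instance-type m5.xlarge \
    --instance-count 3 \
    --ec2-attributes KeyName=emr_deploy_workshop,InstanceProfile=EMR_workshop_ec2_profile \
    --service-role EMR_DefaultRole \
    --configurations file://config.json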

Note the cluster id — we’ll need it.
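If you created the cluster from the console, one way to look up the id is:

aws emr list-clusters --active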

Jupyter notebook

You can use the cluster from a Jupyter notebook

  • Go to notebooks, click “Create notebook”.
  • Select the existing cluster, choose the default role (or select “Create default role” if you’re doing it for the first time)

  • Then, wait till the notebook is ready
  • Now you can open it in Jupyter (or JupyterHub)

In Jupyter:

Create a new notebook and select the “PySpark” kernel.

First, let PySpark know that it shouldn’t use virtualenv — else it won’t use our Docker image. Run this in the first cell:

%%configure -f
{"conf": {"spark.pyspark.virtualenv.enabled": "false"}}

Then check that it uses the right image. The easiest way is to compare the numpy version with the one in our Docker image: the version preinstalled on the cluster is older than the one we installed in the image.

Run this:

import numpy
numpy.__version__

In our case, it prints '1.19.4', which is the same version we have in Docker.

To use a different image, we need to specify it in the config. Restart the kernel and put this config in the first cell before running anything else.

%%configure -f
{
    "conf": {
        "spark.pyspark.virtualenv.enabled": "false",
        "spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE": "XXXXXXXXXXXX.dkr.ecr.eu-west-1.amazonaws.com/pyspark-images:pyspark-xgboost",
        "spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE": "XXXXXXXXXXXX.dkr.ecr.eu-west-1.amazonaws.com/pyspark-images:pyspark-xgboost"
    }
}

To make sure the correct image is loaded, try to import xgboost (the default image doesn’t have it):

import xgboost as xgb

It should produce no errors.

Now let’s read the data:

df = spark.read.parquet('s3://spark-workshop-data/data-sessions/')

And have a look:

df.show()

Code for applying the model

Let’s create a script with the code, entrypoint.py (you can also paste the code into Jupyter for experimenting).

First, let’s do the imports:

import pickle

import numpy as np
import pandas as pd
import xgboost as xgb

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types

import boto3

Now we need to download the model and load it with pickle:

s3 = boto3.client('s3')
s3.download_file('spark-workshop-data', 'model.pkl', 'model.pkl')

with open('model.pkl', 'rb') as f_in:
    model = pickle.load(f_in)

Next, we define the function for applying the model:

def apply_model(columns, model, batch):
    df_batch = pd.DataFrame(batch, columns=columns)

    X = df_batch[['f_views_fraction', 'f_replies_fraction', 'f_scrolls_fraction']].values
    dm = xgb.DMatrix(X)
    y_pred = model.predict(dm)

    df_batch['prediction'] = y_pred

    for _, row in df_batch[['session_long', 'prediction']].iterrows():
        yield (row.session_long, float(row.prediction))

After that, create a spark session and load the data:

spark = SparkSession \
    .builder \
    .appName("spark test") \
    .getOrCreate()

df = spark.read.parquet('s3://spark-workshop-data/data-sessions/')

Often, we need to transform the data before we can use it in the model. That’s also the case in our example:

df = df \
    .withColumn('f_views_fraction', df.f_view_sessions / df.f_sessions) \
    .withColumn('f_replies_fraction', df.f_reply_sessions / df.f_sessions) \
    .withColumn('f_scrolls_fraction', df.f_scroll_sessions / df.f_sessions) \
    .select('session_long', 'f_views_fraction', 'f_replies_fraction', 'f_scrolls_fraction')

Now let’s use the apply_model function in mapPartitions:

columns = df.columns

output_schema = types.StructType([
    types.StructField("session_long", types.StringType()),
    types.StructField("predictions", types.FloatType()),
])

df_output = df.rdd \
    .mapPartitions(lambda p: apply_model(columns, model, p)) \
    .toDF(output_schema)

The results are ready, so we can write them back to S3:

df_output.write.mode('overwrite').parquet('s3://spark-workshop-data/output/2020-10-09/')

Sometimes, when the partitions are too large, it makes sense to split each partition into smaller batches:

from itertools import islice

def split_into_batches(iterable, size):
    while True:
        batch = islice(iterable, size)
        batch = list(batch)
        if len(batch) == 0:
            break
        yield batch

def apply_model_batch(columns, model, partition):
    for batch in split_into_batches(partition, 10000):
        df_batch = pd.DataFrame(batch, columns=columns)

        X = df_batch[['f_views_fraction', 'f_replies_fraction', 'f_scrolls_fraction']].values
        dm = xgb.DMatrix(X)
        y_pred = model.predict(dm)

        df_batch['prediction'] = y_pred

        for _, row in df_batch[['session_long', 'prediction']].iterrows():
            yield (row.session_long, float(row.prediction))
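To use the batched version, plug it into mapPartitions exactly as we did with apply_model, for example:

df_output = df.rdd \
    .mapPartitions(lambda p: apply_model_batch(columns, model, p)) \
    .toDF(output_schema)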

You can see the entire file here: entrypoint.py

Spark-Submit

To submit the job, SSH to the master node, copy the entrypoint file there, and run spark-submit:

DOCKER_IMAGE_NAME=${ACCOUNT}.dkr.ecr.eu-west-1.amazonaws.com/pyspark-images:pyspark-xgboost

spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --conf spark.executorEnv.YARN_CONTAINER_RUNTIME_TYPE=docker \
    --conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=${DOCKER_IMAGE_NAME} \
    --conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_TYPE=docker \
    --conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=${DOCKER_IMAGE_NAME} \
    --num-executors 2 \
    entrypoint.py
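To get entrypoint.py onto the master node in the first place, one option is plain scp (a sketch; replace the placeholder with the public DNS of your master node, see the SSH section below):

scp -i ~/.aws/emr_deploy_workshop.pem \
    entrypoint.py \
    hadoop@<master-public-dns>:~/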

Of course, you don’t have to run spark-submit from the master node, but submitting from elsewhere requires additional configuration.

You can also do it with AWS CLI — for that you don’t need to SSH to the master:

CLUSTER_ID="j-3FIB6N1RLNZZE"
DOCKER_IMAGE_NAME=${ACCOUNT}.dkr.ecr.eu-west-1.amazonaws.com/pyspark-images:pyspark-xgboost

aws emr add-steps \
    --cluster-id ${CLUSTER_ID} \
    --steps Type=spark,Name=spark-test,Args=[--master,yarn,--deploy-mode,cluster,--conf,spark.yarn.submit.waitAppCompletion=true,--num-executors,2,--conf,spark.executorEnv.YARN_CONTAINER_RUNTIME_TYPE=docker,--conf,spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=${DOCKER_IMAGE_NAME},--conf,spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_TYPE=docker,--conf,spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=${DOCKER_IMAGE_NAME},s3://spark-workshop-data/entrypoint.py],ActionOnFailure=CONTINUE

The Args list contains the same configuration we passed to spark-submit. Note that in this case entrypoint.py has to be uploaded to S3 first.
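For example, to upload the script and then check how the submitted step is doing:

aws s3 cp entrypoint.py s3://spark-workshop-data/entrypoint.py

aws emr list-steps --cluster-id ${CLUSTER_ID}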

SSH

To be able to SSH to the master node, you need to allow inbound SSH traffic. Select the security group of the master node:

  • Click “edit inbound rules”
  • Click “add rule”, select “SSH” from the dropdown list. In the source field, select “0.0.0.0/0” (this makes the port reachable from any IP address).
  • Click “save rules”.

Now, connect:

CLUSTER_ID="j-L5LEQNTOUF5Z"

aws emr ssh \
    --cluster-id ${CLUSTER_ID} \
    --key-pair-file ~/.aws/emr_deploy_workshop.pem

 

You will see something like this:

hadoop@ec2-18-202-33-134.eu-west-1.compute.amazonaws.com -t
Last login: Fri Nov 13 11:45:58 2020

       __|  __|_  )
       _|  (     /   Amazon Linux 2 AMI
      ___|\___|___|

https://aws.amazon.com/amazon-linux-2/

EEEEEEEEEEEEEEEEEEEE MMMMMMMM           MMMMMMMM RRRRRRRRRRRRRRR
E::::::::::::::::::E M:::::::M         M:::::::M R::::::::::::::R
EE:::::EEEEEEEEE:::E M::::::::M       M::::::::M R:::::RRRRRR:::::R
  E::::E       EEEEE M:::::::::M     M:::::::::M RR::::R      R::::R
  E::::E             M::::::M:::M   M:::M::::::M   R:::R      R::::R
  E:::::EEEEEEEEEE   M:::::M M:::M M:::M M:::::M   R:::RRRRRR:::::R
  E::::::::::::::E   M:::::M  M:::M:::M  M:::::M   R:::::::::::RR
  E:::::EEEEEEEEEE   M:::::M   M:::::M   M:::::M   R:::RRRRRR::::R
  E::::E             M:::::M    M:::M    M:::::M   R:::R      R::::R
  E::::E       EEEEE M:::::M     MMM     M:::::M   R:::R      R::::R
EE:::::EEEEEEEE::::E M:::::M             M:::::M   R:::R      R::::R
E::::::::::::::::::E M:::::M             M:::::M RR::::R      R::::R
EEEEEEEEEEEEEEEEEEEE MMMMMMM             MMMMMMM RRRRRRR      RRRRRR

[hadoop@ip-172-31-4-226 ~]$

Download the model:

aws s3 cp s3://spark-workshop-data/model.pkl model.pkl

Run “pyspark” there. It’ll take a while the first time (it downloads the Docker images from ECR).

Check that you can access the data:

>>> df = spark.read.parquet('s3://spark-workshop-data/data-sessions/')
>>> df.show()
+----------------+----------+---------------+----------------+-----------------+
|    session_long|f_sessions|f_view_sessions|f_reply_sessions|f_scroll_sessions|
+----------------+----------+---------------+----------------+-----------------+
|998f87f9e1cf925b|        11|              0|               0|                4|
|54e5f263c354f5e5|        13|              8|               1|                3|
|9fbacf3f7d99bf65|        37|             16|               2|                6|
|bcddabefafe67329|        39|             23|               0|               10|
|4bf89784be7a9353|        50|             12|               3|               23|
|b139fe56dc546a70|        14|             11|               0|                0|
|225526257eabd96b|        25|              3|               2|                5|
|77de18f11ec908be|        77|              0|               4|               15|
|14b3a805ece01bef|        15|              7|               1|                1|
|a5e91efd12d3839b|       121|             43|               3|               57|
|1985577238472167|        42|              8|               0|               18|
|9fa8b0c72496782c|        55|             43|               2|                5|
|1e99d0191f7ecd76|         7|              5|               0|                0|
|a9455abfe37707c2|        26|             11|               0|                9|
|3048615f158e964f|        34|              6|               2|                4|
|b50595318caac4c3|        12|              4|               1|                5|
|73e5c3d46427893b|       104|              0|              10|               10|
|358b02c1f17dc4b2|        21|              3|               1|                5|
|7240e9c5cf2c7bc7|         3|              1|               0|                1|
|8789969141d9e74c|        92|             56|               1|                7|
+----------------+----------+---------------+----------------+-----------------+
only showing top 20 rows

Without the AWS CLI, you can SSH directly using the public DNS of the master node:

ssh -i ~/.aws/emr_deploy_workshop.pem \
    hadoop@ec2-3-249-198-217.eu-west-1.compute.amazonaws.com

