Autoscaling for AWS EMR Clusters

Allow EMR clusters to auto-scale based on cluster usage, making our applications more responsive to load and more cost-efficient.

Estimated Impact

  • Better performance and reaction to load spikes.
  • Cost savings on EMR resource usage.

Measurement

  • Compare the overall number of deployed instances before and after auto-scaling is enabled everywhere possible.
  • Enabling auto-scaling may also make it possible to switch some EMR instance types to less powerful ones.

What we know so far

About EMR Auto-scaling

  • EMR cannot auto-scale the master node; we attach the scaling policy to a TASK instance group only. You can set any instance type for that group, and it will be used for the new servers.
  • You can set the minimum and maximum number of instances (I went with 0 and 6, respectively).
  • You can set the thresholds that trigger scaling out and scaling in. Any EMR metric available in CloudWatch can be used.
  • You need to specify an auto-scaling role. AWS creates a default one for you: `EMR_AutoScaling_DefaultRole`.
  • All of this can be configured via CloudFormation using the `AWS::EMR::InstanceGroupConfig` resource type.
  • There is an issue where a cluster/stack with this configuration fails to delete the regular way. More info HERE.

The instance group resource we used:

"FeedingAutoScale": {
       "Type" : "AWS::EMR::InstanceGroupConfig",
       "Properties" : {
         "AutoScalingPolicy" : {
           "Constraints": {
             "MaxCapacity": {
               "Ref": "AutoScalingInstanceMax"
             },
             "MinCapacity": {
               "Ref": "AutoScalingInstanceMin"
             }
           },
           "Rules": [
             {
               "Name": "Scale-out",
               "Description": "Scale-out policy",
               "Action": {
                 "SimpleScalingPolicyConfiguration": {
                   "AdjustmentType": "CHANGE_IN_CAPACITY",
                   "ScalingAdjustment": 1,
                   "CoolDown": 300
                 }
               },
               "Trigger": {
                 "CloudWatchAlarmDefinition": {
                   "Dimensions": [
                     {
                       "Key": "JobFlowId",
                       "Value": {
                         "Ref": "FeedingCluster"
                       }
                     }
                   ],
                   "EvaluationPeriods": 1,
                   "Namespace": "AWS/ElasticMapReduce",
                   "Period": 300,
                   "ComparisonOperator": "LESS_THAN",
                   "Statistic": "AVERAGE",
                   "Threshold": {
                     "Ref": "AutoScalingScaleOutYarnMAP"
                   },
                   "Unit": "PERCENT",
                   "MetricName": "YARNMemoryAvailablePercentage"
                 }
               }
             },
             {
               "Name": "Scale-in",
               "Description": "Scale-in policy",
               "Action": {
                 "SimpleScalingPolicyConfiguration": {
                   "AdjustmentType": "CHANGE_IN_CAPACITY",
                   "ScalingAdjustment": -1,
                   "CoolDown": 300
                 }
               },
               "Trigger": {
                 "CloudWatchAlarmDefinition": {
                   "Dimensions": [
                     {
                       "Key": "JobFlowId",
                       "Value": {
                         "Ref": "FeedingCluster"
                       }
                     }
                   ],
                   "EvaluationPeriods": 1,
                   "Namespace": "AWS/ElasticMapReduce",
                   "Period": 300,
                   "ComparisonOperator": "GREATER_THAN",
                   "Statistic": "AVERAGE",
                   "Threshold": {
                     "Ref": "AutoScalingScaleInYarnMAP"
                   },
                   "Unit": "PERCENT",
                   "MetricName": "YARNMemoryAvailablePercentage"
                 }
               }
             }
           ]
         },
         "Name": "AutoScaling TASK",
         "InstanceCount": {
           "Ref": "AutoScalingInstanceCount"
         },
         "InstanceRole" : "TASK",
         "InstanceType": {
           "Ref": "AutoScalingInstanceType"
         },
         "JobFlowId": {
           "Ref": "FeedingCluster"
         }
       }
     }
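
For clusters that are not managed through CloudFormation, the same policy can be built in Python and attached with Boto3. The following is only a sketch: the helper names, the `j-EXAMPLE` cluster ID and the 15/75 percent thresholds are assumptions for illustration, not the values used in our stack. The two rules mirror the template above: scale out by one node when `YARNMemoryAvailablePercentage` drops below a threshold, scale in by one node when it rises above another.

```python
def scaling_rule(name, cluster_id, operator, threshold, adjustment,
                 cooldown=300, period=300):
    """Build one rule driven by YARNMemoryAvailablePercentage."""
    return {
        "Name": name,
        "Description": f"{name} policy",
        "Action": {
            "SimpleScalingPolicyConfiguration": {
                "AdjustmentType": "CHANGE_IN_CAPACITY",
                "ScalingAdjustment": adjustment,
                "CoolDown": cooldown,
            }
        },
        "Trigger": {
            "CloudWatchAlarmDefinition": {
                "Dimensions": [{"Key": "JobFlowId", "Value": cluster_id}],
                "EvaluationPeriods": 1,
                "Namespace": "AWS/ElasticMapReduce",
                "Period": period,
                "ComparisonOperator": operator,
                "Statistic": "AVERAGE",
                "Threshold": threshold,
                "Unit": "PERCENT",
                "MetricName": "YARNMemoryAvailablePercentage",
            }
        },
    }


def build_policy(cluster_id, min_capacity=0, max_capacity=6,
                 scale_out_below=15.0, scale_in_above=75.0):
    """Assemble the policy. The default thresholds are illustrative only."""
    if scale_out_below >= scale_in_above:
        # Overlapping thresholds would make the group scale out and in at once.
        raise ValueError("scale-out threshold must be below scale-in threshold")
    return {
        "Constraints": {"MinCapacity": min_capacity,
                        "MaxCapacity": max_capacity},
        "Rules": [
            # Little free YARN memory: add one node.
            scaling_rule("Scale-out", cluster_id, "LESS_THAN",
                         scale_out_below, 1),
            # Plenty of free YARN memory: remove one node.
            scaling_rule("Scale-in", cluster_id, "GREATER_THAN",
                         scale_in_above, -1),
        ],
    }


def apply_policy(cluster_id, instance_group_id, policy):
    """Attach the policy to an existing instance group (needs AWS credentials)."""
    import boto3  # imported lazily so the builders above work without the SDK

    emr = boto3.client("emr")
    return emr.put_auto_scaling_policy(
        ClusterId=cluster_id,
        InstanceGroupId=instance_group_id,
        AutoScalingPolicy=policy,
    )
```

Keeping the rule builder separate from the Boto3 call means the policy can be inspected or unit-tested without touching AWS.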

About Dynamic Allocation

For auto-scaling to work correctly, the application must not be configured with a fixed number of executors. Instead, Spark needs to be able to allocate resources dynamically. This feature has been around for a while; it uses metrics from previous jobs to estimate the current load. There are two relevant settings:

  • `spark.dynamicAllocation.enabled`  – Since Spark 1.2
  • `spark.streaming.dynamicAllocation.enabled`  – Since Spark 2.0.0

As our Feeding application is a Spark Streaming application, we need the latter. However, EMR automatically sets `spark.dynamicAllocation.enabled` to true behind the scenes in an attempt to optimize applications for you. As the two settings are incompatible (they cannot both be enabled), we need to explicitly set `spark.dynamicAllocation.enabled` to false. More info on the Spark settings EMR changes automatically can be found HERE.

Other Spark settings that don’t play well with streaming dynamic allocation are:

  • `spark.executor.cores`
  • `spark.executor.instances`

Both need to be left out of the Spark configuration.
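
The rules above can be sketched as a small Python helper that cleans an existing set of Spark settings before they are handed to spark-submit. The function names and the sample input are hypothetical; only the four setting keys come from this post.

```python
# Settings that conflict with streaming dynamic allocation (see the list above).
CONFLICTING_KEYS = ("spark.executor.cores", "spark.executor.instances")


def streaming_dynamic_allocation_conf(base_conf):
    """Return a copy of base_conf prepared for streaming dynamic allocation."""
    conf = {k: v for k, v in base_conf.items() if k not in CONFLICTING_KEYS}
    # EMR turns this on by default, so it has to be disabled explicitly.
    conf["spark.dynamicAllocation.enabled"] = "false"
    conf["spark.streaming.dynamicAllocation.enabled"] = "true"
    return conf


def to_submit_args(conf):
    """Render the settings as a list of --conf arguments for spark-submit."""
    args = []
    for key in sorted(conf):
        args += ["--conf", f"{key}={conf[key]}"]
    return args


if __name__ == "__main__":
    # Hypothetical starting point with a fixed executor layout.
    original = {
        "spark.executor.instances": "4",
        "spark.executor.cores": "2",
        "spark.executor.memory": "4g",
    }
    print(" ".join(to_submit_args(streaming_dynamic_allocation_conf(original))))
```

The input dictionary is left untouched, so the original settings remain available for comparison.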

On upgrading EMR (and Spark) versions

For Spark Streaming dynamic allocation, a Spark version later than 2.0.1 is recommended. Our setup currently uses EMR release 5.0.0, which comes with Spark 2.0.0, so an EMR release upgrade is needed to get dynamic allocation and auto-scaling working. I decided to go straight to the most recent EMR release, which as of February 2018 is EMR 5.11.1.

For details on EMR release versions, go HERE.

What changed on Spark?

In our scenario, two things are mainly different in Spark 2.2.1:

  • Spark stopped distributing the Kinesis APIs because of licensing/legal issues. Jira HERE.
  • AWS deprecated a method we used in our streaming API to get the region based on the Kinesis endpoint. Javadoc HERE. Changes needed for SparkFeeding HERE.

To solve the first, we can pass `--packages org.apache.spark:spark-streaming-kinesis-asl_2.11:2.1.0` to spark-submit, which adds spark-streaming-kinesis-asl and its dependencies to the classpath automatically, rather than building the assembly jar ourselves. However, so that SBT does not fail when building the artifact, that library must be marked with the `% "provided"` scope in `build.sbt`.

To solve the second issue, the `getRegion` method of the same class can be used. It takes a string with the name of the region, which can easily be passed through the Spark config with the following CloudFormation property:

"spark.search.region": {
  "Ref": "AWS::Region"
},

Getting YARN Logs on EMR

All of this investigation was possible thanks to the YARN error logs. If you need to find out why a Spark application such as Spark_Feeding failed, note that EMR gathers the stderr and controller logs and uploads them to S3, but most of the time that information is not enough. Here’s how to get the complete YARN logs:

  1. Go to the EMR cluster’s page in AWS.
  2. On the summary page, note the master’s public DNS.
  3. Fix the URL (e.g. ip-172-00-0-00.ec2.internal: … => 172.00.0.00: …).
    1. Tip: There is an add-on for Chrome that fixes it for you!
  4. SSH as hadoop to that address, providing the team’s SearchInfra_Search credentials file.
  5. You can now issue YARN commands to list applications and get the complete logs.
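
The URL fix in step 3 can also be scripted. The sketch below is a hypothetical helper, not part of our tooling; it assumes the host name follows the `ip-A-B-C-D.<something>.internal` pattern from the example and leaves anything else untouched.

```python
import re

# Matches EC2-style internal host names such as ip-172-00-0-00.ec2.internal
# or ip-10-0-1-25.eu-west-1.compute.internal.
_INTERNAL_HOST = re.compile(
    r"ip-(\d+)-(\d+)-(\d+)-(\d+)(?:\.[a-z0-9-]+)*\.internal"
)


def fix_internal_url(url):
    """Replace an internal EC2 host name inside url with its dotted IP."""
    return _INTERNAL_HOST.sub(r"\1.\2.\3.\4", url)


if __name__ == "__main__":
    print(fix_internal_url("ip-172-00-0-00.ec2.internal"))
```

Ports and paths after the host name are preserved, so the function can be applied to full links copied from the EMR console.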

Example commands

  • List active applications: yarn application -list
  • List failed applications: yarn application -list -appStates FAILED
  • Get an application’s full logs: yarn logs -applicationId <applicationID>
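
When several applications have failed, the commands above can be chained from Python on the master node. This is a rough sketch under a few assumptions: the function names are hypothetical, the `yarn` binary is on the PATH, the options use the single-dash spelling from the YARN CLI documentation, and application IDs are picked out of the listing with a regular expression rather than by relying on the exact column layout.

```python
import re
import subprocess

# YARN application IDs look like application_<cluster timestamp>_<sequence>.
APP_ID = re.compile(r"application_\d+_\d+")


def extract_application_ids(listing):
    """Return the distinct application IDs in listing, in order of appearance."""
    seen = []
    for app_id in APP_ID.findall(listing):
        # An ID can appear twice per row (ID column and tracking URL).
        if app_id not in seen:
            seen.append(app_id)
    return seen


def failed_application_logs():
    """Map every FAILED application ID to its full YARN logs."""
    listing = subprocess.run(
        ["yarn", "application", "-list", "-appStates", "FAILED"],
        capture_output=True, text=True, check=True,
    ).stdout
    logs = {}
    for app_id in extract_application_ids(listing):
        logs[app_id] = subprocess.run(
            ["yarn", "logs", "-applicationId", app_id],
            capture_output=True, text=True, check=True,
        ).stdout
    return logs
```

`extract_application_ids` works on plain text, so it can be tried on saved command output before running anything against a live cluster.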

Infrastructure as Code – AWS Redshift (Boto3)

The following is an example/template of Infrastructure as Code (IAC) for deploying an AWS Redshift cluster using Python and Boto3. Here’s the target architecture:

Target Redshift AWS Architecture

For this to work, you will need an AWS account and an IAM user with appropriate permissions for Redshift and S3. After that, just enter your own access key and secret into the attached notebook.

You can download the ipynb notebook. Or just check the Jupyter notebook below:

In the next post, we will discuss how to Extract, Transform and Load (ETL) data into this Redshift database via parallel imports using S3.