Digital Turbine Inc.


Data Pipeline Migration from AWS to GCS

Maintaining a data pipeline, even a basic one, is not an easy task. There are SLAs you are obligated to meet, underlying data that must be kept reliable, and users who can't wait to start analyzing the current day's insights, all within compute budget limitations. So you can already imagine how difficult migrating a data pipeline from one cloud vendor to another can be. Now think about doing it for a data pipeline that processes around 1 TB of data per hour. Crazy, right? This is what we faced at Digital Turbine, and I'm here to tell you how we did it.

We migrated our data pipeline from AWS to GCP. It reads around 120 million records per minute, processes them, and then writes them to Deep Storage and to the DBs that serve our clients.

Our pipeline consists of Jaquet, a Spark Streaming Kafka consumer (a tool named after its main purpose: converting JSONs to Parquets), which writes its output to Deep Storage, and Airflow, which triggers Spark jobs that process the data and write it back to Deep Storage. Airflow also triggers Druid processes that index the transformed data so our users can slice and dice it using our Web UI. Previously, Jaquet was deployed over EMR and the aggregations over Databricks, so the only component that was Kubernetes-managed was Airflow.

We decided to use the cloud migration as an opportunity to port our Spark streaming and aggregation jobs from EMR/Databricks to K8s (in fact, we already wrote about migrating to K8s). We did this as a preliminary stage for the cloud migration itself, meaning that the entire migration process consisted of two steps:

  1. Porting our Spark streaming and aggregation jobs from EMR/Databricks to K8s.
  2. Porting our cloud vendor from AWS to GCP (from EMR to GKE, from S3 to GCS).

That way, we achieved:

  • In the short term: better support for a gradual, safe, and smooth migration.
  • In the long term: fewer managed services, which means lower costs and better control and configuration capabilities over our entire pipeline.

Note: This blog post gives a high-level overview of the migration process (i.e., the architecture design before and after), with technical details about the crucial points we faced. For each such point, we'll also share some tips we wish we had known beforehand.

Stage 1: Porting streaming and aggregation jobs from EMR and Databricks to K8s (respectively)

The first step of this stage was to migrate Jaquet from EMR to K8s, while still writing its output to S3 and running on EC2 instances.

No matter how we deploy our Spark applications over K8s, we need a Spark image. Unfortunately, we couldn't find any open-source image that met our needs, namely one that integrates with AWS while also providing extended logging and monitoring capabilities, so we decided to implement one ourselves. Our custom image includes Apache Spark and Hadoop and, in order to integrate with AWS, the hadoop-aws module. This is not the full Dockerfile of the image, but it showcases the main idea:

FROM docker.io/library/amazoncorretto:8
# URLs for the relevant components
ARG SPARK_DISTRIBUTION_URL=https://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-without-hadoop.tgz
ARG HADOOP_DISTRIBUTION_URL=https://archive.apache.org/dist/hadoop/common/hadoop-3.2.1/hadoop-3.2.1.tar.gz
ARG HADOOP_AWS_URL=https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.2.1/hadoop-aws-3.2.1.jar
ARG JMX_PROMETHEUS=https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.11.0/jmx_prometheus_javaagent-0.11.0.jar
ARG GELF_LOGGER=https://repo1.maven.org/maven2/biz/paluch/logging/logstash-gelf/1.14.1/logstash-gelf-1.14.1.jar

# Using yum to install some utility packages, e.g.
# yum -y install tar wget

# Installing Spark
RUN wget --progress=bar:force ${SPARK_DISTRIBUTION_URL} -O /opt/spark.tgz && \
    tar xzf /opt/spark.tgz -C /opt && \
    ln -s /opt/spark-* /opt/spark && \
    mkdir /opt/spark/work-dir && \
    rm -f /opt/spark.tgz

# Installing Hadoop
RUN wget --progress=bar:force ${HADOOP_DISTRIBUTION_URL} -O /opt/hadoop.tgz && \
    tar xzf /opt/hadoop.tgz -C /opt && \
    ln -s /opt/hadoop-* /opt/hadoop && \
    ln -s /opt/hadoop/etc/hadoop /etc/hadoop && \
    rm -f /opt/hadoop.tgz

# Installing the AWS connector
ADD ${HADOOP_AWS_URL} /opt/spark/jars/
# Installing monitoring and logging modules
ADD ${JMX_PROMETHEUS} /opt/spark/jars/
ADD ${GELF_LOGGER} /opt/spark/jars/

WORKDIR /opt/spark/work-dir
ENTRYPOINT [ "/opt/spark/kubernetes/dockerfiles/spark/entrypoint.sh" ]

Make sure you use the same version of the AWS connector (hadoop-aws) and Hadoop. This ensures nothing breaks at runtime; using different versions results in undefined behavior, and you don't want that! If you encounter a ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found error, you should probably take a second look at the listed modules.

This image will be the "framework" of your Spark application, so you should mark Spark as "Provided" when building the application's .jar (using either Maven or SBT). Also, if you interact with AWS (and you probably do), for example by writing to S3, you should mark hadoop-aws as "Provided" as well. Using "Compile" instead of "Provided" results in an unnecessarily bigger application .jar. Even worse, it can also lead to runtime errors, such as class and version conflicts, due to duplicate modules on the classpath.
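
For illustration, a minimal sbt sketch of such "Provided" scoping might look like this (the artifact list and versions are illustrative and should match the Spark and Hadoop versions baked into your image):

// build.sbt (sketch): Spark and hadoop-aws are marked Provided because the custom
// image already ships them, so only the application code ends up in the .jar
libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-sql"       % "3.1.1" % Provided,
  "org.apache.spark"  %% "spark-streaming" % "3.1.1" % Provided,
  "org.apache.hadoop" %  "hadoop-aws"      % "3.2.1" % Provided
)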

So, we have a nice Spark image. Now, we want to be able to use it for deploying actual Spark applications, but how?

There are two primary methods for deploying Spark applications over K8s. The first that comes to mind is pure Spark, as Spark natively supports K8s as a cluster manager (via spark-submit). The second is the open-source Spark Operator, which uses the K8s Operator pattern to deploy Spark applications over K8s.
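
To give a feel for the native route, a spark-submit against the K8s API server looks roughly like this (the API server address, image, namespace, and paths below are placeholders, not our actual values):

spark-submit \
  --master k8s://https://<k8s-apiserver-host>:443 \
  --deploy-mode cluster \
  --name example-app \
  --class com.example.application.Main \
  --conf spark.kubernetes.namespace=example-ns \
  --conf spark.kubernetes.container.image=<registry>/custom-spark-image:v3.1.1-1 \
  --conf spark.executor.instances=10 \
  s3a://bucket/path/to/application.jar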

Eventually, we chose the Spark Operator. Although it means an extra component to manage and maintain, the declarative configuration (which comes in the shape of a .yaml file) is a blessing when dealing with a big data pipeline with many different and complex applications, each with its own dependencies, resource requests, etc.

After deploying a Spark Operator instance on our cluster, we created a dedicated Helm chart. On top of the logging and monitoring definitions, it also defines a SparkApplication custom resource (an instance of the CRD installed by the Spark Operator). The Spark Operator watches this resource, which tells it which application .jar to deploy and how, which Spark image to use, how many resources to request, and which other configurations to consider when deploying the Spark application.

# Source: spark-apps-generic/templates/sparkapplication.yaml
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: example-release
  namespace: example-ns
spec:
  mode: cluster
  # path to our new, custom Spark image in the Docker Registry
  image: xxx.dkr.ecr.us-east-1.amazonaws.com/path/to/custom-spark-image:v3.1.1-1
  imagePullPolicy: IfNotPresent
  type: "Scala"
  mainClass: com.example.application.Main
  mainApplicationFile: "s3a://bucket/path/to/application.jar"
  arguments:
      - 'first-argument'
      - 'second-argument'
  sparkVersion: 3.1.1
  sparkConf:
    spark.eventLog.dir: "s3a://server-log-path/"
    spark.eventLog.enabled: "true"
    spark.eventLog.rolling.enabled: "true"
    spark.eventLog.rolling.maxFileSize: "16m"
    spark.hadoop.fs.s3a.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"
    spark.hadoop.fs.s3a.aws.credentials.provider: "com.amazonaws.auth.profile.ProfileCredentialsProvider"
    spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version: "2"
  volumes:
    - name: spark-local-dir-1
      hostPath:
        path: "/mnt/stateful_partition/spark-local-dir"
        type: DirectoryOrCreate
    - name: log4jproperties
      configMap:
        name: example-release-log4j-config
  driver:
    cores: 1
    coreRequest: 2000m
    memory: 50G
    annotations:
      cluster-autoscaler.kubernetes.io/safe-to-evict: "false"
      vault.hashicorp.com/agent-inject: 'false'
      vault.hashicorp.com/agent-inject-secret-credentials: secret/jaquet
      vault.hashicorp.com/agent-inject-template-credentials: |
        [default]
        {{ with secret "secret/jaquet" -}}
        aws_access_key_id = {{ .Data.aws_access_key_id }}
        aws_secret_access_key = {{ .Data.aws_secret_access_key }}
        {{ end }}
    labels:
      app: example-release
    volumeMounts:
      - name: spark-local-dir-1
        mountPath: "/mnt/stateful_partition/spark-local-dir"
      - name: log4jproperties
        mountPath: "/opt/spark/log4j"
    javaOptions: "-Dlog4j.configuration=file:/opt/spark/log4j/log4j.properties"
    env:
    - name: AWS_CREDENTIAL_PROFILES_FILE
      value: /vault/secrets/credentials
  executor:
    coreRequest: 6000m
    cores: 4
    instances: 120
    memory: 50G
    annotations:
      cluster-autoscaler.kubernetes.io/safe-to-evict: "false"
      vault.hashicorp.com/agent-inject: 'false'
      vault.hashicorp.com/agent-inject-secret-credentials: secret/jaquet
      vault.hashicorp.com/agent-inject-template-credentials: |
        [default]
        {{ with secret "secret/jaquet" -}}
        aws_access_key_id = {{ .Data.aws_access_key_id }}
        aws_secret_access_key = {{ .Data.aws_secret_access_key }}
        {{ end }}
    volumeMounts:
      - name: log4jproperties
        mountPath: "/opt/spark/log4j"
    javaOptions: "-Dlog4j.configuration=file:/opt/spark/log4j/log4j.properties"
    env:
    - name: AWS_CREDENTIAL_PROFILES_FILE
      value: /vault/secrets/credentials
  monitoring:
    exposeDriverMetrics: true
    exposeExecutorMetrics: true
    prometheus:
      jmxExporterJar: "/opt/spark/jars/jmx_prometheus_javaagent-0.11.0.jar"
      port: 8090

If your Spark application reads from, or writes to, S3, make sure to set spark.hadoop.fs.s3a.impl to org.apache.hadoop.fs.s3a.S3AFileSystem, which is part of the hadoop-aws module mentioned earlier.

As the spark.hadoop.fs.s3a.aws.credentials.provider configuration suggests, we used a profile credentials file to authenticate to AWS. We mounted the file using Vault, but there are other ways to achieve this, for example by mounting the file as a K8s Secret before applying the Helm release.
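
For illustration, the Secret-based alternative could look roughly like this (the Secret name, namespace, and mount path are hypothetical, not what we use in production):

# Create the Secret from a standard AWS profile credentials file:
# kubectl -n example-ns create secret generic aws-profile-credentials \
#     --from-file=credentials=./credentials
#
# Then mount it in the SparkApplication spec (driver shown; do the same for the executor):
spec:
  volumes:
    - name: aws-credentials
      secret:
        secretName: aws-profile-credentials
  driver:
    volumeMounts:
      - name: aws-credentials
        mountPath: /opt/spark/aws
    env:
      - name: AWS_CREDENTIAL_PROFILES_FILE
        value: /opt/spark/aws/credentials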

Research which file output committer is best suited for your needs. This configuration defines how the output data is written to Deep Storage (in this case, to S3), and it can significantly impact performance, or even cause duplications, if you don't choose the one suitable for your case. We'll elaborate on it later in this post.

If you'd like to review your Spark applications, check out the spark.eventLog.* Spark configurations (e.g., spark.eventLog.dir, spark.eventLog.enabled, etc.). These configurations define how and where your Spark application writes its event log files. Of course, in order to decode these files properly, you'll need a Spark History Server service up and running. You can read more about it in the Spark documentation.
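
As a rough sketch (paths are illustrative), pointing a History Server at the same event-log location could look like:

# spark.history.fs.logDirectory should match spark.eventLog.dir from the SparkApplication
export SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=s3a://server-log-path/ -Dspark.history.fs.cleaner.enabled=true"
/opt/spark/sbin/start-history-server.sh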

If you use Helm to manage your Spark aggregation releases, make sure to uninstall each release once it finishes, in order to free up unused resources. We had a case where we didn't delete finished Helm releases, and soon every new Spark aggregation submission failed with an "IP space exhaustion" error: because the finished releases had never been uninstalled, they kept many live (but unneeded) K8s services around, each occupying an IP. If you get such an error, or a "quota reached" one for example, we strongly suggest verifying that your cleanup mechanism for finished Helm releases works properly.
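
Whatever cleanup mechanism you use, it ultimately needs to run something like the following once an aggregation finishes, for example as a final task in the Airflow DAG that installed the release (names are placeholders):

helm uninstall example-release -n example-ns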

Note that the setup described so far works regardless of the nature of the deployed Spark application, whether it is a streaming or an aggregation one. Therefore, we used this setup to migrate both our streaming and aggregation jobs from EMR and Databricks to our self-managed K8s cluster.

At this point, we were still using AWS exclusively as our cloud vendor, but instead of using its managed services for our streams (and Databricks for our aggregations), we were managing them on our own, using only AWS compute and storage resources, plus EKS for managing our K8s cluster. This was the state at the end of Stage 1.

Stage 2: Porting our cloud vendor from AWS to GCP (from EMR to GKE, from S3 to GCS)

The next step was to adjust our image and Helm chart to integrate with both AWS and GCP. This gave us flexibility to migrate our components from AWS to GCP gradually. After having done it for AWS, adjusting everything for GCP was a straightforward task.

Just as the hadoop-aws module allows Spark to read from, or write to, S3, adding the gcs-connector module to the custom image gives it the ability to interact with GCS. This is achieved by incorporating the following lines into our custom Spark Dockerfile:

ARG GCS_CONNECTOR=https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.17.jar
ADD ${GCS_CONNECTOR} /opt/spark/jars/

As for the SparkApplication CRD file, add the following lines under both executor.annotations and driver.annotations:

vault.hashicorp.com/agent-inject-secret-gcs.sa: secret/kubernetes/prd-use1-data/auction-object-gcs/serviceaccount
vault.hashicorp.com/agent-inject-template-gcs.sa: |
        {{ with secret "secret/kubernetes/prd-use1-data/auction-object-gcs/serviceaccount" -}}
        {{ .Data.data }}
        {{ end }}

This tells Vault to populate the gcs.sa file with the right service account credentials to use when reading from, or writing to, GCS.

Still in the CRD file, add the following configurations under the sparkConf key:

# Defining "GoogleHadoopFS" (provided by the gcs-connector) as the protocol to use when handling gs:// URIs
"spark.hadoop.fs.AbstractFileSystem.gs.impl": com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS

# Setting the authentication method to JSON keyfile, and pointing at the keyfile location inside the Pods
"spark.hadoop.fs.gs.auth.type": SERVICE_ACCOUNT_JSON_KEYFILE
"spark.hadoop.fs.gs.auth.service.account.json.keyfile": "/vault/secrets/gcs.sa"

And that's it! Now the Spark job can run on both EKS and GKE and read from, or write to, S3 and GCS, interchangeably.

As mentioned, we used the keyfile authentication method, but there are other, more convenient options. One of them is the Application Default Credentials (ADC) strategy, which searches predefined locations (such as a dedicated environment variable or a predefined filesystem path) for the credentials. You can use it by replacing the keyfile configurations mentioned above with: "spark.hadoop.fs.gs.auth.type": APPLICATION_DEFAULT.

FileOutputCommitters v1 & v2 are NOT SAFE to use when writing to GCS, since GCS doesn't support atomic directory renaming. We discovered this the hard way, after investigating some unexpected data duplications in our aggregations' output. It turned out that extraordinarily intense spot-eviction rush hours caused a large number of executors to fail in a non-graceful manner, leaving their intermediate results behind in the final GCS locations without cleaning them up. What you should do instead is use the Manifest Committer, which is designed specifically to address this issue while also being faster than the traditional algorithms (note that it requires a recent enough Hadoop 3.3.x, and that the binding classes below ship in Spark's hadoop-cloud module, so both need to be on your image's classpath). You can configure it by adding these Spark configurations:

sparkConf:
  spark.hadoop.mapreduce.outputcommitter.factory.scheme.gs: "org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory"
  spark.sql.parquet.output.committer.class: "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter"
  spark.sql.sources.commitProtocolClass: "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol"

Another consideration when interacting with a GCS bucket is to check whether the bucket's versioning is turned on or off. Keep in mind that turning it on for no good reason can greatly increase storage costs. It can also result in some weird behavior when reading from the versioned bucket. For example, we suffered some data duplications because we didn't know bucket versioning was turned on, while our aggregations were not prepared to interact with such a bucket.
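
One quick way to check (and, if appropriate, disable) versioning is with gsutil; the bucket name below is a placeholder:

# Check whether object versioning is enabled on the bucket
gsutil versioning get gs://your-bucket
# Turn it off if there is no good reason to keep it on
gsutil versioning set off gs://your-bucket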

Having such a setup meant, from the data pipeline's perspective, that the cloud migration was almost entirely encapsulated, as each stream/aggregation could read from and write to GCP or AWS independently. This helped us a lot in migrating the pipeline gradually and in performing quick rollbacks when needed.

The migration process itself involved two Kafka-Jaquet deployments: the legacy production Kafka-Jaquet deployment that already existed over AWS, and a fresh Kafka-Jaquet deployment over GCP. We started gradually migrating traffic from the AWS Kafka to the GCP Kafka. At this point, both the AWS Jaquet and the GCP Jaquet wrote their output to S3, and the aggregations were executed over EKS. Once we passed the 50%-traffic-over-GCP milestone, we felt confident enough that the new GCP setup would work smoothly under our heavy load, so we reconfigured both Jaquets to write to GCS and the aggregations to be deployed over GKE.

Of course, we had dependencies. Changing a stream's output location from S3 to GCS also meant changing the input source of all of its consumers. To handle this, we replicated the data from GCS back to AWS for a period of time using a third-party tool. That way we could migrate components gradually: every aggregation that hadn't been migrated yet was fed from the replicated data on AWS until its time to migrate to GCP came. DT is a big company, and many internal consumers of our data didn't migrate to GCP at the same pace as we did, so the data-synchronization solution helped us accommodate them as well.

After reaching 100% traffic over GCP, we spun down the AWS deployments, which basically completed the migration process, as all of our big components now run over Google Kubernetes Engine.

Big architectural changes, such as migrating cloud providers or porting managed services to K8s, are good opportunities to consider cost optimizations. We did a lot of cost-reduction work during the migration process, for example:

We configured the Cluster Autoscaler on our K8s clusters. This is an open-source, standalone tool for tuning a K8s cluster's size on the fly. It helps save costs dramatically by keeping the cluster's node count and resource utilization as tight as possible.

The Cluster Autoscaler also comes with a Priority Expander feature. This feature tells the Autoscaler which node group to try first when spinning up fresh nodes. It is highly beneficial for saving costs, for instance by trying spot node groups before falling back to on-demand ones, or by trying cheaper instance types before more expensive ones; these are common use cases for data pipelines.
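
For illustration, the Priority Expander is driven by a ConfigMap along these lines (the node-group name patterns are placeholders, and the autoscaler itself has to run with the priority expander enabled, e.g. --expander=priority):

apiVersion: v1
kind: ConfigMap
metadata:
  name: cluster-autoscaler-priority-expander
  namespace: kube-system
data:
  # Higher number = higher priority; entries are regexes matched against node-group names
  priorities: |-
    50:
      - .*spot.*
    10:
      - .*on-demand.*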

No matter which cloud provider you migrate to, don't forget to set lifecycle policies for buckets to automatically delete old, unused objects and avoid unnecessary costs.
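
On GCS, for example, a simple age-based deletion rule can be applied with gsutil (the age threshold and bucket name are placeholders):

cat > lifecycle.json <<'EOF'
{
  "rule": [
    { "action": { "type": "Delete" }, "condition": { "age": 30 } }
  ]
}
EOF
gsutil lifecycle set lifecycle.json gs://your-bucket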

Consider upgrading your technology versions to take advantage of new features. For example, we upgraded the Spark version of our aggregations and streams to enable the PVC-reuse and shuffle-data-recovery features, which improve cost efficiency when running executors on spot instances.
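
As a rough sketch of what those features look like in sparkConf (configuration keys follow the Spark-on-K8s documentation; the volume name, size, and mount path are illustrative, and the shuffle-recovery plugin is only available in newer Spark releases):

sparkConf:
  # Executors request their PersistentVolumeClaims on demand
  spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName: "OnDemand"
  spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit: "100Gi"
  spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path: "/data/spark-local-dir"
  spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly: "false"
  # Let the driver own the PVCs and reuse them when executors are lost (e.g., spot evictions)
  spark.kubernetes.driver.ownPersistentVolumeClaim: "true"
  spark.kubernetes.driver.reusePersistentVolumeClaim: "true"
  # Recover shuffle data from the reused PVCs
  spark.shuffle.sort.io.plugin.class: "org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO"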

Migrating a data pipeline from one cloud provider to another isn't a simple task. It's a high-stakes journey that involves porting multiple complex (and potentially real-time) components while still satisfying SLA criteria and keeping all services stable. As we've explored here, there's a lot to weigh before you even begin: which technologies to use, how the actual migration process will take place, and how to use the opportunity to upgrade the pipeline and optimize costs. We hope the insights we've gathered from our own migration experience and shared throughout this article help you tackle these challenges and make your migration smoother.