Querying our Data Lake in S3 using Zeppelin and Spark SQL

Until recently, most companies used the traditional approach for storing all the company’s data in a Data Warehouse.

The internet growth caused an increase in the number of data sources and the massive quantities of data to be stored, requiring scaling these Data Warehouses constantly. They were not designed to handle petabytes of data, so companies were driven into using big data platforms (such as Hadoop) capable of transforming big data into actionable insights at high speed.

Hadoop traditionally coupled storage and computation together, so this was relatively expensive. Each node on a cluster wasn’t just a storage system, it was a server with processors and memory resources. The more data you keep, the more compute resources you needed to pay for, even though you didn’t really need that extra computational power to help you analyze your data. As a result, compute and storage were supposed to be scaled together and the clusters were persistently on, otherwise the data becames inaccessible.

This is the force that is driving the separation of storage from compute, to be able to scale them separately. This is how the term Data Lake emerged, being defined as: “a collection of storage instances of various data assets additional to the originating data sources.”

A Data Lake is a repository capable of storing vast quantities of data in various formats. It can contain data from webserver logs, application databases, and third-party data. Data can flow into the Data Lake by either batch or real-time processing of streaming data.

The reason to build a Data Lake is to provide with the ability to query all data sources combined in a single place, providing “Data and Analytics as a Service” for business users such as data scientists and business analysts.

The Data Lake offers an approach where compute and storage can be separated, in our case, S3 is used as the object storage, and any processing engines (Spark, Presto, etc) can be used for the compute. This means we can scale separately depending on the business needs.

As I explained in one of my previous posts, we decided to build our data lake based on Amazon Simple Storage Service (S3) to combine private and third party data in order to enable users to answer their most interesting business questions.

We use AWS Kinesis Firehose to push data into S3, AWS lambda functions for some pre-processing and Spark for our data pipelines.

In order to query S3 we needed a querying engine that allowed us to use SQL to provide access to TBs of structured and semi-structured data for quick analysis. We evaluated Presto, Athena and Spark SQL.

Out data is arranged in S3 in the following format (hadoop like partitions):

<bucket_name>\processed\<event_type>\dt=<date>\hh=<time>\*.json

In Presto and Athena (managed version of Presto by AWS) we encountered errors either when we created the tables with HIVE or when querying these tables. We found that both Presto and Athena do not “like” data types that change over time (for example: timestamp was string and then bigint) or any special characters at the beginning of the field names (_data_type). We corrected the data, using Spark and the pre-processing lambdas, so we thought this will suffice, but this was not enough, we had very bad performance when querying small JSON files. We knew that Parquet will perform much better, but the partitions dictated smaller files. So what did we do?

We thought to try Spark SQL as our query engine. Spark SQL provides the capability to expose the Spark datasets over JDBC API and allow running the SQL like queries on Spark data using traditional BI and visualization tools.

On top of this, our analysts and data scientist needed strong visualization tools to find data patterns and correlations.

Apache Zeppelin was our choice. Zeppelin is a Web-based notebook that enables data-driven, interactive data analytics and collaborative documents with SQL, Scala, Python and more. It provides analysts a great way to create interactive web notebooks to write the queries and visualize the results. The notebooks can be shared in real time. The queries can be scheduled.


Tutorial

1- Create a cluster in AWS EMR with Spark and Zeppelin.

2- Click on the link “Zeppelin” after the cluster was provisioned to access Zeppelin UI.

3- Download sample data for the San Francisco Traffic accidents data. (it can be found at: https://data.sfgov.org/api/views/vv57-2fgy/rows.csv?accessType=DOWNLOAD).

4- Place it in an S3 bucket , for example: “test-zeppelin-ni”.

Now we can access and query the data using Spark SQL and Zeppelin.

We will write some Scala code inside Zeppelin to visualize this CSV file and extract information contained on it. In order to view the contents of this file and manipulate the data, we will create a dataframe, bind it to a schema, create a temporary table and then use SQL for queries.

“A powerful feature of Spark SQL is that you can programmatically bind a schema to a Data Source and map it into Scala case classes which can be navigated and queried in a typesafe manner.”

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType};
import org.apache.spark.sql.{DataFrame, Dataset, Encoders, SparkSession}
import org.joda.time.DateTime
case class TrafficData(IncidntNum: String, Category: String, Descript: String, DayOfWeek: String,
Date: String, Time: String, PdDistrict: String,Resolution:String,
Address:String,
X:Double, Y:Double, Location:String, PdId:String)

val schema: StructType = Encoders.product[TrafficData].schema

val df = sqlContext.read.schema(schema).csv(“s3a://test-zeppelin-ni/*”)
df.printSchema()

Here is the second paragraph:

df.registerTempTable(“TrafficTable”)

Here is the third paragraph:

%sql

select * from TrafficTable

As you can see, after loading the data into Spark , we created a temp table and then we can query it using regular SQL.

Wrapping it up

I hope this article gave you a good starting point to learn about how to use Spark and Zeppelin.

In this article, we have learned how to read a large dataset from a S3 public bucket and perform SQL queries on it. This combination is one of the most common used setups for Machine Learning projects running data at scale.

Note: Remember that the changes you make on Zeppelin only persist as long as the EMR cluster is running.

If you want to know more about Zeppelin, watch this video by Moon soo Lee, who created Zeppelin, speaking in Amsterdam at Spark Summit Europe 2015.

Book photo created by rawpixel.com – www.freepik.com

Design a site like this with WordPress.com
Get started