Building Modern Data Lakes

The volumes of data required for Machine Learning projects are continuously growing. Data scientists and Data engineers need the ability to access huge amounts of data in a timely manner.

In order to build data platforms that can handle massive quantities of data ingestion and processing, a number of technologies have emerged. This article will focus on Apache Iceberg and how it solves many of the challenges faced by its predecessors, but let’s start with the evolution of Data systems.

First, there were databases…

A database stores real-time information and can run simple queries very quickly. 

In general, a database stores data related to a specific part of your business. In a microservices architecture, for example, each service has its own database.

The ability to scale a database is limited, because you scale the computing power, RAM and disks at the same time.

If you are running a DB cluster on X machines and you need to scale the cluster, you will need to add additional machines with RAM, disks and CPUs, meaning that regardless of the bottleneck type, you will have to scale all parameters at once, adding unnecessary costs.

Data warehouse

Data warehouses are very similar to Databases in terms of architecture and how they scale, but they were built to handle much bigger data volumes. 

The key difference between regular databases and Data warehouses is that Data warehouses were built to be a centralized data repository that pulls together data from different sources and exposes them for reporting and analytics. They are not updated in real time and contain historical data with longer SLAs.

Some of the challenges presented by the huge quantities of data were similar to the challenges solved by traditional databases, but further scaling databases or data warehouses was inefficient and expensive. Something new had to be done.

Separating Storage and compute

Separating the compute layer from the storage layer created a way to isolate compute costs: customers paid only for the processing power their queries needed, which lowered costs further. Scaling storage and compute independently allowed elastic tuning of resources, bringing flexibility to the emerging data lake architectures.

Separation of compute and storage layers

And then there was the Data Lake…

A Data Lake is a central repository of raw data from different sources, structured or unstructured, that can be accessed to create insights and analytics, similar to a data warehouse.

The key difference is that Data Lake architectures can scale to hundreds of terabytes with acceptable read/write speeds.

In order to scale the storage and the compute power separately, solutions like Hadoop and Hive were developed among others.

Hadoop was created by Doug Cutting and Mike Cafarella around 2005, and much of its early development took place at Yahoo. Apache Hadoop is an open source software framework for storage and large-scale processing of data sets on clusters of commodity hardware.

In 2009 Facebook understood that while Hadoop solved many of their requirements, it also had issues that needed improvement, for example:

  • In order to query data in Hadoop, a user had to write a MapReduce job in Java.
  • There was no metadata or schema describing the data.

So, they built Hive.

Apache Hive started as an SQL-on-Hadoop tool and then evolved as a separate framework when it started supporting object stores such as S3.

Hive is a data warehouse infrastructure that queries data stored on HDFS or S3 using an SQL-like language.

What is Apache Hive?

In Hive you organize data in a directory tree, defining a table format by mapping a table or part of a table (a partition) to a physical directory.

This mapping is stored in the Hive metastore. The metastore can be backed by almost any SQL database, such as MySQL, MariaDB or PostgreSQL.

In the real world, the number of files and directories can be enormous. Scanning all these files while doing a search can have serious performance implications. This is why Hive supports Partitions and Buckets.

Partitioning divides a table into parts that are stored separately, based on the values of one or more columns. The need for partitioning arises from huge volumes of data being stored in too many files. When the data is stored in partitions (different directories and files), the query engine scans only the directories relevant to a query, and therefore far fewer files.

For example, if we partition by the “date” column, we will have one directory per date.

So when querying something of the form:

SELECT * FROM table WHERE date = '2022-02-11'

the query engine will only open the files located in the directory for that date.
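As a rough illustration of this pruning, the sketch below models a table as a map from partition directory to files. The directory names, file names and the `filesToScan` helper are hypothetical, and a real engine resolves partitions through the metastore rather than an in-memory map.

```scala
// Hypothetical listing of a Hive-style table: one directory per value of the "date" column.
object PartitionPruning {
  val tableFiles: Map[String, Seq[String]] = Map(
    "date=2022-02-10" -> Seq("part-0000.parquet", "part-0001.parquet"),
    "date=2022-02-11" -> Seq("part-0000.parquet"),
    "date=2022-02-12" -> Seq("part-0000.parquet", "part-0001.parquet", "part-0002.parquet")
  )

  /** Returns only the files under the directory that matches the partition filter. */
  def filesToScan(date: String): Seq[String] = {
    val directory = s"date=$date"
    tableFiles.getOrElse(directory, Seq.empty[String]).map(file => s"$directory/$file")
  }
}
```

Calling `PartitionPruning.filesToScan("2022-02-11")` touches a single directory, no matter how many other dates the table holds.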

Bucketing in Hive splits the data into a fixed number of buckets based on the hash of a specific column.

For example, bucketing by a customer id column uses a hash function that maps any customer id to a number between 0 and the number of buckets minus 1.

Bucketing allows more efficient queries by limiting the location of the data to the bucket it was assigned to, thus shortening the search.
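A minimal sketch of the idea, assuming the JVM's `hashCode` as a stand-in for Hive's own hash function (the `bucketFor` name is hypothetical):

```scala
object Bucketing {
  /** Maps a customer id to a bucket index in the range [0, numBuckets). */
  def bucketFor(customerId: String, numBuckets: Int): Int =
    // floorMod keeps the result non-negative even when hashCode is negative
    Math.floorMod(customerId.hashCode, numBuckets)
}
```

The same id always lands in the same bucket, so a lookup by customer id only needs to read that one bucket.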

Other advantages of Hive include being file-format agnostic, supporting file formats developed by others that were better for analytics such as Parquet and ORC.

Hive has been the de facto standard for the past 12 or more years, since it is supported by almost every query engine.

But Hive has some disadvantages

  • Hive does not manage a file list, so directory listing operations against object stores such as S3 take too long, creating performance problems.
  • Table state is stored in two places: partitions in the metastore and files in a file system.
  • Partitions are stored transactionally in a database, but the files live in a file system, which creates issues when updating or deleting data. Moving or adding files across multiple partitions cannot be handled in a transactional manner, creating consistency problems.

Big tech companies handling big data volumes were struggling, creating too many workarounds to solve Hive’s problems. 

Netflix engineers stepped up in trying to solve the performance and usability challenges of using Apache Hive tables in large and demanding data lake environments by creating and then open-sourcing Apache Iceberg in 2018.  They understood that most of Hive’s issues were due to tracking the directories and not the files. 

In order to solve the slow file listings and to unify the table state, they defined a table as a canonical list of files.

What is Apache Iceberg? (**)

Iceberg is a high-performance format for huge analytics tables. Iceberg brings the reliability and simplicity of SQL tables to big data, while making it possible for engines like Spark, Trino, Flink, Presto, and Hive to safely work with the same tables, at the same time. Main features are:

  • Expressive SQL

Iceberg supports flexible SQL commands to merge new data, update existing rows, and perform targeted deletes. Iceberg can eagerly rewrite data files for read performance, or it can use delete deltas for faster updates.

  • Schema evolution supports add, drop, update, or rename, and has no side-effects.

Schema evolution just works. Adding a column won’t bring back “zombie” data. Columns can be renamed and reordered. Best of all, schema changes never require rewriting your table.

  • Hidden partitioning prevents user mistakes that cause silently incorrect results or extremely slow queries. Iceberg handles the tedious and error-prone task of producing partition values for rows in a table and skips unnecessary partitions and files automatically. No extra filters are needed for fast queries, and table layout can be updated as data or queries change.
  • Partition layout evolution can update the layout of a table as data volume or query patterns change
  • Time travel enables reproducible queries that use exactly the same table snapshot, or lets users easily examine changes. Version rollback allows users to quickly correct problems by resetting tables to a good state
  • Scan planning is fast 
  • A distributed SQL engine isn’t needed to read a table or find files
  • Advanced filtering – data files are pruned with partition and column-level stats, using table metadata
  • Data compaction is supported out-of-the-box and you can choose from different rewrite strategies such as bin-packing or sorting to optimize file layout and size.

Compaction is an asynchronous background process that compacts small files into fewer larger files. Since it’s asynchronous and uses the snapshot mechanism, it has no negative impact on your users. 

Compaction helps balance low latency on write and high throughput on read. 

When writing, we want the data available as soon as possible, so we write as fast as we can, creating small files. This creates the overhead of opening too many files when reading.

After compaction we get bigger files that have more records per file, improving read performance.
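To make the bin-packing idea concrete, here is a simplified greedy sketch that groups small files into larger ones up to a target size. It is an illustration only: the `binPack` helper and the sizes are assumptions, and Iceberg's actual rewrite strategies are more elaborate.

```scala
object Compaction {
  /** Greedy packing: appends each file to the current group while it stays within targetMb. */
  def binPack(fileSizesMb: Seq[Int], targetMb: Int): Seq[Seq[Int]] =
    fileSizesMb.foldLeft(Vector.empty[Vector[Int]]) { (bins, size) =>
      bins.lastOption match {
        // The file still fits into the group being filled
        case Some(last) if last.sum + size <= targetMb => bins.init :+ (last :+ size)
        // Otherwise start a new group (a file larger than the target gets its own group)
        case _ => bins :+ Vector(size)
      }
    }
}
```

Each resulting group would be rewritten as one larger file, so readers open fewer files for the same records.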

**source: Apache Iceberg’s website

Data Lake architecture with Iceberg, Presto and Hive

Apache Iceberg Architecture

The table format tracks individual data files in a table instead of directories. This allows writers to create data files in-place and add files to the table only in an explicit commit.

The Iceberg Catalog

  • When a query is made, the query engine accesses the Iceberg catalog, retrieves the location of the current metadata file for the table, then opens the file.
  • Within the catalog, there is a reference or pointer for each table to that table’s current metadata file. 

Metadata layer

Metadata files

  • Maintain the table state
  • All changes to table state create a new metadata file and replace the old metadata with an atomic swap. 
  • Tracks the table schema, partitioning config, custom properties, and snapshots of the table contents. 

Snapshots

  • A complete list of files in a table
  • Represent the state of a table at some time and are used to access the complete set of data files in the table.
  • Each write creates and commits a new snapshot
  • Snapshots create isolation without locking
  • Writers produce a new snapshot then commit it. Snapshots can be rolled back.
  • The data in a snapshot is the union of all files in its manifests
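The snapshot and atomic-swap behaviour described above can be sketched with an in-memory pointer. This is a toy model under stated assumptions: `Snapshot`, `TableMetadata` and `TablePointer` are hypothetical names, and a real catalog performs the swap in its own backing store rather than in an `AtomicReference`.

```scala
import java.util.concurrent.atomic.AtomicReference

final case class Snapshot(id: Long, dataFiles: Set[String])

final case class TableMetadata(snapshots: Vector[Snapshot]) {
  def current: Option[Snapshot] = snapshots.lastOption
}

/** Stand-in for a catalog entry: a single pointer to the current table metadata. */
final class TablePointer(initial: TableMetadata) {
  private val ref = new AtomicReference(initial)

  def metadata: TableMetadata = ref.get()

  /** Commits a new snapshot only if nobody else committed since `base` was read. */
  def commit(base: TableMetadata, addedFiles: Set[String]): Boolean = {
    val files = base.current.map(_.dataFiles).getOrElse(Set.empty[String]) ++ addedFiles
    val next = TableMetadata(base.snapshots :+ Snapshot(base.snapshots.size + 1L, files))
    // Atomic swap: succeeds only when the pointer still references `base`
    ref.compareAndSet(base, next)
  }
}
```

A writer holding stale metadata gets `false` back and has to re-read and retry, while readers keep using whichever snapshot they started with. Older snapshots stay in the list, which is what makes time travel and rollback possible.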

Manifest files

  • Snapshots are split across many manifest files
  • Contain
    • File location and format
    • Values used for filtering: partition values, lower and upper bounds per file
    • Metrics for cost based optimization
    • File stats

Manifest list

  • Is a list of manifest files. 
  • Has information about each manifest file that makes up that snapshot
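The sketch below shows, in simplified form, how per-file partition values and lower/upper bounds kept in manifests let a planner skip files without opening them. The `DataFile`, `ManifestFile` and `plan` names are hypothetical, and a single numeric column stands in for the column-level stats.

```scala
final case class DataFile(path: String, partition: String, lowerBound: Long, upperBound: Long)
final case class ManifestFile(files: Seq[DataFile])

object ScanPlanning {
  /** Keeps only files whose partition matches and whose [lower, upper] range can contain `value`. */
  def plan(manifests: Seq[ManifestFile], partition: String, value: Long): Seq[String] =
    for {
      manifest <- manifests
      file <- manifest.files
      if file.partition == partition
      if file.lowerBound <= value && value <= file.upperBound
    } yield file.path
}
```

Only metadata is consulted here, which is why planning of this kind does not need a distributed engine or a directory listing.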

Data layer

Composed of the actual data files (Parquet, ORC) stored in a physical storage layer (S3, HDFS).

Iceberg’s advantages

  • Guaranteed isolated reads and writes, also providing concurrent writes.
  • By using snapshots, Iceberg enables time-travel operations, allowing users to query different versions of the data and to revert to a past version if needed.
  • All writes are atomic.
  • Faster planning and execution, since instead of listing the partitions in a table during job planning, it performs a single read of the snapshot.
  • Schema changes are free of side-effects. Fields have unique identifiers mapped to their names, so you can change the field names without side effects.
  • Partitions and partition granularity can be changed.
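The point about field identifiers can be illustrated with a small sketch in which rows are stored by field id and the schema only maps ids to names. The `Schema` class and its methods are hypothetical and greatly simplified compared to the real table format.

```scala
final case class Schema(fieldNames: Map[Int, String]) {
  /** Renaming only touches the id-to-name mapping; stored rows are untouched. */
  def rename(from: String, to: String): Schema =
    Schema(fieldNames.map { case (id, name) => id -> (if (name == from) to else name) })

  /** Resolves a row stored by field id into a row keyed by the current column names. */
  def read(row: Map[Int, Any]): Map[String, Any] =
    row.collect { case (id, value) if fieldNames.contains(id) => fieldNames(id) -> value }
}
```

Because the stored data never mentions column names, a rename needs no rewrite, and a dropped field id is simply no longer resolved.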

Conclusions

We have reviewed Apache Iceberg. Other players in this area, such as Apache Hudi and Delta Lake, solve these issues in a similar way and are also worth researching.

Fast queries at petabyte scale require modern data architectures. Iceberg allows organizations to move into the future by gracefully solving many of the issues we encounter at a huge data scale.

Thanks for reading!

Building resilient Distributed Systems at scale

Bend but don’t break — like Bamboo

In this brave new world of distributed systems, we are entrusted with keeping the infrastructure up and running.
The challenge is to monitor both the services themselves and the space in between.

We face non-determinism: sometimes we can’t tell if our system is up, down, or partially working, and every failure is a task for Sherlock Holmes.

We collect more than 2 billion data points a day. We have to develop services in a continuous iterative process, where we design for observability and resilience. Predictability and transparency are our primary goals.

We use continuous delivery so we need to find which tools and processes will help us tackle the complexities of deploying microservices at scale and make them resilient.

How can we build resilient systems?

Resilience is all about preventing faults from turning into failures.

  • A fault is an incorrect internal state in your system.
  • A failure is an inability of the system to perform its intended job.

“Being available” is crucial, so we need to build resilience from the ground up. Building resilience in distributed systems communicating with each other is difficult.

The best way to write reliable and secure applications is write no code at all — write nothing and deploy nowhere

Kelsey Hightower

We need to know what happens everywhere, all the time. This is what observability is about.

Observability is a measure of how well internal states of a system can be inferred from knowledge of its external outputs.

We need to be able at all times to infer the state of every piece of our architecture so when something goes wrong, we can diagnose and correct the failure as fast as possible.

There are 3 pillars of observability:

  • Logs
  • Metrics
  • Tracing

Logs: tell the story of the flow in our microservices and if implemented correctly we can look at them as in one of the Matrix movies and find the pattern in a mass of letters and numbers to understand what is really going on.

Understanding the system by looking at the logs

Metrics: numeric values measured over a period of time. A metric is composed of attributes such as name, timestamp and value. For example, system metrics can be average CPU or disk space, or Kubernetes metrics such as memory usage and the number of pods running or terminated. Application metrics can be the number of items collected or processed over a period of time.

Tracing: is a representation of a series of causally related distributed events that encode the end-to-end request flow through a distributed system. Traces show the path of a request flow across the system.
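As a small illustration of the metric attributes mentioned above (name, timestamp, value), the sketch below averages one metric over a time window. The `Metric` case class and `average` helper are hypothetical and not tied to any monitoring tool.

```scala
final case class Metric(name: String, timestampMs: Long, value: Double)

object Metrics {
  /** Average of one metric over the half-open time window [fromMs, untilMs). */
  def average(points: Seq[Metric], name: String, fromMs: Long, untilMs: Long): Option[Double] = {
    val values = points
      .filter(p => p.name == name && p.timestampMs >= fromMs && p.timestampMs < untilMs)
      .map(_.value)
    // No data points in the window means there is nothing to average
    if (values.isEmpty) None else Some(values.sum / values.size)
  }
}
```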

The tools we use

Logs

We need logs to debug in production, to understand what happened and why. We use centralized logging because our services autoscale all the time, which makes local log files volatile, so we send them to a single persistent location.

We use the ELK stack (Elasticsearch, Logstash, Kibana) for logs.

We found that as the volume of processed data increased, we surpassed the write limit of our Elasticsearch cluster and got failures all the time. Increasing the size of the cluster was one of the options, but we decided it was not a cost-effective long-term solution, so we introduced Kafka (we love Kafka).

We use Kafka to manage and buffer massive volumes of data ingested, in this case our logs and application metrics.

Centralized logging — before
Centralized logging — after

Apache Kafka is a high throughput message bus. By introducing Kafka, we are now able to scale our data collection services indefinitely without risking losing logs.

We decided to use Filebeat, Kafka and Logstash.

Filebeat: a log data shipper for local files. The Filebeat agent is installed on the servers, monitors all the logs in the log directory and forwards them to a Kafka topic.

Logstash: collects the data from disparate sources (a Kafka topic in our case) and normalizes it before sending it to the destination of our choice, Elasticsearch in our case.

Kibana dashboards

Metrics

System metrics: We gather system metrics from AWS CloudWatch into Grafana and Kubernetes metrics into Prometheus.

We monitor among other things:

  • Total and used cluster resources: CPU, memory, filesystem.
  • Total cluster network I/O pressure.
  • Kubernetes pods usage: CPU, memory, network I/O.
  • Containers usage: CPU, memory, network I/O.
Grafana showing system metrics
Prometheus sample metric

Application metrics: We send application metrics using Filebeat to a Kafka topic and then we stream them into Druid.

Tracing

A trace is a representation of a series of causally related distributed events that encode the end-to-end request flow through a distributed system.

Traces look almost like event logs. A single trace can provide visibility into both the path traversed by a request as well as the structure of a request.

Traces are used to identify the amount of work done at each layer while preserving causality by using happens-before semantics. A trace is a directed acyclic graph (DAG) of spans, where the edges between spans are called references.

Flow of a single request in a distributed system

When a request begins, it is assigned a globally unique ID, which is passed as a parameter to all subsequent requests, making it possible to trace the flow of that request. Reconstructing the flow of execution enables us to better understand the lifecycle of a request.

For example — understanding the entire request lifecycle allows us to debug requests across multiple services to pinpoint the source of latency or resource utilisation.

The various components of a distributed system touched during the lifecycle of a request, represented as a directed acyclic graph

Two of the most popular open source distributed tracing solutions are Zipkin and Jaeger.

Adding tracing into an existing infrastructure is hard because every component in the path of a request needs to be modified to propagate tracing information. Using open source frameworks extensively might require additional instrumentation.

We decided to inject a request identifier into the logs and trace requests manually through the logs, which lets us understand the request flow without implementing a tracing framework.
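A minimal sketch of this approach, assuming each log line carries the request identifier as a field. The `LogLine` shape and the helper names are hypothetical and do not describe the actual core library.

```scala
import java.util.UUID

final case class LogLine(requestId: String, service: String, message: String)

object RequestTracing {
  /** Generates the identifier assigned when a request first enters the system. */
  def newRequestId(): String = UUID.randomUUID().toString

  /** Rebuilds the flow of one request from the centralized logs, preserving log order. */
  def flowOf(requestId: String, logs: Seq[LogLine]): Seq[String] =
    logs.filter(_.requestId == requestId).map(line => s"${line.service}: ${line.message}")
}
```

In practice the filtering would be a query in the log store (for example a search on the request id field), but the principle is the same: one id, propagated everywhere, ties the lines together.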

Conclusion

To achieve resilience in distributed systems we need observability.

Logs, metrics and tracing help us achieve transparency and allow us to debug in production.

We have created a core library to be used in all services to send logs and metrics.

We have set a pre-defined structure with naming conventions for the logs and metrics to create a system “language” for debugging.

Background photo created by jannoon028 – www.freepik.com

Data consistency across Microservices

We were told a monolith is evil and microservices are the answer. What nobody told us is that microservices come with many pain points deriving from their distributed nature.

In the past, we built an application connected to one database where normalized data was queried using “joins”. Then came big data and big traffic, and with them big latency. We needed to solve query latency where no cache would help us because the data was too big.

Microservices came to the rescue: we needed to break our big application into small pieces. This meant scaling our servers only in the required areas and allowing teams to work on different parts of a system without deploying the whole application, enabling autonomy.

We all jumped and started writing microservices, not always understanding the implications. We read blog posts on big companies and their hundreds of microservices; how they scaled their architecture, seeing what they built, but sometimes without understanding the “how”.

“People try to copy Netflix, but they can only copy what they see. They copy the results, not the process” — Adrian Cockcroft, former Netflix Chief Cloud Architect


To build microservices we needed to remove dependencies across domains and we found that this was easier said than done. The basic idea was to build services that were responsible for their own logic and wrote to their own DB.

Distributed services dictated distributed data, and this came with a price: we gained autonomy but lost ACID transactions, a single source of truth and “joins” between entities. In order to reconcile these, we needed to find a way to communicate across services.

Microservices communication patterns

  • Service-to-service communication: each service implements the logic it needs, gathering data from multiple services.

Pros: each service gathers all the data it needs, so the business logic is developed only in the corresponding service.

Cons: creates dependencies between services, coupling them for deployment and maintenance. This results in a “distributed monolith”. Each service will need multiple hops to get all the data it needs and then will need to “join” it using business logic instead of queries in the DB.

  • API gateway: a single service that acts as the backend entry point.

Pros: Clients don’t need to understand the backend architecture. Reduces the number of requests from the clients.

Cons: Every API needs to be developed twice, once in the gateway and once in the service itself. We still don’t have joins in the DB, we do it programmatically in the gateway. The gateway service gets all the load. We get latency since the gateway hops to many services and “joins” the data.

  • Aggregate data at the client side: clients perform different requests to all services in the backend and then aggregate the data.

Pros: services are autonomous and can be deployed separately without inter-dependencies at the backend.

Cons: clients need to store all the gathered data and “join” the sources, sometimes requiring business logic. In mobile apps this approach is not acceptable, since it will require the app to demand updates for every change in business logic.

  • Event bus: each service sends events for its changes and the “interested” parties listen and save the info as needed.

Pros: Separation of concerns, low latency.

Cons: Data duplication that carries storage costs, the need to write failover procedures, the risk of data inconsistency across services.
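The event bus pattern can be sketched with an in-memory bus, where one service keeps its own copy of the data it needs from another. Everything here is an assumption made for illustration: the `EventBus` and `OrdersService` classes, the "user-updated" topic and the payload shape. A production setup would use a message broker and handle delivery failures.

```scala
import scala.collection.mutable

final case class Event(topic: String, key: String, payload: String)

final class EventBus {
  private val subscribers = mutable.Map.empty[String, List[Event => Unit]]

  def subscribe(topic: String)(handler: Event => Unit): Unit =
    subscribers.update(topic, handler :: subscribers.getOrElse(topic, Nil))

  def publish(event: Event): Unit =
    subscribers.getOrElse(event.topic, Nil).foreach(handler => handler(event))
}

/** A service that keeps its own copy of the user names it needs. */
final class OrdersService(bus: EventBus) {
  private val userNames = mutable.Map.empty[String, String]

  // Listen to changes published by the (hypothetical) users service
  bus.subscribe("user-updated")(event => userNames.update(event.key, event.payload))

  def describeOrder(orderId: String, userId: String): String =
    s"order $orderId for ${userNames.getOrElse(userId, "unknown user")}"
}
```

The orders service answers from its local copy without calling the users service, which is where the low latency comes from. The duplicated `userNames` map is also where the storage cost and the inconsistency risk come from: if an event is lost, the copy is stale.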

Whichever of these communication patterns you choose, it demands monitoring, alerts, data modeling for consistency, transactional updates across domains and rollback capabilities.

You need to understand that writing distributed systems is hard (microservices are after all distributed systems).

In my case, I am a fan of the “lean and mean” approach and architecture evolution.

In every new project, I start with the simplest approach that can be built with a small team and as the application and the organization scales, the architecture evolves. Remember:

  • Never compromise quality, test everything. Integration and end-to-end tests are your best friends, then you can refactor as you go without risking instability and regressions.
  • Build for the actual requirements of scale you have at the moment. Measure everything to know when to refactor and improve. Over-architecture is your enemy. Complicated technologies will stall your team’s progress.
  • Use decoupled architecture and define APIs for future refactoring, defining clear domains and responsibilities (for example: separate APIs for users, orders, stock, etc. that can be separated in the future)
  • Build everything using backwards compatibility where there are no breaking changes, neither in APIs nor in the DB. DB tables/columns are always added, never removed or changed.

Keep in mind that “There is no such thing as a free lunch”.

Querying our Data Lake in S3 using Zeppelin and Spark SQL

Until recently, most companies used the traditional approach for storing all the company’s data in a Data Warehouse.

The internet growth caused an increase in the number of data sources and the massive quantities of data to be stored, requiring scaling these Data Warehouses constantly. They were not designed to handle petabytes of data, so companies were driven into using big data platforms (such as Hadoop) capable of transforming big data into actionable insights at high speed.

Hadoop traditionally coupled storage and computation together, so this was relatively expensive. Each node in a cluster wasn’t just a storage system; it was a server with processors and memory resources. The more data you kept, the more compute resources you needed to pay for, even though you didn’t really need that extra computational power to analyze your data. As a result, compute and storage had to be scaled together and the clusters had to be persistently on, otherwise the data became inaccessible.

This is the force that is driving the separation of storage from compute, to be able to scale them separately. This is how the term Data Lake emerged, being defined as: “a collection of storage instances of various data assets additional to the originating data sources.”

A Data Lake is a repository capable of storing vast quantities of data in various formats. It can contain data from webserver logs, application databases, and third-party data. Data can flow into the Data Lake by either batch or real-time processing of streaming data.

The reason to build a Data Lake is to provide the ability to query all data sources combined in a single place, providing “Data and Analytics as a Service” for business users such as data scientists and business analysts.

The Data Lake offers an approach where compute and storage can be separated, in our case, S3 is used as the object storage, and any processing engines (Spark, Presto, etc) can be used for the compute. This means we can scale separately depending on the business needs.

As I explained in one of my previous posts, we decided to build our data lake based on Amazon Simple Storage Service (S3) to combine private and third party data in order to enable users to answer their most interesting business questions.

We use AWS Kinesis Firehose to push data into S3, AWS Lambda functions for some pre-processing and Spark for our data pipelines.

In order to query S3 we needed a querying engine that allowed us to use SQL to provide access to TBs of structured and semi-structured data for quick analysis. We evaluated Presto, Athena and Spark SQL.

Our data is arranged in S3 in the following format (Hadoop-like partitions):

<bucket_name>/processed/<event_type>/dt=<date>/hh=<time>/*.json

In Presto and Athena (the AWS managed version of Presto) we encountered errors either when creating the tables with Hive or when querying them. We found that both Presto and Athena do not “like” data types that change over time (for example, timestamp was a string and later a bigint) or special characters at the beginning of field names (_data_type). We corrected the data using Spark and the pre-processing lambdas and thought this would suffice, but it was not enough: performance was very bad when querying small JSON files. We knew that Parquet would perform much better, but the partitions dictated smaller files. So what did we do?

We decided to try Spark SQL as our query engine. Spark SQL can expose Spark datasets over a JDBC API and allows running SQL-like queries on Spark data using traditional BI and visualization tools.

On top of this, our analysts and data scientists needed strong visualization tools to find data patterns and correlations.

Apache Zeppelin was our choice. Zeppelin is a Web-based notebook that enables data-driven, interactive data analytics and collaborative documents with SQL, Scala, Python and more. It provides analysts a great way to create interactive web notebooks to write the queries and visualize the results. The notebooks can be shared in real time. The queries can be scheduled.


Tutorial

1- Create a cluster in AWS EMR with Spark and Zeppelin.

2- Click on the link “Zeppelin” after the cluster was provisioned to access Zeppelin UI.

3- Download sample data for the San Francisco Traffic accidents data. (it can be found at: https://data.sfgov.org/api/views/vv57-2fgy/rows.csv?accessType=DOWNLOAD).

4- Place it in an S3 bucket, for example: “test-zeppelin-ni”.

Now we can access and query the data using Spark SQL and Zeppelin.

We will write some Scala code inside Zeppelin to visualize this CSV file and extract the information contained in it. In order to view the contents of this file and manipulate the data, we will create a dataframe, bind it to a schema, create a temporary table and then use SQL for queries.

“A powerful feature of Spark SQL is that you can programmatically bind a schema to a Data Source and map it into Scala case classes which can be navigated and queried in a typesafe manner.”

// Spark SQL imports needed to derive a schema from a case class
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.StructType

// One field per column of the CSV file
case class TrafficData(IncidntNum: String, Category: String, Descript: String, DayOfWeek: String,
  Date: String, Time: String, PdDistrict: String, Resolution: String,
  Address: String,
  X: Double, Y: Double, Location: String, PdId: String)

// Derive the schema from the case class
val schema: StructType = Encoders.product[TrafficData].schema

// Read every file in the bucket; header=true skips the column-name row (assumed present in the download)
val df = sqlContext.read.schema(schema).option("header", "true").csv("s3a://test-zeppelin-ni/*")
df.printSchema()

Here is the second paragraph:

df.createOrReplaceTempView("TrafficTable")

Here is the third paragraph:

%sql

select * from TrafficTable

As you can see, after loading the data into Spark, we created a temp table that we can then query using regular SQL.

Wrapping it up

I hope this article gave you a good starting point to learn about how to use Spark and Zeppelin.

In this article, we have learned how to read a large dataset from an S3 public bucket and perform SQL queries on it. This combination is one of the most commonly used setups for Machine Learning projects running data at scale.

Note: Remember that the changes you make on Zeppelin only persist as long as the EMR cluster is running.

If you want to know more about Zeppelin, watch this video by Moon soo Lee, who created Zeppelin, speaking in Amsterdam at Spark Summit Europe 2015.

Book photo created by rawpixel.com – www.freepik.com

Why Big Data is pushing us towards Machine learning

As an Engineering Manager with more than 20 years of experience, I have seen many changes that completely disrupted different areas: “Web 2.0”, “Cloud computing”, “Mobile-first”, “Big Data”, etc. The new kid on the block is “Machine learning”, and it is definitely at its peak. One example is perfectly described by CB Insights, on how startups use terms related to Machine learning to raise their valuation: “If you want to drive home that you’re all about that AI, use terms like machine learning, neural networks, image recognition, deep learning, and NLP. Then sit back and watch the funding roll in.”

In 2016 Gartner’s famous Hype Cycle of Emerging Technologies added the term “machine learning”, since it is being used everywhere.

But let’s put things in perspective and try to understand why this is happening now.

Huge amounts of data are being streamed from phones, computers, TVs and IoT devices. Every day, the equivalent of 530,000,000 digital songs or 250,000 Libraries of Congress worth of data is created globally.

The data gathered grows exponentially, creating a paradigm shift on how we store and process large data sets. This affects the data infrastructure and long-term devops strategic decisions we need to make in order to support the increasing demand for scalability and concurrency.

But it is not the quantity of data that is revolutionary; it is that we can do something with the data.

For most organizations leveraging massive data sets is a problem since not everyone knows how to deal with terabytes of data. It takes highly specialized teams to analyze and process insights. When the data is huge, it is not humanly possible to understand which variables affect each other.

This is where Machine Learning fits in and why it will ultimately change the way we handle data. Regardless of the amount, researchers need to ask the right questions, design a test, and use the data to determine whether their hypothesis is right.

Let’s tidy things up

Here is an interesting Venn diagram on machine learning and statistical modeling in data science (Reference: SAS institute)

Artificial Intelligence — a term coined in 1956, refers to a line of research that seeks to recreate the characteristics possessed by human intelligence.

Data science — uses automated methods to analyze massive amounts of data to extract valuable knowledge and insight.

Machine Learning — is a recent development that started in the 1990s when the availability and pricing of computers enabled data scientists to stop building finished models and train computers instead.

Building data science teams

Most organizations create heterogeneous teams that include three primary data-focused roles: data scientists, data analysts and data engineers.

Here are the key differentiators between the data-focused roles:

Data Engineer — basically these are developers that know how to handle big data. In general they would have majors in: Computer science and engineering.

Data Analyst — they translate numbers into plain English. A data analyst’s job is to take data and use it to help companies make better business decisions. In general they would have majors in: Business, economics, statistics.

Data Scientist — they combine statistics, mathematics and programming. They have the ability to find patterns by cleansing, preparing, and aligning data. In general they would have majors in: Math, applied statistics, operations research, computer science, physics, aerospace engineering.

Wrapping up

Let me play with Dan Ariely’s quote on big data as others already did:

“Machine Learning is like teenage sex: everyone talks about it, nobody really knows how to do it, everyone thinks everyone else is doing it, so everyone claims they are doing it…”

So, can everybody use machine learning to increase ROI? Probably not, and I don’t think that all our jobs will be automated by bots in the near future. But we definitely need to be able to process data at scale using machine learning in order to provide solutions, whether it is to increase ROI in our organizations or to help solve urgent global problems like cancer research, natural resource scarcity and more.

Make sure you hire the right team and ask the right questions. Hopefully your organization’s data + data science will provide you with the answers you are looking for.

Let me leave you with one last quote:

“We always overestimate the change that will occur in the next two years and underestimate the change that will occur in the next ten. Don’t let yourself be lulled into inaction.” — Bill Gates

So, start learning and researching so you can find the right answer for you and your data.

Building a data lake in S3 using an event-driven serverless architecture

This article is about the journey from:

  • A data warehouse to a data lake.
  • Batch to near real-time processing.
  • Availability to query all data from the same repository (raw and processed).

Let’s start by understanding some terms.

Wikipedia defines Data Warehouses as: “…central repositories of integrated data from one or more disparate sources. They store current and historical data and are used for creating trending reports for senior management reporting such as annual and quarterly comparisons.”

And the term “data lake” in Wikipedia states: “…A data lake is a method of storing data within a system or repository, in its natural format, that facilitates the collocation of data in various schemata and structural forms, usually object blobs or files. The idea of data lake is to have a single store of all data in the enterprise ranging from raw data (which implies exact copy of source system data) to transformed data …”

Until recently, the de facto standard for handling big data included several technologies that can cope with huge amounts of data, such as the technologies in the Apache Hadoop ecosystem (deployed and managed with Cloudera or Hortonworks, for example). These are very powerful tools, but they require a lot of DevOps maintenance and tuning.

For most of the use cases, you really can’t go wrong with them performance-wise, but there can be a big difference between the ongoing operational burden of running your own infrastructure and letting AWS/Google/Microsoft do it for you using their managed solutions. Having said that, if your team requires special tuning and portability or on-premise installation, you should stick to the non-managed solution.

For those who can use managed solutions, AWS, Google and Microsoft have come to the rescue, offering many data services as PaaS (Platform as a Service) that provide the required functionality and scale without the complexity of maintaining the associated infrastructure.

Our needs

At Natural Intelligence we manage and analyze data being delivered from our comparison sites. A critical part of our day-to-day is the ability to store and query it from the data warehouse.

Our ETLs crunch the data in batches every couple of hours at the moment, so the data availability and the monitoring capabilities are limited. We wanted to be able to get insights and alerts in near real-time.

When we started analyzing the use cases, we found that we needed the ability to query:

  • Raw unfiltered and unstructured data (near real-time analytics, alerts on anomaly detection, machine learning).
  • Structured and optimized data (dashboards, analytics, daily aggregations).

Our Data Platform Architecture

We use AWS all over and Redshift as our data warehouse with Tableau as the visualization tool, so it was only natural to look into what AWS has to offer us to solve our new challenges.

For the unstructured data, we decided to build our data lake in S3 and query it using a distributed SQL engine for big data (for example: Redshift Spectrum, AWS Athena or Presto). We use AWS Kinesis Firehose for streaming data and AWS Lambda for serverless processing.

S3 for storage

Amazon S3 is an object storage service built to store and retrieve any amount of data from anywhere. It is designed to deliver 99.999999999% durability, and stores data for millions of applications.

AWS Kinesis Firehose and AWS Lambda for data ingestion

Kinesis Firehose pairs Kinesis streams with out-of-the-box Lambda functions, so it can deliver streams of data to pre-defined destinations such as S3, Redshift and Elasticsearch. Kinesis streams allow for real-time data processing, continuously collecting data as it is generated.

AWS Lambda provides a strong FaaS solution. FaaS stands for “function as a service”, which means that your code is executed in containers that are invoked in response to events, such as an HTTP call to a REST API or a file created in an S3 bucket.
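To make the “file created in an S3 bucket” trigger more concrete, here is a minimal sketch of a Python Lambda handler that only lists which objects arrived. It relies on the documented shape of the S3 notification event (a "Records" list with the bucket name and a URL-encoded object key). The function names and the return value are my own assumptions for illustration, not our production code.

```python
from urllib.parse import unquote_plus


def s3_objects_from_event(event):
    """Return a (bucket, key) pair for every S3 record in a Lambda event."""
    pairs = []
    for record in event.get("Records", []):
        s3_info = record["s3"]
        bucket = s3_info["bucket"]["name"]
        # Object keys arrive URL-encoded in S3 notifications, so decode them.
        key = unquote_plus(s3_info["object"]["key"])
        pairs.append((bucket, key))
    return pairs


def handler(event, context):
    """Entry point that Lambda calls with the event and a context object.

    Hypothetical behavior: it only reports the new objects. A real function
    would go on to read each object and process its contents.
    """
    objects = s3_objects_from_event(event)
    return {"objects": [f"s3://{bucket}/{key}" for bucket, key in objects]}
```

The handler can be tried locally by calling handler(sample_event, None) with a hand-written event dictionary, which is handy for unit tests before deploying anything.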

A distributed SQL engine for big data

We are evaluating 3 options at the moment.

Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.

Amazon Redshift Spectrum enables you to run Amazon Redshift SQL queries against exabytes of data in Amazon S3. With Redshift Spectrum, you can extend the analytic power of Amazon Redshift beyond data stored on local disks in your data warehouse to query vast amounts of unstructured data in your Amazon S3 “data lake”.

Presto is an open source distributed SQL query engine for running interactive analytic queries against data sources of all sizes ranging from gigabytes to petabytes.

Our Solution

We use S3 as our data lake, storing the raw, unstructured data and querying it using a distributed SQL query engine for big data.

Our applications send events to AWS Kinesis Firehose. Firehose writes the events into files located in S3 buckets. We use AWS Lambda functions written in Python for processing the streaming events in the S3 files.
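The flow above can be sketched roughly as follows. This is an illustration under stated assumptions, not our actual code: the stream name, the event fields and the helper names are hypothetical. The only AWS call used is put_record on a Firehose client, which takes DeliveryStreamName and Record={"Data": ...}. The sketch writes one JSON document per line, because Firehose concatenates records as they are sent and does not add a delimiter itself.

```python
import json


def encode_event(event):
    """Serialize one event as a single JSON line, encoded as UTF-8 bytes."""
    # The trailing newline keeps records separable once Firehose
    # concatenates them into one S3 file.
    return (json.dumps(event, separators=(",", ":")) + "\n").encode("utf-8")


def send_event(firehose_client, stream_name, event):
    """Send one event to a Firehose delivery stream.

    firehose_client is expected to behave like boto3.client("firehose");
    it is passed in so the function can be tested without AWS access.
    """
    return firehose_client.put_record(
        DeliveryStreamName=stream_name,
        Record={"Data": encode_event(event)},
    )


def parse_events(body):
    """Parse the bytes of an S3 file written as newline-delimited JSON."""
    lines = body.decode("utf-8").splitlines()
    return [json.loads(line) for line in lines if line.strip()]
```

In an application, the client would typically be created once with boto3.client("firehose") and reused across calls. On the processing side, parse_events would receive the bytes of each file that the Lambda function reads from S3.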

We will continue using ETLs to process the raw data into structured formats and aggregations that are visualized using Tableau.

We are now in the process of implementing Redshift Spectrum on top of our S3 data lake, and it is showing great performance so far (more on this in the next part of this series). We will be benchmarking other distributed SQL query engines for big data and will keep you posted.

Next steps

  • Roll out the Redshift Spectrum implementation to be queried by relevant users.
  • Implement alerts and monitoring over the streaming data for near real-time processing using anomaly detection.
  • Create a near real-time dashboard.
  • Tie it all together as a robust solution.

I will be posting more articles, “how to’s” and “lessons learned” here, so stay tuned.
