Streamlining Big Data with Spark: Writing and Reading Delta Lake Format on MinIO-S3 Storage
The topic of this blog is not my cup of tea, but sometimes, at work, you need to drink coffee without sugar.
Introduction
As with many of my blogs, this post stems from real-world tasks I’m tackling at work. Some are proof-of-concepts (POCs), others involve updating our existing solutions and addressing requirements raised by different teams.
Today’s blog focuses on a proof-of-concept (POC) that explores alternative big data infrastructure solutions, specifically addressing how to efficiently manage data for smaller customers.
Imagine you’ve built a powerful solution that handles vast amounts of traffic and is trusted by major customers. Everything runs smoothly, and the system works perfectly. But then, you start to consider the smaller customers — those who want to leverage your solution and its features but don’t generate large volumes of data. Deploying and maintaining such a robust, resource-heavy system for smaller customers isn’t always feasible. Complexity and cost are major concerns, requiring an adaptable and cost-effective solution.
This time, I was tasked with investigating how we could replace a traditional big data ecosystem (HDFS, Yarn, Spark, Hive) with a more cost-effective solution, all while keeping our established processes intact.
This blog will walk you through how to write and read data in Delta Lake format using Apache Spark, all stored efficiently on MinIO’s S3-compatible storage.
This is the first of two blogs on this topic. In this post, I’ll cover the configurations, component versions, and Python scripts needed to write and read data in Delta Lake format. In the second blog, I’ll dive into querying the data using Trino and Hive, further expanding on the insights gained from this POC. Check the link for my second blog — https://medium.com/@dudu.zbeda_13698/setting-up-trino-with-hive-to-query-delta-lake-data-on-minio-a-scalable-big-data-solution-a7e2392e04f4
Lastly, in large environments or where security and maintenance are critical, adopting a big data distribution like Cloudera is essential. It significantly simplifies deployment, configuration, maintenance, and security.
While running this POC, I’ve come to appreciate even more how a distribution like this can save you days of work by handling the complexities of finding compatible versions and avoiding conflicts (Spark, Hadoop, JARs, you name it; it is a mess).
Blog Goals
In this blog post, we will explore how to set up and run a Spark job with the goal of writing data in Delta Lake format. We’ll also demonstrate how to read this data back efficiently. Our focus will be on using MinIO S3-compatible storage, a popular choice for scalable and efficient data management. This guide will walk you through the necessary configurations, provide key code snippets, and outline best practices to ensure smooth execution of both writing and reading operations in Spark.
In this blog, we will cover the following steps:
1. Download and install Apache Spark. Using Apache Spark, we will submit the jobs that write records to MinIO storage in Delta Lake format and read them back.
2. Download the JAR files required to support the Spark jobs.
3. Execute a Spark job that writes data in Delta Lake format to MinIO-S3 storage. We will cover two methods: partitioned and non-partitioned data.
4. Execute a Spark job that reads the Delta Lake data from MinIO-S3 storage. We will read the same records that we wrote (look surprised).
Understanding Key Components and Dependencies
Before diving deeper into the technical steps, it’s important to understand the core components and the purpose of each dependency used in this POC.
- Apache Spark
Apache Spark is an open-source, distributed computing system designed for big data processing and analytics. It supports a wide range of workloads, from batch processing to real-time streaming, and offers various APIs in Java, Scala, Python, and R. In this POC, Spark is used to submit jobs that handle the reading and writing of data in the Delta Lake format, leveraging its robust capabilities for parallel processing.
- Delta Lake
Delta Lake is an open-source storage layer that brings ACID (Atomicity, Consistency, Isolation, Durability) transactions to data lakes. It ensures reliable and consistent data by supporting transactional write and read operations. Delta Lake also enables advanced features like time-travel, which allows users to query historical data. In this blog, we’re using Delta Lake to store data in a scalable and fault-tolerant way on MinIO’s S3-compatible storage.
- Hadoop-AWS JAR
The hadoop-aws JAR is essential for integrating Hadoop’s ecosystem with Amazon S3 or S3-compatible storage like MinIO. Spark relies on Hadoop libraries for file I/O operations, and the hadoop-aws JAR provides the necessary functionality to connect Spark jobs to MinIO using the S3A protocol, which is optimized for high-performance cloud storage access.
- AWS Java SDK Bundle JAR
The AWS Java SDK Bundle JAR is a consolidated package that provides all the necessary Amazon Web Services (AWS) SDK libraries. Even though we are using MinIO as the storage layer, the MinIO API is compatible with Amazon S3. This JAR allows the Spark job to interact with MinIO using the same libraries as those used for AWS services, making it a crucial component for connecting to MinIO’s S3 interface.
- Delta-Core JAR
The delta-core JAR is the key library that enables Delta Lake functionality within Spark. It provides the logic to read and write Delta tables and supports the ACID transactions and schema evolution that Delta Lake offers. Without delta-core, Spark wouldn’t be able to interact with Delta Lake tables, making it an essential dependency for this POC.
- Delta-Storage JAR
The delta-storage JAR works in conjunction with delta-core to provide the actual storage layer operations for Delta Lake. It is responsible for managing the underlying storage system, including the data files and transaction logs. This JAR ensures that Delta Lake can effectively write to and read from your storage medium, which in our case is MinIO S3-compatible storage.
Prerequisites and Ingredients
Below are all the prerequisites required to run this exercise:
1. A Linux box with an internet connection. For this exercise, I used a Linux box running the Ubuntu 22.04 operating system.
2. Preparation of the Linux box, including:
- Python 3 installation
- A /data/spark folder as our designated working directory. The folder path is a recommendation and can be modified as needed.
3. A MinIO-S3 deployment with a bucket named “spark-delta-lake”. The bucket name is a suggestion and can be customized (an optional sketch for creating the bucket programmatically follows this list).
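The bucket can be created from the MinIO console, but if you prefer to script it, here is a minimal sketch that creates the spark-delta-lake bucket programmatically. It assumes the boto3 package is installed (pip install boto3); the endpoint and credentials are placeholders you should replace with your own.

# Optional helper: create the spark-delta-lake bucket on MinIO if it does not exist.
# Assumes boto3 is installed; the endpoint and credentials below are placeholders.
import boto3
from botocore.exceptions import ClientError

s3 = boto3.client(
    "s3",
    endpoint_url="http://130.1.1.1:9000",      # MinIO address (placeholder)
    aws_access_key_id="YOUR_ACCESS_KEY",       # placeholder
    aws_secret_access_key="YOUR_SECRET_KEY",   # placeholder
)
bucket = "spark-delta-lake"
try:
    s3.head_bucket(Bucket=bucket)              # raises ClientError if the bucket is missing
    print(f"Bucket '{bucket}' already exists")
except ClientError:
    s3.create_bucket(Bucket=bucket)
    print(f"Bucket '{bucket}' created")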
Let’s start working
Note: Before diving into the setup, please note that for this exercise, I’m using specific versions of Java, Spark, and a collection of Jars to ensure compatibility. It’s essential to use the exact versions mentioned to avoid conflicts and potential failures. Changing the versions may result in errors during the setup or execution.
Download Spark and JARs
To run Spark jobs that write and read Delta Lake format against MinIO-S3 storage, we need the Spark distribution itself plus a set of supporting JARs.
Run the following steps to download the relevant artifacts (an optional alternative that pulls the same dependencies by Maven coordinates is sketched after this list):
1. Sign in to your Linux machine.
2. Navigate to /data/spark by executing the command cd /data/spark
3. Download Spark 3.4.1 with Hadoop 3 support by executing the command
wget https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz
4. Download the AWS Java SDK bundle JAR, version 1.12.533, by executing the command
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.533/aws-java-sdk-bundle-1.12.533.jar
5. Download the hadoop-aws JAR, version 3.3.4, by executing the command
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar
6. Download the delta-core JAR, version 2.4.0 (Scala 2.12), by executing the command
wget https://repo1.maven.org/maven2/io/delta/delta-core_2.12/2.4.0/delta-core_2.12-2.4.0.jar
7. Download the delta-storage JAR, version 2.4.0, by executing the command
wget https://repo1.maven.org/maven2/io/delta/delta-storage/2.4.0/delta-storage-2.4.0.jar
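As a side note, when the machine has access to Maven Central, Spark can also resolve these same dependencies by their Maven coordinates instead of local JAR paths. The sketch below is an optional alternative (not what the rest of this post uses) and simply mirrors the versions downloaded above through the spark.jars.packages setting.

# Optional alternative: resolve the dependencies from Maven Central via
# spark.jars.packages instead of passing local JAR files with --jars.
# The coordinates mirror the versions downloaded above; delta-storage is
# pulled in transitively by delta-core.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("DeltaDepsViaMaven")
    .config(
        "spark.jars.packages",
        "io.delta:delta-core_2.12:2.4.0,"
        "org.apache.hadoop:hadoop-aws:3.3.4,"
        "com.amazonaws:aws-java-sdk-bundle:1.12.533",
    )
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)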
Spark Installation
To run Spark jobs from Python scripts, a Spark installation is required. These jobs can be executed on a standalone Linux machine (as we are doing here), on Kubernetes, or on YARN.
To set up Spark, follow these steps:
1. Log in to your Linux machine.
2. Install Java:
- Install Java 11 by executing the command sudo apt install openjdk-11-jdk
- Confirm the installation by executing the command java --version and checking the output.
3. Navigate to /data/spark by executing the command cd /data/spark
4. Extract the spark-3.4.1-bin-hadoop3.tgz file by executing the command tar -xvf spark-3.4.1-bin-hadoop3.tgz
5. Move the extracted Spark files to the /usr/local/spark folder by executing the command sudo mv spark-3.4.1-bin-hadoop3 /usr/local/spark
6. Switch to the root user by executing the command sudo -i
7. Update the environment variables in your bashrc by running the following steps:
- Edit the ~/.bashrc file by executing the command vi ~/.bashrc
- Add the following lines to the end of the file:
export SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH
- Apply the changes by executing the command source ~/.bashrc
8. Confirm the Spark installation by executing the command spark-submit --version and checking the output (a quick smoke-test script is sketched after this list).
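Before wiring in MinIO and Delta Lake, it is worth running a tiny local job to confirm that Spark and PySpark are healthy. This is just a sanity-check sketch: save it as smoke-test.py (any name works) and run it with spark-submit /data/spark/smoke-test.py.

# smoke-test.py - verify the Spark installation with a local job (no MinIO/Delta yet)
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SmokeTest").master("local[*]").getOrCreate()
df = spark.createDataFrame([("spark", "3.4.1")], ["component", "version"])
df.show()   # a single-row table in the output means the installation is working
spark.stop()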
Generate and write data in Delta Lake format on MinIO-S3
Within this section, we will save data in Delta Lake format to the spark-delta-lake bucket that was set up on the MinIO-S3 storage, as outlined in the prerequisites. Both partitioned and non-partitioned data will be created.
Non-partitioned data
1. Log in to your Linux machine.
2. Navigate to /data/spark by executing the command cd /data/spark
3. Create or download write-delta-to-minio.py from the following location:
- Git repository address: https://github.com/dzbeda/spark-delta_lake-trino-hive-minio.git
- File location: ./write-delta-to-minio.py
from pyspark.sql import SparkSession
# Replace these values with your MinIO configuration
minio_access_key = "Ndsadsadsadsa"
minio_secret_key = "lch9r8gsdgdsgdsgdsgdsYgmKnwGWaCD"
minio_endpoint = "http://130.1.1.1:9000" # Example: http://localhost:9000
output_bucket = "s3a://spark-delta-lake/non-partition" # Replace with your desired output path in MinIO
# Create a Spark session with the necessary MinIO/S3A configurations
spark = SparkSession.builder \
.appName("WriteDeltaToMinIO") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.config("spark.hadoop.fs.s3a.access.key", minio_access_key) \
.config("spark.hadoop.fs.s3a.secret.key", minio_secret_key) \
.config("spark.hadoop.fs.s3a.endpoint", minio_endpoint) \
.config("spark.hadoop.fs.s3a.path.style.access", "true") \
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
.config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
.config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
.config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore") \
.getOrCreate()
data = [("Alice", 1), ("Bob", 2), ("Charlie", 3), ("David", 4), ("Eve", 5)]
columns = ["Name", "ID"]
df = spark.createDataFrame(data, columns)
df.write.format("delta").mode("overwrite").save(output_bucket)
spark.stop()
4. Modify the specified sections in the Python script:
- minio_access_key - the access key that has permission on the bucket where we intend to store the data
- minio_secret_key - the matching secret access key
- minio_endpoint - the MinIO address. In my setup, the MinIO address and port are http://130.1.1.1:9000
- output_bucket - the bucket and path where we wish to save the data. In our exercise, the bucket name is spark-delta-lake, and the non-partitioned data is saved under the non-partition path: s3a://spark-delta-lake/non-partition
- data - the data we will generate. In this exercise, I create two columns, “Name” and “ID”, with 5 records
5. To generate data, execute the following command
spark-submit --jars /data/spark/delta-core_2.12-2.4.0.jar,/data/spark/delta-storage-2.4.0.jar,/data/spark/hadoop-aws-3.3.4.jar,/data/spark/aws-java-sdk-bundle-1.12.533.jar /data/spark/write-delta-to-minio.py
6. Confirm that the data has been generated by running these steps:
- From the job output, verify that the schema was created.
- Browse the spark-delta-lake bucket on your MinIO-S3 storage (I’m using the S3 Browser application).
- Verify that the non-partition folder was created.
- Verify that a snappy.parquet file was created. This file holds the records.
- Verify that a _delta_log folder was created. This folder records every change made to the Delta table, and its logs are what make time-travel possible (a time-travel read is sketched right after this list).
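Since the _delta_log folder is what enables time-travel, here is a minimal sketch of how an earlier version of the table could be read back once you have written to it more than once. It assumes a SparkSession (spark) built with the same MinIO/S3A and Delta configurations as the write script above; version 0 is the very first write.

# Time-travel sketch: read version 0 of the Delta table from MinIO.
# Assumes `spark` is configured exactly like the write script above.
old_df = (
    spark.read.format("delta")
    .option("versionAsOf", 0)   # first committed version of the table
    .load("s3a://spark-delta-lake/non-partition")
)
old_df.show()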
Partitioned data
1. Log in to your Linux machine.
2. Navigate to /data/spark by executing the command cd /data/spark
3. Create or download write-delta-partitioned-to-minio.py from the following location:
- Git repository address: https://github.com/dzbeda/spark-delta_lake-trino-hive-minio.git
- File location: ./write-delta-partitioned-to-minio.py
from pyspark.sql import SparkSession
#from delta import *
# Replace these values with your MinIO configuration
minio_access_key = "N1a96Y3vQ1oy4fdsfdsfsdfdsOa"
minio_secret_key = "lch9rNXRySofdsfdsfdsfdsdsfV18umhYgmKnwGWaCD"
minio_endpoint = "http://130.1.1.1:9000" # Example: http://localhost:9000
output_bucket = "s3a://spark-delta-lake/partition" # Replace with your desired output path in MinIO
# Create a Spark session with the necessary MinIO/S3A configurations
spark = SparkSession.builder \
.appName("WriteDeltaToMinIO") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.config("spark.hadoop.fs.s3a.access.key", minio_access_key) \
.config("spark.hadoop.fs.s3a.secret.key", minio_secret_key) \
.config("spark.hadoop.fs.s3a.endpoint", minio_endpoint) \
.config("spark.hadoop.fs.s3a.path.style.access", "true") \
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
.config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
.config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
.config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore") \
.getOrCreate()
data = [("Alice", 1), ("Bob", 2), ("Charlie", 3), ("David", 4), ("Eve", 5)]
columns = ["Name", "ID"]
df = spark.createDataFrame(data, columns)
df.write.format("delta").mode("overwrite").partitionBy("ID").save(output_bucket)
spark.stop()
4. Modify the specified sections in the Python script:
- minio_access_key - the access key that has permission on the bucket where we intend to store the data
- minio_secret_key - the matching secret access key
- minio_endpoint - the MinIO address. In my setup, the MinIO address and port are http://130.1.1.1:9000
- output_bucket - the bucket and path where we wish to save the data. In our exercise, the bucket name is spark-delta-lake, and the partitioned data is saved under the partition path: s3a://spark-delta-lake/partition
- data - the data we will generate. In this exercise, I create two columns, “Name” and “ID”, with 5 records. The data is partitioned by the ID column.
5. To generate data, execute the following command
spark-submit --jars /data/spark/delta-core_2.12-2.4.0.jar,/data/spark/delta-storage-2.4.0.jar,/data/spark/hadoop-aws-3.3.4.jar,/data/spark/aws-java-sdk-bundle-1.12.533.jar /data/spark/write-delta-partitioned-to-minio.py
6. Confirm that the data has been generated by running these steps:
- From the job output, verify that the schema was created.
- Browse the spark-delta-lake bucket on your MinIO-S3 storage (I’m using the S3 Browser application).
- Verify that the partition folder was created.
- Since we partitioned by ID, verify that a folder was created for each ID value.
- Verify that a snappy.parquet file was created inside each partition folder. These files hold the records.
- Verify that a _delta_log folder was created. This folder records every change made to the Delta table, and its logs are what make time-travel possible (a partition-filtered read is sketched right after this list).
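A practical benefit of partitioning by ID is that a filtered read only needs to touch the matching ID=&lt;value&gt; folders. Below is a small sketch, again assuming a SparkSession configured like the scripts in this post, that reads the partitioned path and filters on the partition column.

# Partition-pruned read sketch: filter on the partition column "ID".
# Assumes `spark` is configured with the same MinIO/S3A and Delta settings as above.
df = spark.read.format("delta").load("s3a://spark-delta-lake/partition")
df.where("ID = 3").show()   # only the ID=3 partition folder needs to be scanned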
Read data in delta-lake format from MINIO-S3
Within this section, we will read the data that we generated in the previous section (look surprised) in Delta Lake format from the spark-delta-lake bucket that was set up on the MinIO-S3 storage. The same procedure reads both partitioned and non-partitioned data.
1. Log in to your Linux machine.
2. Navigate to /data/spark by executing the command cd /data/spark
3. Create or download read-delta-from-minio.py from the following location:
- Git repository address: https://github.com/dzbeda/spark-delta_lake-trino-hive-minio.git
- File location: ./read-delta-from-minio.py
from pyspark.sql import SparkSession
# Replace these values with your MinIO configuration
minio_access_key = "N1a96Y3vfdsfdsfdsfsdfdsOa"
minio_secret_key = "lch9rNXRfdsfdsfdsfdsfdsfdsfdsV18umhYgmKnwGWaCD"
minio_endpoint = "http://130.1.1.1:9000" # Example: http://localhost:9000
input_bucket = "s3a://spark-delta-lake/non-partition" # Replace with your desired input path in MinIO
# Create a Spark session with the necessary MinIO/S3A configurations
spark = SparkSession.builder \
.appName("ReadDeltaToMinIO") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.config("spark.hadoop.fs.s3a.access.key", minio_access_key) \
.config("spark.hadoop.fs.s3a.secret.key", minio_secret_key) \
.config("spark.hadoop.fs.s3a.endpoint", minio_endpoint) \
.config("spark.hadoop.fs.s3a.path.style.access", "true") \
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
.config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
.config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
.config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore") \
.getOrCreate()
# Read DataFrame from Delta format on MinIO
df = spark.read.format("delta").load(input_bucket)
# Show the DataFrame
df.show()
# Stop Spark Session
spark.stop()
4. Modify the specified sections in the Python script:
- minio_access_key - the access key that has permission on the bucket we intend to read the data from
- minio_secret_key - the matching secret access key
- minio_endpoint - the MinIO address. In my setup, the MinIO address and port are http://130.1.1.1:9000
- input_bucket - the bucket and path we wish to read the data from. In our exercise, the bucket name is spark-delta-lake, and the data sits under either the non-partition or partition path: s3a://spark-delta-lake/non-partition or s3a://spark-delta-lake/partition
5. To read the data, execute the following command
spark-submit --jars /data/spark/delta-core_2.12-2.4.0.jar,/data/spark/delta-storage-2.4.0.jar,/data/spark/hadoop-aws-3.3.4.jar,/data/spark/aws-java-sdk-bundle-1.12.533.jar /data/spark/read-delta-from-minio.py
6. Confirm that the data has been read by running these steps:
- From the job output, verify that the records are displayed (a quick Spark SQL follow-up is sketched below).
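Before we hand querying over to Trino and Hive in the next blog, note that you can already run SQL on the same data inside Spark. A minimal sketch, assuming df was just loaded by the read script above:

# Query the loaded DataFrame with Spark SQL via a temporary view.
# Assumes `df` was produced by the read script above.
df.createOrReplaceTempView("people")
spark.sql("SELECT Name, ID FROM people WHERE ID > 2 ORDER BY ID").show()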
Congratulations! You’ve successfully written data to MinIO S3-compatible storage in Delta Lake format and read it back using Apache Spark. This POC demonstrates how you can leverage a cost-effective yet powerful solution for managing your data.
In my next blog, Setting Up Trino with Hive to Query Delta Lake Data on MinIO: A Scalable Big Data Solution, we’ll take things a step further by deploying Trino (formerly Presto), Hive, and Postgres to query the data stored in MinIO S3. Stay tuned for more insights on building a scalable and efficient data-querying infrastructure!
If you liked this blog, don’t forget to clap and follow me on both Medium and LinkedIn.