Spark-Mongo Aggregation Made Easy

Somnath Dutta
5 min read · Jul 28, 2023
Spark-Mongo Aggregation

A. Introduction

In today’s data-driven world, the need for efficient and scalable data processing solutions has never been greater. Enter Apache Spark and MongoDB, two cutting-edge technologies that have revolutionised the way we handle and analyse vast amounts of data. Apache Spark serves as a unified analytics engine, providing a powerful framework for large-scale data processing with its extensive range of APIs and tools. On the other hand, MongoDB, a document-oriented database, offers unmatched performance and flexibility for storing diverse data types.
When combined, Apache Spark and MongoDB form an unstoppable duo, enabling the creation of a robust and high-performance data processing pipeline. With Spark’s ability to process massive datasets and MongoDB’s storage capabilities, this dynamic duo opens the door to rapid and efficient data processing, empowering organisations to extract valuable insights and unlock the true potential of their data.

B. Environment and Connection Details

Spark-Mongo connection

B1. Environment

We will be using Databricks and MongoDB Atlas (MongoDB's cloud offering) as the foundation for our data processing environment.
The combination of Databricks and MongoDB Atlas provides a powerful ecosystem for building and executing efficient data processing pipelines. Databricks offers a streamlined environment for leveraging Spark's distributed computing capabilities, while Atlas simplifies data storage and management. This integrated approach enables smooth data flow between Spark and MongoDB, empowering you to perform complex analytics, machine learning, and real-time processing on your MongoDB data.
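One practical note: the mongo-spark connector used in the examples below is not bundled with Databricks by default. It typically has to be attached to the cluster as a library (for example, an org.mongodb.spark:mongo-spark-connector Maven artifact, whose exact coordinates depend on your Spark/Scala version and the connector release you choose).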

B2. Connection Details

For the Mongo-Spark connection we require three things:

  1. Mongo URI
  2. Mongo Database Name
  3. Collection Name
MongoDB URI format: mongodb+srv://<username>:<password>@CONNECTION_STRING_HERE/

We can create a free MongoDB account on MongoDB Atlas and generate our own connection string with a username and password. The free tier provides a small shared cluster with a limited amount of storage.

Additionally, we can create a free account on Databricks and do the data processing there. We can spin up a single-node cluster for free.
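For the code samples in the following sections, we will assume these three values live in plain Python variables. The database and collection names below are purely illustrative, and in a real Databricks workspace the credentials would normally come from a secret scope rather than a hard-coded string:

# Illustrative connection values only -- replace with your own Atlas details
mongo_url = "mongodb+srv://<username>:<password>@CONNECTION_STRING_HERE/"
database = "sales_db"          # hypothetical database name
collection = "transactions"    # hypothetical collection name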

C. Data Pulling and Aggregation Options

Efficient data pulling and aggregation are crucial steps in data analysis. When working with MongoDB and Spark, the mongo-spark connector offers versatile options to streamline these processes. In this section, we’ll delve into two distinct approaches for data pulling and aggregation, exploring the significance of MongoDB query pipelines.

1. Pulling Data for the Entire Collection:

The first approach involves fetching the complete dataset from a MongoDB collection and bringing it into the Spark cluster. Once the data resides in Spark, we can leverage its powerful APIs and functions to perform comprehensive analysis on the entire dataset, providing a holistic view of the data.

df = spark.read \
    .format("mongo") \
    .option("uri", mongo_url) \
    .option("database", database) \
    .option("collection", collection) \
    .load()
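Once the collection is loaded, df behaves like any other Spark DataFrame. As a minimal sketch (the item_id and amount columns are assumed fields, not ones from a real collection), a Spark-side aggregation over the full dataset could look like this:

from pyspark.sql import functions as F

# Group the whole collection by a hypothetical item_id field and total a hypothetical amount field
summary_df = df.groupBy("item_id").agg(F.sum("amount").alias("total_amount"))
summary_df.show()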

2. Pulling Data with a MongoDB Aggregation Pipeline:

Alternatively, we can make use of MongoDB’s query aggregation pipeline in conjunction with the Spark read command. By defining a pipeline, we can perform data preprocessing directly within MongoDB before transferring only the required aggregated or filtered data to Spark. This approach is particularly advantageous when working with specific subsets of data, as it reduces unnecessary data transfers and greatly enhances efficiency.

Example of Mongo Query in Pipeline: Let’s consider an example where we want to filter data based on a specific condition using the aggregation pipeline:


# The pipeline is passed to the connector as a JSON string
pipeline = '[{"$match": {"field_name": {"$gte": 100}}}]'

df = spark.read \
    .format("mongo") \
    .option("uri", mongo_url) \
    .option("database", database) \
    .option("collection", collection) \
    .option("pipeline", pipeline) \
    .load()

Significance of MongoDB Query Pipeline

The MongoDB query pipeline allows us to specify a sequence of operations to process the data within MongoDB before it reaches Spark. This means we can perform complex transformations, aggregations, and filtering operations directly in the database, thereby reducing the amount of data transferred to Spark.
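As a minimal sketch of such pushdown (the status, item_id and amount fields are assumptions), the pipeline below filters and groups inside MongoDB so that Spark only receives the pre-aggregated result:

import json

# Filter and aggregate inside MongoDB; only the grouped output is transferred to Spark
pipeline = [
    {"$match": {"status": "completed"}},
    {"$group": {"_id": "$item_id", "total_amount": {"$sum": "$amount"}}}
]

pre_aggregated_df = spark.read \
    .format("mongo") \
    .option("uri", mongo_url) \
    .option("database", database) \
    .option("collection", collection) \
    .option("pipeline", json.dumps(pipeline)) \
    .load()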

D. Data Pushing to MongoDB via Spark

After performing complex data aggregations and transformations in Spark, the next crucial step is to persist the processed data for future analysis or to share it with other applications. MongoDB, as a NoSQL database, is an excellent fit for storing and managing semi-structured or unstructured data efficiently. Leveraging the integration between Spark and MongoDB provided by the mongo-spark connector, we can push data from Spark to MongoDB collections, enabling a smooth data flow in our analytical workflows. In this section, we'll explore the data pushing process in MongoDB after Spark aggregation, along with the various options available for customisation.

1. Data Pushing Process:

The process of pushing data from Spark to MongoDB is a simple yet crucial operation. We can achieve this by utilizing the write method available for DataFrames in Spark.

# Depending on the connector version and whether the collection already exists,
# a save mode such as .mode("append") may also be needed (see the examples below).
aggregatedDF.write \
    .format("mongo") \
    .option("uri", mongo_url) \
    .option("database", database) \
    .option("collection", collection) \
    .save()

2. OperationType for Data Pushing:

The mongo-spark connector provides different operation types for data pushing, allowing us to control how data is inserted, replaced, or updated in MongoDB.

2.1 Insert Operation

The insert operation adds new documents to the MongoDB collection. It does not require a unique identifier field, as a unique _id is generated automatically for each inserted document.

aggregatedDF.write \
    .format("mongo") \
    .option("uri", mongo_url) \
    .option("database", database) \
    .option("collection", collection) \
    .option("operationType", "insert") \
    .save()

2.2 Replace Operation

The replace operation allows us to replace existing documents in the collection based on the unique identifier field(s) specified in idFieldList. The default is _id, and we can supply our own customised idFieldList as a comma-separated string. If a document with the given unique identifier already exists, it is replaced; otherwise, a new document is inserted.

aggregatedDF.write \
    .format("mongo") \
    .option("uri", mongo_url) \
    .option("database", database) \
    .option("collection", collection) \
    .option("idFieldList", "item_id,date") \
    .option("operationType", "replace") \
    .mode("append") \
    .save()

2.3 Update Operation

The update operation allows us to update existing documents in the collection based on the unique identifier field(s) specified in idFieldList. The default is _id, and we can supply our own customised idFieldList as a comma-separated string. If a document with the given unique identifier already exists, it is updated; otherwise, a new document is inserted. Unlike replace, which swaps out the whole document, update modifies the matched document with the fields present in the DataFrame.

aggregatedDF.write \
    .format("mongo") \
    .option("uri", mongo_url) \
    .option("database", database) \
    .option("collection", collection) \
    .option("idFieldList", "item_id,date") \
    .option("operationType", "update") \
    .mode("append") \
    .save()

E. Conclusion

The Spark-MongoDB combination offers powerful data processing and aggregation capabilities. With two distinct approaches for data pulling and aggregation, we can efficiently retrieve, analyze, and transform data to gain valuable insights.
