The Need for Speed: Efficient Data Processing in the Modern World
In today’s fast-paced world, efficient data processing is essential for any successful data pipeline, especially when handling large datasets. Recently, I tackled a challenge involving extracting a large number of files, applying business-specific transformations, and loading them into a consolidated output, all within a Microsoft Fabric environment. This article explores the techniques I used to optimise performance: multithreading, Spark configuration tuning, and custom data pipeline design.
Parallel Data Processing Use Case
My task was to process a large volume of JSON files. Each file contained data in a non-standard format, including special characters and custom delimiters. The key requirements were:
- Efficient file processing: Apply business logic to input semi-structured JSON files and produce one consolidated CSV output. The conversion to CSV also required some business-specific processing.
- Custom formatting and encoding: The data had to retain its original format, including special characters and unique delimiters.
- Dynamic resource allocation: Spark configuration needed to scale optimally based on the workload.
- Cost-effective solution: The pipeline was designed for optimal compute usage and fast processing, ensuring cost efficiency.
Approach and Solutions
I approached these challenges by breaking the solution into manageable components, focusing on performance and scalability.
1. Parallel Processing with Multithreading and Spark
I began by loading the JSON data into a Delta table, flattening the JSON’s nested structure. This simplified the application of business logic while managing complexity through Delta tables and their corresponding DataFrames. To process the JSON files efficiently in parallel, I designed a data pipeline that enabled the simultaneous execution of notebook runs, achieved using Azure Pipelines and multithreading within Spark.
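To make the flattening step concrete, here is a minimal sketch of how nested JSON can be promoted into flat columns and written to a Delta table with PySpark. The paths, column names (header.createdDate, items, and so on) and the table name are placeholders rather than the actual schema or business logic, and the snippet assumes the spark session that Fabric notebooks provide.

from pyspark.sql import functions as F

# Illustrative only: the real paths, schema, and business logic differ
raw_df = spark.read.option("multiline", "true").json("Files/raw_json/")

# Flatten the nested structure by promoting nested fields to top-level columns
flat_df = (
    raw_df
    .select(
        F.col("id"),
        F.col("header.createdDate").alias("created_date"),
        F.explode_outer("items").alias("item"),
    )
    .select(
        "id",
        "created_date",
        F.col("item.code").alias("item_code"),
        F.col("item.value").alias("item_value"),
    )
)

# Persist the flattened data as a Delta table for the downstream business logic
flat_df.write.format("delta").mode("overwrite").saveAsTable("flattened_source")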
Multithreading in Spark: Initially, I explored using the mssparkutils.notebook.runMultiple() command to trigger multiple notebooks in parallel. However, this approach led to performance issues. As an alternative, I used Python’s concurrent.futures.ThreadPoolExecutor, combined with configurable parameters that made it easy to scale the run and to test against subsets of the data. This approach let me process the dataset in chunks, with each job working on a specific row range.
from concurrent.futures import ThreadPoolExecutor

def process_file(start_row, end_row):
    # Call the Spark notebook with a specific start and end row
    mssparkutils.notebook.run('ProcessCSV', 60, {'startRow': start_row, 'endRow': end_row})

with ThreadPoolExecutor(max_workers=20) as executor:
    # Batch size and number of parallel jobs
    batch_size = 1000
    for i in range(0, 5000, batch_size):
        executor.submit(process_file, i, i + batch_size)
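Because each worker thread simply blocks while its notebook run executes on the Spark pool, Python’s GIL is not a practical constraint here; max_workers effectively caps how many notebook runs are in flight at once, and the second argument to mssparkutils.notebook.run() is the per-run timeout in seconds.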
2. Optimised Spark Configuration for Resource Management
The next challenge was optimising the Spark pool configuration to ensure it could handle parallel tasks efficiently. I focused on scaling executors and tuning Spark settings for memory optimisation and CPU allocation:
- Microsoft Fabric Capacity: I used an F64 capacity. Since per-thread processing was moderate in terms of memory and compute utilisation, I opted for a small node size so that more threads could run in parallel. With 1 CU equalling 2 Spark V-cores, F64 provides 128 Spark V-cores.
- Burst Factor: A burst factor of 3 allows the capacity to temporarily use up to 3 times its base V-cores, totalling a maximum of 384 Spark V-cores (384 parallel threads).
- Autoscaling: Dynamic resource allocation allowed Spark to adjust the number of executors based on the workload, preventing system overload while maximising resource use.
- Tuning Spark Pool: I adjusted the number of executors to match my workload and carefully monitored CPU and memory usage for each job to avoid resource contention.
- Executor Spark Cluster Setting: I created a custom pool with small nodes, an autoscale limit of 32, and dynamic executor allocation of up to 31. This Spark pool served as the default for the environment, making full use of the available small-node Spark executors (the sketch after this list shows the equivalent Spark properties).
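For reference, the sketch below expresses roughly equivalent limits as standard Spark dynamic allocation properties. It is illustrative only: in this project the limits came from the custom Fabric pool configuration rather than from code, the values mirror the pool settings above, and a Fabric notebook already has a running session, so building one like this is purely to show the property names involved.

from pyspark.sql import SparkSession

# Illustrative only: in Fabric these limits were set on the custom pool,
# not in code. The keys are standard Spark dynamic allocation properties.
spark = (
    SparkSession.builder
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "1")
    .config("spark.dynamicAllocation.maxExecutors", "31")
    .getOrCreate()
)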
3. CSV Processing and Consolidation
Retaining the original format of the CSV files presented a significant challenge: the files contained special characters and custom delimiters, and had no headers. To address this, I avoided standard DataFrame operations, which could alter the CSV structure. Instead, I used file streaming techniques to read the individual CSVs and consolidated the final CSV data directly from Lakehouse paths, which also made writing the consolidated file more efficient.
Key Techniques Used for Parallel Data Processing:
- Reading CSVs Directly from the Lakehouse: Instead of loading CSVs into a DataFrame (which might lose formatting), I used the Python open() function to read the raw CSV content as a string.
- Consolidating CSV Files: After reading multiple CSV files, I concatenated their contents into a single string variable, then used mssparkutils.fs.put() to write the consolidated data back to the Lakehouse.
import os

csv_files = mssparkutils.fs.ls(csv_folder_path)
consolidated_data = ""

for file_info in csv_files:
    if file_info.name.endswith('.csv'):
        # Construct the full file path and map the abfss URI to the mounted Lakehouse path
        file_path_original = os.path.join(subdir_path, file_info.name)
        file_path_forOpen = file_path_original.replace('abfss://{LakehouseName}', '/lakehouse/default')
        with open(file_path_forOpen, 'r') as file_object:
            file_content = file_object.read()
        # Append the content to the consolidated data
        consolidated_data += file_content + '\n'

# Write the consolidated CSV back to the Lakehouse, overwriting any existing file
mssparkutils.fs.put(final_output_path, consolidated_data, True)
This method ensured that the special characters and formatting were preserved.
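One caveat: open() falls back to the platform’s default encoding unless one is given, so depending on how the source files are encoded it may be safer to pass an explicit encoding argument (for example encoding='utf-8') to keep special characters intact.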
4. Custom Pipeline Design for Batch Processing
To orchestrate the execution of multiple jobs efficiently, I used Azure Pipelines to design a pipeline that triggers the parallel execution of notebooks over different row ranges. I used the ‘ForEach’ activity with a batch count greater than 1, enabling multiple Spark jobs to run concurrently; the batch count setting scales the number of parallel jobs up or down.
Key Pipeline Design Features:
- Pipeline Parameters: The pipeline took parameters such as start_row, batch_size, and max_parallel_runs, which dictated how many notebooks would run concurrently and which rows each would process (a small helper sketch for generating these row ranges follows this list).
- Parallel Execution of Notebooks: The pipeline was configured to call multiple notebooks with different row ranges simultaneously, ensuring that the jobs ran concurrently without waiting for one another to complete.
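To make the row-range handoff concrete, here is a small illustrative helper (not the actual pipeline code) showing how the start/end rows for each notebook run could be generated; total_rows and batch_size stand in for the real pipeline parameters.

def build_row_ranges(total_rows, batch_size):
    # One {'startRow', 'endRow'} item per batch for the ForEach activity to fan out
    return [
        {'startRow': start, 'endRow': min(start + batch_size, total_rows)}
        for start in range(0, total_rows, batch_size)
    ]

# Example: 5,000 rows in batches of 1,000 gives five parallel notebook runs
print(build_row_ranges(5000, 1000))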
Conclusion
In short, by combining multithreading, custom Spark cluster configurations, and pipeline design, I significantly improved file processing performance in Microsoft Fabric. The parallel data processing strategy and optimised resource allocation allowed the system to scale dynamically and handle large data volumes without bottlenecks. Furthermore, careful management of source-to-destination formatting ensured data integrity throughout the consolidation process.
These strategies offer a flexible and scalable solution for optimising data processing workloads in Microsoft Fabric, especially in environments demanding high performance and accuracy.
Key Takeaways
- Multithreading enables parallel data processing of large datasets.
- Spark cluster tuning ensures optimal resource utilisation.
- Azure Pipelines with the ‘ForEach’ activity can effectively manage concurrent notebook runs for batch processing.
If you’re dealing with large datasets in a similar environment, these techniques can help you boost performance while maintaining data integrity.
Devoteam helps you turn data into impact with Microsoft Data Solutions
With a team of 1,200+ Microsoft experts, 18 Advanced Specialisations, and recognition as a preferred Microsoft partner for premium consulting, solutions and managed services across EMEA, Devoteam helps you turn data into actionable insights that drive tangible business impact.