**REAL-TIME IOT SENSOR TEMPERATURE ANALYSIS**

**Big Data Real-Time Analytics with Python, Apache Spark and Apache Kafka**

TABLE OF CONTENTS¶

  • SUMMARY - ABSTRACT
  • 1. THE BUSINESS PROBLEM & DATA ARCHITECTURE
  • 2. DATA INPUT
    • 2.1. PYTHON - Data generator
    • 2.2. Data output examples
  • 3. APACHE KAFKA
    • 3.1. ZOOKEEPER - Cluster manager initialization
    • 3.2. Kafka Initialization
    • 3.3. Topics creation
    • 3.4. Data stream start
  • 4. APACHE SPARK (PYSPARK) - RUNNING THE CODE
    • 4.1. Creating Spark session
    • 4.2. Kafka/Spark structured stream read
    • 4.3. Data source schema definition
    • 4.4. Data source parsing
    • 4.5. Dataframe preparation
    • 4.6. SQL - Real-Time data analysis preparation
    • 4.7. SQL - Real-Time data visualization and different run methods
  • 5. ENDING & FINAL REMARKS

SUMMARY - ABSTRACT ¶

Return to Index¶

An Apache Kafka and Spark project showcasing Python and real-time data stream skills.

Real-time analytics is becoming more and more important, especially when we consider critical actions in scaled environments that require quick intervention.

In this project, I want to demonstrate skills in real-time data stream analysis and transformation.


Nowadays, every data professional should be comfortable with open-source solutions, especially those working at startups and on highly scalable projects.

The idea of this project is to show technical skills and the challenges one might face when using tools without a GUI (Graphical User Interface).

OBJECTIVE: Analyze Real-Time data using Apache Kafka and PySpark.¶

I'll be using randomly generated data for this project, so I won't link any datasets or Apache's official websites.

USED TECHNOLOGIES¶
  1. Python
  2. Apache Kafka
  3. Apache Spark
    3.1. PySpark
    3.2. SparkSQL
    3.3. PySparkStreaming

Let's begin!


1. THE BUSINESS PROBLEM & DATA ARCHITECTURE ¶

Return to Index¶

Assume your company has multiple sensors on industrial equipment that requires critical attention, or deeper analysis for preventive maintenance.

Or assume that your business has a highly scalable application that would benefit from real-time analysis of its database.

Seems like a difficult job to pull off, right?
Well, I wouldn't say it's easy, but bear with me:

covered.gif

Let's take a look into how we will conduct the Data Architecture / Structure:

DATA_ARCHITECTURE.png


There are basically 5 very good reasons to use Apache Kafka for a project like this:

  1. Low Latency and High Throughput
    Kafka is designed for high-performance data processing, with a focus on low latency and high throughput. It achieves this by leveraging techniques such as sequential disk I/O, message batching, and efficient data compression. These optimizations make Kafka suitable for handling massive data streams and processing them in near real-time.

  2. Distributed and Scalable
    Kafka is built as a distributed system, allowing it to handle large-scale data streams and process high volumes of data in real-time. It achieves scalability by distributing data and processing across multiple servers, making it highly resilient and capable of handling immense data loads.

  3. Fault Tolerance and Durability
    Kafka is designed to be highly fault-tolerant, ensuring data availability even in the face of failures. It achieves this through replication, where data is replicated across multiple broker nodes in a Kafka cluster. This replication mechanism ensures that even if some nodes fail, the data remains accessible and the system continues to operate without interruption.

  4. Ecosystem Integration
    Kafka integrates well with various components and frameworks within the larger data processing ecosystem. It supports connectors that enable seamless integration with popular data storage systems, such as Apache Hadoop and Apache Cassandra, as well as streaming frameworks like Apache Flink and Apache Spark. This interoperability enhances Kafka's usability and allows for flexible and powerful data processing pipelines.

  5. Data Integration and Decoupling
    Kafka acts as a central hub for data integration, decoupling data producers and consumers. This decoupling allows for loose coupling between different components and applications, enabling independent scalability and fault tolerance for each part of the system. It also supports replayability, meaning data can be consumed multiple times by different applications or stored for future analysis.


2. DATA INPUT ¶

Return to Index¶
Remember the last time you had multiple industrial sensors at home?

Well, I don't...



We have to generate data somehow to make use of Kafka possible here.
For that, I've written a data generator in Python that produces random values simulating industrial sensors. Bear with me, because this is quite interesting.

2.1. PYTHON - DATA GENERATOR ¶

Return to Index¶

I'm going to include the commented code of the data generator below, but I won't execute it in this Jupyter Notebook. Instead, I'll run it with Python from the command line to save a file with the generated values and feed that information into Kafka/Spark.

Feel free to test it out! You can run it multiple times and you will see that the output is random.

Here's how it works:

In [ ]:
# IoT Sensor Data Simulator

# Run the simulator with the following command at the terminal:
# python simulator.py 10000 > ../data/sensor_data.txt

# Imports
import re
import sys
import copy
import random
import string
import datetime
from random import randrange

# Defines the number of messages generated.
# If no value is provided, it generates 10 messages.
# IMPORTANT: THE COMMENTED SNIPPET BELOW USES 'sys.argv', WHICH WON'T WORK IN THIS NOTEBOOK,
# SO I'LL SET num_msgs TO 10 BY DEFAULT, JUST SO WE CAN CHECK ITS OUTPUT.
#if len(sys.argv) > 1:
#  num_msgs = int(sys.argv[1])
#else:
#  num_msgs = 10

num_msgs = 10

# Set sensor base temperature
dic_temp_sensors = {'sensor1': 38.3, 'sensor2': 45.3, 'sensor3': 31.8, 'sensor4': 73.1, 'sensor5': 71.8, 'sensor6': 63.7, 
										 'sensor7': 80.7, 'sensor8': 52.0, 'sensor9': 64.1, 'sensor10': 62.7, 'sensor11': 73.4, 'sensor12': 54.2, 
										 'sensor13': 76.4, 'sensor14': 49.0, 'sensor15': 50.4, 'sensor16': 58.8, 'sensor17': 47.6, 'sensor18': 55.4, 
										 'sensor19': 58.8, 'sensor20': 49.4, 'sensor21': 59.9, 'sensor22': 45.1, 'sensor23': 55.1, 'sensor24': 16.6, 
										 'sensor25': 42.8, 'sensor26': 50.4, 'sensor27': 32.9, 'sensor28': 71.8, 'sensor29': 33.5, 'sensor30': 71.7, 
										 'sensor31': 37.8, 'sensor32': 69.6, 'sensor33': 50.3, 'sensor34': 84.4, 'sensor35': 79.0, 'sensor36': 11.0, 
										 'sensor37': 64.2, 'sensor38': 57.9, 'sensor39': 60.7, 'sensor40': 58.6, 'sensor41': 64.5, 'sensor42': 31.2, 
										 'sensor43': 54.4, 'sensor44': 40.1, 'sensor45': 44.3, 'sensor46': 62.7, 'sensor47': 53.4, 'sensor48': 52.4, 
										 'sensor49': 45.6, 'sensor50': 58.4}

# Base ID for each sensor
id_base_sensor = "S-HAR-PORT-DATA-19951-"

# Base ID for each equipment
id_base_equipment = "E-HAR-PORT-DATA-25015-"

# Sensor standard readout
readout = "iot:reading:sensor:temp"

# Uppercase letters used to build the random IDs
letters = string.ascii_uppercase

# String with readout standard
header_reading_iot = """\
{ "id_sensor": "%s",
  "id_equipment": "%s",
  "sensor": "%s", """

iotmsg_date_event = """\
  "date_event": "%sZ", """

iotmsg_format = """\
  "standard": {"format": "%s", """

iotmsg_data ="""\
	"reading": { "temperature": %.1f  }   
	 }
}"""

# Sensor ID mapping dictionary
dic_map_sensor_id = {} 

# Latest measurement dictionary
dic_current_temp = {}

# Generates JSON output
if __name__ == "__main__":

	# Loop from 0 up to the number of messages defined when running the simulator
	for counter in range(0, num_msgs):

		# Generates a string of 3 random digits
		rand_num = str(random.randrange(0, 9)) + str(random.randrange(0, 9)) + str(random.randrange(0, 9))

		# Generates 2 random letters
		rand_letter = random.choice(letters) + random.choice(letters)

		# Generates random value for temperature with uniform distribution
		rand_temp_value = random.uniform(-5, 5)

		# Generates another random value following a uniform distribution
		rand_temp_value_delta = random.uniform(-1, 1)

		# Sensor ID gets the base value plus the generated values.
		id_sensor = id_base_sensor + rand_num + rand_letter 

		# Equipment ID gets the base value plus the generated values.
		id_equipment = id_base_equipment + rand_num + rand_letter 

		# Selects a random sensor name from the dictionary.
		sensor = random.choice(list(dic_temp_sensors.keys())) 

		# If this sensor ID is not yet associated with a sensor name, create the association.
		if (not id_sensor in dic_map_sensor_id): 

			# Includes sensor in the list.
			dic_map_sensor_id[id_sensor] = sensor 

			# Includes temperature in the list.
			dic_current_temp[id_sensor] = dic_temp_sensors[sensor] + rand_temp_value
			
		# If the sensor ID is already mapped to a different sensor name, reuse the existing mapping.
		elif (not dic_map_sensor_id[id_sensor] == sensor):		
			sensor = dic_map_sensor_id[id_sensor]

		# Adds a small random delta so consecutive readings from the same sensor vary.
		temperature = dic_current_temp[id_sensor] + rand_temp_value_delta
		dic_current_temp[id_sensor] = temperature

		# Records the current timestamp for this event.
		today = datetime.datetime.today() 
		date_event = today.isoformat()

		# Prints results in JSON format.
		print(re.sub(r"[\s+]", "", header_reading_iot) % (id_sensor, id_equipment, sensor),
					re.sub(r"[\s+]", "", iotmsg_date_event) % (date_event),
					re.sub(r"[\s+]", "", iotmsg_format) % (readout),
					re.sub(r"[\s+]", "", iotmsg_data) % (temperature))

2.2. DATA OUTPUT EXAMPLES ¶

Return to Index¶
Here are two output examples, each with the default 10 messages:

example1.png

Example 1

example2.png

Example 2

3. APACHE KAFKA ¶

Return to Index¶

I want to promote a CONCEPT here. Why would we want real-time analysis?

Here are some compelling reasons why one would want to implement that:

1. Real-time Decision Making: Real-time data analysis allows businesses to make critical decisions immediately as data streams in. This is especially valuable in situations where timely actions can lead to a competitive advantage. For example, in finance, making real-time investment decisions can be crucial for maximizing returns.

2. Proactive Issue Detection: Real-time analysis enables early detection of issues and anomalies. By continuously monitoring data streams, businesses can identify potential problems before they escalate, thus minimizing the impact and reducing downtime.

3. Personalization and User Experience: In industries like e-commerce and online advertising, real-time data analysis can be used to personalize user experiences. Analyzing user behavior in real-time allows businesses to recommend relevant products, services, or content instantly, leading to improved customer satisfaction and engagement.

4. Fraud Detection and Security: Real-time data analysis is vital for identifying fraudulent activities as they happen. For example, financial institutions can use real-time analysis to detect suspicious transactions and prevent fraud in real-time, providing an additional layer of security.

5. IoT and Sensor Data: In IoT (Internet of Things) environments, numerous sensors generate continuous streams of data. Real-time data analysis enables the monitoring and analysis of sensor data as it is produced, enabling responsive actions based on the data.

6. Stream Processing Efficiency: Apache Kafka, as a distributed streaming platform, allows for high-throughput, low-latency stream processing. Real-time data analysis with PySpark on Kafka streams can efficiently handle large volumes of data and deliver results in near real-time.

7. Operational Efficiency: Real-time data analysis helps organizations optimize their operations by identifying inefficiencies, bottlenecks, and opportunities for improvement in real-time. This allows for rapid adjustments and optimizations to achieve better performance.

8. Event-Driven Architecture: Real-time data analysis is a fundamental component of event-driven architectures, where actions are triggered in response to specific events or data patterns. This architecture is scalable, flexible, and can lead to a more responsive system.

9. Market Insights and Trends: For businesses in competitive markets, real-time data analysis provides up-to-the-minute insights into market trends, customer preferences, and competitor behavior. This information is invaluable for staying ahead in the market.

10. Real-time Reporting and Visualization: Real-time data analysis allows for dynamic and interactive reporting and visualization. Stakeholders can access real-time dashboards and reports, facilitating quick understanding and decision-making.

In summary, real-time data analysis is essential for making informed, timely decisions, detecting issues proactively, optimizing operations, enhancing user experiences, and gaining a competitive advantage in today's fast-paced, data-driven world. Python, PySpark, and Apache Kafka together provide a powerful stack to implement real-time data analysis solutions efficiently.

With that in mind, let's get down to business.


3.1. ZOOKEEPER - Cluster manager initialization ¶

Return to Index¶

Kafka can run on dedicated servers and on clusters, but for this example I'll run everything locally. Either way, we need to initialize ZooKeeper, which acts as our Kafka cluster manager.

Kafka runs from the command line; there is no graphical interface. For that, we will open 3 different terminal windows. In total, I'll have 4 terminal windows open, because I'll also run a Jupyter Notebook.

It should look something like this:

TERMINALS_KAFKA.png

Notice that you need to change directory into Kafka's installation folder.
That said, let's execute each command one by one.

ZOOKEEPER.png

ZOOKEEPER
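For reference, in a default Kafka distribution the ZooKeeper start command typically looks like this (paths may vary with your installation; the screenshot above shows my local run):

bin/zookeeper-server-start.sh config/zookeeper.properties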

3.2. Kafka Initialization¶

Return to Index¶

KAFKA.png

KAFKA
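Similarly, the Kafka broker itself is typically started with a command along these lines (again, paths may differ on your setup):

bin/kafka-server-start.sh config/server.properties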

3.3. Topics creation ¶

Return to Index¶

CREATE TOPIC.png

CREATE TOPIC
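For reference, a topic like this one is typically created with a command of this form (the single partition and replication factor of 1 are assumptions that match a local, single-broker setup):

bin/kafka-topics.sh --create --topic helioport --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1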

Once the topic is created, we can also describe it to confirm its configuration with the command: bin/kafka-topics.sh --describe --topic helioport --bootstrap-server localhost:9092.

It should look like this:

DESCRIBE TOPIC.png

3.4. Data stream start ¶

Return to Index¶

Now that we have started our services and created the topic, it's time to initialize the data stream. For that, let's use the last terminal, the one where we created and described the topic.

IMPORTANT CONCEPT: Kafka behaves as a sort of repository for data streams. That means Kafka by itself won't fetch or push data; we need a producer to make our data generator send information to Kafka, and a consumer (or connector) to subscribe to the information available in Kafka.
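To make that producer/consumer decoupling concrete, here is a minimal sketch of a programmatic producer and consumer using the third-party kafka-python package. This is purely illustrative and is not used in this project (which relies on the console tools below); the topic name reuses helioport, and the sample message is made up.

# Minimal illustration of Kafka's producer/consumer decoupling (NOT used in this project).
# Requires the third-party 'kafka-python' package: pip install kafka-python

from kafka import KafkaProducer, KafkaConsumer

# A producer pushes messages into a topic...
producer = KafkaProducer(bootstrap_servers="localhost:9092")
producer.send("helioport", b'{"sensor": "sensor1", "temperature": 38.3}')
producer.flush()

# ...and, independently, a consumer subscribes to the same topic and reads them back.
consumer = KafkaConsumer("helioport",
                         bootstrap_servers="localhost:9092",
                         auto_offset_reset="earliest",
                         consumer_timeout_ms=5000)
for message in consumer:
    print(message.value)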

Let's open the console producer, which is our data producer.

The command that feeds the topic from the file the Python data generator writes to is: bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic helioport < ../data/sensor_data.txt

Once you run this command, you will notice that nothing really happens on your screen, but bear with me. Let's check whether our stream is functional. Now that I have run the console producer, let's run the console consumer, still in the very same terminal.
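In a default installation, the console consumer command looks roughly like this (the exact invocation I used is in the screenshot below):

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic helioport --from-beginning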

CONSOLE CONSUMER.png

Once you run this command, surprise: here is the data retrieved from our Python data generator:

CONSUMER WITH DATA.png

What is going on is that Kafka persists this information in its log, keeping the data available, while the producer sends data to Kafka and the consumer retrieves data from it.

Now that our Kafka setup is up and running, it's time for some PySpark.

4. APACHE SPARK (PYSPARK) - RUNNING THE CODE ¶

Return to Index¶
In [1]:
# Check the Python version:
from platform import python_version
print('Python version:', python_version())
Python version: 3.9.13
In [2]:
# If not installed yet, install findspark.
# !pip install findspark
In [3]:
# Imports findspark and initializes it.
import findspark
findspark.init()
In [4]:
# Import required modules.
# NOTE: We only need the streaming module to get the data. Notice how many more SQL modules we are using:
# in the end, all the data we retrieve is there to solve a business problem through data analysis.
import pyspark
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import col, from_json
In [5]:
# Tell spark-submit to load the Spark-Kafka connector package.
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell'
In [6]:
# Spark versions used in this project
%reload_ext watermark
%watermark -a "Helio Ribeiro" --iversions
Author: Helio Ribeiro

findspark: 2.0.1
pyspark  : 3.4.0

4.1. Creating Spark session ¶

Return to Index¶
In [7]:
# Create Spark session
spark = SparkSession.builder.appName("RealTimeProject").getOrCreate()
23/07/22 19:00:48 WARN Utils: Your hostname, Helios-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 172.20.10.12 instead (on interface en0)
23/07/22 19:00:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/opt/anaconda3/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /Users/helioribeiro/.ivy2/cache
The jars for the packages stored in: /Users/helioribeiro/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-d35725a3-b00a-4c9f-8a25-bfe19276b5d6;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.3.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.0 in central
	found org.apache.kafka#kafka-clients;2.8.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.32 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.2 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.2 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: resolution report :: resolve 241ms :: artifacts dl 13ms
	:: modules in use:
	com.google.code.findbugs#jsr305;3.0.0 from central in [default]
	commons-logging#commons-logging;1.1.3 from central in [default]
	org.apache.commons#commons-pool2;2.11.1 from central in [default]
	org.apache.hadoop#hadoop-client-api;3.3.2 from central in [default]
	org.apache.hadoop#hadoop-client-runtime;3.3.2 from central in [default]
	org.apache.kafka#kafka-clients;2.8.1 from central in [default]
	org.apache.spark#spark-sql-kafka-0-10_2.12;3.3.0 from central in [default]
	org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.0 from central in [default]
	org.lz4#lz4-java;1.8.0 from central in [default]
	org.slf4j#slf4j-api;1.7.32 from central in [default]
	org.spark-project.spark#unused;1.0.0 from central in [default]
	org.xerial.snappy#snappy-java;1.1.8.4 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   12  |   0   |   0   |   0   ||   12  |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-d35725a3-b00a-4c9f-8a25-bfe19276b5d6
	confs: [default]
	0 artifacts copied, 12 already retrieved (0kB/5ms)
23/07/22 19:00:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

4.2. Kafka/Spark Structured Stream read ¶

Return to Index¶
In [8]:
# Create the topic subscription for the data we want to pull from Kafka.
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "helioport") \
  .load()
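
Optionally, the Kafka source also accepts a startingOffsets option to control where the subscription begins reading. The sketch below is not executed in this notebook; it only illustrates the idea:

In [ ]:
# Illustrative only: same subscription, but explicitly starting from the earliest available offsets.
# df = spark \
#   .readStream \
#   .format("kafka") \
#   .option("kafka.bootstrap.servers", "localhost:9092") \
#   .option("subscribe", "helioport") \
#   .option("startingOffsets", "earliest") \
#   .load()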

4.3. Data source schema definition ¶

Return to Index¶
In [9]:
# Defining the data schema we desire to capture for analysis (temperature).
temp_data_schema = StructType([StructField("reading", 
                                             StructType([StructField("temperature", DoubleType(), True)]), True)])
In [10]:
# Defining global data schema for this streaming
data_schema = StructType([ 
    StructField("id_sensor", StringType(), True), 
    StructField("id_equipment", StringType(), True), 
    StructField("sensor", StringType(), True), 
    StructField("date_event", StringType(), True), 
    StructField("standard", temp_data_schema, True)
])

4.4. Data source parsing ¶

Return to Index¶
In [11]:
# Capture each data value as a string
df_convert = df.selectExpr("CAST(value AS STRING)")
In [12]:
# Parse JSON as dataframe.
df_convert = df_convert.withColumn("jsonData", from_json(col("value"), data_schema)).select("jsonData.*")
In [13]:
df_convert.printSchema()
root
 |-- id_sensor: string (nullable = true)
 |-- id_equipment: string (nullable = true)
 |-- sensor: string (nullable = true)
 |-- date_event: string (nullable = true)
 |-- standard: struct (nullable = true)
 |    |-- reading: struct (nullable = true)
 |    |    |-- temperature: double (nullable = true)

4.5. Dataframe preparation ¶

Return to Index¶
In [14]:
# Renaming columns for simplified analysis
df_convert_temp_sensor = df_convert.select(col("standard.reading.temperature").alias("temperature"), 
                                               col("sensor"))
In [15]:
df_convert_temp_sensor.printSchema()
root
 |-- temperature: double (nullable = true)
 |-- sensor: string (nullable = true)

In [16]:
# We are unable to visualize the dataframe with .head() because it comes from a streaming source.
# Queries with streaming sources must be executed with writeStream.start().
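
If you do want to peek at the raw messages without starting a streaming query, one option (not used here) is to read the same topic as a static batch with spark.read instead of spark.readStream, which can then be inspected normally:

In [ ]:
# Illustrative only: batch read of the topic, which CAN be inspected with .show()/.head().
# df_batch = spark.read \
#     .format("kafka") \
#     .option("kafka.bootstrap.servers", "localhost:9092") \
#     .option("subscribe", "helioport") \
#     .load()
# df_batch.selectExpr("CAST(value AS STRING)").show(5, truncate=False)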

4.6. SQL - Real-Time data analysis preparation¶

Return to Index¶
In [17]:
# This is the object that will contain the analysis, with the average temperature per sensor.
df_avg_temp_sensor = df_convert_temp_sensor.groupby("sensor").mean("temperature")
In [18]:
df_avg_temp_sensor.printSchema()
root
 |-- sensor: string (nullable = true)
 |-- avg(temperature): double (nullable = true)

In [19]:
# Renaming columns for simplified analysis
df_avg_temp_sensor = df_avg_temp_sensor.select(col("sensor").alias("sensor"), 
                                                   col("avg(temperature)").alias("avg_temp"))
In [20]:
df_avg_temp_sensor.printSchema()
root
 |-- sensor: string (nullable = true)
 |-- avg_temp: double (nullable = true)
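
As a side note, the grouping and renaming above could also be done in a single step with agg() and alias(); this sketch (not executed here) is equivalent to the two cells above:

In [ ]:
# Illustrative alternative: compute and name the average in one step.
# from pyspark.sql.functions import avg
# df_avg_temp_sensor = df_convert_temp_sensor.groupBy("sensor").agg(avg("temperature").alias("avg_temp"))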


4.7. SQL - Real-Time data visualization and different run methods ¶

Return to Index¶

Are you ready? This part is really important, since it's where we are going to visualize our data. There are different ways of running the stream, with different run durations, including a way to automate stopping the stream.

Kafka shines here because you can run multiple streams simultaneously, making it a great tool for real-time data streams. A quick sketch of those alternative run methods follows below.
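
None of the cells in this sketch are executed here; they simply reuse the df_avg_temp_sensor object defined above to show the options:

In [ ]:
# Illustrative only: different ways to run and stop a streaming query.

# 1. Run until manually interrupted:
# query = df_avg_temp_sensor.writeStream.outputMode("complete").format("console").start()
# query.awaitTermination()

# 2. Run for a fixed number of seconds, then stop automatically:
# query = df_avg_temp_sensor.writeStream.outputMode("complete").format("console").start()
# query.awaitTermination(60)
# query.stop()

# 3. Control how often micro-batches are triggered:
# query = (df_avg_temp_sensor.writeStream
#          .outputMode("complete")
#          .format("console")
#          .trigger(processingTime="10 seconds")
#          .start())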

ATTENTION.

Remember the 4 terminals I opened when we started?

Now, we need to open a fifth one that will run our Python data generator to feed our Kafka stream!

Here's what it looks like now:

all_terminals.png

Here are the commands we need to run to visualize our stream, in this order:

5th Terminal: python simulator.py 1000 > ../data/sensor_data.txt

4th Terminal: bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic helioport < ../data/sensor_data.txt


What we are doing here is providing Kafka with a new data stream. When we run the following Spark commands, they will show a blank batch at first, but the process keeps running, so it's very important that you keep ingesting data.

Below is my execution, for comparison.

In [21]:
# Object that initiates the streaming query with the console sink.
# This is the moment when Spark establishes the connection with Kafka to retrieve data.
query = df_avg_temp_sensor.writeStream.outputMode("complete").format("console").start()
23/07/22 19:00:57 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/vy/8tkfcqt12f31vsh13f_sb00h0000gn/T/temporary-a55c30d0-e95b-47ea-916d-6a33d05a0d8b. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/07/22 19:00:57 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/07/22 19:00:57 WARN AdminClientConfig: The configuration 'key.deserializer' was supplied but isn't a known config.
23/07/22 19:00:57 WARN AdminClientConfig: The configuration 'value.deserializer' was supplied but isn't a known config.
23/07/22 19:00:57 WARN AdminClientConfig: The configuration 'enable.auto.commit' was supplied but isn't a known config.
23/07/22 19:00:57 WARN AdminClientConfig: The configuration 'max.poll.records' was supplied but isn't a known config.
23/07/22 19:00:57 WARN AdminClientConfig: The configuration 'auto.offset.reset' was supplied but isn't a known config.
                                                                                
-------------------------------------------
Batch: 0
-------------------------------------------
+------+--------+
|sensor|avg_temp|
+------+--------+
+------+--------+

                                                                                
-------------------------------------------
Batch: 1
-------------------------------------------
+--------+------------------+
|  sensor|          avg_temp|
+--------+------------------+
| sensor7| 82.10476190476192|
|sensor34| 85.08421052631579|
|sensor41|           65.2375|
|sensor50| 60.07500000000001|
|sensor38|  57.1578947368421|
|sensor31| 37.39411764705883|
| sensor1| 39.25454545454546|
|sensor30| 70.92307692307692|
|sensor10| 61.41111111111112|
|sensor25| 43.15833333333333|
| sensor4|           73.3375|
| sensor5| 72.50555555555556|
|sensor20| 48.91428571428572|
|sensor44|           40.8375|
|sensor19|58.720000000000006|
| sensor8|51.278571428571425|
|sensor14| 49.12857142857142|
|sensor24|16.544999999999995|
|sensor43|             54.65|
|sensor47|53.775000000000006|
+--------+------------------+
only showing top 20 rows

                                                                                
-------------------------------------------
Batch: 2
-------------------------------------------
+--------+------------------+
|  sensor|          avg_temp|
+--------+------------------+
| sensor7|  82.1909090909091|
|sensor34| 85.20869565217392|
|sensor41| 64.63684210526316|
|sensor50|59.828571428571436|
|sensor31|37.790000000000006|
|sensor38| 56.96190476190476|
| sensor1| 38.90833333333334|
|sensor30| 70.63333333333334|
|sensor10| 61.89000000000001|
|sensor25| 43.05384615384615|
| sensor4| 73.39000000000001|
| sensor5| 72.39166666666667|
|sensor20| 49.06000000000001|
|sensor44| 40.75714285714285|
|sensor19| 58.81304347826087|
| sensor8|51.231249999999996|
|sensor14| 49.06086956521739|
|sensor24| 16.18181818181818|
|sensor43| 55.33076923076923|
|sensor47| 53.67857142857144|
+--------+------------------+
only showing top 20 rows

                                                                                
-------------------------------------------
Batch: 3
-------------------------------------------
+--------+------------------+
|  sensor|          avg_temp|
+--------+------------------+
| sensor7| 82.21739130434784|
|sensor34|            84.916|
|sensor41| 64.63684210526316|
|sensor50| 60.22500000000001|
|sensor31|37.790000000000006|
|sensor38|56.979166666666664|
| sensor1|             38.85|
|sensor30|            70.875|
|sensor10|61.775000000000006|
|sensor25|           43.0875|
| sensor4|  73.3904761904762|
| sensor5| 72.55555555555556|
|sensor20| 48.68000000000001|
|sensor44| 40.47727272727273|
|sensor19| 58.87692307692308|
| sensor8| 51.07058823529411|
|sensor14|48.912499999999994|
|sensor24| 16.03478260869565|
|sensor43| 55.05555555555556|
|sensor47|54.022222222222226|
+--------+------------------+
only showing top 20 rows

                                                                                
-------------------------------------------
Batch: 4
-------------------------------------------
+--------+------------------+
|  sensor|          avg_temp|
+--------+------------------+
| sensor7|            81.885|
|sensor34| 84.19565217391305|
|sensor41| 64.33846153846153|
|sensor50| 59.61785714285715|
|sensor31| 37.92580645161291|
|sensor38|57.583720930232566|
| sensor1|             39.14|
|sensor30| 70.65454545454546|
|sensor10| 61.87419354838711|
|sensor25|          43.85625|
| sensor4| 73.54324324324325|
| sensor5| 72.95897435897436|
|sensor20| 49.13023255813953|
|sensor44| 40.63333333333334|
|sensor19|58.873684210526314|
| sensor8|52.411627906976726|
|sensor14|48.894594594594594|
|sensor24|16.634210526315787|
|sensor43| 54.45277777777778|
|sensor47|              53.5|
+--------+------------------+
only showing top 20 rows

That's it, we basically have our data already. We can run the streaming query in other ways, for instance with awaitTermination(), which keeps it running until we actually stop the process.

There are other things we can do with the streaming API, like checking the number of active streams, the information about each active stream, the status of the process, and more.

In [28]:
# Execute the streaming query and keep the process from terminating.
#query.awaitTermination()
In [22]:
query.status
Out[22]:
{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}
In [23]:
query.lastProgress
Out[23]:
{'id': 'a7db1898-818b-4a09-a775-e1501ef07d72',
 'runId': '30e5f81a-950a-4ddd-99c0-d3a331bc84b5',
 'name': None,
 'timestamp': '2023-07-22T17:01:46.927Z',
 'batchId': 5,
 'numInputRows': 0,
 'inputRowsPerSecond': 0.0,
 'processedRowsPerSecond': 0.0,
 'durationMs': {'latestOffset': 2, 'triggerExecution': 2},
 'stateOperators': [{'operatorName': 'stateStoreSave',
   'numRowsTotal': 50,
   'numRowsUpdated': 0,
   'allUpdatesTimeMs': 677,
   'numRowsRemoved': 0,
   'allRemovalsTimeMs': 0,
   'commitTimeMs': 11963,
   'memoryUsedBytes': 101936,
   'numRowsDroppedByWatermark': 0,
   'numShufflePartitions': 200,
   'numStateStoreInstances': 200,
   'customMetrics': {'loadedMapCacheHitCount': 1600,
    'loadedMapCacheMissCount': 0,
    'stateOnCurrentVersionSizeBytes': 30568}}],
 'sources': [{'description': 'KafkaV2[Subscribe[helioport]]',
   'startOffset': {'helioport': {'0': 10000}},
   'endOffset': {'helioport': {'0': 10000}},
   'latestOffset': {'helioport': {'0': 10000}},
   'numInputRows': 0,
   'inputRowsPerSecond': 0.0,
   'processedRowsPerSecond': 0.0,
   'metrics': {'avgOffsetsBehindLatest': '0.0',
    'maxOffsetsBehindLatest': '0',
    'minOffsetsBehindLatest': '0'}}],
 'sink': {'description': 'org.apache.spark.sql.execution.streaming.ConsoleTable$@1192a010',
  'numOutputRows': 0}}
In [24]:
query.explain()
== Physical Plan ==
WriteToDataSourceV2 MicroBatchWrite[epoch: 4, writer: ConsoleWriter[numRows=20, truncate=true]], org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$2479/0x00000008011cc040@521476fa
+- *(4) HashAggregate(keys=[sensor#28], functions=[avg(temperature#36)])
   +- StateStoreSave [sensor#28], state info [ checkpoint = file:/private/var/folders/vy/8tkfcqt12f31vsh13f_sb00h0000gn/T/temporary-a55c30d0-e95b-47ea-916d-6a33d05a0d8b/state, runId = 30e5f81a-950a-4ddd-99c0-d3a331bc84b5, opId = 0, ver = 4, numPartitions = 200], Complete, 0, 0, 2
      +- *(3) HashAggregate(keys=[sensor#28], functions=[merge_avg(temperature#36)])
         +- StateStoreRestore [sensor#28], state info [ checkpoint = file:/private/var/folders/vy/8tkfcqt12f31vsh13f_sb00h0000gn/T/temporary-a55c30d0-e95b-47ea-916d-6a33d05a0d8b/state, runId = 30e5f81a-950a-4ddd-99c0-d3a331bc84b5, opId = 0, ver = 4, numPartitions = 200], 2
            +- *(2) HashAggregate(keys=[sensor#28], functions=[merge_avg(temperature#36)])
               +- Exchange hashpartitioning(sensor#28, 200), ENSURE_REQUIREMENTS, [plan_id=1016]
                  +- *(1) HashAggregate(keys=[sensor#28], functions=[partial_avg(temperature#36)])
                     +- *(1) Project [jsonData#23.standard.reading.temperature AS temperature#36, jsonData#23.sensor AS sensor#28]
                        +- Project [from_json(StructField(id_sensor,StringType,true), StructField(id_equipment,StringType,true), StructField(sensor,StringType,true), StructField(date_event,StringType,true), StructField(standard,StructType(StructField(reading,StructType(StructField(temperature,DoubleType,true)),true)),true), cast(value#8 as string), Some(Europe/Madrid)) AS jsonData#23]
                           +- MicroBatchScan[key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13] class org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan


In [25]:
# Object that initiates streaming with memory format (temporary table)
query_memory = df_avg_temp_sensor \
    .writeStream \
    .queryName("Helio_Kafka_Project") \
    .outputMode("complete") \
    .format("memory") \
    .start()
23/07/22 19:01:55 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/vy/8tkfcqt12f31vsh13f_sb00h0000gn/T/temporary-7ba0a812-4f0a-44aa-8040-e1f6479d33b4. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/07/22 19:01:55 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/07/22 19:01:55 WARN AdminClientConfig: The configuration 'key.deserializer' was supplied but isn't a known config.
23/07/22 19:01:55 WARN AdminClientConfig: The configuration 'value.deserializer' was supplied but isn't a known config.
23/07/22 19:01:55 WARN AdminClientConfig: The configuration 'enable.auto.commit' was supplied but isn't a known config.
23/07/22 19:01:55 WARN AdminClientConfig: The configuration 'max.poll.records' was supplied but isn't a known config.
23/07/22 19:01:55 WARN AdminClientConfig: The configuration 'auto.offset.reset' was supplied but isn't a known config.
                                                                                

Here we can confirm that multiple streaming queries can run simultaneously, and that we can add conditions to the code execution.

I'll run one last instance of the query and terminate it after a for loop finishes, so I can share the results.

In [26]:
# Active streams
spark.streams.active
Out[26]:
[<pyspark.sql.streaming.query.StreamingQuery at 0x10bb06970>,
 <pyspark.sql.streaming.query.StreamingQuery at 0x10bce5d30>]
In [27]:
# Maintain query execution and apply SQL to the data in real-time.
from time import sleep

for x in range(10):
    
    spark.sql("select sensor, round(avg_temp, 2) as avg from Helio_Kafka_Project where avg_temp > 65").show()
    sleep(3)
    
query_memory.stop()
+------+---+
|sensor|avg|
+------+---+
+------+---+

+------+---+
|sensor|avg|
+------+---+
+------+---+

+------+---+
|sensor|avg|
+------+---+
+------+---+

[Stage 13:=========>    (141 + 8) / 200][Stage 15:>               (0 + 0) / 200]
+------+---+
|sensor|avg|
+------+---+
+------+---+

                                                                                
-------------------------------------------
Batch: 5
-------------------------------------------
+--------+------------------+
|  sensor|          avg_temp|
+--------+------------------+
| sensor7|             81.86|
|sensor34| 84.04313725490196|
|sensor41| 64.38478260869564|
|sensor50|59.612500000000004|
|sensor31| 38.07714285714286|
|sensor38|57.692592592592604|
| sensor1|39.135714285714286|
|sensor30|             70.72|
|sensor10| 61.71621621621623|
|sensor25|43.973684210526315|
| sensor4| 73.64500000000001|
| sensor5|            73.075|
|sensor20| 48.86666666666666|
|sensor44| 40.31458333333334|
|sensor19| 58.77619047619047|
| sensor8| 52.35882352941175|
|sensor14|           48.9075|
|sensor24|16.583720930232555|
|sensor43|54.311363636363645|
|sensor47| 53.23265306122449|
+--------+------------------+
only showing top 20 rows

                                                                                
-------------------------------------------
Batch: 6
-------------------------------------------
+--------+------------------+
|  sensor|          avg_temp|
+--------+------------------+
| sensor7| 81.76896551724138|
|sensor34| 83.85797101449275|
|sensor41| 64.24237288135593|
|sensor50| 59.54761904761905|
|sensor31| 37.99047619047619|
|sensor38| 57.78461538461539|
| sensor1| 39.18793103448276|
|sensor30| 70.66078431372549|
|sensor10| 61.87115384615385|
|sensor25| 44.06078431372549|
| sensor4| 73.60000000000001|
| sensor5| 73.21111111111111|
|sensor20|49.145070422535206|
|sensor44|40.592063492063495|
|sensor19|              58.9|
| sensor8|52.681428571428555|
|sensor14|48.819607843137256|
|sensor24|16.818518518518516|
|sensor43| 54.25932203389831|
|sensor47|53.456896551724135|
+--------+------------------+
only showing top 20 rows

                                                                                
+--------+-----+
|  sensor|  avg|
+--------+-----+
| sensor7|81.66|
|sensor34|82.64|
|sensor30| 71.8|
| sensor4| 74.9|
| sensor5|73.98|
|sensor28|69.98|
|sensor11|73.09|
|sensor35|79.14|
|sensor13| 75.8|
|sensor32|70.62|
+--------+-----+

+--------+-----+
|  sensor|  avg|
+--------+-----+
| sensor7|81.51|
|sensor34|83.18|
|sensor30|70.67|
| sensor4|73.72|
| sensor5|73.87|
|sensor28|71.35|
|sensor11| 74.2|
|sensor35|80.08|
|sensor13|76.27|
|sensor32|70.35|
+--------+-----+

                                                                                
-------------------------------------------
Batch: 7
-------------------------------------------
+--------+------------------+
|  sensor|          avg_temp|
+--------+------------------+
| sensor7| 81.81525423728814|
|sensor34| 83.98133333333332|
|sensor41| 64.28524590163934|
|sensor50|59.527906976744184|
|sensor31| 37.91162790697674|
|sensor38| 57.87205882352942|
| sensor1| 39.12203389830509|
|sensor30| 70.69056603773585|
|sensor10| 61.91320754716981|
|sensor25|44.005357142857136|
| sensor4|             73.54|
| sensor5| 73.02833333333334|
|sensor20| 49.24383561643835|
|sensor44|             40.65|
|sensor19| 58.89491525423728|
| sensor8|52.558108108108094|
|sensor14|48.971698113207545|
|sensor24|16.707142857142856|
|sensor43| 54.11290322580645|
|sensor47| 53.53220338983051|
+--------+------------------+
only showing top 20 rows

                                                                                
+--------+-----+
|  sensor|  avg|
+--------+-----+
| sensor7|81.51|
|sensor34|83.18|
|sensor30|70.67|
| sensor4|73.72|
| sensor5|73.87|
|sensor28|71.35|
|sensor11| 74.2|
|sensor35|80.08|
|sensor13|76.27|
|sensor32|70.35|
+--------+-----+

                                                                                
-------------------------------------------
Batch: 8
-------------------------------------------
+--------+------------------+
|  sensor|          avg_temp|
+--------+------------------+
| sensor7|              81.7|
|sensor34| 84.00631578947367|
|sensor41| 64.58055555555555|
|sensor50|  59.1157894736842|
|sensor31| 37.41290322580645|
|sensor38|57.888043478260876|
| sensor1| 39.05733333333333|
|sensor30| 70.91911764705883|
|sensor10| 62.03030303030303|
|sensor25| 43.66374999999999|
| sensor4|  73.2231884057971|
| sensor5| 72.77922077922078|
|sensor20|49.238144329896905|
|sensor44|40.709638554216866|
|sensor19| 58.77105263157894|
| sensor8| 52.27362637362636|
|sensor14|             49.08|
|sensor24|             16.55|
|sensor43| 54.10588235294118|
|sensor47|            53.652|
+--------+------------------+
only showing top 20 rows

                                                                                
+--------+-----+
|  sensor|  avg|
+--------+-----+
| sensor7|81.67|
|sensor34|83.64|
|sensor30|70.75|
| sensor4|73.53|
| sensor5|73.16|
|sensor28|71.74|
|sensor11|74.05|
|sensor35|80.02|
|sensor13|76.48|
|sensor32|70.35|
+--------+-----+

+--------+-----+
|  sensor|  avg|
+--------+-----+
| sensor7|81.48|
|sensor34|83.83|
|sensor30|71.17|
| sensor4|72.85|
| sensor5|72.59|
|sensor28|72.01|
|sensor11|74.13|
|sensor35|79.54|
|sensor13|76.34|
|sensor32|70.12|
+--------+-----+

                                                                                
-------------------------------------------
Batch: 9
-------------------------------------------
+--------+------------------+
|  sensor|          avg_temp|
+--------+------------------+
| sensor7| 81.72800000000001|
|sensor34| 84.08865979381443|
|sensor41| 64.55405405405403|
|sensor50| 59.13275862068964|
|sensor31| 37.36190476190476|
|sensor38|57.990425531914894|
| sensor1| 39.05733333333333|
|sensor30| 70.89857142857143|
|sensor10|62.117391304347834|
|sensor25|43.672839506172835|
| sensor4| 73.15714285714287|
| sensor5| 72.77922077922078|
|sensor20| 49.26938775510204|
|sensor44|40.709638554216866|
|sensor19|  58.8038961038961|
| sensor8|52.402105263157885|
|sensor14| 49.03380281690141|
|sensor24| 16.60281690140845|
|sensor43|54.104651162790695|
|sensor47|            53.652|
+--------+------------------+
only showing top 20 rows

                                                                                
+--------+-----+
|  sensor|  avg|
+--------+-----+
| sensor7|81.48|
|sensor34|83.83|
|sensor30|71.17|
| sensor4|72.85|
| sensor5|72.59|
|sensor28|72.01|
|sensor11|74.13|
|sensor35|79.54|
|sensor13|76.34|
|sensor32|70.12|
+--------+-----+

                                                                                
-------------------------------------------
Batch: 10
-------------------------------------------
+--------+------------------+
|  sensor|          avg_temp|
+--------+------------------+
| sensor7| 81.65111111111112|
|sensor34| 84.13644859813083|
|sensor41|  64.3136842105263|
|sensor50|59.231325301204805|
|sensor31| 37.52222222222222|
|sensor38|58.124369747899166|
| sensor1| 38.84563106796117|
|sensor30| 71.31627906976745|
|sensor10| 62.10941176470588|
|sensor25| 43.28532110091743|
| sensor4| 73.00348837209303|
| sensor5| 72.66736842105263|
|sensor20|49.244915254237284|
|sensor44| 40.82065217391305|
|sensor19| 58.72111111111111|
| sensor8| 52.08715596330274|
|sensor14|           49.1875|
|sensor24|16.618181818181817|
|sensor43|  54.2950495049505|
|sensor47| 53.54456521739131|
+--------+------------------+
only showing top 20 rows

23/07/22 19:02:46 ERROR WriteToDataSourceV2Exec: Data source write support MicroBatchWrite[epoch: 6, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@3717a05e] is aborting.
23/07/22 19:02:46 ERROR WriteToDataSourceV2Exec: Data source write support MicroBatchWrite[epoch: 6, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@3717a05e] aborted.
23/07/22 19:02:46 WARN Shell: Interrupted while joining on: Thread[Thread-36449,5,main]
java.lang.InterruptedException
	at java.base/java.lang.Object.wait(Native Method)
	at java.base/java.lang.Thread.join(Thread.java:1305)
	at java.base/java.lang.Thread.join(Thread.java:1379)
	at org.apache.hadoop.util.Shell.joinThread(Shell.java:1042)
	at org.apache.hadoop.util.Shell.runCommand(Shell.java:1002)
	at org.apache.hadoop.util.Shell.run(Shell.java:900)
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1212)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:1306)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:1288)
	at org.apache.hadoop.fs.FileUtil.readLink(FileUtil.java:212)
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileLinkStatusInternal(RawLocalFileSystem.java:1113)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1102)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatus(RawLocalFileSystem.java:1073)
	at org.apache.hadoop.fs.FileSystem.rename(FileSystem.java:1574)
	at org.apache.hadoop.fs.DelegateToFileSystem.renameInternal(DelegateToFileSystem.java:206)
	at org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:790)
	at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:720)
	at org.apache.hadoop.fs.ChecksumFs.renameInternal(ChecksumFs.java:489)
	at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:720)
	at org.apache.hadoop.fs.FileContext.rename(FileContext.java:1036)
	at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:377)
	at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:154)
	at net.jpountz.lz4.LZ4BlockOutputStream.close(LZ4BlockOutputStream.java:198)
	at java.base/java.io.FilterOutputStream.close(FilterOutputStream.java:188)
	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.finalizeDeltaFile(HDFSBackedStateStoreProvider.scala:450)
	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$commitUpdates(HDFSBackedStateStoreProvider.scala:320)
	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:141)
	at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerBaseImpl.commit(StreamingAggregationStateManager.scala:89)
	at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.$anonfun$doExecute$7(statefulOperators.scala:436)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:640)
	at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs(statefulOperators.scala:143)
	at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs$(statefulOperators.scala:143)
	at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.timeTakenMs(statefulOperators.scala:392)
	at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.$anonfun$doExecute$5(statefulOperators.scala:436)
	at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps.$anonfun$mapPartitionsWithStateStore$1(package.scala:68)
	at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:127)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
23/07/22 19:02:46 ERROR Utils: Aborting task
java.lang.InterruptedException
	at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1367)
	at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:248)
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:258)
	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:314)
	at org.apache.spark.scheduler.OutputCommitCoordinator.canCommit(OutputCommitCoordinator.scala:107)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:478)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1563)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:509)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:448)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:514)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:411)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
23/07/22 19:02:46 ERROR DataWritingSparkTask: Aborting commit for partition 154 (task 3618, attempt 0, stage 53.0)
23/07/22 19:02:46 ERROR DataWritingSparkTask: Aborted commit for partition 154 (task 3618, attempt 0, stage 53.0)
23/07/22 19:02:46 WARN Shell: Interrupted while joining on: Thread[Thread-36453,5,]
java.lang.InterruptedException
	at java.base/java.lang.Object.wait(Native Method)
	at java.base/java.lang.Thread.join(Thread.java:1305)
	at java.base/java.lang.Thread.join(Thread.java:1379)
	at org.apache.hadoop.util.Shell.joinThread(Shell.java:1042)
	at org.apache.hadoop.util.Shell.runCommand(Shell.java:1002)
	at org.apache.hadoop.util.Shell.run(Shell.java:900)
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1212)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:1306)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:1288)
	at org.apache.hadoop.fs.FileUtil.readLink(FileUtil.java:212)
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileLinkStatusInternal(RawLocalFileSystem.java:1113)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1102)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatus(RawLocalFileSystem.java:1073)
	at org.apache.hadoop.fs.FileSystem.rename(FileSystem.java:1590)
	at org.apache.hadoop.fs.DelegateToFileSystem.renameInternal(DelegateToFileSystem.java:206)
	at org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:790)
	at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:720)
	at org.apache.hadoop.fs.ChecksumFs.renameInternal(ChecksumFs.java:489)
	at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:720)
	at org.apache.hadoop.fs.FileContext.rename(FileContext.java:1036)
	at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:377)
	at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:154)
	at net.jpountz.lz4.LZ4BlockOutputStream.close(LZ4BlockOutputStream.java:198)
	at java.base/java.io.FilterOutputStream.close(FilterOutputStream.java:188)
	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.finalizeDeltaFile(HDFSBackedStateStoreProvider.scala:450)
	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$commitUpdates(HDFSBackedStateStoreProvider.scala:320)
	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:141)
	at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerBaseImpl.commit(StreamingAggregationStateManager.scala:89)
	at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.$anonfun$doExecute$7(statefulOperators.scala:436)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:640)
	at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs(statefulOperators.scala:143)
	at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs$(statefulOperators.scala:143)
	at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.timeTakenMs(statefulOperators.scala:392)
	at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.$anonfun$doExecute$5(statefulOperators.scala:436)
	at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps.$anonfun$mapPartitionsWithStateStore$1(package.scala:68)
	at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:127)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
23/07/22 19:02:46 WARN Shell: Interrupted while joining on: Thread[Thread-36452,5,]
23/07/22 19:02:46 WARN Shell: Interrupted while joining on: Thread[Thread-36454,5,]
	[... the same java.lang.InterruptedException stack trace shown above repeats for each of these threads, omitted for brevity ...]
23/07/22 19:02:46 ERROR Utils: Aborting task
org.apache.spark.SparkException: Commit denied for partition 155 (task 3619, attempt 0, stage 53.0).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.commitDeniedError(QueryExecutionErrors.scala:929)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:485)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1563)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:509)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:448)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:514)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:411)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
23/07/22 19:02:46 ERROR DataWritingSparkTask: Aborting commit for partition 155 (task 3619, attempt 0, stage 53.0)
23/07/22 19:02:46 ERROR DataWritingSparkTask: Aborted commit for partition 155 (task 3619, attempt 0, stage 53.0)
23/07/22 19:02:46 WARN TaskSetManager: Lost task 160.0 in stage 53.0 (TID 3624) (172.20.10.12 executor driver): TaskKilled (Stage cancelled)
23/07/22 19:02:46 WARN TaskSetManager: Lost task 155.0 in stage 53.0 (TID 3619) (172.20.10.12 executor driver): TaskKilled (Stage cancelled)
23/07/22 19:02:46 WARN TaskSetManager: Lost task 154.0 in stage 53.0 (TID 3618) (172.20.10.12 executor driver): TaskKilled (Stage cancelled)
23/07/22 19:02:46 WARN TaskSetManager: Lost task 161.0 in stage 53.0 (TID 3625) (172.20.10.12 executor driver): TaskKilled (Stage cancelled)
23/07/22 19:02:46 WARN TaskSetManager: Lost task 159.0 in stage 53.0 (TID 3623) (172.20.10.12 executor driver): TaskKilled (Stage cancelled)
	[... identical "Commit denied" stack traces, "Aborting/Aborted commit" and "Lost task ... TaskKilled (Stage cancelled)" messages repeat for partitions 156, 157 and 158, omitted for brevity ...]
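
All of those WARN / ERROR messages (InterruptedException, "Commit denied", TaskKilled / Stage cancelled) are typically just Spark cancelling the in-flight micro-batch because the streaming query was interrupted; they are expected noise when you kill the stream by hand rather than a sign that the pipeline failed. If you prefer to end the stream programmatically, a minimal sketch would look like the block below. I'm assuming the handle returned by the earlier writeStream ... start() call is named `query` and the session from section 4.1 is named `spark`; rename them to match your own variables.

```python
# Minimal sketch, assuming `query` is the StreamingQuery handle returned by
# writeStream(...).start() in section 4.7 and `spark` is the SparkSession
# created in section 4.1 (both names are placeholders for your own).
if query.isActive:
    query.stop()              # interrupts the current micro-batch; the stage is cancelled,
                              # which is exactly what produces the WARN/ERROR lines above
query.awaitTermination()      # block until the query has fully shut down
spark.stop()                  # optional: tear down the Spark session as well
```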

5. ENDING & FINAL REMARKS ¶

Return to Index¶

And that's the end of this project.

THANK YOU for sticking around, and I hope this project was useful to you.

Hope to talk to you soon!

CONTACT INFO:¶

Helio Ribeiro
helioribeiropro@gmail.com
+55 (11) 9 3932-8049