Apache Kafka and Spark project, showcasing Python and real-time data streaming skills.
Real-time analytics is becoming more and more important, especially when critical actions in scaled environments require quick intervention.
In this project, I want to demonstrate some skills in real-time data stream analysis and transformation.
Nowadays, every data professional should be comfortable with open-source solutions, especially those working at startups or on highly scalable projects.
The idea of this project is to show technical skills and the challenges one might face when using tools without a GUI (Graphical User Interface).
I'll be using randomly generated data for this project, so I won't link any datasets or Apache's official websites.
Let's begin!
Assume your company has multiple sensors on industrial equipment that requires critical attention, or deeper analysis for preventive maintenance.
Or assume that your business runs a highly scalable application that would benefit from real-time analysis of its database.
There are basically 5 very good reasons to use Apache Kafka for a project like this:
I'm going to include the commented code of the data generator below, but I won't execute it in this Jupyter Notebook. I'll run it with Python from the command line to save a file with the generated values and feed that information into Kafka/Spark.
Feel free to test it out! You can run it multiple times and you'll see that the output is random each time.
Here's how it works:
# IoT Sensor Data Simulator
# Run the simulator with the following command at the terminal:
# python simulator.py 10000 > ../data/sensor_data.txt
# Imports
import re
import sys
import copy
import random
import string
import datetime
from random import randrange
# Defines the number of messages generated.
# If no value is informed, generates 10 messages.
# IMPORTANT: THE COMMENTED SNIPPET BELOW USES 'sys.argv', WHICH WON'T WORK IN THIS NOTEBOOK,
# SO I'LL SET num_msgs TO 10 BY DEFAULT, JUST SO WE CAN CHECK ITS OUTPUT.
# if len(sys.argv) > 1:
#     num_msgs = int(sys.argv[1])
# else:
#     num_msgs = 10
num_msgs = 10
# Set sensor base temperature
dic_temp_sensors = {'sensor1': 38.3, 'sensor2': 45.3, 'sensor3': 31.8, 'sensor4': 73.1, 'sensor5': 71.8, 'sensor6': 63.7,
'sensor7': 80.7, 'sensor8': 52.0, 'sensor9': 64.1, 'sensor10': 62.7, 'sensor11': 73.4, 'sensor12': 54.2,
'sensor13': 76.4, 'sensor14': 49.0, 'sensor15': 50.4, 'sensor16': 58.8, 'sensor17': 47.6, 'sensor18': 55.4,
'sensor19': 58.8, 'sensor20': 49.4, 'sensor21': 59.9, 'sensor22': 45.1, 'sensor23': 55.1, 'sensor24': 16.6,
'sensor25': 42.8, 'sensor26': 50.4, 'sensor27': 32.9, 'sensor28': 71.8, 'sensor29': 33.5, 'sensor30': 71.7,
'sensor31': 37.8, 'sensor32': 69.6, 'sensor33': 50.3, 'sensor34': 84.4, 'sensor35': 79.0, 'sensor36': 11.0,
'sensor37': 64.2, 'sensor38': 57.9, 'sensor39': 60.7, 'sensor40': 58.6, 'sensor41': 64.5, 'sensor42': 31.2,
'sensor43': 54.4, 'sensor44': 40.1, 'sensor45': 44.3, 'sensor46': 62.7, 'sensor47': 53.4, 'sensor48': 52.4,
'sensor49': 45.6, 'sensor50': 58.4}
# Base ID for each sensor
id_base_sensor = "S-HAR-PORT-DATA-19951-"
# Base ID for each equipment
id_base_equipment = "E-HAR-PORT-DATA-25015-"
# Sensor standard readout
readout = "iot:reading:sensor:temp"
# Uppercase letters used to build the random ID suffixes
letters = string.ascii_uppercase
# String with readout standard
header_reading_iot = """\
{ "id_sensor": "%s",
"id_equipment": "%s",
"sensor": "%s", """
iotmsg_date_event = """\
"date_event": "%sZ", """
iotmsg_format = """\
"standard": {"format": "%s", """
iotmsg_data = """\
"reading": { "temperature": %.1f }
}
}"""
# Sensor ID mapping dictionary
dic_map_sensor_id = {}
# Latest measurement dictionary
dic_current_temp = {}
# Generates the JSON output
if __name__ == "__main__":
    # Loop from 0 up to the number of messages defined when running the simulator
    for counter in range(0, num_msgs):
        # Generates 3 random digits
        rand_num = str(random.randrange(0, 9)) + str(random.randrange(0, 9)) + str(random.randrange(0, 9))
        # Generates 2 random letters
        rand_letter = random.choice(letters) + random.choice(letters)
        # Generates a random temperature offset with a uniform distribution
        rand_temp_value = random.uniform(-5, 5)
        # Generates another random value following a uniform distribution (per-reading drift)
        rand_temp_value_delta = random.uniform(-1, 1)
        # Sensor ID gets the base value plus the generated values
        id_sensor = id_base_sensor + rand_num + rand_letter
        # Equipment ID gets the base value plus the generated values
        id_equipment = id_base_equipment + rand_num + rand_letter
        # Selects a random sensor from the dictionary
        sensor = random.choice(list(dic_temp_sensors.keys()))
        # If the sensor ID has no mapping yet, creates the association
        if not id_sensor in dic_map_sensor_id:
            # Includes the sensor in the mapping
            dic_map_sensor_id[id_sensor] = sensor
            # Includes the temperature in the current-readings dictionary
            dic_current_temp[id_sensor] = dic_temp_sensors[sensor] + rand_temp_value
        # If the ID is already mapped to a different sensor, keeps the original mapping
        elif not dic_map_sensor_id[id_sensor] == sensor:
            sensor = dic_map_sensor_id[id_sensor]
        # Extra temperature adjustment so consecutive readings drift randomly
        temperature = dic_current_temp[id_sensor] + rand_temp_value_delta
        dic_current_temp[id_sensor] = temperature
        # Writes the current time as the event timestamp
        today = datetime.datetime.today()
        date_event = today.isoformat()
        # Prints the result in JSON format (whitespace stripped from the templates)
        print(re.sub(r"[\s+]", "", header_reading_iot) % (id_sensor, id_equipment, sensor),
              re.sub(r"[\s+]", "", iotmsg_date_event) % (date_event),
              re.sub(r"[\s+]", "", iotmsg_format) % (readout),
              re.sub(r"[\s+]", "", iotmsg_data) % (temperature))
I want to emphasize a CONCEPT here. Why would we want real-time analysis in the first place?
Here are some compelling reasons why one would want to implement that:
1. Real-time Decision Making: Real-time data analysis allows businesses to make critical decisions immediately as data streams in. This is especially valuable in situations where timely actions can lead to a competitive advantage. For example, in finance, making real-time investment decisions can be crucial for maximizing returns.
2. Proactive Issue Detection: Real-time analysis enables early detection of issues and anomalies. By continuously monitoring data streams, businesses can identify potential problems before they escalate, thus minimizing the impact and reducing downtime.
3. Personalization and User Experience: In industries like e-commerce and online advertising, real-time data analysis can be used to personalize user experiences. Analyzing user behavior in real-time allows businesses to recommend relevant products, services, or content instantly, leading to improved customer satisfaction and engagement.
4. Fraud Detection and Security: Real-time data analysis is vital for identifying fraudulent activities as they happen. For example, financial institutions can use real-time analysis to detect suspicious transactions and prevent fraud in real-time, providing an additional layer of security.
5. IoT and Sensor Data: In IoT (Internet of Things) environments, numerous sensors generate continuous streams of data. Real-time data analysis enables the monitoring and analysis of sensor data as it is produced, enabling responsive actions based on the data.
6. Stream Processing Efficiency: Apache Kafka, as a distributed streaming platform, allows for high-throughput, low-latency stream processing. Real-time data analysis with PySpark on Kafka streams can efficiently handle large volumes of data and deliver results in near real-time.
7. Operational Efficiency: Real-time data analysis helps organizations optimize their operations by identifying inefficiencies, bottlenecks, and opportunities for improvement in real-time. This allows for rapid adjustments and optimizations to achieve better performance.
8. Event-Driven Architecture: Real-time data analysis is a fundamental component of event-driven architectures, where actions are triggered in response to specific events or data patterns. This architecture is scalable, flexible, and can lead to a more responsive system.
9. Market Insights and Trends: For businesses in competitive markets, real-time data analysis provides up-to-the-minute insights into market trends, customer preferences, and competitor behavior. This information is invaluable for staying ahead in the market.
10. Real-time Reporting and Visualization: Real-time data analysis allows for dynamic and interactive reporting and visualization. Stakeholders can access real-time dashboards and reports, facilitating quick understanding and decision-making.
In summary, real-time data analysis is essential for making informed, timely decisions, detecting issues proactively, optimizing operations, enhancing user experiences, and gaining a competitive advantage in today's fast-paced, data-driven world. Python, PySpark, and Apache Kafka together provide a powerful stack to implement real-time data analysis solutions efficiently.
With that in mind, let's get down to business.
Kafka can run on dedicated servers and on clusters, but for this example I'll run everything locally. Either way, we need to initialize ZooKeeper, which acts as our Kafka cluster coordinator.
Kafka runs from the command line; there's no user interface. For that, we will open 3 different terminal windows. In total, I'll have 4 terminal windows open, because I'll also run a Jupyter Notebook.
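For reference, the typical sequence on a default local Kafka install looks like this (the exact commands below are my assumption, since I ran them outside this notebook, from the Kafka installation folder):
1st Terminal (ZooKeeper): bin/zookeeper-server-start.sh config/zookeeper.properties
2nd Terminal (Kafka broker): bin/kafka-server-start.sh config/server.properties
3rd Terminal (create the topic): bin/kafka-topics.sh --create --topic helioport --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
3rd Terminal (describe the topic): bin/kafka-topics.sh --describe --topic helioport --bootstrap-server localhost:9092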
Now that we have started our services and created the topic, it's time to initialize the data stream. For that, let's use the last terminal, the one where we created and described the topic.
IMPORTANT CONCEPT: Kafka behaves as a sort of repository for data streams. Kafka by itself won't download or upload data; we need a producer to make our data generator send information to Kafka, and a consumer to subscribe to the information available in Kafka.
Let's open the console producer, which is our data producer.
The command that feeds the file written by the Python data generator into the topic is: bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic helioport < ../data/sensor_data.txt
Once you run this command you will realize that nothing really happens on your screen, but bear with me. Let's check if our streaming is functional. Now that I have run the console producer, let's run the console consumer, still on the very same terminal.
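For reference, the console consumer command would look like this (again an assumption of mine, based on the standard Kafka CLI; it reads the topic from the beginning and prints every message to the screen):
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic helioport --from-beginning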
# Check the Python version:
from platform import python_version
print('Python version:', python_version())
Python version: 3.9.13
# If not installed yet, install findspark.
# !pip install findspark
# Imports findspark and initializes it.
import findspark
findspark.init()
# Import required modules.
# NOTE: we only need the streaming piece to get the data in; notice how many more SQL libraries we use.
# In the end, all the data we retrieve is there to solve a business problem through data analysis.
import pyspark
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import col, from_json
# Kafka connector package for Spark, passed as spark-submit arguments
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell'
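A side note: as a rule of thumb, the spark-sql-kafka package version should match the Spark/PySpark version you are running. Here I'm on PySpark 3.4.0 with the 3.3.0 connector, which still resolved fine, but a mismatch is a common source of dependency errors, so org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0 would be the safer choice.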
# Spark versions used in this project
%reload_ext watermark
%watermark -a "Helio Ribeiro" --iversions
Author: Helio Ribeiro

findspark: 2.0.1
pyspark  : 3.4.0
# Create Spark session
spark = SparkSession.builder.appName("RealTimeProject").getOrCreate()
23/07/22 19:00:48 WARN Utils: Your hostname, Helios-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 172.20.10.12 instead (on interface en0) 23/07/22 19:00:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/opt/anaconda3/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /Users/helioribeiro/.ivy2/cache The jars for the packages stored in: /Users/helioribeiro/.ivy2/jars org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency :: resolving dependencies :: org.apache.spark#spark-submit-parent-d35725a3-b00a-4c9f-8a25-bfe19276b5d6;1.0 confs: [default] found org.apache.spark#spark-sql-kafka-0-10_2.12;3.3.0 in central found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.0 in central found org.apache.kafka#kafka-clients;2.8.1 in central found org.lz4#lz4-java;1.8.0 in central found org.xerial.snappy#snappy-java;1.1.8.4 in central found org.slf4j#slf4j-api;1.7.32 in central found org.apache.hadoop#hadoop-client-runtime;3.3.2 in central found org.spark-project.spark#unused;1.0.0 in central found org.apache.hadoop#hadoop-client-api;3.3.2 in central found commons-logging#commons-logging;1.1.3 in central found com.google.code.findbugs#jsr305;3.0.0 in central found org.apache.commons#commons-pool2;2.11.1 in central :: resolution report :: resolve 241ms :: artifacts dl 13ms :: modules in use: com.google.code.findbugs#jsr305;3.0.0 from central in [default] commons-logging#commons-logging;1.1.3 from central in [default] org.apache.commons#commons-pool2;2.11.1 from central in [default] org.apache.hadoop#hadoop-client-api;3.3.2 from central in [default] org.apache.hadoop#hadoop-client-runtime;3.3.2 from central in [default] org.apache.kafka#kafka-clients;2.8.1 from central in [default] org.apache.spark#spark-sql-kafka-0-10_2.12;3.3.0 from central in [default] org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.0 from central in [default] org.lz4#lz4-java;1.8.0 from central in [default] org.slf4j#slf4j-api;1.7.32 from central in [default] org.spark-project.spark#unused;1.0.0 from central in [default] org.xerial.snappy#snappy-java;1.1.8.4 from central in [default] --------------------------------------------------------------------- | | modules || artifacts | | conf | number| search|dwnlded|evicted|| number|dwnlded| --------------------------------------------------------------------- | default | 12 | 0 | 0 | 0 || 12 | 0 | --------------------------------------------------------------------- :: retrieving :: org.apache.spark#spark-submit-parent-d35725a3-b00a-4c9f-8a25-bfe19276b5d6 confs: [default] 0 artifacts copied, 12 already retrieved (0kB/5ms) 23/07/22 19:00:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
# Create the topic subscription for the data we want to pull from Kafka.
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "helioport") \
.load()
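By default the Kafka source starts reading from the latest offsets, so only messages produced after the query starts will show up. If you want to replay everything already sitting in the topic, a small variation (my assumption, not what I ran above) is to add the startingOffsets option:
# Variation (assumption): replay the topic from the beginning instead of only consuming new messages.
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "helioport") \
    .option("startingOffsets", "earliest") \
    .load()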
# Defining the data schema we desire to capture for analysis (temperature).
temp_data_schema = StructType([StructField("reading",
StructType([StructField("temperature", DoubleType(), True)]), True)])
# Defining global data schema for this streaming
data_schema = StructType([
StructField("id_sensor", StringType(), True),
StructField("id_equipment", StringType(), True),
StructField("sensor", StringType(), True),
StructField("date_event", StringType(), True),
StructField("standard", temp_data_schema, True)
])
# Cast each Kafka record value to a string
df_convert = df.selectExpr("CAST(value AS STRING)")
# Parse the JSON and expand it into dataframe columns.
df_convert = df_convert.withColumn("jsonData", from_json(col("value"), data_schema)).select("jsonData.*")
df_convert.printSchema()
root
 |-- id_sensor: string (nullable = true)
 |-- id_equipment: string (nullable = true)
 |-- sensor: string (nullable = true)
 |-- date_event: string (nullable = true)
 |-- standard: struct (nullable = true)
 |    |-- reading: struct (nullable = true)
 |    |    |-- temperature: double (nullable = true)
# Renaming columns for simplified analysis
df_convert_temp_sensor = df_convert.select(col("standard.reading.temperature").alias("temperature"),
col("sensor"))
df_convert_temp_sensor.printSchema()
root
 |-- temperature: double (nullable = true)
 |-- sensor: string (nullable = true)
# We can't inspect this dataframe with .head() because it comes from a streaming source
# (Spark would raise: 'Queries with streaming sources must be executed with writeStream.start()').
# This is the object that will hold the analysis, with the average temperature per sensor.
df_avg_temp_sensor = df_convert_temp_sensor.groupby("sensor").mean("temperature")
df_avg_temp_sensor.printSchema()
root
 |-- sensor: string (nullable = true)
 |-- avg(temperature): double (nullable = true)
# Renaming columns for simplified analysis
df_avg_temp_sensor = df_avg_temp_sensor.select(col("sensor").alias("sensor"),
col("avg(temperature)").alias("avg_temp"))
df_avg_temp_sensor.printSchema()
root
 |-- sensor: string (nullable = true)
 |-- avg_temp: double (nullable = true)
Are you ready? This part is really important, since it's where we are going to visualize our data. There are different ways of running the stream, with different timeouts, including a way to automate stopping the stream.
Kafka shines here because you can run multiple streams simultaneously, which makes it a great tool for real-time data streaming.
Remember the 4 terminals I opened when we started?
Now we need to open a fifth one that will run our Python data generator to feed our Kafka stream!
Here's what it looks like now:
Here are the commands we need to run to visualize our stream, in this order:
5th Terminal: python simulator.py 1000 > ../data/sensor_data.txt
4th Terminal: bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic helioport < ../data/sensor_data.txt
What we are doing here is providing Kafka with a new data stream. When we run the Spark commands that follow, they will show a blank batch at first, but the process keeps running, so it's very important that you keep ingesting data.
Below is my execution, for comparison.
# Object that initiates the streaming with console output format.
# This is the moment where Spark establishes the connection with Kafka to retrieve data.
query = df_avg_temp_sensor.writeStream.outputMode("complete").format("console").start()
23/07/22 19:00:57 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/vy/8tkfcqt12f31vsh13f_sb00h0000gn/T/temporary-a55c30d0-e95b-47ea-916d-6a33d05a0d8b. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort. 23/07/22 19:00:57 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled. 23/07/22 19:00:57 WARN AdminClientConfig: The configuration 'key.deserializer' was supplied but isn't a known config. 23/07/22 19:00:57 WARN AdminClientConfig: The configuration 'value.deserializer' was supplied but isn't a known config. 23/07/22 19:00:57 WARN AdminClientConfig: The configuration 'enable.auto.commit' was supplied but isn't a known config. 23/07/22 19:00:57 WARN AdminClientConfig: The configuration 'max.poll.records' was supplied but isn't a known config. 23/07/22 19:00:57 WARN AdminClientConfig: The configuration 'auto.offset.reset' was supplied but isn't a known config.
------------------------------------------- Batch: 0 ------------------------------------------- +------+--------+ |sensor|avg_temp| +------+--------+ +------+--------+
------------------------------------------- Batch: 1 ------------------------------------------- +--------+------------------+ | sensor| avg_temp| +--------+------------------+ | sensor7| 82.10476190476192| |sensor34| 85.08421052631579| |sensor41| 65.2375| |sensor50| 60.07500000000001| |sensor38| 57.1578947368421| |sensor31| 37.39411764705883| | sensor1| 39.25454545454546| |sensor30| 70.92307692307692| |sensor10| 61.41111111111112| |sensor25| 43.15833333333333| | sensor4| 73.3375| | sensor5| 72.50555555555556| |sensor20| 48.91428571428572| |sensor44| 40.8375| |sensor19|58.720000000000006| | sensor8|51.278571428571425| |sensor14| 49.12857142857142| |sensor24|16.544999999999995| |sensor43| 54.65| |sensor47|53.775000000000006| +--------+------------------+ only showing top 20 rows
------------------------------------------- Batch: 2 ------------------------------------------- +--------+------------------+ | sensor| avg_temp| +--------+------------------+ | sensor7| 82.1909090909091| |sensor34| 85.20869565217392| |sensor41| 64.63684210526316| |sensor50|59.828571428571436| |sensor31|37.790000000000006| |sensor38| 56.96190476190476| | sensor1| 38.90833333333334| |sensor30| 70.63333333333334| |sensor10| 61.89000000000001| |sensor25| 43.05384615384615| | sensor4| 73.39000000000001| | sensor5| 72.39166666666667| |sensor20| 49.06000000000001| |sensor44| 40.75714285714285| |sensor19| 58.81304347826087| | sensor8|51.231249999999996| |sensor14| 49.06086956521739| |sensor24| 16.18181818181818| |sensor43| 55.33076923076923| |sensor47| 53.67857142857144| +--------+------------------+ only showing top 20 rows
------------------------------------------- Batch: 3 ------------------------------------------- +--------+------------------+ | sensor| avg_temp| +--------+------------------+ | sensor7| 82.21739130434784| |sensor34| 84.916| |sensor41| 64.63684210526316| |sensor50| 60.22500000000001| |sensor31|37.790000000000006| |sensor38|56.979166666666664| | sensor1| 38.85| |sensor30| 70.875| |sensor10|61.775000000000006| |sensor25| 43.0875| | sensor4| 73.3904761904762| | sensor5| 72.55555555555556| |sensor20| 48.68000000000001| |sensor44| 40.47727272727273| |sensor19| 58.87692307692308| | sensor8| 51.07058823529411| |sensor14|48.912499999999994| |sensor24| 16.03478260869565| |sensor43| 55.05555555555556| |sensor47|54.022222222222226| +--------+------------------+ only showing top 20 rows
------------------------------------------- Batch: 4 ------------------------------------------- +--------+------------------+ | sensor| avg_temp| +--------+------------------+ | sensor7| 81.885| |sensor34| 84.19565217391305| |sensor41| 64.33846153846153| |sensor50| 59.61785714285715| |sensor31| 37.92580645161291| |sensor38|57.583720930232566| | sensor1| 39.14| |sensor30| 70.65454545454546| |sensor10| 61.87419354838711| |sensor25| 43.85625| | sensor4| 73.54324324324325| | sensor5| 72.95897435897436| |sensor20| 49.13023255813953| |sensor44| 40.63333333333334| |sensor19|58.873684210526314| | sensor8|52.411627906976726| |sensor14|48.894594594594594| |sensor24|16.634210526315787| |sensor43| 54.45277777777778| |sensor47| 53.5| +--------+------------------+ only showing top 20 rows
That's it, we basically have our data already. We can also run the streaming query in other ways, for instance with awaitTermination, which keeps it running until we explicitly stop the process.
There are other things we can do as well, like checking the number of active streams, the information about each of them, the status of the process, and more.
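As a minimal sketch (my addition here, not something I executed above), one way to automate the stop is to wait on the query with a timeout and stop it if it's still running:
# Minimal sketch (assumption): let the query run for up to 60 seconds, then stop it.
finished = query.awaitTermination(60)   # blocks up to 60 seconds; returns True if the query terminated
if not finished:
    query.stop()                        # stop it ourselves if it is still active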
# Execute the streaming query and keep the process from being terminated.
#query.awaitTermination()
query.status
{'message': 'Waiting for data to arrive', 'isDataAvailable': False, 'isTriggerActive': False}
query.lastProgress
{'id': 'a7db1898-818b-4a09-a775-e1501ef07d72', 'runId': '30e5f81a-950a-4ddd-99c0-d3a331bc84b5', 'name': None, 'timestamp': '2023-07-22T17:01:46.927Z', 'batchId': 5, 'numInputRows': 0, 'inputRowsPerSecond': 0.0, 'processedRowsPerSecond': 0.0, 'durationMs': {'latestOffset': 2, 'triggerExecution': 2}, 'stateOperators': [{'operatorName': 'stateStoreSave', 'numRowsTotal': 50, 'numRowsUpdated': 0, 'allUpdatesTimeMs': 677, 'numRowsRemoved': 0, 'allRemovalsTimeMs': 0, 'commitTimeMs': 11963, 'memoryUsedBytes': 101936, 'numRowsDroppedByWatermark': 0, 'numShufflePartitions': 200, 'numStateStoreInstances': 200, 'customMetrics': {'loadedMapCacheHitCount': 1600, 'loadedMapCacheMissCount': 0, 'stateOnCurrentVersionSizeBytes': 30568}}], 'sources': [{'description': 'KafkaV2[Subscribe[helioport]]', 'startOffset': {'helioport': {'0': 10000}}, 'endOffset': {'helioport': {'0': 10000}}, 'latestOffset': {'helioport': {'0': 10000}}, 'numInputRows': 0, 'inputRowsPerSecond': 0.0, 'processedRowsPerSecond': 0.0, 'metrics': {'avgOffsetsBehindLatest': '0.0', 'maxOffsetsBehindLatest': '0', 'minOffsetsBehindLatest': '0'}}], 'sink': {'description': 'org.apache.spark.sql.execution.streaming.ConsoleTable$@1192a010', 'numOutputRows': 0}}
query.explain()
== Physical Plan == WriteToDataSourceV2 MicroBatchWrite[epoch: 4, writer: ConsoleWriter[numRows=20, truncate=true]], org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$2479/0x00000008011cc040@521476fa +- *(4) HashAggregate(keys=[sensor#28], functions=[avg(temperature#36)]) +- StateStoreSave [sensor#28], state info [ checkpoint = file:/private/var/folders/vy/8tkfcqt12f31vsh13f_sb00h0000gn/T/temporary-a55c30d0-e95b-47ea-916d-6a33d05a0d8b/state, runId = 30e5f81a-950a-4ddd-99c0-d3a331bc84b5, opId = 0, ver = 4, numPartitions = 200], Complete, 0, 0, 2 +- *(3) HashAggregate(keys=[sensor#28], functions=[merge_avg(temperature#36)]) +- StateStoreRestore [sensor#28], state info [ checkpoint = file:/private/var/folders/vy/8tkfcqt12f31vsh13f_sb00h0000gn/T/temporary-a55c30d0-e95b-47ea-916d-6a33d05a0d8b/state, runId = 30e5f81a-950a-4ddd-99c0-d3a331bc84b5, opId = 0, ver = 4, numPartitions = 200], 2 +- *(2) HashAggregate(keys=[sensor#28], functions=[merge_avg(temperature#36)]) +- Exchange hashpartitioning(sensor#28, 200), ENSURE_REQUIREMENTS, [plan_id=1016] +- *(1) HashAggregate(keys=[sensor#28], functions=[partial_avg(temperature#36)]) +- *(1) Project [jsonData#23.standard.reading.temperature AS temperature#36, jsonData#23.sensor AS sensor#28] +- Project [from_json(StructField(id_sensor,StringType,true), StructField(id_equipment,StringType,true), StructField(sensor,StringType,true), StructField(date_event,StringType,true), StructField(standard,StructType(StructField(reading,StructType(StructField(temperature,DoubleType,true)),true)),true), cast(value#8 as string), Some(Europe/Madrid)) AS jsonData#23] +- MicroBatchScan[key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13] class org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan
# Object that initiates streaming with memory format (temporary table)
query_memory = df_avg_temp_sensor \
.writeStream \
.queryName("Helio_Kafka_Project") \
.outputMode("complete") \
.format("memory") \
.start()
23/07/22 19:01:55 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/vy/8tkfcqt12f31vsh13f_sb00h0000gn/T/temporary-7ba0a812-4f0a-44aa-8040-e1f6479d33b4. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort. 23/07/22 19:01:55 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled. 23/07/22 19:01:55 WARN AdminClientConfig: The configuration 'key.deserializer' was supplied but isn't a known config. 23/07/22 19:01:55 WARN AdminClientConfig: The configuration 'value.deserializer' was supplied but isn't a known config. 23/07/22 19:01:55 WARN AdminClientConfig: The configuration 'enable.auto.commit' was supplied but isn't a known config. 23/07/22 19:01:55 WARN AdminClientConfig: The configuration 'max.poll.records' was supplied but isn't a known config. 23/07/22 19:01:55 WARN AdminClientConfig: The configuration 'auto.offset.reset' was supplied but isn't a known config.
Here we can check that multiple data streams can run simultaneously, and that we can apply conditions to the code execution.
I'll run one last instance of the query inside a for loop and then terminate it, so I can share the results.
# Active streams
spark.streams.active
[<pyspark.sql.streaming.query.StreamingQuery at 0x10bb06970>, <pyspark.sql.streaming.query.StreamingQuery at 0x10bce5d30>]
# Maintain query execution and apply SQL to the data in real-time.
from time import sleep
for x in range(10):
    spark.sql("select sensor, round(avg_temp, 2) as avg from Helio_Kafka_Project where avg_temp > 65").show()
    sleep(3)
query_memory.stop()
+------+---+ |sensor|avg| +------+---+ +------+---+ +------+---+ |sensor|avg| +------+---+ +------+---+ +------+---+ |sensor|avg| +------+---+ +------+---+
+------+---+ |sensor|avg| +------+---+ +------+---+
------------------------------------------- Batch: 5 ------------------------------------------- +--------+------------------+ | sensor| avg_temp| +--------+------------------+ | sensor7| 81.86| |sensor34| 84.04313725490196| |sensor41| 64.38478260869564| |sensor50|59.612500000000004| |sensor31| 38.07714285714286| |sensor38|57.692592592592604| | sensor1|39.135714285714286| |sensor30| 70.72| |sensor10| 61.71621621621623| |sensor25|43.973684210526315| | sensor4| 73.64500000000001| | sensor5| 73.075| |sensor20| 48.86666666666666| |sensor44| 40.31458333333334| |sensor19| 58.77619047619047| | sensor8| 52.35882352941175| |sensor14| 48.9075| |sensor24|16.583720930232555| |sensor43|54.311363636363645| |sensor47| 53.23265306122449| +--------+------------------+ only showing top 20 rows
------------------------------------------- Batch: 6 ------------------------------------------- +--------+------------------+ | sensor| avg_temp| +--------+------------------+ | sensor7| 81.76896551724138| |sensor34| 83.85797101449275| |sensor41| 64.24237288135593| |sensor50| 59.54761904761905| |sensor31| 37.99047619047619| |sensor38| 57.78461538461539| | sensor1| 39.18793103448276| |sensor30| 70.66078431372549| |sensor10| 61.87115384615385| |sensor25| 44.06078431372549| | sensor4| 73.60000000000001| | sensor5| 73.21111111111111| |sensor20|49.145070422535206| |sensor44|40.592063492063495| |sensor19| 58.9| | sensor8|52.681428571428555| |sensor14|48.819607843137256| |sensor24|16.818518518518516| |sensor43| 54.25932203389831| |sensor47|53.456896551724135| +--------+------------------+ only showing top 20 rows
+--------+-----+ | sensor| avg| +--------+-----+ | sensor7|81.66| |sensor34|82.64| |sensor30| 71.8| | sensor4| 74.9| | sensor5|73.98| |sensor28|69.98| |sensor11|73.09| |sensor35|79.14| |sensor13| 75.8| |sensor32|70.62| +--------+-----+ +--------+-----+ | sensor| avg| +--------+-----+ | sensor7|81.51| |sensor34|83.18| |sensor30|70.67| | sensor4|73.72| | sensor5|73.87| |sensor28|71.35| |sensor11| 74.2| |sensor35|80.08| |sensor13|76.27| |sensor32|70.35| +--------+-----+
------------------------------------------- Batch: 7 ------------------------------------------- +--------+------------------+ | sensor| avg_temp| +--------+------------------+ | sensor7| 81.81525423728814| |sensor34| 83.98133333333332| |sensor41| 64.28524590163934| |sensor50|59.527906976744184| |sensor31| 37.91162790697674| |sensor38| 57.87205882352942| | sensor1| 39.12203389830509| |sensor30| 70.69056603773585| |sensor10| 61.91320754716981| |sensor25|44.005357142857136| | sensor4| 73.54| | sensor5| 73.02833333333334| |sensor20| 49.24383561643835| |sensor44| 40.65| |sensor19| 58.89491525423728| | sensor8|52.558108108108094| |sensor14|48.971698113207545| |sensor24|16.707142857142856| |sensor43| 54.11290322580645| |sensor47| 53.53220338983051| +--------+------------------+ only showing top 20 rows
+--------+-----+ | sensor| avg| +--------+-----+ | sensor7|81.51| |sensor34|83.18| |sensor30|70.67| | sensor4|73.72| | sensor5|73.87| |sensor28|71.35| |sensor11| 74.2| |sensor35|80.08| |sensor13|76.27| |sensor32|70.35| +--------+-----+
------------------------------------------- Batch: 8 ------------------------------------------- +--------+------------------+ | sensor| avg_temp| +--------+------------------+ | sensor7| 81.7| |sensor34| 84.00631578947367| |sensor41| 64.58055555555555| |sensor50| 59.1157894736842| |sensor31| 37.41290322580645| |sensor38|57.888043478260876| | sensor1| 39.05733333333333| |sensor30| 70.91911764705883| |sensor10| 62.03030303030303| |sensor25| 43.66374999999999| | sensor4| 73.2231884057971| | sensor5| 72.77922077922078| |sensor20|49.238144329896905| |sensor44|40.709638554216866| |sensor19| 58.77105263157894| | sensor8| 52.27362637362636| |sensor14| 49.08| |sensor24| 16.55| |sensor43| 54.10588235294118| |sensor47| 53.652| +--------+------------------+ only showing top 20 rows
+--------+-----+ | sensor| avg| +--------+-----+ | sensor7|81.67| |sensor34|83.64| |sensor30|70.75| | sensor4|73.53| | sensor5|73.16| |sensor28|71.74| |sensor11|74.05| |sensor35|80.02| |sensor13|76.48| |sensor32|70.35| +--------+-----+ +--------+-----+ | sensor| avg| +--------+-----+ | sensor7|81.48| |sensor34|83.83| |sensor30|71.17| | sensor4|72.85| | sensor5|72.59| |sensor28|72.01| |sensor11|74.13| |sensor35|79.54| |sensor13|76.34| |sensor32|70.12| +--------+-----+
------------------------------------------- Batch: 9 ------------------------------------------- +--------+------------------+ | sensor| avg_temp| +--------+------------------+ | sensor7| 81.72800000000001| |sensor34| 84.08865979381443| |sensor41| 64.55405405405403| |sensor50| 59.13275862068964| |sensor31| 37.36190476190476| |sensor38|57.990425531914894| | sensor1| 39.05733333333333| |sensor30| 70.89857142857143| |sensor10|62.117391304347834| |sensor25|43.672839506172835| | sensor4| 73.15714285714287| | sensor5| 72.77922077922078| |sensor20| 49.26938775510204| |sensor44|40.709638554216866| |sensor19| 58.8038961038961| | sensor8|52.402105263157885| |sensor14| 49.03380281690141| |sensor24| 16.60281690140845| |sensor43|54.104651162790695| |sensor47| 53.652| +--------+------------------+ only showing top 20 rows
+--------+-----+ | sensor| avg| +--------+-----+ | sensor7|81.48| |sensor34|83.83| |sensor30|71.17| | sensor4|72.85| | sensor5|72.59| |sensor28|72.01| |sensor11|74.13| |sensor35|79.54| |sensor13|76.34| |sensor32|70.12| +--------+-----+
------------------------------------------- Batch: 10 ------------------------------------------- +--------+------------------+ | sensor| avg_temp| +--------+------------------+ | sensor7| 81.65111111111112| |sensor34| 84.13644859813083| |sensor41| 64.3136842105263| |sensor50|59.231325301204805| |sensor31| 37.52222222222222| |sensor38|58.124369747899166| | sensor1| 38.84563106796117| |sensor30| 71.31627906976745| |sensor10| 62.10941176470588| |sensor25| 43.28532110091743| | sensor4| 73.00348837209303| | sensor5| 72.66736842105263| |sensor20|49.244915254237284| |sensor44| 40.82065217391305| |sensor19| 58.72111111111111| | sensor8| 52.08715596330274| |sensor14| 49.1875| |sensor24|16.618181818181817| |sensor43| 54.2950495049505| |sensor47| 53.54456521739131| +--------+------------------+ only showing top 20 rows
23/07/22 19:02:46 ERROR WriteToDataSourceV2Exec: Data source write support MicroBatchWrite[epoch: 6, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@3717a05e] is aborting. 23/07/22 19:02:46 ERROR WriteToDataSourceV2Exec: Data source write support MicroBatchWrite[epoch: 6, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@3717a05e] aborted. 23/07/22 19:02:46 WARN Shell: Interrupted while joining on: Thread[Thread-36449,5,main] java.lang.InterruptedException at java.base/java.lang.Object.wait(Native Method) at java.base/java.lang.Thread.join(Thread.java:1305) at java.base/java.lang.Thread.join(Thread.java:1379) at org.apache.hadoop.util.Shell.joinThread(Shell.java:1042) at org.apache.hadoop.util.Shell.runCommand(Shell.java:1002) at org.apache.hadoop.util.Shell.run(Shell.java:900) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1212) at org.apache.hadoop.util.Shell.execCommand(Shell.java:1306) at org.apache.hadoop.util.Shell.execCommand(Shell.java:1288) at org.apache.hadoop.fs.FileUtil.readLink(FileUtil.java:212) at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileLinkStatusInternal(RawLocalFileSystem.java:1113) at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1102) at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatus(RawLocalFileSystem.java:1073) at org.apache.hadoop.fs.FileSystem.rename(FileSystem.java:1574) at org.apache.hadoop.fs.DelegateToFileSystem.renameInternal(DelegateToFileSystem.java:206) at org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:790) at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:720) at org.apache.hadoop.fs.ChecksumFs.renameInternal(ChecksumFs.java:489) at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:720) at org.apache.hadoop.fs.FileContext.rename(FileContext.java:1036) at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:377) at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:154) at net.jpountz.lz4.LZ4BlockOutputStream.close(LZ4BlockOutputStream.java:198) at java.base/java.io.FilterOutputStream.close(FilterOutputStream.java:188) at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.finalizeDeltaFile(HDFSBackedStateStoreProvider.scala:450) at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$commitUpdates(HDFSBackedStateStoreProvider.scala:320) at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:141) at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerBaseImpl.commit(StreamingAggregationStateManager.scala:89) at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.$anonfun$doExecute$7(statefulOperators.scala:436) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:640) at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs(statefulOperators.scala:143) at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs$(statefulOperators.scala:143) at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.timeTakenMs(statefulOperators.scala:392) 
at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.$anonfun$doExecute$5(statefulOperators.scala:436) at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps.$anonfun$mapPartitionsWithStateStore$1(package.scala:68) at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:127) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92) at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) at org.apache.spark.scheduler.Task.run(Task.scala:139) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) 23/07/22 19:02:46 ERROR Utils: Aborting task java.lang.InterruptedException at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1367) at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:248) at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:258) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263) at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:314) at org.apache.spark.scheduler.OutputCommitCoordinator.canCommit(OutputCommitCoordinator.scala:107) at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:478) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1563) at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:509) at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:448) at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:514) at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:411) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92) at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) at org.apache.spark.scheduler.Task.run(Task.scala:139) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) 23/07/22 19:02:46 ERROR DataWritingSparkTask: Aborting commit for partition 154 (task 3618, attempt 0, stage 53.0) 23/07/22 19:02:46 ERROR DataWritingSparkTask: Aborted commit for partition 154 (task 3618, attempt 0, stage 53.0) 23/07/22 19:02:46 WARN Shell: Interrupted while joining on: Thread[Thread-36453,5,] 
java.lang.InterruptedException at java.base/java.lang.Object.wait(Native Method) at java.base/java.lang.Thread.join(Thread.java:1305) at java.base/java.lang.Thread.join(Thread.java:1379) at org.apache.hadoop.util.Shell.joinThread(Shell.java:1042) at org.apache.hadoop.util.Shell.runCommand(Shell.java:1002) at org.apache.hadoop.util.Shell.run(Shell.java:900) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1212) at org.apache.hadoop.util.Shell.execCommand(Shell.java:1306) at org.apache.hadoop.util.Shell.execCommand(Shell.java:1288) at org.apache.hadoop.fs.FileUtil.readLink(FileUtil.java:212) at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileLinkStatusInternal(RawLocalFileSystem.java:1113) at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1102) at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatus(RawLocalFileSystem.java:1073) at org.apache.hadoop.fs.FileSystem.rename(FileSystem.java:1590) at org.apache.hadoop.fs.DelegateToFileSystem.renameInternal(DelegateToFileSystem.java:206) at org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:790) at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:720) at org.apache.hadoop.fs.ChecksumFs.renameInternal(ChecksumFs.java:489) at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:720) at org.apache.hadoop.fs.FileContext.rename(FileContext.java:1036) at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:377) at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:154) at net.jpountz.lz4.LZ4BlockOutputStream.close(LZ4BlockOutputStream.java:198) at java.base/java.io.FilterOutputStream.close(FilterOutputStream.java:188) at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.finalizeDeltaFile(HDFSBackedStateStoreProvider.scala:450) at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$commitUpdates(HDFSBackedStateStoreProvider.scala:320) at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:141) at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerBaseImpl.commit(StreamingAggregationStateManager.scala:89) at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.$anonfun$doExecute$7(statefulOperators.scala:436) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:640) at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs(statefulOperators.scala:143) at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs$(statefulOperators.scala:143) at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.timeTakenMs(statefulOperators.scala:392) at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.$anonfun$doExecute$5(statefulOperators.scala:436) at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps.$anonfun$mapPartitionsWithStateStore$1(package.scala:68) at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:127) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92) at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) at org.apache.spark.scheduler.Task.run(Task.scala:139) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) 23/07/22 19:02:46 WARN Shell: Interrupted while joining on: Thread[Thread-36452,5,] java.lang.InterruptedException at java.base/java.lang.Object.wait(Native Method) at java.base/java.lang.Thread.join(Thread.java:1305) at java.base/java.lang.Thread.join(Thread.java:1379) at org.apache.hadoop.util.Shell.joinThread(Shell.java:1042) at org.apache.hadoop.util.Shell.runCommand(Shell.java:1002) at org.apache.hadoop.util.Shell.run(Shell.java:900) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1212) at org.apache.hadoop.util.Shell.execCommand(Shell.java:1306) at org.apache.hadoop.util.Shell.execCommand(Shell.java:1288) at org.apache.hadoop.fs.FileUtil.readLink(FileUtil.java:212) at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileLinkStatusInternal(RawLocalFileSystem.java:1113) at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1102) at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatus(RawLocalFileSystem.java:1073) at org.apache.hadoop.fs.FileSystem.rename(FileSystem.java:1574) at org.apache.hadoop.fs.DelegateToFileSystem.renameInternal(DelegateToFileSystem.java:206) at org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:790) at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:720) at org.apache.hadoop.fs.ChecksumFs.renameInternal(ChecksumFs.java:496) at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:720) at org.apache.hadoop.fs.FileContext.rename(FileContext.java:1036) at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:377) at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:154) at net.jpountz.lz4.LZ4BlockOutputStream.close(LZ4BlockOutputStream.java:198) at java.base/java.io.FilterOutputStream.close(FilterOutputStream.java:188) at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.finalizeDeltaFile(HDFSBackedStateStoreProvider.scala:450) at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$commitUpdates(HDFSBackedStateStoreProvider.scala:320) at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:141) at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerBaseImpl.commit(StreamingAggregationStateManager.scala:89) at 
org.apache.spark.sql.execution.streaming.StateStoreSaveExec.$anonfun$doExecute$7(statefulOperators.scala:436) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:640) at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs(statefulOperators.scala:143) at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs$(statefulOperators.scala:143) at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.timeTakenMs(statefulOperators.scala:392) at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.$anonfun$doExecute$5(statefulOperators.scala:436) at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps.$anonfun$mapPartitionsWithStateStore$1(package.scala:68) at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:127) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92) at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) at org.apache.spark.scheduler.Task.run(Task.scala:139) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) 23/07/22 19:02:46 WARN Shell: Interrupted while joining on: Thread[Thread-36454,5,] java.lang.InterruptedException at java.base/java.lang.Object.wait(Native Method) at java.base/java.lang.Thread.join(Thread.java:1305) at java.base/java.lang.Thread.join(Thread.java:1379) at org.apache.hadoop.util.Shell.joinThread(Shell.java:1042) at org.apache.hadoop.util.Shell.runCommand(Shell.java:1002) at org.apache.hadoop.util.Shell.run(Shell.java:900) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1212) at org.apache.hadoop.util.Shell.execCommand(Shell.java:1306) at org.apache.hadoop.util.Shell.execCommand(Shell.java:1288) at org.apache.hadoop.fs.FileUtil.readLink(FileUtil.java:212) at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileLinkStatusInternal(RawLocalFileSystem.java:1113) at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1102) at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatus(RawLocalFileSystem.java:1073) at org.apache.hadoop.fs.DelegateToFileSystem.getFileLinkStatus(DelegateToFileSystem.java:133) at org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:751) at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:720) at org.apache.hadoop.fs.ChecksumFs.renameInternal(ChecksumFs.java:489) at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:720) at org.apache.hadoop.fs.FileContext.rename(FileContext.java:1036) at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:377) at 
org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:154) at net.jpountz.lz4.LZ4BlockOutputStream.close(LZ4BlockOutputStream.java:198) at java.base/java.io.FilterOutputStream.close(FilterOutputStream.java:188) at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.finalizeDeltaFile(HDFSBackedStateStoreProvider.scala:450) at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$commitUpdates(HDFSBackedStateStoreProvider.scala:320) at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:141) at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerBaseImpl.commit(StreamingAggregationStateManager.scala:89) at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.$anonfun$doExecute$7(statefulOperators.scala:436) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:640) at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs(statefulOperators.scala:143) at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs$(statefulOperators.scala:143) at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.timeTakenMs(statefulOperators.scala:392) at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.$anonfun$doExecute$5(statefulOperators.scala:436) at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps.$anonfun$mapPartitionsWithStateStore$1(package.scala:68) at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:127) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92) at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) at org.apache.spark.scheduler.Task.run(Task.scala:139) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) 23/07/22 19:02:46 ERROR Utils: Aborting task=======> (154 + 8) / 200] org.apache.spark.SparkException: Commit denied for partition 155 (task 3619, attempt 0, stage 53.0). 
at org.apache.spark.sql.errors.QueryExecutionErrors$.commitDeniedError(QueryExecutionErrors.scala:929) at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:485) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1563) at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:509) at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:448) at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:514) at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:411) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92) at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) at org.apache.spark.scheduler.Task.run(Task.scala:139) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) 23/07/22 19:02:46 ERROR DataWritingSparkTask: Aborting commit for partition 155 (task 3619, attempt 0, stage 53.0) 23/07/22 19:02:46 ERROR DataWritingSparkTask: Aborted commit for partition 155 (task 3619, attempt 0, stage 53.0) 23/07/22 19:02:46 WARN TaskSetManager: Lost task 160.0 in stage 53.0 (TID 3624) (172.20.10.12 executor driver): TaskKilled (Stage cancelled) 23/07/22 19:02:46 WARN TaskSetManager: Lost task 155.0 in stage 53.0 (TID 3619) (172.20.10.12 executor driver): TaskKilled (Stage cancelled) 23/07/22 19:02:46 WARN TaskSetManager: Lost task 154.0 in stage 53.0 (TID 3618) (172.20.10.12 executor driver): TaskKilled (Stage cancelled) 23/07/22 19:02:46 WARN TaskSetManager: Lost task 161.0 in stage 53.0 (TID 3625) (172.20.10.12 executor driver): TaskKilled (Stage cancelled) 23/07/22 19:02:46 WARN TaskSetManager: Lost task 159.0 in stage 53.0 (TID 3623) (172.20.10.12 executor driver): TaskKilled (Stage cancelled) 23/07/22 19:02:46 ERROR Utils: Aborting task org.apache.spark.SparkException: Commit denied for partition 157 (task 3621, attempt 0, stage 53.0). 
at org.apache.spark.sql.errors.QueryExecutionErrors$.commitDeniedError(QueryExecutionErrors.scala:929) at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:485) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1563) at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:509) at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:448) at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:514) at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:411) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92) at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) at org.apache.spark.scheduler.Task.run(Task.scala:139) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) 23/07/22 19:02:46 ERROR DataWritingSparkTask: Aborting commit for partition 157 (task 3621, attempt 0, stage 53.0) 23/07/22 19:02:46 ERROR DataWritingSparkTask: Aborted commit for partition 157 (task 3621, attempt 0, stage 53.0) 23/07/22 19:02:46 WARN TaskSetManager: Lost task 157.0 in stage 53.0 (TID 3621) (172.20.10.12 executor driver): TaskKilled (Stage cancelled) 23/07/22 19:02:46 ERROR Utils: Aborting task org.apache.spark.SparkException: Commit denied for partition 156 (task 3620, attempt 0, stage 53.0). 
at org.apache.spark.sql.errors.QueryExecutionErrors$.commitDeniedError(QueryExecutionErrors.scala:929) at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:485) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1563) at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:509) at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:448) at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:514) at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:411) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92) at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) at org.apache.spark.scheduler.Task.run(Task.scala:139) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) 23/07/22 19:02:46 ERROR DataWritingSparkTask: Aborting commit for partition 156 (task 3620, attempt 0, stage 53.0) 23/07/22 19:02:46 ERROR DataWritingSparkTask: Aborted commit for partition 156 (task 3620, attempt 0, stage 53.0) 23/07/22 19:02:46 WARN TaskSetManager: Lost task 156.0 in stage 53.0 (TID 3620) (172.20.10.12 executor driver): TaskKilled (Stage cancelled) 23/07/22 19:02:46 ERROR Utils: Aborting task org.apache.spark.SparkException: Commit denied for partition 158 (task 3622, attempt 0, stage 53.0). 
at org.apache.spark.sql.errors.QueryExecutionErrors$.commitDeniedError(QueryExecutionErrors.scala:929) at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:485) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1563) at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:509) at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:448) at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:514) at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:411) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92) at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) at org.apache.spark.scheduler.Task.run(Task.scala:139) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) 23/07/22 19:02:46 ERROR DataWritingSparkTask: Aborting commit for partition 158 (task 3622, attempt 0, stage 53.0) 23/07/22 19:02:46 ERROR DataWritingSparkTask: Aborted commit for partition 158 (task 3622, attempt 0, stage 53.0) 23/07/22 19:02:46 WARN TaskSetManager: Lost task 158.0 in stage 53.0 (TID 3622) (172.20.10.12 executor driver): TaskKilled (Stage cancelled)
Now that's the end.
THANK YOU for sticking around and I hope this project was useful to you.
Hope to talk to you soon!
Helio Ribeiro
helioribeiropro@gmail.com
+55 (11) 9 3932-8049