Analyzing huge sensor data in near real-time with Apache Spark Streaming
2015-11-25
For this demo I downloaded and installed Apache Spark 1.5.1. Suppose you have a stream of data from several (industrial) machines like:

```
MACHINE,TIMESTAMP,SIGNAL1,SIGNAL2,SIGNAL3,...
1,2015-01-01 11:00:01,1.0,1.1,1.2,1.3,...
2,2015-01-01 11:00:01,2.2,2.1,2.6,2.8,...
3,2015-01-01 11:00:01,1.1,1.2,1.3,1.3,...
1,2015-01-01 11:00:02,1.0,1.1,1.2,1.4,...
1,2015-01-01 11:00:02,1.3,1.2,3.2,3.3,...
...
```
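If you don't have real machines at hand, you can generate rows in this format and pipe them into `nc`. The following is only a test sketch of mine: the `make_row` helper, the value range and the one-row-per-second pacing are arbitrary assumptions, not part of the original system.

```
# test-data generator (my own sketch): pipe its output into the listener with
#   python gen_rows.py | nc -lk 9999
import random
import sys
import time
from datetime import datetime

def make_row(machine):
    # one CSV row: machine id, timestamp, then 19 signal values
    # (the [0, 4] value range is arbitrary, chosen just to trigger some alerts)
    signals = ['%.2f' % random.uniform(0.0, 4.0) for _ in range(19)]
    ts = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    return ','.join([str(machine), ts] + signals)

while True:
    for machine in range(1, 4):
        sys.stdout.write(make_row(machine) + '\n')
    sys.stdout.flush()
    time.sleep(1)
```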
Below is a system, written in Python, that reads data from a stream (use the command `nc -lk 9999` to send data to it) and every 10 seconds collects the alerts from the signals: an alert is at least 4 suspicious values of a specific signal of the same machine.

```
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

min_occurs = 4

def signals_from_1_row_to_many(row):
    """output is (machine, date, signal_number, signal_value)"""
    result = []
    for f in range(2, 21):
        result.append((row[0], row[1], f - 1, row[f]))
    return result

def isAlert(signal, value):
    # per-signal reference values; 'soglia' is the relative tolerance (95%)
    defaults = [83.0, 57.0, 37.0, 57.0, 45.0, 19.0, -223.0, 20.50, 20.42,
                20.48, 20.24, 20.22, 20.43, 20, 20.44, 20.39, 20.36, 20.25, 1675.0]
    soglia = 0.95
    if value == '':
        # a missing value is suspicious too
        return True
    value = float(value)
    ref = defaults[signal - 1]
    if value < ref - soglia * ref or value > ref + soglia * ref:
        return True
    else:
        return False

def isException(machine, signal):
    # sample data: the sensor 19 of machine 11 is broken
    exceptions = [(11, 19)]
    return (int(machine), signal) in exceptions

# Create a local StreamingContext with two working threads and a batch interval of 10 seconds
sc = SparkContext("local[2]", "SignalsAlerts")
ssc = StreamingContext(sc, 10)

# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)

# parse each CSV row, explode it into per-signal records, keep only the
# suspicious ones (minus the known exceptions), then group them by a
# "machine-signal" key, collecting the (timestamp, value) pairs
all_alerts = lines.map(lambda l: l.split(",")) \
    .flatMap(signals_from_1_row_to_many) \
    .filter(lambda s: isAlert(s[2], s[3])) \
    .filter(lambda s: not isException(s[0], s[2])) \
    .map(lambda s: (s[0] + '-' + str(s[2]), [(s[1], s[3])])) \
    .reduceByKey(lambda x, y: x + y)

# keep only the keys with at least min_occurs suspicious values in the batch
alerts = all_alerts.filter(lambda s: len(s[1]) >= min_occurs)

alerts.pprint()

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate
```
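Every 10 seconds, `pprint()` writes the batch time followed by the surviving `(machine-signal, [(timestamp, value), ...])` pairs. Before wiring up the socket you can sanity-check the two helper functions in plain Python, assuming they have already been defined as above; the row here is made up:

```
# quick check of the helpers outside of Spark, on a single made-up row
row = ['1', '2015-01-01 11:00:01'] + ['10.0'] * 19

for machine, ts, signal, value in signals_from_1_row_to_many(row):
    if isAlert(signal, value) and not isException(machine, signal):
        print((machine, ts, signal, value))
```

With these values only signals 7 and 19 fire: signal 19 because 10.0 is below its lower bound of 1675.0 - 0.95*1675.0 = 83.75, and signal 7 because its negative reference (-223.0) swaps the two bounds, so almost any positive reading is flagged. For signal 1 the band is [4.15, 161.85], so 10.0 passes.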
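A variation I find useful, not part of the script above: with `reduceByKeyAndWindow` the same counting can run over a sliding window, e.g. the last 60 seconds recomputed every 10, so that suspicious values split across two consecutive batches are not missed. This sketch would replace the `all_alerts`/`alerts` part before `ssc.start()`; the window and slide durations are my arbitrary choices, and they must be multiples of the batch interval.

```
# sketch: same pipeline, but counting over a sliding 60-second window
# that advances every 10 seconds (both durations chosen arbitrarily)
windowed = lines.map(lambda l: l.split(",")) \
    .flatMap(signals_from_1_row_to_many) \
    .filter(lambda s: isAlert(s[2], s[3])) \
    .filter(lambda s: not isException(s[0], s[2])) \
    .map(lambda s: (s[0] + '-' + str(s[2]), [(s[1], s[3])])) \
    .reduceByKeyAndWindow(lambda x, y: x + y, None, 60, 10)

windowed.filter(lambda s: len(s[1]) >= min_occurs).pprint()
```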