Managing Spark dataframes in Python
2016-12-02
Below is a quick sample of using Apache Spark (2.0) DataFrames to manipulate data. The sample data is a file of JSON lines like:

```
{"description": "255/40 ZR17 94W", "ean": "EAN: 4981910401193", "season": "tires_season summer", "price": "203,98", "model": "Michelin Pilot Sport PS2 255/40 R17", "id": "MPN: 2351610"}
{"description": "225/55 R17 101V XL", "ean": "EAN: 5452000438744", "season": "tires_season summer", "price": "120,98", "model": "Pirelli P Zero 205/45 R17", "id": "MPN: 530155"}
```

```
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, lit, regexp_extract, regexp_replace, trim
import re, sys
# warehouse_location points to the default location for managed databases and tables
warehouse_location = 'spark-warehouse'

spark = SparkSession \
    .builder \
    .appName("Python Spark") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()
records_orig = spark.read.json("scraped_tyres_data.json")
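
# Quick sanity check on the schema Spark inferred from the JSON lines
# (optional; printSchema() and show() are standard DataFrame methods):
records_orig.printSchema()
records_orig.show(5, truncate=False)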

## removing bad records: empty ids and retreaded ("rinnovati") tyres
records = records_orig \
    .filter(records_orig.id != '') \
    .filter(regexp_extract('description', '(rinnovati)', 1) == '')

## saving bad records (set difference between original and cleaned data)
records_orig.subtract(records).coalesce(1).write.csv("bad-records.csv", sep=";")
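# Note: coalesce(1) collapses the DataFrame to a single partition, so the
# output directory holds a single CSV part file (handy for small data,
# a bottleneck for large data).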

# extract new features from the description, e.g. "225/55 R17 101V XL"
# capture groups: width / aspect ratio / rim diameter / load index / speed index
regexp_size = r"(\d+)/(\d+) R(\d+) (\d+)(\w+)\s*"
# "bordo/bordino di protezione" = rim protection edge; "chiodabile" = studdable
records = records \
    .withColumn("width", regexp_extract("description", regexp_size, 1)) \
    .withColumn("ratio", regexp_extract("description", regexp_size, 2)) \
    .withColumn("diameter", regexp_extract("description", regexp_size, 3)) \
    .withColumn("load_index", regexp_extract("description", regexp_size, 4)) \
    .withColumn("speed_index", regexp_extract("description", regexp_size, 5)) \
    .withColumn("brand", regexp_extract("model", r"^(\w+) ", 1)) \
    .withColumn("season", trim(regexp_replace("season", "tires_season", ""))) \
    .withColumn("id", trim(regexp_replace("id", "MPN: ", ""))) \
    .withColumn("ean", trim(regexp_replace("ean", "EAN: ", ""))) \
    .withColumn("runflat", regexp_extract("description", "(runflat)", 1)) \
    .withColumn("mfs", regexp_extract("description", "(MFS|FSL|bordo di protezione|bordino di protezione)", 1)) \
    .withColumn("xl", regexp_extract("description", r" (XL|RF)\s*", 1)) \
    .withColumn("chiodabile", regexp_extract("description", r"(chiodabile)\s*", 1))

## extracting and saving all distinct season values
records.select("season").distinct().coalesce(1).write.csv("season_values", sep=";")

# misc
# records.columns                                      # list column names
# records.groupBy("brand").count().show()
# records.groupBy("brand").count().filter("count > 100").show(20, False)
#
# renaming all columns before joining dataframes with the same column names
# records_renamed = records.select(*(col(x).alias(x + '_renamed') for x in records.columns))
# join the two dataframes
# records.join(records_renamed, records.ean == records_renamed.ean_renamed)
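#
# A lighter alternative to renaming every column is to join through
# DataFrame aliases (a sketch; both sides here are the same dataframe):
# records.alias("a").join(records.alias("b"), col("a.ean") == col("b.ean"))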
#
#
# saving data to several formats
records.coalesce(1).write.csv("result.csv", sep=";")
records.write.json("result.json")
records.write.parquet("result.parquet")
records.write.format("com.databricks.spark.avro").save("result.avro")
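# Note: Avro support is not built into Spark 2.0; the com.databricks.spark.avro
# format requires launching Spark with the external spark-avro package on the
# classpath (e.g. via --packages). Reading results back is symmetrical:
# spark.read.parquet("result.parquet")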
```