Spark Cheat Sheet for Scala/Python
Spark (Scala) Example
Read a Parquet file
scala> val param = spark.read.parquet("s3://file_path_you_put")
Print the parquet file schema
scala> param.printSchema()
root
|-- sha1: string (nullable = true)
|-- label: string (nullable = true)
|-- time: long (nullable = true)
Print the Parquet file content
scala> param.show()
+--------------------+-----+----------+
| uuid|label| time|
+--------------------+-----+----------+
|d8f9ba869c19f25cc...| Hell|1562112000|
|f8e172cb34d620bbe...| |1562112000|
|28eb0ec1e0d549a58...| PUMA|1562112000|
|145760249908bb4f7...| PUMA|1562112000|
|e5622270036303a86...| Hell|1562112000|
+--------------------+-----+----------+
only showing top 20 rows
Get the number of rows
scala> param.count()
res8: Long = 568
Join two DataFrames (with aliases)
scala> val jt = new_df.as("n").join(old_df.as("o"), "uuid")
jt: org.apache.spark.sql.DataFrame = [uuid: string, label: string ... 3 more fields]
Filter rows where the two label columns differ (null-safe comparison with <=>)
scala> jt.filter(! ($"n.label" <=> $"o.label")).show()
+--------------------+-----+----------+-----+----------+
| uuid|label| time|label| time|
+--------------------+-----+----------+-----+----------+
|e3198d3d3b51d245a...| |1562112000| Hell|1562112000|
+--------------------+-----+----------+-----+----------+
ref: scala filtering out rows in a joined df based on 2 columns with same values - best way
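The same alias join and null-safe comparison can be written in PySpark as well; a minimal sketch, assuming new_df and old_df are DataFrames with the same uuid/label/time columns as above:
import pyspark.sql.functions as funcs

jt = new_df.alias("n").join(old_df.alias("o"), "uuid")
# Keep rows whose labels differ; eqNullSafe is the PySpark counterpart of Scala's <=>
jt.filter(~funcs.col("n.label").eqNullSafe(funcs.col("o.label"))).show()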
PySpark Example
Read a CSV file into a DataFrame with an explicit schema.
from pyspark.sql.types import StructType, StructField, StringType, LongType

ps_schema = StructType([
    StructField("sha1", StringType(), True),
    StructField("label", StringType(), True),
    StructField("time", LongType(), True)
])
df = spark.read.csv("file.csv", header=False, schema=ps_schema)
Count the number of duplicate rows
import pyspark.sql.functions as funcs

# Group by all columns, keep groups that occur more than once,
# then sum their counts to get the total number of duplicated rows.
df.groupBy(df.columns) \
    .count() \
    .where(funcs.col('count') > 1) \
    .select(funcs.sum('count')) \
    .show()
https://proinsias.github.io/til/Spark-Count-number-of-duplicate-rows/
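A common follow-up is to remove those duplicates with the built-in DataFrame.dropDuplicates; a minimal sketch (the variable names are only illustrative):
# Keep one copy of each fully identical row (all columns considered)
deduped_df = df.dropDuplicates()

# Or deduplicate on a subset of columns, e.g. only sha1
deduped_by_key = df.dropDuplicates(["sha1"])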
Convert a DataFrame to a CSV or HTML string (e.g., for an email report)
df = spark.read.csv("file.csv", header=False, schema=ps_schema)
pandas_df = df.toPandas()        # collect the data to the driver as a pandas DataFrame
html_str = pandas_df.to_html()   # HTML table as a string
csv_str = pandas_df.to_csv()     # CSV content as a string
https://docs.databricks.com/spark/latest/spark-sql/spark-pandas.html
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_csv.html
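Note that toPandas() pulls the entire DataFrame onto the driver, so for large tables it is safer to cap the number of rows first; a minimal sketch (the 100-row limit is an arbitrary example):
# Only bring a bounded number of rows back to the driver for the report
report_html = df.limit(100).toPandas().to_html(index=False)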
Read multiple CSV or Parquet files at once
Parquet
from datetime import datetime, timedelta

# Build one S3 path per day for the last 10 days
release_date = datetime.strptime("2020/10/10", '%Y/%m/%d')
process_dates = ["s3://sample/" + (release_date - timedelta(days=x)).strftime("%Y/%m/%d")
                 for x in range(0, 10)]
total_df = spark.read.parquet(*process_dates).select("sha1", "reqBody").cache()
CSV
from datetime import datetime, timedelta
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

release_date = datetime.strptime("2020/10/10", '%Y/%m/%d')
process_dates = ["s3://sample/" + (release_date - timedelta(days=x)).strftime("%Y/%m/%d")
                 for x in range(0, 10)]
total_df = spark.read.format("csv").option("header", "False").schema(StructType([
    StructField("sha1", StringType(), True),
    StructField("count", IntegerType(), True)
])).load(process_dates)
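Once the per-day files are loaded into a single DataFrame, a typical next step is to aggregate across the whole date range; a minimal sketch, assuming the sha1/count columns from the CSV schema above (the totals name is only illustrative):
import pyspark.sql.functions as funcs

# Total count per sha1 across all 10 days
totals = total_df.groupBy("sha1").agg(funcs.sum("count").alias("total_count"))
totals.show()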
PDF External Link
https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf