
Hands-on Spark intro: Cross Join customers and products with business logic

5.8.2019 | 17 minutes of reading time

In this blog post, I want to share with you the aha moments I had while developing my first (Py)Spark application.

We do this by means of an example application:

  • Read customers, products, and transactions.
  • Create the cross join of customers and products and add a score to these combinations.
  • Flag products that a customer has bought already based on previous transactions.
  • Apply the business logic per customer.
    • “Only recommend products a customer has not bought already”
    • “Only return the top 3 products per brand”

Why should we use Spark here? In our setting, we have

  • 100k customers
  • 10k products

When we build the cross join, we get 100,000 * 10,000 = 1,000,000,000 (1 billion) rows. One billion rows times at least 3 columns (customer_id, product_id, score) at 64-bit / 8-byte precision yields 24 GB of data. If we join 1 (2, 3) additional product properties for filtering, we already reach 32 GB (40 GB, 48 GB). This is not “big data”, but most probably too much to fit into your local machine’s memory.
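As a quick back-of-the-envelope check (plain Python, independent of Spark, and not part of the recorded notebook), these numbers can be reproduced like this:

# Back-of-the-envelope estimate of the cross join size (pure Python sketch)
n_customers = 100_000
n_products = 10_000
bytes_per_value = 8  # 64-bit values

rows = n_customers * n_products           # 1 billion combinations
for n_cols in (3, 4, 5, 6):               # score table plus 1-3 joined product properties
    size_gb = rows * n_cols * bytes_per_value / 1e9
    print("{} columns: {:.0f} GB".format(n_cols, size_gb))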

The following image shows a sketch of the workflow:

With this application at hand, we will discover the following key lessons learned (sorted by the importance I assign to them, from top to bottom):

  • Use the Spark SQL / DataFrame API.
    Especially if you have a Pandas / SQL background. This structured data API is more high-level than the Resilient Distributed Dataset (RDD) API. The SQL/DataFrame API is under active development, and new features are shipped frequently.
  • Talk to Spark using plain old SQL.
    As SQL is a mature technology with a large user base, your company’s analysts and BI department might speak SQL fluently already, thus, they might be able to adapt your application quickly.
  • Spark does lazy evaluation.
    When reading data with a read statement, we set a starting point for the execution graph. Nothing is done until so-called actions trigger the processing. A non-exhaustive list of actions: count, show, head, write.{parquet, json, csv, saveAsTable}.
  • Avoid CSV as a serialization format, use Parquet instead.
    CSV comes without schema, and schema inference might take very long at initial read if the data to be read is not small.
  • Do checkpointing frequently, either to Parquet or to Hive tables.
    “Checkpointing” means storing intermediate results. Because Spark evaluates lazily, all the work happens only when an action is triggered, and you might be wondering why a single step takes so long; checkpointing materializes intermediate results so you can see where time is spent and reuse them. But a word of caution: don’t checkpoint too frequently, because then Spark cannot optimize across the stages of the execution graph.
  • Only a partitioned dataset can be processed in parallel.
    Partitions are always processed as a whole. By default, we can assume that 1 partition = 1 (executor) core = 1 task holds.
  • For an efficient cross join of two rather small tables, do repartition the left table.
    Otherwise the cross join cannot be parallelized. Small tables (< 100 MB) might be read into one partition only.

The lessons learned are also summarized in the following diagram:

The runtimes were recorded on a Google Cloud Dataproc cluster with 2 n1-standard-8 workers with 8 vCPUs and 30 GB RAM each. As of July 2019, Google’s managed cluster service “Dataproc” ships with a fully functional Jupyter installation out of the box. You can find the cluster configuration alongside this blog post’s notebook in the accompanying GitHub repo.

Read customers, products and transactions

# To log notebook execution time
import time
start = time.time()

from pyspark.sql import SparkSession

First, we need to instantiate a SparkSession. When searching StackOverflow, you might encounter {Spark, SQL, Hive}Context as well. In newer versions of Spark, these contexts were all unified into the SparkSession.
However, it is important to call enableHiveSupport() to be able to communicate with the existing Hive installation. We use Hive to store data in distributed tables, i.e. the table data is stored across many (Parquet) files under the hood. Because of that, we can read and write in parallel and thus run our jobs in less wall clock time.

%%time
# This takes some time when executing the command for the first time
ss = SparkSession.builder.enableHiveSupport().getOrCreate()

[stdout]:
CPU times: user 36 ms, sys: 12 ms, total: 48 ms
Wall time: 15.7 s

We define and browse the data directory on Google Cloud Storage:

# Define the data directory in a GCP bucket. We read / write to GCP directly.
DATADIR = "gs://spark-intro/data/"

# IPython magic to browse the blob storage
!gsutil ls $DATADIR

[stdout]:
gs://spark-intro/data/
gs://spark-intro/data/customers100000.csv
gs://spark-intro/data/products10000.csv
gs://spark-intro/data/output/
gs://spark-intro/data/transactions_csv/
gs://spark-intro/data/transactions_pq/
def info(df):
    # Helper method to avoid repeating commands
    df.printSchema()
    df.show(5)

We read customers and products from CSV files, as CSV is still a common exchange format. Specify option("header", True) when dealing with CSVs whose header row contains the column names. With option("inferSchema", True), Spark will infer the schema. We will see in the next section that schema inference can be a very expensive operation!

Read customers (1.5 MB) and products (350 KB)

%%time
customers = ss.read.option("header", True).option("inferSchema", True).csv(DATADIR + "customers100000.csv")
products = ss.read.option("header", True).option("inferSchema", True).csv(DATADIR + "products10000.csv")

[stdout]:
CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 13 s

Let’s inspect the schemas and then count customers and products:

info(customers)
info(products)

[stdout]:
root
 |-- cust_id: integer (nullable = true)
 |-- is_male: boolean (nullable = true)
 |-- location: integer (nullable = true)

+-------+-------+--------+
|cust_id|is_male|location|
+-------+-------+--------+
|1000000|   true|       1|
|1000001|  false|       2|
|1000002|   true|       3|
|1000003|  false|       4|
|1000004|   true|       1|
+-------+-------+--------+
only showing top 5 rows

root
 |-- product_id: integer (nullable = true)
 |-- food: boolean (nullable = true)
 |-- price: double (nullable = true)
 |-- brand: string (nullable = true)
 |-- at_location: string (nullable = true)

+----------+-----+------+-------+------------+
|product_id| food| price|  brand| at_location|
+----------+-----+------+-------+------------+
|     10000| true| 99.38| luxury|         [1]|
|     10001|false|141.12| luxury|      [1, 2]|
|     10002| true|151.15|premium|   [1, 2, 3]|
|     10003|false| 62.31|premium|[1, 2, 3, 4]|
|     10004| true| 92.95| luxury|         [1]|
+----------+-----+------+-------+------------+
only showing top 5 rows

%%time
customers.count()

[stdout]:
CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 717 ms

100000
%%time
products.count()

[stdout]:
CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 221 ms

10000

Everything is running smoothly so far, and the results come back almost instantly. This could just as well be a Pandas workflow at this point.

Read transaction data (18 GB CSV)

TRANSACTIONS_CSV_PATH = transactions_csv_path = DATADIR + "transactions_csv"
TRANSACTIONS_PARQUET_PATH = DATADIR + "transactions_pq"

How many files are we going to read?

!gsutil du -h $TRANSACTIONS_CSV_PATH

[stdout]:
0 B          gs://spark-intro/data/transactions_csv/
0 B          gs://spark-intro/data/transactions_csv/_SUCCESS
371.56 MiB   gs://spark-intro/data/transactions_csv/part-00000-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.56 MiB   gs://spark-intro/data/transactions_csv/part-00001-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.56 MiB   gs://spark-intro/data/transactions_csv/part-00002-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.56 MiB   gs://spark-intro/data/transactions_csv/part-00003-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.56 MiB   gs://spark-intro/data/transactions_csv/part-00004-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.56 MiB   gs://spark-intro/data/transactions_csv/part-00005-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.56 MiB   gs://spark-intro/data/transactions_csv/part-00006-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.56 MiB   gs://spark-intro/data/transactions_csv/part-00007-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.19 MiB   gs://spark-intro/data/transactions_csv/part-00008-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.56 MiB   gs://spark-intro/data/transactions_csv/part-00009-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.56 MiB   gs://spark-intro/data/transactions_csv/part-00010-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.19 MiB   gs://spark-intro/data/transactions_csv/part-00011-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.19 MiB   gs://spark-intro/data/transactions_csv/part-00012-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.56 MiB   gs://spark-intro/data/transactions_csv/part-00013-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.19 MiB   gs://spark-intro/data/transactions_csv/part-00014-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.19 MiB   gs://spark-intro/data/transactions_csv/part-00015-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.19 MiB   gs://spark-intro/data/transactions_csv/part-00016-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.56 MiB   gs://spark-intro/data/transactions_csv/part-00017-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.19 MiB   gs://spark-intro/data/transactions_csv/part-00018-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.56 MiB   gs://spark-intro/data/transactions_csv/part-00019-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.56 MiB   gs://spark-intro/data/transactions_csv/part-00020-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.19 MiB   gs://spark-intro/data/transactions_csv/part-00021-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.56 MiB   gs://spark-intro/data/transactions_csv/part-00022-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.19 MiB   gs://spark-intro/data/transactions_csv/part-00023-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.56 MiB   gs://spark-intro/data/transactions_csv/part-00024-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.19 MiB   gs://spark-intro/data/transactions_csv/part-00025-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.19 MiB   gs://spark-intro/data/transactions_csv/part-00026-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.19 MiB   gs://spark-intro/data/transactions_csv/part-00027-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.19 MiB   gs://spark-intro/data/transactions_csv/part-00028-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.19 MiB   gs://spark-intro/data/transactions_csv/part-00029-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.19 MiB   gs://spark-intro/data/transactions_csv/part-00030-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.19 MiB   gs://spark-intro/data/transactions_csv/part-00031-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.19 MiB   gs://spark-intro/data/transactions_csv/part-00032-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.19 MiB   gs://spark-intro/data/transactions_csv/part-00033-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.56 MiB   gs://spark-intro/data/transactions_csv/part-00034-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.19 MiB   gs://spark-intro/data/transactions_csv/part-00035-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.19 MiB   gs://spark-intro/data/transactions_csv/part-00036-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.56 MiB   gs://spark-intro/data/transactions_csv/part-00037-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.19 MiB   gs://spark-intro/data/transactions_csv/part-00038-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.19 MiB   gs://spark-intro/data/transactions_csv/part-00039-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.19 MiB   gs://spark-intro/data/transactions_csv/part-00040-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.19 MiB   gs://spark-intro/data/transactions_csv/part-00041-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.19 MiB   gs://spark-intro/data/transactions_csv/part-00042-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.19 MiB   gs://spark-intro/data/transactions_csv/part-00043-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.19 MiB   gs://spark-intro/data/transactions_csv/part-00044-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.19 MiB   gs://spark-intro/data/transactions_csv/part-00045-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.19 MiB   gs://spark-intro/data/transactions_csv/part-00046-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.19 MiB   gs://spark-intro/data/transactions_csv/part-00047-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.19 MiB   gs://spark-intro/data/transactions_csv/part-00048-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
371.19 MiB   gs://spark-intro/data/transactions_csv/part-00049-8d19b503-8b7a-428f-8013-3a3920e35ac4-c000.csv
18.13 GiB    gs://spark-intro/data/transactions_csv/

So, we will read 18 GB of CSVs distributed across 50 files.

%%time
transactions_csv = ss.read.option("header", True).csv(transactions_csv_path)
info(transactions_csv)

[stdout]:
root
 |-- cust_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- date: string (nullable = true)

+-------+----------+--------------------+
|cust_id|product_id|                date|
+-------+----------+--------------------+
|1087831|     10000|2018-05-23T23:04:...|
|1087831|     10001|2018-09-03T20:20:...|
|1087831|     10002|2018-08-07T15:21:...|
|1087831|     10003|2018-08-27T08:00:...|
|1087831|     10004|2018-11-29T22:50:...|
+-------+----------+--------------------+
only showing top 5 rows

CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 1.64 s

This was very fast. Did we really read in all the CSV data now?
No, read() does not bring the data into memory. Instead, Spark just adds the data source to the execution graph. This is Spark’s lazy evaluation: we have not triggered an action yet. Let’s trigger the action count(), which will scan all data in the CSVs:

%%time
transactions_csv.count()

[stdout]:
CPU times: user 12 ms, sys: 0 ns, total: 12 ms
Wall time: 38.6 s

499180000

Lesson learned: Spark does lazy evaluation.

When reading data with a read statement, we set a starting point for the execution graph. Nothing is done until so-called actions trigger the processing. A non-exhaustive list of actions: count, show, head, write.{parquet, json, csv, saveAsTable}.
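If you want to see what Spark has planned so far without triggering any processing, you can print the execution plan; this is cheap, because only the graph is inspected, not the data:

# Show the plan Spark has built so far -- no data is touched
transactions_csv.explain()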

The schema is still not correct: every column was read as a string. Let’s turn on inferSchema:

%%time
transactions_csv = ss.read.option("header", True).option("inferSchema", True).csv(transactions_csv_path)

[stdout]:
CPU times: user 52 ms, sys: 0 ns, total: 52 ms
Wall time: 3min 18s
info(transactions_csv)

[stdout]:
root
 |-- cust_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- date: timestamp (nullable = true)

+-------+----------+--------------------+
|cust_id|product_id|                date|
+-------+----------+--------------------+
|1087831|     10000|2018-05-23 23:04:...|
|1087831|     10001|2018-09-03 20:20:...|
|1087831|     10002|2018-08-07 15:21:...|
|1087831|     10003|2018-08-27 08:00:...|
|1087831|     10004|2018-11-29 22:50:...|
+-------+----------+--------------------+
only showing top 5 rows

Schema inference took quite some time, because Spark needed to scan every single value in the data (!) to perform the inference. Using Parquet is the better option: the schema is stored in the metadata and I/O is way faster. However, it is very common that third parties provide data in CSV format.
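If you already know the schema, you can avoid the expensive inference altogether by passing it explicitly. A minimal sketch, assuming the column types shown above:

# Provide the schema upfront instead of inferring it (sketch)
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType

transactions_schema = StructType([
    StructField("cust_id", IntegerType()),
    StructField("product_id", IntegerType()),
    StructField("date", TimestampType()),
])
transactions_csv = ss.read.option("header", True).schema(transactions_schema).csv(transactions_csv_path)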

!gsutil du -h $TRANSACTIONS_PARQUET_PATH

[stdout]:
0 B          gs://spark-intro/data/transactions_pq/
0 B          gs://spark-intro/data/transactions_pq/_SUCCESS
108.74 MiB   gs://spark-intro/data/transactions_pq/part-00000-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.81 MiB   gs://spark-intro/data/transactions_pq/part-00001-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.73 MiB   gs://spark-intro/data/transactions_pq/part-00002-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.76 MiB   gs://spark-intro/data/transactions_pq/part-00003-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.69 MiB   gs://spark-intro/data/transactions_pq/part-00004-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.72 MiB   gs://spark-intro/data/transactions_pq/part-00005-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.69 MiB   gs://spark-intro/data/transactions_pq/part-00006-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.81 MiB   gs://spark-intro/data/transactions_pq/part-00007-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.81 MiB   gs://spark-intro/data/transactions_pq/part-00008-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.75 MiB   gs://spark-intro/data/transactions_pq/part-00009-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.68 MiB   gs://spark-intro/data/transactions_pq/part-00010-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.89 MiB   gs://spark-intro/data/transactions_pq/part-00011-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.79 MiB   gs://spark-intro/data/transactions_pq/part-00012-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
109 MiB      gs://spark-intro/data/transactions_pq/part-00013-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.86 MiB   gs://spark-intro/data/transactions_pq/part-00014-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.75 MiB   gs://spark-intro/data/transactions_pq/part-00015-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.85 MiB   gs://spark-intro/data/transactions_pq/part-00016-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.94 MiB   gs://spark-intro/data/transactions_pq/part-00017-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.81 MiB   gs://spark-intro/data/transactions_pq/part-00018-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.78 MiB   gs://spark-intro/data/transactions_pq/part-00019-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.84 MiB   gs://spark-intro/data/transactions_pq/part-00020-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.79 MiB   gs://spark-intro/data/transactions_pq/part-00021-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.83 MiB   gs://spark-intro/data/transactions_pq/part-00022-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.8 MiB    gs://spark-intro/data/transactions_pq/part-00023-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.74 MiB   gs://spark-intro/data/transactions_pq/part-00024-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.93 MiB   gs://spark-intro/data/transactions_pq/part-00025-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.83 MiB   gs://spark-intro/data/transactions_pq/part-00026-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.91 MiB   gs://spark-intro/data/transactions_pq/part-00027-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.8 MiB    gs://spark-intro/data/transactions_pq/part-00028-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.63 MiB   gs://spark-intro/data/transactions_pq/part-00029-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.77 MiB   gs://spark-intro/data/transactions_pq/part-00030-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.73 MiB   gs://spark-intro/data/transactions_pq/part-00031-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.66 MiB   gs://spark-intro/data/transactions_pq/part-00032-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.75 MiB   gs://spark-intro/data/transactions_pq/part-00033-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.82 MiB   gs://spark-intro/data/transactions_pq/part-00034-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.8 MiB    gs://spark-intro/data/transactions_pq/part-00035-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.83 MiB   gs://spark-intro/data/transactions_pq/part-00036-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.75 MiB   gs://spark-intro/data/transactions_pq/part-00037-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.81 MiB   gs://spark-intro/data/transactions_pq/part-00038-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.76 MiB   gs://spark-intro/data/transactions_pq/part-00039-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.74 MiB   gs://spark-intro/data/transactions_pq/part-00040-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.72 MiB   gs://spark-intro/data/transactions_pq/part-00041-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.78 MiB   gs://spark-intro/data/transactions_pq/part-00042-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.73 MiB   gs://spark-intro/data/transactions_pq/part-00043-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.76 MiB   gs://spark-intro/data/transactions_pq/part-00044-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.75 MiB   gs://spark-intro/data/transactions_pq/part-00045-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.75 MiB   gs://spark-intro/data/transactions_pq/part-00046-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.77 MiB   gs://spark-intro/data/transactions_pq/part-00047-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.79 MiB   gs://spark-intro/data/transactions_pq/part-00048-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
108.66 MiB   gs://spark-intro/data/transactions_pq/part-00049-65356b68-a1e4-4cab-ac79-798c4897781c-c000.snappy.parquet
5.31 GiB     gs://spark-intro/data/transactions_pq/

We see that the Parquet files consume much less space than the CSV files (to be fair, they are also compressed with Snappy here; you could apply Snappy compression to the CSVs as well, but the order of magnitude of the difference would stay the same).
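For completeness: the Parquet copy of the transactions was presumably produced by a one-off conversion along these lines (a sketch, not part of the recorded notebook):

# One-off conversion from CSV (with the schema resolved) to Snappy-compressed Parquet
transactions_csv.write.mode("overwrite").parquet(TRANSACTIONS_PARQUET_PATH)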

%%time
transactions_parquet = ss.read.parquet(TRANSACTIONS_PARQUET_PATH)
info(transactions_parquet)

[stdout]:
root
 |-- cust_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- date: timestamp (nullable = true)

+-------+----------+--------------------+
|cust_id|product_id|                date|
+-------+----------+--------------------+
|1087831|     10000|2018-05-23 23:04:...|
|1087831|     10001|2018-09-03 20:20:...|
|1087831|     10002|2018-08-07 15:21:...|
|1087831|     10003|2018-08-27 08:00:...|
|1087831|     10004|2018-11-29 22:50:...|
+-------+----------+--------------------+
only showing top 5 rows

CPU times: user 0 ns, sys: 8 ms, total: 8 ms
Wall time: 3.32 s
%%time
transactions_parquet.count()

[stdout]:
CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 2.44 s

499180000

As expected, the result is the same, but we needed way less wall clock time.

Lesson learned: Avoid CSV as a serialization format, use Parquet instead.

CSV comes without schema, and schema inference might take very long at initial read if the data to be read is not small.

Let’s use the transactions loaded from Parquet from now on:

transactions = transactions_parquet

We can inspect the underlying Hive installation and data using the catalog property.

# Check available tables
ss.catalog.listTables()

[]

By registering the transactions as a temporary view (TempView), we can now write SQL queries.

transactions.createOrReplaceTempView("transactions_view")

ss.catalog.listTables()

[Table(name='transactions_view', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]
ss.sql(
"""
SELECT COUNT(*) FROM transactions_view
""").show()

[stdout]:
+---------+
| count(1)|
+---------+
|499180000|
+---------+

Let’s save the dataframe as a Hive table:

%%time
transactions.write.mode("overwrite").saveAsTable("transactions")

[stdout]:
CPU times: user 8 ms, sys: 4 ms, total: 12 ms
Wall time: 56.2 s

Although we saved a Hive table, we see that under the hood the data is stored in Snappy-compressed Parquet files, each roughly 100 MB in size. We get the total storage footprint on HDFS using this command:

!sudo -u hdfs hadoop fs -du -h /user/hive/warehouse/transactions && sudo -u hdfs hadoop fs -du -s -h /user/hive/warehouse/transactions  # list all && show sum

[stdout]:
0        /user/hive/warehouse/transactions/_SUCCESS
109.0 M  /user/hive/warehouse/transactions/part-00000-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.9 M  /user/hive/warehouse/transactions/part-00001-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.9 M  /user/hive/warehouse/transactions/part-00002-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.9 M  /user/hive/warehouse/transactions/part-00003-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.9 M  /user/hive/warehouse/transactions/part-00004-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.9 M  /user/hive/warehouse/transactions/part-00005-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.9 M  /user/hive/warehouse/transactions/part-00006-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.8 M  /user/hive/warehouse/transactions/part-00007-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.8 M  /user/hive/warehouse/transactions/part-00008-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.8 M  /user/hive/warehouse/transactions/part-00009-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.8 M  /user/hive/warehouse/transactions/part-00010-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.8 M  /user/hive/warehouse/transactions/part-00011-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.8 M  /user/hive/warehouse/transactions/part-00012-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.8 M  /user/hive/warehouse/transactions/part-00013-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.8 M  /user/hive/warehouse/transactions/part-00014-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.8 M  /user/hive/warehouse/transactions/part-00015-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.8 M  /user/hive/warehouse/transactions/part-00016-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.8 M  /user/hive/warehouse/transactions/part-00017-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.8 M  /user/hive/warehouse/transactions/part-00018-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.8 M  /user/hive/warehouse/transactions/part-00019-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.8 M  /user/hive/warehouse/transactions/part-00020-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.8 M  /user/hive/warehouse/transactions/part-00021-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.8 M  /user/hive/warehouse/transactions/part-00022-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.8 M  /user/hive/warehouse/transactions/part-00023-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.8 M  /user/hive/warehouse/transactions/part-00024-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.8 M  /user/hive/warehouse/transactions/part-00025-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.8 M  /user/hive/warehouse/transactions/part-00026-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.8 M  /user/hive/warehouse/transactions/part-00027-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.8 M  /user/hive/warehouse/transactions/part-00028-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.8 M  /user/hive/warehouse/transactions/part-00029-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.8 M  /user/hive/warehouse/transactions/part-00030-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.8 M  /user/hive/warehouse/transactions/part-00031-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.8 M  /user/hive/warehouse/transactions/part-00032-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.8 M  /user/hive/warehouse/transactions/part-00033-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.7 M  /user/hive/warehouse/transactions/part-00034-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.7 M  /user/hive/warehouse/transactions/part-00035-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.7 M  /user/hive/warehouse/transactions/part-00036-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.7 M  /user/hive/warehouse/transactions/part-00037-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.7 M  /user/hive/warehouse/transactions/part-00038-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.7 M  /user/hive/warehouse/transactions/part-00039-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.7 M  /user/hive/warehouse/transactions/part-00040-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.7 M  /user/hive/warehouse/transactions/part-00041-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.7 M  /user/hive/warehouse/transactions/part-00042-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.7 M  /user/hive/warehouse/transactions/part-00043-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.7 M  /user/hive/warehouse/transactions/part-00044-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.7 M  /user/hive/warehouse/transactions/part-00045-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.7 M  /user/hive/warehouse/transactions/part-00046-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.7 M  /user/hive/warehouse/transactions/part-00047-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.7 M  /user/hive/warehouse/transactions/part-00048-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
108.6 M  /user/hive/warehouse/transactions/part-00049-4765ff2f-acee-4851-a075-ac1739df63ec-c000.snappy.parquet
5.3 G  /user/hive/warehouse/transactions

The table is persisted on the disks of the cluster, not in GCS. This means it will be gone once the cluster is shut down. This has pros and cons:

(+) Storing intermediate data on the cluster’s disks is (normally) faster and needs less bandwidth than storing it on GCS
(+) Data is lost after cluster deletion (i.e. you are not polluting your long-term storage)

(-) Data is lost after cluster deletion (bad if you forgot to export it to long-term storage)
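If you do want to keep such a table beyond the lifetime of the cluster, export it to GCS before tearing the cluster down. A minimal sketch (the output path is my own choice, not from the notebook):

# Copy the Hive table to long-term storage on GCS before deleting the cluster
ss.read.table("transactions").write.mode("overwrite").parquet(DATADIR + "output/transactions_backup")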

Let’s query the Hive table:

# The SparkSession.sql(...) statement returns a DataFrame
df = ss.sql(
"""
SELECT * FROM transactions LIMIT 100
""")

info(df)

[stdout]:
root
 |-- cust_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- date: timestamp (nullable = true)

+-------+----------+--------------------+
|cust_id|product_id|                date|
+-------+----------+--------------------+
|1004545|     10000|2018-12-01 03:52:...|
|1004545|     10001|2018-02-18 03:23:...|
|1004545|     10002|2018-08-05 12:24:...|
|1004545|     10003|2018-08-29 16:23:...|
|1004545|     10004|2018-10-12 15:15:...|
+-------+----------+--------------------+
only showing top 5 rows

sql_cust_id_count = ss.sql(
"""
SELECT
cust_id, count(1) AS product_count
FROM transactions
GROUP BY cust_id
""")

%%time
info(sql_cust_id_count)

[stdout]:
root
 |-- cust_id: integer (nullable = true)
 |-- product_count: long (nullable = false)

+-------+-------------+
|cust_id|product_count|
+-------+-------------+
|1025592|        10000|
|1092824|        10000|
|1046254|        10000|
|1098371|        10000|
|1048135|        10000|
+-------+-------------+
only showing top 5 rows

CPU times: user 0 ns, sys: 4 ms, total: 4 ms
Wall time: 3.88 s

Create the cross join of customers and products and add a score to these combinations

By doing a cross join, we get a list of all possible customer / product combinations. We then calculate a recommender score for each combination in order to rank the recommendations. In this post, we use a dummy random score, as building a real recommender is out of scope here.
In order to understand how to do a cross join efficiently, we need to talk briefly about partitions in Spark. In general, you can assume the following defaults: 1 partition = 1 (executor) core = 1 task

Of course, these can be adjusted. For example, in the current 2 x n1-standard-8 cluster setting, Spark allocated the resources as follows:

  • 2 workers with 8 cores each yield
  • 3 executors with 4 cores each, i.e. 12 cores in total (so 4 cores are not used for data crunching) yield
  • 12 partitions that can be processed in parallel (1 partition per core)

You can think of executors as Java Virtual Machines (JVMs). The master node doesn’t do the number crunching; it is occupied with scheduling and organization.
If your dataset lives in a single partition, only 1 core can read from it and the rest will be idle. No parallelization happens, which is bad for performance. To circumvent that, Spark sometimes performs an internal repartition(), which creates 200 partitions by default. Thus 200 partitions is a sensible default we can use for repartitioning our small dataset explicitly. Let’s check the current partition counts.

customers.rdd.getNumPartitions()

1
products.rdd.getNumPartitions()

1

We have 1 partition for the customer data and 1 partition for the products. How many partitions do we have for the cross join result with random scores added?

import pyspark.sql.functions as sqlf

scores_unpartitioned = (customers.
                        crossJoin(
                            products).
                        select(["cust_id", "product_id"]).  # select IDs only, not the property columns
                        withColumn("score", sqlf.rand()))  # Add a column with random values

scores_unpartitioned.rdd.getNumPartitions()

1

How long does the execution take?

%%time
scores_unpartitioned.count()

[stdout]:
CPU times: user 4 ms, sys: 12 ms, total: 16 ms
Wall time: 1min 6s

1000000000

Let’s compare this to the repartitioned result:

scores_repartitioned = (customers.repartition(200, "cust_id").
                        crossJoin(
                            products).
                        select(["cust_id", "product_id"]).
                        withColumn("score", sqlf.rand()))

scores_repartitioned.rdd.getNumPartitions()

200
%%time
scores_repartitioned.count()

[stdout]:
CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 17.5 s

1000000000

Because we have more partitions now, Spark was able to exploit parallelism. At this point, let’s state these two lessons learned:

Lesson learned: Only a partitioned dataset can be processed in parallel.

Partitions are always processed as a whole. By default, we can assume that 1 partition = 1 (executor) core = 1 task holds.

Lesson learned: For an efficient cross join of two rather small tables, do repartition the left table.

Otherwise the cross join cannot be parallelized. Small tables (< 100 MB) might be read into one partition only.
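A side note on the number 200: it mirrors Spark’s default shuffle parallelism, which is controlled by the spark.sql.shuffle.partitions setting and can be inspected or tuned (a sketch; the best value depends on your cluster and data sizes):

# Default shuffle parallelism -- returns '200' unless configured otherwise
ss.conf.get("spark.sql.shuffle.partitions")

# Could be tuned, e.g. to a small multiple of the available cores:
# ss.conf.set("spark.sql.shuffle.partitions", "48")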

Let’s do checkpointing now: store the results to Parquet and read them back again:

%%time
scores_repartitioned.write.mode("overwrite").parquet(DATADIR + "output/scores")

[stdout]:
CPU times: user 36 ms, sys: 8 ms, total: 44 ms
Wall time: 2min 56s
%%time
scores = ss.read.parquet(DATADIR + "output/scores")
scores.createOrReplaceTempView("scores")

[stdout]:
CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 550 ms
%%time
scores.count()

[stdout]:
CPU times: user 0 ns, sys: 4 ms, total: 4 ms
Wall time: 4.71 s

1000000000

This count is much faster than the one before, because we now start from the materialized cross join stored in Parquet.

Lesson learned: Do checkpointing frequently, either to Parquet or to Hive tables.

“Checkpointing” means storing intermediate results. Because Spark evaluates lazily, all the work happens only when an action is triggered, and you might be wondering why a single step takes so long; checkpointing materializes intermediate results so you can see where time is spent and reuse them. But a word of caution: don’t checkpoint too frequently, because then Spark cannot optimize across the stages of the execution graph.
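Since this write-then-read-back pattern recurs throughout the notebook, it can be handy to wrap it in a small helper. A minimal sketch (the helper name is my own choice, not from the notebook):

def checkpoint(df, path):
    # Materialize the dataframe to Parquet and return a fresh dataframe reading from it
    df.write.mode("overwrite").parquet(path)
    return ss.read.parquet(path)

# e.g.: scores = checkpoint(scores_repartitioned, DATADIR + "output/scores")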

Flag products that a customer has bought already based on previous transactions

We will do the flagging using SQL queries:

customer_purchasing_history = ss.sql("""
SELECT cust_id, product_id, TRUE AS has_bought
FROM transactions
GROUP BY cust_id, product_id
""")

%%time
info(customer_purchasing_history)

[stdout]:
root
 |-- cust_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- has_bought: boolean (nullable = false)

+-------+----------+----------+
|cust_id|product_id|has_bought|
+-------+----------+----------+
|1086994|     10252|      true|
|1086994|     10358|      true|
|1086994|     10373|      true|
|1086994|     10414|      true|
|1086994|     10582|      true|
+-------+----------+----------+
only showing top 5 rows

CPU times: user 16 ms, sys: 0 ns, total: 16 ms
Wall time: 50.7 s
%%time
# Do checkpointing
customer_purchasing_history.write.mode("overwrite").saveAsTable("customer_purchasing_history")
customer_purchasing_history = ss.read.table("customer_purchasing_history")

[stdout]:
CPU times: user 24 ms, sys: 0 ns, total: 24 ms
Wall time: 1min 30s
info(customer_purchasing_history)

[stdout]:
root
 |-- cust_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- has_bought: boolean (nullable = true)

+-------+----------+----------+
|cust_id|product_id|has_bought|
+-------+----------+----------+
|1086994|     10147|      true|
|1086994|     10314|      true|
|1086994|     10352|      true|
|1086994|     10357|      true|
|1086994|     10485|      true|
+-------+----------+----------+
only showing top 5 rows

Apply the business logic per customer

We want to apply the following business rules:

  1. “Only recommend products a customer has not bought already”
  2. “Only return the top 3 products per brand”

We join the purchasing history to the scores in order to be able to apply the business rules:

scores_w_purchasing_history = ss.sql(
"""
SELECT s.cust_id,
       s.product_id,
       s.score,
       if(isnull(c.has_bought), FALSE, c.has_bought) AS has_bought
FROM scores s
FULL JOIN customer_purchasing_history c  -- We need to do a full / outer join here
ON s.cust_id = c.cust_id AND s.product_id = c.product_id
""")

We use SQL snippets to apply the filters. This way, we can read the SQL strings from some config file (for example YAML) and apply them directly to the data.

# The dictionary of business rules shows all applied business rules at a glance
business_rules = {
    "filter1": "has_bought = FALSE",
    "filter2": ("RANK() OVER (PARTITION BY cust_id, brand ORDER BY score DESC) AS rank",
                "rank <= 3")
}
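If the rules should really come from a config file, the dictionary above could be loaded from YAML instead of being hard-coded. A sketch, assuming a hypothetical business_rules.yml with the same structure (PyYAML is not used in the notebook, this is for illustration only):

import yaml  # PyYAML, assumed to be installed

# business_rules.yml (hypothetical file) could look like:
# filter1: "has_bought = FALSE"
# filter2:
#   - "RANK() OVER (PARTITION BY cust_id, brand ORDER BY score DESC) AS rank"
#   - "rank <= 3"
with open("business_rules.yml") as f:
    business_rules = yaml.safe_load(f)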

Lesson learned: Talk to Spark using plain old SQL.

As SQL is a mature technology with a large user base, your company’s analysts and BI department might speak SQL fluently already, thus they might be able to adapt your application quickly.

Business Rule 1: Only recommend products a customer has not bought already

%%time
filter1 = business_rules["filter1"]
info(scores_w_purchasing_history.where(filter1))

[stdout]:
root
 |-- cust_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- score: double (nullable = true)
 |-- has_bought: boolean (nullable = true)

+-------+----------+-------------------+----------+
|cust_id|product_id|              score|has_bought|
+-------+----------+-------------------+----------+
|1000001|     10196|  0.597881670408303|     false|
|1000001|     10445| 0.8235333791152456|     false|
|1000001|     11040| 0.7713772388944897|     false|
|1000001|     11118| 0.4994585892704396|     false|
|1000001|     11355|0.40070680892389354|     false|
+-------+----------+-------------------+----------+
only showing top 5 rows

CPU times: user 28 ms, sys: 8 ms, total: 36 ms
Wall time: 2min 5s
%%time
scores_w_purchasing_history.where(filter1).write.mode("overwrite").saveAsTable("scores_filter1")
scores_filter1 = ss.read.table("scores_filter1")

[stdout]:
CPU times: user 64 ms, sys: 16 ms, total: 80 ms
Wall time: 5min 2s
%%time
scores_filter1.count()

[stdout]:
CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 587 ms

500820000

Business Rule 2: Only return the top 3 products per brand

We need to join the product properties to get the field “brand”:

scores_filter1_w_product_properties = scores_filter1.join(products, on=["product_id"])  # Here we use the dataframe API instead of the SQL API
scores_filter1_w_product_properties.createOrReplaceTempView("scores_filter1_w_product_properties")

info(scores_filter1_w_product_properties)

[stdout]:
root
 |-- product_id: integer (nullable = true)
 |-- cust_id: integer (nullable = true)
 |-- score: double (nullable = true)
 |-- has_bought: boolean (nullable = true)
 |-- food: boolean (nullable = true)
 |-- price: double (nullable = true)
 |-- brand: string (nullable = true)
 |-- at_location: string (nullable = true)

+----------+-------+-------------------+----------+-----+------+------+------------+
|product_id|cust_id|              score|has_bought| food| price| brand| at_location|
+----------+-------+-------------------+----------+-----+------+------+------------+
|     10041|1000001| 0.8977266600968434|     false|false| 75.02|luxury|      [1, 2]|
|     10921|1000001|0.33134652721871927|     false|false| 98.95| basic|      [1, 2]|
|     11253|1000001| 0.5069594974425412|     false|false|119.49| basic|      [1, 2]|
|     11368|1000001| 0.2940201474995948|     false| true| 66.49| basic|         [1]|
|     11551|1000001| 0.6868115679855877|     false|false| 57.47|luxury|[1, 2, 3, 4]|
+----------+-------+-------------------+----------+-----+------+------+------------+
only showing top 5 rows

We return the top 3 products per brand.

ranking1 = business_rules["filter2"][0]  # could be input by the user
filter2 = business_rules["filter2"][1]  # also input by the user
sql_query = """
WITH ranks AS (
SELECT
    s.cust_id,
    s.product_id,
    s.score,
    s.brand,
    {ranking}
FROM scores_filter1_w_product_properties s
)
SELECT * FROM ranks WHERE {cond}
""".format(ranking=ranking1, cond=filter2)
top_recommendations = ss.sql(sql_query)

%%time
top_recommendations.write.mode("overwrite").parquet(DATADIR + "output/top_recommendations")

[stdout]:
CPU times: user 36 ms, sys: 8 ms, total: 44 ms
Wall time: 2min 44s
top_recommendations = ss.read.parquet(DATADIR + "output/top_recommendations")
top_recommendations.createOrReplaceTempView("top_recommendations")
top_recommendations.show()

[stdout]:
+-------+----------+------------------+-------+----+
|cust_id|product_id|             score|  brand|rank|
+-------+----------+------------------+-------+----+
|1000224|     10532|0.9999332944587627|  basic|   1|
|1000224|     13208| 0.999765620782344|  basic|   2|
|1000224|     11081|0.9991193851027369|  basic|   3|
|1000272|     16230|0.9986882032035357| luxury|   1|
|1000272|     18165|0.9974428246640789| luxury|   2|
|1000272|     13488|0.9972459167873641| luxury|   3|
|1000386|     16778|0.9997568764884482|premium|   1|
|1000386|     19947|0.9990247032678057|premium|   2|
|1000386|     19088|0.9987345942417952|premium|   3|
|1000674|     13839|0.9998953300691195|premium|   1|
|1000674|     13967|0.9998558958418522|premium|   2|
|1000674|     11882|0.9998079385453216|premium|   3|
|1000984|     16141|0.9998787647506188|  basic|   1|
|1000984|     14586|0.9989459654547336|  basic|   2|
|1000984|     17419|0.9987924948429464|  basic|   3|
|1001039|     13913|0.9998172170691527|premium|   1|
|1001039|     16070|0.9997322556812516|premium|   2|
|1001039|     17591|0.9995317635241421|premium|   3|
|1001213|     15057|0.9993775054656683| luxury|   1|
|1001213|     15479|0.9992028541140711| luxury|   2|
+-------+----------+------------------+-------+----+
only showing top 20 rows

And we have our result. Let’s do a quick sanity check:

brand_count = ss.sql("""
SELECT cust_id, brand, COUNT(product_id) AS prod_count
FROM top_recommendations
GROUP BY cust_id, brand
""")
brand_count.createOrReplaceTempView("brand_count")

ss.sql("""
SELECT MIN(prod_count), MAX(prod_count) FROM brand_count""").show()

[stdout]:
+---------------+---------------+
|min(prod_count)|max(prod_count)|
+---------------+---------------+
|              3|              3|
+---------------+---------------+

That’s it. Finally, I want to mention the very first lesson learned from the top of the post, as we have been using it all the time:

Lesson learned: Use the Spark SQL / DataFrame API.

Especially if you have a Pandas / SQL background. This structured data API is more high-level than the Resilient Distributed Dataset (RDD) API. The SQL/DataFrame API is under active development, and new features are shipped frequently.
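To illustrate how interchangeable the two flavours are, here is the sanity check from above expressed with the DataFrame API instead of SQL (a sketch; both variants are handled by the same optimizer):

# DataFrame API equivalent of the brand_count sanity check above
brand_count_df = (top_recommendations
                  .groupBy("cust_id", "brand")
                  .agg(sqlf.count("product_id").alias("prod_count")))
brand_count_df.agg(sqlf.min("prod_count"), sqlf.max("prod_count")).show()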

elapsed = time.time() - start
print("Notebook execution took {:.2f}s".format(elapsed))

[stdout]:
Notebook execution took 1369.45s

Was this post useful for you? Do you disagree at some point? Please leave a comment and share your thoughts! 🙂

# shuts down the IPython kernel of the Jupyter notebook
exit()

The End
