Computational Thinking + Doing

Distributed, Parallel Computing With Spark

Splitting the processing and computation of flight data across a cluster of nodes, using Spark from Julia, Python, and R.

Getting Started

If you are interested in reproducing this work, the versions of Julia, Python, and R used are listed below, along with the respective packages for each. In addition, this work adopts Leland Wilkinson’s approach to data visualization (the Grammar of Graphics). Finally, my coding style here is deliberately verbose, so that it is easy to trace where functions/methods and variables originate from, and so that this is a learning experience for everyone, including me.

VERSION
v"1.5.0"
import Pkg
Pkg.add(name="CSV", version="0.10.4")
Pkg.add(name="DataFrames", version="1.3.6")
Pkg.add(name="CategoricalArrays", version="0.10.7")
Pkg.add(name="Colors", version="0.12.10")
Pkg.add(name="Cairo", version="1.0.5")
Pkg.add(name="Gadfly", version="1.3.4")
Pkg.add(name="Spark", version="0.5.2")
using CSV
using DataFrames
using CategoricalArrays
using Colors
using Cairo
using Gadfly
using Spark
import sys
print(sys.version)
3.9.6 (v3.9.6:db3ff76da1, Jun 28 2021, 11:49:53) 
[Clang 6.0 (clang-600.0.57)]
!pip install pandas==2.0.0
!pip install plotnine==0.10.1
!pip install pyspark==3.2.0
import pandas
import plotnine
import pyspark
R.version.string
[1] "R version 4.1.1 (2021-08-10)"
require(devtools)
devtools::install_version("dplyr", version="1.1.1", repos="http://cran.us.r-project.org")
devtools::install_version("ggplot2", version="3.4.2", repos="http://cran.us.r-project.org")
devtools::install_version("sparklyr", version="1.8.1", repos="http://cran.us.r-project.org")
library(dplyr)
library(ggplot2)
library(sparklyr)
pyspark.__version__
'3.2.0'
# Create SparkSession object
spark_session = pyspark.sql.SparkSession.builder.master("local[4]").appName("flights").getOrCreate()  # Use 4 cores from local machine
print(spark_session)
# spark_session.version
<pyspark.sql.session.SparkSession object at 0x7f9a95064a30>
# spark_context = pyspark.SparkContext()
# print(spark_context)
# spark_context.version
# spark_context.appName
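The master URL handed to the builder decides where the work actually runs: "local[4]" uses four cores on this machine, "local[*]" uses every available core, and a spark:// URL points at a standalone cluster. As a hypothetical sketch (the host name below is a placeholder, not a machine used in this post), the same session could be built against a remote cluster:

# Hypothetical: build the session against a standalone cluster instead of local cores
# spark_session = (
#     pyspark.sql.SparkSession.builder
#     .master("spark://spark-master.example.com:7077")  # placeholder master URL
#     .appName("flights")
#     .getOrCreate()
# )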
# If inferring the schema automatically (Spark scans the data to determine column types):
# flights = spark_session.read.csv("../../dataset/flights/flights.csv", header=True, inferSchema=True, nullValue="NA")

schema = pyspark.sql.types.StructType([
    pyspark.sql.types.StructField("mon", pyspark.sql.types.IntegerType()),
    pyspark.sql.types.StructField("dom", pyspark.sql.types.IntegerType()),
    pyspark.sql.types.StructField("dow", pyspark.sql.types.IntegerType()),
    pyspark.sql.types.StructField("carrier", pyspark.sql.types.StringType()),
    pyspark.sql.types.StructField("flight", pyspark.sql.types.IntegerType()),
    pyspark.sql.types.StructField("org", pyspark.sql.types.StringType()),
    pyspark.sql.types.StructField("mile", pyspark.sql.types.IntegerType()),
    pyspark.sql.types.StructField("depart", pyspark.sql.types.DoubleType()),
    pyspark.sql.types.StructField("duration", pyspark.sql.types.IntegerType()),
    pyspark.sql.types.StructField("delay", pyspark.sql.types.IntegerType())
])
flights = spark_session.read.csv("../../dataset/flights/flights.csv", header=True, schema=schema, nullValue="NA")
spark_session.catalog.listTables()
[]
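The catalog is empty because the DataFrame has not been registered as a table or view. If SQL access were wanted, a temporary view could be registered first; a minimal sketch (not part of the original workflow, and the view name "flights" is my own choice):

# Register the DataFrame as a temporary view so it shows up in the catalog
flights.createOrReplaceTempView("flights")
spark_session.catalog.listTables()  # now includes the "flights" temporary view
spark_session.sql("SELECT org, COUNT(*) AS n FROM flights GROUP BY org").show()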
print("The dataset contains %d records." % flights.count())
The dataset contains 50000 records.
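Each action (count() here, show() and the null count below) triggers a fresh read of the CSV file. Since several actions follow, caching the parsed DataFrame in memory is an optional optimization; a short sketch, assuming the data fits comfortably in memory:

# Optional: keep the parsed rows in memory across the actions that follow
flights.cache()   # lazily marks the DataFrame for caching
flights.count()   # the first action afterwards materializes the cache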
flights.show(8)
+---+---+---+-------+------+---+----+------+--------+-----+
|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|
+---+---+---+-------+------+---+----+------+--------+-----+
| 11| 20|  6|     US|    19|JFK|2153|  9.48|     351| null|
|  0| 22|  2|     UA|  1107|ORD| 316| 16.33|      82|   30|
|  2| 20|  4|     UA|   226|SFO| 337|  6.17|      82|   -8|
|  9| 13|  1|     AA|   419|ORD|1236| 10.33|     195|   -5|
|  4|  2|  5|     AA|   325|ORD| 258|  8.92|      65| null|
|  5|  2|  1|     UA|   704|SFO| 550|  7.98|     102|    2|
|  7|  2|  6|     AA|   380|ORD| 733| 10.83|     135|   54|
|  1| 16|  6|     UA|  1477|ORD|1440|   8.0|     232|   -7|
+---+---+---+-------+------+---+----+------+--------+-----+
only showing top 8 rows
flights.printSchema()
root
 |-- mon: integer (nullable = true)
 |-- dom: integer (nullable = true)
 |-- dow: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- org: string (nullable = true)
 |-- mile: integer (nullable = true)
 |-- depart: double (nullable = true)
 |-- duration: integer (nullable = true)
 |-- delay: integer (nullable = true)
flights.dtypes
[('mon', 'int'), ('dom', 'int'), ('dow', 'int'), ('carrier', 'string'), ('flight', 'int'), ('org', 'string'), ('mile', 'int'), ('depart', 'double'), ('duration', 'int'), ('delay', 'int')]
flights = flights.drop("carrier", "flight")
flights.printSchema()
root
 |-- mon: integer (nullable = true)
 |-- dom: integer (nullable = true)
 |-- dow: integer (nullable = true)
 |-- org: string (nullable = true)
 |-- mile: integer (nullable = true)
 |-- depart: double (nullable = true)
 |-- duration: integer (nullable = true)
 |-- delay: integer (nullable = true)
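A typical next step, sketched here purely as an illustration (the km column and conversion factor are my additions, not part of the original analysis), is deriving new columns with withColumn:

# Hypothetical derived column: distance in kilometers (1 mile ≈ 1.60934 km)
from pyspark.sql.functions import round as spark_round
flights_km = flights.withColumn("km", spark_round(flights.mile * 1.60934, 0))
flights_km.select("mile", "km").show(3)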
flights.filter("delay is NULL").count()
2978
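Those 2978 rows have no recorded delay. How they should be handled depends on the downstream task; two common options, sketched here as assumptions rather than as part of the original pipeline, are dropping them or filling them with a default value:

# Option 1: drop rows where delay is missing
flights_complete = flights.dropna(subset=["delay"])
# Option 2: replace missing delays with 0
flights_filled = flights.fillna(0, subset=["delay"])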
# Close connection to Spark
spark_session.stop()