DatasetsπŸ”—

  • Datasets are a foundational type of the Structured APIs and a strictly JVM language feature: they work only in Scala and Java.
  • Using Datasets, we define the object that each row in the Dataset consists of. In Scala this is a case class, which essentially defines a schema that you can use; in Java, you define a JavaBean.
  • To efficiently support domain-specific objects, a special concept called an β€œEncoder” is required. The encoder maps the domain-specific type T to Spark’s internal type system (see the sketch below).
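
A minimal sketch of this idea (the Person case class here is hypothetical, used only for illustration): Encoders.product derives an encoder for any Scala case class and exposes the schema Spark uses internally.

import org.apache.spark.sql.{Encoder, Encoders}

// Hypothetical domain type, used only to illustrate encoders
case class Person(name: String, age: Long)

// Encoders.product derives an Encoder for any Scala case class;
// its schema describes Spark's internal representation of Person
val personEncoder: Encoder[Person] = Encoders.product[Person]
personEncoder.schema.printTreeString()   // name: string, age: long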

When to Use DatasetsπŸ”—

  • Since Datasets come with a performance penalty (each row must be converted between Spark’s internal format and JVM objects), why use them at all? Reasons to use them:
    • When the operation(s) cannot be expressed using DataFrame manipulations
    • When we want or need type safety and are willing to trade performance for it
  • e.g. a large set of business logic that needs to be encoded in one specific function
  • Another use is to reuse a variety of transformations of entire rows between single-node workloads and Spark workloads

Creating DatasetsπŸ”—

  • Creating Datasets is a manual operation: it requires knowing the schema ahead of time

In Java: EncodersπŸ”—

import java.io.Serializable;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;

// JavaBean defining the object each row of the Dataset consists of
public class Flight implements Serializable {
  String DEST_COUNTRY_NAME;
  String ORIGIN_COUNTRY_NAME;
  Long count;
}

Dataset<Flight> flights = spark.read()
  .parquet("/data/flight-data/parquet/2010-summary.parquet/")
  .as(Encoders.bean(Flight.class));

In Scala: Case ClassesπŸ”—

  • A case class is a regular class with the following characteristics (illustrated after the code below):
    • immutable
    • decomposable through pattern matching
    • allows for comparison based on structure instead of reference
    • easy to use and manipulate
case class Flight(DEST_COUNTRY_NAME: String,
                  ORIGIN_COUNTRY_NAME: String, count: BigInt)

val flightsDF = spark.read
  .parquet("/data/flight-data/parquet/2010-summary.parquet/")
val flights = flightsDF.as[Flight]
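
A quick sketch in plain Scala (no Spark needed) of the structural equality and pattern matching that case classes give us:

// Case classes compare by field values, not by reference
val a = Flight("United States", "Romania", 1)
val b = Flight("United States", "Romania", 1)
a == b   // true: structural comparison

// ...and they decompose naturally via pattern matching
a match {
  case Flight(dest, origin, n) => s"$n flight(s) from $origin to $dest"
}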

ActionsπŸ”—

It is important to understand that actions like collect, take and count apply whether we are using Datasets or DataFrames:

flights.show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
+-----------------+-------------------+-----+
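
What differs is what the actions hand back: on a Dataset they return our JVM objects rather than Rows.

flights.count()                      // Long
flights.first()                      // Flight(United States,Romania,1) -- a Flight, not a Row
flights.first().DEST_COUNTRY_NAME    // fields accessed directly, no getAs needed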

TransformationsπŸ”—

  • transformations on a Dataset are the same as those on a DataFrame; in addition, Datasets let us specify more complex logic as raw JVM functions, as in the filter and map examples below

FilteringπŸ”—

def originIsDestination(flight_row: Flight): Boolean = {
  flight_row.ORIGIN_COUNTRY_NAME == flight_row.DEST_COUNTRY_NAME
}

flights.filter(flight_row => originIsDestination(flight_row)).first()
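
Because originIsDestination is an ordinary Scala function, the same logic can be reused on a local collection once the Dataset is collected to the driver, which is exactly the single-node/Spark reuse mentioned earlier:

// Runs on the driver: filter the collected Array[Flight] with the same function
flights.collect().filter(flight_row => originIsDestination(flight_row))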

MappingπŸ”—

// manipulate our dataset to extract one value from each row
// effectively similar to select on DataFrame

val destinations = flights.map(f => f.DEST_COUNTRY_NAME)    // Dataset of Type String
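
// Because destinations is a typed Dataset[String], collecting a few values
// hands back plain Strings on the driver (take(5) here is illustrative)
val localDestinations = destinations.take(5)   // Array[String]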

JoinsπŸ”—

  • Joins work the same as for DataFrames, but Datasets additionally provide the more sophisticated joinWith method
  • joinWith is roughly equal to a co-group (in RDD terminology): you end up with two nested Datasets inside of one
case class FlightMetadata(count: BigInt, randomData: BigInt)

val flightsMeta = spark.range(500).map(x => (x, scala.util.Random.nextLong))
  .withColumnRenamed("_1", "count").withColumnRenamed("_2", "randomData")
  .as[FlightMetadata]

val flights2 = flights
  .joinWith(flightsMeta, flights.col("count") === flightsMeta.col("count"))

flights2.selectExpr("_1.DEST_COUNTRY_NAME")
flights2.take(2)
// Array[(Flight, FlightMetadata)] = Array((Flight(United States,Romania,1),...
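
A regular join would also work here, but it returns a DataFrame and we lose the Flight type information. A sketch, assuming we join only on the count column:

// Plain join: the result is a DataFrame (Dataset[Row]), not Dataset[(Flight, FlightMetadata)]
val flights2DF = flights.join(flightsMeta.toDF(), Seq("count"))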

Grouping and AggregationsπŸ”—

  • all previous discussion of groupBy, rollup and cube still applies; note that groupBy("DEST_COUNTRY_NAME") gives back an untyped result, while groupByKey keeps working with our typed Flight objects (see the sketch after the snippet below)
flights.groupBy("DEST_COUNTRY_NAME").count()

flights.groupByKey(x => x.DEST_COUNTRY_NAME).count()
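
groupByKey returns a KeyValueGroupedDataset, so each typed group can be aggregated with an arbitrary function. A minimal sketch, assuming spark.implicits._ is in scope (as it already must be for the .as[Flight] call above):

// Sum the count column per destination, working directly with Flight objects
flights
  .groupByKey(x => x.DEST_COUNTRY_NAME)
  .mapGroups { (dest, rows) => (dest, rows.map(_.count.toLong).sum) }
  .show(5)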