Datasets
- Datasets are the foundational type of the Structured APIs and a strictly JVM language feature, available only in Scala and Java.
- Using Datasets, we define the object that each row in the Dataset consists of. In Scala this is a case class that essentially defines a schema you can use; in Java, you define a JavaBean.
- To efficiently support domain-specific objects, a special concept called an "Encoder" is required. The encoder maps the domain-specific type T to Spark's internal type system.
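For instance, in Scala an Encoder for a case class can be obtained explicitly (a minimal sketch using the Flight case class defined below; in practice import spark.implicits._ derives it automatically):
import org.apache.spark.sql.{Encoder, Encoders}
// Encoders.product derives an Encoder for any case class (a Product type);
// it describes how Flight maps to Spark's internal binary row format.
val flightEncoder: Encoder[Flight] = Encoders.product[Flight]
flightEncoder.schema.printTreeString() // the schema Spark derives from Flight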
When to Use Datasets
- Since Datasets are costly and come with a performance penalty, why use them at all? Reasons to use them:
- When the operation(s) cannot be expressed using DataFrame manipulations
- When we want or need type-safety, and are willing to trade performance for it
- e.g., a large set of business logic that needs to be encoded in one specific function
- Another use is to reuse a variety of transformations of entire rows between single-node workloads and Spark workloads (see the sketch after this list)
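As a sketch of that last point (assuming the Flight case class, the flights Dataset, and import spark.implicits._ from the sections below), the same row-level function can serve both workloads:
// A plain transformation of an entire row; no Spark types involved.
def normalize(f: Flight): Flight =
  f.copy(DEST_COUNTRY_NAME = f.DEST_COUNTRY_NAME.toUpperCase)

// Single-node workload: apply it to an in-memory collection.
val local = Seq(Flight("United States", "Romania", 1)).map(normalize)

// Spark workload: reuse the exact same function on the distributed Dataset.
val distributed = flights.map(normalize)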
Creating Datasets
- Creating Datasets is a manual operation that requires knowing the schema ahead of time.
In Java: Encoders
import java.io.Serializable;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;

// A JavaBean that defines the schema of each row; a complete bean would
// also declare getters and setters for each field.
public class Flight implements Serializable {
  String DEST_COUNTRY_NAME;
  String ORIGIN_COUNTRY_NAME;
  Long count;
}

Dataset<Flight> flights = spark.read()
  .parquet("/data/flight-data/parquet/2010-summary.parquet/")
  .as(Encoders.bean(Flight.class));
In Scala: Case Classes
A case class is a regular class with the following characteristics:
- Immutable
- Decomposable through pattern matching
- Allows for comparison based on structure instead of reference
- Easy to use and manipulate
case class Flight(DEST_COUNTRY_NAME: String,
ORIGIN_COUNTRY_NAME: String, count: BigInt)
import spark.implicits._ // provides the Encoder[Flight] required by .as[Flight]

val flightsDF = spark.read
  .parquet("/data/flight-data/parquet/2010-summary.parquet/")
val flights = flightsDF.as[Flight]
Actions
It is important to understand that actions like collect, take, and count apply whether we are using Datasets or DataFrames:
flights.show(2)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
| United States| Romania| 1|
| United States| Ireland| 264|
+-----------------+-------------------+-----+
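The objects these actions return are typed: collecting gives Flight instances rather than generic Rows, so fields can be accessed with compile-time safety. A small illustration (values match the first row above):
val f = flights.first
f.DEST_COUNTRY_NAME // String = "United States"
f.count             // BigInt = 1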
Transformations
- Transformations on Datasets are similar to those we saw on DataFrames.
Filtering
// a plain Scala function over a single row
def originIsDestination(flight_row: Flight): Boolean = {
  flight_row.ORIGIN_COUNTRY_NAME == flight_row.DEST_COUNTRY_NAME
}
flights.filter(flight_row => originIsDestination(flight_row)).first()
Mapping
// manipulate our dataset to extract one value from each row
// effectively similar to select on DataFrame
val destinations = flights.map(f => f.DEST_COUNTRY_NAME) // Dataset of Type String
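For comparison, a rough DataFrame equivalent (a sketch; it returns untyped Rows instead of Strings), plus pulling the typed values back to the driver:
val destinationsDF = flights.select("DEST_COUNTRY_NAME") // DataFrame (Dataset[Row])
destinations.take(5) // Array[String]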
Joins
- Joins apply just as they do for DataFrames, but Datasets also provide a more sophisticated method, joinWith.
- joinWith is roughly equal to a co-group (in RDD terminology): you end up with two nested Datasets inside of one.
case class FlightMetadata(count: BigInt, randomData: BigInt)
val flightsMeta = spark.range(500).map(x => (x, scala.util.Random.nextLong))
  .withColumnRenamed("_1", "count").withColumnRenamed("_2", "randomData")
  .as[FlightMetadata]
val flights2 = flights
  .joinWith(flightsMeta, flights.col("count") === flightsMeta.col("count"))
flights2.selectExpr("_1.DEST_COUNTRY_NAME")
flights2.take(2)
// Array[(Flight, FlightMetadata)] = Array((Flight(United States,Romania,1),...
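A regular join works here as well (a sketch; flights3 is a hypothetical name), but it returns a DataFrame, so the JVM type information is lost:
val flights3 = flights.join(flightsMeta, Seq("count")) // DataFrame, not a typed Dataset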
Grouping and Aggregations
- All of the previous discussion of groupBy, rollup, and cube still applies to Datasets; note that these return DataFrames, so the type information is lost.
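To keep the typed API while grouping, groupByKey is available (a minimal sketch; it groups by a function of the row rather than by a column name):
// groupByKey returns a KeyValueGroupedDataset; count() on it yields a
// typed Dataset of (key, count) pairs rather than a DataFrame.
flights.groupByKey(x => x.DEST_COUNTRY_NAME).count().show(2)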