Spark SQL
- Spark SQL can run SQL queries against views or tables organized into databases.
- We can define UDFs and analyze query plans in order to optimize workloads.
- Even if some data manipulations are written in Spark SQL and others with DataFrames, they still compile to the same underlying code.
What is SQL
- Structured Query Language (SQL) is a domain-specific language for expressing relational operations over data.
- SQL is everywhere, and even though tech pundits have prophesied its death, it is an extremely resilient data tool that many businesses depend on.
- Spark implements a subset of ANSI SQL:2003.
Big Data and SQL: Apache Hive
- Before Spark, Hive was the de facto big data SQL access layer.
- Originally developed at Facebook.
- It helped propel Hadoop into different industries because analysts could run SQL queries against it.
Big Data and SQL: Spark SQL
- With the Spark 2.0 release, its authors created a superset of Hive's support, writing a native SQL parser that supports both ANSI-SQL and HiveQL queries.
- SQL analysts can now take advantage of Spark's computation abilities by plugging into the Thrift Server or Spark's SQL interface, whereas data engineers and scientists can use Spark SQL where appropriate in any data flow. This unifying API allows data to be extracted with SQL, manipulated as a DataFrame, passed into one of Spark MLlib's large-scale machine learning algorithms, written out to another data source, and everything in between.
Spark's Relationship to Hive
- Spark SQL has a great relationship with Hive because it can connect to Hive metastores. The Hive metastore is the way in which Hive maintains table information for use across sessions.
The Hive Metastore
- To connect to the Hive metastore, several properties are required:
- Metastore version (spark.sql.hive.metastore.version)
- Metastore jars (spark.sql.hive.metastore.jars)
- Shared prefixes (spark.sql.hive.metastore.sharedPrefixes): proper class prefixes in order to communicate with different databases
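A minimal sketch of these properties in spark-defaults.conf; the values here are assumptions (a Hive 1.2.1 metastore whose backing database is reached via the MySQL JDBC driver), so adjust them for your environment:
# assumed values: Hive 1.2.1 metastore, MySQL JDBC driver for the metastore database
spark.sql.hive.metastore.version 1.2.1
spark.sql.hive.metastore.jars maven
spark.sql.hive.metastore.sharedPrefixes com.mysql.jdbc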
How to run Spark SQL Queries
Spark SQL interfaces:
Spark SQL CLI
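The simplest interface: run SQL queries interactively from a terminal (note that the CLI cannot communicate with the Thrift JDBC server):
# start the Spark SQL CLI from the Spark home directory
./bin/spark-sql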
Spark Programmatic SQL Interface
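You can also execute SQL ad hoc via the sql method on the SparkSession object; the result comes back as a DataFrame, so SQL and DataFrame operations can be freely intermixed. A minimal Scala sketch:
// spark.sql returns a DataFrame, which we can immediately act on
spark.sql("SELECT 1 + 1").show()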
Spark SQL Thrift JDBC/ODBC Server
- Spark provides a JDBC interface by which a remote program can connect to the Spark driver in order to execute Spark SQL queries.
- We can connect business intelligence software like Tableau to Spark.
# start JDBC/ODBC server
./sbin/start-thriftserver.sh
# starting with some hive metastore
./sbin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=<listening-port> \
--hiveconf hive.server2.thrift.bind.host=<listening-host> \
--master <master-uri>
...
To test the connection to this server, use the beeline client that ships with Spark:
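# start beeline, then connect to the Thrift server (assuming the default port 10000)
./bin/beeline
!connect jdbc:hive2://localhost:10000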
Catalog
- The highest level of abstraction in Spark SQL: it stores metadata about the data stored in your tables as well as other helpful things like databases, tables, functions, and views.
- The catalog is available in the org.apache.spark.sql.catalog.Catalog package.
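A quick Scala sketch of the programmatic catalog API; each call returns a Dataset you can inspect:
// list the databases, tables, and functions the catalog knows about
spark.catalog.listDatabases().show()
spark.catalog.listTables().show()
spark.catalog.listFunctions().show()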
Tables
- Tables are logically equivalent to a DataFrame in that they are structures of data against which you run commands.
- NOTE: In Spark 2.x, tables always contain data. There is no notion of a temporary table, only views, which do not contain data. This is important because if you go to drop a table, you risk losing the data when doing so.
Spark Managed Tables
- Tables store two pieces of information: the data and metadata about the table.
- Spark can manage the metadata for a set of files as well as the data itself.
- When you define a table from files on disk, you are defining an unmanaged table. When you use saveAsTable on a DataFrame, you are creating a managed table for which Spark will keep track of all of the relevant information.
- Finding tables in a database:
SHOW TABLES IN databaseName
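For illustration, a minimal Scala sketch of creating a managed table from a DataFrame (flights is the DataFrame loaded in the Scala example later in this section; the table name managed_flights is hypothetical):
// Spark tracks both the data and the metadata for this managed table
flights.write.format("parquet").saveAsTable("managed_flights")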
Creating Tables
- Spark has the unique capability of reusing the entire Data Source API within SQL:
CREATE TABLE flights (
DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count LONG)
USING JSON OPTIONS (path '/data/flight-data/json/2015-summary.json')
-- tables with column comments
CREATE TABLE flights_csv (
DEST_COUNTRY_NAME STRING,
ORIGIN_COUNTRY_NAME STRING COMMENT "remember, the US will be most prevalent",
count LONG)
USING csv OPTIONS (header true, path '/data/flight-data/csv/2015-summary.csv')
-- creating table from a query
CREATE TABLE flights_from_select USING parquet AS SELECT * FROM flights
-- controlling layout of data
CREATE TABLE partitioned_flights USING parquet PARTITIONED BY (DEST_COUNTRY_NAME)
AS SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count FROM flights LIMIT 5
Creating External Tables
- Spark is completely compatible with Hive SQL for creating external tables; these are unmanaged, so Spark tracks the metadata while the data itself stays at the specified location:
CREATE EXTERNAL TABLE hive_flights (
DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count LONG)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '/data/flight-data-hive/'
-- creating an external table from a select query
CREATE EXTERNAL TABLE hive_flights_2
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/data/flight-data-hive/' AS SELECT * FROM flights
Inserting into Tables
-- standard sql syntax
INSERT INTO flights_from_select
SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count FROM flights LIMIT 20
-- partition spec for putting data in specific partition
INSERT INTO partitioned_flights
PARTITION (DEST_COUNTRY_NAME="UNITED STATES")
SELECT count, ORIGIN_COUNTRY_NAME FROM flights
WHERE DEST_COUNTRY_NAME='UNITED STATES' LIMIT 12
Describing Table Metadata
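- You can view a table's schema (including any column comments) and, for partitioned tables, its partitioning scheme:
DESCRIBE TABLE flights_csv
-- show the partitioning scheme
SHOW PARTITIONS partitioned_flights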
Refreshing Table Metadata
-- refreshes all cached entries associated with the table
REFRESH TABLE partitioned_flights
-- collects new partition information
MSCK REPAIR TABLE partitioned_flights
Dropping Tables
- Dropping a managed table deletes the data in the table, so you need to be very careful when doing this.
- For unmanaged (external) tables, no data is removed, but you will no longer be able to refer to the data by table name.
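Dropping uses standard SQL syntax:
DROP TABLE flights_csv
-- avoid an error if the table does not exist
DROP TABLE IF EXISTS flights_csv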
Caching Tables
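Just as with DataFrames, you can cache and uncache tables:
CACHE TABLE flights
-- remove the table from the cache
UNCACHE TABLE flights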
Views
- A view specifies a set of transformations on top of an existing table; views are basically saved query plans, which can be convenient for organizing or reusing query logic.
- Views can be global, scoped to a database, or per session.
Creating Views
CREATE VIEW just_usa_view AS
SELECT * FROM flights WHERE dest_country_name = 'United States'
-- views limited to current session
CREATE TEMP VIEW just_usa_view_temp AS
SELECT * FROM flights WHERE dest_country_name = 'United States'
-- global temp view
CREATE GLOBAL TEMP VIEW just_usa_global_view_temp AS
SELECT * FROM flights WHERE dest_country_name = 'United States'
-- overwriting views
CREATE OR REPLACE TEMP VIEW just_usa_view_temp AS
SELECT * FROM flights WHERE dest_country_name = 'United States'
Views and the equivalent DataFrame transformations produce the same execution plan:
val flights = spark.read.format("json")
.load("/data/flight-data/json/2015-summary.json")
val just_usa_df = flights.where("dest_country_name = 'United States'")
just_usa_df.selectExpr("*").explain
-- sql
EXPLAIN SELECT * FROM just_usa_view -- or
EXPLAIN SELECT * FROM flights WHERE dest_country_name = 'United States'
Dropping Views
- Dropping a view doesn't remove any data, only the view definition itself.
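Views are dropped much like tables, with VIEW as the keyword:
DROP VIEW IF EXISTS just_usa_view;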
Databases
- Databases are a tool for organizing tables.
- If none is defined, Spark uses the default database.
SHOW DATABASES
-- Creating Databases
CREATE DATABASE some_db
-- setting the database
USE some_db
-- list all tables in db
SHOW tables
SELECT * FROM flights -- fails with table/view not found (flights lives in the default database)
-- query from other database
SELECT * FROM default.flights
-- check current database
SELECT current_database()
-- switching database
USE default;
-- dropping a database
DROP DATABASE IF EXISTS some_db;
Select Statements
- Spark SQL queries support the ANSI SQL structure for SELECT:
SELECT [ALL|DISTINCT] named_expression[, named_expression, ...]
FROM relation[, relation, ...]
[lateral_view[, lateral_view, ...]]
[WHERE boolean_expression]
[aggregation [HAVING boolean_expression]]
[ORDER BY sort_expressions]
[CLUSTER BY expressions]
[DISTRIBUTE BY expressions]
[SORT BY sort_expressions]
[WINDOW named_window[, WINDOW named_window, ...]]
[LIMIT num_rows]
named_expression:
: expression [AS alias]
relation:
| join_relation
| (table_name|query|relation) [sample] [AS alias]
: VALUES (expressions)[, (expressions), ...]
[AS (column_name[, column_name, ...])]
expressions:
: expression[, expression, ...]
sort_expressions:
: expression [ASC|DESC][, expression [ASC|DESC], ...]
Conditional SQL Operations
- SQL output can change based on some condition via case...when...then...end, e.g.:
SELECT
CASE WHEN DEST_COUNTRY_NAME = 'UNITED STATES' THEN 1
WHEN DEST_COUNTRY_NAME = 'Egypt' THEN 0
ELSE -1 END
FROM partitioned_flights
Advanced Topics
Complex Types
- Complex types are an incredibly powerful feature that does not exist in standard SQL.
Structs
- Structs are more akin to maps, providing a way of creating or querying nested data in Spark:
CREATE VIEW IF NOT EXISTS nested_data AS
SELECT (DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME) as country, count FROM flights
SELECT * FROM nested_data
SELECT country.DEST_COUNTRY_NAME, count FROM nested_data -- notice dot syntax
-- selecting all subvalues from a struct
SELECT country.*, count FROM nested_data
Lists
- Use the collect_list function, which creates a list of values (or collect_set, which creates an array of unique values):
SELECT DEST_COUNTRY_NAME as new_name, collect_list(count) as flight_counts,
collect_set(ORIGIN_COUNTRY_NAME) as origin_set
FROM flights GROUP BY DEST_COUNTRY_NAME
SELECT DEST_COUNTRY_NAME, ARRAY(1, 2, 3) FROM flights
-- accessing list elements using Python-like array syntax
SELECT DEST_COUNTRY_NAME as new_name, collect_list(count)[0]
FROM flights GROUP BY DEST_COUNTRY_NAME
-- converting arrays back to rows using explode
-- new view
CREATE OR REPLACE TEMP VIEW flights_agg AS
SELECT DEST_COUNTRY_NAME, collect_list(count) as collected_counts
FROM flights GROUP BY DEST_COUNTRY_NAME
-- explode
SELECT explode(collected_counts), DEST_COUNTRY_NAME FROM flights_agg
Functions
-- list of functions
SHOW FUNCTIONS
-- system/user functions
SHOW SYSTEM FUNCTIONS
SHOW USER FUNCTIONS
-- filtering functions
SHOW FUNCTIONS "s*";
SHOW FUNCTIONS LIKE "collect*"
Subqueries
- Queries nested within other queries.
- In Spark, there are two fundamental subqueries:
- Correlated subqueries: use some information from the outer scope of the query.
- Uncorrelated subqueries: use no information from the outer scope.
- Each of these queries can return one (scalar subquery) or more values. Spark also supports predicate subqueries, which allow filtering based on values.
Uncorrelated predicate subqueries
-- composed of two uncorrelated queries
-- top 5 country destinations
SELECT dest_country_name FROM flights
GROUP BY dest_country_name ORDER BY sum(count) DESC LIMIT 5
+-----------------+
|dest_country_name|
+-----------------+
| United States|
| Canada|
| Mexico|
| United Kingdom|
| Japan|
+-----------------+
-- we can put this query inside another filter
SELECT * FROM flights
WHERE origin_country_name IN (SELECT dest_country_name FROM flights
GROUP BY dest_country_name ORDER BY sum(count) DESC LIMIT 5)
Correlated predicate subqueries
- Allow you to use information from the outer scope in the inner query.
- EXISTS just checks for some existence in the subquery and returns true if there is a value:
-- to check if there is flight to take you back home, round way trip
SELECT * FROM flights f1
WHERE EXISTS (SELECT 1 FROM flights f2
WHERE f1.dest_country_name = f2.origin_country_name)
AND EXISTS (SELECT 1 FROM flights f2
WHERE f2.dest_country_name = f1.origin_country_name)
Uncorrelated scalar queries
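- Uncorrelated scalar queries bring in supplemental information a query otherwise lacks; for example, to include the maximum flight count as its own column:
SELECT *, (SELECT max(count) FROM flights) AS maximum FROM flights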
Miscellaneous Features
Configurations
Property Name | Default | Meaning
---|---|---
spark.sql.inMemoryColumnarStorage.compressed | true | When set to true, Spark SQL automatically selects a compression codec for each column based on statistics of the data.
spark.sql.inMemoryColumnarStorage.batchSize | 10000 | Controls the size of batches for columnar caching. Larger batch sizes can improve memory utilization and compression, but risk OutOfMemoryErrors (OOMs) when caching data.
spark.sql.files.maxPartitionBytes | 134217728 (128 MB) | The maximum number of bytes to pack into a single partition when reading files.
spark.sql.files.openCostInBytes | 4194304 (4 MB) | The estimated cost to open a file, measured by the number of bytes that could be scanned in the same time. This is used when putting multiple files into a partition. It is better to overestimate; that way the partitions with small files will be faster than partitions with bigger files (which are scheduled first).
spark.sql.broadcastTimeout | 300 | Timeout in seconds for the broadcast wait time in broadcast joins.
spark.sql.autoBroadcastJoinThreshold | 10485760 (10 MB) | Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. You can disable broadcasting by setting this value to -1. Note that currently statistics are supported only for Hive Metastore tables for which the command ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan has been run.
spark.sql.shuffle.partitions | 200 | Configures the number of partitions to use when shuffling data for joins or aggregations.
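You can also set these configurations at runtime from SQL, e.g.:
-- set the shuffle partition count for the current session
SET spark.sql.shuffle.partitions=20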