Spark SQLπŸ”—

  • Spark SQL can run SQL queries against views or tables organized into databases.
  • We can define UDFs and analyze query plans in order to optimize their workload
  • If some of data manipulation are Spark SQL and others are in DataFrames, they still compile to same underlying code.

What is SQLπŸ”—

  • Structured Query Language is domain-specific language for expressing relational operations over data.
  • SQL is everywhere, and even though tech pundits prophesized its death, its an extremely resilient data tool that many businesses depend on.
  • Spark implements a subset of ANSI SQL 2003

Big Data and SQL: Apache HiveπŸ”—

  • Before Spark, Hive was de facto big data SQL access layer.
  • developed at Facebook
  • It helped propel Hadoop into different industries because analysts could run SQL Queries.

Big Data and SQL: Spark SQLπŸ”—

  • With Spark 2.0 release, its authors created a superset of Hive’s support, writing a native SQL parser that supports both ANSI-SQL as well as HiveQL queries.
  • SQL analysts can now take advantage of Spark’s computation abilities by plugging into the Thrift Server or Spark’s SQL interface, whereas data engineers and scientists can use Spark SQL where appropriate in any data flow. This unifying API allows for data to be extracted with SQL, manipulated as a DataFrame, passed into one of Spark MLlibs’ large-scale machine learning algorithms, written out to another data source, and everything in between.

Spark’s Relationship to HiveπŸ”—

  • Spark SQL has a great relationship with Hive because it can connect to Hive metastores. The Hive metastore is the way in which Hive maintains table information for use across sessions.

The Hive MetastoreπŸ”—

  • To connect to Hive metastore, there are several properties required
    • Metastore version
    • Metstore jars
    • Shared Prefixes: Proper Class prefixes in order to communicate with different databases

How to run Spark SQL QueriesπŸ”—

Spark SQL Interfaces:

Spark SQL CLIπŸ”—

./bin/spark-sql
# ./bin/spark-sql --help

Spark Programmatic SQL InterfaceπŸ”—

spark.sql("SELECT 1 + 1").show()    # return a dataframe executed lazily

SparkSQL Thrift JDBC/ODBC ServerπŸ”—

  • Spark provides JDBC interface by which a remote program can connect to Spark driver in order to execute Spark SQL queries.
  • We can connect business softwares lik Tableau to Spark.
# start JDBC/ODBC server
./sbin/start-thriftserver.sh

# start the server with a specific listening host, port, and master
./sbin/start-thriftserver.sh \
  --hiveconf hive.server2.thrift.port=<listening-port> \
  --hiveconf hive.server2.thrift.bind.host=<listening-host> \
  --master <master-uri>
  ...

To test the connection to this server:

./bin/beeline
beeline> !connect jdbc:hive2://localhost:10000

CatalogπŸ”—

  • highest level of abstraction for storage of metadata about the data stored in your tables as well other helpful things like databases, tables, functions, and views
  • The catalog is available in the org.apache.spark.sql.catalog.Catalog package

TablesπŸ”—

  • Tables are logically equivalent to a DataFrame in that they are structure of data against which you run commands.
  • NOTE: Spark 2.x, tables always contain data. There is no notion of temporary table, only a view, which doesn't contain data. This is important because if you go to drop a table, you can risk losing the data when doing so.

Spark Managed TablesπŸ”—

  • Tables store two pieces of information : data and metadata about itself.
  • Spark can manage the metadata for a set of files as well as data about tables.
  • When you define a table from files on disk, you are defining an unmanaged table. When you use saveAsTable on a DataFrame, you are creating a managed table for which Spark will track of all of the relevant information.
  • Finding tables in a database : show tables IN databaseName

Creating TablesπŸ”—

  • Spark has unique capability of reusing the entire Data Source API within SQL
CREATE TABLE flights (
  DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count LONG)
USING JSON OPTIONS (path '/data/flight-data/json/2015-summary.json')
-- tables with column comments

CREATE TABLE flights_csv (
  DEST_COUNTRY_NAME STRING,
  ORIGIN_COUNTRY_NAME STRING COMMENT "remember, the US will be most prevalent",
  count LONG)
USING csv OPTIONS (header true, path '/data/flight-data/csv/2015-summary.csv')

-- creating table from a query
CREATE TABLE flights_from_select USING parquet AS SELECT * FROM flights
-- controlling layout of data
CREATE TABLE partitioned_flights USING parquet PARTITIONED BY (DEST_COUNTRY_NAME)
AS SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count FROM flights LIMIT 5

Creating External TablesπŸ”—

  • spark is completely compatible with Hive SQL
CREATE EXTERNAL TABLE hive_flights (
  DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count LONG)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '/data/flight-data-hive/'

-- also possible from a select query
CREATE EXTERNAL TABLE hive_flights_2
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/data/flight-data-hive/' AS SELECT * FROM flights

Inserting into TablesπŸ”—

-- standard sql syntax
INSERT INTO flights_from_select
  SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count FROM flights LIMIT 20

-- partition spec for putting data in specific partition
INSERT INTO partitioned_flights
  PARTITION (DEST_COUNTRY_NAME="UNITED STATES")
  SELECT count, ORIGIN_COUNTRY_NAME FROM flights
  WHERE DEST_COUNTRY_NAME='UNITED STATES' LIMIT 12

Describing Table MetadataπŸ”—

DESCRIBE TABLE flights_csv

SHOW PARTITIONS partitioned_flights

Refreshing Table MetadataπŸ”—

-- refreshes all cached entries associated with the table
REFRESH TABLE partitioned_flights

-- collects new partition information
MSCK REPAIR TABLE partitioned_flights
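
The programmatic equivalents live on the catalog; a brief sketch:

// Scala sketch: invalidate cached metadata/data for a table, and recover partitions
spark.catalog.refreshTable("partitioned_flights")       // same effect as REFRESH TABLE
spark.catalog.recoverPartitions("partitioned_flights")  // roughly the counterpart of MSCK REPAIR TABLE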

Dropping TablesπŸ”—

Dropping a managed table deletes the data in the table, so you need to be very careful when doing this.

DROP TABLE flights_csv;

DROP TABLE IF EXISTS flights_csv;
  • for unmanaged tables (Hive), no data will be removed but we will no longer be able to refer to this data by table name.

Caching TablesπŸ”—

CACHE TABLE flights
UNCACHE TABLE flights
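
The same can be done through the catalog API; a short sketch:

// Scala sketch: cache and uncache a table by name
spark.catalog.cacheTable("flights")     // caching is lazy; data is cached on first use
spark.catalog.isCached("flights")       // check whether the table is cached
spark.catalog.uncacheTable("flights")   // remove it from the cache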

ViewsπŸ”—

  • View specifies a set of transformations on top of an existing table - basically saved query plans which can be convenient for organizing or reusing your query logic.
  • Views can be global, set to a database or per session

Creating ViewsπŸ”—

CREATE VIEW just_usa_view AS
  SELECT * FROM flights WHERE dest_country_name = 'United States'

-- views limited to current session
CREATE TEMP VIEW just_usa_view_temp AS
  SELECT * FROM flights WHERE dest_country_name = 'United States'

-- global temp view
CREATE GLOBAL TEMP VIEW just_usa_global_view_temp AS
  SELECT * FROM flights WHERE dest_country_name = 'United States'
SHOW TABLES
-- overwriting views
CREATE OR REPLACE TEMP VIEW just_usa_view_temp AS
  SELECT * FROM flights WHERE dest_country_name = 'United States'
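
From the DataFrame API, the equivalent calls are createOrReplaceTempView and createOrReplaceGlobalTempView; a small Scala sketch, assuming the flights DataFrame:

// Session-scoped and application-scoped views from a DataFrame
val justUsa = flights.where("dest_country_name = 'United States'")
justUsa.createOrReplaceTempView("just_usa_view_temp")               // visible in this session only
justUsa.createOrReplaceGlobalTempView("just_usa_global_view_temp")  // visible across sessions of this application
spark.sql("SELECT * FROM global_temp.just_usa_global_view_temp").show(3)  // global temp views live in global_temp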

Execution plan of Views and DataFrames

val flights = spark.read.format("json")
  .load("/data/flight-data/json/2015-summary.json")
val just_usa_df = flights.where("dest_country_name = 'United States'")
just_usa_df.selectExpr("*").explain


-- sql
EXPLAIN SELECT * FROM just_usa_view     -- or
EXPLAIN SELECT * FROM flights WHERE dest_country_name = 'United States'

Dropping ViewsπŸ”—

  • dropping a view doesn’t remove any data
DROP VIEW IF EXISTS just_usa_view;

DatabasesπŸ”—

  • tools for organizing tables
  • if not defined spark uses default databases
SHOW DATABASES

-- Creating Databases
CREATE DATABASE some_db

-- setting the DataBase
USE some_db

-- list all tables in db
SHOW tables

SELECT * FROM flights    -- fails with table/view not found (flights lives in the default database)

-- query from other database
SELECT * FROM default.flights

-- check current database
SELECT current_database()

-- switching database
USE default;

-- dropping a database
DROP DATABASE IF EXISTS some_db;

Select StatementsπŸ”—

  • ANSI SQL requirements of SELECT works
SELECT [ALL|DISTINCT] named_expression[, named_expression, ...]
    FROM relation[, relation, ...]
    [lateral_view[, lateral_view, ...]]
    [WHERE boolean_expression]
    [aggregation [HAVING boolean_expression]]
    [ORDER BY sort_expressions]
    [CLUSTER BY expressions]
    [DISTRIBUTE BY expressions]
    [SORT BY sort_expressions]
    [WINDOW named_window[, WINDOW named_window, ...]]
    [LIMIT num_rows]

named_expression:
    : expression [AS alias]

relation:
    | join_relation
    | (table_name|query|relation) [sample] [AS alias]
    : VALUES (expressions)[, (expressions), ...]
          [AS (column_name[, column_name, ...])]

expressions:
    : expression[, expression, ...]

sort_expressions:
    : expression [ASC|DESC][, expression [ASC|DESC], ...]

Conditional SQL OperationsπŸ”—

  • SQL changes based on some condition i.e. case...when...then...end
SELECT
  CASE WHEN DEST_COUNTRY_NAME = 'UNITED STATES' THEN 1
       WHEN DEST_COUNTRY_NAME = 'Egypt' THEN 0
       ELSE -1 END
FROM partitioned_flights

Advanced TopicsπŸ”—

Complex TypesπŸ”—

  • Incredibly powerful feature that does not exist in standard SQL.

StructsπŸ”—

  • more akin to maps, providing a way of creating or querying nested data in Spark.
CREATE VIEW IF NOT EXISTS nested_data AS
  SELECT (DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME) as country, count FROM flights
SELECT * FROM nested_data
SELECT country.DEST_COUNTRY_NAME, count FROM nested_data    -- notice the dot syntax

-- selecting all subvalues from a struct
SELECT country.*, count FROM nested_data

ListsπŸ”—

  • collect_list function, which creates a list of values.
SELECT DEST_COUNTRY_NAME as new_name, collect_list(count) as flight_counts,
  collect_set(ORIGIN_COUNTRY_NAME) as origin_set
FROM flights GROUP BY DEST_COUNTRY_NAME

SELECT DEST_COUNTRY_NAME, ARRAY(1, 2, 3) FROM flights

-- accessing list elements using Python-like index syntax
SELECT DEST_COUNTRY_NAME as new_name, collect_list(count)[0]
FROM flights GROUP BY DEST_COUNTRY_NAME

-- converting arrays back into rows using explode
-- new view
CREATE OR REPLACE TEMP VIEW flights_agg AS
  SELECT DEST_COUNTRY_NAME, collect_list(count) as collected_counts
  FROM flights GROUP BY DEST_COUNTRY_NAME

-- explode
SELECT explode(collected_counts), DEST_COUNTRY_NAME FROM flights_agg

FunctionsπŸ”—

-- list of functions
SHOW FUNCTIONS

-- system/user functions
SHOW SYSTEM FUNCTIONS
SHOW USER FUNCTIONS

-- filtering functions
SHOW FUNCTIONS "s*";

SHOW FUNCTIONS LIKE "collect*"
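
User-defined functions can also be registered and then called from SQL; a minimal sketch with an illustrative power3 function:

// Scala sketch: register a function as a SQL-callable UDF
spark.udf.register("power3", (n: Double) => n * n * n)
spark.sql("SELECT count, power3(count) FROM flights LIMIT 5").show()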

SubqueriesπŸ”—

  • Queries within other queries
  • In Spark, there are two fundamental subqueries.
    • Correlated Subqueries : some information from outer scope of the query
    • Uncorrelated subqueries : no information from the outer scope
  • Each of these query can return on (scalar subquery) or more values. Spark supports predicate subqueries, allowing filtering based on views.

Uncorrelated predicate subqueriesπŸ”—

-- composed of two uncorrelated queries
-- top 5 country destinations
SELECT dest_country_name FROM flights
GROUP BY dest_country_name ORDER BY sum(count) DESC LIMIT 5
+-----------------+
|dest_country_name|
+-----------------+
|    United States|
|           Canada|
|           Mexico|
|   United Kingdom|
|            Japan|
+-----------------+

-- we can put this query inside another filter
SELECT * FROM flights
WHERE origin_country_name IN (SELECT dest_country_name FROM flights
      GROUP BY dest_country_name ORDER BY sum(count) DESC LIMIT 5)

Correlated predicate subqueriesπŸ”—

  • allows to use information from the outer scope in inner query
  • EXISTS just checks for some existence in the subquery and returns true if there is a value.
-- check whether there is a return flight home (a round trip)
SELECT * FROM flights f1
WHERE EXISTS (SELECT 1 FROM flights f2
            WHERE f1.dest_country_name = f2.origin_country_name)
AND EXISTS (SELECT 1 FROM flights f2
            WHERE f2.dest_country_name = f1.origin_country_name)

Uncorrelated scalar queriesπŸ”—

SELECT *, (SELECT max(count) FROM flights) AS maximum FROM flights

Miscellaneous FeaturesπŸ”—

ConfigurationsπŸ”—

Key properties, with defaults in parentheses:

  β€’ spark.sql.inMemoryColumnarStorage.compressed (default: true): When set to true, Spark SQL automatically selects a compression codec for each column based on statistics of the data.
  β€’ spark.sql.inMemoryColumnarStorage.batchSize (default: 10000): Controls the size of batches for columnar caching. Larger batch sizes can improve memory utilization and compression, but risk OutOfMemoryErrors (OOMs) when caching data.
  β€’ spark.sql.files.maxPartitionBytes (default: 134217728, i.e. 128 MB): The maximum number of bytes to pack into a single partition when reading files.
  β€’ spark.sql.files.openCostInBytes (default: 4194304, i.e. 4 MB): The estimated cost to open a file, measured by the number of bytes that could be scanned in the same time. This is used when putting multiple files into a partition. It is better to overestimate; that way the partitions with small files will be faster than partitions with bigger files (which are scheduled first).
  β€’ spark.sql.broadcastTimeout (default: 300): Timeout in seconds for the broadcast wait time in broadcast joins.
  β€’ spark.sql.autoBroadcastJoinThreshold (default: 10485760, i.e. 10 MB): Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. You can disable broadcasting by setting this value to -1. Note that currently statistics are supported only for Hive metastore tables for which the command ANALYZE TABLE COMPUTE STATISTICS noscan has been run.
  β€’ spark.sql.shuffle.partitions (default: 200): Configures the number of partitions to use when shuffling data for joins or aggregations.

Setting Configuration Values in SQLπŸ”—

SET spark.sql.shuffle.partitions=20
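
The same settings can be read and written programmatically through the SparkSession configuration; a short sketch:

// Scala sketch: set and inspect a SQL configuration value at runtime
spark.conf.set("spark.sql.shuffle.partitions", "20")
spark.conf.get("spark.sql.shuffle.partitions")   // returns "20"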