6  Importing data to Spark

Another way of creating Spark DataFrames is to read (or import) data from somewhere and convert it into a Spark DataFrame. Spark can read a variety of file formats, including CSV, Parquet, JSON, ORC and binary files. Furthermore, Spark can connect to other databases and import tables from them, using ODBC/JDBC connections.

To read (or import) any data into Spark, we use a “read engine”, and there are many different read engines available in Spark. Each engine is used to read a specific file format, or to import data from a specific type of data source, and we access these engines through the read module of your Spark Session object.

6.1 Reading data from static files

Static files are probably the easiest way to transport data from one computer to another, because you just need to copy the file to the other machine, or download it from the internet.

But in order for Spark to read any static file stored on your computer, it always needs to know the path to that file. Every OS has its own file system, and every file on your computer is stored at a specific address in this file system. The “path” to the file is the sequence of steps (or “directions”) that your computer needs to follow to reach this specific address, where the file is stored.
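
If you are not sure how a relative path resolves on your machine, you can ask Python's standard library to expand it for you. This is just a small sketch (the file name below is only an example):

import os

# Expands the relative path into the full (absolute) path that Spark would visit:
print(os.path.abspath("../Data/sales.json"))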

As we pointed out earlier, to read any static file in Spark, you use one of the available “read engines”, which live in the spark.read module of your Spark Session. Each read engine in this module is responsible for reading a specific file format.

If you want to read a CSV file, for example, you use the spark.read.csv() engine. In contrast, if you want to read a JSON file, you use the spark.read.json() engine instead. But no matter which read engine you use, you always give it the path to your file.

The main read engines available in Spark are:

  • spark.read.json(): to read JSON files.
  • spark.read.csv(): to read CSV files.
  • spark.read.parquet(): to read Apache Parquet files.
  • spark.read.orc(): to read ORC (Apache Optimized Row Columnar format) files.
  • spark.read.text(): to read text files.
  • spark.read.jdbc(): to read data from databases using the JDBC API.

For example, to read a JSON file called sales.json that is stored in my Data folder, I can do this:

json_data = spark.read.json("../Data/sales.json")
json_data.show()
+-----+----------+------------+-------+-------------------+-----+
|price|product_id|product_name|sale_id|          timestamp|units|
+-----+----------+------------+-------+-------------------+-----+
| 3.12|       134| Milk 1L Mua| 328711|2022-02-01T22:10:02|    1|
| 1.22|       110|  Coke 350ml| 328712|2022-02-03T11:42:09|    3|
| 4.65|       117|    Pepsi 2L| 328713|2022-02-03T14:22:15|    1|
| 1.22|       110|  Coke 350ml| 328714|2022-02-03T18:33:08|    1|
| 0.85|       341|Trident Mint| 328715|2022-02-04T15:41:36|    1|
+-----+----------+------------+-------+-------------------+-----+
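
After reading a file like this, it can be useful to check which data type Spark inferred for each column. You can do that with the printSchema() method of the DataFrame (the output shown in the comments below is only an indication of its shape):

json_data.printSchema()
# Prints the name, inferred data type and nullability of each column, e.g.:
# root
#  |-- price: double (nullable = true)
#  |-- product_id: long (nullable = true)
#  ...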

6.2 An example with a CSV file

As an example, I have the following CSV file saved in my computer:

name,age,job
Jorge,30,Developer
Bob,32,Developer

This CSV was saved in a file called people.csv, inside a folder called Data. So, to read this static file, Spark needs to know the path to this people.csv file. In other words, Spark needs to know where this file is stored on my computer to be able to read it.

In my specific case, considering where this Data folder is on my computer, a relative path to it would be "../Data/". Having the path to the folder where people.csv is stored, I just need to add the file name to this path, resulting in "../Data/people.csv". Note in the example below that I gave this path to the spark.read.csv() method of my Spark Session. As a result, Spark will visit this address and read the file that is stored there:

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

path = "../Data/people.csv"

df = spark.read.csv(path)
df.show()
+-----+---+---------+
|  _c0|_c1|      _c2|
+-----+---+---------+
| name|age|      job|
|Jorge| 30|Developer|
|  Bob| 32|Developer|
+-----+---+---------+

In the example above, I gave a relative path to the file I wanted to read. But you can provide an absolute path1 to the file, if you want to. The people.csv file is located in a very specific folder on my Linux computer, so the absolute path to this file is pretty long, as you can see below. But if I were on my Windows machine, this absolute path would be something like "C:\Users\pedro\Documents\Projects\...".

# The absolute path to `people.csv`:
path = "/home/pedro/Documents/Projects/Books/"
path = path + "Introd-pyspark/Data/people.csv"

df = spark.read.csv(path)
df.show()
+-----+---+---------+
|  _c0|_c1|      _c2|
+-----+---+---------+
| name|age|      job|
|Jorge| 30|Developer|
|  Bob| 32|Developer|
+-----+---+---------+

If you give an invalid path (that is, a path that does not exist on your computer), you will get an AnalysisException. In the example below, I try to read a file called "weird-file.csv" that (in theory) is located in my current working directory. But when Spark looks inside my current directory, it does not find any file called "weird-file.csv". As a result, Spark raises an AnalysisException that warns me about this mistake.

df = spark.read.csv("weird-file.csv")
Traceback (most recent call last):
pyspark.sql.utils.AnalysisException:
Path does not exist:
file:/home/pedro/Documents/Projects/Books/Introd-pyspark/weird-file.csv

Every time you face this “Path does not exist” error, it means that Spark did not find the file described in the path you gave to spark.read. In this case, it is very likely that you have a typo or a mistake in your path. Maybe you forgot to add the .csv extension to the name of your file. Or maybe you used the backslash (\) where you needed the forward slash (/). Or maybe you gave the path to folder x, when in fact you wanted to reach folder y.

Sometimes, it is useful to list all the files stored inside the folder you are trying to access. This way, you can make sure that you are looking at the right folder of your file system. To do that, you can use the listdir() function from the os module of Python. As an example, I can list all the files stored inside the Data folder like this:

from os import listdir
listdir("../Data/")
['logs.json',
 'penguins.csv',
 'people.csv',
 'transf_reform.csv',
 'books.txt',
 'transf.csv',
 'user-events.json',
 'accounts.csv',
 'sales.json',
 'livros.txt']
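
Another quick sanity check is to ask Python whether the exact path you are about to give to spark.read really exists:

from os.path import exists

# Returns True only if there is a file (or folder) at this exact path:
exists("../Data/people.csv")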

6.3 Import options

While reading and importing data from any type of data source, Spark will always use the default value for each import option defined by the read engine you are using, unless you explicitly ask it to use a different value. Each read engine has its own read/import options.

For example, the spark.read.orc() engine has an option called mergeSchema. With this option, you can ask Spark to merge the schemas collected from all the ORC part-files. In contrast, the spark.read.csv() engine does not have such an option, because this functionality of “merging schemas” does not make sense for CSV files.

This means that some import options are specific (or characteristic) to certain file formats. For example, the sep option (where you define the separator character) is used only by the spark.read.csv() engine, because the other file formats (like ORC, JSON, Parquet…) do not have a special character that behaves as the “separator”. So it does not make sense to have such an option in the other read engines.

On the other hand, some import options can co-exist in multiple read engines. For example, the spark.read.json() and spark.read.csv() engines both have an encoding option. The encoding is a very important piece of information, and Spark needs it to correctly interpret your file. By default, Spark will always assume that your files use the UTF-8 encoding system. However, this may not be true for your specific file, and in those cases you use the encoding option to tell Spark which encoding system to use.

In the next sections, I will break down some of the most used import options for each file format. If you want to see the complete list of import options, you can visit the Data Source Option section of the Spark SQL Guide2 for the specific file format you are using.

To define (or set) a specific import option, you use the option() method of a DataFrameReader object. To produce this kind of object, you use the spark.read module, as in the example below. Each call to the option() method sets a single import option.

Notice that the “read engine” of Spark (i.e. csv()) is the last method called in this chain (or sequence) of steps. In other words, you start by creating a DataFrameReader object, then you set the import options, and lastly, you define which “read engine” you want to use.

# Creating a `DataFrameReader` object:
df_reader = spark.read
# Setting the import options:
df_reader = df_reader\
  .option("sep", "$")\
  .option("locale", "pt-BR")
  
# Setting the "read engine" to be used with `.csv()`:
my_data = df_reader\
  .csv("../Data/a-csv-file.csv")

If you prefer, you can also merge all these calls together. Since Python does not accept comments after a line-continuation backslash, the chained version below wraps the calls in parentheses so that the inline comments can be kept:

my_data = (
  spark.read                        # a `DataFrameReader` object
    .option("sep", "$")             # Setting the `sep` option
    .option("locale", "pt-BR")      # Setting the `locale` option
    .csv("../Data/a-csv-file.csv")  # The "read engine" to be used
)

There are many different import options for each read engine, and you can see the full list in the official documentation for Spark3. But let's give you a brief overview of probably the most popular import options (a combined example follows this list):

  • sep: sets the separator character for each field and value in the CSV file (defaults to ",");
  • encoding: sets the character encoding of the file to be read (defaults to "UTF-8");
  • header: boolean (defaults to False); should Spark treat the first line of the file as the header of the DataFrame (i.e. the names of the columns)?
  • dateFormat and timestampFormat: set the format for dates and timestamps in the file (defaults to "yyyy-MM-dd" and "yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]", respectively);
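
As a quick illustration of how these options combine, the sketch below reads a hypothetical CSV file; the file name, separator, encoding and date format are all assumptions made just for this example:

# Hypothetical example: a ";"-separated, Latin 1 encoded CSV file,
# with a header line and dates written as "day/month/year":
df = spark.read\
  .option("sep", ";")\
  .option("encoding", "ISO-8859-1")\
  .option("header", True)\
  .option("dateFormat", "dd/MM/yyyy")\
  .csv("../Data/a-hypothetical-file.csv")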

6.4 Setting the separator character for CSV files

In this section, we will use the transf_reform.csv file to demonstrate how to set the separator character of a CSV file. This file contains some data on transfers made in a fictitious bank. It is worth mentioning that the sep import option is only available for CSV files.

Let's use the peek_file() function defined below to get a quick peek at the first 5 lines of this file. If you look closely at these lines, you can see that this CSV file uses the ";" character to separate fields and values, and not the American-standard "," character.

def peek_file(path, n_lines = 5):
  with open(path) as file:
    lines = [next(file) for i in range(n_lines)]
  text = ''.join(lines)
  print(text)
  
peek_file("../Data/transf_reform.csv")
datetime;user;value;transferid;country;description
2018-12-06T22:19:19Z;Eduardo;598.5984;116241629;Germany;
2018-12-06T22:10:34Z;Júlio;4610.955;115586504;Germany;
2018-12-06T21:59:50Z;Nathália;4417.866;115079280;Germany;
2018-12-06T21:54:13Z;Júlio;2739.618;114972398;Germany;

This is usually the standard adopted by countries that use a comma to mark decimal places in real numbers. In other words, in some countries, the number 3.45 is usually written as 3,45.

Anyway, we now know that the transf_reform.csv file uses a different separator character, so, to read this CSV file correctly into Spark, we need to set the sep import option. Since this file comes with the column names in the first line, I also set the header import option, so that this first line is read as the column names.

transf = spark.read\
  .option("sep", ";")\
  .option("header", True)\
  .csv("../Data/transf_reform.csv")
  
transf.show(5)
+--------------------+--------+--------+----------+-------+-----------+
|            datetime|    user|   value|transferid|country|description|
+--------------------+--------+--------+----------+-------+-----------+
|2018-12-06T22:19:19Z| Eduardo|598.5984| 116241629|Germany|       NULL|
|2018-12-06T22:10:34Z|   Júlio|4610.955| 115586504|Germany|       NULL|
|2018-12-06T21:59:50Z|Nathália|4417.866| 115079280|Germany|       NULL|
|2018-12-06T21:54:13Z|   Júlio|2739.618| 114972398|Germany|       NULL|
|2018-12-06T21:41:27Z|     Ana|1408.261| 116262934|Germany|       NULL|
+--------------------+--------+--------+----------+-------+-----------+
only showing top 5 rows

6.5 Setting the encoding of the file

Spark will always assume that your static files use the UTF-8 encoding system. But that might not be the case for your specific file. In this situation, you have to tell Spark which encoding system to use while reading the file. The encoding import option is available for both CSV and JSON files.

To do this, you can set the encoding import option with the name of the encoding system to be used. As an example, let's look at the file books.txt, which is a CSV file encoded with the ISO-8859-1 system (i.e. the Latin 1 system).

If we use Spark's defaults, you can see in the result below that some characters in the Title column are not correctly interpreted. Remember, this problem occurs because of a mismatch of encoding systems: Spark thinks books.txt uses the UTF-8 system but, in reality, it uses the ISO-8859-1 system:

books = spark.read\
  .option("header", True)\
  .csv("../Data/books.txt")
  
books.show()
+--------------------+--------------------+------+
|               Title|              Author| Price|
+--------------------+--------------------+------+
|            O Hobbit|    J. R. R. Tolkien| 40.72|
|Matem�tica para E...|Carl P. Simon and...|139.74|
|Microeconomia: um...|       Hal R. Varian| 141.2|
|      A Luneta �mbar|      Philip Pullman| 42.89|
+--------------------+--------------------+------+
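
You can also reproduce this mismatch outside of Spark, by decoding a few raw bytes of the file with each encoding system. This is just a small sketch (the 80-byte slice is arbitrary):

# Read a small chunk of raw bytes from the file:
with open("../Data/books.txt", "rb") as file:
  raw_bytes = file.read(80)

# Decoding with the correct system produces readable text:
print(raw_bytes.decode("ISO-8859-1"))
# Decoding with UTF-8 replaces the unknown bytes with "�":
print(raw_bytes.decode("UTF-8", errors = "replace"))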

But if we tell Spark to use the ISO-8859-1 system while reading the file, then all problems are solved, and all characters in the file are correctly interpreted, as you can see in the result below:

books = spark.read\
  .option("header", True)\
  .option("encoding", "ISO-8859-1")\
  .csv("../Data/books.txt")
  
books.show()
+--------------------+--------------------+------+
|               Title|              Author| Price|
+--------------------+--------------------+------+
|            O Hobbit|    J. R. R. Tolkien| 40.72|
|Matemática para E...|Carl P. Simon and...|139.74|
|Microeconomia: um...|       Hal R. Varian| 141.2|
|      A Luneta Âmbar|      Philip Pullman| 42.89|
+--------------------+--------------------+------+

6.6 Setting the format of dates and timestamps

The format in which humans write dates and timestamps varies drastically around the world. By default, Spark will assume that the dates and timestamps stored in your file follow the ISO-8601 standard. That is, the “YYYY-MM-DD”, or “year-month-day”, format.

But this standard might not be the case for your file. For example: Brazilian people usually write dates in the format “dd/mm/YYYY”, or “day/month/year”; some parts of Spain write dates in the format “YYYY/dd/mm”, or “year/day/month”; in Nordic countries (e.g. Sweden, Finland) dates are written in the “YYYY.mm.dd” format.

Every date or timestamp format is defined by a string of pattern letters, where each letter represents a component of the date/timestamp, like the letter ‘y’, which represents the year, or the letter ‘d’, which represents the day of the month. You can see the complete list of pattern letters and their descriptions in the official documentation of Spark4.
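
Just to make these pattern letters more concrete, the small sketch below parses a date written in the Brazilian format mentioned above, using the to_date() function from pyspark.sql.functions; the literal date "31/12/2022" is just an invented example:

from pyspark.sql.functions import to_date

# A 1-row DataFrame with a date stored as plain text, in "day/month/year" format:
df = spark.createDataFrame([("31/12/2022",)], ["date_as_text"])
# The pattern "dd/MM/yyyy" tells Spark how to interpret this text as a date:
df.select(to_date("date_as_text", "dd/MM/yyyy").alias("as_date")).show()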

As an example, let's look at the user-events.json file. We can see that the dates and timestamps in this file are written in the “dd/MM/yyyy” and “dd/MM/yyyy HH:mm:ss” formats, respectively.

peek_file("../Data/user-events.json", n_lines=3)
{"dateOfEvent":"15/06/2022","timeOfEvent":"15/06/2022 14:33:10","userId":"b902e51e-d043-4a66-afc4-a820173e1bb4","nameOfEvent":"entry"}
{"dateOfEvent":"15/06/2022","timeOfEvent":"15/06/2022 14:40:08","userId":"b902e51e-d043-4a66-afc4-a820173e1bb4","nameOfEvent":"click: shop"}
{"dateOfEvent":"15/06/2022","timeOfEvent":"15/06/2022 15:48:41","userId":"b902e51e-d043-4a66-afc4-a820173e1bb4","nameOfEvent":"select: payment-method"}

Date variables are usually interpreted by Spark as string variables. In other words, Spark usually does not convert data that contains dates into Spark's date type. In order for Spark to interpret these columns as actual dates and timestamps, we need to provide an explicit schema for the file (using DateType and TimestampType for the respective columns), and set the dateFormat and timestampFormat import options with the patterns used in the file:

from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DateType, StringType, TimestampType

schema = StructType([
  StructField('dateOfEvent', DateType(), True),
  StructField('timeOfEvent', TimestampType(), True),
  StructField('userId', StringType(), True),
  StructField('nameOfEvent', StringType(), True)
])

user_events = spark.read\
  .option("dateFormat", "d/M/y")\
  .option("timestampFormat", "d/M/y k:m:s")\
  .json("../Data/user-events.json", schema = schema)
  
user_events.show()
+-----------+-------------------+--------------------+--------------------+
|dateOfEvent|        timeOfEvent|              userId|         nameOfEvent|
+-----------+-------------------+--------------------+--------------------+
| 2022-06-15|2022-06-15 14:33:10|b902e51e-d043-4a6...|               entry|
| 2022-06-15|2022-06-15 14:40:08|b902e51e-d043-4a6...|         click: shop|
| 2022-06-15|2022-06-15 15:48:41|b902e51e-d043-4a6...|select: payment-m...|
+-----------+-------------------+--------------------+--------------------+
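
To confirm that Spark really stored these two columns as dates and timestamps (and not as plain strings), you can inspect the dtypes attribute of the DataFrame:

user_events.dtypes
# Something like:
# [('dateOfEvent', 'date'), ('timeOfEvent', 'timestamp'),
#  ('userId', 'string'), ('nameOfEvent', 'string')]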

  1. That is, the complete path to the file, or, in other words, a path that starts at the root folder of your hard drive.

  2. For example, the Data Source Option section for Parquet files is located at: https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#data-source-option

  3. https://spark.apache.org/docs/latest/sql-data-sources-csv.html

  4. https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html