After you transform your DataFrame and generate the results you want, you might need to export these results out of Spark, so you can use them somewhere else.
The write object as the main entrypoint
Every Spark session you start has a built-in read object that you can use to read data and import it into Spark (this object was described at Section 6.1), and the same applies to writing data out of Spark. That is, Spark also offers a write object that you can use to write/output data out of Spark.
But in contrast to the read object, which is available through the SparkSession object (spark), this write object is available through the write method of any DataFrame object. In other words, every DataFrame you create in Spark has a built-in write object that you can use to write/export the data present in this DataFrame out of Spark.
As an example, let’s use the transf DataFrame that I presented at Chapter 5. The write method of the transf DataFrame object is the main entrypoint to all the facilities that Spark offers to write/export transf’s data to somewhere else.
transf.write
<pyspark.sql.readwriter.DataFrameWriter at 0x79a654014cb0>
This write object is very similar in structure to the read object. Essentially, this write object has a collection of write engines, and each write engine is specialized in writing data into a specific file format. So you have an engine for CSV files, another engine for JSON files, another for Parquet files, etc.
Every write object has the following methods:
- mode(): sets the mode of the write process. This affects how the data will be written to the files, and how the process will behave if exceptions (or errors) are raised during runtime.
- option(): sets an option to be used in the write process. This option might be specific to the write engine used, or it might be an option that is global to the write process (i.e. an option that does not depend on the chosen engine).
- csv(): the write engine to export data to CSV files.
- json(): the write engine to export data to JSON files.
- parquet(): the write engine to export data to Parquet files.
- orc(): the write engine to export data to ORC files.
- text(): the write engine to export data to text files.
- jdbc(): saves the data of the current DataFrame into a database using the JDBC API.
Exporting the transf DataFrame
As a first example of how to export data out of Spark, I will export the data from the transf DataFrame. Over the next sections, I will cover the individual aspects that influence this write/export process. You should know and consider each of these aspects when exporting your data.
Let’s begin with a quick example of exporting the Spark data to a CSV file. For this job, we need to use the write engine for CSV files, which is the csv() method of the write object.
The first (and main) argument to all write engines available in Spark is a path to a folder where you want to store the exported files. This means that (whatever write engine you use) Spark will always write the files (with the exported data) inside a folder.
Spark needs to use a folder to write the data because it generates some extra files during the process that serve as “placeholders” or as “statuses”. That is why Spark needs to create a folder: to store all of these different files together during the process.
In the example below, I decided to write this data into a folder called transf_export.
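The command itself is just a call to the csv() write engine with the path to this folder:
# export the transf DataFrame to CSV files inside the 'transf_export' folder
transf.write.csv("transf_export")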
Now, after I executed the above command, if I take a look at my current working directory, I will see the transf_export folder that was created by Spark.
from pathlib import Path
current_directory = Path(".")
folders_in_current_directory = [
    str(item)
    for item in current_directory.iterdir()
    if item.is_dir()
]
print(folders_in_current_directory)
['metastore_db', 'transf_export']
And if I look inside this transf_export folder, I will see two files. One is the placeholder file (_SUCCESS), and the other is a CSV file containing the exported data (part-*.csv).
export_folder = Path("transf_export")
files = [str(x.name) for x in export_folder.iterdir()]
print(files)
['part-00000-a4ee2ff4-4b7f-499e-a904-cec8d524ac56-c000.csv', '_SUCCESS']
We can see this file structure by using the tree command line utility1 to build a diagram of the files inside the folder:
$ tree transf_export
transf_export
├── part-00000-a4ee2ff4-4b7f-499e-a904-cec8d524ac56-c000.csv
└── _SUCCESS
You can set the mode of the write process by using the mode() method. This “mode of the write process” especially affects the behavior of the process when files for the particular DataFrame you are trying to export already exist in your file system.
There are four write modes available in Spark:
- append: will append the exported data to the existing files of this specific DataFrame.
- overwrite: will overwrite the data inside the existing files of this specific DataFrame with the data that is currently being exported.
- error or errorifexists: will throw an exception in case existing files for this specific DataFrame are found.
- ignore: will silently ignore/abort the write operation in case existing files for this specific DataFrame are found.
If we set the write mode to overwrite, every time we execute the command below, the files inside the transf_export folder are rewritten from scratch. Every time we export the data, the part-* files inside the folder are rewritten to contain the most recent data from the transf DataFrame.
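In this case, the command below is the same CSV export as before, just with the write mode set explicitly:
# with mode 'overwrite', the contents of 'transf_export' are rewritten from scratch
transf.write.mode("overwrite").csv("transf_export")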
However, if we set the write mode to error, and run the command again, then an error will be raised to indicate that the folder (transf_export) where we are trying to write the files already exists.
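That is, a command like the one below fails when the transf_export folder is already there:
# with mode 'error', Spark refuses to write over an existing path
transf.write.mode("error").csv("transf_export")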
AnalysisException: [PATH_ALREADY_EXISTS]
Path file:/home/pedro/Documentos/Projetos/Livros/Introd-pyspark/Chapters/transf_export
already exists. Set mode as "overwrite" to overwrite the existing path.
In contrast, if we set the write mode to append, then the current data of transf is appended (or “added”) to the folder transf_export.
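Once again, only the mode changes in the command:
# with mode 'append', new part-* files are added to 'transf_export'
transf.write.mode("append").csv("transf_export")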
Now, if I take a look at the contents of the transf_export folder, I will now see two part-* files instead of just one. Both files have the same size (around 218 KB), because they both contain the same data, that is, the same lines from the transf DataFrame.
$ tree transf_export
transf_export
├── part-00000-a4ee2ff4-4b7f-499e-a904-cec8d524ac56-c000.csv
├── part-00000-ffcc7487-fc60-403b-a815-a1dd56894062-c000.csv
└── _SUCCESS
This means that the data is currently duplicated inside the transf_export folder. We can see this duplication by looking at the number of rows of the DataFrame contained inside transf_export. We can use spark.read.load() to quickly load the contents of the transf_export folder into a new DataFrame, and use the count() method to see the number of rows.
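Since load() defaults to the Parquet format, we point it explicitly to CSV here (a quick sketch):
# read every part-* file inside 'transf_export' back into a single DataFrame
exported = spark.read.load("transf_export", format="csv")
exported.count()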
4842
The result above shows us that the transf_export folder currently contains 4842 rows of data. This is exactly double the number of rows in the transf DataFrame, which has 2421 rows.
So, in summary, the difference between the write modes overwrite and append is that overwrite causes Spark to erase the contents of transf_export before it starts to write the current data into the folder. This way, Spark exports the most recent version of the data stored inside the DataFrame. In contrast, append simply appends (or adds) new files to the transf_export folder with the most recent version of the data stored inside the DataFrame.
At Section 7.2.1.6 (or more specifically, at Figure 7.1) we presented this difference visually. So, in case you have not fully understood the difference between these two write modes, you can come back to Section 7.2.1.6 and check Figure 7.1 to see if it clears your understanding. Note: save modes = write modes.
Each person might have different needs, and each file format (or each write engine) has its own particularities or advantages that you may need to exploit. As a consequence, you might need to set some options to customize the writing process to fit your needs.
You can set options for the write process using the option() method of the write object. This method works with key-value pairs: inside this method, you provide a key that identifies the option you want to set, and the value you want to give to this option.
For CSV files, a very popular option is the sep option, which corresponds to the separator character of the CSV. This is a special character inside the CSV file that separates the column fields.
As an example, if we wanted to build a CSV file which uses the semicolon (; - which is the European standard for CSV files) as the separator character, instead of the comma (, - which is the American standard for CSV files), we would just need to set the sep option to ;, like this:
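# a minimal sketch: export transf again, now using ';' as the column separator
transf\
    .write\
    .mode("overwrite")\
    .option("sep", ";")\
    .csv("transf_export")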
Each file format (or each write engine) has options that are specific (or characteristic) to the file format itself. For example, JSON and CSV files are text file formats, and because of that, one key aspect of them is the encoding of the text that is stored inside these files. So the write engines for both of these file formats (csv() and json()) have an option called encoding that you can use to change the encoding used to write the data into these files.
In the example below, we are asking Spark to write a CSV file using the Latin1 encoding (ISO-8859-1).
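# a sketch: export transf using the Latin1 (ISO-8859-1) encoding
transf\
    .write\
    .mode("overwrite")\
    .option("encoding", "ISO-8859-1")\
    .csv("transf_export")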
It is worth mentioning that the option() method sets one option at a time. So if you need to set several write options, you just stack option() calls on top of each other, setting a different option in each call, like in the example below, where we are setting the sep, encoding and header options:
transf\
    .write\
    .mode("overwrite")\
    .option("sep", ";")\
    .option("encoding", "UTF-8")\
    .option("header", True)\
    .csv("transf_export")
If you want to see the full list of options for each write engine, the Spark documentation has a table with the complete list of options available for each write engine2.
As I explained at Section 3.2, every DataFrame that exists in Spark is a distributed DataFrame, meaning that this DataFrame is divided into multiple pieces (that we call partitions), and these pieces are spread across the nodes of the Spark cluster.
In other words, each machine that is present in the Spark cluster contains some partitions (or some pieces) of the total DataFrame. But why are we discussing partitions here? Because the number of partitions of your DataFrame determines the number of files written by Spark when you export the data using the write method.
In the previous examples across Section 9.2, when we exported the transf DataFrame into CSV files, only one single CSV file was generated inside the transf_export folder. That is because the transf DataFrame has only one single partition, as the code below demonstrates:
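# number of partitions of transf, taken from its underlying RDD
transf.rdd.getNumPartitions()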
1
This means that all the data from the transf DataFrame is concentrated into a single partition. Having that in mind, we could say that, in this specific case, Spark decided to not actually distribute the data of transf, because all of its data is concentrated in one single place.
But what would happen if the transf DataFrame was split across 5 different partitions? In that case, if I ran the command transf.write.csv("transf_export") to export its data into CSV files, then 5 different CSV files would be written by Spark inside the transf_export folder. One CSV file for each existing partition of the DataFrame.
The same goes for any other file format, or any write engine that you might use in Spark. Each file generated by the write process contains the data from a specific partition of the DataFrame.
Spark will always try to organize your DataFrame into a partition distribution that yields the best performance for your data processing. In production environments we usually have huge amounts of data, and a single-partition distribution is rarely the one that yields the best performance in these environments.
That is why most Spark DataFrames in production environments are split into multiple partitions across the Spark cluster. This means that Spark DataFrames that are concentrated into one single partition by default (like the transf DataFrame in the examples of this book) are very, very rare to find in production environments.
As a consequence, if you really need to export your data into a single static file in a production environment, you will likely need to either:
1. perform a repartition operation to concentrate all the data of your Spark DataFrame into a single partition, and then write that single partition out; or,
2. write the multiple files with Spark as usual, and then combine them into a single file with another framework or library, like pandas, or polars, or the tidyverse.
Option 2 above is a little out of the scope of this book, so I will not explain it further here. But if you really need to export all the data from your Spark DataFrame into a single static file (whatever file format you choose), and you choose to follow option 1, then you need to perform a repartition operation to concentrate all the data from your Spark DataFrame into a single partition.
It is worth mentioning that I strongly advise against option 1, because it may cause some serious bottlenecks in your data pipeline, depending especially on the size of the DataFrame you are trying to export.
In more detail: when you do not perform any repartition operation, that is, when you just write your DataFrame as is, without touching the existing partitions, the write process is a narrow transformation, as I explained at Section 5.3, because each partition is exported into a single, separate file that is independent from the others.
This is really important, because narrow transformations are much more predictable and are more easily scaled than wide transformations. As a result, Spark tends to scale and perform better when dealing with narrow transformations.
However, when you do perform a repartition operation to concentrate all the data into a single partition, three things happen:
- the write process becomes a wide transformation, because all partitions need to be merged together, and, as a consequence, all nodes in the cluster need to send their data to a single place (which is usually the driver node of the cluster).
- a high amount of partition shuffling can happen inside the cluster, and, depending on the amount of data that needs to be “shuffled” across the cluster, this may cause a serious slowdown in the processing.
- depending on the size of all the partitions merged together, the risk of an “out of memory” error being raised during the process grows rapidly.
So you should be aware of these risks, and always try to avoid option 1. Actually, you should avoid as much as possible the need to write all the data into a single static file! It is best for you to just write the data using the default number of partitions that Spark chooses for your DataFrame.
But anyway, if you really cannot avoid this need, and you have, for example, a sales DataFrame that you want to export, and this DataFrame contains 4 partitions:
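# a sketch, using the hypothetical sales DataFrame mentioned above
sales.rdd.getNumPartitions()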
4
And you want to perform a repartition operation over this DataFrame to export its data into a single static file. You can do so by using the coalesce() DataFrame method: just provide the number 1 to this method, and all of the partitions will be reorganized into a single partition:
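# coalesce(1) reorganizes all existing partitions into a single one
sales.coalesce(1).rdd.getNumPartitions()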
1
Having that in mind, the entire source code to export the DataFrame into a single static file would be something like this:
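# a sketch: merge all partitions into one, then write a single CSV file
# (the 'sales_export' folder name is just an illustration)
sales\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .csv("sales_export")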
In case you don’t know about this, Spark offers an API that you can use to quickly convert your Spark DataFrames into pandas DataFrames. This might be extremely useful for a number of reasons:
- you might know pandas better, and work more productively with it than with pyspark.
- your colleagues or the other tools in your pipeline might already use pandas extensively.
- with pandas you can easily export this data into Excel files (.xlsx)3, which are not easily available in Spark.
To convert an existing Spark DataFrame into a pandas DataFrame, all you need to do is call the toPandas() method of your Spark DataFrame, and you will get a pandas DataFrame as output, like in the example below:
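# a minimal sketch of the conversion
transf_pd = transf.toPandas()
# transf_pd is now a regular (non-distributed) pandas DataFrame:
print(type(transf_pd))
# <class 'pandas.core.frame.DataFrame'>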
But you should be careful with this method, because when you transform your Spark DataFrame into a pandas DataFrame, you eliminate its distributed aspect. As a result, all the data from your DataFrame needs to be loaded into a single place (which is usually the driver’s memory).
Because of that, using this toPandas() method might cause issues very similar to the ones discussed at Section 9.3. In other words, you might face the same slowdowns caused by doing a repartition to concentrate all the data into a single partition.
So, as the Spark documentation itself suggests, you should use this toPandas() method only if you know that your DataFrame is small enough to fit into the driver’s memory.
The collect() method as a way to export data
The collect() DataFrame method exports the DataFrame’s data from Spark into a Python native object, more specifically, into a normal Python list. To some extent, this is a viable way to export data from Spark, because by making this data available as a normal/standard Python object, many new possibilities open up for us, such as:
- sending this data to an API endpoint through an HTTP request with the requests Python package.
- sending this data by email with the email built-in Python package.
- transferring this data to a remote server over SSH/SFTP with the paramiko Python package.
- uploading this data to a cloud storage service such as AWS S3 (with the boto3 Python package).
By having the DataFrame’s data easily available to Python as a Python list, we can do virtually anything with this data. We can use this data in basically anything that Python is capable of doing.
Just as a simple example, if I needed to send the transf data to a fictitious endpoint using a POST HTTP request, the source code would probably be something similar to this:
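# a sketch of this request, assuming the requests package is installed;
# the endpoint URL below is fictitious
import requests

rows = transf.collect()                    # a list of Row objects
payload = [row.asDict() for row in rows]   # convert each Row into a plain dict
# note: columns with types like dates may need to be converted to strings first

response = requests.post(
    "https://example.com/api/transf",
    json=payload
)
print(response.status_code)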
https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option.
Actually, there is a Spark plugin available that is capable of exporting data from Spark directly into Excel files. But you need to install this plugin separately, since it does not come with Spark out of the box: https://github.com/crealytics/spark-excel.