You need to provide a mapping to spark_map(). This mapping defines the columns to which spark_map() should apply the function given in the function argument. You can build this mapping using one of the following mapping functions:
- at_position(): maps the columns that are in certain positions (1st column, 2nd column, 3rd column, etc.);
- starts_with(): maps columns whose name starts with a specific string;
- ends_with(): maps columns whose name ends with a specific string;
- matches(): maps columns whose name matches a regular expression;
- are_of_type(): maps columns that belong to a specific data type (string, integer, double, etc.);
- all_of(): maps all columns that are included within a specific list.
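All of these functions live in the spark_map.mapping module (the same module path used in the at_position() example further below). The import sketch below also assumes that spark_map() itself is exposed by spark_map.functions; adjust the paths if your version of the package organizes them differently:

```python
# mapping functions, following the module path shown later in this article
from spark_map.mapping import at_position, starts_with, ends_with, matches, are_of_type, all_of

# assumed location of spark_map() itself
from spark_map.functions import spark_map
```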
As a first example, you can use the at_position() function whenever you want to select columns by position. So if you want to select the first, second, third and fourth columns, you give the respective indices of those columns to the at_position() function, as in the sketch below.
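This is a minimal sketch, assuming a students DataFrame is available in your session and using pyspark's F.sum as an arbitrary example of the function to apply:

```python
from pyspark.sql import functions as F

# apply F.sum to the 1st, 2nd, 3rd and 4th columns of `students`
spark_map(students, at_position(1, 2, 3, 4), F.sum)
```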
On the other hand, you may need to use another method to map the columns you are interested in. For example, the students DataFrame has 4 Score columns (Score1, Score2, Score3 and Score4), and we have two obvious ways to map all of them. One way is using the starts_with() function, and another is using the matches() function. Both options below produce the same result.
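In sketch form (again assuming students is available, with F.mean as an arbitrary example aggregation):

```python
from pyspark.sql import functions as F

# both mappings select Score1, Score2, Score3 and Score4
spark_map(students, starts_with('Score'), F.mean)
spark_map(students, matches('^Score'), F.mean)
```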
Basically, the mapping is just a small description identifying the algorithm that should be used to find the columns, plus the value that will be passed on to this algorithm. As an example, the result of the expression at_position(3, 4, 5) is a small dict containing two elements (fun and val). The fun element defines the method/algorithm to be used to find the columns, and the val element stores the value that will be passed to this method/algorithm.
```python
from spark_map.mapping import at_position
at_position(3, 4, 5)
```

```
{'fun': 'at_position', 'val': [2, 3, 4]}
```
As another example, the result of the expression matches('^Score') is quite similar. However, unlike the previous example, which uses a method called at_position, this time the method/algorithm to be used is called matches, and '^Score' is the value that will be passed to this method.
```python
matches('^Score')
```

```
{'fun': 'matches', 'val': '^Score'}
```
Therefore, when you use one of the standard mapping functions that come with the spark_map package, a dict is generated containing the name of the method/algorithm to be used to calculate the mapping and the input value that will be passed to this method. Upon receiving this dict, spark_map() or spark_across() automatically searches for the method/algorithm inside the Mapping class and executes it. This means that all standard mapping methods/algorithms from the spark_map package are stored inside this Mapping class.
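Because a mapping function merely returns this dict, writing the dict yourself describes exactly the same mapping. A quick sketch (F.sum is, once more, just an example aggregation):

```python
from pyspark.sql import functions as F

# these two calls are equivalent: matches('^Score') evaluates
# to the dict literal used in the second call
spark_map(students, matches('^Score'), F.sum)
spark_map(students, {'fun': 'matches', 'val': '^Score'}, F.sum)
```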
Creating your own mapping method
Although the standard mapping functions are quite useful, you might want to implement your own mapping algorithm, and you can easily do that. Just give spark_map() or spark_across() a dict similar to the dicts produced by one of the standard mapping functions, that is, a dict containing an item named fun and another item named val.
The val item still holds the value/object you want to pass to your mapping algorithm/method. This time, however, the fun item must contain a function, not the name of a method/algorithm inside a string. That is, whenever spark_map() or spark_across() receives a string in the fun item, these functions assume that you are trying to use one of the standard mapping methods, and therefore they search for this method inside the Mapping class.
On the other hand, if these functions receive a function in the fun item, then spark_map() or spark_across() will execute that function directly, passing the value you provided in the val item as the first argument of this function.
However, this process has some important conditions. Every mapping function must always take three mandatory arguments:

1. value: an arbitrary value for the algorithm¹ (which corresponds to the value given in the val item);
2. cols: the column names of the Spark DataFrame as a list of strings²;
3. schema: the schema (i.e. the StructType) of the Spark DataFrame³.

Your mapping function should always have these three arguments, even if it doesn't use all of them.
Therefore, the value argument of every mapping function always receives the value you passed to the val item of the initial dict. The other two arguments, cols and schema, are filled automatically by spark_map() or spark_across(). In other words, spark_map() or spark_across() collects the values of these arguments for you, as the skeleton below summarizes.
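A minimal sketch of the required signature (my_mapping is just a placeholder name, not part of the package):

```python
def my_mapping(value, cols, schema):
    # value  -> whatever you stored in the `val` item of your dict
    # cols   -> the DataFrame column names, filled in automatically
    # schema -> the DataFrame StructType, filled in automatically
    ...
```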
As an example, the alphabetic_order() function below maps the column that sits at a specific position in the alphabetical ordering of the column names. This alphabetic_order() function only uses the index and cols arguments, even though it receives three arguments. In other words, the schema argument is there only to satisfy the conditions of spark_map() and spark_across().
One other requirement that we haven't covered yet is the return value of your mapping function. Your mapping function must always return a list of strings containing the names of the columns that were mapped by the function. If your mapping function does not find any columns during its search, it should return an empty list (i.e. []).
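The definition below is a sketch of such a function; details not stated in the text, such as index being 1-based, are assumptions:

```python
def alphabetic_order(index, cols, schema):
    # sort the column names alphabetically and keep the one that sits
    # at position `index` (1-based here, by assumption); `schema` is
    # unused, but must be present to satisfy spark_map()/spark_across()
    ordered = sorted(cols)
    if index < 1 or index > len(ordered):
        # no column found: return an empty list, as required
        return []
    return [ordered[index - 1]]
```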
As a first demonstration, let's apply this alphabetic_order() function to the students DataFrame. You can inspect the column list of this DataFrame through students.columns, as shown below.
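A sketch of this demonstration, assuming students is available (the exact column list, and therefore the mapped column, depends on how the DataFrame was built):

```python
from pyspark.sql import functions as F

# inspect the column names of the DataFrame
print(students.columns)

# sum the 1st column in alphabetical order; note that `fun` now
# holds the function itself, not a string
spark_map(students, {'fun': alphabetic_order, 'val': 1}, F.sum)
```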
Note that spark_map() will also raise a KeyError in case the function you are using in your mapping doesn't find any columns in your DataFrame. In this case, however, spark_map() gives a clear message that no columns were found with the mapping you defined. As an example, we can reproduce this error by trying to map, in the students DataFrame, all the columns that start with the string 'april'. A KeyError is raised in this case because there is no column in the students table whose name starts with the word “april”.
```python
spark_map(students, starts_with('april'), sum)
```

```
KeyError: '`spark_map()` did not found any column that matches your mapping!'
```
Footnotes
1. It’s up to you to define the type of this input value, how it should be formatted, etc.
2. This is a list of strings, or more precisely, the result of the columns property of an object of class pyspark.sql.dataframe.DataFrame.
3. This is an object of class StructType, or basically the result of the schema property of an object of class pyspark.sql.dataframe.DataFrame.