Développer un Transformer Spark en Scala et l’appeler depuis Python

Développer un Transformer Spark en Scala et l'appeler depuis Python

Les Transformers sont des incontournables de l’étape de « feature engineering ». Pour des raisons d’interopérabilité ou de performance, il est parfois nécessaire de les développer en Scala pour les utiliser en Python. Cet article présente une façon de procéder.

Python reste le langage de référence pour les projets de Data Science. Mais il est assez fréquent dans de tels projets de devoir intégrer du code existant Scala / Java ou de réécrire une partie pour des objectifs de performance. Dans les deux cas, le code Scala / Java doit pouvoir être facilement intégré en Python.

Spark fournit un premier niveau d’intégration grâce aux UDF (User Defined Function) au travers de la fonction « registerJavaFunction » de la session Spark.

Mais c’est une autre voie que nous allons explorer, celle des Transformers, éléments de base dans l’étape de « feature engineering » : comme ceux fournis par Spark, nous allons développer un Transformer en Scala qui sera également utilisable en Python.

Qu’est-ce qu’un Transformer ?

Les Transformers font partie de l’API de Machine Learning de Spark à l’instar des éléments suivants :

  • Estimator : représente, pour la plupart, les algorithmes de Machine Learning ;
  • Pipeline : représente un enchaînement de Transformers et d’Estimators.

Leur rôle est de transformer un DataFrame en un autre DataFrame. Cette transformation résulte le plus souvent en l’ajout de nouvelles colonnes.

Aussi, ils incluent un système de paramétrage spécifique qui leur permet de supporter « l’hyperparameter tuning ». L’hyperparameter tuning consiste à trouver le jeu de paramètres des Transformers et Estimators qui donne le meilleur modèle.

En Scala

Pour notre exemple, nous allons développer le CaseTransformer qui met les chaînes de caractères en majuscule ou en minuscule en fonction d’un paramètre.

Déclarations

Cela commence par une série de déclarations :
package blog

import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.param.BooleanParam
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.types.{DataType, StringType}
On retrouve ici la déclaration du package et les imports nécessaires pour l’implémentation :
  • UnaryTransformer : classe de base de Spark qui transforme une colonne en une autre colonne ;
  • BooleanParam : type du paramètre qui sera utilisé par le CaseTransformer ;
  • StringType : type de la colonne d’entrée et de sortie du CaseTransformer.

Fonction de transformation

Ensuite, la transformation se fait dans une fonction isolée du contexte Spark :
object CaseTransformer {

  def transform(upper: Boolean)(value: String): String = {
    if (upper) {
      value.toUpperCase
    } else {
      value.toLowerCase
    }
  }

}
La fonction « transform » ci-dessus prend deux paramètres :
  • « upper » : indique si la valeur doit être changée en majuscule ou en minuscule ;
  • « value » : la valeur à changer.
Isoler une fonction du contexte Spark présente plusieurs avantages. D’une part, il est plus facile d’écrire des tests unitaires sur cette fonction et d’autre part, la fonction peut être utilisée dans d’autre contexte que Spark.

Classe Transformer

Enfin, on retrouve la classe « CaseTransformer » attendue par Spark :
class CaseTransformer(override val uid: String) extends UnaryTransformer[String, String, CaseTransformer] {

  def this() = this(Identifiable.randomUID(getClass().getSimpleName))

  val upper = new BooleanParam(this, "upper", "Whether to change to upper case")
  setDefault(upper, false)

  def getUpper = $(upper)

  def setUpper(value: Boolean): this.type = set(upper, value)


  override protected def createTransformFunc: String => String = CaseTransformer.transform($(upper))

  override protected def outputDataType: DataType = StringType
}
Le code de la classe est essentiellement technique et répond aux contraintes imposées par Spark. On peut citer :
  • Les deux constructeurs (« CaseTransformer(…) » et « this() ») qui permettent de créer une instance de la classe ;
  • La déclaration du paramètre « upper » de type « BooleanParam » accompagnée des fonctions getter / setter (« getUpper » et « setUpper ») :
  • La fonction « createTransformFunc » qui renvoie la fonction de transformation. On peut voir ici l’appel à la fonction « transform » vue précédemment ;
  • La fonction « outputDataType » qui fournit le type de valeur retournée par la fonction, en l’occurrence, un « StringType ».

Tests unitaires sans Spark

Pour les tests unitaires, la librairie « scalatest » est utilisée. Le premier jeu de tests consiste à vérifier le fonctionnement de la transformation sans Spark :
test("upper") {
  CaseTransformer.transform(false)("ToLower") shouldBe "tolower"
  CaseTransformer.transform(true)("ToUpper") shouldBe "TOUPPER"
}
Ici, les deux cas de figure sont testés : lorsque « upper » est vrai et lorsqu’il est faux. La fonction « shouldBe » indique la valeur attendue.

Tests unitaires avec Spark

Les tests unitaires avec Spark se font en plusieurs étapes. Il faut d’abord créer une instance locale de Spark :
  val spark = SparkSession.builder()
    .appName("test")
    .master("local[*]")
    .getOrCreate()
Ensuite, un DataFrame est créé avec des données de tests. Le DataFrame créé contient deux colonnes :
  • Une colonne pour les données à transformer ;
  • Une colonne pour les résultats attendus :
import spark.implicits._

val df = Seq(
  ("ToUpper", "TOUPPER")
).toDF("col1", "expected")
Puis, une instance de CaseTransformer est mise en place en lui affectant les paramètres voulus ; en l’occurrence, la colonne d’entrée « col1 », la colonne de sortie «  col1_upper » et la valeur de « upper » :
val upper = new CaseTransformer()
  .setInputCol("col1")
  .setOutputCol("col1_upper")
  .setUpper(true)
La transformation est appliquée sur le DataFrame créé plus haut et le résultat s’affiche :
val res = upper.transform(df)

res.show()
Enfin, vient la phase de vérification du bon fonctionnement de la transformation. Elle consiste à compter le nombre de valeurs de sortie qui ne sont pas égales aux valeurs attendues. Le nombre doit être égal à 0 :
res.filter(F.col("col1_upper") =!= F.col("expected")).count() shouldBe 0

En Python

Cette section va décrire le pendant Python du CaseTransformer développé en Scala. Le principe est assez simple. Il consiste à exposer les paramètres spécifiques au CaseTransformer et à définir un constructeur qui va instancier le CaseTransformer Scala.

À l’exécution, lorsqu’un CaseTransformer Python est créé, un CaseTransformer Scala l’est automatiquement également. Lorsqu’une opération est appliquée sur l’instance Python, elle est répercutée sur l’instance Scala.

Déclarations

En Python, comme en Scala, une série d’imports est nécessaire :
from pyspark import keyword_only
from pyspark.ml import Pipeline
from pyspark.ml.wrapper import JavaTransformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.ml.param import Param, Params, TypeConverters
Un des imports essentiels est « JavaTransformer ». Il s’agit de la classe de base qui fait le lien entre le monde Python et le monde Scala.

Classe CaseTransformer

La classe CaseTransformer Python reprend les mêmes éléments que celle en Scala :
class CaseTransformer(JavaTransformer, HasInputCol, HasOutputCol):
    upper = Param(Params._dummy(), "upper", "Whether to change to upper case",
                  typeConverter=TypeConverters.toBoolean)

    @keyword_only
    def __init__(self, upper=True, inputCol=None, outputCol=None):
        super(CaseTransformer, self).__init__()
        self._java_obj = self._new_java_obj("blog.CaseTransformer", self.uid)

        self._setDefault(upper=True)

        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, upper=True, inputCol=None, outputCol=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setUpper(self, value):
        return self._set(upper=value)

    def getUpper(self):
        return self.getOrDefault(self.upper)
Les parties principales sont les suivantes :
  • La déclaration du paramètre « upper » : upper = Param(Params._dummy(), « upper », « Whether to change to upper case », typeConverter=TypeConverters.toBoolean) ;
  • Le constructeur « __init__ » dans lequel on retrouve l’instanciation du CaseTransformer Scala : self._java_obj = self._new_java_obj(« blog.CaseTransformer »,self.uid) ;
  • Les méthodes « getUpper » et « setUpper » permettant de récupérer et valoriser le paramètre « upper ».

Test

Pour le test, un DataFrame avec pyspark est d’abord créé :
df = spark.createDataFrame([
    ("a", ),
    ("A", )
], ["col1"])
Ensuite, un pipeline avec deux CaseTransformer est également créé ; le premier mettra en majuscule et le deuxième en minuscule :
upper = CaseTransformer().setInputCol("col1").setOutputCol("col1_upper").setUpper(True)
lower = CaseTransformer(inputCol="col1", outputCol="col1_lower", upper=False)

# construction du pipeline
pipeline = Pipeline(stages=[upper, lower]).fit(df)

Les paramètres d’un Transformer peuvent être valorisées de deux façons :
  • En chaînant les méthodes « set », comme pour la construction de la variable « upper » ;
  • En utilisant des paramètres nommés dans le constructeur, comme pour la construction de la variable « lower ».
Quand le pipeline est créé, il peut être appliqué sur le DataFrame d’exemple :
# application du pipeline
res = pipeline.transform(df)

res.show()

Le tableau suivant s’affiche :
+----+----------+----------+
|col1|col1_upper|col1_lower|
+----+----------+----------+
|   a|         A|         a|
|   A|         A|         a|
+----+----------+----------+

Cet article a montré la façon dont un Transformer Scala pouvait être appelé en Python. Assez simple, cet exemple à 80 % des cas. Cependant, pour des Transformers plus complexes, d’autres considérations sont à prendre en compte telles que l’échange de types structurés dans les colonnes d’entrée et de sortie ou dans les paramètres. Pour ces cas de figure, d’autres techniques sont à utiliser pour que l’interopérabilité entre Scala et Python puisse fonctionner correctement.