Text mining : optimisation de Spacy avec Spark

TEXT MINING : OPTIMISATION DE SPACY AVEC SPARK

Le text mining nécessite de penser à une approche d’optimisation de temps de traitements surtout lorsque le dataset à étudier se compte en millions voire en milliards de phrases. Spacy, l’une des librairies les plus populaires du NLP (Natural Language Processing – 16,1K star sur Github), suffit-elle à traiter ce type de données ? De tels volumes de données ne nécessitent-ils pas également de travailler sur l’aspect technique de l’environnement ?
Cet article présentera les traitements réalisés en optimisant cette librairie et en la combinant avec Spark.

Spacy  

Parmi les nombreux sujets de Datascience, le text mining est un domaine à part entière. Il se différencie des projets d’images, de séries temporelles ou de scoring. Il nécessite des connaissances et des traitements particuliers.

Dans un projet de text mining une phase de nettoyage des phrases est nécessaire. Cette étape a pour objectif de supprimer les mots qui ne pèsent pas dans la compréhension de la phrase et qui n’apportent pas un sens à la phrase.

Dans la phrase « Les jours d’hiver sont froids, cette année. », un nettoyage aurait pour résultat « jour hiver froid année ». Cet exercice permet à l’algorithme de mieux catégoriser la phrase.

Pour réaliser les traitements spécifiques au texte, de nombreuses librairies existent. Une des plus populaires est Spacy. Simple d’utilisation et précise, elle permet, en quelques lignes, de retrouver les lemmes des mots, les stop-words tels que les articles, les déterminants ainsi que les relations et les sens des mots et/ou des phrases entre eux.

 

L’image ci-dessous illustre l’usage basique de l’API de Spacy : après le chargement du modèle, Spacy analyse le texte et retourne une liste de « tokens ». Chaque « token » correspond à un élément du texte. Les propriétés du « token » donnent des détails sur l’élément. Par exemple, la propriété « is_stop » indique si le mot est un « stop-word » ou non.

À noter que derrière l’appel de la fonction « nlp » se cache l’utilisation de plusieurs composants. On peut citer notamment :

  • Le « tokenizer » : découpe le texte en éléments distincts ;
  • Le « lemmatizer » : détermine le lemme de chaque mot ;
  • Le « tagger » : annote chaque élément avec son rôle dans le texte.

Le séquencement des composants s’appelle le « pipeline ».

Avec Spacy, le nettoyage de phrases se fait, par exemple, via la fonction suivante :

import spacy

# fonction qui nettoie le texte, elle garde le lemme des mots de plus de 2 # caractères qui ne sont pas des stop_words

def clean_txt(doc) :

    return u » « .join([x.lemma_ for x in doc if (len(x.lemma_)>2 and not x.is_stop)])

nlp = spacy.load(« fr_core_news_sm ») #charge le modèle français de Spacy

text = u »Les jours d’hiver sont froids, cette année. » #la phrase à nettoyer

clean = clean_txt(nlp(text))

print(clean)

#jour hiver froid année

Ces quelques lignes de code ont permis de nettoyer la phrase « Les jours d’hiver sont froids, cette année. » Le même traitement a été réalisé sur un volume de données plus conséquent soit 5 000 lignes.

 

Le poste de travail utilisé, dans le cadre de notre traitement « Les jours d’hiver sont froids, cette année. », présente les caractéristiques suivantes :

  • 26 Go de RAM
  • Intel(R) Xeon(R) CPU X5670  @ 2.93GHz

%%timeit

import spacy

def clean_txt(doc) :

    return u » « .join([x.lemma_ for x in doc if (len(x.lemma_)>2 and not x.is_stop)])

nlp = spacy.load(« fr_core_news_sm »)

text = u »Les jours d’hiver sont froids, cette année. »

#le traitement est exécuté  5 000 fois la même phrase

tab = [clean_txt(nlp(x)) for x in [text]*5000]

#1 loop, best of 3: 53.5 s per loop

Le traitement s’exécute en 53,5 secondes pour 5 000 lignes.
En extrapolant à un million de phrases, le traitement se réaliserait en 3 heures.

Pour un milliard de phrases, le décompte se fait en jour. Ici plus d’une centaine de jours, 125 exactement, seraient nécessaire.  

Cet ordre de grandeur ne permet pas l’utilisation de ce type de modèles sans une optimisation afin de faciliter sa mise en place et son déploiement.

« Batch » mode  

Le traitement précédent consiste à appeler  5 000 fois la fonction « nlp ». Le traitement se fait séquentiellement, ligne par ligne.

Pour améliorer le temps de traitement, Spacy propose la fonction « pipe ». Cette fonction permet de traiter des phrases par lot plutôt que séquentiellement. En interne, Spacy optimise l’utilisation des composants qui constituent le traitement. 

Depuis la version 2.2.2, sortie en octobre 2019, Spacy intègre le traitement en multi-process, permettant ainsi de bénéficier de toute la puissance de la machine.

%%timeit

import spacy

def clean_txt(doc) :

    return u » « .join([x.lemma_ for x in doc if (len(x.lemma_)>2 and not x.is_stop)])

nlp = spacy.load(« fr_core_news_sm »)

text = u »Les jours d’hiver sont froids, cette année »

#Le premier traitement appelait 5 000 fois la fonction « nlp », avec nlp.pipe le traitement se limite à un appel avec

# une liste de taille 5 000

tab = [clean_txt(x) for x in nlp.pipe([text]*5000) ]

#1 loop, best of 3: 13.9 s per loop

Cette première optimisation divise le temps de traitement par ~4 fois : 13,9 secondes pour 5 000 phrases, soit 2 780 secondes (46 minutes) pour un million.

Ce gain de temps reste néanmoins peu significatif s’il venait à être appliqué à un milliard de phrases. En effet, 32 jours de temps seraient nécessaires pour la réalisation du traitement.

Optimisation du pipeline  

La fonction « nlp » de Spacy fournit différentes informations ce qui, d’un point de vue du code, est pertinent.

Cette caractéristique impacte de manière significative les temps de traitements. Certains calculs réalisés peuvent s’avérer inutiles dans l’exécution de la fonction.

Une étape consiste à détecter les noms propres d’une phrase : le Name Entity Recognition, NER. Dans la phrase « Les jours d’hiver sont froids, cette année. », cette détection semble superflue : la phrase ne comporte aucun nom propre. Cette phase peut donc être supprimée dans notre traitement.

Cette suppression s’écrirait :

%%timeit

import spacy

def clean_txt(doc) :

    return u » « .join([x.lemma_ for x in doc if (len(x.lemma_)>2 and not x.is_stop)])

nlp = spacy.load(« fr_core_news_sm »)

text = u »Les jours d’hiver sont froids, cette année. »

#Le « ner » est désactivé via le paramètre disable

tab = [clean_txt(x) for x in nlp.pipe([text]*5000,disable=[« ner »]) ]

Le NER désactivé amène le temps de traitement à 36 minutes pour un million de phrases. Le milliard de phrases nécessitera, quant à lui, 25 jours en place des 32.

Ces optimisations représentent l’ensemble de celles pouvant être appliquées au code lui-même. L’enjeu représenté par un milliard de phrases relève, quant à lui, des défis relevés par les technologies du Big Data. Pour optimiser un tel traitement et atteindre des temps de traitements envisageables en production, une solution est de paralléliser et distribuer le traitement en utilisant des outils adaptés tels que Spark.

Distribution du traitement avec Spark

Plusieurs approches sont possibles pour traiter d’importants volumes de données. Parmi les plus populaires : Spark. Il s’agit d’un framework qui permet de manipuler des pétaoctets de données et de réaliser des modèles de Machine Learning en distribué. Spark peut également être utilisé en Python.

Spark permet de masquer la complexité des traitements distribués. Par exemple, la fonction « dataframe.count() » facilite le décompte du nombre de lignes d’un jeu de données. 

Derrière une syntaxe qui se veut simple, se cache un traitement bien plus complexe.  Réalisé sur plusieurs machines, il mobilise plusieurs cœurs et répartit le traitement sur l’ensemble des machines tout en optimisant les transferts de données.

Spark peut être utilisé dans d’autres thématiques comme par exemple le traitement des données géospatiales.

Comme l’illustre l’image ci-dessus, Spark se compose de trois éléments principaux :

  • Le driver : il communique les traitements à réaliser au cluster manager ;
  • Cluster manager : il se charge de répartir les traitements entre les différents workers ;
  • Les workers : ils réalisent le traitement à effectuer.

Spark ne dispose pas de fonction de nettoyage de texte intégrée nativement. Pour la créer, la mise en place d’une User Defined Function, UDF, est nécessaire. L’utilisateur définit sa propre fonction qui sera intégrée dans le traitement Spark.

Par défaut, Spark traite les lignes une par une ; la fonction nlp.pipe() ne peut être utilisée pour traiter un batch de données.

Les traitements suivants sont réalisés sur un cluster Spark avec les éléments suivants :

  • Le dataframe : 8 millions de phrases « Les jours d’hiver sont froids, cette année.» ;
  • 32 cœurs utilisés en parallèle.

%%time

import pyspark.sql.functions as F

import pyspark.sql.types as T

nlp = spacy.load(« fr_core_news_sm »)

# même fonction de nettoyage en version UDF, son type doit être précisé

@F.udf(T.StringType())

def clean_txt(text) :

    doc = nlp(text, disable=[« ner »])

    return u » « .join([x.lemma_ for x in doc if (len(x.lemma_)>2 and not x.is_stop)])

   

#Ajout d’une colonne au dataframe, celle-ci sera une colonne nettoyée

df_udf = df.withColumn(« clean »,clean_txt(« text »))

#Sauvegarde du dataframe

df_udf.write.parquet(« test_udf_classic.pqt »,mode= »overwrite »)

#Wall time: 35min 45s

36 minutes sont nécessaires pour traiter 8 millions de phrases. 3 jours suffiraient pour traiter 1 milliard de lignes.

Ces temps de traitements rendent envisageable le déploiement en production d’un projet de text mining. Les performances obtenues grâce à « nlp.pipe() » seraient d’autant plus intéressantes en étant appliquées à l’UDF.

Optimisation avec les fonctions ensemblistes de Spark

Depuis la version 2.3 de Spark, il existe un nouveau type d’UDF : les pandas_udf. Contrairement aux UDF classiques, qui traitent ligne par ligne ; les pandas_udf traitent plusieurs lignes à la fois. L’intérêt de cette fonctionnalité est de combiner Spark et la fonction « pipe » de Spacy que nous avons étudiée ci-dessus.

Spark va prendre en charge la création des lots et appellera notre fonction pour chaque lot. Pour 1 million de lignes, Spark appellera donc la pandas_udf 1000 fois sur 1000 lignes au lieu d’appeler 1 million de fois l’UDF sur un 1 million de lignes.

%%time

import pyspark.sql.functions as F

import pyspark.sql.types as T

import pandas as pd

#la même fonction de nettoyage mais version pandas_UDF, son type doit être précisée # les pandas udf retournent des objets pandas, le code nécessite une adaptation.

@F.pandas_udf(T.StringType())

def pd_udf_clean_spacy(x) :

    docs = nlp.pipe(x, disable=[« ner »])

    def clean_txt(doc) :

        return u » « .join([x.lemma_ for x in doc if (len(x.lemma_)>2 and not x.is_stop)])

    return pd.Series([clean_txt(doc) for doc in docs])

   

df_pd_udf = df.withColumn(« clean »,pd_udf_clean_spacy(« text »))

df_pd_udf.write.parquet(« test_udf_pandas.pqt »,mode= »overwrite »)

#Wall time: 8min 12s

Le temps de traitement est divisé par plus de 4. Le milliard de lignes serait traité en moins d’un jour soit 17 heures en place des 3 jours.

Une autre optimisation pourrait être envisagée en augmentant le nombre de cœurs mobilisés par scalabilité horizontale. Si plusieurs machines étaient rajoutées incrémentant notre capacité de 32 cœurs supplémentaires, le traitement serait réalisé en moins de 9 heures.

Le temps de traitement du texte dans un projet de text mining est un élément important de son succès. Un réel travail d’optimisation est nécessaire pour réduire celui-ci. L’optimisation des librairies se révèle peu suffisante si les moyens techniques ne le sont pas également. La garantie de temps de calculs optimums passe par la combinaison des deux. Spark semble être une réponse adéquate dans ce type de thématique.