Apache Spark : l’importance du broadcast
- tags
- #Broadcast #HDFS
- categories
- Apache Spark BigData
- published
- reading time
- 3 minutes
Apache Spark est un moteur de calcul distribué visant à remplacer et fournir des APIs de plus haut niveau pour résoudre simplement des problèmes où Hadoop montre ses limitations et sa complexité.
Ce billet fait partie d'une série de billet sur Apache Spark permettant d'approfondir certaines notions du système du développement, à l'optimisation jusqu'au déploiement.
Un des avantage principaux de Spark est sa capacité à être bien intégré à l'éco-système Scala/Java ou Python. C'est d'autant plus vrai en Scala car les méthodes principales attachées aux contextes Spark sont de la même forme qu'en Scala avec quelques améliorations (et le contexte distribué en plus) ex: map, flatMap, filter...
Cet avantage vient avec l'inconvénient qu'il est important de savoir quels objets/instances manipulées et dans quel contexte Spark ou Scala ces objets vont être utilisés. Si vous en doutez, voilà un petit exemple permettant de bien l'illustrer :
[code language=”scala”]
val multiplier = 50
val data = sc.parallelize(1 to 10000)
val result = data
.map( _ * multiplier)
.filter( _ > 1000 )
.collect()
.map( _ / 2 )
.filter( _ < (20 * multiplier) )
[/code]
Si on étudie cet exemple volontairement simpliste, les deux premières opérations map et filter s'appliquent sur un RDD[Int] géré par Spark et vont donc s'exécuter dans un contexte parallélisé, ce n'est plus le cas dès l'appel à collect() qui va ramener la totalité des données traitées par les workers vers la mémoire du driver Spark. Ainsi les deux autres appels à map et filter vont s'appliquer sur une List[Int] et donc font partie de la Standard Library de Scala.
Cet exemple a deux propriétés importantes, il permet de voir la confusion possible entre les appels Scala et Spark, mais surtout il permet de voir, avec le coefficient multiplier utilisé, qu'il est assez simple d'utiliser des valeurs Scala dans une closure envoyée à Spark.
La sérialisation des closures Scala vers les workers Spark méritera un article à lui tout seul et donc n'est pas l'objet de cet article, mais pour bien comprendre le problème qui nous intéresse il suffit de savoir qu'à chaque instance de la closure lancée par un worker contiendra une copie de la valeur utilisée. Ainsi si cette valeur correspond à une donnée un peu volumineuse cela devient rapidement inefficace et surtout dangereux pour l'utilisation mémoire de vos workers.
Heureusement Spark vient avec deux notions de variables partagées les Accumulateurs et variables Broadcastées, maintenant comme vous aurez deviné c'est cette dernière notion qui vient à notre secours.
En effet au lieu d'avoir autant de copie des valeurs dans les closures que nous avons d'appels dans les workers par celle-ci, il est possible d'utiliser la fonction de broadcast() pour partager en lecture-seule cette valeur et ainsi n'avoir qu'une copie par noeud géré par le système.
Cette fonction n'est en revanche intéressante que pour partagé de grosses sources de données à travers les workers, par pour notre pauvre petit Int de multiplier dans l'exemple précédent, voilà comment l'utiliser :
[code language=”scala”]
val largeKeyValuePair: Map[String, String] = ….
// broadcast this variable for workers to use it efficiently
val bdLarge = sc.broadcast(largeKeyValuePair)
val data = sc.parallelize(1 to 10000)
val result = data
.map( item => (item, bdLarge.value.get(item.toString) )
…
[/code]
Pour résumé, le broadcast sert à n'envoyer qu'une seule fois une valeur assez large pour en valoir la peine. Maintenant votre question doit être “grosse comment” ?
L'Université de Berkeley (CA) a étudié la question dans la publication suivante sur les performances des différents algorithmes de broadcasting entre les noeuds et pour vous la faire courte, le mécanisme standard de broadcasting de Spark Centralized HDFS Broadcast (CHB pour les intimes) donne ce genre de performance selon la taille des payloads :
Si vous voulez en savoir plus, j'organise avec Lateral Thoughts et Hopwork des formations Spark régulières, l'agenda est disponible ici : http://www.lateral-thoughts.com/training.