Transformation avec état cumulative dans Apache Spark Streaming



Cet article de blog traite des transformations avec état dans Spark Streaming. Apprenez tout sur le suivi cumulatif et l'amélioration des compétences pour une carrière Hadoop Spark.

Contribution de Prithviraj Bose

Dans mon blog précédent, j'ai discuté des transformations avec état en utilisant le concept de fenêtrage d'Apache Spark Streaming. Vous pouvez le lire Ici .





Dans cet article, je vais discuter des opérations avec état cumulées dans Apache Spark Streaming. Si vous êtes nouveau sur Spark Streaming, je vous recommande vivement de lire mon blog précédent afin de comprendre le fonctionnement du fenêtrage.

Types de transformation avec état dans Spark Streaming (suite…)

> Suivi cumulatif

Nous avions utilisé le reductionByKeyAndWindow (…) API pour suivre les états des clés, mais le fenêtrage pose des limites pour certains cas d'utilisation. Que faire si nous voulons accumuler les états des clés tout au long plutôt que de les limiter à une fenêtre de temps? Dans ce cas, nous aurions besoin d'utiliser updateStateByKey (…) FEU.



Cette API a été introduite dans Spark 1.3.0 et a été très populaire. Cependant, cette API a une certaine surcharge de performances, ses performances se dégradent à mesure que la taille des états augmente avec le temps. J'ai écrit un exemple pour montrer l'utilisation de cette API. Vous pouvez trouver le code Ici .

Spark 1.6.0 a introduit une nouvelle API mapWithState (…) qui résout les frais généraux de performance posés par updateStateByKey (…) . Dans ce blog, je vais discuter de cette API particulière en utilisant un exemple de programme que j'ai écrit. Vous pouvez trouver le code Ici .

qu'est-ce que charat en java

Avant de plonger dans un code pas à pas, épargnons quelques mots sur les points de contrôle. Pour toute transformation avec état, le point de contrôle est obligatoire. Le point de contrôle est un mécanisme permettant de restaurer l'état des clés en cas d'échec du programme pilote. Lorsque le pilote redémarre, l'état des clés est restauré à partir des fichiers de point de contrôle. Les emplacements de point de contrôle sont généralement HDFS ou Amazon S3 ou tout autre stockage fiable. Lors du test du code, on peut également stocker dans le système de fichiers local.



Dans l'exemple de programme, nous écoutons le flux de texte de socket sur host = localhost et port = 9999. Il convertit le flux entrant en (mots, nombre d'occurrences) et suit le nombre de mots à l'aide de l'API 1.6.0 mapWithState (…) . De plus, les clés sans mise à jour sont supprimées à l'aide de StateSpec.timeout API. Nous vérifions en HDFS et la fréquence des points de contrôle est toutes les 20 secondes.

Commençons par créer une session Spark Streaming,

Spark-streaming-session

Nous créons un checkpointDir dans le HDFS, puis appelez la méthode objet getOrCreate (…) . La getOrCreate L'API vérifie le checkpointDir pour voir s'il y a des états précédents à restaurer, si cela existe, il recrée la session Spark Streaming et met à jour les états des clés à partir des données stockées dans les fichiers avant de passer à de nouvelles données. Sinon, il crée une nouvelle session Spark Streaming.

La getOrCreate prend le nom du répertoire de point de contrôle et une fonction (que nous avons nommée createFunc ) dont la signature doit être () => StreamingContext .

Examinons le code à l'intérieur createFunc .

Ligne n ° 2: Nous créons un contexte de streaming avec le nom du travail à «TestMapWithStateJob» et l'intervalle de lots = 5 secondes.

Ligne n ° 5: définissez le répertoire des points de contrôle.

qu'est-ce qu'une sous-chaîne en java

Ligne # 8: Définissez la spécification d'état à l'aide de la classe org.apache.streaming.StateSpec objet. Nous définissons d'abord la fonction qui suivra l'état, puis nous définissons le nombre de partitions pour les DStreams résultants qui doivent être générés lors des transformations suivantes. Enfin, nous définissons le délai (à 30 secondes) où si aucune mise à jour pour une clé n'est reçue dans les 30 secondes, l'état de la clé sera supprimé.

Ligne 12 #: Configurez le flux de socket, aplatissez les données de lot entrantes, créez une paire clé-valeur, appelez mapWithState , définissez l'intervalle de point de contrôle sur 20 s et imprimez enfin les résultats.

Le framework Spark appelle th e createFunc pour chaque clé avec la valeur précédente et l'état actuel. Nous calculons la somme et mettons à jour l'état avec la somme cumulée et enfin nous retournons la somme pour la clé.

comment utiliser spyder python

Sources Github -> TestMapStateWithKey.scala , TestUpdateStateByKey.scala

Vous avez une question pour nous? Veuillez le mentionner dans la section commentaires et nous vous recontacterons.

Articles Similaires:

Premiers pas avec Apache Spark et Scala

Transformations avec état avec fenêtrage dans Spark Streaming