การเปลี่ยนแปลงสถานะสะสมในการสตรีม Apache Spark



โพสต์บล็อกนี้กล่าวถึงการเปลี่ยนแปลงที่เป็นรัฐใน Spark Streaming เรียนรู้ทั้งหมดเกี่ยวกับการติดตามสะสมและการอัพสกิลสำหรับอาชีพ Hadoop Spark

สนับสนุนโดย Prithviraj Bose

ในบล็อกก่อนหน้าของฉันฉันได้พูดถึงการแปลงสภาพโดยใช้แนวคิดหน้าต่างของ Apache Spark Streaming คุณสามารถอ่านได้ ที่นี่ .





ในโพสต์นี้ฉันจะพูดถึงการดำเนินการที่มีสถานะสะสมใน Apache Spark Streaming หากคุณยังใหม่กับ Spark Streaming ฉันขอแนะนำให้คุณอ่านบล็อกก่อนหน้านี้ของฉันเพื่อทำความเข้าใจวิธีการทำงานของหน้าต่าง

ประเภทของการเปลี่ยนแปลงสถานะใน Spark Streaming (ต่อ…)

> การติดตามแบบสะสม

เราได้ใช้ไฟล์ ReduceByKeyAndWindow (…) API เพื่อติดตามสถานะของคีย์อย่างไรก็ตามการกำหนดหน้าต่างทำให้เกิดข้อ จำกัด สำหรับการใช้งานบางกรณี จะเป็นอย่างไรหากเราต้องการสะสมสถานะของคีย์ตลอดเวลาแทนที่จะ จำกัด ไว้ที่หน้าต่างเวลา? ในกรณีนี้เราจำเป็นต้องใช้ updateStateByKey (…) ไฟ.



API นี้เปิดตัวใน Spark 1.3.0 และได้รับความนิยมอย่างมาก อย่างไรก็ตาม API นี้มีค่าใช้จ่ายด้านประสิทธิภาพการทำงานบางส่วนประสิทธิภาพจะลดลงเมื่อขนาดของรัฐเพิ่มขึ้นเมื่อเวลาผ่านไป ฉันได้เขียนตัวอย่างเพื่อแสดงการใช้งาน API นี้ คุณสามารถค้นหารหัส ที่นี่ .

Spark 1.6.0 เปิดตัว API ใหม่ mapWithState (…) ซึ่งแก้ปัญหาค่าใช้จ่ายด้านประสิทธิภาพที่เกิดจาก updateStateByKey (…) . ในบล็อกนี้ฉันจะพูดคุยเกี่ยวกับ API นี้โดยใช้โปรแกรมตัวอย่างที่ฉันเขียนขึ้น คุณสามารถค้นหารหัส ที่นี่ .

ก่อนที่ฉันจะดำดิ่งสู่การเดินผ่านรหัสเรามาอ่านคำศัพท์เกี่ยวกับการตรวจสอบ สำหรับการเปลี่ยนแปลงสถานะใด ๆ การตรวจสอบเป็นสิ่งจำเป็น Checkpointing เป็นกลไกในการกู้คืนสถานะของคีย์ในกรณีที่โปรแกรมไดรเวอร์ล้มเหลว เมื่อไดรเวอร์รีสตาร์ทสถานะของคีย์จะถูกเรียกคืนจากไฟล์การตรวจสอบ ตำแหน่งของจุดตรวจมักจะเป็น HDFS หรือ Amazon S3 หรือที่เก็บข้อมูลที่เชื่อถือได้ ในขณะทดสอบโค้ดเราสามารถจัดเก็บในระบบไฟล์ภายในเครื่องได้



วิธีค้นหา palindrome ใน java

ในโปรแกรมตัวอย่างเราฟังสตรีมข้อความซ็อกเก็ตบนโฮสต์ = localhost และพอร์ต = 9999 มันจะโทเค็นสตรีมขาเข้าเป็น (คำจำนวนครั้งที่เกิดขึ้น) และติดตามจำนวนคำโดยใช้ 1.6.0 API mapWithState (…) . นอกจากนี้คีย์ที่ไม่มีการอัปเดตจะถูกลบออกโดยใช้ StateSpec.timeout API เรากำลังตรวจสอบใน HDFS และความถี่ในการตรวจสอบคือทุกๆ 20 วินาที

ก่อนอื่นมาสร้างเซสชัน Spark Streaming

Spark-streaming-session

เราสร้างไฟล์ ด่านตรวจ ใน HDFS แล้วเรียกใช้ object method getOrCreate (…) . getOrCreate API ตรวจสอบไฟล์ ด่านตรวจ เพื่อดูว่ามีสถานะก่อนหน้านี้ที่ต้องกู้คืนหรือไม่หากมีอยู่ระบบจะสร้างเซสชัน Spark Streaming ขึ้นใหม่และอัปเดตสถานะของคีย์จากข้อมูลที่เก็บไว้ในไฟล์ก่อนที่จะดำเนินการต่อด้วยข้อมูลใหม่ มิฉะนั้นจะสร้างเซสชัน Spark Streaming ใหม่

getOrCreate ใช้ชื่อไดเร็กทอรีจุดตรวจและฟังก์ชัน (ซึ่งเราตั้งชื่อ createFunc ) ลายเซ็นของใครควรเป็น () => StreamingContext .

มาตรวจสอบโค้ดด้านในกัน createFunc .

บรรทัด # 2: เราสร้างบริบทการสตรีมด้วยชื่องานเป็น“ TestMapWithStateJob” และช่วงแบทช์ = 5 วินาที

บรรทัดที่ 5: ตั้งค่าไดเรกทอรีจุดตรวจสอบ

บรรทัด # 8: ตั้งค่าข้อกำหนดสถานะโดยใช้คลาส org.apache.streaming.StateSpec วัตถุ. ก่อนอื่นเราตั้งค่าฟังก์ชันที่จะติดตามสถานะจากนั้นกำหนดจำนวนพาร์ติชันสำหรับ DStream ที่เป็นผลลัพธ์ที่จะสร้างขึ้นในระหว่างการแปลงในภายหลัง ในที่สุดเราก็ตั้งค่าการหมดเวลา (เป็น 30 วินาที) ซึ่งหากไม่ได้รับการอัปเดตสำหรับคีย์ภายใน 30 วินาทีสถานะคีย์จะถูกลบออก

บรรทัดที่ 12 #: ตั้งค่าสตรีมซ็อกเก็ตแบนข้อมูลแบตช์ที่เข้ามาสร้างคู่คีย์ - ค่าโทร mapWithState ตั้งค่าช่วงการตรวจสอบเป็น 20 วินาทีและพิมพ์ผลลัพธ์ในที่สุด

กรอบ Spark เรียก th e createFunc สำหรับทุกคีย์ที่มีค่าก่อนหน้าและสถานะปัจจุบัน เราคำนวณผลรวมและอัปเดตสถานะด้วยผลรวมสะสมและในที่สุดเราก็ส่งคืนผลรวมสำหรับคีย์

แหล่ง Github -> TestMapStateWithKey.scala , TestUpdateStateByKey.scala

มีคำถามสำหรับเรา? โปรดระบุไว้ในส่วนความคิดเห็นแล้วเราจะติดต่อกลับไป

กระทู้ที่เกี่ยวข้อง:

เริ่มต้นกับ Apache Spark & ​​Scala

การแปลงสภาพด้วย Windowing ใน Spark Streaming