RDD โดยใช้ Spark: Building Block ของ Apache Spark



บล็อกเกี่ยวกับ RDD โดยใช้ Spark นี้จะให้ความรู้โดยละเอียดและครอบคลุมเกี่ยวกับ RDD ซึ่งเป็นหน่วยพื้นฐานของ Spark & ​​ว่ามีประโยชน์อย่างไร

, คำนี้เพียงพอที่จะจุดประกายในความคิดของวิศวกร Hadoop ทุกคน ถึง n ในหน่วยความจำ เครื่องมือประมวลผล ซึ่งรวดเร็วทันใจในการประมวลผลแบบคลัสเตอร์ เมื่อเทียบกับ MapReduce การแบ่งปันข้อมูลในหน่วยความจำทำให้ RDDs 10-100x เร็วขึ้น มากกว่าการแชร์เครือข่ายและดิสก์และทั้งหมดนี้เป็นไปได้เนื่องจาก RDD (ชุดข้อมูลการกระจายแบบยืดหยุ่น) ประเด็นสำคัญที่เรามุ่งเน้นในวันนี้ใน RDD โดยใช้บทความ Spark คือ:

ต้องการ RDD หรือไม่

ทำไมเราถึงต้องการ RDD -RDD โดยใช้ Spark





โลกกำลังพัฒนาไปพร้อมกับ และ วิทยาศาสตร์ข้อมูล เพราะความก้าวหน้าใน . อัลกอริทึม ขึ้นอยู่กับ การถดถอย , , และ ซึ่งทำงานบน กระจาย การคำนวณซ้ำ ation แฟชั่นที่รวมถึงการใช้ซ้ำและการแบ่งปันข้อมูลระหว่างหน่วยคอมพิวเตอร์หลาย ๆ

แบบดั้งเดิม เทคนิคที่จำเป็นต้องมีหน่วยเก็บข้อมูลระดับกลางและแบบกระจายที่เสถียรเช่น HDFS ประกอบด้วยการคำนวณซ้ำ ๆ กับการจำลองข้อมูลและการจัดลำดับข้อมูลซึ่งทำให้กระบวนการช้าลงมาก การหาทางออกไม่ใช่เรื่องง่าย



นี่คือที่ RDD (Resilient Distributed Datasets) มาสู่ภาพใหญ่

RDD s ใช้งานง่ายและสร้างได้ไม่ยากเนื่องจากข้อมูลถูกนำเข้าจากแหล่งข้อมูลและทิ้งลงใน RDD นอกจากนี้การดำเนินการจะถูกนำไปใช้ในการประมวลผล พวกเขาคือ ชุดหน่วยความจำแบบกระจาย ด้วยสิทธิ์เป็น อ่านเท่านั้น และที่สำคัญที่สุดคือ ทนต่อความผิดพลาด .



ถ้ามี พาร์ติชันข้อมูล ของ RDD คือ สูญหาย สามารถสร้างใหม่ได้โดยใช้สิ่งเดียวกัน การเปลี่ยนแปลง การดำเนินการกับพาร์ติชันที่หายไปใน เชื้อสาย แทนที่จะประมวลผลข้อมูลทั้งหมดตั้งแต่เริ่มต้น วิธีการแบบนี้ในสถานการณ์แบบเรียลไทม์สามารถทำให้ปาฏิหาริย์เกิดขึ้นในสถานการณ์ข้อมูลสูญหายหรือเมื่อระบบล่ม

RDD คืออะไร?

RDD หรือ ( ชุดข้อมูลการกระจายที่ยืดหยุ่น ) เป็นพื้นฐาน โครงสร้างข้อมูล ใน Spark ระยะ ยืดหยุ่น กำหนดความสามารถที่สร้างข้อมูลโดยอัตโนมัติหรือข้อมูล ย้อนกลับ ไปที่ สถานะเดิม เมื่อเกิดภัยพิบัติที่ไม่คาดคิดและมีความเป็นไปได้ที่ข้อมูลจะสูญหาย

ข้อมูลที่เขียนลงใน RDD คือ แบ่งพาร์ติชัน และเก็บไว้ใน โหนดปฏิบัติการหลายโหนด . หากโหนดกำลังดำเนินการ ล้มเหลว ในเวลาทำงานจากนั้นจะได้รับการสำรองข้อมูลทันทีจากไฟล์ โหนดปฏิบัติการถัดไป . นี่คือเหตุผลที่ RDDs ถือเป็นโครงสร้างข้อมูลขั้นสูงเมื่อเปรียบเทียบกับโครงสร้างข้อมูลแบบดั้งเดิมอื่น ๆ RDD สามารถจัดเก็บข้อมูลที่มีโครงสร้างไม่มีโครงสร้างและกึ่งโครงสร้าง

เราจะก้าวไปข้างหน้าด้วย RDD ของเราโดยใช้บล็อก Spark และเรียนรู้เกี่ยวกับคุณลักษณะเฉพาะของ RDD ซึ่งทำให้ได้เปรียบเหนือโครงสร้างข้อมูลประเภทอื่น ๆ

คุณสมบัติของ RDD

  • ในความทรงจำ (แกะ) การคำนวณ : แนวคิดของการคำนวณในหน่วยความจำจะนำการประมวลผลข้อมูลไปสู่ขั้นตอนที่รวดเร็วและมีประสิทธิภาพโดยรวม ประสิทธิภาพ ของระบบคือ อัพเกรดแล้ว
  • การประเมินผลของเขา : คำว่า Lazy evaluation กล่าวว่า การเปลี่ยนแปลง ถูกนำไปใช้กับข้อมูลใน RDD แต่ไม่ได้สร้างเอาต์พุต แต่การแปลงที่ใช้คือ เข้าสู่ระบบ
  • วิริยะ : RDD ที่เป็นผลลัพธ์อยู่เสมอ ใช้ซ้ำได้
  • การดำเนินการแบบหยาบ : ผู้ใช้สามารถใช้การแปลงกับองค์ประกอบทั้งหมดในชุดข้อมูลผ่าน แผนที่, กรอง หรือ จัดกลุ่มตาม การดำเนินงาน
  • ความทนทานต่อความผิดพลาด : หากข้อมูลสูญหายระบบสามารถทำได้ ย้อนกลับ ไปยัง สถานะเดิม โดยใช้ไฟล์ การเปลี่ยนแปลง .
  • ไม่เปลี่ยนรูป : ข้อมูลที่กำหนดเรียกค้นหรือสร้างไม่ได้ เปลี่ยนแปลง เมื่อเข้าสู่ระบบแล้ว ในกรณีที่คุณต้องการเข้าถึงและแก้ไข RDD ที่มีอยู่คุณต้องสร้าง RDD ใหม่โดยใช้ชุดของ การเปลี่ยนแปลง ฟังก์ชันกับ RDD ปัจจุบันหรือก่อนหน้า.
  • การแบ่งพาร์ติชัน : มันคือ หน่วยที่สำคัญ ของความเท่าเทียมกันใน Spark RDD. ตามค่าเริ่มต้นจำนวนพาร์ติชันที่สร้างขึ้นจะขึ้นอยู่กับแหล่งข้อมูลของคุณ คุณยังสามารถกำหนดจำนวนพาร์ติชันที่คุณต้องการใช้ พาร์ติชันที่กำหนดเอง ฟังก์ชั่น.

การสร้าง RDD โดยใช้ Spark

สามารถสร้าง RDD ในรูปแบบ สามวิธี:

  1. กำลังอ่านข้อมูลจาก คอลเลกชันที่ขนานกัน
Val PCRDD = spark.sparkContext.parallelize (Array ('Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat'), 2) val resultRDD = PCRDD.collect () resultRDD.collect ( ) .foreach (println)
  1. การสมัคร การเปลี่ยนแปลง ใน RDD ก่อนหน้านี้
val word = spark.sparkContext.parallelize (Seq ('Spark', 'is', 'a', 'very', 'powerful', 'language')) val wordpair = words.map (w = (w.charAt ( 0), w)) wordpair.collect (). foreach (println)
  1. กำลังอ่านข้อมูลจาก จัดเก็บข้อมูลภายนอก หรือเส้นทางไฟล์เช่น HDFS หรือ HBase
val Sparkfile = spark.read.textFile ('/ user / edureka_566977 / spark / spark.txt.') Sparkfile.collect ()

การดำเนินการกับ RDDs:

ส่วนใหญ่มีการดำเนินการสองประเภทที่ดำเนินการกับ RDD ได้แก่ :

  • การเปลี่ยนแปลง
  • การดำเนินการ

การเปลี่ยนแปลง : การดำเนินงาน เราใช้กับ RDD เพื่อ กรองการเข้าถึง และ แก้ไข ข้อมูลใน RDD หลักเพื่อสร้างไฟล์ RDD ต่อเนื่อง ถูกเรียก การเปลี่ยนแปลง . RDD ใหม่จะส่งกลับตัวชี้ไปยัง RDD ก่อนหน้าเพื่อให้แน่ใจว่าการพึ่งพาระหว่างกัน

การเปลี่ยนแปลงคือ ขี้เกียจประเมิน กล่าวอีกนัยหนึ่งการดำเนินการที่ใช้กับ RDD ที่คุณกำลังทำงานจะถูกบันทึกไว้ แต่ไม่ใช่ ดำเนินการ ระบบจะแสดงผลลัพธ์หรือข้อยกเว้นหลังจากทริกเกอร์ไฟล์ หนังบู๊ .

เราสามารถแบ่งการแปลงออกเป็นสองประเภทดังนี้

วิธีใช้ microsoft visual studio
  • การเปลี่ยนแปลงที่แคบ
  • การเปลี่ยนแปลงที่กว้าง

การเปลี่ยนแปลงที่แคบ เราใช้การแปลงแบบแคบกับไฟล์ พาร์ติชันเดียว ของ RDD พาเรนต์เพื่อสร้าง RDD ใหม่เนื่องจากข้อมูลที่จำเป็นในการประมวลผล RDD นั้นมีอยู่ในพาร์ติชันเดียวของ ASD ผู้ปกครอง . ตัวอย่างสำหรับการแปลงแบบแคบ ได้แก่ :

  • แผนที่()
  • กรอง()
  • แบนแผนที่ ()
  • พาร์ติชัน ()
  • mapPartitions ()

การเปลี่ยนแปลงที่กว้าง: เราใช้การเปลี่ยนแปลงในวงกว้าง หลายพาร์ติชัน เพื่อสร้าง RDD ใหม่ ข้อมูลที่จำเป็นในการประมวลผล RDD มีอยู่ในหลายพาร์ติชันของ ASD ผู้ปกครอง . ตัวอย่างสำหรับการแปลงแบบกว้าง ได้แก่ :

  • ลดโดย ()
  • สหภาพแรงงาน ()

การดำเนินการ : การดำเนินการสั่งให้ใช้ Apache Spark การคำนวณ และส่งผลลัพธ์หรือข้อยกเว้นกลับไปยังไดรเวอร์ RDD การกระทำบางอย่าง ได้แก่ :

  • เก็บ()
  • นับ()
  • ใช้เวลา ()
  • แรก ()

ให้เราใช้การดำเนินการกับ RDD ได้จริง:

IPL (พรีเมียร์ลีกอินเดีย) คือการแข่งขันคริกเก็ตที่มีการแข่งขันระดับสูงสุด ดังนั้นวันนี้เรามาลองใช้ชุดข้อมูล IPL และดำเนินการ RDD ของเราโดยใช้ Spark

  • ประการแรก มาดาวน์โหลดข้อมูลการจับคู่ CSV ของ IPL กัน หลังจากดาวน์โหลดไฟล์จะเริ่มมีลักษณะเป็นไฟล์ EXCEL ที่มีแถวและคอลัมน์

ในขั้นตอนต่อไปเราจะทำการจุดประกายและโหลดไฟล์ match.csv จากตำแหน่งของมันในกรณีของฉันคือไฟล์csvตำแหน่งไฟล์คือ “ /user/edureka_566977/test/matches.csv”

ตอนนี้ให้เราเริ่มต้นด้วย การเปลี่ยนแปลง ส่วนแรก:

  • แผนที่():

เราใช้ การเปลี่ยนแปลงแผนที่ เพื่อใช้การดำเนินการแปลงเฉพาะกับทุกองค์ประกอบของ RDD ที่นี่เราสร้าง RDD โดยใช้ชื่อ CKfile ที่เก็บไฟล์csvไฟล์. เราจะสร้าง RDD อีกแห่งที่เรียกว่า States to เก็บรายละเอียดเมือง .

spark2-shell val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.collect.foreach (println) val state = CKfile.map (_. split (',') (2)) States.collect (). foreach (println)

  • กรอง():

การแปลงตัวกรองชื่อนั้นอธิบายถึงการใช้งาน เราใช้การดำเนินการแปลงนี้เพื่อกรองข้อมูลที่เลือกออกจากชุดข้อมูลที่กำหนด เราสมัคร การทำงานของตัวกรอง ที่นี่เพื่อรับบันทึกการแข่งขัน IPL ประจำปี พ.ศ. 2560 และเก็บไว้ใน fil RDD

val fil = CKfile.filter (line => line.contains ('2017')) fil.collect (). foreach (println)

  • แบนแผนที่ ():

เราใช้ flatMap คือการดำเนินการแปลงกับแต่ละองค์ประกอบของ RDD เพื่อสร้าง newRDD มันคล้ายกับการแปลงแผนที่ ที่นี่เราสมัครFlatmapถึง พ่นไม้ขีดไฟของเมืองไฮเดอราบาด และจัดเก็บข้อมูลลงในไฟล์filRDDRDD.

val filRDD = fil.flatMap (line => line.split ('ไฮเดอราบาด')) รวบรวม ()

  • พาร์ติชัน ():

ข้อมูลทั้งหมดที่เราเขียนลงใน RDD จะถูกแบ่งออกเป็นพาร์ติชันจำนวนหนึ่ง เราใช้การเปลี่ยนแปลงนี้เพื่อค้นหาไฟล์ จำนวนพาร์ติชัน ข้อมูลถูกแบ่งออกเป็น

val fil = CKfile.filter (line => line.contains ('2017')) fil.partitions.size

  • mapPartitions ():

เราถือว่า MapPatitions เป็นอีกทางเลือกหนึ่งของ Map () และแต่ละ() ด้วยกัน. เราใช้ mapPartitions ที่นี่เพื่อค้นหาไฟล์ จำนวนแถว เรามีใน fil RDD ของเรา

val fil = CKfile.filter (line => line.contains ('2016')) fil.mapPartitions (idx => Array (idx.size) .iterator) .collect

  • ลดโดย ():

เราใช้ลดโดย() บน คู่คีย์ - ค่า . เราใช้การเปลี่ยนแปลงนี้กับไฟล์csvเพื่อค้นหาเครื่องเล่นด้วยไฟล์ ชายสูงสุดของการแข่งขัน .

val ManOfTheMatch = CKfile.map (_. split (',') (13)) val MOTMcount = ManOfTheMatch.map (WINcount => (WINcount, 1)) val ManOTH = MOTMcount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) ManOTH.take (10) .foreach (println)

  • สหภาพ ():

ชื่ออธิบายทั้งหมดเราใช้การเปลี่ยนแปลงสหภาพคือการ คลับ RDD สองตัวด้วยกัน . ที่นี่เรากำลังสร้าง RDD สองตัวคือ fil และ fil2 fil RDD มีบันทึกการจับคู่ IPL ปี 2017 และ fil2 RDD มีบันทึกการจับคู่ IPL ประจำปี 2559

val fil = CKfile.filter (line => line.contains ('2017')) val fil2 = CKfile.filter (line => line.contains ('2016')) val uninRDD = fil.union (fil2)

เริ่มต้นด้วย หนังบู๊ ส่วนที่เราแสดงผลลัพธ์จริง:

  • เก็บ():

Collect คือการกระทำที่เราใช้ แสดงเนื้อหา ใน RDD

Val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.collect.foreach (println)

  • นับ():

นับเป็นการกระทำที่เราใช้ในการนับ จำนวนบันทึก มีอยู่ใน RDDที่นี่เรากำลังใช้การดำเนินการนี้เพื่อนับจำนวนบันทึกทั้งหมดในไฟล์ match.csv ของเรา

Val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.count ()

  • ใช้เวลา ():

Take เป็นการดำเนินการที่คล้ายกับการรวบรวม แต่ความแตกต่างเพียงอย่างเดียวคือสามารถพิมพ์อะไรก็ได้ จำนวนแถวที่เลือก ตามคำขอของผู้ใช้ ที่นี่เราใช้รหัสต่อไปนี้เพื่อพิมพ์ไฟล์ รายงานชั้นนำสิบอันดับแรก

val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.collect (). foreach (println) statecountm. ใช้เวลา (10) .foreach (println)

  • แรก ():

First () เป็นการดำเนินการที่คล้ายกับการรวบรวม () และรับ ()มันใช้ในการพิมพ์รายงานที่อยู่บนสุดของผลลัพธ์ที่นี่เราใช้การดำเนินการ first () เพื่อค้นหาไฟล์ จำนวนการแข่งขันสูงสุดที่เล่นในเมืองใดเมืองหนึ่ง และเราได้มุมไบเป็นผลลัพธ์

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') สถานะ val = CKfile.map (_. split (',') (2)) val Scount = States.map (Scount => ( Scount, 1)) scala & gt val statecount = Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.first ()

เพื่อให้กระบวนการเรียนรู้ RDD ของเราโดยใช้ Spark น่าสนใจยิ่งขึ้นฉันได้คิดกรณีการใช้งานที่น่าสนใจ

RDD โดยใช้ Spark: Pokemon Use Case

  • ประการแรก ให้เราดาวน์โหลดไฟล์ Pokemon.csv และโหลดลงใน spark-shell เหมือนที่เราทำกับไฟล์ Matches.csv
val PokemonDataRDD1 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') PokemonDataRDD1.collect (). foreach (println)

Pokemons มีให้เลือกมากมายให้เราค้นหาไม่กี่สายพันธุ์

  • การลบ schema ออกจากไฟล์ Pokemon.csv

เราอาจไม่ต้องการไฟล์ สคีมา ของไฟล์ Pokemon.csv ดังนั้นเราจึงลบออก

val Head = PokemonDataRDD1.first () val NoHeader = PokemonDataRDD1.filter (line =>! line.equals (Head))

  • การค้นหาจำนวน พาร์ติชัน pokemon.csv ของเราแจกจ่ายไปยังไฟล์.
println ('No.ofpartitions =' + NoHeader.partitions.size)

  • โปเกมอนน้ำ

การค้นหาไฟล์ จำนวนโปเกมอนน้ำ

val WaterRDD = PokemonDataRDD1.filter (line => line.contains ('Water')) WaterRDD.collect (). foreach (println)

  • โปเกมอนไฟ

การค้นหาไฟล์ จำนวนโปเกมอนไฟ

val FireRDD = PokemonDataRDD1.filter (line => line.contains ('Fire')) FireRDD.collect (). foreach (println)

  • นอกจากนี้เรายังสามารถตรวจจับไฟล์ ประชากร โปเกมอนประเภทอื่นโดยใช้ฟังก์ชันนับ
WaterRDD.count () FireRDD.count ()

  • เนื่องจากฉันชอบเกมของ กลยุทธ์การป้องกัน ให้เราค้นหาโปเกมอนด้วย การป้องกันสูงสุด
val defenceList = NoHeader.map {x => x.split (',')}. map {x => (x (6) .toDouble)} println ('Highest_Defence:' + defenceList.max ())

  • เรารู้สูงสุด ค่าพลังป้องกัน แต่เราไม่รู้ว่ามันคือโปเกมอนตัวไหน ให้เราหาว่าอันไหน โปเกมอน
val defWithPokemonName = NoHeader.map {x => x.split (',')} แผนที่ {x => (x (6) .toDouble, x (1))} ค่า MaxDefencePokemon = defWithPokemonName.groupByKey.takeOr ผง (การสั่งซื้อ [คู่]. ย้อนกลับบน (_._ 1)) MaxDefencePokemon.foreach (println)

  • ตอนนี้ให้เราจัดเรียงโปเกมอนด้วย ป้องกันน้อยที่สุด
val minDefencePokemon = defenceList.distinct.sortBy (x => x.toDouble, true, 1) minDefencePokemon.take (5) .foreach (println)

  • ตอนนี้ให้เราดูโปเกมอนด้วย กลยุทธ์การป้องกันน้อย
val PokemonDataRDD2 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') val Head2 = PokemonDataRDD2.first () val NoHeader2 = PokemonDataRDD2.filter (line =>! line.equals (Head)) val อ่าน defWithPokemonNequals (Head)) .map {x => x.split (',')}. map {x => (x (6) .toDouble, x (1))} val MinDefencePokemon2 = defWithPokemonName2.groupByKey.takeOr ผง (1) (การสั่งซื้อ [Double ]. บน (_._ 1)) MinDefencePokemon2.foreach (println)

ดังนั้นเราจึงมาถึงจุดสิ้นสุดของ RDD นี้โดยใช้บทความ Spark ฉันหวังว่าเราจะจุดประกายความรู้ของคุณเกี่ยวกับ RDDs คุณสมบัติและการดำเนินการประเภทต่างๆที่สามารถดำเนินการได้

บทความนี้อ้างอิงจาก ได้รับการออกแบบมาเพื่อเตรียมความพร้อมสำหรับการสอบ Cloudera Hadoop และ Spark Developer Certification (CCA175) คุณจะได้รับความรู้เชิงลึกเกี่ยวกับ Apache Spark และ Spark Ecosystem ซึ่งรวมถึง Spark RDD, Spark SQL, Spark MLlib และ Spark Streaming คุณจะได้รับความรู้ที่ครอบคลุมเกี่ยวกับภาษาโปรแกรม Scala, HDFS, Sqoop, Flume, Spark GraphX ​​และ Messaging System เช่น Kafka