DBInputFormat เพื่อถ่ายโอนข้อมูลจาก SQL ไปยังฐานข้อมูล NoSQL



วัตถุประสงค์ของบล็อกนี้คือการเรียนรู้วิธีการถ่ายโอนข้อมูลจากฐานข้อมูล SQL ไปยัง HDFS วิธีการถ่ายโอนข้อมูลจากฐานข้อมูล SQL ไปยังฐานข้อมูล NoSQL

ในบล็อกนี้เราจะสำรวจความสามารถและความเป็นไปได้ของหนึ่งในองค์ประกอบที่สำคัญที่สุดของเทคโนโลยี Hadoop นั่นคือ MapReduce

ปัจจุบัน บริษัท ต่างๆใช้ Hadoop Framework เป็นตัวเลือกแรกสำหรับการจัดเก็บข้อมูลเนื่องจากความสามารถในการจัดการข้อมูลขนาดใหญ่ได้อย่างมีประสิทธิภาพ แต่เราก็รู้ด้วยว่าข้อมูลมีความหลากหลายและมีอยู่ในโครงสร้างและรูปแบบต่างๆ ในการควบคุมข้อมูลที่หลากหลายและรูปแบบที่แตกต่างกันควรมีกลไกเพื่อรองรับความหลากหลายทั้งหมดและยังให้ผลลัพธ์ที่มีประสิทธิภาพและสม่ำเสมอ





องค์ประกอบที่ทรงพลังที่สุดใน Hadoop framework คือ MapReduce ซึ่งสามารถให้การควบคุมข้อมูลและโครงสร้างได้ดีกว่าคู่อื่น ๆ แม้ว่าจะต้องใช้ค่าใช้จ่ายในการเรียนรู้และความซับซ้อนในการเขียนโปรแกรม แต่หากคุณสามารถจัดการกับความซับซ้อนเหล่านี้ได้คุณก็สามารถจัดการข้อมูลประเภทใดก็ได้ด้วย Hadoop

MapReduce framework แบ่งงานประมวลผลทั้งหมดออกเป็นสองขั้นตอน: แผนที่และลด



การเตรียมข้อมูลดิบของคุณสำหรับขั้นตอนเหล่านี้จำเป็นต้องมีความเข้าใจเกี่ยวกับคลาสและอินเตอร์เฟสพื้นฐานบางอย่าง ซูเปอร์คลาสสำหรับการประมวลผลซ้ำเหล่านี้คือ InputFormat

InputFormat คลาสเป็นหนึ่งในคลาสหลักใน Hadoop MapReduce API คลาสนี้มีหน้าที่กำหนดสองสิ่งหลัก:

  • การแยกข้อมูล
  • เครื่องอ่านบันทึก

แยกข้อมูล เป็นแนวคิดพื้นฐานในเฟรมเวิร์ก Hadoop MapReduce ซึ่งกำหนดทั้งขนาดของงานแผนที่แต่ละงานและเซิร์ฟเวอร์ปฏิบัติการที่เป็นไปได้ ผู้อ่านบันทึก มีหน้าที่รับผิดชอบในการอ่านบันทึกจริงจากไฟล์อินพุตและส่ง (เป็นคู่คีย์ / ค่า) ไปยังผู้ทำแผนที่



จำนวนผู้ทำแผนที่จะขึ้นอยู่กับจำนวนการแยก เป็นงานของ InputFormat ในการสร้างการแยก ขนาดการแบ่งเวลาส่วนใหญ่จะเทียบเท่ากับขนาดบล็อก แต่ไม่เสมอไปที่จะสร้างการแบ่งตามขนาดบล็อก HDFS ทั้งหมดขึ้นอยู่กับว่าเมธอด getSplits () ของ InputFormat ของคุณถูกแทนที่อย่างไร

มีความแตกต่างพื้นฐานระหว่าง MR Split และ HDFS block บล็อกเป็นกลุ่มข้อมูลทางกายภาพในขณะที่การแบ่งเป็นเพียงชิ้นส่วนตรรกะที่ผู้ทำแผนที่อ่าน การแยกไม่มีข้อมูลอินพุต แต่มีเพียงการอ้างอิงหรือที่อยู่ของข้อมูล การแบ่งโดยทั่วไปมีสองสิ่ง: ความยาวเป็นไบต์และชุดของที่เก็บข้อมูลซึ่งเป็นเพียงสตริง

เพื่อให้เข้าใจสิ่งนี้ได้ดีขึ้นเรามาดูตัวอย่างการประมวลผลข้อมูลที่จัดเก็บใน MySQL โดยใช้ MR เนื่องจากไม่มีแนวคิดของบล็อกในกรณีนี้ทฤษฎี: 'การแยกจะถูกสร้างขึ้นจากบล็อก HDFS เสมอ'ล้มเหลว. ความเป็นไปได้อย่างหนึ่งคือการสร้างการแบ่งตามช่วงของแถวในตาราง MySQL ของคุณ (และนี่คือสิ่งที่ DBInputFormat ทำรูปแบบอินพุตสำหรับการอ่านข้อมูลจากฐานข้อมูลเชิงสัมพันธ์) เราอาจมีจำนวนแยก k ประกอบด้วย n แถว

มีไว้สำหรับ InputFormats ที่ยึดตาม FileInputFormat (InputFormat สำหรับจัดการข้อมูลที่จัดเก็บในไฟล์) เท่านั้นที่การแบ่งจะถูกสร้างขึ้นตามขนาดรวมเป็นไบต์ของไฟล์อินพุต อย่างไรก็ตาม FileSystem บล็อกขนาดของไฟล์อินพุตจะถือว่าเป็นขอบเขตบนสำหรับการแยกอินพุต หากคุณมีไฟล์ที่เล็กกว่าขนาดบล็อก HDFS คุณจะได้รับเพียง 1 mapper สำหรับไฟล์นั้น หากคุณต้องการมีพฤติกรรมที่แตกต่างออกไปคุณสามารถใช้ mapred.min.split.size แต่อีกครั้งขึ้นอยู่กับ getSplits () ของ InputFormat ของคุณ

เรามีรูปแบบการป้อนข้อมูลที่มีอยู่แล้วมากมายที่มีอยู่ในแพ็คเกจ org.apache.hadoop.mapreduce.lib.input

CombineFileInputFormat.html

CombineFileRecordReader.html

CombineFileRecordReaderWrapper.html

CombineFileSplit.html

CombineSequenceFileInputFormat.html

CombineTextInputFormat.html

FileInputFormat.html

FileInputFormatCounter.html

คำสั่ง goto ใน c ++

FileSplit.html

FixedLengthInputFormat.html

InvalidInputException.html

KeyValueLineRecordReader.html

KeyValueTextInputFormat.html

MultipleInputs.html

NLineInputFormat.html

SequenceFileAsBinaryInputFormat.html

SequenceFileAsTextInputFormat.html

SequenceFileAsTextRecordReader.html

SequenceFileInputFilter.html

SequenceFileInputFormat.html

SequenceFileRecordReader.html

TextInputFormat.html

ค่าเริ่มต้นคือ TextInputFormat

ในทำนองเดียวกันเรามีรูปแบบเอาต์พุตจำนวนมากซึ่งอ่านข้อมูลจากตัวลดขนาดและเก็บไว้ใน HDFS:

FileOutputCommitter.html

FileOutputFormat.html

FileOutputFormatCounter.html

FilterOutputFormat.html

LazyOutputFormat.html

MapFileOutputFormat.html

MultipleOutputs.html

NullOutputFormat.html

PartialFileOutputCommitter.html

PartialOutputCommitter.html

SequenceFileAsBinaryOutputFormat.html

SequenceFileOutputFormat.html

TextOutputFormat.html

ค่าเริ่มต้นคือ TextOutputFormat

เมื่อคุณอ่านบล็อกนี้จบคุณจะได้เรียนรู้:

  • วิธีเขียนโปรแกรมลดแผนที่
  • เกี่ยวกับ InputFormats ประเภทต่างๆที่มีอยู่ใน Mapreduce
  • ความต้องการของ InputFormats คืออะไร
  • วิธีการเขียน InputFormats แบบกำหนดเอง
  • วิธีถ่ายโอนข้อมูลจากฐานข้อมูล SQL ไปยัง HDFS
  • วิธีการถ่ายโอนข้อมูลจากฐานข้อมูล SQL (ที่นี่ MySQL) ไปยังฐานข้อมูล NoSQL (ที่นี่ Hbase)
  • วิธีการถ่ายโอนข้อมูลจากฐานข้อมูล SQL หนึ่งไปยังตารางอื่นในฐานข้อมูล SQL (บางทีสิ่งนี้อาจไม่สำคัญมากนักหากเราทำเช่นนี้ในฐานข้อมูล SQL เดียวกันอย่างไรก็ตามไม่มีอะไรผิดในการมีความรู้เหมือนกันที่คุณไม่เคยรู้ วิธีการใช้งาน)

วิชาบังคับก่อน:

  • ติดตั้ง Hadoop ไว้แล้ว
  • ติดตั้ง SQL ล่วงหน้า
  • ติดตั้ง Hbase ล่วงหน้า
  • ความเข้าใจพื้นฐาน Java
  • MapReduce ความรู้
  • Hadoop กรอบความรู้พื้นฐาน

มาทำความเข้าใจกับคำชี้แจงปัญหาที่เรากำลังจะแก้ไขที่นี่:

เรามีตารางพนักงานใน MySQL DB ในฐานข้อมูลเชิงสัมพันธ์ Edureka ของเรา ตอนนี้ตามความต้องการของธุรกิจเราต้องเปลี่ยนข้อมูลทั้งหมดที่มีอยู่ในฐานข้อมูลเชิงสัมพันธ์ไปยังระบบไฟล์ Hadoop เช่น HDFS, NoSQL DB ที่เรียกว่า Hbase

เรามีทางเลือกมากมายในการทำงานนี้:

  • Sqoop
  • Flume
  • MapReduce

ตอนนี้คุณไม่ต้องการติดตั้งและกำหนดค่าเครื่องมืออื่นใดสำหรับการดำเนินการนี้ คุณจะเหลือเพียงตัวเลือกเดียวซึ่งก็คือ MapReduce เฟรมเวิร์กการประมวลผลของ Hadoop MapReduce framework จะช่วยให้คุณสามารถควบคุมข้อมูลได้อย่างเต็มที่ขณะถ่ายโอน คุณสามารถจัดการคอลัมน์และวางโดยตรงที่ตำแหน่งใดก็ได้จากสองตำแหน่งเป้าหมาย

บันทึก:

  • เราจำเป็นต้องดาวน์โหลดและใส่ตัวเชื่อมต่อ MySQL ใน classpath ของ Hadoop เพื่อดึงตารางจากตาราง MySQL ในการดำเนินการนี้ให้ดาวน์โหลดตัวเชื่อมต่อ com.mysql.jdbc_5.1.5.jar และเก็บไว้ในไดเร็กทอรี Hadoop_home / share / Hadoop / MaPreduce / lib
ดาวน์โหลด cp / com.mysql.jdbc_5.1.5.jar $ HADOOP_HOME / share / hadoop / mapreduce / lib /
  • นอกจากนี้ให้วางขวด Hbase ทั้งหมดไว้ใน Hadoop classpath เพื่อให้โปรแกรม MR ของคุณเข้าถึง Hbase ในการดำเนินการนี้ให้ดำเนินการคำสั่งต่อไปนี้ :
cp $ HBASE_HOME / lib / * $ HADOOP_HOME / share / hadoop / mapreduce / lib /

เวอร์ชันซอฟต์แวร์ที่ฉันใช้ในการทำงานนี้คือ:

  • Hadooop-2.3.0
  • HBase 0.98.9-Hadoop2
  • คราสดวงจันทร์

เพื่อหลีกเลี่ยงโปรแกรมในปัญหาความเข้ากันได้ใด ๆ ฉันกำหนดให้ผู้อ่านรันคำสั่งด้วยสภาพแวดล้อมที่คล้ายกัน

DBInputWritable ที่กำหนดเอง:

แพคเกจ com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable public class DBInputWritable implements Writable, DBWritable {private int id private String name, dept public void readFields (DataInput in) พ่น IOException {} public void readFields (ResultSet rs) พ่นวัตถุ SQLException // Resultset แสดงถึงข้อมูลที่ส่งคืนจากคำสั่ง SQL {id = rs.getInt (1) name = rs.getString (2) dept = rs.getString (3)} โมฆะสาธารณะเขียน (DataOutput out) พ่น IOException { } โมฆะสาธารณะเขียน (PreparedStatement ps) พ่น SQLException {ps.setInt (1, id) ps.setString (2, name) ps.setString (3, dept)} public int getId () {return id} public String getName () {return name} public String getDept () {return dept}}

กำหนดเอง DBOutputWritable:

แพคเกจ com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable public class DBOutputWritable implements Writable, DBWritable {private String name private int id private String dept public DBOutputWritable (String name, int id, String dept) {this.name = name this.id = id this.dept = dept} โมฆะสาธารณะ readFields (DataInput in) พ่น IOException {} โมฆะสาธารณะ readFields (ResultSet rs) พ่น SQLException {} โมฆะสาธารณะเขียน (DataOutput out) พ่น IOException {} โมฆะสาธารณะเขียน (PreparedStatement ps) พ่น SQLException {ps.setString (1, name) ps.setInt (2, id) ps.setString (3, dept)}}

ตารางอินพุต:

สร้างฐานข้อมูล edureka
สร้างตาราง emp (empid int ไม่ใช่ null ชื่อ varchar (30), dept varchar (20), คีย์หลัก (empid))
แทรกลงในค่า Emp (1, 'abhay', 'developmentement'), (2, 'brundesh', 'test')
เลือก * จาก emp

กรณีที่ 1: โอนจาก MySQL เป็น HDFS

แพคเกจ com.inputFormat.copy นำเข้า java.net.URI import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce .Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.hadoop .io.Text import org.apache.hadoop.io.IntWritable public class MainDbtohdfs {public static void main (String [] args) พ่น Exception {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc .Driver ', // คลาสไดรเวอร์' jdbc: mysql: // localhost: 3306 / edureka ', // db url' root ', // user name' root ') // password Job job = new Job (conf) job .setJarByClass (MainDbtohdfs.class) job.setMapperClass (Map.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setInputFormatClass (DBInputFormat.class) FileOutputset เส้นทางใหม่ (args [0])) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // input table name null, null, new String [] {'empid', 'name', 'dept'} / / ตารางคอลัมน์) เส้นทาง p = เส้นทางใหม่ (args [0]) FileSystem fs = FileSystem.get (URI ใหม่ (args [0]), conf) fs.delete (p) System.exit (job.waitForCompletion (จริง)? 0: 1)}}

โค้ดส่วนนี้ช่วยให้เราจัดเตรียมหรือกำหนดค่ารูปแบบอินพุตเพื่อเข้าถึง SQL DB ต้นทางของเราพารามิเตอร์ประกอบด้วยคลาสไดรเวอร์ URL มีที่อยู่ของฐานข้อมูล SQL ชื่อผู้ใช้และรหัสผ่าน

DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // คลาสไดรเวอร์ 'jdbc: mysql: // localhost: 3306 / edureka', // db url 'root', // ชื่อผู้ใช้ 'root') // รหัสผ่าน

รหัสชิ้นนี้ช่วยให้เราส่งผ่านรายละเอียดของตารางในฐานข้อมูลและตั้งค่าในวัตถุงาน พารามิเตอร์รวมถึงอินสแตนซ์งานคลาสที่เขียนได้แบบกำหนดเองซึ่งต้องใช้อินเทอร์เฟซ DBWritable ชื่อตารางต้นทางเงื่อนไขถ้ามีค่าว่างพารามิเตอร์การเรียงลำดับอื่น ๆ เป็นโมฆะรายการคอลัมน์ตารางตามลำดับ

DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // input table name null, null, new String [] {'empid', 'name', 'dept'} // table คอลัมน์)

ผู้ทำแผนที่

แพ็กเกจ com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io .IntWritable public class Map ขยาย Mapper {
แผนที่โมฆะที่มีการป้องกัน (คีย์ LongWritable, ค่า DBInputWritable, Context ctx) {ลองใช้ {String name = value.getName () IntWritable id = new IntWritable (value.getId ()) String dept = value.getDept ()
ctx.write (ข้อความใหม่ (ชื่อ + '' + id + '' + dept), id)
} catch (IOException จ) {e.printStackTrace ()} catch (InterruptedException จ) {e.printStackTrace ()}}}

Reducer: Identity Reducer ใช้

คำสั่งเพื่อเรียกใช้:

กระปุก hadoop dbhdfs.jar com.inputFormat.copy.MainDbtohdfs / dbtohdfs

เอาต์พุต: ตาราง MySQL โอนไปยัง HDFS

hadoop dfs -ls / dbtohdfs / *

กรณีที่ 2: โอนจากตารางหนึ่งใน MySQL ไปยังอีกตารางหนึ่งใน MySQL

การสร้างตารางผลลัพธ์ใน MySQL

สร้างตาราง staff1 (ชื่อ varchar (20), id int, dept varchar (20))

แพคเกจ com.inputFormat.copy import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce การนำเข้างาน org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib .db.DBInputFormat import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io.NullWritable คลาสสาธารณะ Mainonetable_to_other_table {public static void main (String [] args) พ่น Exception {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // คลาสไดรเวอร์ 'jdbc: mysql: // localhost : 3306 / edureka ', // db url' root ', // ชื่อผู้ใช้' root ') // รหัสผ่าน Job job = งานใหม่ (conf) job.setJarByClass (Mainonetable_to_other_table.class) job.setMapperClass (Map.class) job .setReducerClass (Reduce.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setOutputKeyClass (DBOutputWritable.class) job.setOutputValueClass (Nul lWritable.class) job.setInputFormatClass (DBInputFormat.class) job.setOutputFormatClass (DBOutputFormat.class) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // input table name null, null, new String [] {'empid ',' name ',' dept '} // table คอลัมน์) DBOutputFormat.setOutput (job,' workers1 ', // output table name new String [] {' name ',' id ',' dept '} // table คอลัมน์) System.exit (job.waitForCompletion (จริง)? 0: 1)}}

โค้ดส่วนนี้ช่วยให้เรากำหนดค่าชื่อตารางผลลัพธ์ใน SQL DB พารามิเตอร์ ได้แก่ อินสแตนซ์งานชื่อตารางเอาต์พุตและชื่อคอลัมน์เอาต์พุตตามลำดับ

DBOutputFormat.setOutput (job, 'staff1', // ชื่อตารางผลลัพธ์ new String [] {'name', 'id', 'dept'} // table column)

Mapper: เหมือนกับกรณีที่ 1

ลด:

แพคเกจ com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Reducer import org.apache.hadoop.io นำเข้าข้อความ org.apache.hadoop.io.IntWritable import org.apache.hadoop.io คลาสสาธารณะ. NullWritable ลดขยาย Reducer {ป้องกันโมฆะลด (คีย์ข้อความค่าที่ทำซ้ำได้บริบท ctx) {int sum = 0 String line [] = key.toString (). split ('') ลองใช้ {ctx.write (DBOutputWritable ใหม่ (บรรทัด [0] .toString (), Integer.parseInt (บรรทัด [1] .toString ()), บรรทัด [2] .toString ()), NullWritable.get ())} จับ (IOException จ) {e.printStackTrace ()} catch (InterruptedException จ) {e.printStackTrace ()}}}

คำสั่งเพื่อเรียกใช้:

กระปุก hadoop dbhdfs.jar com.inputFormat.copy.Mainonetable_to_other_table

เอาต์พุต: ข้อมูลที่ถ่ายโอนจากตาราง EMP ใน MySQL ไปยังตารางอื่น Employee1 ใน MySQL

กรณีที่ 3: โอนจากตารางใน MySQL ไปยังตาราง NoSQL (Hbase)

การสร้างตาราง Hbase เพื่อรองรับเอาต์พุตจากตาราง SQL:

สร้าง 'พนักงาน', 'official_info'

ระดับคนขับ:

แพคเกจการนำเข้า Dbtohbase org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.HTable import org.apache.hadoop.hbase.client.HTableInterface import org.apache .hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil import org.apache.hadoop.io.Text public class MainDbToHbase {public static void main (String [] args) พ่น Exception {Configuration configuration = HBaseConfiguration.create () HTableInterface mytable = HTable ใหม่ (conf, 'emp') DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // คลาสไดรเวอร์ 'jdbc: mysql: // localhost: 3306 / edureka' , // db url 'root', // ชื่อผู้ใช้ 'root') // รหัสผ่าน Job job = new Job (conf, 'dbtohbase') job.setJarByClass (MainDbToHbase.class) job.s etMapperClass (Map.class) job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class) TableMapReduceUtil.initTableReducerJob ('พนักงาน', Reduce.class, job) job.setInput.class class) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // input table name null, null, new String [] {'empid', 'name', 'dept'} // table column) System.exit (job.waitForCompletion (จริง)? 0: 1)}}

โค้ดส่วนนี้ให้คุณกำหนดค่าคลาสคีย์เอาต์พุตซึ่งในกรณีของ hbase คือ ImmutableBytesWritable

job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class)

ที่นี่เรากำลังส่งชื่อตาราง hbase และตัวลดเพื่อทำหน้าที่บนตาราง

TableMapReduceUtil.initTableReducerJob ('พนักงาน', Reduce.class, งาน)

ผู้ทำแผนที่:

แพคเกจการนำเข้า Dbtohbase java.io.IOException นำเข้า org.apache.hadoop.mapreduce.Mapper นำเข้า org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util Bytes import org.apache.hadoop.io .LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable public class Map ขยาย Mapper {private IntWritable one = new IntWritable (1) protected void map (LongWritable id, DBInputWritable value, Context context) {ลองใช้ {String line = value.getName () String cd = value.getId () + '' String dept = value.getDept () context.write (new ImmutableBytesWritable (Bytes.toBytes (cd)), new Text (line + ' '+ dept))} catch (IOException จ) {e.printStackTrace ()} catch (InterruptedException จ) {e.printStackTrace ()}}}

ในโค้ดส่วนนี้เรากำลังรับค่าจาก getters ของคลาส DBinputwritable แล้วส่งผ่านเข้าไป
ImmutableBytesWritable เพื่อให้เข้าถึงตัวลดในรูปแบบ bytewriatble ซึ่ง Hbase เข้าใจ

String line = value.getName () String cd = value.getId () + '' String dept = value.getDept () context.write (ใหม่ ImmutableBytesWritable (Bytes.toBytes (cd)), new Text (line + '' + dept ))

ลด:

แพคเกจการนำเข้า Dbtohbase java.io.IOException นำเข้า org.apache.hadoop.hbase.client ใส่การนำเข้า org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableReducer import org.apache.hadoop .hbase.util Bytes import org.apache.hadoop.io.Text public class Reduce ขยาย TableReducer {public void reduction (ImmutableBytesWritable key, Iterable values, Context context) พ่น IOException, InterruptedException {String [] cause = null // Loop values สำหรับ (Text val: values) {cause = val.toString (). split ('')} // ใส่ไปที่ HBase Put put = new Put (key.get ()) put.add (Bytes.toBytes ('official_info' ), Bytes.toBytes ('name'), Bytes.toBytes (สาเหตุ [0])) put.add (Bytes.toBytes ('official_info'), Bytes.toBytes ('department'), Bytes.toBytes (สาเหตุ [1 ])) context.write (คีย์ใส่)}}

โค้ดชิ้นนี้ช่วยให้เราตัดสินใจเลือกแถวและคอลัมน์ที่เราจะจัดเก็บค่าจากตัวลด ที่นี่เรากำลังจัดเก็บแต่ละ Empid ในแถวแยกกันเนื่องจากเราสร้าง Empid เป็นคีย์แถวซึ่งจะไม่ซ้ำกัน ในแต่ละแถวเรากำลังจัดเก็บข้อมูลอย่างเป็นทางการของพนักงานในตระกูลคอลัมน์“ official_info” ภายใต้คอลัมน์“ ชื่อ” และ“ แผนก” ตามลำดับ

ใส่ใส่ = ใส่ใหม่ (key.get ()) put.add (Bytes.toBytes ('official_info'), Bytes.toBytes ('name'), Bytes.toBytes (สาเหตุ [0])) put.add (ไบต์. toBytes ('official_info'), Bytes.toBytes ('แผนก'), Bytes.toBytes (สาเหตุ [1])) context.write (คีย์ใส่)

ถ่ายโอนข้อมูลใน Hbase:

สแกนพนักงาน

ดังที่เราเห็นว่าเราสามารถทำงานในการย้ายข้อมูลธุรกิจของเราจาก SQL DB เชิงสัมพันธ์ไปยัง NoSQL DB ได้สำเร็จ

ในบล็อกถัดไปเราจะเรียนรู้วิธีเขียนและรันโค้ดสำหรับรูปแบบอินพุตและเอาต์พุตอื่น ๆ

โพสต์ความคิดเห็นคำถามหรือข้อเสนอแนะของคุณ ฉันชอบที่จะได้ยินจากคุณ

มีคำถามสำหรับเรา? โปรดระบุไว้ในส่วนความคิดเห็นแล้วเราจะติดต่อกลับไป

กระทู้ที่เกี่ยวข้อง: