Hbase - 表导出CSV数据

发布时间:2019/7/11 22:31:15最新文章

新鲜文章,昨天刚经过线上验证过的,使用它导出了3亿的用户数据出来,花了半个小时,性能还是稳稳的,好了不吹牛皮了,直接上代码吧。

MR

考查了Hbase的各种MR,没有发现哪一个是能实现的,如果有请通知我,我给他发红包。
所以我们只能自己来写一个MR了,编写一个Hbase的MR,官方文档上也有相应的例子。
我们用来加以化妆就得到我们想要的了。

导出的CSV格式为

admin,22,北京admin,23,天津

依赖 hbase-mapreduce

撸scala代码了

定义Map转换类

class MyMapper extends TableMapper[Text, Text] { val keyText = new Text() val valueText = new Text() override def map(key: ImmutableBytesWritable, value: Result, context: Mapper[ImmutableBytesWritable, Result, Text, Text]#Context): Unit = { val maps = result2Map(value) keyText.set(maps.get("userId")) valueText.set(s"${maps.get("regTime")}") context.write(keyText, valueText) } //将Result转换为Map def result2Map(result: Result): util.HashMap[lang.String, lang.String] = { val map = new util.HashMap[lang.String, lang.String]() result.rawCells().foreach { cell => val column: Array[Byte] = CellUtil.cloneQualifier(cell) val value: Array[Byte] = CellUtil.cloneValue(cell) val qualifierByte = cell.getQualifierArray if (qualifierByte != null && qualifierByte.nonEmpty) { if (value == null || value.length == 0) { map.put(Bytes.toString(column), "") } else { map.put(Bytes.toString(column), Bytes.toString(value)) } } } map }}

定义Reducer类

class MyReducer extends Reducer[Text, Text, Text, Text] { override def reduce(key: Text, values: lang.Iterable[Text], context: Reducer[Text, Text, Text, Text]#Context): Unit = { val iter = values.iterator() while (iter.hasNext) { //这样可以只保留下Key字段,也就只有一行数据了 val tmpText = iter.next() val mergeKey = new Text() mergeKey.set(key.toString + "," + tmpText.toString) val v = new Text() v.set("") context.write(mergeKey, v) } }}

ExportCsv核心

class ExportCsv extends Configured with Tool { override def run(args: Array[String]): Int = { val conf = HBaseConfiguration.create() conf.addResource(new FileInputStream(new File("/etc/hbase/conf/hbase-site.xml"))) conf.set(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR, "/tmp/hbasecsv") conf.set("mapreduce.job.running.map.limit", "8") //最多有多少个Task同时跑 val job = Job.getInstance(conf, "HbaseExportCsv") job.setJarByClass(classOf[ExportCsv]) val scan = new Scan() //过滤我们想要的数据 scan.addFamily(Bytes.toBytes("ext")) scan.addColumn(Bytes.toBytes("ext"), Bytes.toBytes("userId")) scan.addColumn(Bytes.toBytes("ext"), Bytes.toBytes("regTime")) scan.setBatch(1000) scan.setCacheBlocks(false) TableMapReduceUtil.initTableMapperJob( "USER_TABLE", scan, classOf[MyMapper], classOf[Text], classOf[Text], job ) job.setReducerClass(classOf[MyReducer]) val jobConf = new JobConf(job.getConfiguration) FileOutputFormat.setOutputPath(jobConf, new Path("/tmp/hbasecsv")) val isDone = job.waitForCompletion(true) if (isDone) 0 else 1 }}

要跑了任务了

hadoop jar ExportCsv.jar

相关文章:
上一集:Flink - CEP 下一集:Hbase BulkLoad用法