当前位置:   article > 正文

Hbase - 迁移数据--导出,导入_flink hbase数据全表导出

flink hbase数据全表导出

 https://www.jianshu.com/p/045026a13bf8

 

有没有这样一样情况,把一个集群中的某个表导到另一个群集中,或者hbase的表结构发生了更改,但是数据还要,比如预分区没做,导致某台RegionServer很吃紧,Hbase的导出导出都可以很快的完成这些操作。

 

现在环境上面有一张表logTable,有一个ext列簇
但是没有做预分区,虽然可以强制拆分表,但是split的start,end范围无法精确控制。

方式一 (先导出再导出)

  1. 创建导出目录
  1. hadoop fs -mkdir /tmp/hbase-export
  2. hadoop fs -ls /tmp/hbase-export
  1. 备份表数据
    使用hbase内置的mr命令,会默认导出到hdfs中: 换行符去掉生效
  1. hbase org.apache.hadoop.hbase.mapreduce.Export \
  2. -D hbase.mapreduce.scan.column.family=f1 \
  3. icc:test hdfs:///tmp/hbase-export/test
  1. 删除表
  1. disable 'logTable'
  2. drop 'logTable'
  1. 创建表
    数据3天过期,可以根据RegionServer的个数做预分区,假设有8台,则使用下面的方式。
    由于我们是使用MD5("uid")前两位作为打散,范围为00~ff 256个分片,可以使用如下方式。
  1. # scala shell 创建方式
  2. (0 until 256 by 256/8).map(Integer.toHexString).map(i=>s"0$i".takeRight(2))

hbase创表

  1. create 'logTable',{ \
  2. NAME => 'ext',TTL => '3 DAYS', \
  3. CONFIGURATION => {
  4. 'SPLIT_POLICY' => 'org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy', \
  5. 'KeyPrefixRegionSplitPolicy.prefix_length' => '2'
  6. }, \
  7. COMPRESSION => 'SNAPPY' \
  8. }, \
  9. SPLITS => ['20', '40', '60', '80', 'a0', 'c0', 'e0']
  1. 导入备份数据
hbase org.apache.hadoop.hbase.mapreduce.Import logTable hdfs:///tmp/hbase-export/logTable

注意事项

  1. 默认导出所有列簇,可通过指定-D hbase.mapreduce.scan.column.family=ext,info参数导出。
  2. 如果另一张表没有源表对应的列簇将会出错。

 

 

 

1.export

1> 执行导出命令

    可使用-D命令自定义参数,此处限定表名、列族、开始结束RowKey、以及导出到HDFS的目录

  1. hbase org.apache.hadoop.hbase.mapreduce.Export \
  2. -D hbase.mapreduce.scan.column.family=0 \
  3. -D hbase.mapreduce.scan.row.start=aaaaaaaaaaaaaaaaaaa00010078 \
  4. -D hbase.mapreduce.scan.row.stop=jjjjjjjjjjjjjjjjjjj00010078 TESTA /tmp/hbase_export

    可选的-D参数配置项

  1. Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> [<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]
  2. Note: -D properties will be applied to the conf used.
  3. For example:
  4. -D mapred.output.compress=true
  5. -D mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec
  6. -D mapred.output.compression.type=BLOCK
  7. Additionally, the following SCAN properties can be specified
  8. to control/limit what is exported..
  9. -D hbase.mapreduce.scan.column.family=<familyName>
  10. -D hbase.mapreduce.include.deleted.rows=true
  11. For performance consider the following properties:
  12. -Dhbase.client.scanner.caching=100
  13. -Dmapred.map.tasks.speculative.execution=false
  14. -Dmapred.reduce.tasks.speculative.execution=false
  15. For tables with very wide rows consider setting the batch size as below:
  16. -Dhbase.export.scanner.batch=10

2> MR执行导出

3> 查看HDFS

2.import

1> 预先建表

    在hbase中预先建立一个名称为TESTX的表,其包含一个名称为0的列族。若表事先不存在将报错

create 'TESTX','0'

2> 运行导入命令

    可使用-D命令自定义参数,此处不多做限制

hbase org.apache.hadoop.hbase.mapreduce.Import TESTX hdfs://cdh01/tmp/hbase_export/

    可选的-D参数配置项

  1. Usage: Import [options] <tablename> <inputdir>
  2. By default Import will load data directly into HBase. To instead generate
  3. HFiles of data to prepare for a bulk data load, pass the option:
  4. -Dimport.bulk.output=/path/for/output
  5. To apply a generic org.apache.hadoop.hbase.filter.Filter to the input, use
  6. -Dimport.filter.class=<name of filter class>
  7. -Dimport.filter.args=<comma separated list of args for filter
  8. NOTE: The filter will be applied BEFORE doing key renames via the HBASE_IMPORTER_RENAME_CFS property. Futher, filters will only use the Filter#filterRowKey(byte[] buffer, int offset, int length) method to identify whether the current row needs to be ignored completely for processing and Filter#filterKeyValue(KeyValue) method to determine if the KeyValue should be added; Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including the KeyValue.
  9. For performance consider the following options:
  10. -Dmapred.map.tasks.speculative.execution=false
  11. -Dmapred.reduce.tasks.speculative.execution=false
  12. -Dimport.wal.durability=<Used while writing data to hbase. Allowed values are the supported durability values like SKIP_WAL/ASYNC_WAL/SYNC_WAL/...>

3> MR执行导入

4> HBase查看导入数据

 

 

三、hbase批量提数,数据需要脱敏,将hbase1的数据导入hbase2

1、建立关联hbase1的hive1外表.

  1. hive>
  2. CREATE EXTERNAL TABLE china_mainland_acturl(
  3. rowkey string,
  4. act_url STRING
  5. ) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
  6. WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,act:url")
  7. TBLPROPERTIES ("hbase.table.name" = "users:china_mainland")
  8. ;

2、hive -e  "select * from hive1 where ***" >>data.csv

3、建立hive2的临时表table_temp ,并将data.csv上传至hsfs,hadoop fs -put localpath  table_temp_hdfspath

4、建立关联hbase1的hive1外表.

5、将tabletemp表的数据插入hive2

insert into hive2 as  select * from table_temp

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/一键难忘520/article/detail/969847
推荐阅读
相关标签
  

闽ICP备14008679号