4.4 实战:使用MapReduce构建HBase索引
4.4.1 索引表蓝图
HBase索引主要用于提高HBase中表数据的访问速度,有效地避免了全表扫描(多数查询可以仅扫描少量索引页及数据页,而不是遍历所有的数据页)。从4.1.3节可知,HBase中的表根据行键被分成了多个Regions,通常一个Region的一行都会包含的较多的数据,如果以列值作为查询条件,就只能从第一行数据开始往下查找,直到找到相关数据为止,这显然很低效。相反,如果将经常被查询的列作为行键、行键作为列重新构造一张表,即可实现根据列值快速地定位相关数据所在的行,这就是索引。显然索引表仅需要包含一个列,所以索引表的大小和原表比起来要小得多,如图4-14给出了索引表与原表之间的关系。从图4-14可以看出,由于索引表的单条记录所占空间比原表要小,所以索引表的一个Region与原表相比,能包含更多条记录。
图4-14 索引表与原表关系
假设HBase中存在一张表heroes,里面内容如表4-3所示。则根据列info:name构建的索引表如图4-15所示。HBase会自动将生成的索引表加入如图4-3所示的结构中,从而提高搜索的效率。
表4-3 heroes表的逻辑视图
图4-15 heroes索引表示意
4.4.2 HBase和MapReduce
HBase中的表通常是非常大的,并且还可以不断增大,所以为表建立索引的工作量也是相当大的。为了解决类似的问题,HBase集成了MapReduce框架,用于对表中的大量数据进行并行处理。第3章已经对MapReduce的处理过程进行了详细的介绍,根据这些处理过程,HBase为每个阶段提供了相应的类用来处理表数据。
(1)InputFormat类。HBase实现了TableInputFormatBase类,该类提供了对表数据的大部分操作,其子类TableInputFormat则提供了完整的实现,用于处理表数据并生成键值对。TableInputFormat类将数据表按照Region分割成split,即有多少个Regions就有多个splits。然后将Region按行键分成<key, value>对,key值对应与行键,value值为该行所包含的数据。
(2)Mapper类和Reducer类。HBase实现了TableMapper类和TableReducer类,其中TableMapper类并没有实现具体的功能,只是将输入的<key, value>对的类型分别限定为ImmutableBytesWritable和Result。IdentityTableMapper类和IdentityTableReducer类则是上述两个类的具体实现,其和Mapper类及Reducer类一样,只是简单地将输入<key, value>对输出到下一个阶段。
(3)OutputFormat类。Hbase实现的TableOutputFormat将输出的<key, value>对写到指定的HBase表中,该类不会对WAL(Write-Ahead Log)进行操作,即如果服务器发生故障将面临丢失数据的风险。可以使用MultipleTableOutputFormat类解决这个问题,该类可以对是否写入WAL进行设置。
4.4.3 实现索引
构建索引表的完整源代码如下所示,对代码的详细分析将以注释的形式给出。运行程序需要设置运行参数,分别为表名、列族和需要建索引的列(列必须属于前面给出的列族)。如要对heroes中的name和email列构建索引,则运行参数应设为:heroes info name email。
import java.io.IOException; import java.util.HashMap;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.util.GenericOptionsParser;
public class IndexBuilder {
//索引表唯一的一列为INDEX:ROW,其中INDEX为列族 public static final byte[] INDEX_COLUMN = Bytes.toBytes("INDEX"); public static final byte[] INDEX_QUALIFIER = Bytes.toBytes("ROW");
public static class Map extends Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Writable> { private byte[] family;
//存储了“列名”到“表名-列名”的映射 //前者用于获取某列的值,并作为索引表的键值;后者用于作为索引表的表名 private HashMap<byte[], ImmutableBytesWritable> indexes;
protected void map(ImmutableBytesWritable rowKey, Result result, Context context) throws IOException, InterruptedException { for(java.util.Map.Entry<byte[], ImmutableBytesWritable> index :indexes.entrySet()) { byte[] qualifier = index.getKey(); //获得列名 ImmutableBytesWritable tableName = index.getValue(); //索引表的表名 byte[] value = result.getValue(family, qualifier); //根据“列族:列名”获得元素值 if (value != null) {
//以列值作为行键,在列“INDEX:ROW”中插入行键 Put put = new Put(value); put.add(INDEX_COLUMN, INDEX_QUALIFIER, rowKey.get()); //在tableName表上执行put操作 //使用MultiOutputFormat时,第二个参数必须是Put或Delete类型 context.write(tableName, put); } } } //setup为Mapper中的方法,该方法只在任务初始化时执行一次 protected void setup(Context context) throws IOException, InterruptedException { Configuration configuration = context.getConfiguration();
//通过configuration.set()方法传递参数,详见下面的configureJob方法 String tableName = configuration.get("index.tablename"); String[] fields = configuration.getStrings("index.fields"); //fields内为需要做索引的列名 String familyName = configuration.get("index.familyname"); family = Bytes.toBytes(familyName); //初始化indexes方法 indexes = new HashMap<byte[], ImmutableBytesWritable>(); for(String field :fields) { // 如果给name做索引,则索引表的名称为“heroes-name” indexes.put(Bytes.toBytes(field), new ImmutableBytesWritable(Bytes.toBytes(tableName + "-" + field))); } } }
public static Job configureJob(Configuration conf, String[] args) throws IOException { String tableName = args[0]; String columnFamily = args[1]; System.out.println("****" + tableName);
//通过Configuration.set()方法传递参数 conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(new Scan())); conf.set(TableInputFormat.INPUT_TABLE, tableName); conf.set("index.tablename", tableName); conf.set("index.familyname", columnFamily); String[] fields = new String[args.length - 2]; for(int i = 0; i < fields.length; i++) { fields[i] = args[i + 2]; } conf.setStrings("index.fields", fields); conf.set("index.familyname", "attributes");
//配置任务的运行参数 Job job = new Job(conf, tableName); job.setJarByClass(IndexBuilder.class); job.setMapperClass(Map.class); job.setNumReduceTasks(0); job.setInputFormatClass(TableInputFormat.class); job.setOutputFormatClass(MultiTableOutputFormat.class); return job; } public static void main(String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if(otherArgs.length < 3) { System.err.println("Only " + otherArgs.length + " arguments supplied, required:3"); System.err.println("Usage:IndexBuilder <TABLE_NAME> " + "<COLUMN_FAMILY> <ATTR>[<ATTR> ...]"); System.exit(-1); } Job job = configureJob(conf, otherArgs); System.exit(job.waitForCompletion(true) ? 0 :1); } }