/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.Arrays;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Example map/reduce job to construct index tables that can be used to quickly
 * find a row based on the value of a column. It demonstrates:
 * <ul>
 * <li>Using TableInputFormat and TableMapReduceUtil to use an HTable as input
 * to a map/reduce job.</li>
 * <li>Passing values from the main method to children via the configuration.</li>
 * <li>Using MultiTableOutputFormat to output to multiple tables from a
 * map/reduce job.</li>
 * <li>A real use case of building a secondary index over a table.</li>
 * </ul>
 *
 * <h3>Usage</h3>
 *
 * <p>
 * Modify ${HADOOP_HOME}/conf/hadoop-env.sh to include the hbase jar, the
 * zookeeper jar (found in the lib/ directory under the HBase root), the examples
 * output directory, and the hbase conf directory in HADOOP_CLASSPATH, and then run
 * <code><strong>bin/hadoop org.apache.hadoop.hbase.mapreduce.IndexBuilder
 * TABLE_NAME COLUMN_FAMILY ATTR [ATTR ...]</strong></code>
 * </p>
 *
 * <p>
 * For every ATTR the job writes into a table named <code>TABLE_NAME-ATTR</code>,
 * using the attribute value as the row key and storing the original row key in
 * the <code>INDEX:ROW</code> column. Create those index tables, each with an
 * <code>INDEX</code> column family, before running the job.
 * </p>
 *
 * <p>
 * To run with the sample data provided in index-builder-setup.rb, use the
 * arguments <strong><code>people attributes name email phone</code></strong>.
 * </p>
 *
 * <p>
 * This code was originally written against HBase 0.21 trunk.
 * </p>
 */
@InterfaceAudience.Private
public class IndexBuilder extends Configured implements Tool {
  /** the column family containing the indexed row key */
  public static final byte[] INDEX_COLUMN = Bytes.toBytes("INDEX");
  /** the qualifier containing the indexed row key */
  public static final byte[] INDEX_QUALIFIER = Bytes.toBytes("ROW");

  /**
   * Internal Mapper to be run by Hadoop.
   *
   * <p>For each input row and each configured attribute present in that row, emits a
   * {@link Put} to the attribute's index table whose row key is the attribute value and
   * whose {@code INDEX:ROW} cell holds the original row key.
   */
  public static class Map extends
      Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Put> {
    /** Column family of the source table that holds the attributes to index. */
    private byte[] family;
    /** Attribute qualifier to the name of the index table for that attribute. */
    private TreeMap<byte[], ImmutableBytesWritable> indexes;

    @Override
    protected void map(ImmutableBytesWritable rowKey, Result result, Context context)
        throws IOException, InterruptedException {
      // copyBytes() honours the writable's offset/length; get() would return the whole
      // backing array, which is only correct when the key happens to fill it exactly.
      byte[] originalRow = rowKey.copyBytes();
      for (java.util.Map.Entry<byte[], ImmutableBytesWritable> index : indexes.entrySet()) {
        byte[] qualifier = index.getKey();
        ImmutableBytesWritable tableName = index.getValue();
        byte[] value = result.getValue(family, qualifier);
        if (value != null) {
          // original: row 123 attribute:phone 555-1212
          // index: row 555-1212 INDEX:ROW 123
          Put put = new Put(value);
          put.addColumn(INDEX_COLUMN, INDEX_QUALIFIER, originalRow);
          context.write(tableName, put);
        }
      }
    }

    /**
     * Reads the table name, column family and attribute list placed in the job
     * configuration by {@link IndexBuilder#configureJob(Configuration, String[])}.
     *
     * @throws IOException if any of the required configuration keys is missing
     */
    @Override
    protected void setup(Context context) throws IOException,
        InterruptedException {
      Configuration configuration = context.getConfiguration();
      String tableName = configuration.get("index.tablename");
      String[] fields = configuration.getStrings("index.fields");
      String familyName = configuration.get("index.familyname");
      // Fail with a clear message instead of a NullPointerException further down.
      if (tableName == null || familyName == null || fields == null || fields.length == 0) {
        throw new IOException("IndexBuilder mapper is missing required configuration: "
            + "index.tablename, index.familyname and index.fields must all be set");
      }
      family = Bytes.toBytes(familyName);
      indexes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      for (String field : fields) {
        // if the table is "people" and the field to index is "email", then the
        // index table will be called "people-email"
        indexes.put(Bytes.toBytes(field),
            new ImmutableBytesWritable(Bytes.toBytes(tableName + "-" + field)));
      }
    }
  }

  /**
   * Job configuration.
   *
   * @param conf configuration to populate with the job settings; modified in place
   * @param args {@code TABLE_NAME COLUMN_FAMILY ATTR [ATTR ...]}
   * @return the configured job, not yet submitted
   * @throws IOException if the scan cannot be serialized or the job cannot be created
   * @throws IllegalArgumentException if fewer than three arguments are supplied
   */
  public static Job configureJob(Configuration conf, String [] args) throws IOException {
    if (args == null || args.length < 3) {
      throw new IllegalArgumentException("IndexBuilder requires at least 3 arguments: "
          + "<TABLE_NAME> <COLUMN_FAMILY> <ATTR> [<ATTR> ...]");
    }
    String tableName = args[0];
    String columnFamily = args[1];
    conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(new Scan()));
    conf.set(TableInputFormat.INPUT_TABLE, tableName);
    conf.set("index.tablename", tableName);
    conf.set("index.familyname", columnFamily);
    String[] fields = Arrays.copyOfRange(args, 2, args.length);
    conf.setStrings("index.fields", fields);
    Job job = Job.getInstance(conf, tableName);
    job.setJarByClass(IndexBuilder.class);
    job.setMapperClass(Map.class);
    // Map-only job: each mapper writes its Puts directly to the index tables.
    job.setNumReduceTasks(0);
    job.setInputFormatClass(TableInputFormat.class);
    job.setOutputFormatClass(MultiTableOutputFormat.class);
    return job;
  }

  /**
   * Validates the arguments, builds the job and waits for it to finish.
   *
   * @param args {@code TABLE_NAME COLUMN_FAMILY ATTR [ATTR ...]}
   * @return 0 on success, 1 if the job failed, -1 on invalid arguments
   */
  @Override
  public int run(String[] args) throws Exception {
    if (args.length < 3) {
      System.err.println("Only " + args.length + " arguments supplied, required: 3");
      System.err.println("Usage: IndexBuilder <TABLE_NAME> <COLUMN_FAMILY> <ATTR> [<ATTR> ...]");
      // Return the error code rather than calling System.exit(): Tool.run may be invoked
      // by callers other than main(), and exiting would tear down their JVM.
      return -1;
    }
    Configuration conf = HBaseConfiguration.create(getConf());
    Job job = configureJob(conf, args);
    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int result = ToolRunner.run(HBaseConfiguration.create(), new IndexBuilder(), args);
    System.exit(result);
  }
}