1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
21 import java.io.IOException;
22 import java.util.TreeMap;
23
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.hbase.HBaseConfiguration;
26 import org.apache.hadoop.hbase.client.Put;
27 import org.apache.hadoop.hbase.client.Result;
28 import org.apache.hadoop.hbase.client.Scan;
29 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
30 import org.apache.hadoop.hbase.util.Bytes;
31 import org.apache.hadoop.mapreduce.Job;
32 import org.apache.hadoop.mapreduce.Mapper;
33 import org.apache.hadoop.util.GenericOptionsParser;
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65 public class IndexBuilder {
66
67 public static final byte[] INDEX_COLUMN = Bytes.toBytes("INDEX");
68
69 public static final byte[] INDEX_QUALIFIER = Bytes.toBytes("ROW");
70
71
72
73
74 public static class Map extends
75 Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Put> {
76 private byte[] family;
77 private TreeMap<byte[], ImmutableBytesWritable> indexes;
78
79 @Override
80 protected void map(ImmutableBytesWritable rowKey, Result result, Context context)
81 throws IOException, InterruptedException {
82 for(java.util.Map.Entry<byte[], ImmutableBytesWritable> index : indexes.entrySet()) {
83 byte[] qualifier = index.getKey();
84 ImmutableBytesWritable tableName = index.getValue();
85 byte[] value = result.getValue(family, qualifier);
86 if (value != null) {
87
88
89 Put put = new Put(value);
90 put.add(INDEX_COLUMN, INDEX_QUALIFIER, rowKey.get());
91 context.write(tableName, put);
92 }
93 }
94 }
95
96 @Override
97 protected void setup(Context context) throws IOException,
98 InterruptedException {
99 Configuration configuration = context.getConfiguration();
100 String tableName = configuration.get("index.tablename");
101 String[] fields = configuration.getStrings("index.fields");
102 String familyName = configuration.get("index.familyname");
103 family = Bytes.toBytes(familyName);
104 indexes = new TreeMap<byte[], ImmutableBytesWritable>(Bytes.BYTES_COMPARATOR);
105 for(String field : fields) {
106
107
108 indexes.put(Bytes.toBytes(field),
109 new ImmutableBytesWritable(Bytes.toBytes(tableName + "-" + field)));
110 }
111 }
112 }
113
114
115
116
117 public static Job configureJob(Configuration conf, String [] args)
118 throws IOException {
119 String tableName = args[0];
120 String columnFamily = args[1];
121 System.out.println("****" + tableName);
122 conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(new Scan()));
123 conf.set(TableInputFormat.INPUT_TABLE, tableName);
124 conf.set("index.tablename", tableName);
125 conf.set("index.familyname", columnFamily);
126 String[] fields = new String[args.length - 2];
127 System.arraycopy(args, 2, fields, 0, fields.length);
128 conf.setStrings("index.fields", fields);
129 Job job = new Job(conf, tableName);
130 job.setJarByClass(IndexBuilder.class);
131 job.setMapperClass(Map.class);
132 job.setNumReduceTasks(0);
133 job.setInputFormatClass(TableInputFormat.class);
134 job.setOutputFormatClass(MultiTableOutputFormat.class);
135 return job;
136 }
137
138 public static void main(String[] args) throws Exception {
139 Configuration conf = HBaseConfiguration.create();
140 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
141 if(otherArgs.length < 3) {
142 System.err.println("Only " + otherArgs.length + " arguments supplied, required: 3");
143 System.err.println("Usage: IndexBuilder <TABLE_NAME> <COLUMN_FAMILY> <ATTR> [<ATTR> ...]");
144 System.exit(-1);
145 }
146 Job job = configureJob(conf, otherArgs);
147 System.exit(job.waitForCompletion(true) ? 0 : 1);
148 }
149 }