1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
21 import java.util.TreeSet;
22
23 import org.apache.hadoop.hbase.classification.InterfaceAudience;
24 import org.apache.hadoop.hbase.classification.InterfaceStability;
25 import org.apache.hadoop.hbase.KeyValue;
26 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
27 import org.apache.hadoop.mapreduce.Reducer;
28
29
30
31
32
33
34
35
36 @InterfaceAudience.Public
37 @InterfaceStability.Stable
38 public class KeyValueSortReducer extends Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
39 protected void reduce(ImmutableBytesWritable row, java.lang.Iterable<KeyValue> kvs,
40 org.apache.hadoop.mapreduce.Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
41 throws java.io.IOException, InterruptedException {
42 TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
43 for (KeyValue kv: kvs) {
44 try {
45 map.add(kv.clone());
46 } catch (CloneNotSupportedException e) {
47 throw new java.io.IOException(e);
48 }
49 }
50 context.setStatus("Read " + map.getClass());
51 int index = 0;
52 for (KeyValue kv: map) {
53 context.write(row, kv);
54 if (++index % 100 == 0) context.setStatus("Wrote " + index);
55 }
56 }
57 }