1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
21 import java.util.Iterator;
22 import java.util.List;
23 import java.util.TreeSet;
24
25 import org.apache.hadoop.hbase.classification.InterfaceAudience;
26 import org.apache.hadoop.hbase.classification.InterfaceStability;
27 import org.apache.hadoop.hbase.Cell;
28 import org.apache.hadoop.hbase.KeyValue;
29 import org.apache.hadoop.hbase.KeyValueUtil;
30 import org.apache.hadoop.hbase.client.Put;
31 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
32 import org.apache.hadoop.mapreduce.Reducer;
33 import org.apache.hadoop.util.StringUtils;
34
35
36
37
38
39
40
41
42
43 @InterfaceAudience.Public
44 @InterfaceStability.Stable
45 public class PutSortReducer extends
46 Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue> {
47
48 @Override
49 protected void reduce(
50 ImmutableBytesWritable row,
51 java.lang.Iterable<Put> puts,
52 Reducer<ImmutableBytesWritable, Put,
53 ImmutableBytesWritable, KeyValue>.Context context)
54 throws java.io.IOException, InterruptedException
55 {
56
57 long threshold = context.getConfiguration().getLong(
58 "putsortreducer.row.threshold", 1L * (1<<30));
59 Iterator<Put> iter = puts.iterator();
60 while (iter.hasNext()) {
61 TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
62 long curSize = 0;
63
64 while (iter.hasNext() && curSize < threshold) {
65 Put p = iter.next();
66 for (List<Cell> cells: p.getFamilyCellMap().values()) {
67 for (Cell cell: cells) {
68 KeyValue kv = KeyValueUtil.ensureKeyValueTypeForMR(cell);
69 map.add(kv);
70 curSize += kv.heapSize();
71 }
72 }
73 }
74 context.setStatus("Read " + map.size() + " entries of " + map.getClass()
75 + "(" + StringUtils.humanReadableInt(curSize) + ")");
76 int index = 0;
77 for (KeyValue kv : map) {
78 context.write(row, kv);
79 if (++index % 100 == 0)
80 context.setStatus("Wrote " + index);
81 }
82
83
84 if (iter.hasNext()) {
85
86 context.write(null, null);
87 }
88 }
89 }
90 }