1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
21 import java.io.IOException;
22 import java.util.List;
23 import java.util.Map.Entry;
24 import java.util.Map;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.hbase.classification.InterfaceAudience;
29 import org.apache.hadoop.hbase.classification.InterfaceStability;
30 import org.apache.hadoop.hbase.Cell;
31 import org.apache.hadoop.hbase.KeyValue;
32 import org.apache.hadoop.hbase.KeyValueUtil;
33 import org.apache.hadoop.hbase.client.Put;
34 import org.apache.hadoop.mapreduce.Reducer;
35
36
37
38
39
40
41 @InterfaceAudience.Public
42 @InterfaceStability.Evolving
43 public class PutCombiner<K> extends Reducer<K, Put, K, Put> {
44 private static final Log LOG = LogFactory.getLog(PutCombiner.class);
45
46 @Override
47 protected void reduce(K row, Iterable<Put> vals, Context context)
48 throws IOException, InterruptedException {
49
50
51
52
53
54 long threshold = context.getConfiguration().getLong(
55 "putcombiner.row.threshold", 1L * (1<<30));
56 int cnt = 0;
57 long curSize = 0;
58 Put put = null;
59 Map<byte[], List<Cell>> familyMap = null;
60 for (Put p : vals) {
61 cnt++;
62 if (put == null) {
63 put = p;
64 familyMap = put.getFamilyCellMap();
65 } else {
66 for (Entry<byte[], List<Cell>> entry : p.getFamilyCellMap()
67 .entrySet()) {
68 List<Cell> cells = familyMap.get(entry.getKey());
69 List<Cell> kvs = (cells != null) ? (List<Cell>) cells : null;
70 for (Cell cell : entry.getValue()) {
71 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
72 curSize += kv.heapSize();
73 if (kvs != null) {
74 kvs.add(kv);
75 }
76 }
77 if (cells == null) {
78 familyMap.put(entry.getKey(), entry.getValue());
79 }
80 }
81 if (cnt % 10 == 0) context.setStatus("Combine " + cnt);
82 if (curSize > threshold) {
83 if (LOG.isDebugEnabled()) {
84 LOG.debug(String.format("Combined %d Put(s) into %d.", cnt, 1));
85 }
86 context.write(row, put);
87 put = null;
88 curSize = 0;
89 cnt = 0;
90 }
91 }
92 }
93 if (put != null) {
94 if (LOG.isDebugEnabled()) {
95 LOG.debug(String.format("Combined %d Put(s) into %d.", cnt, 1));
96 }
97 context.write(row, put);
98 }
99 }
100 }