View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapreduce;
20  
21  import java.io.IOException;
22  import java.util.List;
23  import java.util.Map.Entry;
24  import java.util.Map;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.hbase.classification.InterfaceAudience;
29  import org.apache.hadoop.hbase.classification.InterfaceStability;
30  import org.apache.hadoop.hbase.Cell;
31  import org.apache.hadoop.hbase.KeyValue;
32  import org.apache.hadoop.hbase.KeyValueUtil;
33  import org.apache.hadoop.hbase.client.Put;
34  import org.apache.hadoop.mapreduce.Reducer;
35  
36  /**
37   * Combine Puts. Merges Put instances grouped by <code>K</code> into a single
38   * instance.
39   * @see TableMapReduceUtil
40   */
41  @InterfaceAudience.Public
42  @InterfaceStability.Evolving
43  public class PutCombiner<K> extends Reducer<K, Put, K, Put> {
44    private static final Log LOG = LogFactory.getLog(PutCombiner.class);
45  
46    @Override
47    protected void reduce(K row, Iterable<Put> vals, Context context)
48        throws IOException, InterruptedException {
49      // Using HeapSize to create an upper bound on the memory size of
50      // the puts and flush some portion of the content while looping. This
51      // flush could result in multiple Puts for a single rowkey. That is
52      // acceptable because Combiner is run as an optimization and it's not
53      // critical that all Puts are grouped perfectly.
54      long threshold = context.getConfiguration().getLong(
55          "putcombiner.row.threshold", 1L * (1<<30));
56      int cnt = 0;
57      long curSize = 0;
58      Put put = null;
59      Map<byte[], List<Cell>> familyMap = null;
60      for (Put p : vals) {
61        cnt++;
62        if (put == null) {
63          put = p;
64          familyMap = put.getFamilyCellMap();
65        } else {
66          for (Entry<byte[], List<Cell>> entry : p.getFamilyCellMap()
67              .entrySet()) {
68            List<Cell> cells = familyMap.get(entry.getKey());
69            List<Cell> kvs = (cells != null) ? (List<Cell>) cells : null;
70            for (Cell cell : entry.getValue()) {
71              KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
72              curSize += kv.heapSize();
73              if (kvs != null) {
74                kvs.add(kv);
75              }
76            }
77            if (cells == null) {
78              familyMap.put(entry.getKey(), entry.getValue());
79            }
80          }
81          if (cnt % 10 == 0) context.setStatus("Combine " + cnt);
82          if (curSize > threshold) {
83            if (LOG.isDebugEnabled()) {
84              LOG.debug(String.format("Combined %d Put(s) into %d.", cnt, 1));
85            }
86            context.write(row, put);
87            put = null;
88            curSize = 0;
89            cnt = 0;
90          }
91        }
92      }
93      if (put != null) {
94        if (LOG.isDebugEnabled()) {
95          LOG.debug(String.format("Combined %d Put(s) into %d.", cnt, 1));
96        }
97        context.write(row, put);
98      }
99    }
100 }