001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import java.io.IOException;
021import java.util.List;
022import java.util.Map;
023import java.util.Map.Entry;
024import org.apache.hadoop.hbase.Cell;
025import org.apache.hadoop.hbase.KeyValue;
026import org.apache.hadoop.hbase.KeyValueUtil;
027import org.apache.hadoop.hbase.client.Put;
028import org.apache.hadoop.mapreduce.Reducer;
029import org.apache.yetus.audience.InterfaceAudience;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 * Combine Puts. Merges Put instances grouped by <code>K</code> into a single instance.
035 * @see TableMapReduceUtil
036 */
037@InterfaceAudience.Public
038public class PutCombiner<K> extends Reducer<K, Put, K, Put> {
039  private static final Logger LOG = LoggerFactory.getLogger(PutCombiner.class);
040
041  @Override
042  protected void reduce(K row, Iterable<Put> vals, Context context)
043    throws IOException, InterruptedException {
044    // Using HeapSize to create an upper bound on the memory size of
045    // the puts and flush some portion of the content while looping. This
046    // flush could result in multiple Puts for a single rowkey. That is
047    // acceptable because Combiner is run as an optimization and it's not
048    // critical that all Puts are grouped perfectly.
049    long threshold =
050      context.getConfiguration().getLong("putcombiner.row.threshold", 1L * (1 << 30));
051    int cnt = 0;
052    long curSize = 0;
053    Put put = null;
054    Map<byte[], List<Cell>> familyMap = null;
055    for (Put p : vals) {
056      cnt++;
057      if (put == null) {
058        put = p;
059        familyMap = put.getFamilyCellMap();
060      } else {
061        for (Entry<byte[], List<Cell>> entry : p.getFamilyCellMap().entrySet()) {
062          List<Cell> cells = familyMap.get(entry.getKey());
063          List<Cell> kvs = (cells != null) ? (List<Cell>) cells : null;
064          for (Cell cell : entry.getValue()) {
065            KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
066            curSize += kv.heapSize();
067            if (kvs != null) {
068              kvs.add(kv);
069            }
070          }
071          if (cells == null) {
072            familyMap.put(entry.getKey(), entry.getValue());
073          }
074        }
075        if (cnt % 10 == 0) context.setStatus("Combine " + cnt);
076        if (curSize > threshold) {
077          if (LOG.isDebugEnabled()) {
078            LOG.debug(String.format("Combined %d Put(s) into %d.", cnt, 1));
079          }
080          context.write(row, put);
081          put = null;
082          curSize = 0;
083          cnt = 0;
084        }
085      }
086    }
087    if (put != null) {
088      if (LOG.isDebugEnabled()) {
089        LOG.debug(String.format("Combined %d Put(s) into %d.", cnt, 1));
090      }
091      context.write(row, put);
092    }
093  }
094}