001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.mapreduce;
020
021import java.io.IOException;
022import java.util.List;
023import java.util.Map.Entry;
024import java.util.Map;
025
026import org.apache.yetus.audience.InterfaceAudience;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029import org.apache.hadoop.hbase.Cell;
030import org.apache.hadoop.hbase.KeyValue;
031import org.apache.hadoop.hbase.KeyValueUtil;
032import org.apache.hadoop.hbase.client.Put;
033import org.apache.hadoop.mapreduce.Reducer;
034
035/**
036 * Combine Puts. Merges Put instances grouped by <code>K</code> into a single
037 * instance.
038 * @see TableMapReduceUtil
039 */
040@InterfaceAudience.Public
041public class PutCombiner<K> extends Reducer<K, Put, K, Put> {
042  private static final Logger LOG = LoggerFactory.getLogger(PutCombiner.class);
043
044  @Override
045  protected void reduce(K row, Iterable<Put> vals, Context context)
046      throws IOException, InterruptedException {
047    // Using HeapSize to create an upper bound on the memory size of
048    // the puts and flush some portion of the content while looping. This
049    // flush could result in multiple Puts for a single rowkey. That is
050    // acceptable because Combiner is run as an optimization and it's not
051    // critical that all Puts are grouped perfectly.
052    long threshold = context.getConfiguration().getLong(
053        "putcombiner.row.threshold", 1L * (1<<30));
054    int cnt = 0;
055    long curSize = 0;
056    Put put = null;
057    Map<byte[], List<Cell>> familyMap = null;
058    for (Put p : vals) {
059      cnt++;
060      if (put == null) {
061        put = p;
062        familyMap = put.getFamilyCellMap();
063      } else {
064        for (Entry<byte[], List<Cell>> entry : p.getFamilyCellMap()
065            .entrySet()) {
066          List<Cell> cells = familyMap.get(entry.getKey());
067          List<Cell> kvs = (cells != null) ? (List<Cell>) cells : null;
068          for (Cell cell : entry.getValue()) {
069            KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
070            curSize += kv.heapSize();
071            if (kvs != null) {
072              kvs.add(kv);
073            }
074          }
075          if (cells == null) {
076            familyMap.put(entry.getKey(), entry.getValue());
077          }
078        }
079        if (cnt % 10 == 0) context.setStatus("Combine " + cnt);
080        if (curSize > threshold) {
081          if (LOG.isDebugEnabled()) {
082            LOG.debug(String.format("Combined %d Put(s) into %d.", cnt, 1));
083          }
084          context.write(row, put);
085          put = null;
086          curSize = 0;
087          cnt = 0;
088        }
089      }
090    }
091    if (put != null) {
092      if (LOG.isDebugEnabled()) {
093        LOG.debug(String.format("Combined %d Put(s) into %d.", cnt, 1));
094      }
095      context.write(row, put);
096    }
097  }
098}