001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.mapreduce; 020 021import java.io.IOException; 022import java.util.List; 023import java.util.Map.Entry; 024import java.util.Map; 025 026import org.apache.yetus.audience.InterfaceAudience; 027import org.slf4j.Logger; 028import org.slf4j.LoggerFactory; 029import org.apache.hadoop.hbase.Cell; 030import org.apache.hadoop.hbase.KeyValue; 031import org.apache.hadoop.hbase.KeyValueUtil; 032import org.apache.hadoop.hbase.client.Put; 033import org.apache.hadoop.mapreduce.Reducer; 034 035/** 036 * Combine Puts. Merges Put instances grouped by <code>K</code> into a single 037 * instance. 038 * @see TableMapReduceUtil 039 */ 040@InterfaceAudience.Public 041public class PutCombiner<K> extends Reducer<K, Put, K, Put> { 042 private static final Logger LOG = LoggerFactory.getLogger(PutCombiner.class); 043 044 @Override 045 protected void reduce(K row, Iterable<Put> vals, Context context) 046 throws IOException, InterruptedException { 047 // Using HeapSize to create an upper bound on the memory size of 048 // the puts and flush some portion of the content while looping. This 049 // flush could result in multiple Puts for a single rowkey. That is 050 // acceptable because Combiner is run as an optimization and it's not 051 // critical that all Puts are grouped perfectly. 052 long threshold = context.getConfiguration().getLong( 053 "putcombiner.row.threshold", 1L * (1<<30)); 054 int cnt = 0; 055 long curSize = 0; 056 Put put = null; 057 Map<byte[], List<Cell>> familyMap = null; 058 for (Put p : vals) { 059 cnt++; 060 if (put == null) { 061 put = p; 062 familyMap = put.getFamilyCellMap(); 063 } else { 064 for (Entry<byte[], List<Cell>> entry : p.getFamilyCellMap() 065 .entrySet()) { 066 List<Cell> cells = familyMap.get(entry.getKey()); 067 List<Cell> kvs = (cells != null) ? (List<Cell>) cells : null; 068 for (Cell cell : entry.getValue()) { 069 KeyValue kv = KeyValueUtil.ensureKeyValue(cell); 070 curSize += kv.heapSize(); 071 if (kvs != null) { 072 kvs.add(kv); 073 } 074 } 075 if (cells == null) { 076 familyMap.put(entry.getKey(), entry.getValue()); 077 } 078 } 079 if (cnt % 10 == 0) context.setStatus("Combine " + cnt); 080 if (curSize > threshold) { 081 if (LOG.isDebugEnabled()) { 082 LOG.debug(String.format("Combined %d Put(s) into %d.", cnt, 1)); 083 } 084 context.write(row, put); 085 put = null; 086 curSize = 0; 087 cnt = 0; 088 } 089 } 090 } 091 if (put != null) { 092 if (LOG.isDebugEnabled()) { 093 LOG.debug(String.format("Combined %d Put(s) into %d.", cnt, 1)); 094 } 095 context.write(row, put); 096 } 097 } 098}