001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Iterator; 023import java.util.List; 024import java.util.TreeSet; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.ArrayBackedTag; 027import org.apache.hadoop.hbase.Cell; 028import org.apache.hadoop.hbase.CellComparator; 029import org.apache.hadoop.hbase.KeyValue; 030import org.apache.hadoop.hbase.KeyValueUtil; 031import org.apache.hadoop.hbase.Tag; 032import org.apache.hadoop.hbase.TagType; 033import org.apache.hadoop.hbase.TagUtil; 034import org.apache.hadoop.hbase.client.Put; 035import org.apache.hadoop.hbase.exceptions.DeserializationException; 036import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 037import org.apache.hadoop.hbase.security.visibility.CellVisibility; 038import org.apache.hadoop.hbase.util.Bytes; 039import org.apache.hadoop.mapreduce.Reducer; 040import org.apache.hadoop.util.StringUtils; 041import org.apache.yetus.audience.InterfaceAudience; 042 043/** 044 * Emits sorted Puts. Reads in all Puts from passed Iterator, sorts them, then emits Puts in sorted 045 * order. If lots of columns per row, it will use lots of memory sorting. 046 * @see HFileOutputFormat2 047 * @see CellSortReducer 048 */ 049@InterfaceAudience.Public 050public class PutSortReducer 051 extends Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue> { 052 // the cell creator 053 private CellCreator kvCreator; 054 055 @Override 056 protected void 057 setup(Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue>.Context context) 058 throws IOException, InterruptedException { 059 Configuration conf = context.getConfiguration(); 060 this.kvCreator = new CellCreator(conf); 061 } 062 063 @Override 064 protected void reduce(ImmutableBytesWritable row, java.lang.Iterable<Put> puts, 065 Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue>.Context context) 066 throws java.io.IOException, InterruptedException { 067 // although reduce() is called per-row, handle pathological case 068 long threshold = 069 context.getConfiguration().getLong("putsortreducer.row.threshold", 1L * (1 << 30)); 070 Iterator<Put> iter = puts.iterator(); 071 while (iter.hasNext()) { 072 TreeSet<KeyValue> map = new TreeSet<>(CellComparator.getInstance()); 073 long curSize = 0; 074 // stop at the end or the RAM threshold 075 List<Tag> tags = new ArrayList<>(); 076 while (iter.hasNext() && curSize < threshold) { 077 // clear the tags 078 tags.clear(); 079 Put p = iter.next(); 080 long t = p.getTTL(); 081 if (t != Long.MAX_VALUE) { 082 // add TTL tag if found 083 tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(t))); 084 } 085 byte[] acl = p.getACL(); 086 if (acl != null) { 087 // add ACL tag if found 088 tags.add(new ArrayBackedTag(TagType.ACL_TAG_TYPE, acl)); 089 } 090 try { 091 CellVisibility cellVisibility = p.getCellVisibility(); 092 if (cellVisibility != null) { 093 // add the visibility labels if any 094 tags.addAll(kvCreator.getVisibilityExpressionResolver() 095 .createVisibilityExpTags(cellVisibility.getExpression())); 096 } 097 } catch (DeserializationException e) { 098 // We just throw exception here. Should we allow other mutations to proceed by 099 // just ignoring the bad one? 100 throw new IOException("Invalid visibility expression found in mutation " + p, e); 101 } 102 for (List<Cell> cells : p.getFamilyCellMap().values()) { 103 for (Cell cell : cells) { 104 // Creating the KV which needs to be directly written to HFiles. Using the Facade 105 // KVCreator for creation of kvs. 106 KeyValue kv = null; 107 TagUtil.carryForwardTags(tags, cell); 108 if (!tags.isEmpty()) { 109 kv = (KeyValue) kvCreator.create(cell.getRowArray(), cell.getRowOffset(), 110 cell.getRowLength(), cell.getFamilyArray(), cell.getFamilyOffset(), 111 cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), 112 cell.getQualifierLength(), cell.getTimestamp(), cell.getValueArray(), 113 cell.getValueOffset(), cell.getValueLength(), tags); 114 } else { 115 kv = KeyValueUtil.ensureKeyValue(cell); 116 } 117 if (map.add(kv)) {// don't count duplicated kv into size 118 curSize += kv.heapSize(); 119 } 120 } 121 } 122 } 123 context.setStatus("Read " + map.size() + " entries of " + map.getClass() + "(" 124 + StringUtils.humanReadableInt(curSize) + ")"); 125 int index = 0; 126 for (KeyValue kv : map) { 127 context.write(row, kv); 128 if (++index % 100 == 0) context.setStatus("Wrote " + index); 129 } 130 131 // if we have more entries to process 132 if (iter.hasNext()) { 133 // force flush because we cannot guarantee intra-row sorted order 134 context.write(null, null); 135 } 136 } 137 } 138}