/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.TagUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.security.visibility.CellVisibility;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.StringUtils;

/**
 * Emits sorted Puts.
 * <p>
 * For each row key, drains the Puts handed to {@code reduce}, converts every cell into a
 * {@link KeyValue}, sorts the KeyValues and writes them out in sorted order. Sorting happens
 * in memory, so a row with very many columns costs a proportional amount of heap; the
 * {@code putsortreducer.row.threshold} setting bounds how much is buffered per batch.
 * @see HFileOutputFormat2
 * @see CellSortReducer
 */
@InterfaceAudience.Public
public class PutSortReducer extends
    Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue> {
  // Builds the tagged KeyValues and exposes the visibility expression resolver.
  private CellCreator kvCreator;

  /**
   * Creates the {@link CellCreator} from the task's configuration.
   *
   * @param context the reducer context supplying the configuration
   */
  @Override
  protected void setup(
      Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue>.Context context)
      throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();
    this.kvCreator = new CellCreator(conf);
  }

  /**
   * Sorts the cells of all Puts for one row and writes them as KeyValues.
   * <p>
   * Puts are consumed in batches: a batch ends when the iterator is exhausted or once the
   * accumulated {@code heapSize()} of the distinct KeyValues reaches
   * {@code putsortreducer.row.threshold} (default {@code 1 << 30} bytes). Each batch is written
   * in sorted order; if Puts remain after a batch, a {@code (null, null)} record is written
   * before the next batch starts.
   *
   * @param row the row key shared by every Put in {@code puts}
   * @param puts the Puts to sort and emit
   * @param context the reducer context used for configuration, status and output
   * @throws IOException if a Put carries an invalid visibility expression, or if building or
   *           writing a KeyValue fails
   * @throws InterruptedException if writing to the context is interrupted
   */
  @Override
  protected void reduce(
      ImmutableBytesWritable row,
      Iterable<Put> puts,
      Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue>.Context context)
      throws IOException, InterruptedException {
    // reduce() is invoked once per row, but a single row may still be pathologically large.
    final long threshold = context.getConfiguration().getLong(
        "putsortreducer.row.threshold", 1L * (1<<30));
    final Iterator<Put> pending = puts.iterator();

    while (pending.hasNext()) {
      TreeSet<KeyValue> sorted = new TreeSet<>(CellComparator.getInstance());
      long bufferedBytes = 0;
      List<Tag> tags = new ArrayList<>();

      // Fill the batch until the input runs out or the RAM threshold is reached.
      while (pending.hasNext() && curSizeBelow(bufferedBytes, threshold)) {
        Put put = pending.next();
        resetToPutLevelTags(put, tags);
        for (List<Cell> familyCells : put.getFamilyCellMap().values()) {
          for (Cell cell : familyCells) {
            KeyValue kv = toKeyValue(cell, tags);
            // A KeyValue already present in the set does not add to the buffered size.
            if (sorted.add(kv)) {
              bufferedBytes += kv.heapSize();
            }
          }
        }
      }

      context.setStatus("Read " + sorted.size() + " entries of " + sorted.getClass()
          + "(" + StringUtils.humanReadableInt(bufferedBytes) + ")");

      int written = 0;
      for (KeyValue kv : sorted) {
        context.write(row, kv);
        written++;
        if (written % 100 == 0) {
          context.setStatus("Wrote " + written);
        }
      }

      // More Puts remain for this row: force a flush, because sorted order cannot be
      // guaranteed across batches of the same row.
      if (pending.hasNext()) {
        context.write(null, null);
      }
    }
  }

  /** Returns whether the current batch is still under the buffering threshold. */
  private static boolean curSizeBelow(long bufferedBytes, long threshold) {
    return bufferedBytes < threshold;
  }

  /**
   * Empties {@code tags} and refills it with the tags derived from the Put itself: a TTL tag
   * when the Put's TTL is not {@code Long.MAX_VALUE}, an ACL tag when the Put has an ACL, and
   * the visibility expression tags when the Put has a cell visibility.
   *
   * @throws IOException if the Put's visibility expression cannot be deserialized, or if
   *           resolving it into tags fails
   */
  private void resetToPutLevelTags(Put put, List<Tag> tags) throws IOException {
    tags.clear();

    long ttl = put.getTTL();
    if (ttl != Long.MAX_VALUE) {
      tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
    }

    byte[] acl = put.getACL();
    if (acl != null) {
      tags.add(new ArrayBackedTag(TagType.ACL_TAG_TYPE, acl));
    }

    try {
      CellVisibility visibility = put.getCellVisibility();
      if (visibility != null) {
        tags.addAll(kvCreator.getVisibilityExpressionResolver()
            .createVisibilityExpTags(visibility.getExpression()));
      }
    } catch (DeserializationException e) {
      // We just throw exception here. Should we allow other mutations to proceed by
      // just ignoring the bad one?
      throw new IOException("Invalid visibility expression found in mutation " + put, e);
    }
  }

  /**
   * Produces the KeyValue that will be written to the HFile for {@code cell}. When the tag list
   * is non-empty after carrying forward the cell's tags, a new KeyValue is built through the
   * {@link CellCreator} with those tags; otherwise the cell itself is converted.
   */
  private KeyValue toKeyValue(Cell cell, List<Tag> tags) throws IOException {
    // NOTE(review): the same list is passed for every cell of a Put and is only cleared once
    // per Put. If carryForwardTags appends the cell's own tags to this list, tags from earlier
    // cells would also be applied to later cells of the same Put -- confirm this is intended.
    TagUtil.carryForwardTags(tags, cell);
    if (tags.isEmpty()) {
      return KeyValueUtil.ensureKeyValue(cell);
    }
    return (KeyValue) kvCreator.create(cell.getRowArray(), cell.getRowOffset(),
        cell.getRowLength(), cell.getFamilyArray(), cell.getFamilyOffset(),
        cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(),
        cell.getQualifierLength(), cell.getTimestamp(), cell.getValueArray(),
        cell.getValueOffset(), cell.getValueLength(), tags);
  }
}