001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Base64; 023import java.util.Iterator; 024import java.util.List; 025import java.util.Set; 026import java.util.TreeSet; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.ArrayBackedTag; 029import org.apache.hadoop.hbase.CellComparator; 030import org.apache.hadoop.hbase.ExtendedCell; 031import org.apache.hadoop.hbase.KeyValue; 032import org.apache.hadoop.hbase.KeyValueUtil; 033import org.apache.hadoop.hbase.Tag; 034import org.apache.hadoop.hbase.TagType; 035import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 036import org.apache.hadoop.hbase.security.visibility.InvalidLabelException; 037import org.apache.hadoop.hbase.util.Bytes; 038import org.apache.hadoop.io.Text; 039import org.apache.hadoop.mapreduce.Counter; 040import org.apache.hadoop.mapreduce.Reducer; 041import org.apache.hadoop.util.StringUtils; 042import org.apache.yetus.audience.InterfaceAudience; 043 044/** 045 * Emits Sorted KeyValues. Parse the passed text and creates KeyValues. Sorts them before emit. 046 * @see HFileOutputFormat2 047 * @see CellSortReducer 048 * @see PutSortReducer 049 */ 050@InterfaceAudience.Public 051public class TextSortReducer 052 extends Reducer<ImmutableBytesWritable, Text, ImmutableBytesWritable, KeyValue> { 053 054 /** Timestamp for all inserted rows */ 055 private long ts; 056 057 /** Column seperator */ 058 private String separator; 059 060 /** Should skip bad lines */ 061 private boolean skipBadLines; 062 063 private Counter badLineCount; 064 065 private ImportTsv.TsvParser parser; 066 067 /** Cell visibility expr **/ 068 private String cellVisibilityExpr; 069 070 /** Cell TTL */ 071 private long ttl; 072 073 private CellCreator kvCreator; 074 075 public long getTs() { 076 return ts; 077 } 078 079 public boolean getSkipBadLines() { 080 return skipBadLines; 081 } 082 083 public Counter getBadLineCount() { 084 return badLineCount; 085 } 086 087 public void incrementBadLineCount(int count) { 088 this.badLineCount.increment(count); 089 } 090 091 /** 092 * Handles initializing this class with objects specific to it (i.e., the parser). Common 093 * initialization that might be leveraged by a subsclass is done in <code>doSetup</code>. Hence a 094 * subclass may choose to override this method and call <code>doSetup</code> as well before 095 * handling it's own custom params. 096 */ 097 @Override 098 protected void setup(Context context) { 099 Configuration conf = context.getConfiguration(); 100 doSetup(context, conf); 101 102 parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator); 103 if (parser.getRowKeyColumnIndex() == -1) { 104 throw new RuntimeException("No row key column specified"); 105 } 106 this.kvCreator = new CellCreator(conf); 107 } 108 109 /** 110 * Handles common parameter initialization that a subclass might want to leverage. 111 */ 112 protected void doSetup(Context context, Configuration conf) { 113 // If a custom separator has been used, 114 // decode it back from Base64 encoding. 115 separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY); 116 if (separator == null) { 117 separator = ImportTsv.DEFAULT_SEPARATOR; 118 } else { 119 separator = Bytes.toString(Base64.getDecoder().decode(separator)); 120 } 121 122 // Should never get 0 as we are setting this to a valid value in job configuration. 123 ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0); 124 125 skipBadLines = context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true); 126 badLineCount = context.getCounter("ImportTsv", "Bad Lines"); 127 } 128 129 @Override 130 protected void reduce(ImmutableBytesWritable rowKey, java.lang.Iterable<Text> lines, 131 Reducer<ImmutableBytesWritable, Text, ImmutableBytesWritable, KeyValue>.Context context) 132 throws java.io.IOException, InterruptedException { 133 // although reduce() is called per-row, handle pathological case 134 long threshold = context.getConfiguration().getLong("reducer.row.threshold", 1L * (1 << 30)); 135 Iterator<Text> iter = lines.iterator(); 136 while (iter.hasNext()) { 137 Set<KeyValue> kvs = new TreeSet<>(CellComparator.getInstance()); 138 long curSize = 0; 139 // stop at the end or the RAM threshold 140 while (iter.hasNext() && curSize < threshold) { 141 Text line = iter.next(); 142 byte[] lineBytes = line.getBytes(); 143 try { 144 ImportTsv.TsvParser.ParsedLine parsed = parser.parse(lineBytes, line.getLength()); 145 // Retrieve timestamp if exists 146 ts = parsed.getTimestamp(ts); 147 cellVisibilityExpr = parsed.getCellVisibility(); 148 ttl = parsed.getCellTTL(); 149 150 // create tags for the parsed line 151 List<Tag> tags = new ArrayList<>(); 152 if (cellVisibilityExpr != null) { 153 tags.addAll(kvCreator.getVisibilityExpressionResolver() 154 .createVisibilityExpTags(cellVisibilityExpr)); 155 } 156 // Add TTL directly to the KV so we can vary them when packing more than one KV 157 // into puts 158 if (ttl > 0) { 159 tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl))); 160 } 161 for (int i = 0; i < parsed.getColumnCount(); i++) { 162 if ( 163 i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex() 164 || i == parser.getAttributesKeyColumnIndex() 165 || i == parser.getCellVisibilityColumnIndex() || i == parser.getCellTTLColumnIndex() 166 ) { 167 continue; 168 } 169 // Creating the KV which needs to be directly written to HFiles. Using the Facade 170 // KVCreator for creation of kvs. 171 ExtendedCell cell = (ExtendedCell) this.kvCreator.create(lineBytes, 172 parsed.getRowKeyOffset(), parsed.getRowKeyLength(), parser.getFamily(i), 0, 173 parser.getFamily(i).length, parser.getQualifier(i), 0, parser.getQualifier(i).length, 174 ts, lineBytes, parsed.getColumnOffset(i), parsed.getColumnLength(i), tags); 175 KeyValue kv = KeyValueUtil.ensureKeyValue(cell); 176 kvs.add(kv); 177 curSize += kv.heapSize(); 178 } 179 } catch (ImportTsv.TsvParser.BadTsvLineException | IllegalArgumentException 180 | InvalidLabelException badLine) { 181 if (skipBadLines) { 182 System.err.println("Bad line." + badLine.getMessage()); 183 incrementBadLineCount(1); 184 continue; 185 } 186 throw new IOException(badLine); 187 } 188 } 189 context.setStatus("Read " + kvs.size() + " entries of " + kvs.getClass() + "(" 190 + StringUtils.humanReadableInt(curSize) + ")"); 191 int index = 0; 192 for (KeyValue kv : kvs) { 193 context.write(rowKey, kv); 194 if (++index > 0 && index % 100 == 0) context.setStatus("Wrote " + index + " key values."); 195 } 196 197 // if we have more entries to process 198 if (iter.hasNext()) { 199 // force flush because we cannot guarantee intra-row sorted order 200 context.write(null, null); 201 } 202 } 203 } 204}