001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Base64; 023import java.util.Iterator; 024import java.util.List; 025import java.util.Set; 026import java.util.TreeSet; 027 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.ArrayBackedTag; 030import org.apache.hadoop.hbase.Cell; 031import org.apache.hadoop.hbase.CellComparator; 032import org.apache.hadoop.hbase.KeyValue; 033import org.apache.hadoop.hbase.KeyValueUtil; 034import org.apache.hadoop.hbase.Tag; 035import org.apache.hadoop.hbase.TagType; 036import org.apache.yetus.audience.InterfaceAudience; 037import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 038import org.apache.hadoop.hbase.security.visibility.InvalidLabelException; 039import org.apache.hadoop.hbase.util.Bytes; 040import org.apache.hadoop.io.Text; 041import org.apache.hadoop.mapreduce.Counter; 042import org.apache.hadoop.mapreduce.Reducer; 043import org.apache.hadoop.util.StringUtils; 044 045/** 046 * Emits Sorted KeyValues. Parse the passed text and creates KeyValues. Sorts them before emit. 047 * @see HFileOutputFormat2 048 * @see CellSortReducer 049 * @see PutSortReducer 050 */ 051@InterfaceAudience.Public 052public class TextSortReducer extends 053 Reducer<ImmutableBytesWritable, Text, ImmutableBytesWritable, KeyValue> { 054 055 /** Timestamp for all inserted rows */ 056 private long ts; 057 058 /** Column seperator */ 059 private String separator; 060 061 /** Should skip bad lines */ 062 private boolean skipBadLines; 063 064 private Counter badLineCount; 065 066 private ImportTsv.TsvParser parser; 067 068 /** Cell visibility expr **/ 069 private String cellVisibilityExpr; 070 071 /** Cell TTL */ 072 private long ttl; 073 074 private CellCreator kvCreator; 075 076 public long getTs() { 077 return ts; 078 } 079 080 public boolean getSkipBadLines() { 081 return skipBadLines; 082 } 083 084 public Counter getBadLineCount() { 085 return badLineCount; 086 } 087 088 public void incrementBadLineCount(int count) { 089 this.badLineCount.increment(count); 090 } 091 092 /** 093 * Handles initializing this class with objects specific to it (i.e., the parser). 094 * Common initialization that might be leveraged by a subsclass is done in 095 * <code>doSetup</code>. Hence a subclass may choose to override this method 096 * and call <code>doSetup</code> as well before handling it's own custom params. 097 * 098 * @param context 099 */ 100 @Override 101 protected void setup(Context context) { 102 Configuration conf = context.getConfiguration(); 103 doSetup(context, conf); 104 105 parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator); 106 if (parser.getRowKeyColumnIndex() == -1) { 107 throw new RuntimeException("No row key column specified"); 108 } 109 this.kvCreator = new CellCreator(conf); 110 } 111 112 /** 113 * Handles common parameter initialization that a subclass might want to leverage. 114 * @param context 115 * @param conf 116 */ 117 protected void doSetup(Context context, Configuration conf) { 118 // If a custom separator has been used, 119 // decode it back from Base64 encoding. 120 separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY); 121 if (separator == null) { 122 separator = ImportTsv.DEFAULT_SEPARATOR; 123 } else { 124 separator = Bytes.toString(Base64.getDecoder().decode(separator)); 125 } 126 127 // Should never get 0 as we are setting this to a valid value in job configuration. 128 ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0); 129 130 skipBadLines = context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true); 131 badLineCount = context.getCounter("ImportTsv", "Bad Lines"); 132 } 133 134 @Override 135 protected void reduce( 136 ImmutableBytesWritable rowKey, 137 java.lang.Iterable<Text> lines, 138 Reducer<ImmutableBytesWritable, Text, 139 ImmutableBytesWritable, KeyValue>.Context context) 140 throws java.io.IOException, InterruptedException 141 { 142 // although reduce() is called per-row, handle pathological case 143 long threshold = context.getConfiguration().getLong( 144 "reducer.row.threshold", 1L * (1<<30)); 145 Iterator<Text> iter = lines.iterator(); 146 while (iter.hasNext()) { 147 Set<KeyValue> kvs = new TreeSet<>(CellComparator.getInstance()); 148 long curSize = 0; 149 // stop at the end or the RAM threshold 150 while (iter.hasNext() && curSize < threshold) { 151 Text line = iter.next(); 152 byte[] lineBytes = line.getBytes(); 153 try { 154 ImportTsv.TsvParser.ParsedLine parsed = parser.parse(lineBytes, line.getLength()); 155 // Retrieve timestamp if exists 156 ts = parsed.getTimestamp(ts); 157 cellVisibilityExpr = parsed.getCellVisibility(); 158 ttl = parsed.getCellTTL(); 159 160 // create tags for the parsed line 161 List<Tag> tags = new ArrayList<>(); 162 if (cellVisibilityExpr != null) { 163 tags.addAll(kvCreator.getVisibilityExpressionResolver().createVisibilityExpTags( 164 cellVisibilityExpr)); 165 } 166 // Add TTL directly to the KV so we can vary them when packing more than one KV 167 // into puts 168 if (ttl > 0) { 169 tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl))); 170 } 171 for (int i = 0; i < parsed.getColumnCount(); i++) { 172 if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex() 173 || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex() 174 || i == parser.getCellTTLColumnIndex()) { 175 continue; 176 } 177 // Creating the KV which needs to be directly written to HFiles. Using the Facade 178 // KVCreator for creation of kvs. 179 Cell cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(), 180 parsed.getRowKeyLength(), parser.getFamily(i), 0, parser.getFamily(i).length, 181 parser.getQualifier(i), 0, parser.getQualifier(i).length, ts, lineBytes, 182 parsed.getColumnOffset(i), parsed.getColumnLength(i), tags); 183 KeyValue kv = KeyValueUtil.ensureKeyValue(cell); 184 kvs.add(kv); 185 curSize += kv.heapSize(); 186 } 187 } catch (ImportTsv.TsvParser.BadTsvLineException | IllegalArgumentException 188 | InvalidLabelException badLine) { 189 if (skipBadLines) { 190 System.err.println("Bad line." + badLine.getMessage()); 191 incrementBadLineCount(1); 192 continue; 193 } 194 throw new IOException(badLine); 195 } 196 } 197 context.setStatus("Read " + kvs.size() + " entries of " + kvs.getClass() 198 + "(" + StringUtils.humanReadableInt(curSize) + ")"); 199 int index = 0; 200 for (KeyValue kv : kvs) { 201 context.write(rowKey, kv); 202 if (++index > 0 && index % 100 == 0) 203 context.setStatus("Wrote " + index + " key values."); 204 } 205 206 // if we have more entries to process 207 if (iter.hasNext()) { 208 // force flush because we cannot guarantee intra-row sorted order 209 context.write(null, null); 210 } 211 } 212 } 213}