/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
import org.apache.hadoop.hbase.security.visibility.CellVisibility;
import org.apache.hadoop.hbase.security.visibility.InvalidLabelException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper that parses each input line of separated text with an
 * {@link ImportTsv.TsvParser} and emits one {@link Put} per line, keyed by the
 * line's row key.
 * <p>
 * When a bulk output path is configured ({@link ImportTsv#BULK_OUTPUT_CONF_KEY}),
 * cells are built through a {@link CellCreator} and carry their visibility and TTL
 * as tags; otherwise visibility and TTL are set on the {@link Put} itself.
 */
@InterfaceAudience.Public
public class TsvImporterMapper
extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
{

  /**
   * Timestamp applied to inserted cells. Overwritten in {@link #map} with the value
   * returned by the parsed line.
   */
  protected long ts;

  /** Column separator */
  private String separator;

  /** Should skip bad lines */
  private boolean skipBadLines;
  /** Should skip empty columns */
  private boolean skipEmptyColumns;
  /** Counter incremented for every bad line that is skipped */
  private Counter badLineCount;
  /** Should echo the content of bad lines to stderr */
  private boolean logBadLines;

  protected ImportTsv.TsvParser parser;

  protected Configuration conf;

  /** Cell visibility expression of the line currently being mapped; may be null */
  protected String cellVisibilityExpr;

  /** Cell TTL of the line currently being mapped; only applied when positive */
  protected long ttl;

  protected CellCreator kvCreator;

  /** Bulk output path from the job configuration; null when not configured */
  private String hfileOutPath;

  /** List of cell tags */
  private List<Tag> tags;

  public long getTs() {
    return ts;
  }

  public boolean getSkipBadLines() {
    return skipBadLines;
  }

  public Counter getBadLineCount() {
    return badLineCount;
  }

  public void incrementBadLineCount(int count) {
    this.badLineCount.increment(count);
  }

  /**
   * Handles initializing this class with objects specific to it (i.e., the parser).
   * Common initialization that might be leveraged by a subclass is done in
   * {@link #doSetup(Context)}. Hence a subclass may choose to override this method
   * and call {@code doSetup} as well before handling its own custom params.
   *
   * @param context the task context supplying the job configuration
   * @throws RuntimeException if the configured columns do not include a row key column
   */
  @Override
  protected void setup(Context context) {
    doSetup(context);

    conf = context.getConfiguration();
    parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY),
                           separator);
    if (parser.getRowKeyColumnIndex() == -1) {
      throw new RuntimeException("No row key column specified");
    }
    this.kvCreator = new CellCreator(conf);
    tags = new ArrayList<>();
  }

  /**
   * Handles common parameter initialization that a subclass might want to leverage.
   *
   * @param context the task context supplying the job configuration and counters
   */
  protected void doSetup(Context context) {
    Configuration conf = context.getConfiguration();

    // If a custom separator has been used,
    // decode it back from Base64 encoding.
    separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
    if (separator == null) {
      separator = ImportTsv.DEFAULT_SEPARATOR;
    } else {
      // Decode with an explicit charset so the result does not depend on the JVM's
      // platform default.
      // NOTE(review): assumes the job driver Base64-encoded UTF-8 bytes - confirm
      // against the code that sets SEPARATOR_CONF_KEY.
      separator = new String(Base64.getDecoder().decode(separator), StandardCharsets.UTF_8);
    }
    // Should never get 0 as we are setting this to a valid value in job
    // configuration.
    ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);

    skipEmptyColumns = conf.getBoolean(ImportTsv.SKIP_EMPTY_COLUMNS, false);
    skipBadLines = conf.getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true);
    badLineCount = context.getCounter("ImportTsv", "Bad Lines");
    logBadLines = conf.getBoolean(ImportTsv.LOG_BAD_LINES_CONF_KEY, false);
    hfileOutPath = conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY);
  }

  /**
   * Convert a line of TSV text into an HBase table row.
   * <p>
   * A line that fails to parse, or whose cells cannot be created, is reported on
   * stderr. It is then either counted and skipped (when bad lines are skipped) or
   * rethrown wrapped in an {@link IOException}.
   *
   * @param offset position of the line in the input, used in error messages
   * @param value the raw line
   * @param context the task context the resulting {@link Put} is written to
   * @throws IOException if the line is bad and bad lines are not skipped, or if
   *         creating a cell or writing the output fails
   */
  @Override
  public void map(LongWritable offset, Text value,
    Context context)
  throws IOException {
    byte[] lineBytes = value.getBytes();

    try {
      // Text's backing array may be longer than the line, so pass the valid length.
      ImportTsv.TsvParser.ParsedLine parsed = parser.parse(
          lineBytes, value.getLength());
      ImmutableBytesWritable rowKey =
        new ImmutableBytesWritable(lineBytes,
            parsed.getRowKeyOffset(),
            parsed.getRowKeyLength());
      // Retrieve timestamp if exists
      ts = parsed.getTimestamp(ts);
      cellVisibilityExpr = parsed.getCellVisibility();
      ttl = parsed.getCellTTL();

      // create tags for the parsed line
      if (hfileOutPath != null) {
        tags.clear();
        if (cellVisibilityExpr != null) {
          tags.addAll(kvCreator.getVisibilityExpressionResolver().createVisibilityExpTags(
            cellVisibilityExpr));
        }
        // Add TTL directly to the KV so we can vary them when packing more than one KV
        // into puts
        if (ttl > 0) {
          tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
        }
      }
      Put put = new Put(rowKey.copyBytes());
      for (int i = 0; i < parsed.getColumnCount(); i++) {
        if (shouldSkipColumn(parsed, i)) {
          continue;
        }
        populatePut(lineBytes, parsed, put, i);
      }
      context.write(rowKey, put);
    } catch (ImportTsv.TsvParser.BadTsvLineException | IllegalArgumentException
        | InvalidLabelException badLine) {
      if (logBadLines) {
        System.err.println(value);
      }
      System.err.println("Bad line at offset: " + offset.get() + ":\n" + badLine.getMessage());
      if (skipBadLines) {
        incrementBadLineCount(1);
        return;
      }
      throw new IOException(badLine);
    } catch (InterruptedException e) {
      e.printStackTrace();
      // This override does not declare InterruptedException, so restore the interrupt
      // status instead of silently dropping it; callers can then observe it.
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Tells whether column {@code i} must not be turned into a cell: it is one of the
   * parser's special columns (row key, timestamp, attributes, cell visibility, cell
   * TTL), or it is empty and empty columns are being skipped.
   */
  private boolean shouldSkipColumn(ImportTsv.TsvParser.ParsedLine parsed, int i) {
    return i == parser.getRowKeyColumnIndex()
        || i == parser.getTimestampKeyColumnIndex()
        || i == parser.getAttributesKeyColumnIndex()
        || i == parser.getCellVisibilityColumnIndex()
        || i == parser.getCellTTLColumnIndex()
        || (skipEmptyColumns && parsed.getColumnLength(i) == 0);
  }

  /**
   * Builds the cell for column {@code i} of the parsed line and adds it to {@code put}.
   * <p>
   * Without a bulk output path the cell is a plain {@link KeyValue}, and the current
   * visibility expression and TTL (when present) are set on the {@code put}. With a
   * bulk output path the cell is created by {@link #kvCreator} with the tags
   * prepared for the current line.
   *
   * @param lineBytes backing bytes of the line; row key and value are read from it
   * @param parsed the parsed line providing offsets and lengths into {@code lineBytes}
   * @param put the put being populated
   * @param i index of the column to add
   * @throws BadTsvLineException declared for overriding implementations
   * @throws IOException if the cell cannot be created or added
   */
  protected void populatePut(byte[] lineBytes, ImportTsv.TsvParser.ParsedLine parsed, Put put,
      int i) throws BadTsvLineException, IOException {
    final byte[] family = parser.getFamily(i);
    final byte[] qualifier = parser.getQualifier(i);
    final Cell cell;
    if (hfileOutPath == null) {
      cell = new KeyValue(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
          family, 0, family.length, qualifier, 0,
          qualifier.length, ts, KeyValue.Type.Put, lineBytes,
          parsed.getColumnOffset(i), parsed.getColumnLength(i));
      if (cellVisibilityExpr != null) {
        // We won't be validating the expression here. The Visibility CP will do
        // the validation
        put.setCellVisibility(new CellVisibility(cellVisibilityExpr));
      }
      if (ttl > 0) {
        put.setTTL(ttl);
      }
    } else {
      // Creating the KV which needs to be directly written to HFiles. Using the Facade
      // KVCreator for creation of kvs.
      cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
          family, 0, family.length, qualifier, 0,
          qualifier.length, ts, lineBytes, parsed.getColumnOffset(i),
          parsed.getColumnLength(i), tags);
    }
    put.add(cell);
  }
}