/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Write table content out to map output files.
 * <p>
 * For every input line the mapper emits the row key (as located by the {@link ImportTsv.TsvParser})
 * together with the unmodified line text.
 */
@InterfaceAudience.Public
public class TsvImporterTextMapper
  extends Mapper<LongWritable, Text, ImmutableBytesWritable, Text> {

  /** Column separator */
  private String separator;

  /** Should skip bad lines */
  private boolean skipBadLines;

  /** Counter that is incremented for every bad line that gets skipped. */
  private Counter badLineCount;

  /** Should the content of a bad line be printed to standard error */
  private boolean logBadLines;

  private ImportTsv.TsvParser parser;

  /**
   * @return whether bad lines are skipped instead of failing the task
   */
  public boolean getSkipBadLines() {
    return skipBadLines;
  }

  /**
   * @return the counter tracking the number of bad lines
   */
  public Counter getBadLineCount() {
    return badLineCount;
  }

  /**
   * Adds the given amount to the bad line counter.
   * @param count number of bad lines to add
   */
  public void incrementBadLineCount(int count) {
    this.badLineCount.increment(count);
  }

  /**
   * Handles initializing this class with objects specific to it (i.e., the parser). Common
   * initialization that might be leveraged by a subclass is done in <code>doSetup</code>. Hence a
   * subclass may choose to override this method and call <code>doSetup</code> as well before
   * handling its own custom params.
   * @param context the task context supplying the job configuration
   * @throws RuntimeException if the configured columns do not include a row key column
   */
  @Override
  protected void setup(Context context) {
    // Must run first: it initializes the separator the parser is built with.
    doSetup(context);

    Configuration conf = context.getConfiguration();

    parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator);
    if (parser.getRowKeyColumnIndex() == -1) {
      throw new RuntimeException("No row key column specified");
    }
  }

  /**
   * Handles common parameter initialization that a subclass might want to leverage.
   * <p>
   * Reads the separator, the skip-bad-lines flag (default <code>true</code>), the log-bad-lines
   * flag (default <code>false</code>) and looks up the bad line counter.
   * @param context the task context supplying the job configuration and counters
   */
  protected void doSetup(Context context) {
    Configuration conf = context.getConfiguration();

    // If a custom separator has been used,
    // decode it back from Base64 encoding.
    String encodedSeparator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
    if (encodedSeparator == null) {
      separator = ImportTsv.DEFAULT_SEPARATOR;
    } else {
      // Decode with an explicit charset so the result does not depend on the JVM's
      // platform default encoding.
      separator = new String(Base64.getDecoder().decode(encodedSeparator), StandardCharsets.UTF_8);
    }

    skipBadLines = conf.getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true);
    logBadLines = conf.getBoolean(ImportTsv.LOG_BAD_LINES_CONF_KEY, false);
    badLineCount = context.getCounter("ImportTsv", "Bad Lines");
  }

  /**
   * Convert a line of TSV text into an HBase table row.
   * <p>
   * The row key is located inside the line and written as the output key; the whole line is
   * written as the output value. A line that cannot be parsed is reported on standard error and is
   * either counted and skipped or turned into an {@link IOException}, depending on the
   * skip-bad-lines setting.
   * @param offset  position of the line in the input, used only for error reporting
   * @param value   the line of text to convert
   * @param context the task context the output record is written to
   * @throws IOException if the line is bad and bad lines are not being skipped
   */
  @Override
  public void map(LongWritable offset, Text value, Context context) throws IOException {
    try {
      // NOTE(review): the pair is passed on as (offset, length) into the line's bytes -
      // confirm against TsvParser.parseRowKey.
      Pair<Integer, Integer> rowKeyOffsets =
        parser.parseRowKey(value.getBytes(), value.getLength());
      ImmutableBytesWritable rowKey = new ImmutableBytesWritable(value.getBytes(),
        rowKeyOffsets.getFirst(), rowKeyOffsets.getSecond());
      context.write(rowKey, value);
    } catch (ImportTsv.TsvParser.BadTsvLineException | IllegalArgumentException badLine) {
      if (logBadLines) {
        System.err.println(value);
      }
      System.err.println("Bad line at offset: " + offset.get() + ":\n" + badLine.getMessage());
      if (skipBadLines) {
        incrementBadLineCount(1);
        return;
      }
      throw new IOException(badLine);
    } catch (InterruptedException e) {
      // This method only declares IOException, so the interruption is reported and the
      // thread's interrupt flag is restored for whoever checks it next.
      e.printStackTrace();
      Thread.currentThread().interrupt();
    }
  }
}