001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import java.io.IOException; 021import java.util.Base64; 022import org.apache.hadoop.conf.Configuration; 023import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 024import org.apache.hadoop.hbase.util.Pair; 025import org.apache.hadoop.io.LongWritable; 026import org.apache.hadoop.io.Text; 027import org.apache.hadoop.mapreduce.Counter; 028import org.apache.hadoop.mapreduce.Mapper; 029import org.apache.yetus.audience.InterfaceAudience; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033/** 034 * Write table content out to map output files. 035 */ 036@InterfaceAudience.Public 037public class TsvImporterTextMapper 038 extends Mapper<LongWritable, Text, ImmutableBytesWritable, Text> { 039 private static final Logger LOG = LoggerFactory.getLogger(TsvImporterTextMapper.class); 040 041 /** Column seperator */ 042 private String separator; 043 044 /** Should skip bad lines */ 045 private boolean skipBadLines; 046 private Counter badLineCount; 047 private boolean logBadLines; 048 049 private ImportTsv.TsvParser parser; 050 051 public boolean getSkipBadLines() { 052 return skipBadLines; 053 } 054 055 public Counter getBadLineCount() { 056 return badLineCount; 057 } 058 059 public void incrementBadLineCount(int count) { 060 this.badLineCount.increment(count); 061 } 062 063 /** 064 * Handles initializing this class with objects specific to it (i.e., the parser). Common 065 * initialization that might be leveraged by a subclass is done in <code>doSetup</code>. Hence a 066 * subclass may choose to override this method and call <code>doSetup</code> as well before 067 * handling it's own custom params. 068 */ 069 @Override 070 protected void setup(Context context) { 071 doSetup(context); 072 073 Configuration conf = context.getConfiguration(); 074 075 parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator); 076 if (parser.getRowKeyColumnIndex() == -1) { 077 throw new RuntimeException("No row key column specified"); 078 } 079 } 080 081 /** 082 * Handles common parameter initialization that a subclass might want to leverage. 083 */ 084 protected void doSetup(Context context) { 085 Configuration conf = context.getConfiguration(); 086 087 // If a custom separator has been used, 088 // decode it back from Base64 encoding. 089 separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY); 090 if (separator == null) { 091 separator = ImportTsv.DEFAULT_SEPARATOR; 092 } else { 093 separator = new String(Base64.getDecoder().decode(separator)); 094 } 095 096 skipBadLines = context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true); 097 logBadLines = context.getConfiguration().getBoolean(ImportTsv.LOG_BAD_LINES_CONF_KEY, false); 098 badLineCount = context.getCounter("ImportTsv", "Bad Lines"); 099 } 100 101 /** 102 * Convert a line of TSV text into an HBase table row. 103 */ 104 @Override 105 public void map(LongWritable offset, Text value, Context context) throws IOException { 106 try { 107 Pair<Integer, Integer> rowKeyOffests = 108 parser.parseRowKey(value.getBytes(), value.getLength()); 109 ImmutableBytesWritable rowKey = new ImmutableBytesWritable(value.getBytes(), 110 rowKeyOffests.getFirst(), rowKeyOffests.getSecond()); 111 context.write(rowKey, value); 112 } catch (ImportTsv.TsvParser.BadTsvLineException | IllegalArgumentException badLine) { 113 if (logBadLines) { 114 System.err.println(value); 115 } 116 System.err.println("Bad line at offset: " + offset.get() + ":\n" + badLine.getMessage()); 117 if (skipBadLines) { 118 incrementBadLineCount(1); 119 return; 120 } 121 throw new IOException(badLine); 122 } catch (InterruptedException e) { 123 LOG.error("Interrupted while emitting TSV text", e); 124 Thread.currentThread().interrupt(); 125 } 126 } 127}