001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
033
034/**
035 * Write table content out to map output files.
036 */
037@InterfaceAudience.Public
038public class TsvImporterTextMapper
039    extends Mapper<LongWritable, Text, ImmutableBytesWritable, Text> {
040  private static final Logger LOG = LoggerFactory.getLogger(TsvImporterTextMapper.class);
041
042  /** Column seperator */
043  private String separator;
044
045  /** Should skip bad lines */
046  private boolean skipBadLines;
047  private Counter badLineCount;
048  private boolean logBadLines;
049
050  private ImportTsv.TsvParser parser;
051
052  public boolean getSkipBadLines() {
053    return skipBadLines;
054  }
055
056  public Counter getBadLineCount() {
057    return badLineCount;
058  }
059
060  public void incrementBadLineCount(int count) {
061    this.badLineCount.increment(count);
062  }
063
064  /**
065   * Handles initializing this class with objects specific to it (i.e., the parser).
066   * Common initialization that might be leveraged by a subclass is done in
067   * <code>doSetup</code>. Hence a subclass may choose to override this method
068   * and call <code>doSetup</code> as well before handling it's own custom params.
069   *
070   * @param context
071   */
072  @Override
073  protected void setup(Context context) {
074    doSetup(context);
075
076    Configuration conf = context.getConfiguration();
077
078    parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator);
079    if (parser.getRowKeyColumnIndex() == -1) {
080      throw new RuntimeException("No row key column specified");
081    }
082  }
083
084  /**
085   * Handles common parameter initialization that a subclass might want to leverage.
086   * @param context
087   */
088  protected void doSetup(Context context) {
089    Configuration conf = context.getConfiguration();
090
091    // If a custom separator has been used,
092    // decode it back from Base64 encoding.
093    separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
094    if (separator == null) {
095      separator = ImportTsv.DEFAULT_SEPARATOR;
096    } else {
097      separator = new String(Base64.getDecoder().decode(separator));
098    }
099
100    skipBadLines = context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true);
101    logBadLines = context.getConfiguration().getBoolean(ImportTsv.LOG_BAD_LINES_CONF_KEY, false);
102    badLineCount = context.getCounter("ImportTsv", "Bad Lines");
103  }
104
105  /**
106   * Convert a line of TSV text into an HBase table row.
107   */
108  @Override
109  public void map(LongWritable offset, Text value, Context context) throws IOException {
110    try {
111      Pair<Integer,Integer> rowKeyOffests = parser.parseRowKey(value.getBytes(), value.getLength());
112      ImmutableBytesWritable rowKey = new ImmutableBytesWritable(
113          value.getBytes(), rowKeyOffests.getFirst(), rowKeyOffests.getSecond());
114      context.write(rowKey, value);
115    } catch (ImportTsv.TsvParser.BadTsvLineException|IllegalArgumentException badLine) {
116      if (logBadLines) {
117        System.err.println(value);
118      }
119      System.err.println("Bad line at offset: " + offset.get() + ":\n" + badLine.getMessage());
120      if (skipBadLines) {
121        incrementBadLineCount(1);
122        return;
123      }
124      throw new IOException(badLine);
125    } catch (InterruptedException e) {
126      LOG.error("Interrupted while emitting TSV text", e);
127      Thread.currentThread().interrupt();
128    }
129  }
130}