001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Base64;
023import java.util.List;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.ArrayBackedTag;
026import org.apache.hadoop.hbase.Cell;
027import org.apache.hadoop.hbase.KeyValue;
028import org.apache.hadoop.hbase.Tag;
029import org.apache.hadoop.hbase.TagType;
030import org.apache.hadoop.hbase.client.Put;
031import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
032import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
033import org.apache.hadoop.hbase.security.visibility.CellVisibility;
034import org.apache.hadoop.hbase.security.visibility.InvalidLabelException;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.hadoop.io.LongWritable;
037import org.apache.hadoop.io.Text;
038import org.apache.hadoop.mapreduce.Counter;
039import org.apache.hadoop.mapreduce.Mapper;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044/**
045 * Write table content out to files in hdfs.
046 */
047@InterfaceAudience.Public
048public class TsvImporterMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
049  private static final Logger LOG = LoggerFactory.getLogger(TsvImporterMapper.class);
050
051  /** Timestamp for all inserted rows */
052  protected long ts;
053
054  /** Column seperator */
055  private String separator;
056
057  /** Should skip bad lines */
058  private boolean skipBadLines;
059  /** Should skip empty columns */
060  private boolean skipEmptyColumns;
061  private Counter badLineCount;
062  private boolean logBadLines;
063
064  protected ImportTsv.TsvParser parser;
065
066  protected Configuration conf;
067
068  protected String cellVisibilityExpr;
069
070  protected long ttl;
071
072  protected CellCreator kvCreator;
073
074  private String hfileOutPath;
075
076  /** List of cell tags */
077  private List<Tag> tags;
078
079  public long getTs() {
080    return ts;
081  }
082
083  public boolean getSkipBadLines() {
084    return skipBadLines;
085  }
086
087  public Counter getBadLineCount() {
088    return badLineCount;
089  }
090
091  public void incrementBadLineCount(int count) {
092    this.badLineCount.increment(count);
093  }
094
095  /**
096   * Handles initializing this class with objects specific to it (i.e., the parser). Common
097   * initialization that might be leveraged by a subsclass is done in <code>doSetup</code>. Hence a
098   * subclass may choose to override this method and call <code>doSetup</code> as well before
099   * handling it's own custom params.
100   */
101  @Override
102  protected void setup(Context context) {
103    doSetup(context);
104
105    conf = context.getConfiguration();
106    parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator);
107    if (parser.getRowKeyColumnIndex() == -1) {
108      throw new RuntimeException("No row key column specified");
109    }
110    this.kvCreator = new CellCreator(conf);
111    tags = new ArrayList<>();
112  }
113
114  /**
115   * Handles common parameter initialization that a subclass might want to leverage.
116   */
117  protected void doSetup(Context context) {
118    Configuration conf = context.getConfiguration();
119
120    // If a custom separator has been used,
121    // decode it back from Base64 encoding.
122    separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
123    if (separator == null) {
124      separator = ImportTsv.DEFAULT_SEPARATOR;
125    } else {
126      separator = new String(Base64.getDecoder().decode(separator));
127    }
128    // Should never get 0 as we are setting this to a valid value in job
129    // configuration.
130    ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);
131
132    skipEmptyColumns = context.getConfiguration().getBoolean(ImportTsv.SKIP_EMPTY_COLUMNS, false);
133    skipBadLines = context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true);
134    badLineCount = context.getCounter("ImportTsv", "Bad Lines");
135    logBadLines = context.getConfiguration().getBoolean(ImportTsv.LOG_BAD_LINES_CONF_KEY, false);
136    hfileOutPath = conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY);
137  }
138
139  /**
140   * Convert a line of TSV text into an HBase table row.
141   */
142  @Override
143  public void map(LongWritable offset, Text value, Context context) throws IOException {
144    byte[] lineBytes = value.getBytes();
145
146    try {
147      ImportTsv.TsvParser.ParsedLine parsed = parser.parse(lineBytes, value.getLength());
148      ImmutableBytesWritable rowKey =
149        new ImmutableBytesWritable(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength());
150      // Retrieve timestamp if exists
151      ts = parsed.getTimestamp(ts);
152      cellVisibilityExpr = parsed.getCellVisibility();
153      ttl = parsed.getCellTTL();
154
155      // create tags for the parsed line
156      if (hfileOutPath != null) {
157        tags.clear();
158        if (cellVisibilityExpr != null) {
159          tags.addAll(kvCreator.getVisibilityExpressionResolver()
160            .createVisibilityExpTags(cellVisibilityExpr));
161        }
162        // Add TTL directly to the KV so we can vary them when packing more than one KV
163        // into puts
164        if (ttl > 0) {
165          tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
166        }
167      }
168      Put put = new Put(rowKey.copyBytes());
169      for (int i = 0; i < parsed.getColumnCount(); i++) {
170        if (
171          i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
172            || i == parser.getAttributesKeyColumnIndex()
173            || i == parser.getCellVisibilityColumnIndex() || i == parser.getCellTTLColumnIndex()
174            || (skipEmptyColumns && parsed.getColumnLength(i) == 0)
175        ) {
176          continue;
177        }
178        populatePut(lineBytes, parsed, put, i);
179      }
180      context.write(rowKey, put);
181    } catch (ImportTsv.TsvParser.BadTsvLineException | IllegalArgumentException
182      | InvalidLabelException badLine) {
183      if (logBadLines) {
184        System.err.println(value);
185      }
186      System.err.println("Bad line at offset: " + offset.get() + ":\n" + badLine.getMessage());
187      if (skipBadLines) {
188        incrementBadLineCount(1);
189        return;
190      }
191      throw new IOException(badLine);
192    } catch (InterruptedException e) {
193      LOG.error("Interrupted while emitting put", e);
194      Thread.currentThread().interrupt();
195    }
196  }
197
198  protected void populatePut(byte[] lineBytes, ImportTsv.TsvParser.ParsedLine parsed, Put put,
199    int i) throws BadTsvLineException, IOException {
200    Cell cell = null;
201    if (hfileOutPath == null) {
202      cell = new KeyValue(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
203        parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
204        parser.getQualifier(i).length, ts, KeyValue.Type.Put, lineBytes, parsed.getColumnOffset(i),
205        parsed.getColumnLength(i));
206      if (cellVisibilityExpr != null) {
207        // We won't be validating the expression here. The Visibility CP will do
208        // the validation
209        put.setCellVisibility(new CellVisibility(cellVisibilityExpr));
210      }
211      if (ttl > 0) {
212        put.setTTL(ttl);
213      }
214    } else {
215      // Creating the KV which needs to be directly written to HFiles. Using the Facade
216      // KVCreator for creation of kvs.
217      cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
218        parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
219        parser.getQualifier(i).length, ts, lineBytes, parsed.getColumnOffset(i),
220        parsed.getColumnLength(i), tags);
221    }
222    put.add(cell);
223  }
224}