001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
import org.apache.hadoop.hbase.security.visibility.CellVisibility;
import org.apache.hadoop.hbase.security.visibility.InvalidLabelException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.yetus.audience.InterfaceAudience;
042
043/**
044 * Write table content out to files in hdfs.
045 */
046@InterfaceAudience.Public
047public class TsvImporterMapper
048extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
049{
050
051  /** Timestamp for all inserted rows */
052  protected long ts;
053
054  /** Column seperator */
055  private String separator;
056
057  /** Should skip bad lines */
058  private boolean skipBadLines;
059  /** Should skip empty columns*/
060  private boolean skipEmptyColumns;
061  private Counter badLineCount;
062  private boolean logBadLines;
063
064  protected ImportTsv.TsvParser parser;
065
066  protected Configuration conf;
067
068  protected String cellVisibilityExpr;
069
070  protected long ttl;
071
072  protected CellCreator kvCreator;
073
074  private String hfileOutPath;
075
076  /** List of cell tags */
077  private List<Tag> tags;
078
079  public long getTs() {
080    return ts;
081  }
082
083  public boolean getSkipBadLines() {
084    return skipBadLines;
085  }
086
087  public Counter getBadLineCount() {
088    return badLineCount;
089  }
090
091  public void incrementBadLineCount(int count) {
092    this.badLineCount.increment(count);
093  }
094
095  /**
096   * Handles initializing this class with objects specific to it (i.e., the parser).
097   * Common initialization that might be leveraged by a subsclass is done in
098   * <code>doSetup</code>. Hence a subclass may choose to override this method
099   * and call <code>doSetup</code> as well before handling it's own custom params.
100   *
101   * @param context
102   */
103  @Override
104  protected void setup(Context context) {
105    doSetup(context);
106
107    conf = context.getConfiguration();
108    parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY),
109                           separator);
110    if (parser.getRowKeyColumnIndex() == -1) {
111      throw new RuntimeException("No row key column specified");
112    }
113    this.kvCreator = new CellCreator(conf);
114    tags = new ArrayList<>();
115  }
116
117  /**
118   * Handles common parameter initialization that a subclass might want to leverage.
119   * @param context
120   */
121  protected void doSetup(Context context) {
122    Configuration conf = context.getConfiguration();
123
124    // If a custom separator has been used,
125    // decode it back from Base64 encoding.
126    separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
127    if (separator == null) {
128      separator = ImportTsv.DEFAULT_SEPARATOR;
129    } else {
130      separator = new String(Base64.getDecoder().decode(separator));
131    }
132    // Should never get 0 as we are setting this to a valid value in job
133    // configuration.
134    ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);
135
136    skipEmptyColumns = context.getConfiguration().getBoolean(
137        ImportTsv.SKIP_EMPTY_COLUMNS, false);
138    skipBadLines = context.getConfiguration().getBoolean(
139        ImportTsv.SKIP_LINES_CONF_KEY, true);
140    badLineCount = context.getCounter("ImportTsv", "Bad Lines");
141    logBadLines = context.getConfiguration().getBoolean(ImportTsv.LOG_BAD_LINES_CONF_KEY, false);
142    hfileOutPath = conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY);
143  }
144
145  /**
146   * Convert a line of TSV text into an HBase table row.
147   */
148  @Override
149  public void map(LongWritable offset, Text value,
150    Context context)
151  throws IOException {
152    byte[] lineBytes = value.getBytes();
153
154    try {
155      ImportTsv.TsvParser.ParsedLine parsed = parser.parse(
156          lineBytes, value.getLength());
157      ImmutableBytesWritable rowKey =
158        new ImmutableBytesWritable(lineBytes,
159            parsed.getRowKeyOffset(),
160            parsed.getRowKeyLength());
161      // Retrieve timestamp if exists
162      ts = parsed.getTimestamp(ts);
163      cellVisibilityExpr = parsed.getCellVisibility();
164      ttl = parsed.getCellTTL();
165
166      // create tags for the parsed line
167      if (hfileOutPath != null) {
168        tags.clear();
169        if (cellVisibilityExpr != null) {
170          tags.addAll(kvCreator.getVisibilityExpressionResolver().createVisibilityExpTags(
171            cellVisibilityExpr));
172        }
173        // Add TTL directly to the KV so we can vary them when packing more than one KV
174        // into puts
175        if (ttl > 0) {
176          tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
177        }
178      }
179      Put put = new Put(rowKey.copyBytes());
180      for (int i = 0; i < parsed.getColumnCount(); i++) {
181        if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
182            || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()
183            || i == parser.getCellTTLColumnIndex() || (skipEmptyColumns
184            && parsed.getColumnLength(i) == 0)) {
185          continue;
186        }
187        populatePut(lineBytes, parsed, put, i);
188      }
189      context.write(rowKey, put);
190    } catch (ImportTsv.TsvParser.BadTsvLineException | IllegalArgumentException
191        | InvalidLabelException badLine) {
192      if (logBadLines) {
193        System.err.println(value);
194      }
195      System.err.println("Bad line at offset: " + offset.get() + ":\n" + badLine.getMessage());
196      if (skipBadLines) {
197        incrementBadLineCount(1);
198        return;
199      }
200      throw new IOException(badLine);
201    } catch (InterruptedException e) {
202      e.printStackTrace();
203    }
204  }
205
206  protected void populatePut(byte[] lineBytes, ImportTsv.TsvParser.ParsedLine parsed, Put put,
207      int i) throws BadTsvLineException, IOException {
208    Cell cell = null;
209    if (hfileOutPath == null) {
210      cell = new KeyValue(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
211          parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
212          parser.getQualifier(i).length, ts, KeyValue.Type.Put, lineBytes,
213          parsed.getColumnOffset(i), parsed.getColumnLength(i));
214      if (cellVisibilityExpr != null) {
215        // We won't be validating the expression here. The Visibility CP will do
216        // the validation
217        put.setCellVisibility(new CellVisibility(cellVisibilityExpr));
218      }
219      if (ttl > 0) {
220        put.setTTL(ttl);
221      }
222    } else {
223      // Creating the KV which needs to be directly written to HFiles. Using the Facade
224      // KVCreator for creation of kvs.
225      cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
226          parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
227          parser.getQualifier(i).length, ts, lineBytes, parsed.getColumnOffset(i),
228          parsed.getColumnLength(i), tags);
229    }
230    put.add(cell);
231  }
232}