001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Base64;
023import java.util.Iterator;
024import java.util.List;
025import java.util.Set;
026import java.util.TreeSet;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.ArrayBackedTag;
029import org.apache.hadoop.hbase.Cell;
030import org.apache.hadoop.hbase.CellComparator;
031import org.apache.hadoop.hbase.KeyValue;
032import org.apache.hadoop.hbase.KeyValueUtil;
033import org.apache.hadoop.hbase.Tag;
034import org.apache.hadoop.hbase.TagType;
035import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
036import org.apache.hadoop.hbase.security.visibility.InvalidLabelException;
037import org.apache.hadoop.hbase.util.Bytes;
038import org.apache.hadoop.io.Text;
039import org.apache.hadoop.mapreduce.Counter;
040import org.apache.hadoop.mapreduce.Reducer;
041import org.apache.hadoop.util.StringUtils;
042import org.apache.yetus.audience.InterfaceAudience;
043
044/**
045 * Emits Sorted KeyValues. Parse the passed text and creates KeyValues. Sorts them before emit.
046 * @see HFileOutputFormat2
047 * @see CellSortReducer
048 * @see PutSortReducer
049 */
050@InterfaceAudience.Public
051public class TextSortReducer
052  extends Reducer<ImmutableBytesWritable, Text, ImmutableBytesWritable, KeyValue> {
053
054  /** Timestamp for all inserted rows */
055  private long ts;
056
057  /** Column seperator */
058  private String separator;
059
060  /** Should skip bad lines */
061  private boolean skipBadLines;
062
063  private Counter badLineCount;
064
065  private ImportTsv.TsvParser parser;
066
067  /** Cell visibility expr **/
068  private String cellVisibilityExpr;
069
070  /** Cell TTL */
071  private long ttl;
072
073  private CellCreator kvCreator;
074
075  public long getTs() {
076    return ts;
077  }
078
079  public boolean getSkipBadLines() {
080    return skipBadLines;
081  }
082
083  public Counter getBadLineCount() {
084    return badLineCount;
085  }
086
087  public void incrementBadLineCount(int count) {
088    this.badLineCount.increment(count);
089  }
090
091  /**
092   * Handles initializing this class with objects specific to it (i.e., the parser). Common
093   * initialization that might be leveraged by a subsclass is done in <code>doSetup</code>. Hence a
094   * subclass may choose to override this method and call <code>doSetup</code> as well before
095   * handling it's own custom params.
096   */
097  @Override
098  protected void setup(Context context) {
099    Configuration conf = context.getConfiguration();
100    doSetup(context, conf);
101
102    parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator);
103    if (parser.getRowKeyColumnIndex() == -1) {
104      throw new RuntimeException("No row key column specified");
105    }
106    this.kvCreator = new CellCreator(conf);
107  }
108
109  /**
110   * Handles common parameter initialization that a subclass might want to leverage.
111   */
112  protected void doSetup(Context context, Configuration conf) {
113    // If a custom separator has been used,
114    // decode it back from Base64 encoding.
115    separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
116    if (separator == null) {
117      separator = ImportTsv.DEFAULT_SEPARATOR;
118    } else {
119      separator = Bytes.toString(Base64.getDecoder().decode(separator));
120    }
121
122    // Should never get 0 as we are setting this to a valid value in job configuration.
123    ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);
124
125    skipBadLines = context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true);
126    badLineCount = context.getCounter("ImportTsv", "Bad Lines");
127  }
128
129  @Override
130  protected void reduce(ImmutableBytesWritable rowKey, java.lang.Iterable<Text> lines,
131    Reducer<ImmutableBytesWritable, Text, ImmutableBytesWritable, KeyValue>.Context context)
132    throws java.io.IOException, InterruptedException {
133    // although reduce() is called per-row, handle pathological case
134    long threshold = context.getConfiguration().getLong("reducer.row.threshold", 1L * (1 << 30));
135    Iterator<Text> iter = lines.iterator();
136    while (iter.hasNext()) {
137      Set<KeyValue> kvs = new TreeSet<>(CellComparator.getInstance());
138      long curSize = 0;
139      // stop at the end or the RAM threshold
140      while (iter.hasNext() && curSize < threshold) {
141        Text line = iter.next();
142        byte[] lineBytes = line.getBytes();
143        try {
144          ImportTsv.TsvParser.ParsedLine parsed = parser.parse(lineBytes, line.getLength());
145          // Retrieve timestamp if exists
146          ts = parsed.getTimestamp(ts);
147          cellVisibilityExpr = parsed.getCellVisibility();
148          ttl = parsed.getCellTTL();
149
150          // create tags for the parsed line
151          List<Tag> tags = new ArrayList<>();
152          if (cellVisibilityExpr != null) {
153            tags.addAll(kvCreator.getVisibilityExpressionResolver()
154              .createVisibilityExpTags(cellVisibilityExpr));
155          }
156          // Add TTL directly to the KV so we can vary them when packing more than one KV
157          // into puts
158          if (ttl > 0) {
159            tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
160          }
161          for (int i = 0; i < parsed.getColumnCount(); i++) {
162            if (
163              i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
164                || i == parser.getAttributesKeyColumnIndex()
165                || i == parser.getCellVisibilityColumnIndex() || i == parser.getCellTTLColumnIndex()
166            ) {
167              continue;
168            }
169            // Creating the KV which needs to be directly written to HFiles. Using the Facade
170            // KVCreator for creation of kvs.
171            Cell cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(),
172              parsed.getRowKeyLength(), parser.getFamily(i), 0, parser.getFamily(i).length,
173              parser.getQualifier(i), 0, parser.getQualifier(i).length, ts, lineBytes,
174              parsed.getColumnOffset(i), parsed.getColumnLength(i), tags);
175            KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
176            kvs.add(kv);
177            curSize += kv.heapSize();
178          }
179        } catch (ImportTsv.TsvParser.BadTsvLineException | IllegalArgumentException
180          | InvalidLabelException badLine) {
181          if (skipBadLines) {
182            System.err.println("Bad line." + badLine.getMessage());
183            incrementBadLineCount(1);
184            continue;
185          }
186          throw new IOException(badLine);
187        }
188      }
189      context.setStatus("Read " + kvs.size() + " entries of " + kvs.getClass() + "("
190        + StringUtils.humanReadableInt(curSize) + ")");
191      int index = 0;
192      for (KeyValue kv : kvs) {
193        context.write(rowKey, kv);
194        if (++index > 0 && index % 100 == 0) context.setStatus("Wrote " + index + " key values.");
195      }
196
197      // if we have more entries to process
198      if (iter.hasNext()) {
199        // force flush because we cannot guarantee intra-row sorted order
200        context.write(null, null);
201      }
202    }
203  }
204}