001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TASK_KEY;
021import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
022import static org.apache.hadoop.hbase.regionserver.HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY;
023import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
024
025import java.io.IOException;
026import java.io.UnsupportedEncodingException;
027import java.net.InetSocketAddress;
028import java.net.URLDecoder;
029import java.net.URLEncoder;
030import java.nio.charset.Charset;
031import java.util.ArrayList;
032import java.util.Arrays;
033import java.util.List;
034import java.util.Map;
035import java.util.Set;
036import java.util.TreeMap;
037import java.util.TreeSet;
038import java.util.UUID;
039import java.util.function.Function;
040import java.util.stream.Collectors;
041import org.apache.commons.lang3.StringUtils;
042import org.apache.hadoop.conf.Configuration;
043import org.apache.hadoop.fs.FileSystem;
044import org.apache.hadoop.fs.Path;
045import org.apache.hadoop.hbase.Cell;
046import org.apache.hadoop.hbase.CellUtil;
047import org.apache.hadoop.hbase.HConstants;
048import org.apache.hadoop.hbase.HRegionLocation;
049import org.apache.hadoop.hbase.KeyValue;
050import org.apache.hadoop.hbase.PrivateCellUtil;
051import org.apache.hadoop.hbase.TableName;
052import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
053import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
054import org.apache.hadoop.hbase.client.Connection;
055import org.apache.hadoop.hbase.client.ConnectionFactory;
056import org.apache.hadoop.hbase.client.Put;
057import org.apache.hadoop.hbase.client.RegionLocator;
058import org.apache.hadoop.hbase.client.Table;
059import org.apache.hadoop.hbase.client.TableDescriptor;
060import org.apache.hadoop.hbase.fs.HFileSystem;
061import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
062import org.apache.hadoop.hbase.io.compress.Compression;
063import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
064import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
065import org.apache.hadoop.hbase.io.hfile.CacheConfig;
066import org.apache.hadoop.hbase.io.hfile.HFile;
067import org.apache.hadoop.hbase.io.hfile.HFileContext;
068import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
069import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
070import org.apache.hadoop.hbase.regionserver.BloomType;
071import org.apache.hadoop.hbase.regionserver.HStore;
072import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
073import org.apache.hadoop.hbase.regionserver.StoreUtils;
074import org.apache.hadoop.hbase.util.BloomFilterUtil;
075import org.apache.hadoop.hbase.util.Bytes;
076import org.apache.hadoop.hbase.util.CommonFSUtils;
077import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
078import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
079import org.apache.hadoop.io.NullWritable;
080import org.apache.hadoop.io.SequenceFile;
081import org.apache.hadoop.io.Text;
082import org.apache.hadoop.mapreduce.Job;
083import org.apache.hadoop.mapreduce.OutputCommitter;
084import org.apache.hadoop.mapreduce.OutputFormat;
085import org.apache.hadoop.mapreduce.RecordWriter;
086import org.apache.hadoop.mapreduce.TaskAttemptContext;
087import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
088import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
089import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
090import org.apache.yetus.audience.InterfaceAudience;
091import org.slf4j.Logger;
092import org.slf4j.LoggerFactory;
093
094/**
095 * Writes HFiles. Passed Cells must arrive in order.
096 * Writes current time as the sequence id for the file. Sets the major compacted
097 * attribute on created @{link {@link HFile}s. Calling write(null,null) will forcibly roll
098 * all HFiles being written.
099 * <p>
100 * Using this class as part of a MapReduce job is best done
101 * using {@link #configureIncrementalLoad(Job, TableDescriptor, RegionLocator)}.
102 */
103@InterfaceAudience.Public
104public class HFileOutputFormat2
105    extends FileOutputFormat<ImmutableBytesWritable, Cell> {
106  private static final Logger LOG = LoggerFactory.getLogger(HFileOutputFormat2.class);
107  static class TableInfo {
108    private TableDescriptor tableDesctiptor;
109    private RegionLocator regionLocator;
110
111    public TableInfo(TableDescriptor tableDesctiptor, RegionLocator regionLocator) {
112      this.tableDesctiptor = tableDesctiptor;
113      this.regionLocator = regionLocator;
114    }
115
116    public TableDescriptor getTableDescriptor() {
117      return tableDesctiptor;
118    }
119
120    public RegionLocator getRegionLocator() {
121      return regionLocator;
122    }
123  }
124
125  protected static final byte[] tableSeparator = Bytes.toBytes(";");
126
127  protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) {
128    return Bytes.add(tableName, tableSeparator, suffix);
129  }
130
131  // The following constants are private since these are used by
132  // HFileOutputFormat2 to internally transfer data between job setup and
133  // reducer run using conf.
134  // These should not be changed by the client.
135  static final String COMPRESSION_FAMILIES_CONF_KEY =
136      "hbase.hfileoutputformat.families.compression";
137  static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
138      "hbase.hfileoutputformat.families.bloomtype";
139  static final String BLOOM_PARAM_FAMILIES_CONF_KEY =
140      "hbase.hfileoutputformat.families.bloomparam";
141  static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
142      "hbase.mapreduce.hfileoutputformat.blocksize";
143  static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
144      "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";
145
146  // This constant is public since the client can modify this when setting
147  // up their conf object and thus refer to this symbol.
148  // It is present for backwards compatibility reasons. Use it only to
149  // override the auto-detection of datablock encoding and compression.
150  public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
151      "hbase.mapreduce.hfileoutputformat.datablock.encoding";
152  public static final String COMPRESSION_OVERRIDE_CONF_KEY =
153      "hbase.mapreduce.hfileoutputformat.compression";
154
155  /**
156   * Keep locality while generating HFiles for bulkload. See HBASE-12596
157   */
158  public static final String LOCALITY_SENSITIVE_CONF_KEY =
159      "hbase.bulkload.locality.sensitive.enabled";
160  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
161  static final String OUTPUT_TABLE_NAME_CONF_KEY =
162      "hbase.mapreduce.hfileoutputformat.table.name";
163  static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
164          "hbase.mapreduce.use.multi.table.hfileoutputformat";
165
166  public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY;
167  public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";
168
169  @Override
170  public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
171      final TaskAttemptContext context) throws IOException, InterruptedException {
172    return createRecordWriter(context, this.getOutputCommitter(context));
173  }
174
175  protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[] family) {
176    return combineTableNameSuffix(tableName, family);
177  }
178
179  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> createRecordWriter(
180    final TaskAttemptContext context, final OutputCommitter committer) throws IOException {
181
182    // Get the path of the temporary output file
183    final Path outputDir = ((FileOutputCommitter)committer).getWorkPath();
184    final Configuration conf = context.getConfiguration();
185    final boolean writeMultipleTables =
186      conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
187    final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
188    if (writeTableNames == null || writeTableNames.isEmpty()) {
189      throw new IllegalArgumentException("" + OUTPUT_TABLE_NAME_CONF_KEY + " cannot be empty");
190    }
191    final FileSystem fs = outputDir.getFileSystem(conf);
192    // These configs. are from hbase-*.xml
193    final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
194        HConstants.DEFAULT_MAX_FILE_SIZE);
195    // Invented config.  Add to hbase-*.xml if other than default compression.
196    final String defaultCompressionStr = conf.get("hfile.compression",
197        Compression.Algorithm.NONE.getName());
198    final Algorithm defaultCompression = HFileWriterImpl.compressionByName(defaultCompressionStr);
199    String compressionStr = conf.get(COMPRESSION_OVERRIDE_CONF_KEY);
200    final Algorithm overriddenCompression = compressionStr != null ?
201      Compression.getCompressionAlgorithmByName(compressionStr): null;
202    final boolean compactionExclude = conf.getBoolean(
203        "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
204    final Set<String> allTableNames = Arrays.stream(writeTableNames.split(
205            Bytes.toString(tableSeparator))).collect(Collectors.toSet());
206
207    // create a map from column family to the compression algorithm
208    final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
209    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
210    final Map<byte[], String> bloomParamMap = createFamilyBloomParamMap(conf);
211    final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);
212
213    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
214    final Map<byte[], DataBlockEncoding> datablockEncodingMap
215        = createFamilyDataBlockEncodingMap(conf);
216    final DataBlockEncoding overriddenEncoding = dataBlockEncodingStr != null ?
217      DataBlockEncoding.valueOf(dataBlockEncodingStr) : null;
218
219    return new RecordWriter<ImmutableBytesWritable, V>() {
220      // Map of families to writers and how much has been output on the writer.
221      private final Map<byte[], WriterLength> writers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
222      private final Map<byte[], byte[]> previousRows = new TreeMap<>(Bytes.BYTES_COMPARATOR);
223      private final long now = EnvironmentEdgeManager.currentTime();
224      private byte[] tableNameBytes = writeMultipleTables ? null : Bytes.toBytes(writeTableNames);
225
226      @Override
227      public void write(ImmutableBytesWritable row, V cell) throws IOException {
228        Cell kv = cell;
229        // null input == user explicitly wants to flush
230        if (row == null && kv == null) {
231          rollWriters(null);
232          return;
233        }
234
235        byte[] rowKey = CellUtil.cloneRow(kv);
236        int length = (PrivateCellUtil.estimatedSerializedSizeOf(kv)) - Bytes.SIZEOF_INT;
237        byte[] family = CellUtil.cloneFamily(kv);
238        if (writeMultipleTables) {
239          tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get());
240          tableNameBytes = TableName.valueOf(tableNameBytes).getNameWithNamespaceInclAsString()
241              .getBytes(Charset.defaultCharset());
242          if (!allTableNames.contains(Bytes.toString(tableNameBytes))) {
243            throw new IllegalArgumentException("TableName " + Bytes.toString(tableNameBytes) +
244              " not expected");
245          }
246        }
247        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);
248
249        WriterLength wl = this.writers.get(tableAndFamily);
250
251        // If this is a new column family, verify that the directory exists
252        if (wl == null) {
253          Path writerPath = null;
254          if (writeMultipleTables) {
255            Path tableRelPath = getTableRelativePath(tableNameBytes);
256            writerPath = new Path(outputDir, new Path(tableRelPath, Bytes.toString(family)));
257          } else {
258            writerPath = new Path(outputDir, Bytes.toString(family));
259          }
260          fs.mkdirs(writerPath);
261          configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
262        }
263
264        // This can only happen once a row is finished though
265        if (wl != null && wl.written + length >= maxsize
266                && Bytes.compareTo(this.previousRows.get(family), rowKey) != 0) {
267          rollWriters(wl);
268        }
269
270        // create a new WAL writer, if necessary
271        if (wl == null || wl.writer == null) {
272          InetSocketAddress[] favoredNodes = null;
273          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
274            HRegionLocation loc = null;
275            String tableName = Bytes.toString(tableNameBytes);
276            if (tableName != null) {
277              try (Connection connection = ConnectionFactory.createConnection(conf);
278                  RegionLocator locator =
279                      connection.getRegionLocator(TableName.valueOf(tableName))) {
280                loc = locator.getRegionLocation(rowKey);
281              } catch (Throwable e) {
282                LOG.warn("Something wrong locating rowkey {} in {}", Bytes.toString(rowKey),
283                  tableName, e);
284                loc = null;
285              }
286            }
287            if (null == loc) {
288              LOG.trace("Failed get of location, use default writer {}", Bytes.toString(rowKey));
289            } else {
290              LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey));
291              InetSocketAddress initialIsa =
292                  new InetSocketAddress(loc.getHostname(), loc.getPort());
293              if (initialIsa.isUnresolved()) {
294                LOG.trace("Failed resolve address {}, use default writer", loc.getHostnamePort());
295              } else {
296                LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString());
297                favoredNodes = new InetSocketAddress[] { initialIsa };
298              }
299            }
300          }
301          wl = getNewWriter(tableNameBytes, family, conf, favoredNodes);
302
303        }
304
305        // we now have the proper WAL writer. full steam ahead
306        PrivateCellUtil.updateLatestStamp(cell, this.now);
307        wl.writer.append(kv);
308        wl.written += length;
309
310        // Copy the row so we know when a row transition.
311        this.previousRows.put(family, rowKey);
312      }
313
314      private Path getTableRelativePath(byte[] tableNameBytes) {
315        String tableName = Bytes.toString(tableNameBytes);
316        String[] tableNameParts = tableName.split(":");
317        Path tableRelPath = new Path(tableNameParts[0]);
318        if (tableNameParts.length > 1) {
319          tableRelPath = new Path(tableRelPath, tableNameParts[1]);
320        }
321        return tableRelPath;
322      }
323      private void rollWriters(WriterLength writerLength) throws IOException {
324        if (writerLength != null) {
325          closeWriter(writerLength);
326        } else {
327          for (WriterLength wl : this.writers.values()) {
328            closeWriter(wl);
329          }
330        }
331      }
332
333      private void closeWriter(WriterLength wl) throws IOException {
334        if (wl.writer != null) {
335          LOG.info("Writer=" + wl.writer.getPath() +
336            ((wl.written == 0)? "": ", wrote=" + wl.written));
337          close(wl.writer);
338          wl.writer = null;
339        }
340        wl.written = 0;
341      }
342
343      /*
344       * Create a new StoreFile.Writer.
345       * @return A WriterLength, containing a new StoreFile.Writer.
346       */
347      @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BX_UNBOXING_IMMEDIATELY_REBOXED",
348          justification="Not important")
349      private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration conf,
350          InetSocketAddress[] favoredNodes) throws IOException {
351        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family);
352        Path familydir = new Path(outputDir, Bytes.toString(family));
353        if (writeMultipleTables) {
354          familydir = new Path(outputDir,
355            new Path(getTableRelativePath(tableName), Bytes.toString(family)));
356        }
357        WriterLength wl = new WriterLength();
358        Algorithm compression = overriddenCompression;
359        compression = compression == null ? compressionMap.get(tableAndFamily) : compression;
360        compression = compression == null ? defaultCompression : compression;
361        BloomType bloomType = bloomTypeMap.get(tableAndFamily);
362        bloomType = bloomType == null ? BloomType.NONE : bloomType;
363        String bloomParam = bloomParamMap.get(tableAndFamily);
364        if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
365          conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, bloomParam);
366        }
367        Integer blockSize = blockSizeMap.get(tableAndFamily);
368        blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
369        DataBlockEncoding encoding = overriddenEncoding;
370        encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
371        encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
372        HFileContextBuilder contextBuilder = new HFileContextBuilder().withCompression(compression)
373            .withDataBlockEncoding(encoding).withChecksumType(StoreUtils.getChecksumType(conf))
374            .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)).withBlockSize(blockSize)
375            .withColumnFamily(family).withTableName(tableName);
376
377        if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
378          contextBuilder.withIncludesTags(true);
379        }
380
381        HFileContext hFileContext = contextBuilder.build();
382        if (null == favoredNodes) {
383          wl.writer = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs)
384            .withOutputDir(familydir).withBloomType(bloomType)
385            .withFileContext(hFileContext).build();
386        } else {
387          wl.writer = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, new HFileSystem(fs))
388            .withOutputDir(familydir).withBloomType(bloomType)
389            .withFileContext(hFileContext).withFavoredNodes(favoredNodes).build();
390        }
391
392        this.writers.put(tableAndFamily, wl);
393        return wl;
394      }
395
396      private void close(final StoreFileWriter w) throws IOException {
397        if (w != null) {
398          w.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
399          w.appendFileInfo(BULKLOAD_TASK_KEY, Bytes.toBytes(context.getTaskAttemptID().toString()));
400          w.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(true));
401          w.appendFileInfo(EXCLUDE_FROM_MINOR_COMPACTION_KEY, Bytes.toBytes(compactionExclude));
402          w.appendTrackedTimestampsToMetadata();
403          w.close();
404        }
405      }
406
407      @Override
408      public void close(TaskAttemptContext c) throws IOException, InterruptedException {
409        for (WriterLength wl: this.writers.values()) {
410          close(wl.writer);
411        }
412      }
413    };
414  }
415
416  /**
417   * Configure block storage policy for CF after the directory is created.
418   */
419  static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
420      byte[] tableAndFamily, Path cfPath) {
421    if (null == conf || null == fs || null == tableAndFamily || null == cfPath) {
422      return;
423    }
424
425    String policy =
426        conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(tableAndFamily),
427          conf.get(STORAGE_POLICY_PROPERTY));
428    CommonFSUtils.setStoragePolicy(fs, cfPath, policy);
429  }
430
431  /*
432   * Data structure to hold a Writer and amount of data written on it.
433   */
434  static class WriterLength {
435    long written = 0;
436    StoreFileWriter writer = null;
437  }
438
439  /**
440   * Return the start keys of all of the regions in this table,
441   * as a list of ImmutableBytesWritable.
442   */
443  private static List<ImmutableBytesWritable> getRegionStartKeys(List<RegionLocator> regionLocators,
444                                                                 boolean writeMultipleTables)
445          throws IOException {
446
447    ArrayList<ImmutableBytesWritable> ret = new ArrayList<>();
448    for(RegionLocator regionLocator : regionLocators) {
449      TableName tableName = regionLocator.getName();
450      LOG.info("Looking up current regions for table " + tableName);
451      byte[][] byteKeys = regionLocator.getStartKeys();
452      for (byte[] byteKey : byteKeys) {
453        byte[] fullKey = byteKey; //HFileOutputFormat2 use case
454        if (writeMultipleTables) {
455          //MultiTableHFileOutputFormat use case
456          fullKey = combineTableNameSuffix(tableName.getName(), byteKey);
457        }
458        if (LOG.isDebugEnabled()) {
459          LOG.debug("SplitPoint startkey for " + tableName + ": " + Bytes.toStringBinary(fullKey));
460        }
461        ret.add(new ImmutableBytesWritable(fullKey));
462      }
463    }
464    return ret;
465  }
466
467  /**
468   * Write out a {@link SequenceFile} that can be read by
469   * {@link TotalOrderPartitioner} that contains the split points in startKeys.
470   */
471  @SuppressWarnings("deprecation")
472  private static void writePartitions(Configuration conf, Path partitionsPath,
473      List<ImmutableBytesWritable> startKeys, boolean writeMultipleTables) throws IOException {
474    LOG.info("Writing partition information to " + partitionsPath);
475    if (startKeys.isEmpty()) {
476      throw new IllegalArgumentException("No regions passed");
477    }
478
479    // We're generating a list of split points, and we don't ever
480    // have keys < the first region (which has an empty start key)
481    // so we need to remove it. Otherwise we would end up with an
482    // empty reducer with index 0
483    TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys);
484    ImmutableBytesWritable first = sorted.first();
485    if (writeMultipleTables) {
486      first =
487        new ImmutableBytesWritable(MultiTableHFileOutputFormat.getSuffix(sorted.first().get()));
488    }
489    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
490      throw new IllegalArgumentException(
491          "First region of table should have empty start key. Instead has: "
492          + Bytes.toStringBinary(first.get()));
493    }
494    sorted.remove(sorted.first());
495
496    // Write the actual file
497    FileSystem fs = partitionsPath.getFileSystem(conf);
498    SequenceFile.Writer writer = SequenceFile.createWriter(
499      fs, conf, partitionsPath, ImmutableBytesWritable.class,
500      NullWritable.class);
501
502    try {
503      for (ImmutableBytesWritable startKey : sorted) {
504        writer.append(startKey, NullWritable.get());
505      }
506    } finally {
507      writer.close();
508    }
509  }
510
511  /**
512   * Configure a MapReduce Job to perform an incremental load into the given
513   * table. This
514   * <ul>
515   *   <li>Inspects the table to configure a total order partitioner</li>
516   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
517   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
518   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
519   *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
520   *     PutSortReducer)</li>
521   * </ul>
522   * The user should be sure to set the map output value class to either KeyValue or Put before
523   * running this function.
524   */
525  public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
526      throws IOException {
527    configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
528  }
529
530  /**
531   * Configure a MapReduce Job to perform an incremental load into the given
532   * table. This
533   * <ul>
534   *   <li>Inspects the table to configure a total order partitioner</li>
535   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
536   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
537   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
538   *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
539   *     PutSortReducer)</li>
540   * </ul>
541   * The user should be sure to set the map output value class to either KeyValue or Put before
542   * running this function.
543   */
544  public static void configureIncrementalLoad(Job job, TableDescriptor tableDescriptor,
545      RegionLocator regionLocator) throws IOException {
546    ArrayList<TableInfo> singleTableInfo = new ArrayList<>();
547    singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator));
548    configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
549  }
550
551  static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
552      Class<? extends OutputFormat<?, ?>> cls) throws IOException {
553    Configuration conf = job.getConfiguration();
554    job.setOutputKeyClass(ImmutableBytesWritable.class);
555    job.setOutputValueClass(MapReduceExtendedCell.class);
556    job.setOutputFormatClass(cls);
557
558    if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) {
559      throw new IllegalArgumentException("Duplicate entries found in TableInfo argument");
560    }
561    boolean writeMultipleTables = false;
562    if (MultiTableHFileOutputFormat.class.equals(cls)) {
563      writeMultipleTables = true;
564      conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true);
565    }
566    // Based on the configured map output class, set the correct reducer to properly
567    // sort the incoming values.
568    // TODO it would be nice to pick one or the other of these formats.
569    if (KeyValue.class.equals(job.getMapOutputValueClass())
570        || MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())) {
571      job.setReducerClass(CellSortReducer.class);
572    } else if (Put.class.equals(job.getMapOutputValueClass())) {
573      job.setReducerClass(PutSortReducer.class);
574    } else if (Text.class.equals(job.getMapOutputValueClass())) {
575      job.setReducerClass(TextSortReducer.class);
576    } else {
577      LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
578    }
579
580    conf.setStrings("io.serializations", conf.get("io.serializations"),
581        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
582        CellSerialization.class.getName());
583
584    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
585      LOG.info("bulkload locality sensitive enabled");
586    }
587
588    /* Now get the region start keys for every table required */
589    List<String> allTableNames = new ArrayList<>(multiTableInfo.size());
590    List<RegionLocator> regionLocators = new ArrayList<>(multiTableInfo.size());
591    List<TableDescriptor> tableDescriptors = new ArrayList<>(multiTableInfo.size());
592
593    for(TableInfo tableInfo : multiTableInfo) {
594      regionLocators.add(tableInfo.getRegionLocator());
595      String tn = writeMultipleTables?
596        tableInfo.getRegionLocator().getName().getNameWithNamespaceInclAsString():
597        tableInfo.getRegionLocator().getName().getNameAsString();
598      allTableNames.add(tn);
599      tableDescriptors.add(tableInfo.getTableDescriptor());
600    }
601    // Record tablenames for creating writer by favored nodes, and decoding compression,
602    // block size and other attributes of columnfamily per table
603    conf.set(OUTPUT_TABLE_NAME_CONF_KEY, StringUtils.join(allTableNames, Bytes
604            .toString(tableSeparator)));
605    List<ImmutableBytesWritable> startKeys =
606      getRegionStartKeys(regionLocators, writeMultipleTables);
607    // Use table's region boundaries for TOP split points.
608    LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
609        "to match current region count for all tables");
610    job.setNumReduceTasks(startKeys.size());
611
612    configurePartitioner(job, startKeys, writeMultipleTables);
613    // Set compression algorithms based on column families
614
615    conf.set(COMPRESSION_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(compressionDetails,
616            tableDescriptors));
617    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(blockSizeDetails,
618            tableDescriptors));
619    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(bloomTypeDetails,
620            tableDescriptors));
621    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(bloomParamDetails,
622        tableDescriptors));
623    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
624            serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors));
625
626    TableMapReduceUtil.addDependencyJars(job);
627    TableMapReduceUtil.initCredentials(job);
628    LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
629  }
630
631  public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor) throws
632      IOException {
633    Configuration conf = job.getConfiguration();
634
635    job.setOutputKeyClass(ImmutableBytesWritable.class);
636    job.setOutputValueClass(MapReduceExtendedCell.class);
637    job.setOutputFormatClass(HFileOutputFormat2.class);
638
639    ArrayList<TableDescriptor> singleTableDescriptor = new ArrayList<>(1);
640    singleTableDescriptor.add(tableDescriptor);
641
642    conf.set(OUTPUT_TABLE_NAME_CONF_KEY, tableDescriptor.getTableName().getNameAsString());
643    // Set compression algorithms based on column families
644    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
645        serializeColumnFamilyAttribute(compressionDetails, singleTableDescriptor));
646    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
647        serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor));
648    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
649        serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor));
650    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY,
651        serializeColumnFamilyAttribute(bloomParamDetails, singleTableDescriptor));
652    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
653        serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor));
654
655    TableMapReduceUtil.addDependencyJars(job);
656    TableMapReduceUtil.initCredentials(job);
657    LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured.");
658  }
659
660  /**
661   * Runs inside the task to deserialize column family to compression algorithm
662   * map from the configuration.
663   *
664   * @param conf to read the serialized values from
665   * @return a map from column family to the configured compression algorithm
666   */
667  @InterfaceAudience.Private
668  static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration
669      conf) {
670    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
671        COMPRESSION_FAMILIES_CONF_KEY);
672    Map<byte[], Algorithm> compressionMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
673    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
674      Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue());
675      compressionMap.put(e.getKey(), algorithm);
676    }
677    return compressionMap;
678  }
679
680  /**
681   * Runs inside the task to deserialize column family to bloom filter type
682   * map from the configuration.
683   *
684   * @param conf to read the serialized values from
685   * @return a map from column family to the the configured bloom filter type
686   */
687  @InterfaceAudience.Private
688  static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
689    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
690        BLOOM_TYPE_FAMILIES_CONF_KEY);
691    Map<byte[], BloomType> bloomTypeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
692    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
693      BloomType bloomType = BloomType.valueOf(e.getValue());
694      bloomTypeMap.put(e.getKey(), bloomType);
695    }
696    return bloomTypeMap;
697  }
698
699  /**
700   * Runs inside the task to deserialize column family to bloom filter param
701   * map from the configuration.
702   *
703   * @param conf to read the serialized values from
704   * @return a map from column family to the the configured bloom filter param
705   */
706  @InterfaceAudience.Private
707  static Map<byte[], String> createFamilyBloomParamMap(Configuration conf) {
708    return createFamilyConfValueMap(conf, BLOOM_PARAM_FAMILIES_CONF_KEY);
709  }
710
711  /**
712   * Runs inside the task to deserialize column family to block size
713   * map from the configuration.
714   *
715   * @param conf to read the serialized values from
716   * @return a map from column family to the configured block size
717   */
718  @InterfaceAudience.Private
719  static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
720    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
721        BLOCK_SIZE_FAMILIES_CONF_KEY);
722    Map<byte[], Integer> blockSizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
723    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
724      Integer blockSize = Integer.parseInt(e.getValue());
725      blockSizeMap.put(e.getKey(), blockSize);
726    }
727    return blockSizeMap;
728  }
729
730  /**
731   * Runs inside the task to deserialize column family to data block encoding
732   * type map from the configuration.
733   *
734   * @param conf to read the serialized values from
735   * @return a map from column family to HFileDataBlockEncoder for the
736   *         configured data block type for the family
737   */
738  @InterfaceAudience.Private
739  static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(
740      Configuration conf) {
741    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
742        DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
743    Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
744    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
745      encoderMap.put(e.getKey(), DataBlockEncoding.valueOf((e.getValue())));
746    }
747    return encoderMap;
748  }
749
750  /**
751   * Run inside the task to deserialize column family to given conf value map.
752   *
753   * @param conf to read the serialized values from
754   * @param confName conf key to read from the configuration
755   * @return a map of column family to the given configuration value
756   */
757  private static Map<byte[], String> createFamilyConfValueMap(
758      Configuration conf, String confName) {
759    Map<byte[], String> confValMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
760    String confVal = conf.get(confName, "");
761    for (String familyConf : confVal.split("&")) {
762      String[] familySplit = familyConf.split("=");
763      if (familySplit.length != 2) {
764        continue;
765      }
766      try {
767        confValMap.put(Bytes.toBytes(URLDecoder.decode(familySplit[0], "UTF-8")),
768            URLDecoder.decode(familySplit[1], "UTF-8"));
769      } catch (UnsupportedEncodingException e) {
770        // will not happen with UTF-8 encoding
771        throw new AssertionError(e);
772      }
773    }
774    return confValMap;
775  }
776
777  /**
778   * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
779   * <code>splitPoints</code>. Cleans up the partitions file after job exists.
780   */
781  static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints, boolean
782          writeMultipleTables)
783      throws IOException {
784    Configuration conf = job.getConfiguration();
785    // create the partitions file
786    FileSystem fs = FileSystem.get(conf);
787    String hbaseTmpFsDir =
788        conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
789            fs.getHomeDirectory() + "/hbase-staging");
790    Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
791    fs.makeQualified(partitionsPath);
792    writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables);
793    fs.deleteOnExit(partitionsPath);
794
795    // configure job to use it
796    job.setPartitionerClass(TotalOrderPartitioner.class);
797    TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
798  }
799
800  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value =
801    "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
802  @InterfaceAudience.Private
803  static String serializeColumnFamilyAttribute(Function<ColumnFamilyDescriptor, String> fn,
804        List<TableDescriptor> allTables)
805      throws UnsupportedEncodingException {
806    StringBuilder attributeValue = new StringBuilder();
807    int i = 0;
808    for (TableDescriptor tableDescriptor : allTables) {
809      if (tableDescriptor == null) {
810        // could happen with mock table instance
811        // CODEREVIEW: Can I set an empty string in conf if mock table instance?
812        return "";
813      }
814      for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
815        if (i++ > 0) {
816          attributeValue.append('&');
817        }
818        attributeValue.append(URLEncoder.encode(
819          Bytes.toString(combineTableNameSuffix(tableDescriptor.getTableName().getName(),
820            familyDescriptor.getName())), "UTF-8"));
821        attributeValue.append('=');
822        attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8"));
823      }
824    }
825    // Get rid of the last ampersand
826    return attributeValue.toString();
827  }
828
829  /**
830   * Serialize column family to compression algorithm map to configuration.
831   * Invoked while configuring the MR job for incremental load.
832   */
833  @InterfaceAudience.Private
834  static Function<ColumnFamilyDescriptor, String> compressionDetails = familyDescriptor ->
835          familyDescriptor.getCompressionType().getName();
836
837  /**
838   * Serialize column family to block size map to configuration. Invoked while
839   * configuring the MR job for incremental load.
840   */
841  @InterfaceAudience.Private
842  static Function<ColumnFamilyDescriptor, String> blockSizeDetails = familyDescriptor -> String
843          .valueOf(familyDescriptor.getBlocksize());
844
845  /**
846   * Serialize column family to bloom type map to configuration. Invoked while
847   * configuring the MR job for incremental load.
848   */
849  @InterfaceAudience.Private
850  static Function<ColumnFamilyDescriptor, String> bloomTypeDetails = familyDescriptor -> {
851    String bloomType = familyDescriptor.getBloomFilterType().toString();
852    if (bloomType == null) {
853      bloomType = ColumnFamilyDescriptorBuilder.DEFAULT_BLOOMFILTER.name();
854    }
855    return bloomType;
856  };
857
858  /**
859   * Serialize column family to bloom param map to configuration. Invoked while
860   * configuring the MR job for incremental load.
861   */
862  @InterfaceAudience.Private
863  static Function<ColumnFamilyDescriptor, String> bloomParamDetails = familyDescriptor -> {
864    BloomType bloomType = familyDescriptor.getBloomFilterType();
865    String bloomParam = "";
866    if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
867      bloomParam = familyDescriptor.getConfigurationValue(BloomFilterUtil.PREFIX_LENGTH_KEY);
868    }
869    return bloomParam;
870  };
871
872  /**
873   * Serialize column family to data block encoding map to configuration.
874   * Invoked while configuring the MR job for incremental load.
875   */
876  @InterfaceAudience.Private
877  static Function<ColumnFamilyDescriptor, String> dataBlockEncodingDetails = familyDescriptor -> {
878    DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
879    if (encoding == null) {
880      encoding = DataBlockEncoding.NONE;
881    }
882    return encoding.toString();
883  };
884
885}