/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TASK_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * Writes HFiles. Passed Cells must arrive in order.
 * Writes current time as the sequence id for the file. Sets the major compacted
 * attribute on created {@link HFile}s. Calling write(null,null) will forcibly roll
 * all HFiles being written.
 * <p>
 * Using this class as part of a MapReduce job is best done
 * using {@link #configureIncrementalLoad(Job, TableDescriptor, RegionLocator)}.
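 * <p>
 * A minimal driver sketch (the mapper class, table name, and output path below are
 * illustrative placeholders, not part of this class):
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * Job job = Job.getInstance(conf, "hfile-prepare");
 * job.setMapperClass(MyCellMapper.class); // hypothetical mapper emitting Puts keyed by row
 * job.setMapOutputKeyClass(ImmutableBytesWritable.class);
 * job.setMapOutputValueClass(Put.class);
 * FileOutputFormat.setOutputPath(job, new Path("/tmp/hfiles"));
 * try (Connection conn = ConnectionFactory.createConnection(conf);
 *     Table table = conn.getTable(TableName.valueOf("mytable"));
 *     RegionLocator locator = conn.getRegionLocator(table.getName())) {
 *   HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), locator);
 * }
 * job.waitForCompletion(true);
 * }</pre>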
 */
@InterfaceAudience.Public
public class HFileOutputFormat2
    extends FileOutputFormat<ImmutableBytesWritable, Cell> {
  private static final Logger LOG = LoggerFactory.getLogger(HFileOutputFormat2.class);

  static class TableInfo {
    private TableDescriptor tableDescriptor;
    private RegionLocator regionLocator;

    public TableInfo(TableDescriptor tableDescriptor, RegionLocator regionLocator) {
      this.tableDescriptor = tableDescriptor;
      this.regionLocator = regionLocator;
    }

    public TableDescriptor getTableDescriptor() {
      return tableDescriptor;
    }

    public RegionLocator getRegionLocator() {
      return regionLocator;
    }
  }

  protected static final byte[] tableSeparator = Bytes.toBytes(";");

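  // For example (names are illustrative only): combineTableNameSuffix(Bytes.toBytes("mytable"),
  // Bytes.toBytes("cf1")) yields the bytes of "mytable;cf1".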
  protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) {
    return Bytes.add(tableName, tableSeparator, suffix);
  }

  // The following constants are private since these are used by
  // HFileOutputFormat2 to internally transfer data between job setup and
  // reducer run using conf.
  // These should not be changed by the client.
  static final String COMPRESSION_FAMILIES_CONF_KEY =
      "hbase.hfileoutputformat.families.compression";
  static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
      "hbase.hfileoutputformat.families.bloomtype";
  static final String BLOOM_PARAM_FAMILIES_CONF_KEY =
      "hbase.hfileoutputformat.families.bloomparam";
  static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.blocksize";
  static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";

  // These constants are public since the client can set them in its conf object
  // and thus refer to these symbols. They are present for backwards-compatibility
  // reasons. Use them only to override the auto-detection of data block encoding
  // and compression.
  public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.datablock.encoding";
  public static final String COMPRESSION_OVERRIDE_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.compression";

  /**
   * Keep locality while generating HFiles for bulkload. See HBASE-12596
   */
  public static final String LOCALITY_SENSITIVE_CONF_KEY =
      "hbase.bulkload.locality.sensitive.enabled";
  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
  static final String OUTPUT_TABLE_NAME_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.table.name";
  static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
      "hbase.mapreduce.use.multi.table.hfileoutputformat";

  public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY;
  public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";
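  // configureStoragePolicy() looks up a family-specific policy first and falls back to the
  // table-wide one. Sketch (table, family, and policy names are illustrative only): the
  // per-family key is the prefix followed by "<table>;<family>":
  //   conf.set(STORAGE_POLICY_PROPERTY, "HOT");
  //   conf.set(STORAGE_POLICY_PROPERTY_CF_PREFIX + "mytable;cf1", "ALL_SSD");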

  @Override
  public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
      final TaskAttemptContext context) throws IOException, InterruptedException {
    return createRecordWriter(context, this.getOutputCommitter(context));
  }

  protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[] family) {
    return combineTableNameSuffix(tableName, family);
  }

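  // The record writer below keeps one StoreFileWriter per (table, family). In single-table mode
  // HFiles land under <outputDir>/<family>/, in multi-table mode under
  // <outputDir>/<namespace>/<table>/<family>/. A writer is rolled at a row boundary once it
  // exceeds hbase.hregion.max.filesize, or when write(null, null) is called.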
  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> createRecordWriter(
    final TaskAttemptContext context, final OutputCommitter committer) throws IOException {

    // Get the path of the temporary output file
    final Path outputDir = ((FileOutputCommitter)committer).getWorkPath();
    final Configuration conf = context.getConfiguration();
    final boolean writeMultipleTables =
      conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
    final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
    if (writeTableNames == null || writeTableNames.isEmpty()) {
      throw new IllegalArgumentException(OUTPUT_TABLE_NAME_CONF_KEY + " cannot be empty");
    }
    final FileSystem fs = outputDir.getFileSystem(conf);
    // These configs are from hbase-*.xml
    final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
        HConstants.DEFAULT_MAX_FILE_SIZE);
    // Invented config. Add to hbase-*.xml if other than default compression.
    final String defaultCompressionStr = conf.get("hfile.compression",
        Compression.Algorithm.NONE.getName());
    final Algorithm defaultCompression = HFileWriterImpl.compressionByName(defaultCompressionStr);
    String compressionStr = conf.get(COMPRESSION_OVERRIDE_CONF_KEY);
    final Algorithm overriddenCompression = compressionStr != null ?
      Compression.getCompressionAlgorithmByName(compressionStr) : null;
    final boolean compactionExclude = conf.getBoolean(
        "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
    final Set<String> allTableNames = Arrays.stream(writeTableNames.split(
        Bytes.toString(tableSeparator))).collect(Collectors.toSet());

    // create a map from column family to the compression algorithm
    final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
    final Map<byte[], String> bloomParamMap = createFamilyBloomParamMap(conf);
    final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);

    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
    final Map<byte[], DataBlockEncoding> datablockEncodingMap
        = createFamilyDataBlockEncodingMap(conf);
    final DataBlockEncoding overriddenEncoding = dataBlockEncodingStr != null ?
      DataBlockEncoding.valueOf(dataBlockEncodingStr) : null;

    return new RecordWriter<ImmutableBytesWritable, V>() {
      // Map of families to writers and how much has been output on the writer.
      private final Map<byte[], WriterLength> writers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      private final Map<byte[], byte[]> previousRows = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      private final long now = EnvironmentEdgeManager.currentTime();

      @Override
      public void write(ImmutableBytesWritable row, V cell) throws IOException {
        Cell kv = cell;
        // null input == user explicitly wants to flush
        if (row == null && kv == null) {
          rollWriters(null);
          return;
        }

        byte[] rowKey = CellUtil.cloneRow(kv);
        int length = (PrivateCellUtil.estimatedSerializedSizeOf(kv)) - Bytes.SIZEOF_INT;
        byte[] family = CellUtil.cloneFamily(kv);
        byte[] tableNameBytes = null;
        if (writeMultipleTables) {
          tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get());
          tableNameBytes = TableName.valueOf(tableNameBytes).getNameWithNamespaceInclAsString()
              .getBytes(Charset.defaultCharset());
          if (!allTableNames.contains(Bytes.toString(tableNameBytes))) {
            throw new IllegalArgumentException("TableName " + Bytes.toString(tableNameBytes) +
              " not expected");
          }
        } else {
          tableNameBytes = Bytes.toBytes(writeTableNames);
        }
        String tableName = Bytes.toString(tableNameBytes);
        Path tableRelPath = getTableRelativePath(tableNameBytes);
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);

        WriterLength wl = this.writers.get(tableAndFamily);

        // If this is a new column family, make sure the family directory exists and
        // configure its storage policy.
        if (wl == null) {
          Path writerPath = null;
          if (writeMultipleTables) {
            writerPath = new Path(outputDir, new Path(tableRelPath, Bytes.toString(family)));
          } else {
            writerPath = new Path(outputDir, Bytes.toString(family));
          }
          fs.mkdirs(writerPath);
          configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
        }

        // Roll the writer once it has grown past the max file size, but only at a row boundary.
        if (wl != null && wl.written + length >= maxsize
                && Bytes.compareTo(this.previousRows.get(family), rowKey) != 0) {
          rollWriters(wl);
        }

        // Create a new HFile writer, if necessary.
        if (wl == null || wl.writer == null) {
          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
            HRegionLocation loc = null;

            if (tableName != null) {
              try (Connection connection = ConnectionFactory.createConnection(conf);
                  RegionLocator locator =
                      connection.getRegionLocator(TableName.valueOf(tableName))) {
                loc = locator.getRegionLocation(rowKey);
              } catch (Throwable e) {
                LOG.warn("Something wrong locating rowkey {} in {}",
                  Bytes.toString(rowKey), tableName, e);
                loc = null;
              }
            }

            if (null == loc) {
              LOG.trace("Failed get of location, use default writer {}", Bytes.toString(rowKey));
              wl = getNewWriter(tableNameBytes, family, conf, null);
            } else {
              LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey));
              InetSocketAddress initialIsa =
                  new InetSocketAddress(loc.getHostname(), loc.getPort());
              if (initialIsa.isUnresolved()) {
                LOG.trace("Failed resolve address {}, use default writer", loc.getHostnamePort());
                wl = getNewWriter(tableNameBytes, family, conf, null);
              } else {
                LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString());
                wl = getNewWriter(tableNameBytes, family, conf,
                    new InetSocketAddress[] { initialIsa });
              }
            }
          } else {
            wl = getNewWriter(tableNameBytes, family, conf, null);
          }
        }

        // We now have the proper HFile writer. Full steam ahead.
        PrivateCellUtil.updateLatestStamp(cell, this.now);
        wl.writer.append(kv);
        wl.written += length;

        // Copy the row so we know when a row transition happens.
        this.previousRows.put(family, rowKey);
      }

      // Convert a (possibly namespaced) table name into its relative output path,
      // i.e. "<namespace>/<table>", or just "<table>" when no namespace is present.
      private Path getTableRelativePath(byte[] tableNameBytes) {
        String tableName = Bytes.toString(tableNameBytes);
        String[] tableNameParts = tableName.split(":");
        Path tableRelPath = new Path(tableNameParts[0]);
        if (tableNameParts.length > 1) {
          tableRelPath = new Path(tableRelPath, tableNameParts[1]);
        }
        return tableRelPath;
      }

      private void rollWriters(WriterLength writerLength) throws IOException {
        if (writerLength != null) {
          closeWriter(writerLength);
        } else {
          for (WriterLength wl : this.writers.values()) {
            closeWriter(wl);
          }
        }
      }

      private void closeWriter(WriterLength wl) throws IOException {
        if (wl.writer != null) {
          LOG.info("Writer=" + wl.writer.getPath() +
            ((wl.written == 0) ? "" : ", wrote=" + wl.written));
          close(wl.writer);
          wl.writer = null;
        }
        wl.written = 0;
      }

      /*
       * Create a new StoreFile.Writer.
       * @return A WriterLength, containing a new StoreFile.Writer.
       */
      @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BX_UNBOXING_IMMEDIATELY_REBOXED",
          justification="Not important")
      private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration conf,
          InetSocketAddress[] favoredNodes) throws IOException {
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family);
        Path familydir = new Path(outputDir, Bytes.toString(family));
        if (writeMultipleTables) {
          familydir = new Path(outputDir,
            new Path(getTableRelativePath(tableName), Bytes.toString(family)));
        }
        WriterLength wl = new WriterLength();
        Algorithm compression = overriddenCompression;
        compression = compression == null ? compressionMap.get(tableAndFamily) : compression;
        compression = compression == null ? defaultCompression : compression;
        BloomType bloomType = bloomTypeMap.get(tableAndFamily);
        bloomType = bloomType == null ? BloomType.NONE : bloomType;
        String bloomParam = bloomParamMap.get(tableAndFamily);
        if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
          conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, bloomParam);
        }
        Integer blockSize = blockSizeMap.get(tableAndFamily);
        blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
        DataBlockEncoding encoding = overriddenEncoding;
        encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
        encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
        HFileContextBuilder contextBuilder = new HFileContextBuilder()
          .withCompression(compression).withChecksumType(HStore.getChecksumType(conf))
          .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(blockSize)
          .withColumnFamily(family).withTableName(tableName);

        if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
          contextBuilder.withIncludesTags(true);
        }

        contextBuilder.withDataBlockEncoding(encoding);
        HFileContext hFileContext = contextBuilder.build();
        if (null == favoredNodes) {
          wl.writer = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs)
            .withOutputDir(familydir).withBloomType(bloomType)
            .withFileContext(hFileContext).build();
        } else {
          wl.writer = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, new HFileSystem(fs))
            .withOutputDir(familydir).withBloomType(bloomType)
            .withFileContext(hFileContext).withFavoredNodes(favoredNodes).build();
        }

        this.writers.put(tableAndFamily, wl);
        return wl;
      }

      private void close(final StoreFileWriter w) throws IOException {
        if (w != null) {
          w.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
          w.appendFileInfo(BULKLOAD_TASK_KEY, Bytes.toBytes(context.getTaskAttemptID().toString()));
          w.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(true));
          w.appendFileInfo(EXCLUDE_FROM_MINOR_COMPACTION_KEY, Bytes.toBytes(compactionExclude));
          w.appendTrackedTimestampsToMetadata();
          w.close();
        }
      }

      @Override
      public void close(TaskAttemptContext c) throws IOException, InterruptedException {
        for (WriterLength wl : this.writers.values()) {
          close(wl.writer);
        }
      }
    };
  }

  /**
   * Configure block storage policy for CF after the directory is created.
   */
  static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
      byte[] tableAndFamily, Path cfPath) {
    if (null == conf || null == fs || null == tableAndFamily || null == cfPath) {
      return;
    }

    String policy =
        conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(tableAndFamily),
          conf.get(STORAGE_POLICY_PROPERTY));
    CommonFSUtils.setStoragePolicy(fs, cfPath, policy);
  }

  /*
   * Data structure to hold a Writer and amount of data written on it.
   */
  static class WriterLength {
    long written = 0;
    StoreFileWriter writer = null;
  }

  /**
   * Return the start keys of all of the regions in the passed tables,
   * as a list of ImmutableBytesWritable.
   */
  private static List<ImmutableBytesWritable> getRegionStartKeys(List<RegionLocator> regionLocators,
      boolean writeMultipleTables) throws IOException {

    ArrayList<ImmutableBytesWritable> ret = new ArrayList<>();
    for (RegionLocator regionLocator : regionLocators) {
      TableName tableName = regionLocator.getName();
      LOG.info("Looking up current regions for table " + tableName);
      byte[][] byteKeys = regionLocator.getStartKeys();
      for (byte[] byteKey : byteKeys) {
        byte[] fullKey = byteKey; // HFileOutputFormat2 use case
        if (writeMultipleTables) {
          // MultiTableHFileOutputFormat use case
          fullKey = combineTableNameSuffix(tableName.getName(), byteKey);
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("SplitPoint startkey for " + tableName + ": " + Bytes.toStringBinary(fullKey));
        }
        ret.add(new ImmutableBytesWritable(fullKey));
      }
    }
    return ret;
  }

  /**
   * Write out a {@link SequenceFile} that can be read by
   * {@link TotalOrderPartitioner} that contains the split points in startKeys.
   */
  @SuppressWarnings("deprecation")
  private static void writePartitions(Configuration conf, Path partitionsPath,
      List<ImmutableBytesWritable> startKeys, boolean writeMultipleTables) throws IOException {
    LOG.info("Writing partition information to " + partitionsPath);
    if (startKeys.isEmpty()) {
      throw new IllegalArgumentException("No regions passed");
    }

    // We're generating a list of split points, and we don't ever
    // have keys < the first region (which has an empty start key)
    // so we need to remove it. Otherwise we would end up with an
    // empty reducer with index 0
    TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys);
    ImmutableBytesWritable first = sorted.first();
    if (writeMultipleTables) {
      first =
        new ImmutableBytesWritable(MultiTableHFileOutputFormat.getSuffix(sorted.first().get()));
    }
    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IllegalArgumentException(
          "First region of table should have empty start key. Instead has: "
          + Bytes.toStringBinary(first.get()));
    }
    sorted.remove(sorted.first());

    // Write the actual file
    FileSystem fs = partitionsPath.getFileSystem(conf);
    SequenceFile.Writer writer = SequenceFile.createWriter(
      fs, conf, partitionsPath, ImmutableBytesWritable.class,
      NullWritable.class);

    try {
      for (ImmutableBytesWritable startKey : sorted) {
        writer.append(startKey, NullWritable.get());
      }
    } finally {
      writer.close();
    }
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given
   * table. This
   * <ul>
   *   <li>Inspects the table to configure a total order partitioner</li>
   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   *   <li>Sets the reducer up to perform the appropriate sorting (CellSortReducer,
   *     PutSortReducer or TextSortReducer, chosen from the map output value class)</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
   */
  public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
      throws IOException {
    configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given
   * table. This
   * <ul>
   *   <li>Inspects the table to configure a total order partitioner</li>
   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   *   <li>Sets the reducer up to perform the appropriate sorting (CellSortReducer,
   *     PutSortReducer or TextSortReducer, chosen from the map output value class)</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function, as in the sketch below.
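   * <p>
   * Sketch (the map output value class determines which sort reducer is installed):
   * <pre>{@code
   * job.setMapOutputKeyClass(ImmutableBytesWritable.class);
   * job.setMapOutputValueClass(Put.class); // PutSortReducer will be used
   * HFileOutputFormat2.configureIncrementalLoad(job, tableDescriptor, regionLocator);
   * }</pre>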
   */
  public static void configureIncrementalLoad(Job job, TableDescriptor tableDescriptor,
      RegionLocator regionLocator) throws IOException {
    ArrayList<TableInfo> singleTableInfo = new ArrayList<>();
    singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator));
    configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
  }

  static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
      Class<? extends OutputFormat<?, ?>> cls) throws IOException {
    Configuration conf = job.getConfiguration();
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(MapReduceExtendedCell.class);
    job.setOutputFormatClass(cls);

    if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) {
      throw new IllegalArgumentException("Duplicate entries found in TableInfo argument");
    }
    boolean writeMultipleTables = false;
    if (MultiTableHFileOutputFormat.class.equals(cls)) {
      writeMultipleTables = true;
      conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true);
    }
    // Based on the configured map output class, set the correct reducer to properly
    // sort the incoming values.
    // TODO it would be nice to pick one or the other of these formats.
    if (KeyValue.class.equals(job.getMapOutputValueClass())
        || MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(CellSortReducer.class);
    } else if (Put.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(PutSortReducer.class);
    } else if (Text.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(TextSortReducer.class);
    } else {
      LOG.warn("Unknown map output value type: " + job.getMapOutputValueClass());
    }

    conf.setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
        CellSerialization.class.getName());

    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
      LOG.info("bulkload locality sensitive enabled");
    }

    /* Now get the region start keys for every table required */
    List<String> allTableNames = new ArrayList<>(multiTableInfo.size());
    List<RegionLocator> regionLocators = new ArrayList<>(multiTableInfo.size());
    List<TableDescriptor> tableDescriptors = new ArrayList<>(multiTableInfo.size());

    for (TableInfo tableInfo : multiTableInfo) {
      regionLocators.add(tableInfo.getRegionLocator());
      String tn = writeMultipleTables ?
        tableInfo.getRegionLocator().getName().getNameWithNamespaceInclAsString() :
        tableInfo.getRegionLocator().getName().getNameAsString();
      allTableNames.add(tn);
      tableDescriptors.add(tableInfo.getTableDescriptor());
    }
    // Record table names so writers can be created with favored nodes, and so per-table
    // column family attributes (compression, block size, etc.) can be decoded in the reducer.
    conf.set(OUTPUT_TABLE_NAME_CONF_KEY, StringUtils.join(allTableNames, Bytes
        .toString(tableSeparator)));
    List<ImmutableBytesWritable> startKeys =
      getRegionStartKeys(regionLocators, writeMultipleTables);
    // Use the tables' region boundaries for TotalOrderPartitioner split points.
    LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
        "to match current region count for all tables");
    job.setNumReduceTasks(startKeys.size());

    configurePartitioner(job, startKeys, writeMultipleTables);
    // Set compression algorithms based on column families
    conf.set(COMPRESSION_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(compressionDetails,
        tableDescriptors));
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(blockSizeDetails,
        tableDescriptors));
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(bloomTypeDetails,
        tableDescriptors));
    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(bloomParamDetails,
        tableDescriptors));
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
  }

  public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor)
      throws IOException {
    Configuration conf = job.getConfiguration();

    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(MapReduceExtendedCell.class);
    job.setOutputFormatClass(HFileOutputFormat2.class);

    ArrayList<TableDescriptor> singleTableDescriptor = new ArrayList<>(1);
    singleTableDescriptor.add(tableDescriptor);

    conf.set(OUTPUT_TABLE_NAME_CONF_KEY, tableDescriptor.getTableName().getNameAsString());
    // Set compression algorithms based on column families
    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(compressionDetails, singleTableDescriptor));
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor));
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor));
    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(bloomParamDetails, singleTableDescriptor));
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured.");
  }

  /**
   * Runs inside the task to deserialize column family to compression algorithm
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured compression algorithm
   */
  @VisibleForTesting
  static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        COMPRESSION_FAMILIES_CONF_KEY);
    Map<byte[], Algorithm> compressionMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue());
      compressionMap.put(e.getKey(), algorithm);
    }
    return compressionMap;
  }

  /**
   * Runs inside the task to deserialize column family to bloom filter type
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter type
   */
  @VisibleForTesting
  static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        BLOOM_TYPE_FAMILIES_CONF_KEY);
    Map<byte[], BloomType> bloomTypeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      BloomType bloomType = BloomType.valueOf(e.getValue());
      bloomTypeMap.put(e.getKey(), bloomType);
    }
    return bloomTypeMap;
  }

  /**
   * Runs inside the task to deserialize column family to bloom filter param
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter param
   */
  @VisibleForTesting
  static Map<byte[], String> createFamilyBloomParamMap(Configuration conf) {
    return createFamilyConfValueMap(conf, BLOOM_PARAM_FAMILIES_CONF_KEY);
  }

  /**
   * Runs inside the task to deserialize column family to block size
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured block size
   */
  @VisibleForTesting
  static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        BLOCK_SIZE_FAMILIES_CONF_KEY);
    Map<byte[], Integer> blockSizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Integer blockSize = Integer.parseInt(e.getValue());
      blockSizeMap.put(e.getKey(), blockSize);
    }
    return blockSizeMap;
  }

  /**
   * Runs inside the task to deserialize column family to data block encoding
   * type map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured {@link DataBlockEncoding}
   *         for the family
   */
  @VisibleForTesting
  static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(
      Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
    Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      encoderMap.put(e.getKey(), DataBlockEncoding.valueOf(e.getValue()));
    }
    return encoderMap;
  }

  /**
   * Runs inside the task to deserialize column family to given conf value map.
   *
   * @param conf to read the serialized values from
   * @param confName conf key to read from the configuration
   * @return a map of column family to the given configuration value
   */
  private static Map<byte[], String> createFamilyConfValueMap(
      Configuration conf, String confName) {
    Map<byte[], String> confValMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    String confVal = conf.get(confName, "");
    for (String familyConf : confVal.split("&")) {
      String[] familySplit = familyConf.split("=");
      if (familySplit.length != 2) {
        continue;
      }
      try {
        confValMap.put(Bytes.toBytes(URLDecoder.decode(familySplit[0], "UTF-8")),
            URLDecoder.decode(familySplit[1], "UTF-8"));
      } catch (UnsupportedEncodingException e) {
        // will not happen with UTF-8 encoding
        throw new AssertionError(e);
      }
    }
    return confValMap;
  }

  /**
   * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
   * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
   */
  static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints,
      boolean writeMultipleTables) throws IOException {
    Configuration conf = job.getConfiguration();
    // create the partitions file
    FileSystem fs = FileSystem.get(conf);
    String hbaseTmpFsDir =
        conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
            fs.getHomeDirectory() + "/hbase-staging");
    Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
    partitionsPath = fs.makeQualified(partitionsPath);
    writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables);
    fs.deleteOnExit(partitionsPath);

    // configure job to use it
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
  }

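  /**
   * Serializes a per-column-family attribute into a single configuration value made of
   * {@code urlencode(<table>;<family>)=urlencode(<value>)} pairs joined by {@literal &};
   * createFamilyConfValueMap() decodes it again on the task side. For example (names and
   * values are illustrative only): {@code mytable%3Bcf1=gz&mytable%3Bcf2=none}.
   */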
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value =
    "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
  @VisibleForTesting
  static String serializeColumnFamilyAttribute(Function<ColumnFamilyDescriptor, String> fn,
        List<TableDescriptor> allTables)
      throws UnsupportedEncodingException {
    StringBuilder attributeValue = new StringBuilder();
    int i = 0;
    for (TableDescriptor tableDescriptor : allTables) {
      if (tableDescriptor == null) {
        // could happen with mock table instance
        // CODEREVIEW: Can I set an empty string in conf if mock table instance?
        return "";
      }
      for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
        if (i++ > 0) {
          attributeValue.append('&');
        }
        attributeValue.append(URLEncoder.encode(
          Bytes.toString(combineTableNameSuffix(tableDescriptor.getTableName().getName(),
            familyDescriptor.getName())), "UTF-8"));
        attributeValue.append('=');
        attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8"));
      }
    }
    return attributeValue.toString();
  }

  /**
   * Serialize column family to compression algorithm map to configuration.
   * Invoked while configuring the MR job for incremental load.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> compressionDetails = familyDescriptor ->
      familyDescriptor.getCompressionType().getName();

  /**
   * Serialize column family to block size map to configuration. Invoked while
   * configuring the MR job for incremental load.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> blockSizeDetails = familyDescriptor -> String
      .valueOf(familyDescriptor.getBlocksize());

  /**
   * Serialize column family to bloom type map to configuration. Invoked while
   * configuring the MR job for incremental load.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> bloomTypeDetails = familyDescriptor -> {
    String bloomType = familyDescriptor.getBloomFilterType().toString();
    if (bloomType == null) {
      bloomType = ColumnFamilyDescriptorBuilder.DEFAULT_BLOOMFILTER.name();
    }
    return bloomType;
  };

  /**
   * Serialize column family to bloom param map to configuration. Invoked while
   * configuring the MR job for incremental load.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> bloomParamDetails = familyDescriptor -> {
    BloomType bloomType = familyDescriptor.getBloomFilterType();
    String bloomParam = "";
    if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
      bloomParam = familyDescriptor.getConfigurationValue(BloomFilterUtil.PREFIX_LENGTH_KEY);
    }
    return bloomParam;
  };

  /**
   * Serialize column family to data block encoding map to configuration.
   * Invoked while configuring the MR job for incremental load.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> dataBlockEncodingDetails = familyDescriptor -> {
    DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
    if (encoding == null) {
      encoding = DataBlockEncoding.NONE;
    }
    return encoding.toString();
  };

}