/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TASK_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * Writes HFiles. Passed Cells must arrive in order.
 * Writes current time as the sequence id for the file. Sets the major compacted
 * attribute on created {@link HFile}s. Calling write(null,null) will forcibly roll
 * all HFiles being written.
 * <p>
 * Using this class as part of a MapReduce job is best done
 * using {@link #configureIncrementalLoad(Job, TableDescriptor, RegionLocator)}.
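 * <p>
 * A minimal driver sketch follows; the table name, input/output paths, and {@code MyMapper}
 * are illustrative assumptions rather than part of this class:
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * Job job = Job.getInstance(conf, "hfile-prepare");
 * job.setJarByClass(MyMapper.class);
 * job.setMapperClass(MyMapper.class); // emits (ImmutableBytesWritable, Put) pairs
 * job.setMapOutputKeyClass(ImmutableBytesWritable.class);
 * job.setMapOutputValueClass(Put.class);
 * FileInputFormat.addInputPath(job, new Path("/input"));
 * FileOutputFormat.setOutputPath(job, new Path("/hfile-output"));
 * try (Connection conn = ConnectionFactory.createConnection(conf);
 *     Table table = conn.getTable(TableName.valueOf("mytable"));
 *     RegionLocator locator = conn.getRegionLocator(TableName.valueOf("mytable"))) {
 *   HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), locator);
 * }
 * job.waitForCompletion(true);
 * }</pre>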
 */
@InterfaceAudience.Public
public class HFileOutputFormat2
    extends FileOutputFormat<ImmutableBytesWritable, Cell> {
  private static final Logger LOG = LoggerFactory.getLogger(HFileOutputFormat2.class);
  static class TableInfo {
    private TableDescriptor tableDescriptor;
    private RegionLocator regionLocator;

    public TableInfo(TableDescriptor tableDescriptor, RegionLocator regionLocator) {
      this.tableDescriptor = tableDescriptor;
      this.regionLocator = regionLocator;
    }

    /**
     * Modifications to the returned HTD do not affect the inner TD.
     * @return A clone of inner table descriptor
     * @deprecated since 2.0.0 and will be removed in 3.0.0. Use {@link #getTableDescriptor()}
     *   instead.
     * @see #getTableDescriptor()
     * @see <a href="https://issues.apache.org/jira/browse/HBASE-18241">HBASE-18241</a>
     */
    @Deprecated
    public HTableDescriptor getHTableDescriptor() {
      return new HTableDescriptor(tableDescriptor);
    }

    public TableDescriptor getTableDescriptor() {
      return tableDescriptor;
    }

    public RegionLocator getRegionLocator() {
      return regionLocator;
    }
  }

  protected static final byte[] tableSeparator = Bytes.toBytes(";");

  protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) {
    return Bytes.add(tableName, tableSeparator, suffix);
  }

  // The following constants are private since these are used by
  // HFileOutputFormat2 to internally transfer data between job setup and
  // reducer run using conf.
  // These should not be changed by the client.
  static final String COMPRESSION_FAMILIES_CONF_KEY =
      "hbase.hfileoutputformat.families.compression";
  static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
      "hbase.hfileoutputformat.families.bloomtype";
  static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.blocksize";
  static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";

  // This constant is public since the client can modify this when setting
  // up their conf object and thus refer to this symbol.
  // It is present for backwards compatibility reasons. Use it only to
  // override the auto-detection of datablock encoding.
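  // For example (an illustrative sketch; FAST_DIFF is an arbitrary choice):
  //   conf.set(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY, DataBlockEncoding.FAST_DIFF.name());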
  public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.datablock.encoding";

  /**
   * Keep locality while generating HFiles for bulkload. See HBASE-12596
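   * <p>
   * Enabled by default. A hedged sketch of turning it off in the job configuration:
   * {@code conf.setBoolean(LOCALITY_SENSITIVE_CONF_KEY, false)}.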
   */
  public static final String LOCALITY_SENSITIVE_CONF_KEY =
      "hbase.bulkload.locality.sensitive.enabled";
  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
  static final String OUTPUT_TABLE_NAME_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.table.name";
  static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
          "hbase.mapreduce.use.multi.table.hfileoutputformat";

  public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY;
  public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";

  @Override
  public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
      final TaskAttemptContext context) throws IOException, InterruptedException {
    return createRecordWriter(context, this.getOutputCommitter(context));
  }

  protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[] family) {
    return combineTableNameSuffix(tableName, family);
  }

  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
      createRecordWriter(final TaskAttemptContext context, final OutputCommitter committer)
          throws IOException {

    // Get the path of the temporary output file
    final Path outputDir = ((FileOutputCommitter)committer).getWorkPath();
    final Configuration conf = context.getConfiguration();
    final boolean writeMultipleTables =
        conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
    final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
    if (writeTableNames == null || writeTableNames.isEmpty()) {
      throw new IllegalArgumentException("Configuration parameter " + OUTPUT_TABLE_NAME_CONF_KEY
              + " cannot be empty");
    }
    final FileSystem fs = outputDir.getFileSystem(conf);
    // These configs. are from hbase-*.xml
    final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
        HConstants.DEFAULT_MAX_FILE_SIZE);
    // Invented config.  Add to hbase-*.xml if other than default compression.
    final String defaultCompressionStr = conf.get("hfile.compression",
        Compression.Algorithm.NONE.getName());
    final Algorithm defaultCompression = HFileWriterImpl
        .compressionByName(defaultCompressionStr);
    final boolean compactionExclude = conf.getBoolean(
        "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);

    final Set<String> allTableNames = Arrays.stream(writeTableNames.split(
            Bytes.toString(tableSeparator))).collect(Collectors.toSet());

    // create a map from column family to the compression algorithm
    final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
    final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);

    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
    final Map<byte[], DataBlockEncoding> datablockEncodingMap
        = createFamilyDataBlockEncodingMap(conf);
    final DataBlockEncoding overriddenEncoding;
    if (dataBlockEncodingStr != null) {
      overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr);
    } else {
      overriddenEncoding = null;
    }

    return new RecordWriter<ImmutableBytesWritable, V>() {
      // Map of families to writers and how much has been output on the writer.
      private final Map<byte[], WriterLength> writers =
              new TreeMap<>(Bytes.BYTES_COMPARATOR);
      private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY;
      private final long now = EnvironmentEdgeManager.currentTime();
      private boolean rollRequested = false;

      @Override
      public void write(ImmutableBytesWritable row, V cell)
          throws IOException {
        Cell kv = cell;
        // null input == user explicitly wants to flush
        if (row == null && kv == null) {
          rollWriters(null);
          return;
        }

        byte[] rowKey = CellUtil.cloneRow(kv);
        int length = (PrivateCellUtil.estimatedSerializedSizeOf(kv)) - Bytes.SIZEOF_INT;
        byte[] family = CellUtil.cloneFamily(kv);
        byte[] tableNameBytes = null;
        if (writeMultipleTables) {
          tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get());
          if (!allTableNames.contains(Bytes.toString(tableNameBytes))) {
            throw new IllegalArgumentException("TableName '" + Bytes.toString(tableNameBytes) +
                "' not expected");
          }
        } else {
          tableNameBytes = Bytes.toBytes(writeTableNames);
        }
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);
        WriterLength wl = this.writers.get(tableAndFamily);

        // If this is a new column family, create its output directory and set the storage policy
        if (wl == null) {
          Path writerPath = null;
          if (writeMultipleTables) {
            writerPath = new Path(outputDir,
                new Path(Bytes.toString(tableNameBytes), Bytes.toString(family)));
          } else {
            writerPath = new Path(outputDir, Bytes.toString(family));
          }
          fs.mkdirs(writerPath);
          configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
        }

        if (wl != null && wl.written + length >= maxsize) {
          this.rollRequested = true;
        }

        // This can only happen once a row is finished though
        if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
          rollWriters(wl);
        }

        // create a new HFile writer, if necessary
        if (wl == null || wl.writer == null) {
          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
            HRegionLocation loc = null;

            String tableName = Bytes.toString(tableNameBytes);
            if (tableName != null) {
              try (Connection connection = ConnectionFactory.createConnection(conf);
                     RegionLocator locator =
                       connection.getRegionLocator(TableName.valueOf(tableName))) {
                loc = locator.getRegionLocation(rowKey);
              } catch (Throwable e) {
                LOG.warn("Something went wrong while locating rowkey: " +
                  Bytes.toString(rowKey) + " for table: " + tableName, e);
                loc = null;
              }
            }

            if (null == loc) {
              if (LOG.isTraceEnabled()) {
                LOG.trace("failed to get region location, so use default writer for rowkey: " +
                  Bytes.toString(rowKey));
              }
              wl = getNewWriter(tableNameBytes, family, conf, null);
            } else {
              if (LOG.isDebugEnabled()) {
                LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]");
              }
              InetSocketAddress initialIsa =
                  new InetSocketAddress(loc.getHostname(), loc.getPort());
              if (initialIsa.isUnresolved()) {
                if (LOG.isTraceEnabled()) {
                  LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":"
                      + loc.getPort() + ", so use default writer");
                }
                wl = getNewWriter(tableNameBytes, family, conf, null);
              } else {
                if (LOG.isDebugEnabled()) {
                  LOG.debug("use favored nodes writer: " + initialIsa.getHostString());
                }
                wl = getNewWriter(tableNameBytes, family, conf,
                    new InetSocketAddress[] { initialIsa });
              }
            }
          } else {
            wl = getNewWriter(tableNameBytes, family, conf, null);
          }
        }

        // we now have the proper HFile writer. full steam ahead
        PrivateCellUtil.updateLatestStamp(cell, this.now);
        wl.writer.append(kv);
        wl.written += length;

        // Remember the row so we can tell when it changes.
        this.previousRow = rowKey;
      }

      private void rollWriters(WriterLength writerLength) throws IOException {
        if (writerLength != null) {
          closeWriter(writerLength);
        } else {
          for (WriterLength wl : this.writers.values()) {
            closeWriter(wl);
          }
        }
        this.rollRequested = false;
      }

      private void closeWriter(WriterLength wl) throws IOException {
        if (wl.writer != null) {
          LOG.info(
              "Writer=" + wl.writer.getPath() + ((wl.written == 0)? "": ", wrote=" + wl.written));
          close(wl.writer);
        }
        wl.writer = null;
        wl.written = 0;
      }

      /*
       * Create a new StoreFileWriter.
       * @return A WriterLength, containing a new StoreFileWriter.
       */
      @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BX_UNBOXING_IMMEDIATELY_REBOXED",
          justification="Not important")
      private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration conf,
          InetSocketAddress[] favoredNodes) throws IOException {
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family);
        Path familydir = new Path(outputDir, Bytes.toString(family));
        if (writeMultipleTables) {
          familydir = new Path(outputDir,
                  new Path(Bytes.toString(tableName), Bytes.toString(family)));
        }
        WriterLength wl = new WriterLength();
        Algorithm compression = compressionMap.get(tableAndFamily);
        compression = compression == null ? defaultCompression : compression;
        BloomType bloomType = bloomTypeMap.get(tableAndFamily);
        bloomType = bloomType == null ? BloomType.NONE : bloomType;
        Integer blockSize = blockSizeMap.get(tableAndFamily);
        blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
        DataBlockEncoding encoding = overriddenEncoding;
        encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
        encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
        HFileContextBuilder contextBuilder = new HFileContextBuilder()
                                    .withCompression(compression)
                                    .withChecksumType(HStore.getChecksumType(conf))
                                    .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
                                    .withBlockSize(blockSize);

        if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
          contextBuilder.withIncludesTags(true);
        }

        contextBuilder.withDataBlockEncoding(encoding);
        HFileContext hFileContext = contextBuilder.build();
        if (null == favoredNodes) {
          wl.writer =
              new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs)
                  .withOutputDir(familydir).withBloomType(bloomType)
                  .withComparator(CellComparator.getInstance()).withFileContext(hFileContext)
                  .build();
        } else {
          wl.writer =
              new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, new HFileSystem(fs))
                  .withOutputDir(familydir).withBloomType(bloomType)
                  .withComparator(CellComparator.getInstance()).withFileContext(hFileContext)
                  .withFavoredNodes(favoredNodes).build();
        }

        this.writers.put(tableAndFamily, wl);
        return wl;
      }

      private void close(final StoreFileWriter w) throws IOException {
        if (w != null) {
          w.appendFileInfo(BULKLOAD_TIME_KEY,
              Bytes.toBytes(System.currentTimeMillis()));
          w.appendFileInfo(BULKLOAD_TASK_KEY,
              Bytes.toBytes(context.getTaskAttemptID().toString()));
          w.appendFileInfo(MAJOR_COMPACTION_KEY,
              Bytes.toBytes(true));
          w.appendFileInfo(EXCLUDE_FROM_MINOR_COMPACTION_KEY,
              Bytes.toBytes(compactionExclude));
          w.appendTrackedTimestampsToMetadata();
          w.close();
        }
      }

      @Override
      public void close(TaskAttemptContext c)
      throws IOException, InterruptedException {
        for (WriterLength wl: this.writers.values()) {
          close(wl.writer);
        }
      }
    };
  }

  /**
   * Configure block storage policy for CF after the directory is created.
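   * <p>
   * The policy is looked up under {@code STORAGE_POLICY_PROPERTY_CF_PREFIX + "<table>;<family>"},
   * falling back to {@code STORAGE_POLICY_PROPERTY}. A hedged sketch (table {@code t1} and
   * family {@code cf} are assumptions):
   * <pre>{@code
   * conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "HOT");
   * conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + "t1;cf", "ALL_SSD");
   * }</pre>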
   */
  static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
      byte[] tableAndFamily, Path cfPath) {
    if (null == conf || null == fs || null == tableAndFamily || null == cfPath) {
      return;
    }

    String policy =
        conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(tableAndFamily),
          conf.get(STORAGE_POLICY_PROPERTY));
    FSUtils.setStoragePolicy(fs, cfPath, policy);
  }

  /*
   * Data structure to hold a Writer and the amount of data written to it.
   */
  static class WriterLength {
    long written = 0;
    StoreFileWriter writer = null;
  }

  /**
   * Return the start keys of all of the regions in the passed tables,
   * as a list of ImmutableBytesWritable.
   */
  private static List<ImmutableBytesWritable> getRegionStartKeys(List<RegionLocator> regionLocators,
      boolean writeMultipleTables) throws IOException {

    ArrayList<ImmutableBytesWritable> ret = new ArrayList<>();
    for (RegionLocator regionLocator : regionLocators) {
      TableName tableName = regionLocator.getName();
      LOG.info("Looking up current regions for table " + tableName);
      byte[][] byteKeys = regionLocator.getStartKeys();
      for (byte[] byteKey : byteKeys) {
        byte[] fullKey = byteKey; // HFileOutputFormat2 use case
        if (writeMultipleTables) {
          // MultiTableHFileOutputFormat use case
          fullKey = combineTableNameSuffix(tableName.getName(), byteKey);
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("SplitPoint startkey for table [" + tableName + "]: [" +
              Bytes.toStringBinary(fullKey) + "]");
        }
        ret.add(new ImmutableBytesWritable(fullKey));
      }
    }
    return ret;
  }

  /**
   * Write out a {@link SequenceFile} that can be read by
   * {@link TotalOrderPartitioner} that contains the split points in startKeys.
   */
  @SuppressWarnings("deprecation")
  private static void writePartitions(Configuration conf, Path partitionsPath,
      List<ImmutableBytesWritable> startKeys, boolean writeMultipleTables) throws IOException {
    LOG.info("Writing partition information to " + partitionsPath);
    if (startKeys.isEmpty()) {
      throw new IllegalArgumentException("No regions passed");
    }

    // We're generating a list of split points, and we don't ever
    // have keys < the first region (which has an empty start key)
    // so we need to remove it. Otherwise we would end up with an
    // empty reducer with index 0
    TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys);
    ImmutableBytesWritable first = sorted.first();
    if (writeMultipleTables) {
      first = new ImmutableBytesWritable(
          MultiTableHFileOutputFormat.getSuffix(sorted.first().get()));
    }
    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IllegalArgumentException(
          "First region of table should have empty start key. Instead has: "
          + Bytes.toStringBinary(first.get()));
    }
    sorted.remove(sorted.first());

    // Write the actual file
    FileSystem fs = partitionsPath.getFileSystem(conf);
    SequenceFile.Writer writer = SequenceFile.createWriter(
      fs, conf, partitionsPath, ImmutableBytesWritable.class,
      NullWritable.class);

    try {
      for (ImmutableBytesWritable startKey : sorted) {
        writer.append(startKey, NullWritable.get());
      }
    } finally {
      writer.close();
    }
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given
   * table. This
   * <ul>
   *   <li>Inspects the table to configure a total order partitioner</li>
   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   *   <li>Sets the reducer up to perform the appropriate sorting (CellSortReducer, PutSortReducer
   *     or TextSortReducer, depending on the map output value class)</li>
   * </ul>
   * The user should be sure to set the map output value class to KeyValue, Put, or Text before
   *   running this function.
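   * <p>
   * A minimal illustrative call sequence (a sketch; assumes the mapper emits {@link Put}s):
   * <pre>{@code
   * job.setMapOutputKeyClass(ImmutableBytesWritable.class);
   * job.setMapOutputValueClass(Put.class);
   * HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
   * }</pre>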
   */
  public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
      throws IOException {
    configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given
   * table. This
   * <ul>
   *   <li>Inspects the table to configure a total order partitioner</li>
   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   *   <li>Sets the reducer up to perform the appropriate sorting (CellSortReducer, PutSortReducer
   *     or TextSortReducer, depending on the map output value class)</li>
   * </ul>
   * The user should be sure to set the map output value class to KeyValue, Put, or Text before
   *   running this function.
   */
  public static void configureIncrementalLoad(Job job, TableDescriptor tableDescriptor,
      RegionLocator regionLocator) throws IOException {
    ArrayList<TableInfo> singleTableInfo = new ArrayList<>();
    singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator));
    configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
  }

  static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
      Class<? extends OutputFormat<?, ?>> cls) throws IOException {
    Configuration conf = job.getConfiguration();
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(MapReduceExtendedCell.class);
    job.setOutputFormatClass(cls);

    if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) {
      throw new IllegalArgumentException("Duplicate entries found in TableInfo argument");
    }
    boolean writeMultipleTables = false;
    if (MultiTableHFileOutputFormat.class.equals(cls)) {
      writeMultipleTables = true;
      conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true);
    }
    // Based on the configured map output class, set the correct reducer to properly
    // sort the incoming values.
    // TODO it would be nice to pick one or the other of these formats.
    if (KeyValue.class.equals(job.getMapOutputValueClass())
        || MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(CellSortReducer.class);
    } else if (Put.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(PutSortReducer.class);
    } else if (Text.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(TextSortReducer.class);
    } else {
      LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
    }

    conf.setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
        CellSerialization.class.getName());

    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
      LOG.info("bulkload locality sensitive enabled");
    }

    /* Now get the region start keys for every table required */
    List<String> allTableNames = new ArrayList<>(multiTableInfo.size());
    List<RegionLocator> regionLocators = new ArrayList<>(multiTableInfo.size());
    List<TableDescriptor> tableDescriptors = new ArrayList<>(multiTableInfo.size());

    for (TableInfo tableInfo : multiTableInfo) {
      regionLocators.add(tableInfo.getRegionLocator());
      allTableNames.add(tableInfo.getRegionLocator().getName().getNameAsString());
      tableDescriptors.add(tableInfo.getTableDescriptor());
    }
    // Record table names so writers can be created with favored nodes and so per-table column
    // family attributes (compression, block size, etc.) can be decoded.
    conf.set(OUTPUT_TABLE_NAME_CONF_KEY,
        StringUtils.join(allTableNames, Bytes.toString(tableSeparator)));
    List<ImmutableBytesWritable> startKeys =
        getRegionStartKeys(regionLocators, writeMultipleTables);
    // Use the tables' region boundaries as the TotalOrderPartitioner split points.
    LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
        "to match current region count for all tables");
    job.setNumReduceTasks(startKeys.size());

    configurePartitioner(job, startKeys, writeMultipleTables);
    // Serialize per-family attributes (compression, block size, bloom type, data block encoding)
    conf.set(COMPRESSION_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(compressionDetails,
            tableDescriptors));
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(blockSizeDetails,
            tableDescriptors));
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(bloomTypeDetails,
            tableDescriptors));
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
            serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
  }

  public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor)
      throws IOException {
    Configuration conf = job.getConfiguration();

    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(MapReduceExtendedCell.class);
    job.setOutputFormatClass(HFileOutputFormat2.class);

    ArrayList<TableDescriptor> singleTableDescriptor = new ArrayList<>(1);
    singleTableDescriptor.add(tableDescriptor);

    conf.set(OUTPUT_TABLE_NAME_CONF_KEY, tableDescriptor.getTableName().getNameAsString());
    // Serialize per-family attributes (compression, block size, bloom type, data block encoding)
    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(compressionDetails, singleTableDescriptor));
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor));
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor));
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured.");
  }

  /**
   * Runs inside the task to deserialize column family to compression algorithm
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured compression algorithm
   */
  @VisibleForTesting
  static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        COMPRESSION_FAMILIES_CONF_KEY);
    Map<byte[], Algorithm> compressionMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue());
      compressionMap.put(e.getKey(), algorithm);
    }
    return compressionMap;
  }

  /**
   * Runs inside the task to deserialize column family to bloom filter type
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter type
   */
  @VisibleForTesting
  static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        BLOOM_TYPE_FAMILIES_CONF_KEY);
    Map<byte[], BloomType> bloomTypeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      BloomType bloomType = BloomType.valueOf(e.getValue());
      bloomTypeMap.put(e.getKey(), bloomType);
    }
    return bloomTypeMap;
  }

  /**
   * Runs inside the task to deserialize column family to block size
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured block size
   */
  @VisibleForTesting
  static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        BLOCK_SIZE_FAMILIES_CONF_KEY);
    Map<byte[], Integer> blockSizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Integer blockSize = Integer.parseInt(e.getValue());
      blockSizeMap.put(e.getKey(), blockSize);
    }
    return blockSizeMap;
  }

  /**
   * Runs inside the task to deserialize column family to data block encoding
   * type map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured data block encoding
   *         for the family
   */
  @VisibleForTesting
  static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(
      Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
    Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      encoderMap.put(e.getKey(), DataBlockEncoding.valueOf((e.getValue())));
    }
    return encoderMap;
  }


  /**
   * Runs inside the task to deserialize a column family to configuration value map.
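   * <p>
   * The serialized form is URL-encoded {@code key=value} pairs joined by {@code &}; for example,
   * with an assumed table {@code t1} and families {@code cf1} and {@code cf2}, keys look like
   * {@code t1;cf1} and {@code t1;cf2}.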
   *
   * @param conf to read the serialized values from
   * @param confName conf key to read from the configuration
   * @return a map of column family to the given configuration value
   */
  private static Map<byte[], String> createFamilyConfValueMap(
      Configuration conf, String confName) {
    Map<byte[], String> confValMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    String confVal = conf.get(confName, "");
    for (String familyConf : confVal.split("&")) {
      String[] familySplit = familyConf.split("=");
      if (familySplit.length != 2) {
        continue;
      }
      try {
        confValMap.put(Bytes.toBytes(URLDecoder.decode(familySplit[0], "UTF-8")),
            URLDecoder.decode(familySplit[1], "UTF-8"));
      } catch (UnsupportedEncodingException e) {
        // will not happen with UTF-8 encoding
        throw new AssertionError(e);
      }
    }
    return confValMap;
  }

  /**
   * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
   * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
   */
  static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints,
      boolean writeMultipleTables) throws IOException {
    Configuration conf = job.getConfiguration();
    // create the partitions file
    FileSystem fs = FileSystem.get(conf);
    String hbaseTmpFsDir =
        conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
          HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
    Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
    partitionsPath = fs.makeQualified(partitionsPath);
    writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables);
    fs.deleteOnExit(partitionsPath);

    // configure job to use it
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
  @VisibleForTesting
  static String serializeColumnFamilyAttribute(Function<ColumnFamilyDescriptor, String> fn,
      List<TableDescriptor> allTables) throws UnsupportedEncodingException {
    StringBuilder attributeValue = new StringBuilder();
    int i = 0;
    for (TableDescriptor tableDescriptor : allTables) {
      if (tableDescriptor == null) {
        // could happen with mock table instance
        // CODEREVIEW: Can I set an empty string in conf if mock table instance?
        return "";
      }
      for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
        if (i++ > 0) {
          attributeValue.append('&');
        }
        attributeValue.append(URLEncoder.encode(
            Bytes.toString(combineTableNameSuffix(tableDescriptor.getTableName().getName(),
                familyDescriptor.getName())),
            "UTF-8"));
        attributeValue.append('=');
        attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8"));
      }
    }
    // Separators were prepended above, so there is no trailing ampersand to strip.
    return attributeValue.toString();
  }

  /**
   * Serialize column family to compression algorithm map to configuration.
   * Invoked while configuring the MR job for incremental load.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> compressionDetails = familyDescriptor ->
          familyDescriptor.getCompressionType().getName();

  /**
   * Serialize column family to block size map to configuration. Invoked while
   * configuring the MR job for incremental load.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> blockSizeDetails =
      familyDescriptor -> String.valueOf(familyDescriptor.getBlocksize());

  /**
   * Serialize column family to bloom type map to configuration. Invoked while
   * configuring the MR job for incremental load.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> bloomTypeDetails = familyDescriptor -> {
    String bloomType = familyDescriptor.getBloomFilterType().toString();
    if (bloomType == null) {
      bloomType = ColumnFamilyDescriptorBuilder.DEFAULT_BLOOMFILTER.name();
    }
    return bloomType;
  };

  /**
   * Serialize column family to data block encoding map to configuration.
   * Invoked while configuring the MR job for incremental load.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> dataBlockEncodingDetails = familyDescriptor -> {
    DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
    if (encoding == null) {
      encoding = DataBlockEncoding.NONE;
    }
    return encoding.toString();
  };
}