001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TASK_KEY;
021import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
022import static org.apache.hadoop.hbase.regionserver.HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY;
023import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
024
025import java.io.IOException;
026import java.io.UnsupportedEncodingException;
027import java.net.InetSocketAddress;
028import java.net.URLDecoder;
029import java.net.URLEncoder;
030import java.util.ArrayList;
031import java.util.Arrays;
032import java.util.List;
033import java.util.Map;
034import java.util.Set;
035import java.util.TreeMap;
036import java.util.TreeSet;
037import java.util.UUID;
038import java.util.function.Function;
039import java.util.stream.Collectors;
040import org.apache.commons.lang3.StringUtils;
041import org.apache.hadoop.conf.Configuration;
042import org.apache.hadoop.fs.FileSystem;
043import org.apache.hadoop.fs.Path;
044import org.apache.hadoop.hbase.Cell;
045import org.apache.hadoop.hbase.CellUtil;
046import org.apache.hadoop.hbase.HConstants;
047import org.apache.hadoop.hbase.HRegionLocation;
048import org.apache.hadoop.hbase.HTableDescriptor;
049import org.apache.hadoop.hbase.KeyValue;
050import org.apache.hadoop.hbase.PrivateCellUtil;
051import org.apache.hadoop.hbase.TableName;
052import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
053import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
054import org.apache.hadoop.hbase.client.Connection;
055import org.apache.hadoop.hbase.client.ConnectionFactory;
056import org.apache.hadoop.hbase.client.Put;
057import org.apache.hadoop.hbase.client.RegionLocator;
058import org.apache.hadoop.hbase.client.Table;
059import org.apache.hadoop.hbase.client.TableDescriptor;
060import org.apache.hadoop.hbase.fs.HFileSystem;
061import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
062import org.apache.hadoop.hbase.io.compress.Compression;
063import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
064import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
065import org.apache.hadoop.hbase.io.hfile.CacheConfig;
066import org.apache.hadoop.hbase.io.hfile.HFile;
067import org.apache.hadoop.hbase.io.hfile.HFileContext;
068import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
069import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
070import org.apache.hadoop.hbase.regionserver.BloomType;
071import org.apache.hadoop.hbase.regionserver.HStore;
072import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
073import org.apache.hadoop.hbase.util.BloomFilterUtil;
074import org.apache.hadoop.hbase.util.Bytes;
075import org.apache.hadoop.hbase.util.CommonFSUtils;
076import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
077import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
078import org.apache.hadoop.io.NullWritable;
079import org.apache.hadoop.io.SequenceFile;
080import org.apache.hadoop.io.Text;
081import org.apache.hadoop.mapreduce.Job;
082import org.apache.hadoop.mapreduce.OutputCommitter;
083import org.apache.hadoop.mapreduce.OutputFormat;
084import org.apache.hadoop.mapreduce.RecordWriter;
085import org.apache.hadoop.mapreduce.TaskAttemptContext;
086import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
087import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
088import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
089import org.apache.yetus.audience.InterfaceAudience;
090import org.slf4j.Logger;
091import org.slf4j.LoggerFactory;
092
093/**
094 * Writes HFiles. Passed Cells must arrive in order.
095 * Writes current time as the sequence id for the file. Sets the major compacted
 * attribute on created {@link HFile}s. Calling write(null,null) will forcibly roll
097 * all HFiles being written.
098 * <p>
099 * Using this class as part of a MapReduce job is best done
100 * using {@link #configureIncrementalLoad(Job, TableDescriptor, RegionLocator)}.
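 * <p>
 * A minimal driver sketch is shown below; the mapper class, table name and output path are
 * illustrative placeholders rather than part of this API:
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * Job job = Job.getInstance(conf, "hfile-generation");
 * job.setMapperClass(MyCellMapper.class); // hypothetical mapper emitting ImmutableBytesWritable/Put
 * job.setMapOutputKeyClass(ImmutableBytesWritable.class);
 * job.setMapOutputValueClass(Put.class);
 * try (Connection conn = ConnectionFactory.createConnection(conf);
 *     Table table = conn.getTable(TableName.valueOf("my_table"));
 *     RegionLocator locator = conn.getRegionLocator(TableName.valueOf("my_table"))) {
 *   HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), locator);
 *   FileOutputFormat.setOutputPath(job, new Path("/tmp/hfile-output")); // illustrative path
 *   job.waitForCompletion(true);
 * }
 * }</pre>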
101 */
102@InterfaceAudience.Public
103public class HFileOutputFormat2
104    extends FileOutputFormat<ImmutableBytesWritable, Cell> {
105  private static final Logger LOG = LoggerFactory.getLogger(HFileOutputFormat2.class);
106  static class TableInfo {
    private TableDescriptor tableDescriptor;
108    private RegionLocator regionLocator;
109
    public TableInfo(TableDescriptor tableDescriptor, RegionLocator regionLocator) {
      this.tableDescriptor = tableDescriptor;
112      this.regionLocator = regionLocator;
113    }
114
115    /**
     * Modifications to the returned HTableDescriptor do not affect the wrapped TableDescriptor.
117     * @return A clone of inner table descriptor
118     * @deprecated since 2.0.0 and will be removed in 3.0.0. Use {@link #getTableDescriptor()}
119     *   instead.
120     * @see #getTableDescriptor()
121     * @see <a href="https://issues.apache.org/jira/browse/HBASE-18241">HBASE-18241</a>
122     */
123    @Deprecated
124    public HTableDescriptor getHTableDescriptor() {
      return new HTableDescriptor(tableDescriptor);
126    }
127
128    public TableDescriptor getTableDescriptor() {
      return tableDescriptor;
130    }
131
132    public RegionLocator getRegionLocator() {
133      return regionLocator;
134    }
135  }
136
137  protected static final byte[] tableSeparator = Bytes.toBytes(";");
138
139  protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) {
140    return Bytes.add(tableName, tableSeparator, suffix);
141  }
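
  // For example (illustrative values): combineTableNameSuffix(Bytes.toBytes("ns:tbl"),
  // Bytes.toBytes("cf")) yields the bytes of "ns:tbl;cf", which is the key used throughout
  // this class to look up per-family settings.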
142
143  // The following constants are private since these are used by
144  // HFileOutputFormat2 to internally transfer data between job setup and
145  // reducer run using conf.
146  // These should not be changed by the client.
147  static final String COMPRESSION_FAMILIES_CONF_KEY =
148      "hbase.hfileoutputformat.families.compression";
149  static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
150      "hbase.hfileoutputformat.families.bloomtype";
151  static final String BLOOM_PARAM_FAMILIES_CONF_KEY =
152      "hbase.hfileoutputformat.families.bloomparam";
153  static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
154      "hbase.mapreduce.hfileoutputformat.blocksize";
155  static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
156      "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";
157
158  // This constant is public since the client can modify this when setting
159  // up their conf object and thus refer to this symbol.
160  // It is present for backwards compatibility reasons. Use it only to
161  // override the auto-detection of datablock encoding and compression.
162  public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
163      "hbase.mapreduce.hfileoutputformat.datablock.encoding";
164  public static final String COMPRESSION_OVERRIDE_CONF_KEY =
165      "hbase.mapreduce.hfileoutputformat.compression";
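
  // A job driver can use these overrides to force a single compression codec or data block
  // encoding for every family regardless of the table metadata; the values below are
  // illustrative, not defaults:
  //   conf.set(COMPRESSION_OVERRIDE_CONF_KEY, Compression.Algorithm.GZ.getName());
  //   conf.set(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY, DataBlockEncoding.FAST_DIFF.name());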
166
167  /**
168   * Keep locality while generating HFiles for bulkload. See HBASE-12596
169   */
170  public static final String LOCALITY_SENSITIVE_CONF_KEY =
171      "hbase.bulkload.locality.sensitive.enabled";
172  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
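  // Locality-sensitive writes look up the region holding each writer's first row and pass its
  // host as a favored node. To skip that lookup (for example when the generating cluster
  // cannot reach the target RegionServers), a driver may set, illustratively:
  //   conf.setBoolean(LOCALITY_SENSITIVE_CONF_KEY, false);
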
173  static final String OUTPUT_TABLE_NAME_CONF_KEY =
174      "hbase.mapreduce.hfileoutputformat.table.name";
175  static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
176          "hbase.mapreduce.use.multi.table.hfileoutputformat";
177
178  public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY;
179  public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";
180
181  @Override
182  public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
183      final TaskAttemptContext context) throws IOException, InterruptedException {
184    return createRecordWriter(context, this.getOutputCommitter(context));
185  }
186
187  protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[] family) {
188    return combineTableNameSuffix(tableName, family);
189  }
190
191  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> createRecordWriter(
192    final TaskAttemptContext context, final OutputCommitter committer) throws IOException {
193
194    // Get the path of the temporary output file
195    final Path outputDir = ((FileOutputCommitter)committer).getWorkPath();
196    final Configuration conf = context.getConfiguration();
197    final boolean writeMultipleTables =
198      conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
199    final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
200    if (writeTableNames == null || writeTableNames.isEmpty()) {
      throw new IllegalArgumentException(OUTPUT_TABLE_NAME_CONF_KEY + " cannot be empty");
202    }
203    final FileSystem fs = outputDir.getFileSystem(conf);
204    // These configs. are from hbase-*.xml
205    final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
206        HConstants.DEFAULT_MAX_FILE_SIZE);
207    // Invented config.  Add to hbase-*.xml if other than default compression.
208    final String defaultCompressionStr = conf.get("hfile.compression",
209        Compression.Algorithm.NONE.getName());
210    final Algorithm defaultCompression = HFileWriterImpl.compressionByName(defaultCompressionStr);
211    String compressionStr = conf.get(COMPRESSION_OVERRIDE_CONF_KEY);
212    final Algorithm overriddenCompression = compressionStr != null ?
213      Compression.getCompressionAlgorithmByName(compressionStr): null;
214    final boolean compactionExclude = conf.getBoolean(
215        "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
216    final Set<String> allTableNames = Arrays.stream(writeTableNames.split(
217            Bytes.toString(tableSeparator))).collect(Collectors.toSet());
218
219    // create a map from column family to the compression algorithm
220    final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
221    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
222    final Map<byte[], String> bloomParamMap = createFamilyBloomParamMap(conf);
223    final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);
224
225    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
226    final Map<byte[], DataBlockEncoding> datablockEncodingMap
227        = createFamilyDataBlockEncodingMap(conf);
228    final DataBlockEncoding overriddenEncoding = dataBlockEncodingStr != null ?
229      DataBlockEncoding.valueOf(dataBlockEncodingStr) : null;
230
231    return new RecordWriter<ImmutableBytesWritable, V>() {
232      // Map of families to writers and how much has been output on the writer.
233      private final Map<byte[], WriterLength> writers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
234      private final Map<byte[], byte[]> previousRows = new TreeMap<>(Bytes.BYTES_COMPARATOR);
235      private final long now = EnvironmentEdgeManager.currentTime();
236
237      @Override
238      public void write(ImmutableBytesWritable row, V cell) throws IOException {
239        Cell kv = cell;
240        // null input == user explicitly wants to flush
241        if (row == null && kv == null) {
242          rollWriters(null);
243          return;
244        }
245
246        byte[] rowKey = CellUtil.cloneRow(kv);
247        int length = (PrivateCellUtil.estimatedSerializedSizeOf(kv)) - Bytes.SIZEOF_INT;
248        byte[] family = CellUtil.cloneFamily(kv);
249        byte[] tableNameBytes = null;
250        if (writeMultipleTables) {
251          tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get());
252          tableNameBytes = TableName.valueOf(tableNameBytes).toBytes();
253          if (!allTableNames.contains(Bytes.toString(tableNameBytes))) {
254            throw new IllegalArgumentException("TableName " + Bytes.toString(tableNameBytes) +
255              " not expected");
256          }
257        } else {
258          tableNameBytes = Bytes.toBytes(writeTableNames);
259        }
260        Path tableRelPath = getTableRelativePath(tableNameBytes);
261        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);
262        WriterLength wl = this.writers.get(tableAndFamily);
263
        // If this is a new column family, create its output directory and set the storage policy
265        if (wl == null) {
266          Path writerPath = null;
          if (writeMultipleTables) {
            writerPath = new Path(outputDir, new Path(tableRelPath, Bytes.toString(family)));
          } else {
            writerPath = new Path(outputDir, Bytes.toString(family));
          }
273          fs.mkdirs(writerPath);
274          configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
275        }
276
        // Roll the writer once it exceeds the max size, but only at a row boundary
278        if (wl != null && wl.written + length >= maxsize
279                && Bytes.compareTo(this.previousRows.get(family), rowKey) != 0) {
280          rollWriters(wl);
281        }
282
        // create a new HFile writer, if necessary
284        if (wl == null || wl.writer == null) {
285          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
286            HRegionLocation loc = null;
287
288            String tableName = Bytes.toString(tableNameBytes);
289            if (tableName != null) {
290              try (Connection connection = ConnectionFactory.createConnection(conf);
291                     RegionLocator locator =
292                       connection.getRegionLocator(TableName.valueOf(tableName))) {
293                loc = locator.getRegionLocation(rowKey);
294              } catch (Throwable e) {
                LOG.warn("Error locating rowkey {} in table {}",
                  Bytes.toString(rowKey), tableName, e);
                loc = null;
              }
            }
299
300            if (null == loc) {
301              LOG.trace("Failed get of location, use default writer {}", Bytes.toString(rowKey));
302              wl = getNewWriter(tableNameBytes, family, conf, null);
303            } else {
304              LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey));
305              InetSocketAddress initialIsa =
306                  new InetSocketAddress(loc.getHostname(), loc.getPort());
307              if (initialIsa.isUnresolved()) {
308                LOG.trace("Failed resolve address {}, use default writer", loc.getHostnamePort());
309                wl = getNewWriter(tableNameBytes, family, conf, null);
310              } else {
311                LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString());
                wl = getNewWriter(tableNameBytes, family, conf,
                  new InetSocketAddress[] { initialIsa });
314              }
315            }
316          } else {
317            wl = getNewWriter(tableNameBytes, family, conf, null);
318          }
319        }
320
        // we now have the proper HFile writer. full steam ahead
322        PrivateCellUtil.updateLatestStamp(cell, this.now);
323        wl.writer.append(kv);
324        wl.written += length;
325
        // Copy the row so we can detect when the row transitions.
327        this.previousRows.put(family, rowKey);
328      }
329
      private Path getTableRelativePath(byte[] tableNameBytes) {
        String tableName = Bytes.toString(tableNameBytes);
        String[] tableNameParts = tableName.split(":");
        Path tableRelPath = new Path(tableNameParts[0]);
        if (tableNameParts.length > 1) {
          tableRelPath = new Path(tableRelPath, tableNameParts[1]);
        }
        return tableRelPath;
      }
339
340      private void rollWriters(WriterLength writerLength) throws IOException {
341        if (writerLength != null) {
342          closeWriter(writerLength);
343        } else {
344          for (WriterLength wl : this.writers.values()) {
345            closeWriter(wl);
346          }
347        }
348      }
349
350      private void closeWriter(WriterLength wl) throws IOException {
351        if (wl.writer != null) {
352          LOG.info("Writer=" + wl.writer.getPath() +
353            ((wl.written == 0)? "": ", wrote=" + wl.written));
354          close(wl.writer);
355          wl.writer = null;
356        }
357        wl.written = 0;
358      }
359
360      /*
361       * Create a new StoreFile.Writer.
362       * @return A WriterLength, containing a new StoreFile.Writer.
363       */
364      @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BX_UNBOXING_IMMEDIATELY_REBOXED",
365          justification="Not important")
366      private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration conf,
367          InetSocketAddress[] favoredNodes) throws IOException {
368        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family);
369        Path familydir = new Path(outputDir, Bytes.toString(family));
370        if (writeMultipleTables) {
371          familydir = new Path(outputDir,
372            new Path(getTableRelativePath(tableName), Bytes.toString(family)));
373        }
374        WriterLength wl = new WriterLength();
375        Algorithm compression = overriddenCompression;
376        compression = compression == null ? compressionMap.get(tableAndFamily) : compression;
377        compression = compression == null ? defaultCompression : compression;
378        BloomType bloomType = bloomTypeMap.get(tableAndFamily);
379        bloomType = bloomType == null ? BloomType.NONE : bloomType;
380        String bloomParam = bloomParamMap.get(tableAndFamily);
381        if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
382          conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, bloomParam);
383        }
384        Integer blockSize = blockSizeMap.get(tableAndFamily);
385        blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
386        DataBlockEncoding encoding = overriddenEncoding;
387        encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
388        encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
389        HFileContextBuilder contextBuilder = new HFileContextBuilder()
390          .withCompression(compression).withChecksumType(HStore.getChecksumType(conf))
391          .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(blockSize)
392          .withColumnFamily(family).withTableName(tableName);
393
394        if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
395          contextBuilder.withIncludesTags(true);
396        }
397
398        contextBuilder.withDataBlockEncoding(encoding);
399        HFileContext hFileContext = contextBuilder.build();
400        if (null == favoredNodes) {
401          wl.writer = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs)
402            .withOutputDir(familydir).withBloomType(bloomType)
403            .withFileContext(hFileContext).build();
404        } else {
405          wl.writer = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, new HFileSystem(fs))
406            .withOutputDir(familydir).withBloomType(bloomType)
407            .withFileContext(hFileContext).withFavoredNodes(favoredNodes).build();
408        }
409
410        this.writers.put(tableAndFamily, wl);
411        return wl;
412      }
413
414      private void close(final StoreFileWriter w) throws IOException {
415        if (w != null) {
416          w.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
417          w.appendFileInfo(BULKLOAD_TASK_KEY, Bytes.toBytes(context.getTaskAttemptID().toString()));
418          w.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(true));
419          w.appendFileInfo(EXCLUDE_FROM_MINOR_COMPACTION_KEY, Bytes.toBytes(compactionExclude));
420          w.appendTrackedTimestampsToMetadata();
421          w.close();
422        }
423      }
424
425      @Override
426      public void close(TaskAttemptContext c) throws IOException, InterruptedException {
427        for (WriterLength wl: this.writers.values()) {
428          close(wl.writer);
429        }
430      }
431    };
432  }
433
434  /**
435   * Configure block storage policy for CF after the directory is created.
436   */
437  static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
438      byte[] tableAndFamily, Path cfPath) {
439    if (null == conf || null == fs || null == tableAndFamily || null == cfPath) {
440      return;
441    }
442
443    String policy =
444        conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(tableAndFamily),
445          conf.get(STORAGE_POLICY_PROPERTY));
446    CommonFSUtils.setStoragePolicy(fs, cfPath, policy);
447  }
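
  // A sketch of how a caller could pin one family's HFiles to SSD while everything else
  // follows the table-wide policy; the table/family names and policy values are illustrative:
  //   conf.set(STORAGE_POLICY_PROPERTY, "HOT");
  //   conf.set(STORAGE_POLICY_PROPERTY_CF_PREFIX + "mytable;cf1", "ALL_SSD");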
448
449  /*
450   * Data structure to hold a Writer and amount of data written on it.
451   */
452  static class WriterLength {
453    long written = 0;
454    StoreFileWriter writer = null;
455  }
456
457  /**
   * Return the start keys of all of the regions in the given tables,
   * as a list of ImmutableBytesWritable.
460   */
461  private static List<ImmutableBytesWritable> getRegionStartKeys(List<RegionLocator> regionLocators,
462                                                                 boolean writeMultipleTables)
463          throws IOException {
464
465    ArrayList<ImmutableBytesWritable> ret = new ArrayList<>();
    for (RegionLocator regionLocator : regionLocators) {
467      TableName tableName = regionLocator.getName();
468      LOG.info("Looking up current regions for table " + tableName);
469      byte[][] byteKeys = regionLocator.getStartKeys();
470      for (byte[] byteKey : byteKeys) {
471        byte[] fullKey = byteKey; //HFileOutputFormat2 use case
472        if (writeMultipleTables) {
473          //MultiTableHFileOutputFormat use case
474          fullKey = combineTableNameSuffix(tableName.getName(), byteKey);
475        }
476        if (LOG.isDebugEnabled()) {
477          LOG.debug("SplitPoint startkey for " + tableName + ": " + Bytes.toStringBinary(fullKey));
478        }
479        ret.add(new ImmutableBytesWritable(fullKey));
480      }
481    }
482    return ret;
483  }
484
485  /**
486   * Write out a {@link SequenceFile} that can be read by
487   * {@link TotalOrderPartitioner} that contains the split points in startKeys.
488   */
489  @SuppressWarnings("deprecation")
490  private static void writePartitions(Configuration conf, Path partitionsPath,
491      List<ImmutableBytesWritable> startKeys, boolean writeMultipleTables) throws IOException {
492    LOG.info("Writing partition information to " + partitionsPath);
493    if (startKeys.isEmpty()) {
494      throw new IllegalArgumentException("No regions passed");
495    }
496
497    // We're generating a list of split points, and we don't ever
498    // have keys < the first region (which has an empty start key)
499    // so we need to remove it. Otherwise we would end up with an
500    // empty reducer with index 0
501    TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys);
502    ImmutableBytesWritable first = sorted.first();
503    if (writeMultipleTables) {
504      first =
505        new ImmutableBytesWritable(MultiTableHFileOutputFormat.getSuffix(sorted.first().get()));
506    }
507    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
508      throw new IllegalArgumentException(
509          "First region of table should have empty start key. Instead has: "
510          + Bytes.toStringBinary(first.get()));
511    }
512    sorted.remove(sorted.first());
513
514    // Write the actual file
515    FileSystem fs = partitionsPath.getFileSystem(conf);
516    SequenceFile.Writer writer = SequenceFile.createWriter(
517      fs, conf, partitionsPath, ImmutableBytesWritable.class,
518      NullWritable.class);
519
520    try {
521      for (ImmutableBytesWritable startKey : sorted) {
522        writer.append(startKey, NullWritable.get());
523      }
524    } finally {
525      writer.close();
526    }
527  }
528
529  /**
530   * Configure a MapReduce Job to perform an incremental load into the given
531   * table. This
532   * <ul>
533   *   <li>Inspects the table to configure a total order partitioner</li>
534   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
535   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
536   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   *   <li>Sets the reducer up to perform the appropriate sorting (either CellSortReducer or
   *     PutSortReducer)</li>
539   * </ul>
540   * The user should be sure to set the map output value class to either KeyValue or Put before
541   * running this function.
542   */
543  public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
544      throws IOException {
545    configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
546  }
547
548  /**
549   * Configure a MapReduce Job to perform an incremental load into the given
550   * table. This
551   * <ul>
552   *   <li>Inspects the table to configure a total order partitioner</li>
553   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
554   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
555   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   *   <li>Sets the reducer up to perform the appropriate sorting (either CellSortReducer or
   *     PutSortReducer)</li>
558   * </ul>
559   * The user should be sure to set the map output value class to either KeyValue or Put before
560   * running this function.
561   */
562  public static void configureIncrementalLoad(Job job, TableDescriptor tableDescriptor,
563      RegionLocator regionLocator) throws IOException {
564    ArrayList<TableInfo> singleTableInfo = new ArrayList<>();
565    singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator));
566    configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
567  }
568
569  static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
570      Class<? extends OutputFormat<?, ?>> cls) throws IOException {
571    Configuration conf = job.getConfiguration();
572    job.setOutputKeyClass(ImmutableBytesWritable.class);
573    job.setOutputValueClass(MapReduceExtendedCell.class);
574    job.setOutputFormatClass(cls);
575
576    if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) {
577      throw new IllegalArgumentException("Duplicate entries found in TableInfo argument");
578    }
579    boolean writeMultipleTables = false;
580    if (MultiTableHFileOutputFormat.class.equals(cls)) {
581      writeMultipleTables = true;
582      conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true);
583    }
584    // Based on the configured map output class, set the correct reducer to properly
585    // sort the incoming values.
586    // TODO it would be nice to pick one or the other of these formats.
587    if (KeyValue.class.equals(job.getMapOutputValueClass())
588        || MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())) {
589      job.setReducerClass(CellSortReducer.class);
590    } else if (Put.class.equals(job.getMapOutputValueClass())) {
591      job.setReducerClass(PutSortReducer.class);
592    } else if (Text.class.equals(job.getMapOutputValueClass())) {
593      job.setReducerClass(TextSortReducer.class);
594    } else {
      LOG.warn("Unknown map output value type: " + job.getMapOutputValueClass());
596    }
597
598    conf.setStrings("io.serializations", conf.get("io.serializations"),
599        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
600        CellSerialization.class.getName());
601
602    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
603      LOG.info("bulkload locality sensitive enabled");
604    }
605
606    /* Now get the region start keys for every table required */
607    List<String> allTableNames = new ArrayList<>(multiTableInfo.size());
608    List<RegionLocator> regionLocators = new ArrayList<>(multiTableInfo.size());
609    List<TableDescriptor> tableDescriptors = new ArrayList<>(multiTableInfo.size());
610
    for (TableInfo tableInfo : multiTableInfo) {
612      regionLocators.add(tableInfo.getRegionLocator());
613      allTableNames.add(tableInfo.getRegionLocator().getName().getNameAsString());
614      tableDescriptors.add(tableInfo.getTableDescriptor());
615    }
    // Record table names so the record writer can resolve favored nodes and decode
    // compression, block size and other per-family attributes for each table
    conf.set(OUTPUT_TABLE_NAME_CONF_KEY,
      StringUtils.join(allTableNames, Bytes.toString(tableSeparator)));
620    List<ImmutableBytesWritable> startKeys =
621      getRegionStartKeys(regionLocators, writeMultipleTables);
    // Use the tables' region boundaries for TotalOrderPartitioner split points.
623    LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
624        "to match current region count for all tables");
625    job.setNumReduceTasks(startKeys.size());
626
627    configurePartitioner(job, startKeys, writeMultipleTables);
    // Serialize per-family compression, block size, bloom type/param and data block
    // encoding settings based on the column family descriptors
630    conf.set(COMPRESSION_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(compressionDetails,
631            tableDescriptors));
632    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(blockSizeDetails,
633            tableDescriptors));
634    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(bloomTypeDetails,
635            tableDescriptors));
636    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(bloomParamDetails,
637        tableDescriptors));
638    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
639            serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors));
640
641    TableMapReduceUtil.addDependencyJars(job);
642    TableMapReduceUtil.initCredentials(job);
643    LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
644  }
645
646  public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor) throws
647      IOException {
648    Configuration conf = job.getConfiguration();
649
650    job.setOutputKeyClass(ImmutableBytesWritable.class);
651    job.setOutputValueClass(MapReduceExtendedCell.class);
652    job.setOutputFormatClass(HFileOutputFormat2.class);
653
654    ArrayList<TableDescriptor> singleTableDescriptor = new ArrayList<>(1);
655    singleTableDescriptor.add(tableDescriptor);
656
657    conf.set(OUTPUT_TABLE_NAME_CONF_KEY, tableDescriptor.getTableName().getNameAsString());
    // Serialize per-family compression, block size, bloom type/param and data block encoding
659    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
660        serializeColumnFamilyAttribute(compressionDetails, singleTableDescriptor));
661    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
662        serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor));
663    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
664        serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor));
665    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY,
666        serializeColumnFamilyAttribute(bloomParamDetails, singleTableDescriptor));
667    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
668        serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor));
669
670    TableMapReduceUtil.addDependencyJars(job);
671    TableMapReduceUtil.initCredentials(job);
672    LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured.");
673  }
674
675  /**
676   * Runs inside the task to deserialize column family to compression algorithm
677   * map from the configuration.
678   *
679   * @param conf to read the serialized values from
680   * @return a map from column family to the configured compression algorithm
681   */
682  @InterfaceAudience.Private
  static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration conf) {
685    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
686        COMPRESSION_FAMILIES_CONF_KEY);
687    Map<byte[], Algorithm> compressionMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
688    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
689      Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue());
690      compressionMap.put(e.getKey(), algorithm);
691    }
692    return compressionMap;
693  }
694
695  /**
696   * Runs inside the task to deserialize column family to bloom filter type
697   * map from the configuration.
698   *
699   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter type
701   */
702  @InterfaceAudience.Private
703  static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
704    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
705        BLOOM_TYPE_FAMILIES_CONF_KEY);
706    Map<byte[], BloomType> bloomTypeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
707    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
708      BloomType bloomType = BloomType.valueOf(e.getValue());
709      bloomTypeMap.put(e.getKey(), bloomType);
710    }
711    return bloomTypeMap;
712  }
713
714  /**
715   * Runs inside the task to deserialize column family to bloom filter param
716   * map from the configuration.
717   *
718   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter param
720   */
721  @InterfaceAudience.Private
722  static Map<byte[], String> createFamilyBloomParamMap(Configuration conf) {
723    return createFamilyConfValueMap(conf, BLOOM_PARAM_FAMILIES_CONF_KEY);
724  }
725
726
727  /**
728   * Runs inside the task to deserialize column family to block size
729   * map from the configuration.
730   *
731   * @param conf to read the serialized values from
732   * @return a map from column family to the configured block size
733   */
734  @InterfaceAudience.Private
735  static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
736    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
737        BLOCK_SIZE_FAMILIES_CONF_KEY);
738    Map<byte[], Integer> blockSizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
739    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
740      Integer blockSize = Integer.parseInt(e.getValue());
741      blockSizeMap.put(e.getKey(), blockSize);
742    }
743    return blockSizeMap;
744  }
745
746  /**
747   * Runs inside the task to deserialize column family to data block encoding
748   * type map from the configuration.
749   *
750   * @param conf to read the serialized values from
   * @return a map from column family to the configured data block encoding
   *         for the family
753   */
754  @InterfaceAudience.Private
755  static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(
756      Configuration conf) {
757    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
758        DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
759    Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
760    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
761      encoderMap.put(e.getKey(), DataBlockEncoding.valueOf((e.getValue())));
762    }
763    return encoderMap;
764  }
765
766
767  /**
768   * Run inside the task to deserialize column family to given conf value map.
769   *
770   * @param conf to read the serialized values from
771   * @param confName conf key to read from the configuration
772   * @return a map of column family to the given configuration value
773   */
774  private static Map<byte[], String> createFamilyConfValueMap(
775      Configuration conf, String confName) {
776    Map<byte[], String> confValMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
777    String confVal = conf.get(confName, "");
778    for (String familyConf : confVal.split("&")) {
779      String[] familySplit = familyConf.split("=");
780      if (familySplit.length != 2) {
781        continue;
782      }
783      try {
784        confValMap.put(Bytes.toBytes(URLDecoder.decode(familySplit[0], "UTF-8")),
785            URLDecoder.decode(familySplit[1], "UTF-8"));
786      } catch (UnsupportedEncodingException e) {
787        // will not happen with UTF-8 encoding
788        throw new AssertionError(e);
789      }
790    }
791    return confValMap;
792  }
793
794  /**
795   * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
   * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
797   */
  static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints,
      boolean writeMultipleTables) throws IOException {
801    Configuration conf = job.getConfiguration();
802    // create the partitions file
803    FileSystem fs = FileSystem.get(conf);
804    String hbaseTmpFsDir =
805        conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
806          HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
807    Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
    partitionsPath = fs.makeQualified(partitionsPath);
809    writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables);
810    fs.deleteOnExit(partitionsPath);
811
812    // configure job to use it
813    job.setPartitionerClass(TotalOrderPartitioner.class);
814    TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
815  }
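
  // The partitions file is staged under hbase.fs.tmp.dir; a driver that needs a different
  // staging location can set it before configuring the job (the path below is illustrative):
  //   conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY, "/user/hbase/hbase-staging");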
816
817  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value =
818    "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
819  @InterfaceAudience.Private
820  static String serializeColumnFamilyAttribute(Function<ColumnFamilyDescriptor, String> fn,
821        List<TableDescriptor> allTables)
822      throws UnsupportedEncodingException {
823    StringBuilder attributeValue = new StringBuilder();
824    int i = 0;
825    for (TableDescriptor tableDescriptor : allTables) {
826      if (tableDescriptor == null) {
827        // could happen with mock table instance
828        // CODEREVIEW: Can I set an empty string in conf if mock table instance?
829        return "";
830      }
831      for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
832        if (i++ > 0) {
833          attributeValue.append('&');
834        }
835        attributeValue.append(URLEncoder.encode(
836          Bytes.toString(combineTableNameSuffix(tableDescriptor.getTableName().getName(),
837            familyDescriptor.getName())), "UTF-8"));
838        attributeValue.append('=');
839        attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8"));
840      }
841    }
    return attributeValue.toString();
844  }
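
  // The serialized form is "<url-encoded table;family>=<url-encoded value>" pairs joined by
  // '&'. For an illustrative table "tbl" with families "cf1" (gz) and "cf2" (none), the
  // compression attribute would look like: tbl%3Bcf1=gz&tbl%3Bcf2=none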
845
846  /**
847   * Serialize column family to compression algorithm map to configuration.
848   * Invoked while configuring the MR job for incremental load.
849   */
850  @InterfaceAudience.Private
851  static Function<ColumnFamilyDescriptor, String> compressionDetails = familyDescriptor ->
852          familyDescriptor.getCompressionType().getName();
853
854  /**
855   * Serialize column family to block size map to configuration. Invoked while
856   * configuring the MR job for incremental load.
857   */
858  @InterfaceAudience.Private
859  static Function<ColumnFamilyDescriptor, String> blockSizeDetails = familyDescriptor -> String
860          .valueOf(familyDescriptor.getBlocksize());
861
862  /**
863   * Serialize column family to bloom type map to configuration. Invoked while
864   * configuring the MR job for incremental load.
865   */
866  @InterfaceAudience.Private
867  static Function<ColumnFamilyDescriptor, String> bloomTypeDetails = familyDescriptor -> {
868    String bloomType = familyDescriptor.getBloomFilterType().toString();
869    if (bloomType == null) {
870      bloomType = ColumnFamilyDescriptorBuilder.DEFAULT_BLOOMFILTER.name();
871    }
872    return bloomType;
873  };
874
875  /**
876   * Serialize column family to bloom param map to configuration. Invoked while
877   * configuring the MR job for incremental load.
878   */
879  @InterfaceAudience.Private
880  static Function<ColumnFamilyDescriptor, String> bloomParamDetails = familyDescriptor -> {
881    BloomType bloomType = familyDescriptor.getBloomFilterType();
882    String bloomParam = "";
883    if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
884      bloomParam = familyDescriptor.getConfigurationValue(BloomFilterUtil.PREFIX_LENGTH_KEY);
885    }
886    return bloomParam;
887  };
888
889  /**
890   * Serialize column family to data block encoding map to configuration.
891   * Invoked while configuring the MR job for incremental load.
892   */
893  @InterfaceAudience.Private
894  static Function<ColumnFamilyDescriptor, String> dataBlockEncodingDetails = familyDescriptor -> {
895    DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
896    if (encoding == null) {
897      encoding = DataBlockEncoding.NONE;
898    }
899    return encoding.toString();
900  };
901
902}