001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TASK_KEY;
021import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
022import static org.apache.hadoop.hbase.regionserver.HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY;
023import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
024
025import java.io.IOException;
026import java.io.UnsupportedEncodingException;
027import java.net.InetSocketAddress;
028import java.net.URLDecoder;
029import java.net.URLEncoder;
030import java.util.ArrayList;
031import java.util.Arrays;
032import java.util.List;
033import java.util.Map;
034import java.util.Set;
035import java.util.TreeMap;
036import java.util.TreeSet;
037import java.util.UUID;
038import java.util.function.Function;
039import java.util.stream.Collectors;
040import org.apache.commons.lang3.StringUtils;
041import org.apache.hadoop.conf.Configuration;
042import org.apache.hadoop.fs.FileSystem;
043import org.apache.hadoop.fs.Path;
044import org.apache.hadoop.hbase.Cell;
045import org.apache.hadoop.hbase.CellUtil;
046import org.apache.hadoop.hbase.HConstants;
047import org.apache.hadoop.hbase.HRegionLocation;
048import org.apache.hadoop.hbase.HTableDescriptor;
049import org.apache.hadoop.hbase.KeyValue;
050import org.apache.hadoop.hbase.PrivateCellUtil;
051import org.apache.hadoop.hbase.TableName;
052import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
053import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
054import org.apache.hadoop.hbase.client.Connection;
055import org.apache.hadoop.hbase.client.ConnectionFactory;
056import org.apache.hadoop.hbase.client.Put;
057import org.apache.hadoop.hbase.client.RegionLocator;
058import org.apache.hadoop.hbase.client.Table;
059import org.apache.hadoop.hbase.client.TableDescriptor;
060import org.apache.hadoop.hbase.fs.HFileSystem;
061import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
062import org.apache.hadoop.hbase.io.compress.Compression;
063import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
064import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
065import org.apache.hadoop.hbase.io.hfile.CacheConfig;
066import org.apache.hadoop.hbase.io.hfile.HFile;
067import org.apache.hadoop.hbase.io.hfile.HFileContext;
068import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
069import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
070import org.apache.hadoop.hbase.regionserver.BloomType;
071import org.apache.hadoop.hbase.regionserver.HStore;
072import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
073import org.apache.hadoop.hbase.util.BloomFilterUtil;
074import org.apache.hadoop.hbase.util.Bytes;
075import org.apache.hadoop.hbase.util.CommonFSUtils;
076import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
077import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
078import org.apache.hadoop.io.NullWritable;
079import org.apache.hadoop.io.SequenceFile;
080import org.apache.hadoop.io.Text;
081import org.apache.hadoop.mapreduce.Job;
082import org.apache.hadoop.mapreduce.OutputCommitter;
083import org.apache.hadoop.mapreduce.OutputFormat;
084import org.apache.hadoop.mapreduce.RecordWriter;
085import org.apache.hadoop.mapreduce.TaskAttemptContext;
086import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
087import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
088import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
089import org.apache.yetus.audience.InterfaceAudience;
090import org.slf4j.Logger;
091import org.slf4j.LoggerFactory;
092
093import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
094
095/**
096 * Writes HFiles. Passed Cells must arrive in order.
097 * Writes current time as the sequence id for the file. Sets the major compacted
 * attribute on created {@link HFile}s. Calling write(null, null) will forcibly roll
099 * all HFiles being written.
100 * <p>
101 * Using this class as part of a MapReduce job is best done
102 * using {@link #configureIncrementalLoad(Job, TableDescriptor, RegionLocator)}.
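 * <p>
 * A minimal sketch of such a setup (the table name and output path are illustrative):
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * Job job = Job.getInstance(conf, "hfile-prepare");
 * try (Connection conn = ConnectionFactory.createConnection(conf);
 *     Table table = conn.getTable(TableName.valueOf("example_table"));
 *     RegionLocator locator = conn.getRegionLocator(TableName.valueOf("example_table"))) {
 *   HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), locator);
 *   FileOutputFormat.setOutputPath(job, new Path("/tmp/example_hfiles"));
 * }
 * }</pre>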
103 */
104@InterfaceAudience.Public
105public class HFileOutputFormat2
106    extends FileOutputFormat<ImmutableBytesWritable, Cell> {
107  private static final Logger LOG = LoggerFactory.getLogger(HFileOutputFormat2.class);
108  static class TableInfo {
    private TableDescriptor tableDescriptor;
110    private RegionLocator regionLocator;
111
    public TableInfo(TableDescriptor tableDescriptor, RegionLocator regionLocator) {
      this.tableDescriptor = tableDescriptor;
114      this.regionLocator = regionLocator;
115    }
116
117    /**
118     * The modification for the returned HTD doesn't affect the inner TD.
119     * @return A clone of inner table descriptor
120     * @deprecated since 2.0.0 and will be removed in 3.0.0. Use {@link #getTableDescriptor()}
121     *   instead.
122     * @see #getTableDescriptor()
123     * @see <a href="https://issues.apache.org/jira/browse/HBASE-18241">HBASE-18241</a>
124     */
125    @Deprecated
126    public HTableDescriptor getHTableDescriptor() {
      return new HTableDescriptor(tableDescriptor);
128    }
129
130    public TableDescriptor getTableDescriptor() {
      return tableDescriptor;
132    }
133
134    public RegionLocator getRegionLocator() {
135      return regionLocator;
136    }
137  }
138
139  protected static final byte[] tableSeparator = Bytes.toBytes(";");
140
141  protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) {
142    return Bytes.add(tableName, tableSeparator, suffix);
143  }
144
145  // The following constants are private since these are used by
146  // HFileOutputFormat2 to internally transfer data between job setup and
147  // reducer run using conf.
148  // These should not be changed by the client.
149  static final String COMPRESSION_FAMILIES_CONF_KEY =
150      "hbase.hfileoutputformat.families.compression";
151  static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
152      "hbase.hfileoutputformat.families.bloomtype";
153  static final String BLOOM_PARAM_FAMILIES_CONF_KEY =
154      "hbase.hfileoutputformat.families.bloomparam";
155  static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
156      "hbase.mapreduce.hfileoutputformat.blocksize";
157  static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
158      "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";
159
160  // This constant is public since the client can modify this when setting
161  // up their conf object and thus refer to this symbol.
162  // It is present for backwards compatibility reasons. Use it only to
163  // override the auto-detection of datablock encoding and compression.
164  public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
165      "hbase.mapreduce.hfileoutputformat.datablock.encoding";
166  public static final String COMPRESSION_OVERRIDE_CONF_KEY =
167      "hbase.mapreduce.hfileoutputformat.compression";
168
169  /**
170   * Keep locality while generating HFiles for bulkload. See HBASE-12596
171   */
172  public static final String LOCALITY_SENSITIVE_CONF_KEY =
173      "hbase.bulkload.locality.sensitive.enabled";
174  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
175  static final String OUTPUT_TABLE_NAME_CONF_KEY =
176      "hbase.mapreduce.hfileoutputformat.table.name";
177  static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
178          "hbase.mapreduce.use.multi.table.hfileoutputformat";
179
180  public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY;
181  public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";
182
183  @Override
184  public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
185      final TaskAttemptContext context) throws IOException, InterruptedException {
186    return createRecordWriter(context, this.getOutputCommitter(context));
187  }
188
189  protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[] family) {
190    return combineTableNameSuffix(tableName, family);
191  }
192
193  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> createRecordWriter(
194    final TaskAttemptContext context, final OutputCommitter committer) throws IOException {
195
196    // Get the path of the temporary output file
197    final Path outputDir = ((FileOutputCommitter)committer).getWorkPath();
198    final Configuration conf = context.getConfiguration();
199    final boolean writeMultipleTables =
200      conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
201    final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
202    if (writeTableNames == null || writeTableNames.isEmpty()) {
203      throw new IllegalArgumentException("" + OUTPUT_TABLE_NAME_CONF_KEY + " cannot be empty");
204    }
205    final FileSystem fs = outputDir.getFileSystem(conf);
206    // These configs. are from hbase-*.xml
207    final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
208        HConstants.DEFAULT_MAX_FILE_SIZE);
209    // Invented config.  Add to hbase-*.xml if other than default compression.
210    final String defaultCompressionStr = conf.get("hfile.compression",
211        Compression.Algorithm.NONE.getName());
212    final Algorithm defaultCompression = HFileWriterImpl.compressionByName(defaultCompressionStr);
213    String compressionStr = conf.get(COMPRESSION_OVERRIDE_CONF_KEY);
214    final Algorithm overriddenCompression = compressionStr != null ?
215      Compression.getCompressionAlgorithmByName(compressionStr): null;
216    final boolean compactionExclude = conf.getBoolean(
217        "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
218    final Set<String> allTableNames = Arrays.stream(writeTableNames.split(
219            Bytes.toString(tableSeparator))).collect(Collectors.toSet());
220
221    // create a map from column family to the compression algorithm
222    final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
223    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
224    final Map<byte[], String> bloomParamMap = createFamilyBloomParamMap(conf);
225    final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);
226
227    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
228    final Map<byte[], DataBlockEncoding> datablockEncodingMap
229        = createFamilyDataBlockEncodingMap(conf);
230    final DataBlockEncoding overriddenEncoding = dataBlockEncodingStr != null ?
231      DataBlockEncoding.valueOf(dataBlockEncodingStr) : null;
232
233    return new RecordWriter<ImmutableBytesWritable, V>() {
234      // Map of families to writers and how much has been output on the writer.
235      private final Map<byte[], WriterLength> writers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
236      private final Map<byte[], byte[]> previousRows = new TreeMap<>(Bytes.BYTES_COMPARATOR);
237      private final long now = EnvironmentEdgeManager.currentTime();
238
239      @Override
240      public void write(ImmutableBytesWritable row, V cell) throws IOException {
241        Cell kv = cell;
242        // null input == user explicitly wants to flush
243        if (row == null && kv == null) {
244          rollWriters(null);
245          return;
246        }
247
248        byte[] rowKey = CellUtil.cloneRow(kv);
249        int length = (PrivateCellUtil.estimatedSerializedSizeOf(kv)) - Bytes.SIZEOF_INT;
250        byte[] family = CellUtil.cloneFamily(kv);
251        byte[] tableNameBytes = null;
252        if (writeMultipleTables) {
253          tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get());
254          tableNameBytes = TableName.valueOf(tableNameBytes).toBytes();
255          if (!allTableNames.contains(Bytes.toString(tableNameBytes))) {
256            throw new IllegalArgumentException("TableName " + Bytes.toString(tableNameBytes) +
257              " not expected");
258          }
259        } else {
260          tableNameBytes = Bytes.toBytes(writeTableNames);
261        }
262        Path tableRelPath = getTableRelativePath(tableNameBytes);
263        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);
264        WriterLength wl = this.writers.get(tableAndFamily);
265
        // If this is a new column family, create its output directory and set the storage policy
        if (wl == null) {
          Path writerPath = null;
          if (writeMultipleTables) {
            writerPath = new Path(outputDir, new Path(tableRelPath, Bytes.toString(family)));
          } else {
            writerPath = new Path(outputDir, Bytes.toString(family));
          }
275          fs.mkdirs(writerPath);
276          configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
277        }
278
        // Roll the writer if it has exceeded the max file size, but only at a row boundary
280        if (wl != null && wl.written + length >= maxsize
281                && Bytes.compareTo(this.previousRows.get(family), rowKey) != 0) {
282          rollWriters(wl);
283        }
284
        // create a new HFile writer, if necessary
286        if (wl == null || wl.writer == null) {
287          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
288            HRegionLocation loc = null;
289
290            String tableName = Bytes.toString(tableNameBytes);
291            if (tableName != null) {
292              try (Connection connection = ConnectionFactory.createConnection(conf);
293                     RegionLocator locator =
294                       connection.getRegionLocator(TableName.valueOf(tableName))) {
295                loc = locator.getRegionLocation(rowKey);
296              } catch (Throwable e) {
297                LOG.warn("Something wrong locating rowkey {} in {}",
298                  Bytes.toString(rowKey), tableName, e);
299                loc = null;
              }
            }
301
302            if (null == loc) {
303              LOG.trace("Failed get of location, use default writer {}", Bytes.toString(rowKey));
304              wl = getNewWriter(tableNameBytes, family, conf, null);
305            } else {
306              LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey));
307              InetSocketAddress initialIsa =
308                  new InetSocketAddress(loc.getHostname(), loc.getPort());
309              if (initialIsa.isUnresolved()) {
310                LOG.trace("Failed resolve address {}, use default writer", loc.getHostnamePort());
311                wl = getNewWriter(tableNameBytes, family, conf, null);
312              } else {
313                LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString());
                wl = getNewWriter(tableNameBytes, family, conf,
                  new InetSocketAddress[] { initialIsa });
316              }
317            }
318          } else {
319            wl = getNewWriter(tableNameBytes, family, conf, null);
320          }
321        }
322
        // we now have the proper HFile writer. full steam ahead
324        PrivateCellUtil.updateLatestStamp(cell, this.now);
325        wl.writer.append(kv);
326        wl.written += length;
327
        // Copy the row so we can detect when the row changes.
329        this.previousRows.put(family, rowKey);
330      }
331
332      private Path getTableRelativePath(byte[] tableNameBytes) {
333        String tableName = Bytes.toString(tableNameBytes);
334        String[] tableNameParts = tableName.split(":");
        Path tableRelPath = new Path(tableNameParts[0]);
        if (tableNameParts.length > 1) {
          tableRelPath = new Path(tableRelPath, tableNameParts[1]);
338        }
339        return tableRelPath;
340      }
341
342      private void rollWriters(WriterLength writerLength) throws IOException {
343        if (writerLength != null) {
344          closeWriter(writerLength);
345        } else {
346          for (WriterLength wl : this.writers.values()) {
347            closeWriter(wl);
348          }
349        }
350      }
351
352      private void closeWriter(WriterLength wl) throws IOException {
353        if (wl.writer != null) {
354          LOG.info("Writer=" + wl.writer.getPath() +
355            ((wl.written == 0)? "": ", wrote=" + wl.written));
356          close(wl.writer);
357          wl.writer = null;
358        }
359        wl.written = 0;
360      }
361
362      /*
363       * Create a new StoreFile.Writer.
364       * @return A WriterLength, containing a new StoreFile.Writer.
365       */
366      @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BX_UNBOXING_IMMEDIATELY_REBOXED",
367          justification="Not important")
368      private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration conf,
369          InetSocketAddress[] favoredNodes) throws IOException {
370        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family);
371        Path familydir = new Path(outputDir, Bytes.toString(family));
372        if (writeMultipleTables) {
373          familydir = new Path(outputDir,
374            new Path(getTableRelativePath(tableName), Bytes.toString(family)));
375        }
376        WriterLength wl = new WriterLength();
377        Algorithm compression = overriddenCompression;
378        compression = compression == null ? compressionMap.get(tableAndFamily) : compression;
379        compression = compression == null ? defaultCompression : compression;
380        BloomType bloomType = bloomTypeMap.get(tableAndFamily);
381        bloomType = bloomType == null ? BloomType.NONE : bloomType;
382        String bloomParam = bloomParamMap.get(tableAndFamily);
383        if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
384          conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, bloomParam);
385        }
386        Integer blockSize = blockSizeMap.get(tableAndFamily);
387        blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
388        DataBlockEncoding encoding = overriddenEncoding;
389        encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
390        encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
391        HFileContextBuilder contextBuilder = new HFileContextBuilder()
392          .withCompression(compression).withChecksumType(HStore.getChecksumType(conf))
393          .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(blockSize)
394          .withColumnFamily(family).withTableName(tableName);
395
396        if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
397          contextBuilder.withIncludesTags(true);
398        }
399
400        contextBuilder.withDataBlockEncoding(encoding);
401        HFileContext hFileContext = contextBuilder.build();
402        if (null == favoredNodes) {
403          wl.writer = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs)
404            .withOutputDir(familydir).withBloomType(bloomType)
405            .withFileContext(hFileContext).build();
406        } else {
407          wl.writer = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, new HFileSystem(fs))
408            .withOutputDir(familydir).withBloomType(bloomType)
409            .withFileContext(hFileContext).withFavoredNodes(favoredNodes).build();
410        }
411
412        this.writers.put(tableAndFamily, wl);
413        return wl;
414      }
415
416      private void close(final StoreFileWriter w) throws IOException {
417        if (w != null) {
418          w.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
419          w.appendFileInfo(BULKLOAD_TASK_KEY, Bytes.toBytes(context.getTaskAttemptID().toString()));
420          w.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(true));
421          w.appendFileInfo(EXCLUDE_FROM_MINOR_COMPACTION_KEY, Bytes.toBytes(compactionExclude));
422          w.appendTrackedTimestampsToMetadata();
423          w.close();
424        }
425      }
426
427      @Override
428      public void close(TaskAttemptContext c) throws IOException, InterruptedException {
429        for (WriterLength wl: this.writers.values()) {
430          close(wl.writer);
431        }
432      }
433    };
434  }
435
436  /**
437   * Configure block storage policy for CF after the directory is created.
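   * <p>
   * The policy can be supplied globally or per {@code tableName;family} suffix; the table,
   * family and policy names below are only illustrative:
   * <pre>{@code
   * conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "HOT");
   * conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + "example_table;cf1", "ALL_SSD");
   * }</pre>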
438   */
439  static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
440      byte[] tableAndFamily, Path cfPath) {
441    if (null == conf || null == fs || null == tableAndFamily || null == cfPath) {
442      return;
443    }
444
445    String policy =
446        conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(tableAndFamily),
447          conf.get(STORAGE_POLICY_PROPERTY));
448    CommonFSUtils.setStoragePolicy(fs, cfPath, policy);
449  }
450
451  /*
452   * Data structure to hold a Writer and amount of data written on it.
453   */
454  static class WriterLength {
455    long written = 0;
456    StoreFileWriter writer = null;
457  }
458
459  /**
   * Return the start keys of all of the regions in the given tables,
   * as a list of ImmutableBytesWritable.
462   */
463  private static List<ImmutableBytesWritable> getRegionStartKeys(List<RegionLocator> regionLocators,
464                                                                 boolean writeMultipleTables)
465          throws IOException {
466
467    ArrayList<ImmutableBytesWritable> ret = new ArrayList<>();
468    for(RegionLocator regionLocator : regionLocators) {
469      TableName tableName = regionLocator.getName();
470      LOG.info("Looking up current regions for table " + tableName);
471      byte[][] byteKeys = regionLocator.getStartKeys();
472      for (byte[] byteKey : byteKeys) {
473        byte[] fullKey = byteKey; //HFileOutputFormat2 use case
474        if (writeMultipleTables) {
475          //MultiTableHFileOutputFormat use case
476          fullKey = combineTableNameSuffix(tableName.getName(), byteKey);
477        }
478        if (LOG.isDebugEnabled()) {
479          LOG.debug("SplitPoint startkey for " + tableName + ": " + Bytes.toStringBinary(fullKey));
480        }
481        ret.add(new ImmutableBytesWritable(fullKey));
482      }
483    }
484    return ret;
485  }
486
487  /**
488   * Write out a {@link SequenceFile} that can be read by
489   * {@link TotalOrderPartitioner} that contains the split points in startKeys.
490   */
491  @SuppressWarnings("deprecation")
492  private static void writePartitions(Configuration conf, Path partitionsPath,
493      List<ImmutableBytesWritable> startKeys, boolean writeMultipleTables) throws IOException {
494    LOG.info("Writing partition information to " + partitionsPath);
495    if (startKeys.isEmpty()) {
496      throw new IllegalArgumentException("No regions passed");
497    }
498
499    // We're generating a list of split points, and we don't ever
500    // have keys < the first region (which has an empty start key)
501    // so we need to remove it. Otherwise we would end up with an
502    // empty reducer with index 0
503    TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys);
504    ImmutableBytesWritable first = sorted.first();
505    if (writeMultipleTables) {
506      first =
507        new ImmutableBytesWritable(MultiTableHFileOutputFormat.getSuffix(sorted.first().get()));
508    }
509    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
510      throw new IllegalArgumentException(
511          "First region of table should have empty start key. Instead has: "
512          + Bytes.toStringBinary(first.get()));
513    }
514    sorted.remove(sorted.first());
515
516    // Write the actual file
517    FileSystem fs = partitionsPath.getFileSystem(conf);
518    SequenceFile.Writer writer = SequenceFile.createWriter(
519      fs, conf, partitionsPath, ImmutableBytesWritable.class,
520      NullWritable.class);
521
522    try {
523      for (ImmutableBytesWritable startKey : sorted) {
524        writer.append(startKey, NullWritable.get());
525      }
526    } finally {
527      writer.close();
528    }
529  }
530
531  /**
532   * Configure a MapReduce Job to perform an incremental load into the given
533   * table. This
534   * <ul>
535   *   <li>Inspects the table to configure a total order partitioner</li>
536   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
537   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
538   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   *   <li>Sets the reducer up to perform the appropriate sorting (CellSortReducer,
   *     PutSortReducer or TextSortReducer, depending on the map output value class)</li>
541   * </ul>
542   * The user should be sure to set the map output value class to either KeyValue or Put before
543   * running this function.
544   */
545  public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
546      throws IOException {
547    configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
548  }
549
550  /**
551   * Configure a MapReduce Job to perform an incremental load into the given
552   * table. This
553   * <ul>
554   *   <li>Inspects the table to configure a total order partitioner</li>
555   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
556   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
557   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   *   <li>Sets the reducer up to perform the appropriate sorting (CellSortReducer,
   *     PutSortReducer or TextSortReducer, depending on the map output value class)</li>
560   * </ul>
561   * The user should be sure to set the map output value class to either KeyValue or Put before
562   * running this function.
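   * <p>
   * For example, assuming {@code job}, {@code tableDescriptor} and {@code regionLocator} have
   * already been obtained, and a hypothetical mapper class that emits {@link Put}s:
   * <pre>{@code
   * job.setMapperClass(MyPutEmittingMapper.class); // hypothetical mapper
   * job.setMapOutputKeyClass(ImmutableBytesWritable.class);
   * job.setMapOutputValueClass(Put.class);
   * HFileOutputFormat2.configureIncrementalLoad(job, tableDescriptor, regionLocator);
   * }</pre>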
563   */
564  public static void configureIncrementalLoad(Job job, TableDescriptor tableDescriptor,
565      RegionLocator regionLocator) throws IOException {
566    ArrayList<TableInfo> singleTableInfo = new ArrayList<>();
567    singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator));
568    configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
569  }
570
571  static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
572      Class<? extends OutputFormat<?, ?>> cls) throws IOException {
573    Configuration conf = job.getConfiguration();
574    job.setOutputKeyClass(ImmutableBytesWritable.class);
575    job.setOutputValueClass(MapReduceExtendedCell.class);
576    job.setOutputFormatClass(cls);
577
578    if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) {
579      throw new IllegalArgumentException("Duplicate entries found in TableInfo argument");
580    }
581    boolean writeMultipleTables = false;
582    if (MultiTableHFileOutputFormat.class.equals(cls)) {
583      writeMultipleTables = true;
584      conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true);
585    }
586    // Based on the configured map output class, set the correct reducer to properly
587    // sort the incoming values.
588    // TODO it would be nice to pick one or the other of these formats.
589    if (KeyValue.class.equals(job.getMapOutputValueClass())
590        || MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())) {
591      job.setReducerClass(CellSortReducer.class);
592    } else if (Put.class.equals(job.getMapOutputValueClass())) {
593      job.setReducerClass(PutSortReducer.class);
594    } else if (Text.class.equals(job.getMapOutputValueClass())) {
595      job.setReducerClass(TextSortReducer.class);
596    } else {
597      LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
598    }
599
600    conf.setStrings("io.serializations", conf.get("io.serializations"),
601        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
602        CellSerialization.class.getName());
603
604    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
605      LOG.info("bulkload locality sensitive enabled");
606    }
607
608    /* Now get the region start keys for every table required */
609    List<String> allTableNames = new ArrayList<>(multiTableInfo.size());
610    List<RegionLocator> regionLocators = new ArrayList<>(multiTableInfo.size());
611    List<TableDescriptor> tableDescriptors = new ArrayList<>(multiTableInfo.size());
612
613    for(TableInfo tableInfo : multiTableInfo) {
614      regionLocators.add(tableInfo.getRegionLocator());
615      allTableNames.add(tableInfo.getRegionLocator().getName().getNameAsString());
616      tableDescriptors.add(tableInfo.getTableDescriptor());
617    }
    // Record table names so the reducer can create writers with favored nodes, and can decode
    // the per-table column family attributes (compression, block size, etc.).
    conf.set(OUTPUT_TABLE_NAME_CONF_KEY,
      StringUtils.join(allTableNames, Bytes.toString(tableSeparator)));
622    List<ImmutableBytesWritable> startKeys =
623      getRegionStartKeys(regionLocators, writeMultipleTables);
    // Use the tables' region boundaries as the TotalOrderPartitioner split points.
625    LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
626        "to match current region count for all tables");
627    job.setNumReduceTasks(startKeys.size());
628
629    configurePartitioner(job, startKeys, writeMultipleTables);
    // Serialize per-column-family attributes (compression, block size, bloom type/param,
    // data block encoding) into the configuration.
632    conf.set(COMPRESSION_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(compressionDetails,
633            tableDescriptors));
634    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(blockSizeDetails,
635            tableDescriptors));
636    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(bloomTypeDetails,
637            tableDescriptors));
638    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(bloomParamDetails,
639        tableDescriptors));
640    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
641            serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors));
642
643    TableMapReduceUtil.addDependencyJars(job);
644    TableMapReduceUtil.initCredentials(job);
645    LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
646  }
647
648  public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor) throws
649      IOException {
650    Configuration conf = job.getConfiguration();
651
652    job.setOutputKeyClass(ImmutableBytesWritable.class);
653    job.setOutputValueClass(MapReduceExtendedCell.class);
654    job.setOutputFormatClass(HFileOutputFormat2.class);
655
656    ArrayList<TableDescriptor> singleTableDescriptor = new ArrayList<>(1);
657    singleTableDescriptor.add(tableDescriptor);
658
659    conf.set(OUTPUT_TABLE_NAME_CONF_KEY, tableDescriptor.getTableName().getNameAsString());
    // Serialize per-column-family attributes (compression, block size, bloom type/param,
    // data block encoding) into the configuration.
661    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
662        serializeColumnFamilyAttribute(compressionDetails, singleTableDescriptor));
663    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
664        serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor));
665    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
666        serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor));
667    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY,
668        serializeColumnFamilyAttribute(bloomParamDetails, singleTableDescriptor));
669    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
670        serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor));
671
672    TableMapReduceUtil.addDependencyJars(job);
673    TableMapReduceUtil.initCredentials(job);
674    LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured.");
675  }
676
677  /**
678   * Runs inside the task to deserialize column family to compression algorithm
679   * map from the configuration.
680   *
681   * @param conf to read the serialized values from
682   * @return a map from column family to the configured compression algorithm
683   */
684  @VisibleForTesting
685  static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration
686      conf) {
687    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
688        COMPRESSION_FAMILIES_CONF_KEY);
689    Map<byte[], Algorithm> compressionMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
690    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
691      Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue());
692      compressionMap.put(e.getKey(), algorithm);
693    }
694    return compressionMap;
695  }
696
697  /**
698   * Runs inside the task to deserialize column family to bloom filter type
699   * map from the configuration.
700   *
701   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter type
703   */
704  @VisibleForTesting
705  static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
706    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
707        BLOOM_TYPE_FAMILIES_CONF_KEY);
708    Map<byte[], BloomType> bloomTypeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
709    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
710      BloomType bloomType = BloomType.valueOf(e.getValue());
711      bloomTypeMap.put(e.getKey(), bloomType);
712    }
713    return bloomTypeMap;
714  }
715
716  /**
717   * Runs inside the task to deserialize column family to bloom filter param
718   * map from the configuration.
719   *
720   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter param
722   */
723  @VisibleForTesting
724  static Map<byte[], String> createFamilyBloomParamMap(Configuration conf) {
725    return createFamilyConfValueMap(conf, BLOOM_PARAM_FAMILIES_CONF_KEY);
726  }
727
728
729  /**
730   * Runs inside the task to deserialize column family to block size
731   * map from the configuration.
732   *
733   * @param conf to read the serialized values from
734   * @return a map from column family to the configured block size
735   */
736  @VisibleForTesting
737  static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
738    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
739        BLOCK_SIZE_FAMILIES_CONF_KEY);
740    Map<byte[], Integer> blockSizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
741    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
742      Integer blockSize = Integer.parseInt(e.getValue());
743      blockSizeMap.put(e.getKey(), blockSize);
744    }
745    return blockSizeMap;
746  }
747
748  /**
749   * Runs inside the task to deserialize column family to data block encoding
750   * type map from the configuration.
751   *
752   * @param conf to read the serialized values from
753   * @return a map from column family to HFileDataBlockEncoder for the
754   *         configured data block type for the family
755   */
756  @VisibleForTesting
757  static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(
758      Configuration conf) {
759    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
760        DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
761    Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
762    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
763      encoderMap.put(e.getKey(), DataBlockEncoding.valueOf((e.getValue())));
764    }
765    return encoderMap;
766  }
767
768
769  /**
   * Runs inside the task to deserialize column family to given conf value map.
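   * <p>
   * The serialized form written by {@link #serializeColumnFamilyAttribute} is a list of
   * {@code urlencode(tableName;family)=urlencode(value)} pairs joined by {@code &}, e.g.
   * {@code example_table%3Bcf1=gz&example_table%3Bcf2=none} (table, family and values are
   * illustrative).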
771   *
772   * @param conf to read the serialized values from
773   * @param confName conf key to read from the configuration
774   * @return a map of column family to the given configuration value
775   */
776  private static Map<byte[], String> createFamilyConfValueMap(
777      Configuration conf, String confName) {
778    Map<byte[], String> confValMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
779    String confVal = conf.get(confName, "");
780    for (String familyConf : confVal.split("&")) {
781      String[] familySplit = familyConf.split("=");
782      if (familySplit.length != 2) {
783        continue;
784      }
785      try {
786        confValMap.put(Bytes.toBytes(URLDecoder.decode(familySplit[0], "UTF-8")),
787            URLDecoder.decode(familySplit[1], "UTF-8"));
788      } catch (UnsupportedEncodingException e) {
789        // will not happen with UTF-8 encoding
790        throw new AssertionError(e);
791      }
792    }
793    return confValMap;
794  }
795
796  /**
797   * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
   * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
799   */
  static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints,
      boolean writeMultipleTables) throws IOException {
803    Configuration conf = job.getConfiguration();
804    // create the partitions file
805    FileSystem fs = FileSystem.get(conf);
806    String hbaseTmpFsDir =
807        conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
808          HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
809    Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
    partitionsPath = fs.makeQualified(partitionsPath);
811    writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables);
812    fs.deleteOnExit(partitionsPath);
813
814    // configure job to use it
815    job.setPartitionerClass(TotalOrderPartitioner.class);
816    TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
817  }
818
819  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value =
820    "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
821  @VisibleForTesting
  static String serializeColumnFamilyAttribute(Function<ColumnFamilyDescriptor, String> fn,
      List<TableDescriptor> allTables) throws UnsupportedEncodingException {
825    StringBuilder attributeValue = new StringBuilder();
826    int i = 0;
827    for (TableDescriptor tableDescriptor : allTables) {
828      if (tableDescriptor == null) {
829        // could happen with mock table instance
830        // CODEREVIEW: Can I set an empty string in conf if mock table instance?
831        return "";
832      }
833      for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
834        if (i++ > 0) {
835          attributeValue.append('&');
836        }
837        attributeValue.append(URLEncoder.encode(
838          Bytes.toString(combineTableNameSuffix(tableDescriptor.getTableName().getName(),
839            familyDescriptor.getName())), "UTF-8"));
840        attributeValue.append('=');
841        attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8"));
842      }
843    }
    // No trailing separator to strip: '&' is prepended before every entry except the first.
845    return attributeValue.toString();
846  }
847
848  /**
849   * Serialize column family to compression algorithm map to configuration.
850   * Invoked while configuring the MR job for incremental load.
851   */
852  @VisibleForTesting
853  static Function<ColumnFamilyDescriptor, String> compressionDetails = familyDescriptor ->
854          familyDescriptor.getCompressionType().getName();
855
856  /**
857   * Serialize column family to block size map to configuration. Invoked while
858   * configuring the MR job for incremental load.
859   */
860  @VisibleForTesting
861  static Function<ColumnFamilyDescriptor, String> blockSizeDetails = familyDescriptor -> String
862          .valueOf(familyDescriptor.getBlocksize());
863
864  /**
865   * Serialize column family to bloom type map to configuration. Invoked while
866   * configuring the MR job for incremental load.
867   */
868  @VisibleForTesting
869  static Function<ColumnFamilyDescriptor, String> bloomTypeDetails = familyDescriptor -> {
    BloomType bloomType = familyDescriptor.getBloomFilterType();
    if (bloomType == null) {
      bloomType = ColumnFamilyDescriptorBuilder.DEFAULT_BLOOMFILTER;
    }
    return bloomType.name();
875  };
876
877  /**
878   * Serialize column family to bloom param map to configuration. Invoked while
879   * configuring the MR job for incremental load.
880   */
881  @VisibleForTesting
882  static Function<ColumnFamilyDescriptor, String> bloomParamDetails = familyDescriptor -> {
883    BloomType bloomType = familyDescriptor.getBloomFilterType();
884    String bloomParam = "";
885    if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
886      bloomParam = familyDescriptor.getConfigurationValue(BloomFilterUtil.PREFIX_LENGTH_KEY);
887    }
888    return bloomParam;
889  };
890
891  /**
892   * Serialize column family to data block encoding map to configuration.
893   * Invoked while configuring the MR job for incremental load.
894   */
895  @VisibleForTesting
896  static Function<ColumnFamilyDescriptor, String> dataBlockEncodingDetails = familyDescriptor -> {
897    DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
898    if (encoding == null) {
899      encoding = DataBlockEncoding.NONE;
900    }
901    return encoding.toString();
902  };
903
904}