/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TASK_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * Writes HFiles. Passed Cells must arrive in order.
 * Writes current time as the sequence id for the file. Sets the major compacted
 * attribute on created {@link HFile}s. Calling write(null,null) will forcibly roll
 * all HFiles being written.
 * <p>
 * Using this class as part of a MapReduce job is best done
 * using {@link #configureIncrementalLoad(Job, TableDescriptor, RegionLocator)}.
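 * <p>
 * A minimal driver sketch (the driver and mapper class names, table name and output path below
 * are hypothetical; the mapper is assumed to emit {@link Put}s keyed by the row):
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * Job job = Job.getInstance(conf, "prepare-bulkload");
 * job.setJarByClass(MyBulkLoadDriver.class);
 * job.setMapperClass(MyPutEmittingMapper.class);
 * job.setMapOutputKeyClass(ImmutableBytesWritable.class);
 * job.setMapOutputValueClass(Put.class);
 * FileOutputFormat.setOutputPath(job, new Path("/tmp/my_table_hfiles"));
 * try (Connection conn = ConnectionFactory.createConnection(conf);
 *     RegionLocator locator = conn.getRegionLocator(TableName.valueOf("my_table"));
 *     Table table = conn.getTable(TableName.valueOf("my_table"))) {
 *   HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), locator);
 *   job.waitForCompletion(true);
 * }
 * }</pre>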
 */
@InterfaceAudience.Public
public class HFileOutputFormat2
    extends FileOutputFormat<ImmutableBytesWritable, Cell> {
  private static final Logger LOG = LoggerFactory.getLogger(HFileOutputFormat2.class);
  static class TableInfo {
    private TableDescriptor tableDescriptor;
    private RegionLocator regionLocator;

    public TableInfo(TableDescriptor tableDescriptor, RegionLocator regionLocator) {
      this.tableDescriptor = tableDescriptor;
      this.regionLocator = regionLocator;
    }

    public TableDescriptor getTableDescriptor() {
      return tableDescriptor;
    }

    public RegionLocator getRegionLocator() {
      return regionLocator;
    }
  }

  protected static final byte[] tableSeparator = Bytes.toBytes(";");

  protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) {
    return Bytes.add(tableName, tableSeparator, suffix);
  }

  // The following constants are private since these are used by
  // HFileOutputFormat2 to internally transfer data between job setup and
  // reducer run using conf.
  // These should not be changed by the client.
  static final String COMPRESSION_FAMILIES_CONF_KEY =
      "hbase.hfileoutputformat.families.compression";
  static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
      "hbase.hfileoutputformat.families.bloomtype";
  static final String BLOOM_PARAM_FAMILIES_CONF_KEY =
      "hbase.hfileoutputformat.families.bloomparam";
  static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.blocksize";
  static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";

  // These constants are public since the client can set them when setting
  // up their conf object and thus refer to these symbols.
  // They are present for backwards compatibility reasons. Use them only to
  // override the auto-detection of datablock encoding and compression.
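  // For example (illustrative values, not defaults), a driver could force:
  //   conf.set(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY, DataBlockEncoding.FAST_DIFF.name());
  //   conf.set(COMPRESSION_OVERRIDE_CONF_KEY, Compression.Algorithm.GZ.getName());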
  public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.datablock.encoding";
  public static final String COMPRESSION_OVERRIDE_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.compression";

  /**
   * Keep locality while generating HFiles for bulkload. See HBASE-12596
   */
  public static final String LOCALITY_SENSITIVE_CONF_KEY =
      "hbase.bulkload.locality.sensitive.enabled";
  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
  static final String OUTPUT_TABLE_NAME_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.table.name";
  static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
          "hbase.mapreduce.use.multi.table.hfileoutputformat";

  public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY;
  public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";

  @Override
  public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
      final TaskAttemptContext context) throws IOException, InterruptedException {
    return createRecordWriter(context, this.getOutputCommitter(context));
  }

  protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[] family) {
    return combineTableNameSuffix(tableName, family);
  }

  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
      createRecordWriter(final TaskAttemptContext context, final OutputCommitter committer)
          throws IOException {

    // Get the path of the temporary output file
    final Path outputDir = ((FileOutputCommitter)committer).getWorkPath();
    final Configuration conf = context.getConfiguration();
    final boolean writeMultipleTables =
        conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
    final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
    if (writeTableNames == null || writeTableNames.isEmpty()) {
      throw new IllegalArgumentException("Configuration parameter " + OUTPUT_TABLE_NAME_CONF_KEY
              + " cannot be empty");
    }
    final FileSystem fs = outputDir.getFileSystem(conf);
    // These configs are from hbase-*.xml
    final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
        HConstants.DEFAULT_MAX_FILE_SIZE);
    // Invented config. Add to hbase-*.xml if other than default compression.
    final String defaultCompressionStr = conf.get("hfile.compression",
        Compression.Algorithm.NONE.getName());
    final Algorithm defaultCompression = HFileWriterImpl
        .compressionByName(defaultCompressionStr);
    String compressionStr = conf.get(COMPRESSION_OVERRIDE_CONF_KEY);
    final Algorithm overriddenCompression;
    if (compressionStr != null) {
      overriddenCompression = Compression.getCompressionAlgorithmByName(compressionStr);
    } else {
      overriddenCompression = null;
    }

    final boolean compactionExclude = conf.getBoolean(
        "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);

    final Set<String> allTableNames = Arrays.stream(writeTableNames.split(
            Bytes.toString(tableSeparator))).collect(Collectors.toSet());

    // create a map from column family to the compression algorithm
    final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
    final Map<byte[], String> bloomParamMap = createFamilyBloomParamMap(conf);
    final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);

    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
    final Map<byte[], DataBlockEncoding> datablockEncodingMap
        = createFamilyDataBlockEncodingMap(conf);
    final DataBlockEncoding overriddenEncoding;
    if (dataBlockEncodingStr != null) {
      overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr);
    } else {
      overriddenEncoding = null;
    }

    return new RecordWriter<ImmutableBytesWritable, V>() {
      // Map of families to writers and how much has been output on the writer.
      private final Map<byte[], WriterLength> writers =
              new TreeMap<>(Bytes.BYTES_COMPARATOR);
      private final Map<byte[], byte[]> previousRows =
              new TreeMap<>(Bytes.BYTES_COMPARATOR);
      private final long now = EnvironmentEdgeManager.currentTime();

      @Override
      public void write(ImmutableBytesWritable row, V cell)
          throws IOException {
        Cell kv = cell;
        // null input == user explicitly wants to flush
        if (row == null && kv == null) {
          rollWriters(null);
          return;
        }

        byte[] rowKey = CellUtil.cloneRow(kv);
        int length = (PrivateCellUtil.estimatedSerializedSizeOf(kv)) - Bytes.SIZEOF_INT;
        byte[] family = CellUtil.cloneFamily(kv);
        byte[] tableNameBytes = null;
        if (writeMultipleTables) {
          tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get());
          tableNameBytes =
              TableName.valueOf(tableNameBytes).getNameWithNamespaceInclAsString()
              .getBytes(Charset.defaultCharset());
          if (!allTableNames.contains(Bytes.toString(tableNameBytes))) {
            throw new IllegalArgumentException("TableName '" + Bytes.toString(tableNameBytes) +
                    "' not expected");
          }
        } else {
          tableNameBytes = Bytes.toBytes(writeTableNames);
        }
        String tableName = Bytes.toString(tableNameBytes);
        Path tableRelPath = getTableRelativePath(tableNameBytes);
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);

        WriterLength wl = this.writers.get(tableAndFamily);

        // If this is a new column family, create its output directory and
        // configure the storage policy for it.
        if (wl == null) {
          Path writerPath = null;
          if (writeMultipleTables) {
            writerPath = new Path(outputDir, new Path(tableRelPath, Bytes.toString(family)));
          } else {
            writerPath = new Path(outputDir, Bytes.toString(family));
          }
          fs.mkdirs(writerPath);
          configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
        }

        // Roll the writer once the size limit is reached, but only at a row boundary.
        if (wl != null && wl.written + length >= maxsize
                && Bytes.compareTo(this.previousRows.get(family), rowKey) != 0) {
          rollWriters(wl);
        }

        // Create a new HFile writer, if necessary.
        if (wl == null || wl.writer == null) {
          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
            HRegionLocation loc = null;

            if (tableName != null) {
              try (Connection connection = ConnectionFactory.createConnection(conf);
                     RegionLocator locator =
                       connection.getRegionLocator(TableName.valueOf(tableName))) {
                loc = locator.getRegionLocation(rowKey);
              } catch (Throwable e) {
                LOG.warn("Something went wrong locating rowkey: " + Bytes.toString(rowKey) +
                  " for tablename: " + tableName, e);
                loc = null;
              }
            }

            if (null == loc) {
              if (LOG.isTraceEnabled()) {
                LOG.trace("failed to get region location, so use default writer for rowkey: " +
                  Bytes.toString(rowKey));
              }
              wl = getNewWriter(tableNameBytes, family, conf, null);
            } else {
              if (LOG.isDebugEnabled()) {
                LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]");
              }
              InetSocketAddress initialIsa =
                  new InetSocketAddress(loc.getHostname(), loc.getPort());
              if (initialIsa.isUnresolved()) {
                if (LOG.isTraceEnabled()) {
                  LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":"
                      + loc.getPort() + ", so use default writer");
                }
                wl = getNewWriter(tableNameBytes, family, conf, null);
              } else {
                if (LOG.isDebugEnabled()) {
                  LOG.debug("use favored nodes writer: " + initialIsa.getHostString());
                }
                wl = getNewWriter(tableNameBytes, family, conf,
                    new InetSocketAddress[] { initialIsa });
              }
            }
          } else {
            wl = getNewWriter(tableNameBytes, family, conf, null);
          }
        }

        // we now have the proper HFile writer. full steam ahead
        PrivateCellUtil.updateLatestStamp(cell, this.now);
        wl.writer.append(kv);
        wl.written += length;

        // Copy the row so we can tell when the row changes.
        this.previousRows.put(family, rowKey);
      }

      private Path getTableRelativePath(byte[] tableNameBytes) {
        String tableName = Bytes.toString(tableNameBytes);
        String[] tableNameParts = tableName.split(":");
        Path tableRelPath = new Path(tableNameParts[0]);
        if (tableNameParts.length > 1) {
          tableRelPath = new Path(tableRelPath, tableNameParts[1]);
        }
        return tableRelPath;
      }

      private void rollWriters(WriterLength writerLength) throws IOException {
        if (writerLength != null) {
          closeWriter(writerLength);
        } else {
          for (WriterLength wl : this.writers.values()) {
            closeWriter(wl);
          }
        }
      }

      private void closeWriter(WriterLength wl) throws IOException {
        if (wl.writer != null) {
          LOG.info(
              "Writer=" + wl.writer.getPath() + ((wl.written == 0)? "": ", wrote=" + wl.written));
          close(wl.writer);
        }
        wl.writer = null;
        wl.written = 0;
      }

      /*
       * Create a new StoreFileWriter.
       * @param family column family to create the writer for
       * @return A WriterLength, containing a new StoreFileWriter.
       * @throws IOException
       */
      @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BX_UNBOXING_IMMEDIATELY_REBOXED",
          justification="Not important")
      private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration conf,
              InetSocketAddress[] favoredNodes) throws IOException {
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family);
        Path familydir = new Path(outputDir, Bytes.toString(family));
        if (writeMultipleTables) {
          familydir = new Path(outputDir,
                  new Path(getTableRelativePath(tableName), Bytes.toString(family)));
        }
        WriterLength wl = new WriterLength();
        Algorithm compression = overriddenCompression;
        compression = compression == null ? compressionMap.get(tableAndFamily) : compression;
        compression = compression == null ? defaultCompression : compression;
        BloomType bloomType = bloomTypeMap.get(tableAndFamily);
        bloomType = bloomType == null ? BloomType.NONE : bloomType;
        String bloomParam = bloomParamMap.get(tableAndFamily);
        if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
          conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, bloomParam);
        }
        Integer blockSize = blockSizeMap.get(tableAndFamily);
        blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
        DataBlockEncoding encoding = overriddenEncoding;
        encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
        encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
        HFileContextBuilder contextBuilder = new HFileContextBuilder()
                                    .withCompression(compression)
                                    .withChecksumType(HStore.getChecksumType(conf))
                                    .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
                                    .withBlockSize(blockSize);

        if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
          contextBuilder.withIncludesTags(true);
        }

        contextBuilder.withDataBlockEncoding(encoding);
        HFileContext hFileContext = contextBuilder.build();
        if (null == favoredNodes) {
          wl.writer =
              new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs)
                  .withOutputDir(familydir).withBloomType(bloomType)
                  .withComparator(CellComparator.getInstance()).withFileContext(hFileContext).build();
        } else {
          wl.writer =
              new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, new HFileSystem(fs))
                  .withOutputDir(familydir).withBloomType(bloomType)
                  .withComparator(CellComparator.getInstance()).withFileContext(hFileContext)
                  .withFavoredNodes(favoredNodes).build();
        }

        this.writers.put(tableAndFamily, wl);
        return wl;
      }

      private void close(final StoreFileWriter w) throws IOException {
        if (w != null) {
          w.appendFileInfo(BULKLOAD_TIME_KEY,
              Bytes.toBytes(System.currentTimeMillis()));
          w.appendFileInfo(BULKLOAD_TASK_KEY,
              Bytes.toBytes(context.getTaskAttemptID().toString()));
          w.appendFileInfo(MAJOR_COMPACTION_KEY,
              Bytes.toBytes(true));
          w.appendFileInfo(EXCLUDE_FROM_MINOR_COMPACTION_KEY,
              Bytes.toBytes(compactionExclude));
          w.appendTrackedTimestampsToMetadata();
          w.close();
        }
      }

      @Override
      public void close(TaskAttemptContext c)
      throws IOException, InterruptedException {
        for (WriterLength wl: this.writers.values()) {
          close(wl.writer);
        }
      }
    };
  }

  /**
   * Configure block storage policy for CF after the directory is created.
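   * <p>
   * A sketch of how a driver might request a policy (the table/family names and policies are
   * hypothetical; the per-family key suffix follows the {@code tableAndFamily} lookup below):
   * <pre>{@code
   * // policy for one specific table;family combination
   * conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + "my_table;cf1", "ALL_SSD");
   * // fallback policy for every other family
   * conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "HOT");
   * }</pre>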
   */
  static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
      byte[] tableAndFamily, Path cfPath) {
    if (null == conf || null == fs || null == tableAndFamily || null == cfPath) {
      return;
    }

    String policy =
        conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(tableAndFamily),
          conf.get(STORAGE_POLICY_PROPERTY));
    FSUtils.setStoragePolicy(fs, cfPath, policy);
  }

  /*
   * Data structure to hold a Writer and amount of data written on it.
   */
  static class WriterLength {
    long written = 0;
    StoreFileWriter writer = null;
  }

  /**
   * Return the start keys of all of the regions of the given tables,
   * as a list of ImmutableBytesWritable.
   */
  private static List<ImmutableBytesWritable> getRegionStartKeys(List<RegionLocator> regionLocators,
                                                                 boolean writeMultipleTables)
          throws IOException {

    ArrayList<ImmutableBytesWritable> ret = new ArrayList<>();
    for (RegionLocator regionLocator : regionLocators) {
      TableName tableName = regionLocator.getName();
      LOG.info("Looking up current regions for table " + tableName);
      byte[][] byteKeys = regionLocator.getStartKeys();
      for (byte[] byteKey : byteKeys) {
        byte[] fullKey = byteKey; //HFileOutputFormat2 use case
        if (writeMultipleTables) {
          //MultiTableHFileOutputFormat use case
          fullKey = combineTableNameSuffix(tableName.getName(), byteKey);
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("SplitPoint startkey for table [" + tableName + "]: [" +
              Bytes.toStringBinary(fullKey) + "]");
        }
        ret.add(new ImmutableBytesWritable(fullKey));
      }
    }
    return ret;
  }

  /**
   * Write out a {@link SequenceFile} containing the split points in startKeys,
   * in a format that {@link TotalOrderPartitioner} can read.
   */
  @SuppressWarnings("deprecation")
  private static void writePartitions(Configuration conf, Path partitionsPath,
      List<ImmutableBytesWritable> startKeys, boolean writeMultipleTables) throws IOException {
    LOG.info("Writing partition information to " + partitionsPath);
    if (startKeys.isEmpty()) {
      throw new IllegalArgumentException("No regions passed");
    }

    // We're generating a list of split points, and we don't ever
    // have keys < the first region (which has an empty start key)
    // so we need to remove it. Otherwise we would end up with an
    // empty reducer with index 0
    TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys);
    ImmutableBytesWritable first = sorted.first();
    if (writeMultipleTables) {
      first =
          new ImmutableBytesWritable(MultiTableHFileOutputFormat.getSuffix(sorted.first().get()));
    }
    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IllegalArgumentException(
          "First region of table should have empty start key. Instead has: "
          + Bytes.toStringBinary(first.get()));
    }
    sorted.remove(sorted.first());

    // Write the actual file
    FileSystem fs = partitionsPath.getFileSystem(conf);
    SequenceFile.Writer writer = SequenceFile.createWriter(
      fs, conf, partitionsPath, ImmutableBytesWritable.class,
      NullWritable.class);

    try {
      for (ImmutableBytesWritable startKey : sorted) {
        writer.append(startKey, NullWritable.get());
      }
    } finally {
      writer.close();
    }
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given
   * table. This
   * <ul>
   *   <li>Inspects the table to configure a total order partitioner</li>
   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   *   <li>Sets the reducer up to perform the appropriate sorting (CellSortReducer, PutSortReducer
   *     or TextSortReducer, depending on the map output value class)</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue, Put or Text
   * before running this function.
   */
  public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
      throws IOException {
    configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given
   * table. This
   * <ul>
   *   <li>Inspects the table to configure a total order partitioner</li>
   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   *   <li>Sets the reducer up to perform the appropriate sorting (CellSortReducer, PutSortReducer
   *     or TextSortReducer, depending on the map output value class)</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue, Put or Text
   * before running this function.
   */
  public static void configureIncrementalLoad(Job job, TableDescriptor tableDescriptor,
      RegionLocator regionLocator) throws IOException {
    ArrayList<TableInfo> singleTableInfo = new ArrayList<>();
    singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator));
    configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
  }

  static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
      Class<? extends OutputFormat<?, ?>> cls) throws IOException {
    Configuration conf = job.getConfiguration();
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(MapReduceExtendedCell.class);
    job.setOutputFormatClass(cls);

    if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) {
      throw new IllegalArgumentException("Duplicate entries found in TableInfo argument");
    }
    boolean writeMultipleTables = false;
    if (MultiTableHFileOutputFormat.class.equals(cls)) {
      writeMultipleTables = true;
      conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true);
    }
    // Based on the configured map output class, set the correct reducer to properly
    // sort the incoming values.
    // TODO it would be nice to pick one or the other of these formats.
    if (KeyValue.class.equals(job.getMapOutputValueClass())
        || MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(CellSortReducer.class);
    } else if (Put.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(PutSortReducer.class);
    } else if (Text.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(TextSortReducer.class);
    } else {
      LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
    }

    conf.setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
        CellSerialization.class.getName());

    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
      LOG.info("bulkload locality sensitive enabled");
    }

    /* Now get the region start keys for every table required */
    List<String> allTableNames = new ArrayList<>(multiTableInfo.size());
    List<RegionLocator> regionLocators = new ArrayList<>(multiTableInfo.size());
    List<TableDescriptor> tableDescriptors = new ArrayList<>(multiTableInfo.size());

    for (TableInfo tableInfo : multiTableInfo) {
      regionLocators.add(tableInfo.getRegionLocator());
      String tn = writeMultipleTables ?
        tableInfo.getRegionLocator().getName().getNameWithNamespaceInclAsString() :
        tableInfo.getRegionLocator().getName().getNameAsString();
      allTableNames.add(tn);
      tableDescriptors.add(tableInfo.getTableDescriptor());
    }
    // Record the table names so the record writer can create writers on favored nodes and
    // decode the per-table column family attributes (compression, block size, etc.).
    conf.set(OUTPUT_TABLE_NAME_CONF_KEY,
        StringUtils.join(allTableNames, Bytes.toString(tableSeparator)));
    List<ImmutableBytesWritable> startKeys =
        getRegionStartKeys(regionLocators, writeMultipleTables);
    // Use the tables' region boundaries as the TotalOrderPartitioner split points.
    LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
        "to match current region count for all tables");
    job.setNumReduceTasks(startKeys.size());

    configurePartitioner(job, startKeys, writeMultipleTables);
    // Serialize the per-column-family attributes (compression, block size, bloom type/param
    // and data block encoding) into the configuration.
    conf.set(COMPRESSION_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(compressionDetails,
            tableDescriptors));
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(blockSizeDetails,
            tableDescriptors));
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(bloomTypeDetails,
            tableDescriptors));
    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(bloomParamDetails,
        tableDescriptors));
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
            serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
  }

  public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor) throws
      IOException {
    Configuration conf = job.getConfiguration();

    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(MapReduceExtendedCell.class);
    job.setOutputFormatClass(HFileOutputFormat2.class);

    ArrayList<TableDescriptor> singleTableDescriptor = new ArrayList<>(1);
    singleTableDescriptor.add(tableDescriptor);

    conf.set(OUTPUT_TABLE_NAME_CONF_KEY, tableDescriptor.getTableName().getNameAsString());
    // Serialize the per-column-family attributes (compression, block size, bloom type/param
    // and data block encoding) into the configuration.
    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(compressionDetails, singleTableDescriptor));
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor));
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor));
    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(bloomParamDetails, singleTableDescriptor));
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured.");
  }

  /**
   * Runs inside the task to deserialize column family to compression algorithm
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured compression algorithm
   */
  @VisibleForTesting
  static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        COMPRESSION_FAMILIES_CONF_KEY);
    Map<byte[], Algorithm> compressionMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue());
      compressionMap.put(e.getKey(), algorithm);
    }
    return compressionMap;
  }

  /**
   * Runs inside the task to deserialize column family to bloom filter type
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter type
   */
  @VisibleForTesting
  static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        BLOOM_TYPE_FAMILIES_CONF_KEY);
    Map<byte[], BloomType> bloomTypeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      BloomType bloomType = BloomType.valueOf(e.getValue());
      bloomTypeMap.put(e.getKey(), bloomType);
    }
    return bloomTypeMap;
  }

  /**
   * Runs inside the task to deserialize column family to bloom filter param
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter param
   */
  @VisibleForTesting
  static Map<byte[], String> createFamilyBloomParamMap(Configuration conf) {
    return createFamilyConfValueMap(conf, BLOOM_PARAM_FAMILIES_CONF_KEY);
  }

  /**
   * Runs inside the task to deserialize column family to block size
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured block size
   */
  @VisibleForTesting
  static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        BLOCK_SIZE_FAMILIES_CONF_KEY);
    Map<byte[], Integer> blockSizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Integer blockSize = Integer.parseInt(e.getValue());
      blockSizeMap.put(e.getKey(), blockSize);
    }
    return blockSizeMap;
  }

  /**
   * Runs inside the task to deserialize column family to data block encoding
   * type map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured {@link DataBlockEncoding}
   *         for the family
   */
  @VisibleForTesting
  static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(
      Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
    Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      encoderMap.put(e.getKey(), DataBlockEncoding.valueOf((e.getValue())));
    }
    return encoderMap;
  }

  /**
   * Run inside the task to deserialize column family to given conf value map.
   *
   * @param conf to read the serialized values from
   * @param confName conf key to read from the configuration
   * @return a map of column family to the given configuration value
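   * <p>
   * Entries have the form {@code <URL-encoded family key>=<URL-encoded value>} joined with
   * {@code &}, as produced by {@link #serializeColumnFamilyAttribute}. A sketch for a
   * hypothetical table {@code t} with families {@code cf1} and {@code cf2}:
   * <pre>{@code
   * t%3Bcf1=gz&t%3Bcf2=none
   * }</pre>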
   */
  private static Map<byte[], String> createFamilyConfValueMap(
      Configuration conf, String confName) {
    Map<byte[], String> confValMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    String confVal = conf.get(confName, "");
    for (String familyConf : confVal.split("&")) {
      String[] familySplit = familyConf.split("=");
      if (familySplit.length != 2) {
        continue;
      }
      try {
        confValMap.put(Bytes.toBytes(URLDecoder.decode(familySplit[0], "UTF-8")),
            URLDecoder.decode(familySplit[1], "UTF-8"));
      } catch (UnsupportedEncodingException e) {
        // will not happen with UTF-8 encoding
        throw new AssertionError(e);
      }
    }
    return confValMap;
  }

  /**
   * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
   * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
   */
  static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints,
      boolean writeMultipleTables) throws IOException {
    Configuration conf = job.getConfiguration();
    // create the partitions file
    FileSystem fs = FileSystem.get(conf);
    String hbaseTmpFsDir =
        conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
            fs.getHomeDirectory() + "/hbase-staging");
    Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
    fs.makeQualified(partitionsPath);
    writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables);
    fs.deleteOnExit(partitionsPath);

    // configure job to use it
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
  @VisibleForTesting
  static String serializeColumnFamilyAttribute(Function<ColumnFamilyDescriptor, String> fn,
      List<TableDescriptor> allTables) throws UnsupportedEncodingException {
    StringBuilder attributeValue = new StringBuilder();
    int i = 0;
    for (TableDescriptor tableDescriptor : allTables) {
      if (tableDescriptor == null) {
        // could happen with mock table instance
        // CODEREVIEW: Can I set an empty string in conf if mock table instance?
        return "";
      }
      for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
        if (i++ > 0) {
          attributeValue.append('&');
        }
        attributeValue.append(URLEncoder.encode(
            Bytes.toString(combineTableNameSuffix(tableDescriptor.getTableName().getName(),
                familyDescriptor.getName())),
            "UTF-8"));
        attributeValue.append('=');
        attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8"));
      }
    }
    return attributeValue.toString();
  }

  /**
   * Serialize column family to compression algorithm map to configuration.
   * Invoked while configuring the MR job for incremental load.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> compressionDetails = familyDescriptor ->
          familyDescriptor.getCompressionType().getName();

  /**
   * Serialize column family to block size map to configuration. Invoked while
   * configuring the MR job for incremental load.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> blockSizeDetails = familyDescriptor -> String
          .valueOf(familyDescriptor.getBlocksize());

  /**
   * Serialize column family to bloom type map to configuration. Invoked while
   * configuring the MR job for incremental load.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> bloomTypeDetails = familyDescriptor -> {
    String bloomType = familyDescriptor.getBloomFilterType().toString();
    if (bloomType == null) {
      bloomType = ColumnFamilyDescriptorBuilder.DEFAULT_BLOOMFILTER.name();
    }
    return bloomType;
  };

  /**
   * Serialize column family to bloom param map to configuration. Invoked while
   * configuring the MR job for incremental load.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> bloomParamDetails = familyDescriptor -> {
    BloomType bloomType = familyDescriptor.getBloomFilterType();
    String bloomParam = "";
    if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
      bloomParam = familyDescriptor.getConfigurationValue(BloomFilterUtil.PREFIX_LENGTH_KEY);
    }
    return bloomParam;
  };

  /**
   * Serialize column family to data block encoding map to configuration.
   * Invoked while configuring the MR job for incremental load.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> dataBlockEncodingDetails = familyDescriptor -> {
    DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
    if (encoding == null) {
      encoding = DataBlockEncoding.NONE;
    }
    return encoding.toString();
  };

}