/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TASK_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * Writes HFiles. Passed Cells must arrive in order.
 * Writes current time as the sequence id for the file. Sets the major compacted
 * attribute on created {@link HFile}s. Calling write(null,null) will forcibly roll
 * all HFiles being written.
 * <p>
 * Using this class as part of a MapReduce job is best done
 * using {@link #configureIncrementalLoad(Job, TableDescriptor, RegionLocator)}.
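 * <p>
 * A minimal sketch of such a job setup follows; the table name, output path and
 * {@code MyMapper} class are hypothetical placeholders, not part of this API:
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * Job job = Job.getInstance(conf, "bulkload");
 * job.setMapperClass(MyMapper.class); // emits ImmutableBytesWritable keys and Put values
 * try (Connection conn = ConnectionFactory.createConnection(conf);
 *     Table table = conn.getTable(TableName.valueOf("my_table"));
 *     RegionLocator locator = conn.getRegionLocator(TableName.valueOf("my_table"))) {
 *   HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), locator);
 * }
 * FileOutputFormat.setOutputPath(job, new Path("/tmp/hfiles"));
 * job.waitForCompletion(true);
 * }</pre>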
 */
@InterfaceAudience.Public
public class HFileOutputFormat2
    extends FileOutputFormat<ImmutableBytesWritable, Cell> {
  private static final Logger LOG = LoggerFactory.getLogger(HFileOutputFormat2.class);
  static class TableInfo {
    private TableDescriptor tableDescriptor;
    private RegionLocator regionLocator;

    public TableInfo(TableDescriptor tableDescriptor, RegionLocator regionLocator) {
      this.tableDescriptor = tableDescriptor;
      this.regionLocator = regionLocator;
    }

    /**
     * Modifications to the returned HTD do not affect the inner TD.
     * @return A clone of inner table descriptor
     * @deprecated since 2.0.0 and will be removed in 3.0.0. Use {@link #getTableDescriptor()}
     *   instead.
     * @see #getTableDescriptor()
     * @see <a href="https://issues.apache.org/jira/browse/HBASE-18241">HBASE-18241</a>
     */
    @Deprecated
    public HTableDescriptor getHTableDescriptor() {
      return new HTableDescriptor(tableDescriptor);
    }

    public TableDescriptor getTableDescriptor() {
      return tableDescriptor;
    }

    public RegionLocator getRegionLocator() {
      return regionLocator;
    }
  }

  protected static final byte[] tableSeparator = Bytes.toBytes(";");

  protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) {
    return Bytes.add(tableName, tableSeparator, suffix);
  }

  // The following constants are private since these are used by
  // HFileOutputFormat2 to internally transfer data between job setup and
  // reducer run using conf.
  // These should not be changed by the client.
  static final String COMPRESSION_FAMILIES_CONF_KEY =
      "hbase.hfileoutputformat.families.compression";
  static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
      "hbase.hfileoutputformat.families.bloomtype";
  static final String BLOOM_PARAM_FAMILIES_CONF_KEY =
      "hbase.hfileoutputformat.families.bloomparam";
  static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.blocksize";
  static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";

  // These constants are public since the client can set them when setting
  // up their conf object and thus needs to refer to these symbols.
  // They are present for backwards compatibility reasons. Use them only to
  // override the auto-detection of data block encoding and compression.
  public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.datablock.encoding";
  public static final String COMPRESSION_OVERRIDE_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.compression";

  /**
   * Keep locality while generating HFiles for bulkload. See HBASE-12596
   */
  public static final String LOCALITY_SENSITIVE_CONF_KEY =
      "hbase.bulkload.locality.sensitive.enabled";
  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
  static final String OUTPUT_TABLE_NAME_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.table.name";
  static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
          "hbase.mapreduce.use.multi.table.hfileoutputformat";

  public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY;
  public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";

  @Override
  public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
      final TaskAttemptContext context) throws IOException, InterruptedException {
    return createRecordWriter(context, this.getOutputCommitter(context));
  }

  protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[] family) {
    return combineTableNameSuffix(tableName, family);
  }

  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
      createRecordWriter(final TaskAttemptContext context, final OutputCommitter committer)
          throws IOException {

    // Get the path of the temporary output file
    final Path outputDir = ((FileOutputCommitter)committer).getWorkPath();
    final Configuration conf = context.getConfiguration();
    final boolean writeMultipleTables =
        conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
    final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
    if (writeTableNames == null || writeTableNames.isEmpty()) {
      throw new IllegalArgumentException("Configuration parameter " + OUTPUT_TABLE_NAME_CONF_KEY
          + " cannot be empty");
    }
    final FileSystem fs = outputDir.getFileSystem(conf);
    // These configs. are from hbase-*.xml
    final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
        HConstants.DEFAULT_MAX_FILE_SIZE);
    // Invented config.  Add to hbase-*.xml if other than default compression.
    final String defaultCompressionStr = conf.get("hfile.compression",
        Compression.Algorithm.NONE.getName());
    final Algorithm defaultCompression = HFileWriterImpl
        .compressionByName(defaultCompressionStr);
    String compressionStr = conf.get(COMPRESSION_OVERRIDE_CONF_KEY);
    final Algorithm overriddenCompression;
    if (compressionStr != null) {
      overriddenCompression = Compression.getCompressionAlgorithmByName(compressionStr);
    } else {
      overriddenCompression = null;
    }
    final boolean compactionExclude = conf.getBoolean(
        "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);

    final Set<String> allTableNames = Arrays.stream(writeTableNames.split(
            Bytes.toString(tableSeparator))).collect(Collectors.toSet());

    // create a map from column family to the compression algorithm
    final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
    final Map<byte[], String> bloomParamMap = createFamilyBloomParamMap(conf);
    final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);

    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
    final Map<byte[], DataBlockEncoding> datablockEncodingMap
        = createFamilyDataBlockEncodingMap(conf);
    final DataBlockEncoding overriddenEncoding;
    if (dataBlockEncodingStr != null) {
      overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr);
    } else {
      overriddenEncoding = null;
    }

    return new RecordWriter<ImmutableBytesWritable, V>() {
      // Map of families to writers and how much has been output on the writer.
      private final Map<byte[], WriterLength> writers =
              new TreeMap<>(Bytes.BYTES_COMPARATOR);
      private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY;
      private final long now = EnvironmentEdgeManager.currentTime();
      private boolean rollRequested = false;

      @Override
      public void write(ImmutableBytesWritable row, V cell)
          throws IOException {
        Cell kv = cell;
        // null input == user explicitly wants to flush
        if (row == null && kv == null) {
          rollWriters(null);
          return;
        }

        byte[] rowKey = CellUtil.cloneRow(kv);
        int length = (PrivateCellUtil.estimatedSerializedSizeOf(kv)) - Bytes.SIZEOF_INT;
        byte[] family = CellUtil.cloneFamily(kv);
        byte[] tableNameBytes = null;
        if (writeMultipleTables) {
          tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get());
          if (!allTableNames.contains(Bytes.toString(tableNameBytes))) {
            throw new IllegalArgumentException("TableName '" + Bytes.toString(tableNameBytes) +
                "' not expected");
          }
        } else {
          tableNameBytes = Bytes.toBytes(writeTableNames);
        }
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);
        WriterLength wl = this.writers.get(tableAndFamily);

        // If this is a new column family, create its output directory and
        // configure the storage policy for it.
        if (wl == null) {
          Path writerPath = null;
          if (writeMultipleTables) {
            writerPath = new Path(outputDir,
                new Path(Bytes.toString(tableNameBytes), Bytes.toString(family)));
          } else {
            writerPath = new Path(outputDir, Bytes.toString(family));
          }
          fs.mkdirs(writerPath);
          configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
        }

        if (wl != null && wl.written + length >= maxsize) {
          this.rollRequested = true;
        }

        // Writers can only be rolled at a row boundary, i.e. once the row key changes
        if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
          rollWriters(wl);
        }

        // create a new HFile writer, if necessary
        if (wl == null || wl.writer == null) {
          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
            HRegionLocation loc = null;

            String tableName = Bytes.toString(tableNameBytes);
            if (tableName != null) {
              try (Connection connection = ConnectionFactory.createConnection(conf);
                     RegionLocator locator =
                       connection.getRegionLocator(TableName.valueOf(tableName))) {
                loc = locator.getRegionLocation(rowKey);
              } catch (Throwable e) {
                LOG.warn("Something went wrong locating rowkey: " +
                  Bytes.toString(rowKey) + " for tablename: " + tableName, e);
                loc = null;
              }
            }

            if (null == loc) {
              if (LOG.isTraceEnabled()) {
                LOG.trace("failed to get region location, so use default writer for rowkey: " +
                  Bytes.toString(rowKey));
              }
              wl = getNewWriter(tableNameBytes, family, conf, null);
            } else {
              if (LOG.isDebugEnabled()) {
                LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]");
              }
              InetSocketAddress initialIsa =
                  new InetSocketAddress(loc.getHostname(), loc.getPort());
              if (initialIsa.isUnresolved()) {
                if (LOG.isTraceEnabled()) {
                  LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":"
                      + loc.getPort() + ", so use default writer");
                }
                wl = getNewWriter(tableNameBytes, family, conf, null);
              } else {
                if (LOG.isDebugEnabled()) {
                  LOG.debug("use favored nodes writer: " + initialIsa.getHostString());
                }
                wl = getNewWriter(tableNameBytes, family, conf,
                    new InetSocketAddress[] { initialIsa });
              }
            }
          } else {
            wl = getNewWriter(tableNameBytes, family, conf, null);
          }
        }

        // we now have the proper HFile writer. full steam ahead
        PrivateCellUtil.updateLatestStamp(cell, this.now);
        wl.writer.append(kv);
        wl.written += length;

        // Remember the row so we can detect when it transitions.
        this.previousRow = rowKey;
      }

      private void rollWriters(WriterLength writerLength) throws IOException {
        if (writerLength != null) {
          closeWriter(writerLength);
        } else {
          for (WriterLength wl : this.writers.values()) {
            closeWriter(wl);
          }
        }
        this.rollRequested = false;
      }

      private void closeWriter(WriterLength wl) throws IOException {
        if (wl.writer != null) {
          LOG.info(
              "Writer=" + wl.writer.getPath() + ((wl.written == 0)? "": ", wrote=" + wl.written));
          close(wl.writer);
        }
        wl.writer = null;
        wl.written = 0;
      }

      /*
       * Create a new StoreFileWriter for the given table and column family.
       * @return A WriterLength containing the new StoreFileWriter.
       */
      @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BX_UNBOXING_IMMEDIATELY_REBOXED",
          justification="Not important")
      private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration conf,
          InetSocketAddress[] favoredNodes) throws IOException {
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family);
        Path familydir = new Path(outputDir, Bytes.toString(family));
        if (writeMultipleTables) {
          familydir = new Path(outputDir,
                  new Path(Bytes.toString(tableName), Bytes.toString(family)));
        }
        WriterLength wl = new WriterLength();
        Algorithm compression = overriddenCompression;
        compression = compression == null ? compressionMap.get(tableAndFamily) : compression;
        compression = compression == null ? defaultCompression : compression;
        BloomType bloomType = bloomTypeMap.get(tableAndFamily);
        bloomType = bloomType == null ? BloomType.NONE : bloomType;
        String bloomParam = bloomParamMap.get(tableAndFamily);
        if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
          conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, bloomParam);
        }
        Integer blockSize = blockSizeMap.get(tableAndFamily);
        blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
        DataBlockEncoding encoding = overriddenEncoding;
        encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
        encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
        HFileContextBuilder contextBuilder = new HFileContextBuilder()
                                    .withCompression(compression)
                                    .withChecksumType(HStore.getChecksumType(conf))
                                    .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
                                    .withBlockSize(blockSize);

        if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
          contextBuilder.withIncludesTags(true);
        }

        contextBuilder.withDataBlockEncoding(encoding);
        HFileContext hFileContext = contextBuilder.build();
        if (null == favoredNodes) {
          wl.writer =
              new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs)
                  .withOutputDir(familydir).withBloomType(bloomType)
                  .withComparator(CellComparator.getInstance()).withFileContext(hFileContext)
                  .build();
        } else {
          wl.writer =
              new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, new HFileSystem(fs))
                  .withOutputDir(familydir).withBloomType(bloomType)
                  .withComparator(CellComparator.getInstance()).withFileContext(hFileContext)
                  .withFavoredNodes(favoredNodes).build();
        }

        this.writers.put(tableAndFamily, wl);
        return wl;
      }

      private void close(final StoreFileWriter w) throws IOException {
        if (w != null) {
          w.appendFileInfo(BULKLOAD_TIME_KEY,
              Bytes.toBytes(System.currentTimeMillis()));
          w.appendFileInfo(BULKLOAD_TASK_KEY,
              Bytes.toBytes(context.getTaskAttemptID().toString()));
          w.appendFileInfo(MAJOR_COMPACTION_KEY,
              Bytes.toBytes(true));
          w.appendFileInfo(EXCLUDE_FROM_MINOR_COMPACTION_KEY,
              Bytes.toBytes(compactionExclude));
          w.appendTrackedTimestampsToMetadata();
          w.close();
        }
      }

      @Override
      public void close(TaskAttemptContext c)
      throws IOException, InterruptedException {
        for (WriterLength wl: this.writers.values()) {
          close(wl.writer);
        }
      }
    };
  }

  /**
   * Configure block storage policy for CF after the directory is created.
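   * <p>
   * A rough illustration with a hypothetical table {@code t1} and family {@code f1}; the
   * per-family key is {@link #STORAGE_POLICY_PROPERTY_CF_PREFIX} followed by the combined
   * table-and-family string used elsewhere in this class:
   * <pre>{@code
   * conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "HOT");           // default for all CFs
   * conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + "t1;f1", "ALL_SSD");
   * }</pre>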
   */
  static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
      byte[] tableAndFamily, Path cfPath) {
    if (null == conf || null == fs || null == tableAndFamily || null == cfPath) {
      return;
    }

    String policy =
        conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(tableAndFamily),
          conf.get(STORAGE_POLICY_PROPERTY));
    FSUtils.setStoragePolicy(fs, cfPath, policy);
  }

  /*
   * Data structure to hold a Writer and amount of data written on it.
   */
  static class WriterLength {
    long written = 0;
    StoreFileWriter writer = null;
  }

  /**
   * Return the start keys of all of the regions in the given tables,
   * as a list of ImmutableBytesWritable.
   */
  private static List<ImmutableBytesWritable> getRegionStartKeys(List<RegionLocator> regionLocators,
                                                                 boolean writeMultipleTables)
          throws IOException {

    ArrayList<ImmutableBytesWritable> ret = new ArrayList<>();
    for (RegionLocator regionLocator : regionLocators) {
      TableName tableName = regionLocator.getName();
      LOG.info("Looking up current regions for table " + tableName);
      byte[][] byteKeys = regionLocator.getStartKeys();
      for (byte[] byteKey : byteKeys) {
        byte[] fullKey = byteKey; // HFileOutputFormat2 use case
        if (writeMultipleTables) {
          // MultiTableHFileOutputFormat use case
          fullKey = combineTableNameSuffix(tableName.getName(), byteKey);
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("SplitPoint startkey for table [" + tableName + "]: [" +
              Bytes.toStringBinary(fullKey) + "]");
        }
        ret.add(new ImmutableBytesWritable(fullKey));
      }
    }
    return ret;
  }

  /**
   * Write out a {@link SequenceFile} that can be read by
   * {@link TotalOrderPartitioner} and that contains the split points in startKeys.
   */
  @SuppressWarnings("deprecation")
  private static void writePartitions(Configuration conf, Path partitionsPath,
      List<ImmutableBytesWritable> startKeys, boolean writeMultipleTables) throws IOException {
    LOG.info("Writing partition information to " + partitionsPath);
    if (startKeys.isEmpty()) {
      throw new IllegalArgumentException("No regions passed");
    }

    // We're generating a list of split points, and we don't ever
    // have keys < the first region (which has an empty start key)
    // so we need to remove it. Otherwise we would end up with an
    // empty reducer with index 0
    TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys);
    ImmutableBytesWritable first = sorted.first();
    if (writeMultipleTables) {
      first = new ImmutableBytesWritable(
          MultiTableHFileOutputFormat.getSuffix(sorted.first().get()));
    }
    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IllegalArgumentException(
          "First region of table should have empty start key. Instead has: "
          + Bytes.toStringBinary(first.get()));
    }
    sorted.remove(sorted.first());

    // Write the actual file
    FileSystem fs = partitionsPath.getFileSystem(conf);
    SequenceFile.Writer writer = SequenceFile.createWriter(
      fs, conf, partitionsPath, ImmutableBytesWritable.class,
      NullWritable.class);

    try {
      for (ImmutableBytesWritable startKey : sorted) {
        writer.append(startKey, NullWritable.get());
      }
    } finally {
      writer.close();
    }
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given
   * table. This
   * <ul>
   *   <li>Inspects the table to configure a total order partitioner</li>
   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   *   <li>Sets the reducer up to perform the appropriate sorting (CellSortReducer,
   *     PutSortReducer or TextSortReducer, depending on the map output value class)</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
   */
  public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
      throws IOException {
    configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given
   * table. This
   * <ul>
   *   <li>Inspects the table to configure a total order partitioner</li>
   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   *   <li>Sets the reducer up to perform the appropriate sorting (CellSortReducer,
   *     PutSortReducer or TextSortReducer, depending on the map output value class)</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
   */
  public static void configureIncrementalLoad(Job job, TableDescriptor tableDescriptor,
      RegionLocator regionLocator) throws IOException {
    ArrayList<TableInfo> singleTableInfo = new ArrayList<>();
    singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator));
    configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
  }

  static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
      Class<? extends OutputFormat<?, ?>> cls) throws IOException {
    Configuration conf = job.getConfiguration();
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(MapReduceExtendedCell.class);
    job.setOutputFormatClass(cls);

    if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) {
      throw new IllegalArgumentException("Duplicate entries found in TableInfo argument");
    }
    boolean writeMultipleTables = false;
    if (MultiTableHFileOutputFormat.class.equals(cls)) {
      writeMultipleTables = true;
      conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true);
    }
    // Based on the configured map output class, set the correct reducer to properly
    // sort the incoming values.
    // TODO it would be nice to pick one or the other of these formats.
    if (KeyValue.class.equals(job.getMapOutputValueClass())
        || MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(CellSortReducer.class);
    } else if (Put.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(PutSortReducer.class);
    } else if (Text.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(TextSortReducer.class);
    } else {
      LOG.warn("Unknown map output value type: " + job.getMapOutputValueClass());
    }

    conf.setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
        CellSerialization.class.getName());

    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
      LOG.info("bulkload locality sensitive enabled");
    }

    /* Now get the region start keys for every table required */
    List<String> allTableNames = new ArrayList<>(multiTableInfo.size());
    List<RegionLocator> regionLocators = new ArrayList<>(multiTableInfo.size());
    List<TableDescriptor> tableDescriptors = new ArrayList<>(multiTableInfo.size());

    for (TableInfo tableInfo : multiTableInfo) {
      regionLocators.add(tableInfo.getRegionLocator());
      allTableNames.add(tableInfo.getRegionLocator().getName().getNameAsString());
      tableDescriptors.add(tableInfo.getTableDescriptor());
    }
    // Record the table names so writers can be created on favored nodes, and so compression,
    // block size and other column family attributes can be decoded per table.
    conf.set(OUTPUT_TABLE_NAME_CONF_KEY,
        StringUtils.join(allTableNames, Bytes.toString(tableSeparator)));
    List<ImmutableBytesWritable> startKeys =
        getRegionStartKeys(regionLocators, writeMultipleTables);
    // Use the tables' region boundaries as the TotalOrderPartitioner split points.
    LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
        "to match current region count for all tables");
    job.setNumReduceTasks(startKeys.size());

    configurePartitioner(job, startKeys, writeMultipleTables);
    // Serialize per-column-family attributes (compression, block size, bloom type/param,
    // data block encoding) into the configuration.
    conf.set(COMPRESSION_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(compressionDetails,
            tableDescriptors));
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(blockSizeDetails,
            tableDescriptors));
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(bloomTypeDetails,
            tableDescriptors));
    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(bloomParamDetails,
        tableDescriptors));
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
            serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
  }

  public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor) throws
      IOException {
    Configuration conf = job.getConfiguration();

    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(MapReduceExtendedCell.class);
    job.setOutputFormatClass(HFileOutputFormat2.class);

    ArrayList<TableDescriptor> singleTableDescriptor = new ArrayList<>(1);
    singleTableDescriptor.add(tableDescriptor);

    conf.set(OUTPUT_TABLE_NAME_CONF_KEY, tableDescriptor.getTableName().getNameAsString());
    // Serialize per-column-family attributes (compression, block size, bloom type/param,
    // data block encoding) into the configuration.
    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(compressionDetails, singleTableDescriptor));
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor));
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor));
    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(bloomParamDetails, singleTableDescriptor));
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured.");
  }

  /**
   * Runs inside the task to deserialize column family to compression algorithm
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured compression algorithm
   */
  @VisibleForTesting
  static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        COMPRESSION_FAMILIES_CONF_KEY);
    Map<byte[], Algorithm> compressionMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue());
      compressionMap.put(e.getKey(), algorithm);
    }
    return compressionMap;
  }

  /**
   * Runs inside the task to deserialize column family to bloom filter type
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter type
   */
  @VisibleForTesting
  static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        BLOOM_TYPE_FAMILIES_CONF_KEY);
    Map<byte[], BloomType> bloomTypeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      BloomType bloomType = BloomType.valueOf(e.getValue());
      bloomTypeMap.put(e.getKey(), bloomType);
    }
    return bloomTypeMap;
  }

  /**
   * Runs inside the task to deserialize column family to bloom filter param
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter param
   */
  @VisibleForTesting
  static Map<byte[], String> createFamilyBloomParamMap(Configuration conf) {
    return createFamilyConfValueMap(conf, BLOOM_PARAM_FAMILIES_CONF_KEY);
  }

  /**
   * Runs inside the task to deserialize column family to block size
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured block size
   */
  @VisibleForTesting
  static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        BLOCK_SIZE_FAMILIES_CONF_KEY);
    Map<byte[], Integer> blockSizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Integer blockSize = Integer.parseInt(e.getValue());
      blockSizeMap.put(e.getKey(), blockSize);
    }
    return blockSizeMap;
  }

  /**
   * Runs inside the task to deserialize column family to data block encoding
   * type map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured data block encoding
   *         for the family
   */
  @VisibleForTesting
  static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(
      Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
    Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      encoderMap.put(e.getKey(), DataBlockEncoding.valueOf((e.getValue())));
    }
    return encoderMap;
  }

  /**
   * Runs inside the task to deserialize a column family to conf value map
   * from the configuration.
   *
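   * For illustration (a hypothetical table {@code t1} with families {@code f1} and {@code f2}),
   * a value produced by {@link #serializeColumnFamilyAttribute} looks like
   * {@code t1%3Bf1=gz&t1%3Bf2=none}: URL-encoded {@code family=value} pairs joined by
   * {@code '&'}, where each family key is the table name suffixed with the family name.
   *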
   * @param conf to read the serialized values from
   * @param confName conf key to read from the configuration
   * @return a map of column family to the given configuration value
   */
  private static Map<byte[], String> createFamilyConfValueMap(
      Configuration conf, String confName) {
    Map<byte[], String> confValMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    String confVal = conf.get(confName, "");
    for (String familyConf : confVal.split("&")) {
      String[] familySplit = familyConf.split("=");
      if (familySplit.length != 2) {
        continue;
      }
      try {
        confValMap.put(Bytes.toBytes(URLDecoder.decode(familySplit[0], "UTF-8")),
            URLDecoder.decode(familySplit[1], "UTF-8"));
      } catch (UnsupportedEncodingException e) {
        // will not happen with UTF-8 encoding
        throw new AssertionError(e);
      }
    }
    return confValMap;
  }

  /**
   * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
   * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
   */
  static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints,
      boolean writeMultipleTables) throws IOException {
    Configuration conf = job.getConfiguration();
    // create the partitions file
    FileSystem fs = FileSystem.get(conf);
    String hbaseTmpFsDir =
        conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
          HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
    Path partitionsPath =
        fs.makeQualified(new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID()));
    writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables);
    fs.deleteOnExit(partitionsPath);

    // configure job to use it
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
  @VisibleForTesting
  static String serializeColumnFamilyAttribute(Function<ColumnFamilyDescriptor, String> fn,
      List<TableDescriptor> allTables)
      throws UnsupportedEncodingException {
    StringBuilder attributeValue = new StringBuilder();
    int i = 0;
    for (TableDescriptor tableDescriptor : allTables) {
      if (tableDescriptor == null) {
        // could happen with mock table instance
        // CODEREVIEW: Can I set an empty string in conf if mock table instance?
        return "";
      }
      for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
        if (i++ > 0) {
          attributeValue.append('&');
        }
        attributeValue.append(URLEncoder.encode(
            Bytes.toString(combineTableNameSuffix(tableDescriptor.getTableName().getName(),
                familyDescriptor.getName())),
            "UTF-8"));
        attributeValue.append('=');
        attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8"));
      }
    }
    return attributeValue.toString();
  }

  /**
   * Serialize column family to compression algorithm map to configuration.
   * Invoked while configuring the MR job for incremental load.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> compressionDetails = familyDescriptor ->
          familyDescriptor.getCompressionType().getName();

  /**
   * Serialize column family to block size map to configuration. Invoked while
   * configuring the MR job for incremental load.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> blockSizeDetails = familyDescriptor -> String
          .valueOf(familyDescriptor.getBlocksize());

  /**
   * Serialize column family to bloom type map to configuration. Invoked while
   * configuring the MR job for incremental load.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> bloomTypeDetails = familyDescriptor -> {
    // Fall back to the default bloom filter type if none is set on the descriptor.
    BloomType bloomType = familyDescriptor.getBloomFilterType();
    if (bloomType == null) {
      bloomType = ColumnFamilyDescriptorBuilder.DEFAULT_BLOOMFILTER;
    }
    return bloomType.toString();
  };

  /**
   * Serialize column family to bloom param map to configuration. Invoked while
   * configuring the MR job for incremental load.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> bloomParamDetails = familyDescriptor -> {
    BloomType bloomType = familyDescriptor.getBloomFilterType();
    String bloomParam = "";
    if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
      bloomParam = familyDescriptor.getConfigurationValue(BloomFilterUtil.PREFIX_LENGTH_KEY);
    }
    return bloomParam;
  };

  /**
   * Serialize column family to data block encoding map to configuration.
   * Invoked while configuring the MR job for incremental load.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> dataBlockEncodingDetails = familyDescriptor -> {
    DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
    if (encoding == null) {
      encoding = DataBlockEncoding.NONE;
    }
    return encoding.toString();
  };

}