/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TASK_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * Writes HFiles. Passed Cells must arrive in order.
 * Writes current time as the sequence id for the file. Sets the major compacted
 * attribute on created {@link HFile}s. Calling write(null,null) will forcibly roll
 * all HFiles being written.
 * <p>
 * Using this class as part of a MapReduce job is best done
 * using {@link #configureIncrementalLoad(Job, TableDescriptor, RegionLocator)}.
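 * <p>
 * A minimal driver sketch ({@code MyDriver}, {@code MyCellMapper}, the paths and the table
 * {@code example_table} are illustrative names, not part of this class):
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * Job job = Job.getInstance(conf, "bulkload-prepare");
 * job.setJarByClass(MyDriver.class);
 * job.setMapperClass(MyCellMapper.class);            // mapper emitting rowkey -> Put
 * job.setMapOutputKeyClass(ImmutableBytesWritable.class);
 * job.setMapOutputValueClass(Put.class);
 * FileInputFormat.addInputPath(job, new Path("/input"));
 * FileOutputFormat.setOutputPath(job, new Path("/bulkload-output"));
 * try (Connection conn = ConnectionFactory.createConnection(conf);
 *     Table table = conn.getTable(TableName.valueOf("example_table"));
 *     RegionLocator locator = conn.getRegionLocator(table.getName())) {
 *   HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), locator);
 * }
 * job.waitForCompletion(true);
 * }</pre>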
 */
@InterfaceAudience.Public
public class HFileOutputFormat2
    extends FileOutputFormat<ImmutableBytesWritable, Cell> {
  private static final Logger LOG = LoggerFactory.getLogger(HFileOutputFormat2.class);

  static class TableInfo {
    private TableDescriptor tableDescriptor;
    private RegionLocator regionLocator;

    public TableInfo(TableDescriptor tableDescriptor, RegionLocator regionLocator) {
      this.tableDescriptor = tableDescriptor;
      this.regionLocator = regionLocator;
    }

    /**
     * Modifications to the returned HTD do not affect the inner TD.
     * @return A clone of the inner table descriptor
     * @deprecated since 2.0.0 and will be removed in 3.0.0. Use {@link #getTableDescriptor()}
     *   instead.
     * @see #getTableDescriptor()
     * @see <a href="https://issues.apache.org/jira/browse/HBASE-18241">HBASE-18241</a>
     */
    @Deprecated
    public HTableDescriptor getHTableDescriptor() {
      return new HTableDescriptor(tableDescriptor);
    }

    public TableDescriptor getTableDescriptor() {
      return tableDescriptor;
    }

    public RegionLocator getRegionLocator() {
      return regionLocator;
    }
  }

  protected static final byte[] tableSeparator = Bytes.toBytes(";");

  protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) {
    return Bytes.add(tableName, tableSeparator, suffix);
  }
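
  // Illustrative note (names below are examples, not from the original source): with the ";"
  // separator above, combineTableNameSuffix(Bytes.toBytes("example_table"), Bytes.toBytes("cf"))
  // yields the bytes of "example_table;cf". This "table;family" key is what the per-family
  // attribute maps configured through the conf keys below are keyed and looked up on.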

  // The following constants are private since these are used by
  // HFileOutputFormat2 to internally transfer data between job setup and
  // reducer run using conf.
  // These should not be changed by the client.
  static final String COMPRESSION_FAMILIES_CONF_KEY =
      "hbase.hfileoutputformat.families.compression";
  static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
      "hbase.hfileoutputformat.families.bloomtype";
  static final String BLOOM_PARAM_FAMILIES_CONF_KEY =
      "hbase.hfileoutputformat.families.bloomparam";
  static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.blocksize";
  static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";

  // This constant is public since the client can modify this when setting
  // up their conf object and thus refer to this symbol.
  // It is present for backwards compatibility reasons. Use it only to
  // override the auto-detection of datablock encoding and compression.
  public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.datablock.encoding";
  public static final String COMPRESSION_OVERRIDE_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.compression";

  /**
   * Keep locality while generating HFiles for bulkload. See HBASE-12596
   */
  public static final String LOCALITY_SENSITIVE_CONF_KEY =
      "hbase.bulkload.locality.sensitive.enabled";
  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
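
  // A hedged sketch (not from the original source): jobs that do not want region lookups while
  // writing HFiles can disable locality-sensitive writing before submitting the job, e.g.
  //   conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
  // With the default of true, whenever the record writer opens a new HFile writer it looks up the
  // region hosting the current row and passes that region server as a favored node to the
  // StoreFileWriter.
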
  static final String OUTPUT_TABLE_NAME_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.table.name";
  static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
      "hbase.mapreduce.use.multi.table.hfileoutputformat";

  public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY;
  public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";

  @Override
  public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
      final TaskAttemptContext context) throws IOException, InterruptedException {
    return createRecordWriter(context, this.getOutputCommitter(context));
  }

  protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[] family) {
    return combineTableNameSuffix(tableName, family);
  }

  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
      createRecordWriter(final TaskAttemptContext context, final OutputCommitter committer)
          throws IOException {

    // Get the path of the temporary output file
    final Path outputDir = ((FileOutputCommitter) committer).getWorkPath();
    final Configuration conf = context.getConfiguration();
    final boolean writeMultipleTables =
        conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
    final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
    if (writeTableNames == null || writeTableNames.isEmpty()) {
      throw new IllegalArgumentException("Configuration parameter " + OUTPUT_TABLE_NAME_CONF_KEY
          + " cannot be empty");
    }
    final FileSystem fs = outputDir.getFileSystem(conf);
    // These configs. are from hbase-*.xml
    final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
        HConstants.DEFAULT_MAX_FILE_SIZE);
    // Invented config.  Add to hbase-*.xml if other than default compression.
    final String defaultCompressionStr = conf.get("hfile.compression",
        Compression.Algorithm.NONE.getName());
    final Algorithm defaultCompression = HFileWriterImpl
        .compressionByName(defaultCompressionStr);
    String compressionStr = conf.get(COMPRESSION_OVERRIDE_CONF_KEY);
    final Algorithm overriddenCompression;
    if (compressionStr != null) {
      overriddenCompression = Compression.getCompressionAlgorithmByName(compressionStr);
    } else {
      overriddenCompression = null;
    }

    final boolean compactionExclude = conf.getBoolean(
        "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);

    final Set<String> allTableNames = Arrays.stream(writeTableNames.split(
        Bytes.toString(tableSeparator))).collect(Collectors.toSet());

    // create a map from column family to the compression algorithm
    final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
    final Map<byte[], String> bloomParamMap = createFamilyBloomParamMap(conf);
    final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);

    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
    final Map<byte[], DataBlockEncoding> datablockEncodingMap
        = createFamilyDataBlockEncodingMap(conf);
    final DataBlockEncoding overriddenEncoding;
    if (dataBlockEncodingStr != null) {
      overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr);
    } else {
      overriddenEncoding = null;
    }

    return new RecordWriter<ImmutableBytesWritable, V>() {
      // Map of families to writers and how much has been output on the writer.
      private final Map<byte[], WriterLength> writers =
          new TreeMap<>(Bytes.BYTES_COMPARATOR);
      private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY;
      private final long now = EnvironmentEdgeManager.currentTime();
      private boolean rollRequested = false;

      @Override
      public void write(ImmutableBytesWritable row, V cell)
          throws IOException {
        Cell kv = cell;
        // null input == user explicitly wants to flush
        if (row == null && kv == null) {
          rollWriters(null);
          return;
        }

        byte[] rowKey = CellUtil.cloneRow(kv);
        int length = (PrivateCellUtil.estimatedSerializedSizeOf(kv)) - Bytes.SIZEOF_INT;
        byte[] family = CellUtil.cloneFamily(kv);
        byte[] tableNameBytes = null;
        if (writeMultipleTables) {
          tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get());
          tableNameBytes =
              TableName.valueOf(tableNameBytes).getNameWithNamespaceInclAsString()
                  .getBytes(Charset.defaultCharset());
          if (!allTableNames.contains(Bytes.toString(tableNameBytes))) {
            throw new IllegalArgumentException("TableName '" + Bytes.toString(tableNameBytes) +
                "' not expected");
          }
        } else {
          tableNameBytes = Bytes.toBytes(writeTableNames);
        }
        String tableName = Bytes.toString(tableNameBytes);
        Path tableRelPath = getTableRelativePath(tableNameBytes);
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);

        WriterLength wl = this.writers.get(tableAndFamily);

        // If this is a new column family, verify that the directory exists
        if (wl == null) {
          Path writerPath = null;
          if (writeMultipleTables) {
            writerPath = new Path(outputDir, new Path(tableRelPath, Bytes.toString(family)));
          } else {
            writerPath = new Path(outputDir, Bytes.toString(family));
          }
          fs.mkdirs(writerPath);
          configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
        }

        if (wl != null && wl.written + length >= maxsize) {
          this.rollRequested = true;
        }

        // This can only happen once a row is finished though
        if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
          rollWriters(wl);
        }

        // create a new HFile writer, if necessary
        if (wl == null || wl.writer == null) {
          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
            HRegionLocation loc = null;

            if (tableName != null) {
              try (Connection connection = ConnectionFactory.createConnection(conf);
                  RegionLocator locator =
                      connection.getRegionLocator(TableName.valueOf(tableName))) {
                loc = locator.getRegionLocation(rowKey);
              } catch (Throwable e) {
                LOG.warn("There's something wrong when locating rowkey: " +
                    Bytes.toString(rowKey) + " for tablename: " + tableName, e);
                loc = null;
              }
            }

            if (null == loc) {
              if (LOG.isTraceEnabled()) {
                LOG.trace("failed to get region location, so use default writer for rowkey: " +
                    Bytes.toString(rowKey));
              }
              wl = getNewWriter(tableNameBytes, family, conf, null);
            } else {
              if (LOG.isDebugEnabled()) {
                LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]");
              }
              InetSocketAddress initialIsa =
                  new InetSocketAddress(loc.getHostname(), loc.getPort());
              if (initialIsa.isUnresolved()) {
                if (LOG.isTraceEnabled()) {
                  LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":"
                      + loc.getPort() + ", so use default writer");
                }
                wl = getNewWriter(tableNameBytes, family, conf, null);
              } else {
                if (LOG.isDebugEnabled()) {
                  LOG.debug("use favored nodes writer: " + initialIsa.getHostString());
                }
                wl = getNewWriter(tableNameBytes, family, conf,
                    new InetSocketAddress[] { initialIsa });
              }
            }
          } else {
            wl = getNewWriter(tableNameBytes, family, conf, null);
          }
        }

        // we now have the proper HFile writer. full steam ahead
        PrivateCellUtil.updateLatestStamp(cell, this.now);
        wl.writer.append(kv);
        wl.written += length;

        // Copy the row so we know when a row transitions.
        this.previousRow = rowKey;
      }

      private Path getTableRelativePath(byte[] tableNameBytes) {
        String tableName = Bytes.toString(tableNameBytes);
        String[] tableNameParts = tableName.split(":");
        Path tableRelPath = new Path(tableNameParts[0]);
        if (tableNameParts.length > 1) {
          tableRelPath = new Path(tableRelPath, tableNameParts[1]);
        }
        return tableRelPath;
      }

      private void rollWriters(WriterLength writerLength) throws IOException {
        if (writerLength != null) {
          closeWriter(writerLength);
        } else {
          for (WriterLength wl : this.writers.values()) {
            closeWriter(wl);
          }
        }
        this.rollRequested = false;
      }

      private void closeWriter(WriterLength wl) throws IOException {
        if (wl.writer != null) {
          LOG.info(
              "Writer=" + wl.writer.getPath() + ((wl.written == 0) ? "" : ", wrote=" + wl.written));
          close(wl.writer);
        }
        wl.writer = null;
        wl.written = 0;
      }

      /*
       * Create a new StoreFile.Writer.
       * @param family
       * @return A WriterLength, containing a new StoreFile.Writer.
       * @throws IOException
       */
      @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BX_UNBOXING_IMMEDIATELY_REBOXED",
          justification = "Not important")
      private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration conf,
          InetSocketAddress[] favoredNodes) throws IOException {
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family);
        Path familydir = new Path(outputDir, Bytes.toString(family));
        if (writeMultipleTables) {
          familydir = new Path(outputDir,
              new Path(getTableRelativePath(tableName), Bytes.toString(family)));
        }
        WriterLength wl = new WriterLength();
        Algorithm compression = overriddenCompression;
        compression = compression == null ? compressionMap.get(tableAndFamily) : compression;
        compression = compression == null ? defaultCompression : compression;
        BloomType bloomType = bloomTypeMap.get(tableAndFamily);
        bloomType = bloomType == null ? BloomType.NONE : bloomType;
        String bloomParam = bloomParamMap.get(tableAndFamily);
        if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
          conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, bloomParam);
        }
        Integer blockSize = blockSizeMap.get(tableAndFamily);
        blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
        DataBlockEncoding encoding = overriddenEncoding;
        encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
        encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
        HFileContextBuilder contextBuilder = new HFileContextBuilder()
            .withCompression(compression)
            .withChecksumType(HStore.getChecksumType(conf))
            .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
            .withBlockSize(blockSize);

        if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
          contextBuilder.withIncludesTags(true);
        }

        contextBuilder.withDataBlockEncoding(encoding);
        HFileContext hFileContext = contextBuilder.build();
        if (null == favoredNodes) {
          wl.writer =
              new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs)
                  .withOutputDir(familydir).withBloomType(bloomType)
                  .withComparator(CellComparator.getInstance())
                  .withFileContext(hFileContext).build();
        } else {
          wl.writer =
              new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, new HFileSystem(fs))
                  .withOutputDir(familydir).withBloomType(bloomType)
                  .withComparator(CellComparator.getInstance()).withFileContext(hFileContext)
                  .withFavoredNodes(favoredNodes).build();
        }

        this.writers.put(tableAndFamily, wl);
        return wl;
      }

      private void close(final StoreFileWriter w) throws IOException {
        if (w != null) {
          w.appendFileInfo(BULKLOAD_TIME_KEY,
              Bytes.toBytes(System.currentTimeMillis()));
          w.appendFileInfo(BULKLOAD_TASK_KEY,
              Bytes.toBytes(context.getTaskAttemptID().toString()));
          w.appendFileInfo(MAJOR_COMPACTION_KEY,
              Bytes.toBytes(true));
          w.appendFileInfo(EXCLUDE_FROM_MINOR_COMPACTION_KEY,
              Bytes.toBytes(compactionExclude));
          w.appendTrackedTimestampsToMetadata();
          w.close();
        }
      }

      @Override
      public void close(TaskAttemptContext c)
          throws IOException, InterruptedException {
        for (WriterLength wl : this.writers.values()) {
          close(wl.writer);
        }
      }
    };
  }

  /**
   * Configure block storage policy for CF after the directory is created.
   */
  static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
      byte[] tableAndFamily, Path cfPath) {
    if (null == conf || null == fs || null == tableAndFamily || null == cfPath) {
      return;
    }

    String policy =
        conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(tableAndFamily),
            conf.get(STORAGE_POLICY_PROPERTY));
    FSUtils.setStoragePolicy(fs, cfPath, policy);
  }
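
  // A hedged configuration sketch (table/family names are illustrative): the policy is resolved
  // from the per-"table;family" key first and then from the global key, so a job could set
  //   conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "HOT");
  //   conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + "example_table;cf1", "ALL_SSD");
  // to keep most output on the default policy while pinning cf1's HFiles to SSD-backed storage.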

  /*
   * Data structure to hold a Writer and amount of data written on it.
   */
  static class WriterLength {
    long written = 0;
    StoreFileWriter writer = null;
  }

  /**
   * Return the start keys of all of the regions in the given tables,
   * as a list of ImmutableBytesWritable.
   */
  private static List<ImmutableBytesWritable> getRegionStartKeys(List<RegionLocator> regionLocators,
      boolean writeMultipleTables) throws IOException {

    ArrayList<ImmutableBytesWritable> ret = new ArrayList<>();
    for (RegionLocator regionLocator : regionLocators) {
      TableName tableName = regionLocator.getName();
      LOG.info("Looking up current regions for table " + tableName);
      byte[][] byteKeys = regionLocator.getStartKeys();
      for (byte[] byteKey : byteKeys) {
        byte[] fullKey = byteKey; // HFileOutputFormat2 use case
        if (writeMultipleTables) {
          // MultiTableHFileOutputFormat use case
          fullKey = combineTableNameSuffix(tableName.getName(), byteKey);
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("SplitPoint startkey for table [" + tableName + "]: [" +
              Bytes.toStringBinary(fullKey) + "]");
        }
        ret.add(new ImmutableBytesWritable(fullKey));
      }
    }
    return ret;
  }

  /**
   * Write out a {@link SequenceFile} containing the split points in startKeys,
   * readable by {@link TotalOrderPartitioner}.
   */
  @SuppressWarnings("deprecation")
  private static void writePartitions(Configuration conf, Path partitionsPath,
      List<ImmutableBytesWritable> startKeys, boolean writeMultipleTables) throws IOException {
    LOG.info("Writing partition information to " + partitionsPath);
    if (startKeys.isEmpty()) {
      throw new IllegalArgumentException("No regions passed");
    }

    // We're generating a list of split points, and we don't ever
    // have keys < the first region (which has an empty start key)
    // so we need to remove it. Otherwise we would end up with an
    // empty reducer with index 0
    TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys);
    ImmutableBytesWritable first = sorted.first();
    if (writeMultipleTables) {
      first =
          new ImmutableBytesWritable(MultiTableHFileOutputFormat.getSuffix(sorted.first().get()));
    }
    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IllegalArgumentException(
          "First region of table should have empty start key. Instead has: "
          + Bytes.toStringBinary(first.get()));
    }
    sorted.remove(sorted.first());

    // Write the actual file
    FileSystem fs = partitionsPath.getFileSystem(conf);
    SequenceFile.Writer writer = SequenceFile.createWriter(
      fs, conf, partitionsPath, ImmutableBytesWritable.class,
      NullWritable.class);

    try {
      for (ImmutableBytesWritable startKey : sorted) {
        writer.append(startKey, NullWritable.get());
      }
    } finally {
      writer.close();
    }
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given
   * table. This
   * <ul>
   *   <li>Inspects the table to configure a total order partitioner</li>
   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   *   <li>Sets the reducer up to perform the appropriate sorting (either CellSortReducer or
   *     PutSortReducer)</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
   */
  public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
      throws IOException {
    configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given
   * table. This
   * <ul>
   *   <li>Inspects the table to configure a total order partitioner</li>
   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   *   <li>Sets the reducer up to perform the appropriate sorting (either CellSortReducer or
   *     PutSortReducer)</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
   */
  public static void configureIncrementalLoad(Job job, TableDescriptor tableDescriptor,
      RegionLocator regionLocator) throws IOException {
    ArrayList<TableInfo> singleTableInfo = new ArrayList<>();
    singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator));
    configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
  }

  static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
      Class<? extends OutputFormat<?, ?>> cls) throws IOException {
    Configuration conf = job.getConfiguration();
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(MapReduceExtendedCell.class);
    job.setOutputFormatClass(cls);

    if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) {
      throw new IllegalArgumentException("Duplicate entries found in TableInfo argument");
    }
    boolean writeMultipleTables = false;
    if (MultiTableHFileOutputFormat.class.equals(cls)) {
      writeMultipleTables = true;
      conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true);
    }
    // Based on the configured map output class, set the correct reducer to properly
    // sort the incoming values.
    // TODO it would be nice to pick one or the other of these formats.
    if (KeyValue.class.equals(job.getMapOutputValueClass())
        || MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(CellSortReducer.class);
    } else if (Put.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(PutSortReducer.class);
    } else if (Text.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(TextSortReducer.class);
    } else {
      LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
    }

    conf.setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
        CellSerialization.class.getName());

    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
      LOG.info("bulkload locality sensitive enabled");
    }

    /* Now get the region start keys for every table required */
    List<String> allTableNames = new ArrayList<>(multiTableInfo.size());
    List<RegionLocator> regionLocators = new ArrayList<>(multiTableInfo.size());
    List<TableDescriptor> tableDescriptors = new ArrayList<>(multiTableInfo.size());

    for (TableInfo tableInfo : multiTableInfo) {
      regionLocators.add(tableInfo.getRegionLocator());
      String tn = writeMultipleTables ?
          tableInfo.getRegionLocator().getName().getNameWithNamespaceInclAsString() :
          tableInfo.getRegionLocator().getName().getNameAsString();
      allTableNames.add(tn);
      tableDescriptors.add(tableInfo.getTableDescriptor());
    }
    // Record table names so that writers can use favored nodes and so that per-table column
    // family attributes (compression, block size, etc.) can be decoded at write time.
    conf.set(OUTPUT_TABLE_NAME_CONF_KEY,
        StringUtils.join(allTableNames, Bytes.toString(tableSeparator)));
    List<ImmutableBytesWritable> startKeys =
        getRegionStartKeys(regionLocators, writeMultipleTables);
    // Use table's region boundaries for TOP split points.
    LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
        "to match current region count for all tables");
    job.setNumReduceTasks(startKeys.size());

    configurePartitioner(job, startKeys, writeMultipleTables);
    // Set compression algorithms based on column families
    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(compressionDetails, tableDescriptors));
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(blockSizeDetails, tableDescriptors));
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(bloomTypeDetails, tableDescriptors));
    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(bloomParamDetails, tableDescriptors));
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
  }

  public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor)
      throws IOException {
    Configuration conf = job.getConfiguration();

    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(MapReduceExtendedCell.class);
    job.setOutputFormatClass(HFileOutputFormat2.class);

    ArrayList<TableDescriptor> singleTableDescriptor = new ArrayList<>(1);
    singleTableDescriptor.add(tableDescriptor);

    conf.set(OUTPUT_TABLE_NAME_CONF_KEY, tableDescriptor.getTableName().getNameAsString());
    // Set compression algorithms based on column families
    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(compressionDetails, singleTableDescriptor));
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor));
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor));
    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(bloomParamDetails, singleTableDescriptor));
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured.");
  }

  /**
   * Runs inside the task to deserialize column family to compression algorithm
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured compression algorithm
   */
  @VisibleForTesting
  static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        COMPRESSION_FAMILIES_CONF_KEY);
    Map<byte[], Algorithm> compressionMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue());
      compressionMap.put(e.getKey(), algorithm);
    }
    return compressionMap;
  }

  /**
   * Runs inside the task to deserialize column family to bloom filter type
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter type
   */
  @VisibleForTesting
  static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        BLOOM_TYPE_FAMILIES_CONF_KEY);
    Map<byte[], BloomType> bloomTypeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      BloomType bloomType = BloomType.valueOf(e.getValue());
      bloomTypeMap.put(e.getKey(), bloomType);
    }
    return bloomTypeMap;
  }

  /**
   * Runs inside the task to deserialize column family to bloom filter param
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter param
   */
  @VisibleForTesting
  static Map<byte[], String> createFamilyBloomParamMap(Configuration conf) {
    return createFamilyConfValueMap(conf, BLOOM_PARAM_FAMILIES_CONF_KEY);
  }

  /**
   * Runs inside the task to deserialize column family to block size
   * map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured block size
   */
  @VisibleForTesting
  static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        BLOCK_SIZE_FAMILIES_CONF_KEY);
    Map<byte[], Integer> blockSizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Integer blockSize = Integer.parseInt(e.getValue());
      blockSizeMap.put(e.getKey(), blockSize);
    }
    return blockSizeMap;
  }

  /**
   * Runs inside the task to deserialize column family to data block encoding
   * type map from the configuration.
   *
   * @param conf to read the serialized values from
   * @return a map from column family to the configured data block encoding
   *         for the family
   */
  @VisibleForTesting
  static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(
      Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
        DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
    Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      encoderMap.put(e.getKey(), DataBlockEncoding.valueOf((e.getValue())));
    }
    return encoderMap;
  }

  /**
   * Run inside the task to deserialize column family to given conf value map.
   *
   * @param conf to read the serialized values from
   * @param confName conf key to read from the configuration
   * @return a map of column family to the given configuration value
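   * <p>
   * The expected serialized form (a sketch; names are illustrative) is a list of URL-encoded
   * {@code key=value} pairs joined by {@code &}, where each key is the "table;family" string
   * produced by {@link #serializeColumnFamilyAttribute}, e.g.
   * {@code example_table%3Bcf1=value1&example_table%3Bcf2=value2}.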
   */
  private static Map<byte[], String> createFamilyConfValueMap(
      Configuration conf, String confName) {
    Map<byte[], String> confValMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    String confVal = conf.get(confName, "");
    for (String familyConf : confVal.split("&")) {
      String[] familySplit = familyConf.split("=");
      if (familySplit.length != 2) {
        continue;
      }
      try {
        confValMap.put(Bytes.toBytes(URLDecoder.decode(familySplit[0], "UTF-8")),
            URLDecoder.decode(familySplit[1], "UTF-8"));
      } catch (UnsupportedEncodingException e) {
        // will not happen with UTF-8 encoding
        throw new AssertionError(e);
      }
    }
    return confValMap;
  }

  /**
   * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
   * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
   */
  static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints,
      boolean writeMultipleTables) throws IOException {
    Configuration conf = job.getConfiguration();
    // create the partitions file
    FileSystem fs = FileSystem.get(conf);
    String hbaseTmpFsDir =
        conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
            fs.getHomeDirectory() + "/hbase-staging");
    Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
    fs.makeQualified(partitionsPath);
    writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables);
    fs.deleteOnExit(partitionsPath);

    // configure job to use it
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
      value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
  @VisibleForTesting
  static String serializeColumnFamilyAttribute(Function<ColumnFamilyDescriptor, String> fn,
      List<TableDescriptor> allTables) throws UnsupportedEncodingException {
    StringBuilder attributeValue = new StringBuilder();
    int i = 0;
    for (TableDescriptor tableDescriptor : allTables) {
      if (tableDescriptor == null) {
        // could happen with mock table instance
        // CODEREVIEW: Can I set an empty string in conf if mock table instance?
        return "";
      }
      for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
        if (i++ > 0) {
          attributeValue.append('&');
        }
        attributeValue.append(URLEncoder.encode(
            Bytes.toString(combineTableNameSuffix(tableDescriptor.getTableName().getName(),
                familyDescriptor.getName())),
            "UTF-8"));
        attributeValue.append('=');
        attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8"));
      }
    }
    return attributeValue.toString();
  }

  /**
   * Serialize column family to compression algorithm map to configuration.
   * Invoked while configuring the MR job for incremental load.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> compressionDetails = familyDescriptor ->
      familyDescriptor.getCompressionType().getName();

  /**
   * Serialize column family to block size map to configuration. Invoked while
   * configuring the MR job for incremental load.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> blockSizeDetails = familyDescriptor ->
      String.valueOf(familyDescriptor.getBlocksize());

  /**
   * Serialize column family to bloom type map to configuration. Invoked while
   * configuring the MR job for incremental load.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> bloomTypeDetails = familyDescriptor -> {
    String bloomType = familyDescriptor.getBloomFilterType().toString();
    if (bloomType == null) {
      bloomType = ColumnFamilyDescriptorBuilder.DEFAULT_BLOOMFILTER.name();
    }
    return bloomType;
  };

  /**
   * Serialize column family to bloom param map to configuration. Invoked while
   * configuring the MR job for incremental load.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> bloomParamDetails = familyDescriptor -> {
    BloomType bloomType = familyDescriptor.getBloomFilterType();
    String bloomParam = "";
    if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
      bloomParam = familyDescriptor.getConfigurationValue(BloomFilterUtil.PREFIX_LENGTH_KEY);
    }
    return bloomParam;
  };

  /**
   * Serialize column family to data block encoding map to configuration.
   * Invoked while configuring the MR job for incremental load.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> dataBlockEncodingDetails = familyDescriptor -> {
    DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
    if (encoding == null) {
      encoding = DataBlockEncoding.NONE;
    }
    return encoding.toString();
  };

}