001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TASK_KEY;
021import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
022import static org.apache.hadoop.hbase.regionserver.HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY;
023import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
024
025import java.io.IOException;
026import java.io.UnsupportedEncodingException;
027import java.net.InetSocketAddress;
028import java.net.URLDecoder;
029import java.net.URLEncoder;
030import java.util.ArrayList;
031import java.util.Arrays;
032import java.util.List;
033import java.util.Map;
034import java.util.Set;
035import java.util.TreeMap;
036import java.util.TreeSet;
037import java.util.UUID;
038import java.util.function.Function;
039import java.util.stream.Collectors;
040
041import org.apache.commons.lang3.StringUtils;
042import org.apache.hadoop.conf.Configuration;
043import org.apache.hadoop.fs.FileSystem;
044import org.apache.hadoop.fs.Path;
045import org.apache.hadoop.hbase.Cell;
046import org.apache.hadoop.hbase.CellComparator;
047import org.apache.hadoop.hbase.CellUtil;
048import org.apache.hadoop.hbase.HConstants;
049import org.apache.hadoop.hbase.HRegionLocation;
050import org.apache.hadoop.hbase.HTableDescriptor;
051import org.apache.hadoop.hbase.KeyValue;
052import org.apache.hadoop.hbase.PrivateCellUtil;
053import org.apache.hadoop.hbase.TableName;
054import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
055import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
056import org.apache.hadoop.hbase.client.Connection;
057import org.apache.hadoop.hbase.client.ConnectionFactory;
058import org.apache.hadoop.hbase.client.Put;
059import org.apache.hadoop.hbase.client.RegionLocator;
060import org.apache.hadoop.hbase.client.Table;
061import org.apache.hadoop.hbase.client.TableDescriptor;
062import org.apache.hadoop.hbase.fs.HFileSystem;
063import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
064import org.apache.hadoop.hbase.io.compress.Compression;
065import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
066import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
067import org.apache.hadoop.hbase.io.hfile.CacheConfig;
068import org.apache.hadoop.hbase.io.hfile.HFile;
069import org.apache.hadoop.hbase.io.hfile.HFileContext;
070import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
071import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
072import org.apache.hadoop.hbase.regionserver.BloomType;
073import org.apache.hadoop.hbase.regionserver.HStore;
074import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
075import org.apache.hadoop.hbase.util.Bytes;
076import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
077import org.apache.hadoop.hbase.util.FSUtils;
078import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
079import org.apache.hadoop.io.NullWritable;
080import org.apache.hadoop.io.SequenceFile;
081import org.apache.hadoop.io.Text;
082import org.apache.hadoop.mapreduce.Job;
083import org.apache.hadoop.mapreduce.OutputCommitter;
084import org.apache.hadoop.mapreduce.OutputFormat;
085import org.apache.hadoop.mapreduce.RecordWriter;
086import org.apache.hadoop.mapreduce.TaskAttemptContext;
087import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
088import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
089import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
090import org.apache.yetus.audience.InterfaceAudience;
091import org.slf4j.Logger;
092import org.slf4j.LoggerFactory;
093
094import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
095
096/**
097 * Writes HFiles. Passed Cells must arrive in order.
098 * Writes current time as the sequence id for the file. Sets the major compacted
099 * attribute on created @{link {@link HFile}s. Calling write(null,null) will forcibly roll
100 * all HFiles being written.
101 * <p>
102 * Using this class as part of a MapReduce job is best done
103 * using {@link #configureIncrementalLoad(Job, TableDescriptor, RegionLocator)}.
104 */
105@InterfaceAudience.Public
106public class HFileOutputFormat2
107    extends FileOutputFormat<ImmutableBytesWritable, Cell> {
108  private static final Logger LOG = LoggerFactory.getLogger(HFileOutputFormat2.class);
109  static class TableInfo {
110    private TableDescriptor tableDesctiptor;
111    private RegionLocator regionLocator;
112
113    public TableInfo(TableDescriptor tableDesctiptor, RegionLocator regionLocator) {
114      this.tableDesctiptor = tableDesctiptor;
115      this.regionLocator = regionLocator;
116    }
117
118    /**
119     * The modification for the returned HTD doesn't affect the inner TD.
120     * @return A clone of inner table descriptor
121     * @deprecated use {@link #getTableDescriptor}
122     */
123    @Deprecated
124    public HTableDescriptor getHTableDescriptor() {
125      return new HTableDescriptor(tableDesctiptor);
126    }
127
128    public TableDescriptor getTableDescriptor() {
129      return tableDesctiptor;
130    }
131
132    public RegionLocator getRegionLocator() {
133      return regionLocator;
134    }
135  }
136
137  protected static final byte[] tableSeparator = Bytes.toBytes(";");
138
139  protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) {
140    return Bytes.add(tableName, tableSeparator, suffix);
141  }
142
143  // The following constants are private since these are used by
144  // HFileOutputFormat2 to internally transfer data between job setup and
145  // reducer run using conf.
146  // These should not be changed by the client.
147  static final String COMPRESSION_FAMILIES_CONF_KEY =
148      "hbase.hfileoutputformat.families.compression";
149  static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
150      "hbase.hfileoutputformat.families.bloomtype";
151  static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
152      "hbase.mapreduce.hfileoutputformat.blocksize";
153  static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
154      "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";
155
156  // This constant is public since the client can modify this when setting
157  // up their conf object and thus refer to this symbol.
158  // It is present for backwards compatibility reasons. Use it only to
159  // override the auto-detection of datablock encoding.
160  public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
161      "hbase.mapreduce.hfileoutputformat.datablock.encoding";
162
163  /**
164   * Keep locality while generating HFiles for bulkload. See HBASE-12596
165   */
166  public static final String LOCALITY_SENSITIVE_CONF_KEY =
167      "hbase.bulkload.locality.sensitive.enabled";
168  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
169  static final String OUTPUT_TABLE_NAME_CONF_KEY =
170      "hbase.mapreduce.hfileoutputformat.table.name";
171  static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
172          "hbase.mapreduce.use.multi.table.hfileoutputformat";
173
174  public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY;
175  public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";
176
177  @Override
178  public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
179      final TaskAttemptContext context) throws IOException, InterruptedException {
180    return createRecordWriter(context, this.getOutputCommitter(context));
181  }
182
183  protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[] family) {
184    return combineTableNameSuffix(tableName, family);
185  }
186
187  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
188      createRecordWriter(final TaskAttemptContext context, final OutputCommitter committer)
189          throws IOException {
190
191    // Get the path of the temporary output file
192    final Path outputDir = ((FileOutputCommitter)committer).getWorkPath();
193    final Configuration conf = context.getConfiguration();
194    final boolean writeMultipleTables = conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false) ;
195    final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
196    if (writeTableNames==null || writeTableNames.isEmpty()) {
197      throw new IllegalArgumentException("Configuration parameter " + OUTPUT_TABLE_NAME_CONF_KEY
198              + " cannot be empty");
199    }
200    final FileSystem fs = outputDir.getFileSystem(conf);
201    // These configs. are from hbase-*.xml
202    final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
203        HConstants.DEFAULT_MAX_FILE_SIZE);
204    // Invented config.  Add to hbase-*.xml if other than default compression.
205    final String defaultCompressionStr = conf.get("hfile.compression",
206        Compression.Algorithm.NONE.getName());
207    final Algorithm defaultCompression = HFileWriterImpl
208        .compressionByName(defaultCompressionStr);
209    final boolean compactionExclude = conf.getBoolean(
210        "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
211
212    final Set<String> allTableNames = Arrays.stream(writeTableNames.split(
213            Bytes.toString(tableSeparator))).collect(Collectors.toSet());
214
215    // create a map from column family to the compression algorithm
216    final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
217    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
218    final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);
219
220    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
221    final Map<byte[], DataBlockEncoding> datablockEncodingMap
222        = createFamilyDataBlockEncodingMap(conf);
223    final DataBlockEncoding overriddenEncoding;
224    if (dataBlockEncodingStr != null) {
225      overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr);
226    } else {
227      overriddenEncoding = null;
228    }
229
230    return new RecordWriter<ImmutableBytesWritable, V>() {
231      // Map of families to writers and how much has been output on the writer.
232      private final Map<byte[], WriterLength> writers =
233              new TreeMap<>(Bytes.BYTES_COMPARATOR);
234      private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY;
235      private final long now = EnvironmentEdgeManager.currentTime();
236      private boolean rollRequested = false;
237
238      @Override
239      public void write(ImmutableBytesWritable row, V cell)
240          throws IOException {
241        Cell kv = cell;
242        // null input == user explicitly wants to flush
243        if (row == null && kv == null) {
244          rollWriters(null);
245          return;
246        }
247
248        byte[] rowKey = CellUtil.cloneRow(kv);
249        int length = (PrivateCellUtil.estimatedSerializedSizeOf(kv)) - Bytes.SIZEOF_INT;
250        byte[] family = CellUtil.cloneFamily(kv);
251        byte[] tableNameBytes = null;
252        if (writeMultipleTables) {
253          tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get());
254          if (!allTableNames.contains(Bytes.toString(tableNameBytes))) {
255            throw new IllegalArgumentException("TableName '" + Bytes.toString(tableNameBytes) +
256                    "' not" + " expected");
257          }
258        } else {
259          tableNameBytes = Bytes.toBytes(writeTableNames);
260        }
261        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);
262        WriterLength wl = this.writers.get(tableAndFamily);
263
264        // If this is a new column family, verify that the directory exists
265        if (wl == null) {
266          Path writerPath = null;
267          if (writeMultipleTables) {
268            writerPath = new Path(outputDir, new Path(Bytes.toString(tableNameBytes), Bytes
269                    .toString(family)));
270          }
271          else {
272            writerPath = new Path(outputDir, Bytes.toString(family));
273          }
274          fs.mkdirs(writerPath);
275          configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
276        }
277
278        if (wl != null && wl.written + length >= maxsize) {
279          this.rollRequested = true;
280        }
281
282        // This can only happen once a row is finished though
283        if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
284          rollWriters(wl);
285        }
286
287        // create a new WAL writer, if necessary
288        if (wl == null || wl.writer == null) {
289          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
290            HRegionLocation loc = null;
291
292            String tableName = Bytes.toString(tableNameBytes);
293            if (tableName != null) {
294              try (Connection connection = ConnectionFactory.createConnection(conf);
295                     RegionLocator locator =
296                       connection.getRegionLocator(TableName.valueOf(tableName))) {
297                loc = locator.getRegionLocation(rowKey);
298              } catch (Throwable e) {
299                LOG.warn("There's something wrong when locating rowkey: " +
300                  Bytes.toString(rowKey) + " for tablename: " + tableName, e);
301                loc = null;
302              } }
303
304            if (null == loc) {
305              if (LOG.isTraceEnabled()) {
306                LOG.trace("failed to get region location, so use default writer for rowkey: " +
307                  Bytes.toString(rowKey));
308              }
309              wl = getNewWriter(tableNameBytes, family, conf, null);
310            } else {
311              if (LOG.isDebugEnabled()) {
312                LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]");
313              }
314              InetSocketAddress initialIsa =
315                  new InetSocketAddress(loc.getHostname(), loc.getPort());
316              if (initialIsa.isUnresolved()) {
317                if (LOG.isTraceEnabled()) {
318                  LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":"
319                      + loc.getPort() + ", so use default writer");
320                }
321                wl = getNewWriter(tableNameBytes, family, conf, null);
322              } else {
323                if (LOG.isDebugEnabled()) {
324                  LOG.debug("use favored nodes writer: " + initialIsa.getHostString());
325                }
326                wl = getNewWriter(tableNameBytes, family, conf, new InetSocketAddress[] { initialIsa
327                });
328              }
329            }
330          } else {
331            wl = getNewWriter(tableNameBytes, family, conf, null);
332          }
333        }
334
335        // we now have the proper WAL writer. full steam ahead
336        PrivateCellUtil.updateLatestStamp(cell, this.now);
337        wl.writer.append(kv);
338        wl.written += length;
339
340        // Copy the row so we know when a row transition.
341        this.previousRow = rowKey;
342      }
343
344      private void rollWriters(WriterLength writerLength) throws IOException {
345        if (writerLength != null) {
346          closeWriter(writerLength);
347        } else {
348          for (WriterLength wl : this.writers.values()) {
349            closeWriter(wl);
350          }
351        }
352        this.rollRequested = false;
353      }
354
355      private void closeWriter(WriterLength wl) throws IOException {
356        if (wl.writer != null) {
357          LOG.info(
358              "Writer=" + wl.writer.getPath() + ((wl.written == 0)? "": ", wrote=" + wl.written));
359          close(wl.writer);
360        }
361        wl.writer = null;
362        wl.written = 0;
363      }
364
365      /*
366       * Create a new StoreFile.Writer.
367       * @param family
368       * @return A WriterLength, containing a new StoreFile.Writer.
369       * @throws IOException
370       */
371      @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BX_UNBOXING_IMMEDIATELY_REBOXED",
372          justification="Not important")
373      private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration
374              conf, InetSocketAddress[] favoredNodes) throws IOException {
375        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family);
376        Path familydir = new Path(outputDir, Bytes.toString(family));
377        if (writeMultipleTables) {
378          familydir = new Path(outputDir,
379                  new Path(Bytes.toString(tableName), Bytes.toString(family)));
380        }
381        WriterLength wl = new WriterLength();
382        Algorithm compression = compressionMap.get(tableAndFamily);
383        compression = compression == null ? defaultCompression : compression;
384        BloomType bloomType = bloomTypeMap.get(tableAndFamily);
385        bloomType = bloomType == null ? BloomType.NONE : bloomType;
386        Integer blockSize = blockSizeMap.get(tableAndFamily);
387        blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
388        DataBlockEncoding encoding = overriddenEncoding;
389        encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
390        encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
391        HFileContextBuilder contextBuilder = new HFileContextBuilder()
392                                    .withCompression(compression)
393                                    .withChecksumType(HStore.getChecksumType(conf))
394                                    .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
395                                    .withBlockSize(blockSize);
396
397        if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
398          contextBuilder.withIncludesTags(true);
399        }
400
401        contextBuilder.withDataBlockEncoding(encoding);
402        HFileContext hFileContext = contextBuilder.build();
403        if (null == favoredNodes) {
404          wl.writer =
405              new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs)
406                  .withOutputDir(familydir).withBloomType(bloomType)
407                  .withComparator(CellComparator.getInstance()).withFileContext(hFileContext).build();
408        } else {
409          wl.writer =
410              new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, new HFileSystem(fs))
411                  .withOutputDir(familydir).withBloomType(bloomType)
412                  .withComparator(CellComparator.getInstance()).withFileContext(hFileContext)
413                  .withFavoredNodes(favoredNodes).build();
414        }
415
416        this.writers.put(tableAndFamily, wl);
417        return wl;
418      }
419
420      private void close(final StoreFileWriter w) throws IOException {
421        if (w != null) {
422          w.appendFileInfo(BULKLOAD_TIME_KEY,
423              Bytes.toBytes(System.currentTimeMillis()));
424          w.appendFileInfo(BULKLOAD_TASK_KEY,
425              Bytes.toBytes(context.getTaskAttemptID().toString()));
426          w.appendFileInfo(MAJOR_COMPACTION_KEY,
427              Bytes.toBytes(true));
428          w.appendFileInfo(EXCLUDE_FROM_MINOR_COMPACTION_KEY,
429              Bytes.toBytes(compactionExclude));
430          w.appendTrackedTimestampsToMetadata();
431          w.close();
432        }
433      }
434
435      @Override
436      public void close(TaskAttemptContext c)
437      throws IOException, InterruptedException {
438        for (WriterLength wl: this.writers.values()) {
439          close(wl.writer);
440        }
441      }
442    };
443  }
444
445  /**
446   * Configure block storage policy for CF after the directory is created.
447   */
448  static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
449      byte[] tableAndFamily, Path cfPath) {
450    if (null == conf || null == fs || null == tableAndFamily || null == cfPath) {
451      return;
452    }
453
454    String policy =
455        conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(tableAndFamily),
456          conf.get(STORAGE_POLICY_PROPERTY));
457    FSUtils.setStoragePolicy(fs, cfPath, policy);
458  }
459
460  /*
461   * Data structure to hold a Writer and amount of data written on it.
462   */
463  static class WriterLength {
464    long written = 0;
465    StoreFileWriter writer = null;
466  }
467
468  /**
469   * Return the start keys of all of the regions in this table,
470   * as a list of ImmutableBytesWritable.
471   */
472  private static List<ImmutableBytesWritable> getRegionStartKeys(List<RegionLocator> regionLocators,
473                                                                 boolean writeMultipleTables)
474          throws IOException {
475
476    ArrayList<ImmutableBytesWritable> ret = new ArrayList<>();
477    for(RegionLocator regionLocator : regionLocators)
478    {
479      TableName tableName = regionLocator.getName();
480      LOG.info("Looking up current regions for table " + tableName);
481      byte[][] byteKeys = regionLocator.getStartKeys();
482      for (byte[] byteKey : byteKeys) {
483        byte[] fullKey = byteKey; //HFileOutputFormat2 use case
484        if (writeMultipleTables)
485        {
486          //MultiTableHFileOutputFormat use case
487          fullKey = combineTableNameSuffix(tableName.getName(), byteKey);
488        }
489        if (LOG.isDebugEnabled()) {
490          LOG.debug("SplitPoint startkey for table [" + tableName + "]: [" + Bytes.toStringBinary
491                  (fullKey) + "]");
492        }
493        ret.add(new ImmutableBytesWritable(fullKey));
494      }
495    }
496    return ret;
497  }
498
499  /**
500   * Write out a {@link SequenceFile} that can be read by
501   * {@link TotalOrderPartitioner} that contains the split points in startKeys.
502   */
503  @SuppressWarnings("deprecation")
504  private static void writePartitions(Configuration conf, Path partitionsPath,
505      List<ImmutableBytesWritable> startKeys, boolean writeMultipleTables) throws IOException {
506    LOG.info("Writing partition information to " + partitionsPath);
507    if (startKeys.isEmpty()) {
508      throw new IllegalArgumentException("No regions passed");
509    }
510
511    // We're generating a list of split points, and we don't ever
512    // have keys < the first region (which has an empty start key)
513    // so we need to remove it. Otherwise we would end up with an
514    // empty reducer with index 0
515    TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys);
516    ImmutableBytesWritable first = sorted.first();
517    if (writeMultipleTables) {
518      first = new ImmutableBytesWritable(MultiTableHFileOutputFormat.getSuffix(sorted.first
519              ().get()));
520    }
521    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
522      throw new IllegalArgumentException(
523          "First region of table should have empty start key. Instead has: "
524          + Bytes.toStringBinary(first.get()));
525    }
526    sorted.remove(sorted.first());
527
528    // Write the actual file
529    FileSystem fs = partitionsPath.getFileSystem(conf);
530    SequenceFile.Writer writer = SequenceFile.createWriter(
531      fs, conf, partitionsPath, ImmutableBytesWritable.class,
532      NullWritable.class);
533
534    try {
535      for (ImmutableBytesWritable startKey : sorted) {
536        writer.append(startKey, NullWritable.get());
537      }
538    } finally {
539      writer.close();
540    }
541  }
542
543  /**
544   * Configure a MapReduce Job to perform an incremental load into the given
545   * table. This
546   * <ul>
547   *   <li>Inspects the table to configure a total order partitioner</li>
548   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
549   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
550   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
551   *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
552   *     PutSortReducer)</li>
553   * </ul>
554   * The user should be sure to set the map output value class to either KeyValue or Put before
555   * running this function.
556   */
557  public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
558      throws IOException {
559    configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
560  }
561
562  /**
563   * Configure a MapReduce Job to perform an incremental load into the given
564   * table. This
565   * <ul>
566   *   <li>Inspects the table to configure a total order partitioner</li>
567   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
568   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
569   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
570   *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
571   *     PutSortReducer)</li>
572   * </ul>
573   * The user should be sure to set the map output value class to either KeyValue or Put before
574   * running this function.
575   */
576  public static void configureIncrementalLoad(Job job, TableDescriptor tableDescriptor,
577      RegionLocator regionLocator) throws IOException {
578    ArrayList<TableInfo> singleTableInfo = new ArrayList<>();
579    singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator));
580    configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
581  }
582
583  static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
584      Class<? extends OutputFormat<?, ?>> cls) throws IOException {
585    Configuration conf = job.getConfiguration();
586    job.setOutputKeyClass(ImmutableBytesWritable.class);
587    job.setOutputValueClass(MapReduceExtendedCell.class);
588    job.setOutputFormatClass(cls);
589
590    if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) {
591      throw new IllegalArgumentException("Duplicate entries found in TableInfo argument");
592    }
593    boolean writeMultipleTables = false;
594    if (MultiTableHFileOutputFormat.class.equals(cls)) {
595      writeMultipleTables = true;
596      conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true);
597    }
598    // Based on the configured map output class, set the correct reducer to properly
599    // sort the incoming values.
600    // TODO it would be nice to pick one or the other of these formats.
601    if (KeyValue.class.equals(job.getMapOutputValueClass())
602        || MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())) {
603      job.setReducerClass(CellSortReducer.class);
604    } else if (Put.class.equals(job.getMapOutputValueClass())) {
605      job.setReducerClass(PutSortReducer.class);
606    } else if (Text.class.equals(job.getMapOutputValueClass())) {
607      job.setReducerClass(TextSortReducer.class);
608    } else {
609      LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
610    }
611
612    conf.setStrings("io.serializations", conf.get("io.serializations"),
613        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
614        CellSerialization.class.getName());
615
616    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
617      LOG.info("bulkload locality sensitive enabled");
618    }
619
620    /* Now get the region start keys for every table required */
621    List<String> allTableNames = new ArrayList<>(multiTableInfo.size());
622    List<RegionLocator> regionLocators = new ArrayList<>( multiTableInfo.size());
623    List<TableDescriptor> tableDescriptors = new ArrayList<>( multiTableInfo.size());
624
625    for( TableInfo tableInfo : multiTableInfo )
626    {
627      regionLocators.add(tableInfo.getRegionLocator());
628      allTableNames.add(tableInfo.getRegionLocator().getName().getNameAsString());
629      tableDescriptors.add(tableInfo.getTableDescriptor());
630    }
631    // Record tablenames for creating writer by favored nodes, and decoding compression, block size and other attributes of columnfamily per table
632    conf.set(OUTPUT_TABLE_NAME_CONF_KEY, StringUtils.join(allTableNames, Bytes
633            .toString(tableSeparator)));
634    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocators, writeMultipleTables);
635    // Use table's region boundaries for TOP split points.
636    LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
637        "to match current region count for all tables");
638    job.setNumReduceTasks(startKeys.size());
639
640    configurePartitioner(job, startKeys, writeMultipleTables);
641    // Set compression algorithms based on column families
642
643    conf.set(COMPRESSION_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(compressionDetails,
644            tableDescriptors));
645    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(blockSizeDetails,
646            tableDescriptors));
647    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(bloomTypeDetails,
648            tableDescriptors));
649    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
650            serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors));
651
652    TableMapReduceUtil.addDependencyJars(job);
653    TableMapReduceUtil.initCredentials(job);
654    LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
655  }
656
657  public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor) throws
658      IOException {
659    Configuration conf = job.getConfiguration();
660
661    job.setOutputKeyClass(ImmutableBytesWritable.class);
662    job.setOutputValueClass(MapReduceExtendedCell.class);
663    job.setOutputFormatClass(HFileOutputFormat2.class);
664
665    ArrayList<TableDescriptor> singleTableDescriptor = new ArrayList<>(1);
666    singleTableDescriptor.add(tableDescriptor);
667
668    conf.set(OUTPUT_TABLE_NAME_CONF_KEY, tableDescriptor.getTableName().getNameAsString());
669    // Set compression algorithms based on column families
670    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
671        serializeColumnFamilyAttribute(compressionDetails, singleTableDescriptor));
672    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
673        serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor));
674    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
675        serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor));
676    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
677        serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor));
678
679    TableMapReduceUtil.addDependencyJars(job);
680    TableMapReduceUtil.initCredentials(job);
681    LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured.");
682  }
683
684  /**
685   * Runs inside the task to deserialize column family to compression algorithm
686   * map from the configuration.
687   *
688   * @param conf to read the serialized values from
689   * @return a map from column family to the configured compression algorithm
690   */
691  @VisibleForTesting
692  static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration
693      conf) {
694    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
695        COMPRESSION_FAMILIES_CONF_KEY);
696    Map<byte[], Algorithm> compressionMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
697    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
698      Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue());
699      compressionMap.put(e.getKey(), algorithm);
700    }
701    return compressionMap;
702  }
703
704  /**
705   * Runs inside the task to deserialize column family to bloom filter type
706   * map from the configuration.
707   *
708   * @param conf to read the serialized values from
709   * @return a map from column family to the the configured bloom filter type
710   */
711  @VisibleForTesting
712  static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
713    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
714        BLOOM_TYPE_FAMILIES_CONF_KEY);
715    Map<byte[], BloomType> bloomTypeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
716    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
717      BloomType bloomType = BloomType.valueOf(e.getValue());
718      bloomTypeMap.put(e.getKey(), bloomType);
719    }
720    return bloomTypeMap;
721  }
722
723  /**
724   * Runs inside the task to deserialize column family to block size
725   * map from the configuration.
726   *
727   * @param conf to read the serialized values from
728   * @return a map from column family to the configured block size
729   */
730  @VisibleForTesting
731  static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
732    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
733        BLOCK_SIZE_FAMILIES_CONF_KEY);
734    Map<byte[], Integer> blockSizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
735    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
736      Integer blockSize = Integer.parseInt(e.getValue());
737      blockSizeMap.put(e.getKey(), blockSize);
738    }
739    return blockSizeMap;
740  }
741
742  /**
743   * Runs inside the task to deserialize column family to data block encoding
744   * type map from the configuration.
745   *
746   * @param conf to read the serialized values from
747   * @return a map from column family to HFileDataBlockEncoder for the
748   *         configured data block type for the family
749   */
750  @VisibleForTesting
751  static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(
752      Configuration conf) {
753    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
754        DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
755    Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
756    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
757      encoderMap.put(e.getKey(), DataBlockEncoding.valueOf((e.getValue())));
758    }
759    return encoderMap;
760  }
761
762
763  /**
764   * Run inside the task to deserialize column family to given conf value map.
765   *
766   * @param conf to read the serialized values from
767   * @param confName conf key to read from the configuration
768   * @return a map of column family to the given configuration value
769   */
770  private static Map<byte[], String> createFamilyConfValueMap(
771      Configuration conf, String confName) {
772    Map<byte[], String> confValMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
773    String confVal = conf.get(confName, "");
774    for (String familyConf : confVal.split("&")) {
775      String[] familySplit = familyConf.split("=");
776      if (familySplit.length != 2) {
777        continue;
778      }
779      try {
780        confValMap.put(Bytes.toBytes(URLDecoder.decode(familySplit[0], "UTF-8")),
781            URLDecoder.decode(familySplit[1], "UTF-8"));
782      } catch (UnsupportedEncodingException e) {
783        // will not happen with UTF-8 encoding
784        throw new AssertionError(e);
785      }
786    }
787    return confValMap;
788  }
789
790  /**
791   * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
792   * <code>splitPoints</code>. Cleans up the partitions file after job exists.
793   */
794  static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints, boolean
795          writeMultipleTables)
796      throws IOException {
797    Configuration conf = job.getConfiguration();
798    // create the partitions file
799    FileSystem fs = FileSystem.get(conf);
800    String hbaseTmpFsDir =
801        conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
802          HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
803    Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
804    fs.makeQualified(partitionsPath);
805    writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables);
806    fs.deleteOnExit(partitionsPath);
807
808    // configure job to use it
809    job.setPartitionerClass(TotalOrderPartitioner.class);
810    TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
811  }
812
813  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
814  @VisibleForTesting
815  static String serializeColumnFamilyAttribute(Function<ColumnFamilyDescriptor, String> fn, List<TableDescriptor> allTables)
816      throws UnsupportedEncodingException {
817    StringBuilder attributeValue = new StringBuilder();
818    int i = 0;
819    for (TableDescriptor tableDescriptor : allTables) {
820      if (tableDescriptor == null) {
821        // could happen with mock table instance
822        // CODEREVIEW: Can I set an empty string in conf if mock table instance?
823        return "";
824      }
825      for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
826        if (i++ > 0) {
827          attributeValue.append('&');
828        }
829        attributeValue.append(URLEncoder.encode(
830            Bytes.toString(combineTableNameSuffix(tableDescriptor.getTableName().getName(), familyDescriptor.getName())),
831            "UTF-8"));
832        attributeValue.append('=');
833        attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8"));
834      }
835    }
836    // Get rid of the last ampersand
837    return attributeValue.toString();
838  }
839
840  /**
841   * Serialize column family to compression algorithm map to configuration.
842   * Invoked while configuring the MR job for incremental load.
843   *
844   * @param tableDescriptor to read the properties from
845   * @param conf to persist serialized values into
846   * @throws IOException
847   *           on failure to read column family descriptors
848   */
849  @VisibleForTesting
850  static Function<ColumnFamilyDescriptor, String> compressionDetails = familyDescriptor ->
851          familyDescriptor.getCompressionType().getName();
852
853  /**
854   * Serialize column family to block size map to configuration. Invoked while
855   * configuring the MR job for incremental load.
856   *
857   * @param tableDescriptor
858   *          to read the properties from
859   * @param conf
860   *          to persist serialized values into
861   *
862   * @throws IOException
863   *           on failure to read column family descriptors
864   */
865  @VisibleForTesting
866  static Function<ColumnFamilyDescriptor, String> blockSizeDetails = familyDescriptor -> String
867          .valueOf(familyDescriptor.getBlocksize());
868
869  /**
870   * Serialize column family to bloom type map to configuration. Invoked while
871   * configuring the MR job for incremental load.
872   *
873   * @param tableDescriptor
874   *          to read the properties from
875   * @param conf
876   *          to persist serialized values into
877   *
878   * @throws IOException
879   *           on failure to read column family descriptors
880   */
881  @VisibleForTesting
882  static Function<ColumnFamilyDescriptor, String> bloomTypeDetails = familyDescriptor -> {
883    String bloomType = familyDescriptor.getBloomFilterType().toString();
884    if (bloomType == null) {
885      bloomType = ColumnFamilyDescriptorBuilder.DEFAULT_BLOOMFILTER.name();
886    }
887    return bloomType;
888  };
889
890  /**
891   * Serialize column family to data block encoding map to configuration.
892   * Invoked while configuring the MR job for incremental load.
893   *
894   * @param tableDescriptor
895   *          to read the properties from
896   * @param conf
897   *          to persist serialized values into
898   * @throws IOException
899   *           on failure to read column family descriptors
900   */
901  @VisibleForTesting
902  static Function<ColumnFamilyDescriptor, String> dataBlockEncodingDetails = familyDescriptor -> {
903    DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
904    if (encoding == null) {
905      encoding = DataBlockEncoding.NONE;
906    }
907    return encoding.toString();
908  };
909
910}