001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TASK_KEY;
021import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
022import static org.apache.hadoop.hbase.regionserver.HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY;
023import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
024
025import java.io.IOException;
026import java.io.UnsupportedEncodingException;
027import java.net.InetSocketAddress;
028import java.net.URLDecoder;
029import java.net.URLEncoder;
030import java.nio.charset.Charset;
031import java.util.ArrayList;
032import java.util.Arrays;
033import java.util.List;
034import java.util.Map;
035import java.util.Map.Entry;
036import java.util.Set;
037import java.util.TreeMap;
038import java.util.TreeSet;
039import java.util.UUID;
040import java.util.function.Function;
041import java.util.stream.Collectors;
042import org.apache.commons.lang3.StringUtils;
043import org.apache.hadoop.conf.Configuration;
044import org.apache.hadoop.fs.FileSystem;
045import org.apache.hadoop.fs.Path;
046import org.apache.hadoop.hbase.Cell;
047import org.apache.hadoop.hbase.CellUtil;
048import org.apache.hadoop.hbase.HConstants;
049import org.apache.hadoop.hbase.HRegionLocation;
050import org.apache.hadoop.hbase.KeyValue;
051import org.apache.hadoop.hbase.PrivateCellUtil;
052import org.apache.hadoop.hbase.TableName;
053import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
054import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
055import org.apache.hadoop.hbase.client.Connection;
056import org.apache.hadoop.hbase.client.ConnectionFactory;
057import org.apache.hadoop.hbase.client.Put;
058import org.apache.hadoop.hbase.client.RegionLocator;
059import org.apache.hadoop.hbase.client.Table;
060import org.apache.hadoop.hbase.client.TableDescriptor;
061import org.apache.hadoop.hbase.fs.HFileSystem;
062import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
063import org.apache.hadoop.hbase.io.compress.Compression;
064import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
065import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
066import org.apache.hadoop.hbase.io.hfile.CacheConfig;
067import org.apache.hadoop.hbase.io.hfile.HFile;
068import org.apache.hadoop.hbase.io.hfile.HFileContext;
069import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
070import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
071import org.apache.hadoop.hbase.regionserver.BloomType;
072import org.apache.hadoop.hbase.regionserver.HStore;
073import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
074import org.apache.hadoop.hbase.regionserver.StoreUtils;
075import org.apache.hadoop.hbase.util.BloomFilterUtil;
076import org.apache.hadoop.hbase.util.Bytes;
077import org.apache.hadoop.hbase.util.CommonFSUtils;
078import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
079import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
080import org.apache.hadoop.io.NullWritable;
081import org.apache.hadoop.io.SequenceFile;
082import org.apache.hadoop.io.Text;
083import org.apache.hadoop.mapreduce.Job;
084import org.apache.hadoop.mapreduce.OutputCommitter;
085import org.apache.hadoop.mapreduce.OutputFormat;
086import org.apache.hadoop.mapreduce.RecordWriter;
087import org.apache.hadoop.mapreduce.TaskAttemptContext;
088import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
089import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
090import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
091import org.apache.yetus.audience.InterfaceAudience;
092import org.slf4j.Logger;
093import org.slf4j.LoggerFactory;
094
095/**
096 * Writes HFiles. Passed Cells must arrive in order. Writes current time as the sequence id for the
097 * file. Sets the major compacted attribute on created {@link HFile}s. Calling write(null,null) will
098 * forcibly roll all HFiles being written.
099 * <p>
100 * Using this class as part of a MapReduce job is best done using
101 * {@link #configureIncrementalLoad(Job, TableDescriptor, RegionLocator)}.
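 * <p>
 * A minimal driver sketch of that setup (illustrative only; the mapper class, table name and
 * output path below are assumptions made for this example, not part of this class):
 *
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * Job job = Job.getInstance(conf, "hfile-generation");
 * job.setMapperClass(MyPutEmittingMapper.class); // hypothetical mapper emitting Puts
 * job.setMapOutputKeyClass(ImmutableBytesWritable.class);
 * job.setMapOutputValueClass(Put.class);
 * try (Connection connection = ConnectionFactory.createConnection(conf);
 *   Table table = connection.getTable(TableName.valueOf("my_table"));
 *   RegionLocator locator = connection.getRegionLocator(table.getName())) {
 *   HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), locator);
 * }
 * FileOutputFormat.setOutputPath(job, new Path("/tmp/hfiles-out"));
 * }</pre>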
102 */
103@InterfaceAudience.Public
104public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable, Cell> {
105  private static final Logger LOG = LoggerFactory.getLogger(HFileOutputFormat2.class);
106
  static class TableInfo {
    private TableDescriptor tableDescriptor;
    private RegionLocator regionLocator;

    public TableInfo(TableDescriptor tableDescriptor, RegionLocator regionLocator) {
      this.tableDescriptor = tableDescriptor;
      this.regionLocator = regionLocator;
    }

    public TableDescriptor getTableDescriptor() {
      return tableDescriptor;
    }
119
120    public RegionLocator getRegionLocator() {
121      return regionLocator;
122    }
123  }
124
125  protected static final byte[] tableSeparator = Bytes.toBytes(";");
126
127  protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) {
128    return Bytes.add(tableName, tableSeparator, suffix);
129  }
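
  // Illustrative example: combineTableNameSuffix(Bytes.toBytes("ns:tbl"), Bytes.toBytes("cf"))
  // yields the bytes of "ns:tbl;cf". This combined table-and-family key indexes the per-family
  // maps used by the record writer and the serialized column family attributes in the job conf.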
130
131  // The following constants are private since these are used by
132  // HFileOutputFormat2 to internally transfer data between job setup and
133  // reducer run using conf.
134  // These should not be changed by the client.
135  static final String COMPRESSION_FAMILIES_CONF_KEY =
136    "hbase.hfileoutputformat.families.compression";
137  static final String BLOOM_TYPE_FAMILIES_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype";
138  static final String BLOOM_PARAM_FAMILIES_CONF_KEY = "hbase.hfileoutputformat.families.bloomparam";
139  static final String BLOCK_SIZE_FAMILIES_CONF_KEY = "hbase.mapreduce.hfileoutputformat.blocksize";
140  static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
141    "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";
142
143  // This constant is public since the client can modify this when setting
144  // up their conf object and thus refer to this symbol.
145  // It is present for backwards compatibility reasons. Use it only to
146  // override the auto-detection of datablock encoding and compression.
147  public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
148    "hbase.mapreduce.hfileoutputformat.datablock.encoding";
149  public static final String COMPRESSION_OVERRIDE_CONF_KEY =
150    "hbase.mapreduce.hfileoutputformat.compression";
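
  // For example (illustrative values): setting COMPRESSION_OVERRIDE_CONF_KEY to "gz" or
  // DATABLOCK_ENCODING_OVERRIDE_CONF_KEY to "FAST_DIFF" in the job configuration forces that
  // compression / encoding for every column family, instead of the values read from the table
  // descriptors.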
151
152  /**
153   * Keep locality while generating HFiles for bulkload. See HBASE-12596
154   */
155  public static final String LOCALITY_SENSITIVE_CONF_KEY =
156    "hbase.bulkload.locality.sensitive.enabled";
157  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
158  static final String OUTPUT_TABLE_NAME_CONF_KEY = "hbase.mapreduce.hfileoutputformat.table.name";
159  static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
160    "hbase.mapreduce.use.multi.table.hfileoutputformat";
161
162  public static final String REMOTE_CLUSTER_CONF_PREFIX = "hbase.hfileoutputformat.remote.cluster.";
163  public static final String REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY =
164    REMOTE_CLUSTER_CONF_PREFIX + "zookeeper.quorum";
165  public static final String REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY =
166    REMOTE_CLUSTER_CONF_PREFIX + "zookeeper." + HConstants.CLIENT_PORT_STR;
167  public static final String REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY =
168    REMOTE_CLUSTER_CONF_PREFIX + HConstants.ZOOKEEPER_ZNODE_PARENT;
169
170  public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY;
171  public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";
172
173  @Override
174  public RecordWriter<ImmutableBytesWritable, Cell>
175    getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException {
176    return createRecordWriter(context, this.getOutputCommitter(context));
177  }
178
179  protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[] family) {
180    return combineTableNameSuffix(tableName, family);
181  }
182
183  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> createRecordWriter(
184    final TaskAttemptContext context, final OutputCommitter committer) throws IOException {
185
186    // Get the path of the temporary output file
187    final Path outputDir = ((FileOutputCommitter) committer).getWorkPath();
188    final Configuration conf = context.getConfiguration();
189    final boolean writeMultipleTables =
190      conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
191    final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
192    if (writeTableNames == null || writeTableNames.isEmpty()) {
      throw new IllegalArgumentException(OUTPUT_TABLE_NAME_CONF_KEY + " cannot be empty");
194    }
195    final FileSystem fs = outputDir.getFileSystem(conf);
196    // These configs. are from hbase-*.xml
197    final long maxsize =
198      conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE);
199    // Invented config. Add to hbase-*.xml if other than default compression.
200    final String defaultCompressionStr =
201      conf.get("hfile.compression", Compression.Algorithm.NONE.getName());
202    final Algorithm defaultCompression = HFileWriterImpl.compressionByName(defaultCompressionStr);
203    String compressionStr = conf.get(COMPRESSION_OVERRIDE_CONF_KEY);
204    final Algorithm overriddenCompression =
205      compressionStr != null ? Compression.getCompressionAlgorithmByName(compressionStr) : null;
206    final boolean compactionExclude =
207      conf.getBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
208    final Set<String> allTableNames = Arrays
209      .stream(writeTableNames.split(Bytes.toString(tableSeparator))).collect(Collectors.toSet());
210
211    // create a map from column family to the compression algorithm
212    final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
213    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
214    final Map<byte[], String> bloomParamMap = createFamilyBloomParamMap(conf);
215    final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);
216
217    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
218    final Map<byte[], DataBlockEncoding> datablockEncodingMap =
219      createFamilyDataBlockEncodingMap(conf);
220    final DataBlockEncoding overriddenEncoding =
221      dataBlockEncodingStr != null ? DataBlockEncoding.valueOf(dataBlockEncodingStr) : null;
222
223    return new RecordWriter<ImmutableBytesWritable, V>() {
224      // Map of families to writers and how much has been output on the writer.
225      private final Map<byte[], WriterLength> writers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
226      private final Map<byte[], byte[]> previousRows = new TreeMap<>(Bytes.BYTES_COMPARATOR);
227      private final long now = EnvironmentEdgeManager.currentTime();
228      private byte[] tableNameBytes = writeMultipleTables ? null : Bytes.toBytes(writeTableNames);
229
230      @Override
231      public void write(ImmutableBytesWritable row, V cell) throws IOException {
232        Cell kv = cell;
233        // null input == user explicitly wants to flush
234        if (row == null && kv == null) {
235          rollWriters(null);
236          return;
237        }
238
239        byte[] rowKey = CellUtil.cloneRow(kv);
240        int length = (PrivateCellUtil.estimatedSerializedSizeOf(kv)) - Bytes.SIZEOF_INT;
241        byte[] family = CellUtil.cloneFamily(kv);
242        if (writeMultipleTables) {
243          tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get());
244          tableNameBytes = TableName.valueOf(tableNameBytes).getNameWithNamespaceInclAsString()
245            .getBytes(Charset.defaultCharset());
246          if (!allTableNames.contains(Bytes.toString(tableNameBytes))) {
247            throw new IllegalArgumentException(
248              "TableName " + Bytes.toString(tableNameBytes) + " not expected");
249          }
250        }
251        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);
252
253        WriterLength wl = this.writers.get(tableAndFamily);
254
        // If this is a new column family, create its output directory and set the storage policy
256        if (wl == null) {
257          Path writerPath = null;
258          if (writeMultipleTables) {
259            Path tableRelPath = getTableRelativePath(tableNameBytes);
260            writerPath = new Path(outputDir, new Path(tableRelPath, Bytes.toString(family)));
261          } else {
262            writerPath = new Path(outputDir, Bytes.toString(family));
263          }
264          fs.mkdirs(writerPath);
265          configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
266        }
267
        // Roll the writer once it would exceed the max file size, but only on a row boundary so
        // that a single row never spans multiple HFiles
269        if (
270          wl != null && wl.written + length >= maxsize
271            && Bytes.compareTo(this.previousRows.get(family), rowKey) != 0
272        ) {
273          rollWriters(wl);
274        }
275
        // create a new HFile writer, if necessary
277        if (wl == null || wl.writer == null) {
278          InetSocketAddress[] favoredNodes = null;
279          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
280            HRegionLocation loc = null;
281            String tableName = Bytes.toString(tableNameBytes);
282            if (tableName != null) {
283              try (
284                Connection connection =
285                  ConnectionFactory.createConnection(createRemoteClusterConf(conf));
286                RegionLocator locator = connection.getRegionLocator(TableName.valueOf(tableName))) {
287                loc = locator.getRegionLocation(rowKey);
288              } catch (Throwable e) {
289                LOG.warn("Something wrong locating rowkey {} in {}", Bytes.toString(rowKey),
290                  tableName, e);
291                loc = null;
292              }
293            }
294            if (null == loc) {
295              LOG.trace("Failed get of location, use default writer {}", Bytes.toString(rowKey));
296            } else {
297              LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey));
298              InetSocketAddress initialIsa =
299                new InetSocketAddress(loc.getHostname(), loc.getPort());
300              if (initialIsa.isUnresolved()) {
301                LOG.trace("Failed resolve address {}, use default writer", loc.getHostnamePort());
302              } else {
303                LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString());
304                favoredNodes = new InetSocketAddress[] { initialIsa };
305              }
306            }
307          }
308          wl = getNewWriter(tableNameBytes, family, conf, favoredNodes);
309
310        }
311
        // we now have the proper HFile writer. full steam ahead
313        PrivateCellUtil.updateLatestStamp(cell, this.now);
314        wl.writer.append(kv);
315        wl.written += length;
316
        // Copy the row so we know when a row transition occurs.
318        this.previousRows.put(family, rowKey);
319      }
320
321      private Path getTableRelativePath(byte[] tableNameBytes) {
322        String tableName = Bytes.toString(tableNameBytes);
323        String[] tableNameParts = tableName.split(":");
324        Path tableRelPath = new Path(tableNameParts[0]);
325        if (tableNameParts.length > 1) {
326          tableRelPath = new Path(tableRelPath, tableNameParts[1]);
327        }
328        return tableRelPath;
329      }
330
331      private void rollWriters(WriterLength writerLength) throws IOException {
332        if (writerLength != null) {
333          closeWriter(writerLength);
334        } else {
335          for (WriterLength wl : this.writers.values()) {
336            closeWriter(wl);
337          }
338        }
339      }
340
341      private void closeWriter(WriterLength wl) throws IOException {
342        if (wl.writer != null) {
343          LOG.info(
344            "Writer=" + wl.writer.getPath() + ((wl.written == 0) ? "" : ", wrote=" + wl.written));
345          close(wl.writer);
346          wl.writer = null;
347        }
348        wl.written = 0;
349      }
350
351      private Configuration createRemoteClusterConf(Configuration conf) {
352        final Configuration newConf = new Configuration(conf);
353
354        final String quorum = conf.get(REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY);
355        final String clientPort = conf.get(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY);
356        final String parent = conf.get(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY);
357
358        if (quorum != null && clientPort != null && parent != null) {
359          newConf.set(HConstants.ZOOKEEPER_QUORUM, quorum);
360          newConf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, Integer.parseInt(clientPort));
361          newConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parent);
362        }
363
364        for (Entry<String, String> entry : conf) {
365          String key = entry.getKey();
366          if (
367            REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY.equals(key)
368              || REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY.equals(key)
369              || REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY.equals(key)
370          ) {
371            // Handled them above
372            continue;
373          }
374
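          // Any other key carrying the prefix is copied into the remote cluster conf with the
          // prefix stripped. For example (illustrative),
          // "hbase.hfileoutputformat.remote.cluster.hbase.security.authentication" is applied to
          // the remote connection as "hbase.security.authentication".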
375          if (entry.getKey().startsWith(REMOTE_CLUSTER_CONF_PREFIX)) {
376            String originalKey = entry.getKey().substring(REMOTE_CLUSTER_CONF_PREFIX.length());
377            if (!originalKey.isEmpty()) {
378              newConf.set(originalKey, entry.getValue());
379            }
380          }
381        }
382
383        return newConf;
384      }
385
386      /*
387       * Create a new StoreFile.Writer.
388       * @return A WriterLength, containing a new StoreFile.Writer.
389       */
390      @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BX_UNBOXING_IMMEDIATELY_REBOXED",
391          justification = "Not important")
392      private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration conf,
393        InetSocketAddress[] favoredNodes) throws IOException {
394        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family);
395        Path familydir = new Path(outputDir, Bytes.toString(family));
396        if (writeMultipleTables) {
397          familydir =
398            new Path(outputDir, new Path(getTableRelativePath(tableName), Bytes.toString(family)));
399        }
400        WriterLength wl = new WriterLength();
401        Algorithm compression = overriddenCompression;
402        compression = compression == null ? compressionMap.get(tableAndFamily) : compression;
403        compression = compression == null ? defaultCompression : compression;
404        BloomType bloomType = bloomTypeMap.get(tableAndFamily);
405        bloomType = bloomType == null ? BloomType.NONE : bloomType;
406        String bloomParam = bloomParamMap.get(tableAndFamily);
407        if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
408          conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, bloomParam);
409        }
410        Integer blockSize = blockSizeMap.get(tableAndFamily);
411        blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
412        DataBlockEncoding encoding = overriddenEncoding;
413        encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
414        encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
415        HFileContextBuilder contextBuilder = new HFileContextBuilder().withCompression(compression)
416          .withDataBlockEncoding(encoding).withChecksumType(StoreUtils.getChecksumType(conf))
417          .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)).withBlockSize(blockSize)
418          .withColumnFamily(family).withTableName(tableName);
419
420        if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
421          contextBuilder.withIncludesTags(true);
422        }
423
424        HFileContext hFileContext = contextBuilder.build();
425        if (null == favoredNodes) {
426          wl.writer =
427            new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs).withOutputDir(familydir)
428              .withBloomType(bloomType).withFileContext(hFileContext).build();
429        } else {
430          wl.writer = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, new HFileSystem(fs))
431            .withOutputDir(familydir).withBloomType(bloomType).withFileContext(hFileContext)
432            .withFavoredNodes(favoredNodes).build();
433        }
434
435        this.writers.put(tableAndFamily, wl);
436        return wl;
437      }
438
439      private void close(final StoreFileWriter w) throws IOException {
440        if (w != null) {
441          w.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(EnvironmentEdgeManager.currentTime()));
442          w.appendFileInfo(BULKLOAD_TASK_KEY, Bytes.toBytes(context.getTaskAttemptID().toString()));
443          w.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(true));
444          w.appendFileInfo(EXCLUDE_FROM_MINOR_COMPACTION_KEY, Bytes.toBytes(compactionExclude));
445          w.appendTrackedTimestampsToMetadata();
446          w.close();
447        }
448      }
449
450      @Override
451      public void close(TaskAttemptContext c) throws IOException, InterruptedException {
452        for (WriterLength wl : this.writers.values()) {
453          close(wl.writer);
454        }
455      }
456    };
457  }
458
459  /**
460   * Configure block storage policy for CF after the directory is created.
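   * <p>
   * The policy is read from the job configuration: first from a per-family key built as
   * {@link #STORAGE_POLICY_PROPERTY_CF_PREFIX} followed by the combined "table;family" name, then
   * falling back to {@link #STORAGE_POLICY_PROPERTY}. For example (illustrative), a value such as
   * {@code ALL_SSD} would be applied to that family's output directory.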
461   */
462  static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
463    byte[] tableAndFamily, Path cfPath) {
464    if (null == conf || null == fs || null == tableAndFamily || null == cfPath) {
465      return;
466    }
467
468    String policy = conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(tableAndFamily),
469      conf.get(STORAGE_POLICY_PROPERTY));
470    CommonFSUtils.setStoragePolicy(fs, cfPath, policy);
471  }
472
473  /*
474   * Data structure to hold a Writer and amount of data written on it.
475   */
476  static class WriterLength {
477    long written = 0;
478    StoreFileWriter writer = null;
479  }
480
481  /**
   * Return the start keys of all of the regions in the given tables, as a list of
   * ImmutableBytesWritable.
483   */
484  private static List<ImmutableBytesWritable> getRegionStartKeys(List<RegionLocator> regionLocators,
485    boolean writeMultipleTables) throws IOException {
486
487    ArrayList<ImmutableBytesWritable> ret = new ArrayList<>();
488    for (RegionLocator regionLocator : regionLocators) {
489      TableName tableName = regionLocator.getName();
490      LOG.info("Looking up current regions for table " + tableName);
491      byte[][] byteKeys = regionLocator.getStartKeys();
492      for (byte[] byteKey : byteKeys) {
493        byte[] fullKey = byteKey; // HFileOutputFormat2 use case
494        if (writeMultipleTables) {
495          // MultiTableHFileOutputFormat use case
496          fullKey = combineTableNameSuffix(tableName.getName(), byteKey);
497        }
498        if (LOG.isDebugEnabled()) {
499          LOG.debug("SplitPoint startkey for " + tableName + ": " + Bytes.toStringBinary(fullKey));
500        }
501        ret.add(new ImmutableBytesWritable(fullKey));
502      }
503    }
504    return ret;
505  }
506
507  /**
508   * Write out a {@link SequenceFile} that can be read by {@link TotalOrderPartitioner} that
509   * contains the split points in startKeys.
510   */
511  @SuppressWarnings("deprecation")
512  private static void writePartitions(Configuration conf, Path partitionsPath,
513    List<ImmutableBytesWritable> startKeys, boolean writeMultipleTables) throws IOException {
514    LOG.info("Writing partition information to " + partitionsPath);
515    if (startKeys.isEmpty()) {
516      throw new IllegalArgumentException("No regions passed");
517    }
518
519    // We're generating a list of split points, and we don't ever
520    // have keys < the first region (which has an empty start key)
521    // so we need to remove it. Otherwise we would end up with an
522    // empty reducer with index 0
523    TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys);
524    ImmutableBytesWritable first = sorted.first();
525    if (writeMultipleTables) {
526      first =
527        new ImmutableBytesWritable(MultiTableHFileOutputFormat.getSuffix(sorted.first().get()));
528    }
529    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
530      throw new IllegalArgumentException(
531        "First region of table should have empty start key. Instead has: "
532          + Bytes.toStringBinary(first.get()));
533    }
534    sorted.remove(sorted.first());
535
536    // Write the actual file
537    FileSystem fs = partitionsPath.getFileSystem(conf);
538    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, partitionsPath,
539      ImmutableBytesWritable.class, NullWritable.class);
540
541    try {
542      for (ImmutableBytesWritable startKey : sorted) {
543        writer.append(startKey, NullWritable.get());
544      }
545    } finally {
546      writer.close();
547    }
548  }
549
550  /**
551   * Configure a MapReduce Job to perform an incremental load into the given table. This
552   * <ul>
553   * <li>Inspects the table to configure a total order partitioner</li>
554   * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
555   * <li>Sets the number of reduce tasks to match the current number of regions</li>
556   * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   * <li>Sets the reducer up to perform the appropriate sorting (CellSortReducer, PutSortReducer or
   * TextSortReducer, depending on the map output value class)</li>
   * <li>Sets the HBase cluster key so that region locations can be looked up when the
   * locality-sensitive feature is enabled</li>
560   * </ul>
561   * The user should be sure to set the map output value class to either KeyValue or Put before
562   * running this function.
563   */
564  public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
565    throws IOException {
566    configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
567    configureRemoteCluster(job, table.getConfiguration());
568  }
569
570  /**
571   * Configure a MapReduce Job to perform an incremental load into the given table. This
572   * <ul>
573   * <li>Inspects the table to configure a total order partitioner</li>
574   * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
575   * <li>Sets the number of reduce tasks to match the current number of regions</li>
576   * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   * <li>Sets the reducer up to perform the appropriate sorting (CellSortReducer, PutSortReducer or
   * TextSortReducer, depending on the map output value class)</li>
579   * </ul>
580   * The user should be sure to set the map output value class to either KeyValue or Put before
581   * running this function.
582   */
583  public static void configureIncrementalLoad(Job job, TableDescriptor tableDescriptor,
584    RegionLocator regionLocator) throws IOException {
585    ArrayList<TableInfo> singleTableInfo = new ArrayList<>();
586    singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator));
587    configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
588  }
589
590  static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
591    Class<? extends OutputFormat<?, ?>> cls) throws IOException {
592    Configuration conf = job.getConfiguration();
593    job.setOutputKeyClass(ImmutableBytesWritable.class);
594    job.setOutputValueClass(MapReduceExtendedCell.class);
595    job.setOutputFormatClass(cls);
596
597    if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) {
598      throw new IllegalArgumentException("Duplicate entries found in TableInfo argument");
599    }
600    boolean writeMultipleTables = false;
601    if (MultiTableHFileOutputFormat.class.equals(cls)) {
602      writeMultipleTables = true;
603      conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true);
604    }
605    // Based on the configured map output class, set the correct reducer to properly
606    // sort the incoming values.
607    // TODO it would be nice to pick one or the other of these formats.
608    if (
609      KeyValue.class.equals(job.getMapOutputValueClass())
610        || MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())
611    ) {
612      job.setReducerClass(CellSortReducer.class);
613    } else if (Put.class.equals(job.getMapOutputValueClass())) {
614      job.setReducerClass(PutSortReducer.class);
615    } else if (Text.class.equals(job.getMapOutputValueClass())) {
616      job.setReducerClass(TextSortReducer.class);
617    } else {
618      LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
619    }
620
621    conf.setStrings("io.serializations", conf.get("io.serializations"),
622      MutationSerialization.class.getName(), ResultSerialization.class.getName(),
623      CellSerialization.class.getName());
624
625    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
626      LOG.info("bulkload locality sensitive enabled");
627    }
628
629    /* Now get the region start keys for every table required */
630    List<String> allTableNames = new ArrayList<>(multiTableInfo.size());
631    List<RegionLocator> regionLocators = new ArrayList<>(multiTableInfo.size());
632    List<TableDescriptor> tableDescriptors = new ArrayList<>(multiTableInfo.size());
633
634    for (TableInfo tableInfo : multiTableInfo) {
635      regionLocators.add(tableInfo.getRegionLocator());
636      String tn = writeMultipleTables
637        ? tableInfo.getRegionLocator().getName().getNameWithNamespaceInclAsString()
638        : tableInfo.getRegionLocator().getName().getNameAsString();
639      allTableNames.add(tn);
640      tableDescriptors.add(tableInfo.getTableDescriptor());
641    }
    // Record table names so that writers can be created with favored nodes, and so that
    // compression, block size and other column family attributes can be decoded per table
644    conf.set(OUTPUT_TABLE_NAME_CONF_KEY,
645      StringUtils.join(allTableNames, Bytes.toString(tableSeparator)));
646    List<ImmutableBytesWritable> startKeys =
647      getRegionStartKeys(regionLocators, writeMultipleTables);
    // Use the tables' region boundaries as TotalOrderPartitioner split points.
649    LOG.info("Configuring " + startKeys.size() + " reduce partitions "
650      + "to match current region count for all tables");
651    job.setNumReduceTasks(startKeys.size());
652
653    configurePartitioner(job, startKeys, writeMultipleTables);
    // Serialize per-family attributes (compression, block size, bloom type/param, data block
    // encoding) into the job configuration
656    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
657      serializeColumnFamilyAttribute(compressionDetails, tableDescriptors));
658    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
659      serializeColumnFamilyAttribute(blockSizeDetails, tableDescriptors));
660    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
661      serializeColumnFamilyAttribute(bloomTypeDetails, tableDescriptors));
662    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY,
663      serializeColumnFamilyAttribute(bloomParamDetails, tableDescriptors));
664    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
665      serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors));
666
667    TableMapReduceUtil.addDependencyJars(job);
668    TableMapReduceUtil.initCredentials(job);
669    LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
670  }
671
672  public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor)
673    throws IOException {
674    Configuration conf = job.getConfiguration();
675
676    job.setOutputKeyClass(ImmutableBytesWritable.class);
677    job.setOutputValueClass(MapReduceExtendedCell.class);
678    job.setOutputFormatClass(HFileOutputFormat2.class);
679
680    ArrayList<TableDescriptor> singleTableDescriptor = new ArrayList<>(1);
681    singleTableDescriptor.add(tableDescriptor);
682
683    conf.set(OUTPUT_TABLE_NAME_CONF_KEY, tableDescriptor.getTableName().getNameAsString());
    // Serialize per-family attributes (compression, block size, bloom type/param, data block
    // encoding) into the job configuration
685    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
686      serializeColumnFamilyAttribute(compressionDetails, singleTableDescriptor));
687    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
688      serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor));
689    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
690      serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor));
691    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY,
692      serializeColumnFamilyAttribute(bloomParamDetails, singleTableDescriptor));
693    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
694      serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor));
695
696    TableMapReduceUtil.addDependencyJars(job);
697    TableMapReduceUtil.initCredentials(job);
698    LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured.");
699  }
700
701  /**
702   * Configure HBase cluster key for remote cluster to load region location for locality-sensitive
703   * if it's enabled. It's not necessary to call this method explicitly when the cluster key for
704   * HBase cluster to be used to load region location is configured in the job configuration. Call
   * this method when another HBase cluster key is configured in the job configuration. For example,
   * you should call it when you load data from HBase cluster A using {@link TableInputFormat} and
   * generate hfiles for HBase cluster B. Otherwise, HFileOutputFormat2 fetches region locations from
   * cluster A and the locality-sensitive feature won't work correctly.
709   * {@link #configureIncrementalLoad(Job, Table, RegionLocator)} calls this method using
710   * {@link Table#getConfiguration} as clusterConf. See HBASE-25608.
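   * <p>
   * A sketch of the cluster-A-to-cluster-B case (the target table name and the way the cluster B
   * configuration is obtained are assumptions made for illustration):
   *
   * <pre>{@code
   * // job reads from cluster A (e.g. via TableInputFormat) and generates HFiles for cluster B
   * try (Connection connB = ConnectionFactory.createConnection(confOfClusterB);
   *   Table tableB = connB.getTable(TableName.valueOf("target_table"));
   *   RegionLocator locatorB = connB.getRegionLocator(tableB.getName())) {
   *   HFileOutputFormat2.configureIncrementalLoad(job, tableB.getDescriptor(), locatorB);
   *   HFileOutputFormat2.configureRemoteCluster(job, connB.getConfiguration());
   * }
   * }</pre>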
711   * @param job         which has configuration to be updated
712   * @param clusterConf which contains cluster key of the HBase cluster to be locality-sensitive
713   * @see #configureIncrementalLoad(Job, Table, RegionLocator)
714   * @see #LOCALITY_SENSITIVE_CONF_KEY
715   * @see #REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY
716   * @see #REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY
717   * @see #REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY
718   */
719  public static void configureRemoteCluster(Job job, Configuration clusterConf) {
720    Configuration conf = job.getConfiguration();
721
722    if (!conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
723      return;
724    }
725
726    final String quorum = clusterConf.get(HConstants.ZOOKEEPER_QUORUM);
727    final int clientPort = clusterConf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT,
728      HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT);
729    final String parent =
730      clusterConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
731
732    conf.set(REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY, quorum);
733    conf.setInt(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY, clientPort);
734    conf.set(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY, parent);
735
    LOG.info("ZK configs for remote cluster of bulkload are configured: " + quorum + ":" + clientPort
      + "/" + parent);
738  }
739
740  /**
741   * Runs inside the task to deserialize column family to compression algorithm map from the
742   * configuration.
743   * @param conf to read the serialized values from
744   * @return a map from column family to the configured compression algorithm
745   */
746  @InterfaceAudience.Private
747  static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration conf) {
748    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, COMPRESSION_FAMILIES_CONF_KEY);
749    Map<byte[], Algorithm> compressionMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
750    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
751      Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue());
752      compressionMap.put(e.getKey(), algorithm);
753    }
754    return compressionMap;
755  }
756
757  /**
758   * Runs inside the task to deserialize column family to bloom filter type map from the
759   * configuration.
760   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter type
762   */
763  @InterfaceAudience.Private
764  static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
765    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, BLOOM_TYPE_FAMILIES_CONF_KEY);
766    Map<byte[], BloomType> bloomTypeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
767    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
768      BloomType bloomType = BloomType.valueOf(e.getValue());
769      bloomTypeMap.put(e.getKey(), bloomType);
770    }
771    return bloomTypeMap;
772  }
773
774  /**
775   * Runs inside the task to deserialize column family to bloom filter param map from the
776   * configuration.
777   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter param
779   */
780  @InterfaceAudience.Private
781  static Map<byte[], String> createFamilyBloomParamMap(Configuration conf) {
782    return createFamilyConfValueMap(conf, BLOOM_PARAM_FAMILIES_CONF_KEY);
783  }
784
785  /**
786   * Runs inside the task to deserialize column family to block size map from the configuration.
787   * @param conf to read the serialized values from
788   * @return a map from column family to the configured block size
789   */
790  @InterfaceAudience.Private
791  static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
792    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, BLOCK_SIZE_FAMILIES_CONF_KEY);
793    Map<byte[], Integer> blockSizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
794    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
795      Integer blockSize = Integer.parseInt(e.getValue());
796      blockSizeMap.put(e.getKey(), blockSize);
797    }
798    return blockSizeMap;
799  }
800
801  /**
802   * Runs inside the task to deserialize column family to data block encoding type map from the
803   * configuration.
804   * @param conf to read the serialized values from
   * @return a map from column family to the configured {@link DataBlockEncoding} for the family
807   */
808  @InterfaceAudience.Private
809  static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(Configuration conf) {
810    Map<byte[], String> stringMap =
811      createFamilyConfValueMap(conf, DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
812    Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
813    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
814      encoderMap.put(e.getKey(), DataBlockEncoding.valueOf((e.getValue())));
815    }
816    return encoderMap;
817  }
818
819  /**
820   * Run inside the task to deserialize column family to given conf value map.
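   * <p>
   * The serialized value (produced by {@link #serializeColumnFamilyAttribute}) is a
   * {@code &}-separated list of URL-encoded {@code <table;family>=<value>} pairs, for example
   * (illustrative) {@code mytable%3Bcf1=value1&mytable%3Bcf2=value2}.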
821   * @param conf     to read the serialized values from
822   * @param confName conf key to read from the configuration
823   * @return a map of column family to the given configuration value
824   */
825  private static Map<byte[], String> createFamilyConfValueMap(Configuration conf, String confName) {
826    Map<byte[], String> confValMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
827    String confVal = conf.get(confName, "");
828    for (String familyConf : confVal.split("&")) {
829      String[] familySplit = familyConf.split("=");
830      if (familySplit.length != 2) {
831        continue;
832      }
833      try {
834        confValMap.put(Bytes.toBytes(URLDecoder.decode(familySplit[0], "UTF-8")),
835          URLDecoder.decode(familySplit[1], "UTF-8"));
836      } catch (UnsupportedEncodingException e) {
837        // will not happen with UTF-8 encoding
838        throw new AssertionError(e);
839      }
840    }
841    return confValMap;
842  }
843
844  /**
845   * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
   * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
847   */
848  static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints,
849    boolean writeMultipleTables) throws IOException {
850    Configuration conf = job.getConfiguration();
851    // create the partitions file
852    FileSystem fs = FileSystem.get(conf);
853    String hbaseTmpFsDir =
854      conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, fs.getHomeDirectory() + "/hbase-staging");
855    Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
856    fs.makeQualified(partitionsPath);
857    writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables);
858    fs.deleteOnExit(partitionsPath);
859
860    // configure job to use it
861    job.setPartitionerClass(TotalOrderPartitioner.class);
862    TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
863  }
864
865  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
866      value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
867  @InterfaceAudience.Private
868  static String serializeColumnFamilyAttribute(Function<ColumnFamilyDescriptor, String> fn,
869    List<TableDescriptor> allTables) throws UnsupportedEncodingException {
870    StringBuilder attributeValue = new StringBuilder();
871    int i = 0;
872    for (TableDescriptor tableDescriptor : allTables) {
873      if (tableDescriptor == null) {
874        // could happen with mock table instance
875        // CODEREVIEW: Can I set an empty string in conf if mock table instance?
876        return "";
877      }
878      for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
879        if (i++ > 0) {
880          attributeValue.append('&');
881        }
882        attributeValue.append(URLEncoder
883          .encode(Bytes.toString(combineTableNameSuffix(tableDescriptor.getTableName().getName(),
884            familyDescriptor.getName())), "UTF-8"));
885        attributeValue.append('=');
886        attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8"));
887      }
888    }
    return attributeValue.toString();
891  }
892
893  /**
894   * Serialize column family to compression algorithm map to configuration. Invoked while
895   * configuring the MR job for incremental load.
896   */
897  @InterfaceAudience.Private
898  static Function<ColumnFamilyDescriptor, String> compressionDetails =
899    familyDescriptor -> familyDescriptor.getCompressionType().getName();
900
901  /**
902   * Serialize column family to block size map to configuration. Invoked while configuring the MR
903   * job for incremental load.
904   */
905  @InterfaceAudience.Private
906  static Function<ColumnFamilyDescriptor, String> blockSizeDetails =
907    familyDescriptor -> String.valueOf(familyDescriptor.getBlocksize());
908
909  /**
910   * Serialize column family to bloom type map to configuration. Invoked while configuring the MR
911   * job for incremental load.
912   */
913  @InterfaceAudience.Private
914  static Function<ColumnFamilyDescriptor, String> bloomTypeDetails = familyDescriptor -> {
915    String bloomType = familyDescriptor.getBloomFilterType().toString();
916    if (bloomType == null) {
917      bloomType = ColumnFamilyDescriptorBuilder.DEFAULT_BLOOMFILTER.name();
918    }
919    return bloomType;
920  };
921
922  /**
923   * Serialize column family to bloom param map to configuration. Invoked while configuring the MR
924   * job for incremental load.
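   * <p>
   * For {@code ROWPREFIX_FIXED_LENGTH} bloom filters the serialized value is the configured prefix
   * length (read from {@link BloomFilterUtil#PREFIX_LENGTH_KEY}); for other bloom types an empty
   * string is written.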
925   */
926  @InterfaceAudience.Private
927  static Function<ColumnFamilyDescriptor, String> bloomParamDetails = familyDescriptor -> {
928    BloomType bloomType = familyDescriptor.getBloomFilterType();
929    String bloomParam = "";
930    if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
931      bloomParam = familyDescriptor.getConfigurationValue(BloomFilterUtil.PREFIX_LENGTH_KEY);
932    }
933    return bloomParam;
934  };
935
936  /**
937   * Serialize column family to data block encoding map to configuration. Invoked while configuring
938   * the MR job for incremental load.
939   */
940  @InterfaceAudience.Private
941  static Function<ColumnFamilyDescriptor, String> dataBlockEncodingDetails = familyDescriptor -> {
942    DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
943    if (encoding == null) {
944      encoding = DataBlockEncoding.NONE;
945    }
946    return encoding.toString();
947  };
948
949}