/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TASK_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Writes HFiles. Passed Cells must arrive in order. Writes current time as the sequence id for the
 * file. Sets the major compacted attribute on created {@link HFile}s. Calling write(null,null) will
 * forcibly roll all HFiles being written.
 * <p>
 * Using this class as part of a MapReduce job is best done using
 * {@link #configureIncrementalLoad(Job, TableDescriptor, RegionLocator)}.
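 * <p>
 * A minimal job-setup sketch (the mapper class and the {@code conf}, {@code conn},
 * {@code tableName} and {@code outputPath} variables are illustrative assumptions, not part of
 * this class):
 *
 * <pre>{@code
 * Job job = Job.getInstance(conf, "hfile-prepare");
 * job.setJarByClass(MyCellEmittingMapper.class); // hypothetical mapper that emits cells
 * job.setMapperClass(MyCellEmittingMapper.class);
 * job.setMapOutputKeyClass(ImmutableBytesWritable.class);
 * job.setMapOutputValueClass(MapReduceExtendedCell.class);
 * try (Table table = conn.getTable(tableName);
 *   RegionLocator locator = conn.getRegionLocator(tableName)) {
 *   HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), locator);
 * }
 * FileOutputFormat.setOutputPath(job, outputPath);
 * // the HFiles written under outputPath can then be bulk loaded, e.g. with BulkLoadHFiles
 * }</pre>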
 */
@InterfaceAudience.Public
public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable, Cell> {
  private static final Logger LOG = LoggerFactory.getLogger(HFileOutputFormat2.class);

  static class TableInfo {
    private TableDescriptor tableDescriptor;
    private RegionLocator regionLocator;

    public TableInfo(TableDescriptor tableDescriptor, RegionLocator regionLocator) {
      this.tableDescriptor = tableDescriptor;
      this.regionLocator = regionLocator;
    }

    public TableDescriptor getTableDescriptor() {
      return tableDescriptor;
    }

    public RegionLocator getRegionLocator() {
      return regionLocator;
    }
  }

  protected static final byte[] tableSeparator = Bytes.toBytes(";");

  protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) {
    return Bytes.add(tableName, tableSeparator, suffix);
  }

  // The following constants are private since these are used by
  // HFileOutputFormat2 to internally transfer data between job setup and
  // reducer run using conf.
  // These should not be changed by the client.
  static final String COMPRESSION_FAMILIES_CONF_KEY =
    "hbase.hfileoutputformat.families.compression";
  static final String BLOOM_TYPE_FAMILIES_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype";
  static final String BLOOM_PARAM_FAMILIES_CONF_KEY = "hbase.hfileoutputformat.families.bloomparam";
  static final String BLOCK_SIZE_FAMILIES_CONF_KEY = "hbase.mapreduce.hfileoutputformat.blocksize";
  static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
    "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";

  // This constant is public since the client can modify this when setting
  // up their conf object and thus refer to this symbol.
  // It is present for backwards compatibility reasons. Use it only to
  // override the auto-detection of datablock encoding and compression.
  public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
    "hbase.mapreduce.hfileoutputformat.datablock.encoding";
  public static final String COMPRESSION_OVERRIDE_CONF_KEY =
    "hbase.mapreduce.hfileoutputformat.compression";

  /**
   * Keep locality while generating HFiles for bulkload. See HBASE-12596
   */
  public static final String LOCALITY_SENSITIVE_CONF_KEY =
    "hbase.bulkload.locality.sensitive.enabled";
  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
  static final String OUTPUT_TABLE_NAME_CONF_KEY = "hbase.mapreduce.hfileoutputformat.table.name";
  static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
    "hbase.mapreduce.use.multi.table.hfileoutputformat";

  /**
   * ExtendedCell and ExtendedCellSerialization are InterfaceAudience.Private. We expose this config
   * for internal usage in jobs like WALPlayer which need to use features of ExtendedCell.
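   * <p>
   * A job that needs those features can opt in before calling {@code configureIncrementalLoad}
   * (sketch):
   *
   * <pre>{@code
   * job.getConfiguration().setBoolean(
   *   HFileOutputFormat2.EXTENDED_CELL_SERIALIZATION_ENABLED_KEY, true);
   * }</pre>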
   */
  @InterfaceAudience.Private
  public static final String EXTENDED_CELL_SERIALIZATION_ENABLED_KEY =
    "hbase.mapreduce.hfileoutputformat.extendedcell.enabled";
  static final boolean EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT = false;

  @InterfaceAudience.Private
  public static final String DISK_BASED_SORTING_ENABLED_KEY =
    "hbase.mapreduce.hfileoutputformat.disk.based.sorting.enabled";
  private static final boolean DISK_BASED_SORTING_ENABLED_DEFAULT = false;

  public static final String REMOTE_CLUSTER_CONF_PREFIX = "hbase.hfileoutputformat.remote.cluster.";
  public static final String REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY =
    REMOTE_CLUSTER_CONF_PREFIX + "zookeeper.quorum";
  public static final String REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY =
    REMOTE_CLUSTER_CONF_PREFIX + "zookeeper." + HConstants.CLIENT_PORT_STR;
  public static final String REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY =
    REMOTE_CLUSTER_CONF_PREFIX + HConstants.ZOOKEEPER_ZNODE_PARENT;

  public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY;
  public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";

  @Override
  public RecordWriter<ImmutableBytesWritable, Cell>
    getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException {
    return createRecordWriter(context, this.getOutputCommitter(context));
  }

  protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[] family) {
    return combineTableNameSuffix(tableName, family);
  }

  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> createRecordWriter(
    final TaskAttemptContext context, final OutputCommitter committer) throws IOException {

    // Get the path of the temporary output file
    final Path outputDir = ((PathOutputCommitter) committer).getWorkPath();
    final Configuration conf = context.getConfiguration();
    final boolean writeMultipleTables =
      conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
    final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
    if (writeTableNames == null || writeTableNames.isEmpty()) {
      throw new IllegalArgumentException("" + OUTPUT_TABLE_NAME_CONF_KEY + " cannot be empty");
    }
    final FileSystem fs = outputDir.getFileSystem(conf);
    // These configs. are from hbase-*.xml
    final long maxsize =
      conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE);
    // Invented config. Add to hbase-*.xml if other than default compression.
    final String defaultCompressionStr =
      conf.get("hfile.compression", Compression.Algorithm.NONE.getName());
    final Algorithm defaultCompression = HFileWriterImpl.compressionByName(defaultCompressionStr);
    String compressionStr = conf.get(COMPRESSION_OVERRIDE_CONF_KEY);
    final Algorithm overriddenCompression =
      compressionStr != null ? Compression.getCompressionAlgorithmByName(compressionStr) : null;
    final boolean compactionExclude =
      conf.getBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
    final Set<String> allTableNames = Arrays
      .stream(writeTableNames.split(Bytes.toString(tableSeparator))).collect(Collectors.toSet());

    // create a map from column family to the compression algorithm
    final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
    final Map<byte[], String> bloomParamMap = createFamilyBloomParamMap(conf);
    final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);

    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
    final Map<byte[], DataBlockEncoding> datablockEncodingMap =
      createFamilyDataBlockEncodingMap(conf);
    final DataBlockEncoding overriddenEncoding =
      dataBlockEncodingStr != null ? DataBlockEncoding.valueOf(dataBlockEncodingStr) : null;

    return new RecordWriter<ImmutableBytesWritable, V>() {
      // Map of families to writers and how much has been output on the writer.
      private final Map<byte[], WriterLength> writers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      private final Map<byte[], byte[]> previousRows = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      private final long now = EnvironmentEdgeManager.currentTime();
      private byte[] tableNameBytes = writeMultipleTables ? null : Bytes.toBytes(writeTableNames);

      @Override
      public void write(ImmutableBytesWritable row, V cell) throws IOException {
        // null input == user explicitly wants to flush
        if (row == null && cell == null) {
          rollWriters(null);
          return;
        }

        ExtendedCell kv = PrivateCellUtil.ensureExtendedCell(cell);
        byte[] rowKey = CellUtil.cloneRow(kv);
        int length = (PrivateCellUtil.estimatedSerializedSizeOf(kv)) - Bytes.SIZEOF_INT;
        byte[] family = CellUtil.cloneFamily(kv);
        if (writeMultipleTables) {
          tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get());
          tableNameBytes = TableName.valueOf(tableNameBytes).getNameWithNamespaceInclAsString()
            .getBytes(Charset.defaultCharset());
          if (!allTableNames.contains(Bytes.toString(tableNameBytes))) {
            throw new IllegalArgumentException(
              "TableName " + Bytes.toString(tableNameBytes) + " not expected");
          }
        }
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);

        WriterLength wl = this.writers.get(tableAndFamily);

        // If this is a new column family, create its output directory and set the storage policy
        if (wl == null) {
          Path writerPath = null;
          if (writeMultipleTables) {
            Path tableRelPath = getTableRelativePath(tableNameBytes);
            writerPath = new Path(outputDir, new Path(tableRelPath, Bytes.toString(family)));
          } else {
            writerPath = new Path(outputDir, Bytes.toString(family));
          }
          fs.mkdirs(writerPath);
          configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
        }

        // This can only happen once a row is finished though
        if (
          wl != null && wl.written + length >= maxsize
            && Bytes.compareTo(this.previousRows.get(family), rowKey) != 0
        ) {
          rollWriters(wl);
        }

        // create a new HFile writer, if necessary
        if (wl == null || wl.writer == null) {
          InetSocketAddress[] favoredNodes = null;
          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
            HRegionLocation loc = null;
            String tableName = Bytes.toString(tableNameBytes);
            if (tableName != null) {
              try (
                Connection connection =
                  ConnectionFactory.createConnection(createRemoteClusterConf(conf));
                RegionLocator locator = connection.getRegionLocator(TableName.valueOf(tableName))) {
                loc = locator.getRegionLocation(rowKey);
              } catch (Throwable e) {
                LOG.warn("Something wrong locating rowkey {} in {}", Bytes.toString(rowKey),
                  tableName, e);
                loc = null;
              }
            }
            if (null == loc) {
              LOG.trace("Failed get of location, use default writer {}", Bytes.toString(rowKey));
            } else {
              LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey));
              InetSocketAddress initialIsa =
                new InetSocketAddress(loc.getHostname(), loc.getPort());
              if (initialIsa.isUnresolved()) {
                LOG.trace("Failed resolve address {}, use default writer", loc.getHostnamePort());
              } else {
                LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString());
                favoredNodes = new InetSocketAddress[] { initialIsa };
              }
            }
          }
          wl = getNewWriter(tableNameBytes, family, conf, favoredNodes);

        }

        // we now have the proper HFile writer. full steam ahead
        PrivateCellUtil.updateLatestStamp(kv, this.now);
        wl.writer.append((ExtendedCell) kv);
        wl.written += length;

        // Copy the row so we can detect when the row transitions.
        this.previousRows.put(family, rowKey);
      }

      private Path getTableRelativePath(byte[] tableNameBytes) {
        String tableName = Bytes.toString(tableNameBytes);
        String[] tableNameParts = tableName.split(":");
        Path tableRelPath = new Path(tableNameParts[0]);
        if (tableNameParts.length > 1) {
          tableRelPath = new Path(tableRelPath, tableNameParts[1]);
        }
        return tableRelPath;
      }

      private void rollWriters(WriterLength writerLength) throws IOException {
        if (writerLength != null) {
          closeWriter(writerLength);
        } else {
          for (WriterLength wl : this.writers.values()) {
            closeWriter(wl);
          }
        }
      }

      private void closeWriter(WriterLength wl) throws IOException {
        if (wl.writer != null) {
          LOG.info(
            "Writer=" + wl.writer.getPath() + ((wl.written == 0) ? "" : ", wrote=" + wl.written));
          close(wl.writer);
          wl.writer = null;
        }
        wl.written = 0;
      }

      private Configuration createRemoteClusterConf(Configuration conf) {
        final Configuration newConf = new Configuration(conf);

        final String quorum = conf.get(REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY);
        final String clientPort = conf.get(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY);
        final String parent = conf.get(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY);

        if (quorum != null && clientPort != null && parent != null) {
          newConf.set(HConstants.ZOOKEEPER_QUORUM, quorum);
          newConf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, Integer.parseInt(clientPort));
          newConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parent);
        }

        for (Entry<String, String> entry : conf) {
          String key = entry.getKey();
          if (
            REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY.equals(key)
              || REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY.equals(key)
              || REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY.equals(key)
          ) {
            // Handled them above
            continue;
          }

          if (entry.getKey().startsWith(REMOTE_CLUSTER_CONF_PREFIX)) {
            String originalKey = entry.getKey().substring(REMOTE_CLUSTER_CONF_PREFIX.length());
            if (!originalKey.isEmpty()) {
              newConf.set(originalKey, entry.getValue());
            }
          }
        }

        return newConf;
      }

      /*
       * Create a new StoreFile.Writer.
       * @return A WriterLength, containing a new StoreFile.Writer.
       */
      @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BX_UNBOXING_IMMEDIATELY_REBOXED",
          justification = "Not important")
      private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration conf,
        InetSocketAddress[] favoredNodes) throws IOException {
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family);
        Path familydir = new Path(outputDir, Bytes.toString(family));
        if (writeMultipleTables) {
          familydir =
            new Path(outputDir, new Path(getTableRelativePath(tableName), Bytes.toString(family)));
        }
        WriterLength wl = new WriterLength();
        Algorithm compression = overriddenCompression;
        compression = compression == null ? compressionMap.get(tableAndFamily) : compression;
        compression = compression == null ? defaultCompression : compression;
        BloomType bloomType = bloomTypeMap.get(tableAndFamily);
        bloomType = bloomType == null ? BloomType.NONE : bloomType;
        String bloomParam = bloomParamMap.get(tableAndFamily);
        if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
          conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, bloomParam);
        }
        Integer blockSize = blockSizeMap.get(tableAndFamily);
        blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
        DataBlockEncoding encoding = overriddenEncoding;
        encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
        encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
        HFileContextBuilder contextBuilder = new HFileContextBuilder().withCompression(compression)
          .withDataBlockEncoding(encoding).withChecksumType(StoreUtils.getChecksumType(conf))
          .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)).withBlockSize(blockSize)
          .withColumnFamily(family).withTableName(tableName)
          .withCreateTime(EnvironmentEdgeManager.currentTime());

        if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
          contextBuilder.withIncludesTags(true);
        }

        HFileContext hFileContext = contextBuilder.build();
        if (null == favoredNodes) {
          wl.writer =
            new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs).withOutputDir(familydir)
              .withBloomType(bloomType).withFileContext(hFileContext).build();
        } else {
          wl.writer = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, new HFileSystem(fs))
            .withOutputDir(familydir).withBloomType(bloomType).withFileContext(hFileContext)
            .withFavoredNodes(favoredNodes).build();
        }

        this.writers.put(tableAndFamily, wl);
        return wl;
      }

      private void close(final StoreFileWriter w) throws IOException {
        if (w != null) {
          w.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(EnvironmentEdgeManager.currentTime()));
          w.appendFileInfo(BULKLOAD_TASK_KEY, Bytes.toBytes(context.getTaskAttemptID().toString()));
          w.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(true));
          w.appendFileInfo(EXCLUDE_FROM_MINOR_COMPACTION_KEY, Bytes.toBytes(compactionExclude));
          w.appendTrackedTimestampsToMetadata();
          w.close();
        }
      }

      @Override
      public void close(TaskAttemptContext c) throws IOException, InterruptedException {
        for (WriterLength wl : this.writers.values()) {
          close(wl.writer);
        }
      }
    };
  }

  /**
   * Configure block storage policy for CF after the directory is created.
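   * <p>
   * The policy is looked up under a per-family key first and then under the global key; a sketch
   * (the table and family names are illustrative):
   *
   * <pre>{@code
   * // per "table;family" directory
   * conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + "mytable;cf1", "ALL_SSD");
   * // or for every family
   * conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ONE_SSD");
   * }</pre>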
   */
  static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
    byte[] tableAndFamily, Path cfPath) {
    if (null == conf || null == fs || null == tableAndFamily || null == cfPath) {
      return;
    }

    String policy = conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(tableAndFamily),
      conf.get(STORAGE_POLICY_PROPERTY));
    CommonFSUtils.setStoragePolicy(fs, cfPath, policy);
  }

  /*
   * Data structure to hold a Writer and amount of data written on it.
   */
  static class WriterLength {
    long written = 0;
    StoreFileWriter writer = null;
  }

  /**
   * Return the start keys of all of the regions in this table, as a list of ImmutableBytesWritable.
   */
  private static List<ImmutableBytesWritable> getRegionStartKeys(List<RegionLocator> regionLocators,
    boolean writeMultipleTables) throws IOException {

    ArrayList<ImmutableBytesWritable> ret = new ArrayList<>();
    for (RegionLocator regionLocator : regionLocators) {
      TableName tableName = regionLocator.getName();
      LOG.info("Looking up current regions for table " + tableName);
      byte[][] byteKeys = regionLocator.getStartKeys();
      for (byte[] byteKey : byteKeys) {
        byte[] fullKey = byteKey; // HFileOutputFormat2 use case
        if (writeMultipleTables) {
          // MultiTableHFileOutputFormat use case
          fullKey = combineTableNameSuffix(tableName.getName(), byteKey);
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("SplitPoint startkey for " + tableName + ": " + Bytes.toStringBinary(fullKey));
        }
        ret.add(new ImmutableBytesWritable(fullKey));
      }
    }
    return ret;
  }

  /**
   * Write out a {@link SequenceFile} that can be read by {@link TotalOrderPartitioner} that
   * contains the split points in startKeys.
   */
  @SuppressWarnings("deprecation")
  private static void writePartitions(Configuration conf, Path partitionsPath,
    List<ImmutableBytesWritable> startKeys, boolean writeMultipleTables) throws IOException {
    LOG.info("Writing partition information to " + partitionsPath);
    if (startKeys.isEmpty()) {
      throw new IllegalArgumentException("No regions passed");
    }

    // We're generating a list of split points, and we don't ever
    // have keys < the first region (which has an empty start key)
    // so we need to remove it. Otherwise we would end up with an
    // empty reducer with index 0
    TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys);
    ImmutableBytesWritable first = sorted.first();
    if (writeMultipleTables) {
      first =
        new ImmutableBytesWritable(MultiTableHFileOutputFormat.getSuffix(sorted.first().get()));
    }
    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IllegalArgumentException(
        "First region of table should have empty start key. Instead has: "
          + Bytes.toStringBinary(first.get()));
    }
    sorted.remove(sorted.first());

    // Write the actual file
    FileSystem fs = partitionsPath.getFileSystem(conf);
    boolean diskBasedSortingEnabled = diskBasedSortingEnabled(conf);
    Class<? extends Writable> keyClass =
      diskBasedSortingEnabled ? KeyOnlyCellComparable.class : ImmutableBytesWritable.class;
    SequenceFile.Writer writer =
      SequenceFile.createWriter(fs, conf, partitionsPath, keyClass, NullWritable.class);

    try {
      for (ImmutableBytesWritable startKey : sorted) {
        Writable writable = diskBasedSortingEnabled
          ? new KeyOnlyCellComparable(KeyValueUtil.createFirstOnRow(startKey.get()))
          : startKey;

        writer.append(writable, NullWritable.get());
      }
    } finally {
      writer.close();
    }
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given table. This
   * <ul>
   * <li>Inspects the table to configure a total order partitioner</li>
   * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   * <li>Sets the number of reduce tasks to match the current number of regions</li>
   * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   * <li>Sets the reducer up to perform the appropriate sorting (either CellSortReducer or
   * PutSortReducer)</li>
   * <li>Sets the HBase cluster key used to load region locations for locality-sensitive HFile
   * placement</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
   */
  public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
    throws IOException {
    configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
    configureRemoteCluster(job, table.getConfiguration());
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given table. This
   * <ul>
   * <li>Inspects the table to configure a total order partitioner</li>
   * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   * <li>Sets the number of reduce tasks to match the current number of regions</li>
   * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   * <li>Sets the reducer up to perform the appropriate sorting (either CellSortReducer or
   * PutSortReducer)</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
   */
  public static void configureIncrementalLoad(Job job, TableDescriptor tableDescriptor,
    RegionLocator regionLocator) throws IOException {
    ArrayList<TableInfo> singleTableInfo = new ArrayList<>();
    singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator));
    configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
  }

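  /**
   * Returns whether disk-based sorting is enabled (see {@link #DISK_BASED_SORTING_ENABLED_KEY}).
   * When enabled, cells are sorted by the MapReduce shuffle using {@link KeyOnlyCellComparable}
   * and arrive at {@link PreSortedCellsReducer} already ordered.
   */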
  public static boolean diskBasedSortingEnabled(Configuration conf) {
    return conf.getBoolean(DISK_BASED_SORTING_ENABLED_KEY, DISK_BASED_SORTING_ENABLED_DEFAULT);
  }

  static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
    Class<? extends OutputFormat<?, ?>> cls) throws IOException {
    Configuration conf = job.getConfiguration();
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(MapReduceExtendedCell.class);
    job.setOutputFormatClass(cls);

    if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) {
      throw new IllegalArgumentException("Duplicate entries found in TableInfo argument");
    }
    boolean writeMultipleTables = false;
    if (MultiTableHFileOutputFormat.class.equals(cls)) {
      writeMultipleTables = true;
      conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true);
    }
    // Based on the configured map output class, set the correct reducer to properly
    // sort the incoming values.
    // TODO it would be nice to pick one or the other of these formats.
    boolean diskBasedSorting = diskBasedSortingEnabled(conf);

    if (diskBasedSorting) {
      job.setMapOutputKeyClass(KeyOnlyCellComparable.class);
      job.setSortComparatorClass(KeyOnlyCellComparable.KeyOnlyCellComparator.class);
      job.setReducerClass(PreSortedCellsReducer.class);
    } else if (
      KeyValue.class.equals(job.getMapOutputValueClass())
        || MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())
    ) {
      job.setReducerClass(CellSortReducer.class);
    } else if (Put.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(PutSortReducer.class);
    } else if (Text.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(TextSortReducer.class);
    } else {
      LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
    }

    mergeSerializations(conf);

    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
      LOG.info("bulkload locality sensitive enabled");
    }

    /* Now get the region start keys for every table required */
    List<String> allTableNames = new ArrayList<>(multiTableInfo.size());
    List<RegionLocator> regionLocators = new ArrayList<>(multiTableInfo.size());
    List<TableDescriptor> tableDescriptors = new ArrayList<>(multiTableInfo.size());

    for (TableInfo tableInfo : multiTableInfo) {
      regionLocators.add(tableInfo.getRegionLocator());
      String tn = writeMultipleTables
        ? tableInfo.getRegionLocator().getName().getNameWithNamespaceInclAsString()
        : tableInfo.getRegionLocator().getName().getNameAsString();
      allTableNames.add(tn);
      tableDescriptors.add(tableInfo.getTableDescriptor());
    }
    // Record tablenames for creating writer by favored nodes, and decoding compression,
    // block size and other attributes of columnfamily per table
    conf.set(OUTPUT_TABLE_NAME_CONF_KEY,
      StringUtils.join(allTableNames, Bytes.toString(tableSeparator)));
    List<ImmutableBytesWritable> startKeys =
      getRegionStartKeys(regionLocators, writeMultipleTables);
    // Use table's region boundaries for TOP split points.
    LOG.info("Configuring " + startKeys.size() + " reduce partitions "
      + "to match current region count for all tables");
    job.setNumReduceTasks(startKeys.size());

    configurePartitioner(job, startKeys, writeMultipleTables);
    // Set compression algorithms based on column families

    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(compressionDetails, tableDescriptors));
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(blockSizeDetails, tableDescriptors));
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(bloomTypeDetails, tableDescriptors));
    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(bloomParamDetails, tableDescriptors));
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
  }

  private static void mergeSerializations(Configuration conf) {
    List<String> serializations = new ArrayList<>();

    // add any existing values that have been set
    String[] existing = conf.getStrings("io.serializations");
    if (existing != null) {
      Collections.addAll(serializations, existing);
    }

    serializations.add(MutationSerialization.class.getName());
    serializations.add(ResultSerialization.class.getName());

    // Add ExtendedCellSerialization, if configured. Order matters here. Hadoop's
    // SerializationFactory runs through serializations in the order they are registered.
    // We want to register ExtendedCellSerialization before CellSerialization because both
    // work for ExtendedCells but only ExtendedCellSerialization handles them properly.
    if (
      conf.getBoolean(EXTENDED_CELL_SERIALIZATION_ENABLED_KEY,
        EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT)
    ) {
      serializations.add(ExtendedCellSerialization.class.getName());
    }
    serializations.add(CellSerialization.class.getName());

    conf.setStrings("io.serializations", serializations.toArray(new String[0]));
  }

  public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor)
    throws IOException {
    Configuration conf = job.getConfiguration();

    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(MapReduceExtendedCell.class);
    job.setOutputFormatClass(HFileOutputFormat2.class);

    ArrayList<TableDescriptor> singleTableDescriptor = new ArrayList<>(1);
    singleTableDescriptor.add(tableDescriptor);

    conf.set(OUTPUT_TABLE_NAME_CONF_KEY, tableDescriptor.getTableName().getNameAsString());
    // Set compression algorithms based on column families
    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(compressionDetails, singleTableDescriptor));
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor));
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor));
    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(bloomParamDetails, singleTableDescriptor));
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured.");
  }

  /**
   * Configure the HBase cluster key of the remote cluster used to load region locations for
   * locality-sensitive HFile placement, if that feature is enabled. It is not necessary to call
   * this method explicitly when the cluster key of the HBase cluster used to load region locations
   * is already configured in the job configuration. Call this method when another HBase cluster
   * key is configured in the job configuration, for example, when you load data from HBase cluster
   * A using {@link TableInputFormat} and generate HFiles for HBase cluster B. Otherwise,
   * HFileOutputFormat2 fetches locations from cluster A and locality-sensitive placement won't
   * work correctly.
   * {@link #configureIncrementalLoad(Job, Table, RegionLocator)} calls this method using
   * {@link Table#getConfiguration} as clusterConf. See HBASE-25608.
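   * <p>
   * A sketch of the cluster-A-to-cluster-B case described above ({@code clusterBConnection} and
   * {@code tableName} are illustrative and assumed to point at cluster B):
   *
   * <pre>{@code
   * try (Table table = clusterBConnection.getTable(tableName);
   *   RegionLocator locator = clusterBConnection.getRegionLocator(tableName)) {
   *   HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), locator);
   *   HFileOutputFormat2.configureRemoteCluster(job, clusterBConnection.getConfiguration());
   * }
   * }</pre>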
   * @param job         which has configuration to be updated
   * @param clusterConf which contains cluster key of the HBase cluster to be locality-sensitive
   * @see #configureIncrementalLoad(Job, Table, RegionLocator)
   * @see #LOCALITY_SENSITIVE_CONF_KEY
   * @see #REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY
   * @see #REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY
   * @see #REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY
   */
  public static void configureRemoteCluster(Job job, Configuration clusterConf) throws IOException {
    Configuration conf = job.getConfiguration();

    if (!conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
      return;
    }

    final String quorum = clusterConf.get(HConstants.ZOOKEEPER_QUORUM);
    final int clientPort = clusterConf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT,
      HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT);
    final String parent =
      clusterConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);

    conf.set(REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY, quorum);
    conf.setInt(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY, clientPort);
    conf.set(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY, parent);

    TableMapReduceUtil.initCredentialsForCluster(job, clusterConf);

    LOG.info("ZK configs for remote cluster of bulkload is configured: " + quorum + ":" + clientPort
      + "/" + parent);
  }

  /**
   * Runs inside the task to deserialize column family to compression algorithm map from the
   * configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured compression algorithm
   */
  @InterfaceAudience.Private
  static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, COMPRESSION_FAMILIES_CONF_KEY);
    Map<byte[], Algorithm> compressionMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue());
      compressionMap.put(e.getKey(), algorithm);
    }
    return compressionMap;
  }

  /**
   * Runs inside the task to deserialize column family to bloom filter type map from the
   * configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter type
   */
  @InterfaceAudience.Private
  static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, BLOOM_TYPE_FAMILIES_CONF_KEY);
    Map<byte[], BloomType> bloomTypeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      BloomType bloomType = BloomType.valueOf(e.getValue());
      bloomTypeMap.put(e.getKey(), bloomType);
    }
    return bloomTypeMap;
  }

  /**
   * Runs inside the task to deserialize column family to bloom filter param map from the
   * configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter param
   */
  @InterfaceAudience.Private
  static Map<byte[], String> createFamilyBloomParamMap(Configuration conf) {
    return createFamilyConfValueMap(conf, BLOOM_PARAM_FAMILIES_CONF_KEY);
  }

  /**
   * Runs inside the task to deserialize column family to block size map from the configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured block size
   */
  @InterfaceAudience.Private
  static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, BLOCK_SIZE_FAMILIES_CONF_KEY);
    Map<byte[], Integer> blockSizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Integer blockSize = Integer.parseInt(e.getValue());
      blockSizeMap.put(e.getKey(), blockSize);
    }
    return blockSizeMap;
  }

  /**
   * Runs inside the task to deserialize column family to data block encoding type map from the
   * configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured {@link DataBlockEncoding} for the family
   */
  @InterfaceAudience.Private
  static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(Configuration conf) {
    Map<byte[], String> stringMap =
      createFamilyConfValueMap(conf, DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
    Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      encoderMap.put(e.getKey(), DataBlockEncoding.valueOf((e.getValue())));
    }
    return encoderMap;
  }

  /**
   * Runs inside the task to deserialize column family to given conf value map.
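   * <p>
   * The expected serialized form is what {@link #serializeColumnFamilyAttribute} produces:
   * URL-encoded {@code table;family=value} pairs joined by {@code &}.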
   * @param conf     to read the serialized values from
   * @param confName conf key to read from the configuration
   * @return a map of column family to the given configuration value
   */
  private static Map<byte[], String> createFamilyConfValueMap(Configuration conf, String confName) {
    Map<byte[], String> confValMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    String confVal = conf.get(confName, "");
    for (String familyConf : confVal.split("&")) {
      String[] familySplit = familyConf.split("=");
      if (familySplit.length != 2) {
        continue;
      }
      try {
        confValMap.put(Bytes.toBytes(URLDecoder.decode(familySplit[0], "UTF-8")),
          URLDecoder.decode(familySplit[1], "UTF-8"));
      } catch (UnsupportedEncodingException e) {
        // will not happen with UTF-8 encoding
        throw new AssertionError(e);
      }
    }
    return confValMap;
  }

  /**
   * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
   * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
   */
  static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints,
    boolean writeMultipleTables) throws IOException {
    Configuration conf = job.getConfiguration();
    // create the partitions file
    FileSystem fs = FileSystem.get(conf);
    String hbaseTmpFsDir =
      conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, fs.getHomeDirectory() + "/hbase-staging");
    Path partitionsPath =
      fs.makeQualified(new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID()));
    writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables);
    fs.deleteOnExit(partitionsPath);

    // configure job to use it
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
      value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
  @InterfaceAudience.Private
  static String serializeColumnFamilyAttribute(Function<ColumnFamilyDescriptor, String> fn,
    List<TableDescriptor> allTables) throws UnsupportedEncodingException {
    StringBuilder attributeValue = new StringBuilder();
    int i = 0;
    for (TableDescriptor tableDescriptor : allTables) {
      if (tableDescriptor == null) {
        // could happen with mock table instance
        // CODEREVIEW: Can I set an empty string in conf if mock table instance?
        return "";
      }
      for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
        if (i++ > 0) {
          attributeValue.append('&');
        }
        attributeValue.append(URLEncoder
          .encode(Bytes.toString(combineTableNameSuffix(tableDescriptor.getTableName().getName(),
            familyDescriptor.getName())), "UTF-8"));
        attributeValue.append('=');
        attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8"));
      }
    }
    // No trailing ampersand to strip; the separator is only appended between entries
    return attributeValue.toString();
  }

  /**
   * Serialize column family to compression algorithm map to configuration. Invoked while
   * configuring the MR job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> compressionDetails =
    familyDescriptor -> familyDescriptor.getCompressionType().getName();

  /**
   * Serialize column family to block size map to configuration. Invoked while configuring the MR
   * job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> blockSizeDetails =
    familyDescriptor -> String.valueOf(familyDescriptor.getBlocksize());

  /**
   * Serialize column family to bloom type map to configuration. Invoked while configuring the MR
   * job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> bloomTypeDetails = familyDescriptor -> {
    String bloomType = familyDescriptor.getBloomFilterType().toString();
    if (bloomType == null) {
      bloomType = ColumnFamilyDescriptorBuilder.DEFAULT_BLOOMFILTER.name();
    }
    return bloomType;
  };

  /**
   * Serialize column family to bloom param map to configuration. Invoked while configuring the MR
   * job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> bloomParamDetails = familyDescriptor -> {
    BloomType bloomType = familyDescriptor.getBloomFilterType();
    String bloomParam = "";
    if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
      bloomParam = familyDescriptor.getConfigurationValue(BloomFilterUtil.PREFIX_LENGTH_KEY);
    }
    return bloomParam;
  };

  /**
   * Serialize column family to data block encoding map to configuration. Invoked while configuring
   * the MR job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> dataBlockEncodingDetails = familyDescriptor -> {
    DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
    if (encoding == null) {
      encoding = DataBlockEncoding.NONE;
    }
    return encoding.toString();
  };

}