/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TASK_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Writes HFiles. Passed Cells must arrive in order. Writes current time as the sequence id for the
 * file. Sets the major compacted attribute on created {@link HFile}s. Calling write(null,null) will
 * forcibly roll all HFiles being written.
 * <p>
 * Using this class as part of a MapReduce job is best done using
 * {@link #configureIncrementalLoad(Job, TableDescriptor, RegionLocator)}.
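 * <p>
 * A minimal setup sketch (names such as {@code conn}, {@code job} and {@code outputDir} are
 * placeholders, not part of this class):
 *
 * <pre>
 * try (Table table = conn.getTable(tableName);
 *   RegionLocator locator = conn.getRegionLocator(tableName)) {
 *   HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), locator);
 *   FileOutputFormat.setOutputPath(job, outputDir);
 * }
 * </pre>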
 */
@InterfaceAudience.Public
public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable, Cell> {
  private static final Logger LOG = LoggerFactory.getLogger(HFileOutputFormat2.class);

  static class TableInfo {
    private TableDescriptor tableDescriptor;
    private RegionLocator regionLocator;

    public TableInfo(TableDescriptor tableDescriptor, RegionLocator regionLocator) {
      this.tableDescriptor = tableDescriptor;
      this.regionLocator = regionLocator;
    }

    public TableDescriptor getTableDescriptor() {
      return tableDescriptor;
    }

    public RegionLocator getRegionLocator() {
      return regionLocator;
    }
  }

  protected static final byte[] tableSeparator = Bytes.toBytes(";");

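  // For example, combineTableNameSuffix(Bytes.toBytes("t1"), Bytes.toBytes("cf")) yields the bytes
  // of "t1;cf"; this "table;family" form is the key used for the per-family maps below.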
  protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) {
    return Bytes.add(tableName, tableSeparator, suffix);
  }

  // The following constants are private since these are used by
  // HFileOutputFormat2 to internally transfer data between job setup and
  // reducer run using conf.
  // These should not be changed by the client.
  static final String COMPRESSION_FAMILIES_CONF_KEY =
    "hbase.hfileoutputformat.families.compression";
  static final String BLOOM_TYPE_FAMILIES_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype";
  static final String BLOOM_PARAM_FAMILIES_CONF_KEY = "hbase.hfileoutputformat.families.bloomparam";
  static final String BLOCK_SIZE_FAMILIES_CONF_KEY = "hbase.mapreduce.hfileoutputformat.blocksize";
  static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
    "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";

  // This constant is public since the client can modify this when setting
  // up their conf object and thus refer to this symbol.
  // It is present for backwards compatibility reasons. Use it only to
  // override the auto-detection of datablock encoding and compression.
  public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
    "hbase.mapreduce.hfileoutputformat.datablock.encoding";
  public static final String COMPRESSION_OVERRIDE_CONF_KEY =
    "hbase.mapreduce.hfileoutputformat.compression";

  /**
   * Keep locality while generating HFiles for bulkload. See HBASE-12596
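   * <p>
   * When enabled (the default), the record writer looks up the region location of the first row
   * written to each new HFile and passes that host to the store file writer as a favored node, so
   * the file's blocks tend to be placed on the serving region server.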
   */
  public static final String LOCALITY_SENSITIVE_CONF_KEY =
    "hbase.bulkload.locality.sensitive.enabled";
  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
  static final String OUTPUT_TABLE_NAME_CONF_KEY = "hbase.mapreduce.hfileoutputformat.table.name";
  static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
    "hbase.mapreduce.use.multi.table.hfileoutputformat";

  /**
   * ExtendedCell and ExtendedCellSerialization are InterfaceAudience.Private. We expose this config
   * for internal usage in jobs like WALPlayer which need to use features of ExtendedCell.
   */
  @InterfaceAudience.Private
  public static final String EXTENDED_CELL_SERIALIZATION_ENABLED_KEY =
    "hbase.mapreduce.hfileoutputformat.extendedcell.enabled";
  static final boolean EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT = false;

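  /**
   * When enabled, {@code configureIncrementalLoad} sets the map output key class to
   * {@link KeyOnlyCellComparable}, sorts with its comparator and wires in
   * {@link PreSortedCellsReducer}; split points in the partitions file are then written as cell
   * keys rather than raw row bytes (see {@code writePartitions}).
   */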
  @InterfaceAudience.Private
  public static final String DISK_BASED_SORTING_ENABLED_KEY =
    "hbase.mapreduce.hfileoutputformat.disk.based.sorting.enabled";
  private static final boolean DISK_BASED_SORTING_ENABLED_DEFAULT = false;

  public static final String REMOTE_CLUSTER_CONF_PREFIX = "hbase.hfileoutputformat.remote.cluster.";
  public static final String REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY =
    REMOTE_CLUSTER_CONF_PREFIX + "zookeeper.quorum";
  public static final String REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY =
    REMOTE_CLUSTER_CONF_PREFIX + "zookeeper." + HConstants.CLIENT_PORT_STR;
  public static final String REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY =
    REMOTE_CLUSTER_CONF_PREFIX + HConstants.ZOOKEEPER_ZNODE_PARENT;

  public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY;
  public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";

  @Override
  public RecordWriter<ImmutableBytesWritable, Cell>
    getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException {
    return createRecordWriter(context, this.getOutputCommitter(context));
  }

  protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[] family) {
    return combineTableNameSuffix(tableName, family);
  }

  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> createRecordWriter(
    final TaskAttemptContext context, final OutputCommitter committer) throws IOException {

    // Get the path of the temporary output file
    final Path outputDir = ((PathOutputCommitter) committer).getWorkPath();
    final Configuration conf = context.getConfiguration();
    final boolean writeMultipleTables =
      conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
    final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
    if (writeTableNames == null || writeTableNames.isEmpty()) {
      throw new IllegalArgumentException(OUTPUT_TABLE_NAME_CONF_KEY + " cannot be empty");
    }
    final FileSystem fs = outputDir.getFileSystem(conf);
    // These configs are from hbase-*.xml
    final long maxsize =
      conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE);
    // Invented config. Add to hbase-*.xml if other than default compression.
    final String defaultCompressionStr =
      conf.get("hfile.compression", Compression.Algorithm.NONE.getName());
    final Algorithm defaultCompression = HFileWriterImpl.compressionByName(defaultCompressionStr);
    String compressionStr = conf.get(COMPRESSION_OVERRIDE_CONF_KEY);
    final Algorithm overriddenCompression =
      compressionStr != null ? Compression.getCompressionAlgorithmByName(compressionStr) : null;
    final boolean compactionExclude =
      conf.getBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
    final Set<String> allTableNames = Arrays
      .stream(writeTableNames.split(Bytes.toString(tableSeparator))).collect(Collectors.toSet());

    // create a map from column family to the compression algorithm
    final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
    final Map<byte[], String> bloomParamMap = createFamilyBloomParamMap(conf);
    final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);

    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
    final Map<byte[], DataBlockEncoding> datablockEncodingMap =
      createFamilyDataBlockEncodingMap(conf);
    final DataBlockEncoding overriddenEncoding =
      dataBlockEncodingStr != null ? DataBlockEncoding.valueOf(dataBlockEncodingStr) : null;

    return new RecordWriter<ImmutableBytesWritable, V>() {
      // Map of families to writers and how much has been output on the writer.
      private final Map<byte[], WriterLength> writers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      private final Map<byte[], byte[]> previousRows = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      private final long now = EnvironmentEdgeManager.currentTime();
      private byte[] tableNameBytes = writeMultipleTables ? null : Bytes.toBytes(writeTableNames);

      @Override
      public void write(ImmutableBytesWritable row, V cell) throws IOException {
        // null input == user explicitly wants to flush
        if (row == null && cell == null) {
          rollWriters(null);
          return;
        }

        ExtendedCell kv = PrivateCellUtil.ensureExtendedCell(cell);
        byte[] rowKey = CellUtil.cloneRow(kv);
        int length = (PrivateCellUtil.estimatedSerializedSizeOf(kv)) - Bytes.SIZEOF_INT;
        byte[] family = CellUtil.cloneFamily(kv);
        if (writeMultipleTables) {
          tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get());
          tableNameBytes = TableName.valueOf(tableNameBytes).getNameWithNamespaceInclAsString()
            .getBytes(Charset.defaultCharset());
          if (!allTableNames.contains(Bytes.toString(tableNameBytes))) {
            throw new IllegalArgumentException(
              "TableName " + Bytes.toString(tableNameBytes) + " not expected");
          }
        }
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);

        WriterLength wl = this.writers.get(tableAndFamily);

        // If this is a new column family, create its output directory and set the storage policy
        if (wl == null) {
          Path writerPath = null;
          if (writeMultipleTables) {
            Path tableRelPath = getTableRelativePath(tableNameBytes);
            writerPath = new Path(outputDir, new Path(tableRelPath, Bytes.toString(family)));
          } else {
            writerPath = new Path(outputDir, Bytes.toString(family));
          }
          fs.mkdirs(writerPath);
          configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
        }

        // Roll the writer once it reaches the size threshold, but only at a row boundary
        if (
          wl != null && wl.written + length >= maxsize
            && Bytes.compareTo(this.previousRows.get(family), rowKey) != 0
        ) {
          rollWriters(wl);
        }

        // create a new HFile writer, if necessary
        if (wl == null || wl.writer == null) {
          InetSocketAddress[] favoredNodes = null;
          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
            HRegionLocation loc = null;
            String tableName = Bytes.toString(tableNameBytes);
            if (tableName != null) {
              try (
                Connection connection =
                  ConnectionFactory.createConnection(createRemoteClusterConf(conf));
                RegionLocator locator = connection.getRegionLocator(TableName.valueOf(tableName))) {
                loc = locator.getRegionLocation(rowKey);
              } catch (Throwable e) {
                LOG.warn("Something wrong locating rowkey {} in {}", Bytes.toString(rowKey),
                  tableName, e);
                loc = null;
              }
            }
            if (null == loc) {
              LOG.trace("Failed get of location, use default writer {}", Bytes.toString(rowKey));
            } else {
              LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey));
              InetSocketAddress initialIsa =
                new InetSocketAddress(loc.getHostname(), loc.getPort());
              if (initialIsa.isUnresolved()) {
                LOG.trace("Failed resolve address {}, use default writer", loc.getHostnamePort());
              } else {
                LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString());
                favoredNodes = new InetSocketAddress[] { initialIsa };
              }
            }
          }
          wl = getNewWriter(tableNameBytes, family, conf, favoredNodes);

        }

        // we now have the proper HFile writer. full steam ahead
        PrivateCellUtil.updateLatestStamp(kv, this.now);
        wl.writer.append((ExtendedCell) kv);
        wl.written += length;

        // Copy the row so we can detect row transitions.
        this.previousRows.put(family, rowKey);
      }

      private Path getTableRelativePath(byte[] tableNameBytes) {
        String tableName = Bytes.toString(tableNameBytes);
        String[] tableNameParts = tableName.split(":");
        Path tableRelPath = new Path(tableNameParts[0]);
        if (tableNameParts.length > 1) {
          tableRelPath = new Path(tableRelPath, tableNameParts[1]);
        }
        return tableRelPath;
      }

      private void rollWriters(WriterLength writerLength) throws IOException {
        if (writerLength != null) {
          closeWriter(writerLength);
        } else {
          for (WriterLength wl : this.writers.values()) {
            closeWriter(wl);
          }
        }
      }

      private void closeWriter(WriterLength wl) throws IOException {
        if (wl.writer != null) {
          LOG.info(
            "Writer=" + wl.writer.getPath() + ((wl.written == 0) ? "" : ", wrote=" + wl.written));
          close(wl.writer);
          wl.writer = null;
        }
        wl.written = 0;
      }

      private Configuration createRemoteClusterConf(Configuration conf) {
        final Configuration newConf = new Configuration(conf);

        final String quorum = conf.get(REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY);
        final String clientPort = conf.get(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY);
        final String parent = conf.get(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY);

        if (quorum != null && clientPort != null && parent != null) {
          newConf.set(HConstants.ZOOKEEPER_QUORUM, quorum);
          newConf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, Integer.parseInt(clientPort));
          newConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parent);
        }

        for (Entry<String, String> entry : conf) {
          String key = entry.getKey();
          if (
            REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY.equals(key)
              || REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY.equals(key)
              || REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY.equals(key)
          ) {
            // Handled them above
            continue;
          }

          if (entry.getKey().startsWith(REMOTE_CLUSTER_CONF_PREFIX)) {
            String originalKey = entry.getKey().substring(REMOTE_CLUSTER_CONF_PREFIX.length());
            if (!originalKey.isEmpty()) {
              newConf.set(originalKey, entry.getValue());
            }
          }
        }

        return newConf;
      }

      /*
       * Create a new StoreFile.Writer.
       * @return A WriterLength, containing a new StoreFile.Writer.
       */
      @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BX_UNBOXING_IMMEDIATELY_REBOXED",
          justification = "Not important")
      private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration conf,
        InetSocketAddress[] favoredNodes) throws IOException {
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family);
        Path familydir = new Path(outputDir, Bytes.toString(family));
        if (writeMultipleTables) {
          familydir =
            new Path(outputDir, new Path(getTableRelativePath(tableName), Bytes.toString(family)));
        }
        WriterLength wl = new WriterLength();
        Algorithm compression = overriddenCompression;
        compression = compression == null ? compressionMap.get(tableAndFamily) : compression;
        compression = compression == null ? defaultCompression : compression;
        BloomType bloomType = bloomTypeMap.get(tableAndFamily);
        bloomType = bloomType == null ? BloomType.NONE : bloomType;
        String bloomParam = bloomParamMap.get(tableAndFamily);
        if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
          conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, bloomParam);
        }
        Integer blockSize = blockSizeMap.get(tableAndFamily);
        blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
        DataBlockEncoding encoding = overriddenEncoding;
        encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
        encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
        HFileContextBuilder contextBuilder = new HFileContextBuilder().withCompression(compression)
          .withDataBlockEncoding(encoding).withChecksumType(StoreUtils.getChecksumType(conf))
          .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)).withBlockSize(blockSize)
          .withColumnFamily(family).withTableName(tableName)
          .withCreateTime(EnvironmentEdgeManager.currentTime());

        if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
          contextBuilder.withIncludesTags(true);
        }

        HFileContext hFileContext = contextBuilder.build();
        if (null == favoredNodes) {
          wl.writer =
            new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs).withOutputDir(familydir)
              .withBloomType(bloomType).withFileContext(hFileContext).build();
        } else {
          wl.writer = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, new HFileSystem(fs))
            .withOutputDir(familydir).withBloomType(bloomType).withFileContext(hFileContext)
            .withFavoredNodes(favoredNodes).build();
        }

        this.writers.put(tableAndFamily, wl);
        return wl;
      }

      private void close(final StoreFileWriter w) throws IOException {
        if (w != null) {
          w.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(EnvironmentEdgeManager.currentTime()));
          w.appendFileInfo(BULKLOAD_TASK_KEY, Bytes.toBytes(context.getTaskAttemptID().toString()));
          w.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(true));
          w.appendFileInfo(EXCLUDE_FROM_MINOR_COMPACTION_KEY, Bytes.toBytes(compactionExclude));
          w.appendTrackedTimestampsToMetadata();
          w.close();
        }
      }

      @Override
      public void close(TaskAttemptContext c) throws IOException, InterruptedException {
        for (WriterLength wl : this.writers.values()) {
          close(wl.writer);
        }
      }
    };
  }

  /**
   * Configure block storage policy for CF after the directory is created.
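   * <p>
   * The policy is resolved from the per-family key ({@link #STORAGE_POLICY_PROPERTY_CF_PREFIX}
   * followed by the "table;family" string) first, falling back to the global
   * {@link #STORAGE_POLICY_PROPERTY} key.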
   */
  static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
    byte[] tableAndFamily, Path cfPath) {
    if (null == conf || null == fs || null == tableAndFamily || null == cfPath) {
      return;
    }

    String policy = conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(tableAndFamily),
      conf.get(STORAGE_POLICY_PROPERTY));
    CommonFSUtils.setStoragePolicy(fs, cfPath, policy);
  }

  /*
   * Data structure to hold a Writer and amount of data written on it.
   */
  static class WriterLength {
    long written = 0;
    StoreFileWriter writer = null;
  }

  /**
   * Return the start keys of all of the regions in the given tables, as a list of
   * ImmutableBytesWritable.
   */
  private static List<ImmutableBytesWritable> getRegionStartKeys(List<RegionLocator> regionLocators,
    boolean writeMultipleTables) throws IOException {

    ArrayList<ImmutableBytesWritable> ret = new ArrayList<>();
    for (RegionLocator regionLocator : regionLocators) {
      TableName tableName = regionLocator.getName();
      LOG.info("Looking up current regions for table " + tableName);
      byte[][] byteKeys = regionLocator.getStartKeys();
      for (byte[] byteKey : byteKeys) {
        byte[] fullKey = byteKey; // HFileOutputFormat2 use case
        if (writeMultipleTables) {
          // MultiTableHFileOutputFormat use case
          fullKey = combineTableNameSuffix(tableName.getName(), byteKey);
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("SplitPoint startkey for " + tableName + ": " + Bytes.toStringBinary(fullKey));
        }
        ret.add(new ImmutableBytesWritable(fullKey));
      }
    }
    return ret;
  }

  /**
   * Write out a {@link SequenceFile}, readable by {@link TotalOrderPartitioner}, that contains the
   * split points in startKeys.
   */
  @SuppressWarnings("deprecation")
  private static void writePartitions(Configuration conf, Path partitionsPath,
    List<ImmutableBytesWritable> startKeys, boolean writeMultipleTables) throws IOException {
    LOG.info("Writing partition information to " + partitionsPath);
    if (startKeys.isEmpty()) {
      throw new IllegalArgumentException("No regions passed");
    }

    // We're generating a list of split points, and we don't ever
    // have keys < the first region (which has an empty start key)
    // so we need to remove it. Otherwise we would end up with an
    // empty reducer with index 0
    TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys);
    ImmutableBytesWritable first = sorted.first();
    if (writeMultipleTables) {
      first =
        new ImmutableBytesWritable(MultiTableHFileOutputFormat.getSuffix(sorted.first().get()));
    }
    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IllegalArgumentException(
        "First region of table should have empty start key. Instead has: "
          + Bytes.toStringBinary(first.get()));
    }
    sorted.remove(sorted.first());

    // Write the actual file
    FileSystem fs = partitionsPath.getFileSystem(conf);
    boolean diskBasedSortingEnabled = diskBasedSortingEnabled(conf);
    Class<? extends Writable> keyClass =
      diskBasedSortingEnabled ? KeyOnlyCellComparable.class : ImmutableBytesWritable.class;
    SequenceFile.Writer writer =
      SequenceFile.createWriter(fs, conf, partitionsPath, keyClass, NullWritable.class);

    try {
      for (ImmutableBytesWritable startKey : sorted) {
        Writable writable = diskBasedSortingEnabled
          ? new KeyOnlyCellComparable(KeyValueUtil.createFirstOnRow(startKey.get()))
          : startKey;

        writer.append(writable, NullWritable.get());
      }
    } finally {
      writer.close();
    }
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given table. This
   * <ul>
   * <li>Inspects the table to configure a total order partitioner</li>
   * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   * <li>Sets the number of reduce tasks to match the current number of regions</li>
   * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   * <li>Sets the reducer up to perform the appropriate sorting (either CellSortReducer or
   * PutSortReducer)</li>
   * <li>Sets the HBase cluster key so that region locations can be loaded for locality-sensitive
   * writing</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
   */
  public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
    throws IOException {
    configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
    configureForRemoteCluster(job, table.getConfiguration());
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given table. This
   * <ul>
   * <li>Inspects the table to configure a total order partitioner</li>
   * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   * <li>Sets the number of reduce tasks to match the current number of regions</li>
   * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   * <li>Sets the reducer up to perform the appropriate sorting (either CellSortReducer or
   * PutSortReducer)</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
   */
  public static void configureIncrementalLoad(Job job, TableDescriptor tableDescriptor,
    RegionLocator regionLocator) throws IOException {
    ArrayList<TableInfo> singleTableInfo = new ArrayList<>();
    singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator));
    configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
  }

  public static boolean diskBasedSortingEnabled(Configuration conf) {
    return conf.getBoolean(DISK_BASED_SORTING_ENABLED_KEY, DISK_BASED_SORTING_ENABLED_DEFAULT);
  }

  static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
    Class<? extends OutputFormat<?, ?>> cls) throws IOException {
    Configuration conf = job.getConfiguration();
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(MapReduceExtendedCell.class);
    job.setOutputFormatClass(cls);

    if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) {
      throw new IllegalArgumentException("Duplicate entries found in TableInfo argument");
    }
    boolean writeMultipleTables = false;
    if (MultiTableHFileOutputFormat.class.equals(cls)) {
      writeMultipleTables = true;
      conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true);
    }
    // Based on the configured map output class, set the correct reducer to properly
    // sort the incoming values.
    // TODO it would be nice to pick one or the other of these formats.
    boolean diskBasedSorting = diskBasedSortingEnabled(conf);

    if (diskBasedSorting) {
      job.setMapOutputKeyClass(KeyOnlyCellComparable.class);
      job.setSortComparatorClass(KeyOnlyCellComparable.KeyOnlyCellComparator.class);
      job.setReducerClass(PreSortedCellsReducer.class);
    } else if (
      KeyValue.class.equals(job.getMapOutputValueClass())
        || MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())
    ) {
      job.setReducerClass(CellSortReducer.class);
    } else if (Put.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(PutSortReducer.class);
    } else if (Text.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(TextSortReducer.class);
    } else {
      LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
    }

    mergeSerializations(conf);

    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
      LOG.info("bulkload locality sensitive enabled");
    }

    /* Now get the region start keys for every table required */
    List<String> allTableNames = new ArrayList<>(multiTableInfo.size());
    List<RegionLocator> regionLocators = new ArrayList<>(multiTableInfo.size());
    List<TableDescriptor> tableDescriptors = new ArrayList<>(multiTableInfo.size());

    for (TableInfo tableInfo : multiTableInfo) {
      regionLocators.add(tableInfo.getRegionLocator());
      String tn = writeMultipleTables
        ? tableInfo.getRegionLocator().getName().getNameWithNamespaceInclAsString()
        : tableInfo.getRegionLocator().getName().getNameAsString();
      allTableNames.add(tn);
      tableDescriptors.add(tableInfo.getTableDescriptor());
    }
    // Record table names so the record writer can create writers with favored nodes and decode
    // per-table column family attributes (compression, block size, bloom type, encoding)
    conf.set(OUTPUT_TABLE_NAME_CONF_KEY,
      StringUtils.join(allTableNames, Bytes.toString(tableSeparator)));
    List<ImmutableBytesWritable> startKeys =
      getRegionStartKeys(regionLocators, writeMultipleTables);
    // Use the tables' region boundaries as TotalOrderPartitioner split points.
684    LOG.info("Configuring " + startKeys.size() + " reduce partitions "
685      + "to match current region count for all tables");
686    job.setNumReduceTasks(startKeys.size());
687
688    configurePartitioner(job, startKeys, writeMultipleTables);
689    // Set compression algorithms based on column families
690
691    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
692      serializeColumnFamilyAttribute(compressionDetails, tableDescriptors));
693    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
694      serializeColumnFamilyAttribute(blockSizeDetails, tableDescriptors));
695    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
696      serializeColumnFamilyAttribute(bloomTypeDetails, tableDescriptors));
697    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY,
698      serializeColumnFamilyAttribute(bloomParamDetails, tableDescriptors));
699    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
700      serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors));
701
702    TableMapReduceUtil.addDependencyJars(job);
703    TableMapReduceUtil.initCredentials(job);
704    LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
705  }
706
707  private static void mergeSerializations(Configuration conf) {
708    List<String> serializations = new ArrayList<>();
709
710    // add any existing values that have been set
711    String[] existing = conf.getStrings("io.serializations");
712    if (existing != null) {
713      Collections.addAll(serializations, existing);
714    }
715
716    serializations.add(MutationSerialization.class.getName());
717    serializations.add(ResultSerialization.class.getName());
718
719    // Add ExtendedCellSerialization, if configured. Order matters here. Hadoop's
720    // SerializationFactory runs through serializations in the order they are registered.
721    // We want to register ExtendedCellSerialization before CellSerialization because both
722    // work for ExtendedCells but only ExtendedCellSerialization handles them properly.
723    if (
724      conf.getBoolean(EXTENDED_CELL_SERIALIZATION_ENABLED_KEY,
725        EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT)
726    ) {
727      serializations.add(ExtendedCellSerialization.class.getName());
728    }
729    serializations.add(CellSerialization.class.getName());
730
731    conf.setStrings("io.serializations", serializations.toArray(new String[0]));
732  }
733
734  public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor)
735    throws IOException {
736    Configuration conf = job.getConfiguration();
737
738    job.setOutputKeyClass(ImmutableBytesWritable.class);
739    job.setOutputValueClass(MapReduceExtendedCell.class);
740    job.setOutputFormatClass(HFileOutputFormat2.class);
741
742    ArrayList<TableDescriptor> singleTableDescriptor = new ArrayList<>(1);
743    singleTableDescriptor.add(tableDescriptor);
744
745    conf.set(OUTPUT_TABLE_NAME_CONF_KEY, tableDescriptor.getTableName().getNameAsString());
746    // Set compression algorithms based on column families
747    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
748      serializeColumnFamilyAttribute(compressionDetails, singleTableDescriptor));
749    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
750      serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor));
751    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
752      serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor));
753    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY,
754      serializeColumnFamilyAttribute(bloomParamDetails, singleTableDescriptor));
755    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
756      serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor));
757
758    TableMapReduceUtil.addDependencyJars(job);
759    TableMapReduceUtil.initCredentials(job);
760    LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured.");
761  }
762
  /**
   * Configure the HBase cluster key of the remote cluster that is used to load region locations
   * when locality-sensitive operation is enabled. It's not necessary to call this method
   * explicitly when the cluster key of the HBase cluster used to load region locations is already
   * configured in the job configuration. Call this method when a different HBase cluster key is
   * configured in the job configuration, for example when you load data from HBase cluster A using
   * {@link TableInputFormat} and generate hfiles for HBase cluster B. Otherwise,
   * HFileOutputFormat2 fetches locations from cluster A and locality-sensitive operation won't
   * work correctly. {@link #configureIncrementalLoad(Job, Table, RegionLocator)} calls this method
   * using {@link Table#getConfiguration} as clusterConf. See HBASE-25608.
   * @param job         which has configuration to be updated
   * @param clusterConf which contains cluster key of the HBase cluster to be locality-sensitive
   * @see #configureIncrementalLoad(Job, Table, RegionLocator)
   * @see #LOCALITY_SENSITIVE_CONF_KEY
   * @see #REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY
   * @see #REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY
   * @see #REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY
   * @deprecated As of release 2.6.4, this will be removed in HBase 4.0.0. Use
   *             {@link #configureForRemoteCluster(Job, Configuration)} instead.
   */
  @Deprecated
  public static void configureRemoteCluster(Job job, Configuration clusterConf) {
    try {
      configureForRemoteCluster(job, clusterConf);
    } catch (IOException e) {
      LOG.error("Configure remote cluster error.", e);
      throw new UncheckedIOException("Configure remote cluster error.", e);
    }
  }

  /**
   * Configure the HBase cluster key of the remote cluster that is used to load region locations
   * when locality-sensitive operation is enabled. It's not necessary to call this method
   * explicitly when the cluster key of the HBase cluster used to load region locations is already
   * configured in the job configuration. Call this method when a different HBase cluster key is
   * configured in the job configuration, for example when you load data from HBase cluster A using
   * {@link TableInputFormat} and generate hfiles for HBase cluster B. Otherwise,
   * HFileOutputFormat2 fetches locations from cluster A and locality-sensitive operation won't
   * work correctly. If authentication is enabled, it obtains the token for the specific cluster.
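   * <p>
   * A minimal sketch, assuming the job reads from cluster A and generates HFiles for cluster B;
   * {@code locatorForClusterB} and {@code clusterBConf} are placeholders for cluster B's region
   * locator and client configuration:
   *
   * <pre>
   * HFileOutputFormat2.configureIncrementalLoad(job, tableDescriptor, locatorForClusterB);
   * HFileOutputFormat2.configureForRemoteCluster(job, clusterBConf);
   * </pre>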
   * @param job         which has configuration to be updated
   * @param clusterConf which contains cluster key of the HBase cluster to be locality-sensitive
   * @throws IOException Exception while initializing cluster credentials
   */
  public static void configureForRemoteCluster(Job job, Configuration clusterConf)
    throws IOException {
    Configuration conf = job.getConfiguration();

    if (!conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
      return;
    }

    final String quorum = clusterConf.get(HConstants.ZOOKEEPER_QUORUM);
    final int clientPort = clusterConf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT,
      HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT);
    final String parent =
      clusterConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);

    conf.set(REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY, quorum);
    conf.setInt(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY, clientPort);
    conf.set(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY, parent);

    TableMapReduceUtil.initCredentialsForCluster(job, clusterConf);

826    LOG.info("ZK configs for remote cluster of bulkload is configured: " + quorum + ":" + clientPort
827      + "/" + parent);
828  }

  /**
   * Runs inside the task to deserialize column family to compression algorithm map from the
   * configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured compression algorithm
   */
  @InterfaceAudience.Private
  static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, COMPRESSION_FAMILIES_CONF_KEY);
    Map<byte[], Algorithm> compressionMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue());
      compressionMap.put(e.getKey(), algorithm);
    }
    return compressionMap;
  }

  /**
   * Runs inside the task to deserialize column family to bloom filter type map from the
   * configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter type
   */
  @InterfaceAudience.Private
  static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, BLOOM_TYPE_FAMILIES_CONF_KEY);
    Map<byte[], BloomType> bloomTypeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      BloomType bloomType = BloomType.valueOf(e.getValue());
      bloomTypeMap.put(e.getKey(), bloomType);
    }
    return bloomTypeMap;
  }

  /**
   * Runs inside the task to deserialize column family to bloom filter param map from the
   * configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter param
   */
  @InterfaceAudience.Private
  static Map<byte[], String> createFamilyBloomParamMap(Configuration conf) {
    return createFamilyConfValueMap(conf, BLOOM_PARAM_FAMILIES_CONF_KEY);
  }

  /**
   * Runs inside the task to deserialize column family to block size map from the configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured block size
   */
  @InterfaceAudience.Private
  static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, BLOCK_SIZE_FAMILIES_CONF_KEY);
    Map<byte[], Integer> blockSizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Integer blockSize = Integer.parseInt(e.getValue());
      blockSizeMap.put(e.getKey(), blockSize);
    }
    return blockSizeMap;
  }

  /**
   * Runs inside the task to deserialize column family to data block encoding type map from the
   * configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured {@link DataBlockEncoding} for the family
   */
  @InterfaceAudience.Private
  static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(Configuration conf) {
    Map<byte[], String> stringMap =
      createFamilyConfValueMap(conf, DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
    Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      encoderMap.put(e.getKey(), DataBlockEncoding.valueOf((e.getValue())));
    }
    return encoderMap;
  }

  /**
   * Run inside the task to deserialize column family to given conf value map.
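   * <p>
   * The serialized value is a list of {@code <tableAndFamily>=<value>} pairs joined with
   * {@code &amp;}, both sides URL-encoded as produced by {@code serializeColumnFamilyAttribute};
   * roughly {@code t1%3Bcf1=gz&amp;t1%3Bcf2=none} (a sketch; the table name, family and values
   * here are placeholders).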
   * @param conf     to read the serialized values from
   * @param confName conf key to read from the configuration
   * @return a map of column family to the given configuration value
   */
  private static Map<byte[], String> createFamilyConfValueMap(Configuration conf, String confName) {
    Map<byte[], String> confValMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    String confVal = conf.get(confName, "");
    for (String familyConf : confVal.split("&")) {
      String[] familySplit = familyConf.split("=");
      if (familySplit.length != 2) {
        continue;
      }
      try {
        confValMap.put(Bytes.toBytes(URLDecoder.decode(familySplit[0], "UTF-8")),
          URLDecoder.decode(familySplit[1], "UTF-8"));
      } catch (UnsupportedEncodingException e) {
        // will not happen with UTF-8 encoding
        throw new AssertionError(e);
      }
    }
    return confValMap;
  }

  /**
   * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
   * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
   */
  static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints,
    boolean writeMultipleTables) throws IOException {
    Configuration conf = job.getConfiguration();
    // create the partitions file
    FileSystem fs = FileSystem.get(conf);
    String hbaseTmpFsDir =
      conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, fs.getHomeDirectory() + "/hbase-staging");
    Path partitionsPath =
      fs.makeQualified(new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID()));
    writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables);
    fs.deleteOnExit(partitionsPath);

    // configure job to use it
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
      value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
  @InterfaceAudience.Private
  static String serializeColumnFamilyAttribute(Function<ColumnFamilyDescriptor, String> fn,
    List<TableDescriptor> allTables) throws UnsupportedEncodingException {
    StringBuilder attributeValue = new StringBuilder();
    int i = 0;
    for (TableDescriptor tableDescriptor : allTables) {
      if (tableDescriptor == null) {
        // could happen with mock table instance
        // CODEREVIEW: Can I set an empty string in conf if mock table instance?
        return "";
      }
      for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
        if (i++ > 0) {
          attributeValue.append('&');
        }
        attributeValue.append(URLEncoder
          .encode(Bytes.toString(combineTableNameSuffix(tableDescriptor.getTableName().getName(),
            familyDescriptor.getName())), "UTF-8"));
        attributeValue.append('=');
        attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8"));
      }
    }
    // No trailing ampersand: the separator is only prepended to entries after the first
    return attributeValue.toString();
  }

  /**
   * Serialize column family to compression algorithm map to configuration. Invoked while
   * configuring the MR job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> compressionDetails =
    familyDescriptor -> familyDescriptor.getCompressionType().getName();

  /**
   * Serialize column family to block size map to configuration. Invoked while configuring the MR
   * job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> blockSizeDetails =
    familyDescriptor -> String.valueOf(familyDescriptor.getBlocksize());

  /**
   * Serialize column family to bloom type map to configuration. Invoked while configuring the MR
   * job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> bloomTypeDetails = familyDescriptor -> {
    String bloomType = familyDescriptor.getBloomFilterType().toString();
    if (bloomType == null) {
      bloomType = ColumnFamilyDescriptorBuilder.DEFAULT_BLOOMFILTER.name();
    }
    return bloomType;
  };

  /**
   * Serialize column family to bloom param map to configuration. Invoked while configuring the MR
   * job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> bloomParamDetails = familyDescriptor -> {
    BloomType bloomType = familyDescriptor.getBloomFilterType();
    String bloomParam = "";
    if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
      bloomParam = familyDescriptor.getConfigurationValue(BloomFilterUtil.PREFIX_LENGTH_KEY);
    }
    return bloomParam;
  };

  /**
   * Serialize column family to data block encoding map to configuration. Invoked while configuring
   * the MR job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> dataBlockEncodingDetails = familyDescriptor -> {
    DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
    if (encoding == null) {
      encoding = DataBlockEncoding.NONE;
    }
    return encoding.toString();
  };

}