/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TASK_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Writes HFiles. Passed Cells must arrive in order. Writes current time as the sequence id for the
 * file. Sets the major compacted attribute on created {@link HFile}s. Calling write(null,null) will
 * forcibly roll all HFiles being written.
 * <p>
 * Using this class as part of a MapReduce job is best done using
 * {@link #configureIncrementalLoad(Job, TableDescriptor, RegionLocator)}.
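 * <p>
 * A minimal driver sketch (names such as {@code MyCellMapper}, the output path, and the table name
 * are placeholders, not part of this API):
 *
 * <pre>
 * Job job = Job.getInstance(conf, "bulkload-prepare");
 * job.setMapperClass(MyCellMapper.class);
 * job.setMapOutputKeyClass(ImmutableBytesWritable.class);
 * job.setMapOutputValueClass(Put.class);
 * FileOutputFormat.setOutputPath(job, new Path("/tmp/hfiles"));
 * try (Connection conn = ConnectionFactory.createConnection(conf);
 *   Table table = conn.getTable(tableName);
 *   RegionLocator locator = conn.getRegionLocator(tableName)) {
 *   HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), locator);
 * }
 * job.waitForCompletion(true);
 * </pre>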
 */
@InterfaceAudience.Public
public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable, Cell> {
  private static final Logger LOG = LoggerFactory.getLogger(HFileOutputFormat2.class);

  static class TableInfo {
    private TableDescriptor tableDescriptor;
    private RegionLocator regionLocator;

    public TableInfo(TableDescriptor tableDescriptor, RegionLocator regionLocator) {
      this.tableDescriptor = tableDescriptor;
      this.regionLocator = regionLocator;
    }

    public TableDescriptor getTableDescriptor() {
      return tableDescriptor;
    }

    public RegionLocator getRegionLocator() {
      return regionLocator;
    }
  }

  protected static final byte[] tableSeparator = Bytes.toBytes(";");

  protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) {
    return Bytes.add(tableName, tableSeparator, suffix);
  }

  // The following constants are private since these are used by
  // HFileOutputFormat2 to internally transfer data between job setup and
  // reducer run using conf.
  // These should not be changed by the client.
  static final String COMPRESSION_FAMILIES_CONF_KEY =
    "hbase.hfileoutputformat.families.compression";
  static final String BLOOM_TYPE_FAMILIES_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype";
  static final String BLOOM_PARAM_FAMILIES_CONF_KEY = "hbase.hfileoutputformat.families.bloomparam";
  static final String BLOCK_SIZE_FAMILIES_CONF_KEY = "hbase.mapreduce.hfileoutputformat.blocksize";
  static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
    "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";

  // This constant is public since the client can modify this when setting
  // up their conf object and thus refer to this symbol.
  // It is present for backwards compatibility reasons. Use it only to
  // override the auto-detection of datablock encoding and compression.
  public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
    "hbase.mapreduce.hfileoutputformat.datablock.encoding";
  public static final String COMPRESSION_OVERRIDE_CONF_KEY =
    "hbase.mapreduce.hfileoutputformat.compression";
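
  // A hedged example of using the override keys above from job setup code (the values are
  // illustrative; any DataBlockEncoding name or compression algorithm name can be used):
  //
  //   conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_OVERRIDE_CONF_KEY, "FAST_DIFF");
  //   conf.set(HFileOutputFormat2.COMPRESSION_OVERRIDE_CONF_KEY, "gz");
  //
  // When set, these values take precedence over the per-family settings detected from the table
  // descriptor during configureIncrementalLoad().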

  /**
   * Keep locality while generating HFiles for bulkload. See HBASE-12596
   */
  public static final String LOCALITY_SENSITIVE_CONF_KEY =
    "hbase.bulkload.locality.sensitive.enabled";
  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
  static final String OUTPUT_TABLE_NAME_CONF_KEY = "hbase.mapreduce.hfileoutputformat.table.name";
  static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
    "hbase.mapreduce.use.multi.table.hfileoutputformat";

  /**
   * ExtendedCell and ExtendedCellSerialization are InterfaceAudience.Private. We expose this config
   * for internal usage in jobs like WALPlayer which need to use features of ExtendedCell.
   */
  @InterfaceAudience.Private
  public static final String EXTENDED_CELL_SERIALIZATION_ENABLED_KEY =
    "hbase.mapreduce.hfileoutputformat.extendedcell.enabled";
  static final boolean EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT = false;
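
  // A hedged sketch of opting in to extended cell serialization from job setup code (only needed
  // by internal jobs such as WALPlayer that emit ExtendedCells):
  //
  //   job.getConfiguration()
  //     .setBoolean(HFileOutputFormat2.EXTENDED_CELL_SERIALIZATION_ENABLED_KEY, true);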

  public static final String REMOTE_CLUSTER_CONF_PREFIX = "hbase.hfileoutputformat.remote.cluster.";
  public static final String REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY =
    REMOTE_CLUSTER_CONF_PREFIX + "zookeeper.quorum";
  public static final String REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY =
    REMOTE_CLUSTER_CONF_PREFIX + "zookeeper." + HConstants.CLIENT_PORT_STR;
  public static final String REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY =
    REMOTE_CLUSTER_CONF_PREFIX + HConstants.ZOOKEEPER_ZNODE_PARENT;

  public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY;
  public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";
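
  // A hedged example of steering bulkload output to a specific HDFS storage policy (the policy
  // name and table/family names are illustrative, and the policy must be supported by the
  // underlying filesystem):
  //
  //   // cluster-wide default for all generated HFiles
  //   conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD");
  //   // per-family override; the suffix is the combined "<table><separator><family>" string that
  //   // configureStoragePolicy() below receives as tableAndFamily
  //   conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + "ns:tbl;cf", "ALL_SSD");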

  @Override
  public RecordWriter<ImmutableBytesWritable, Cell>
    getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException {
    return createRecordWriter(context, this.getOutputCommitter(context));
  }

  protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[] family) {
    return combineTableNameSuffix(tableName, family);
  }

  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> createRecordWriter(
    final TaskAttemptContext context, final OutputCommitter committer) throws IOException {

    // Get the path of the temporary output file
    final Path outputDir = ((PathOutputCommitter) committer).getWorkPath();
    final Configuration conf = context.getConfiguration();
    final boolean writeMultipleTables =
      conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
    final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
    if (writeTableNames == null || writeTableNames.isEmpty()) {
      throw new IllegalArgumentException(OUTPUT_TABLE_NAME_CONF_KEY + " cannot be empty");
    }
    final FileSystem fs = outputDir.getFileSystem(conf);
    // These configs. are from hbase-*.xml
    final long maxsize =
      conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE);
    // Invented config. Add to hbase-*.xml if other than default compression.
    final String defaultCompressionStr =
      conf.get("hfile.compression", Compression.Algorithm.NONE.getName());
    final Algorithm defaultCompression = HFileWriterImpl.compressionByName(defaultCompressionStr);
    String compressionStr = conf.get(COMPRESSION_OVERRIDE_CONF_KEY);
    final Algorithm overriddenCompression =
      compressionStr != null ? Compression.getCompressionAlgorithmByName(compressionStr) : null;
    final boolean compactionExclude =
      conf.getBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
    final Set<String> allTableNames = Arrays
      .stream(writeTableNames.split(Bytes.toString(tableSeparator))).collect(Collectors.toSet());

    // create a map from column family to the compression algorithm
    final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
    final Map<byte[], String> bloomParamMap = createFamilyBloomParamMap(conf);
    final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);

    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
    final Map<byte[], DataBlockEncoding> datablockEncodingMap =
      createFamilyDataBlockEncodingMap(conf);
    final DataBlockEncoding overriddenEncoding =
      dataBlockEncodingStr != null ? DataBlockEncoding.valueOf(dataBlockEncodingStr) : null;

    return new RecordWriter<ImmutableBytesWritable, V>() {
      // Map of families to writers and how much has been output on the writer.
      private final Map<byte[], WriterLength> writers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      private final Map<byte[], byte[]> previousRows = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      private final long now = EnvironmentEdgeManager.currentTime();
      private byte[] tableNameBytes = writeMultipleTables ? null : Bytes.toBytes(writeTableNames);

      @Override
      public void write(ImmutableBytesWritable row, V cell) throws IOException {
        // null input == user explicitly wants to flush
        if (row == null && cell == null) {
          rollWriters(null);
          return;
        }

        ExtendedCell kv = PrivateCellUtil.ensureExtendedCell(cell);
        byte[] rowKey = CellUtil.cloneRow(kv);
        int length = PrivateCellUtil.estimatedSerializedSizeOf(kv) - Bytes.SIZEOF_INT;
        byte[] family = CellUtil.cloneFamily(kv);
        if (writeMultipleTables) {
          tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get());
          tableNameBytes = TableName.valueOf(tableNameBytes).getNameWithNamespaceInclAsString()
            .getBytes(Charset.defaultCharset());
          if (!allTableNames.contains(Bytes.toString(tableNameBytes))) {
            throw new IllegalArgumentException(
              "TableName " + Bytes.toString(tableNameBytes) + " not expected");
          }
        }
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);

        WriterLength wl = this.writers.get(tableAndFamily);

        // If this is a new column family, create its output directory and set its storage policy
        if (wl == null) {
          Path writerPath = null;
          if (writeMultipleTables) {
            Path tableRelPath = getTableRelativePath(tableNameBytes);
            writerPath = new Path(outputDir, new Path(tableRelPath, Bytes.toString(family)));
          } else {
            writerPath = new Path(outputDir, Bytes.toString(family));
          }
          fs.mkdirs(writerPath);
          configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
        }

        // Roll the writer once the size limit is reached, but only at a row boundary
        if (
          wl != null && wl.written + length >= maxsize
            && Bytes.compareTo(this.previousRows.get(family), rowKey) != 0
        ) {
          rollWriters(wl);
        }

        // create a new HFile writer, if necessary
        if (wl == null || wl.writer == null) {
          InetSocketAddress[] favoredNodes = null;
          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
            HRegionLocation loc = null;
            String tableName = Bytes.toString(tableNameBytes);
            if (tableName != null) {
              try (
                Connection connection =
                  ConnectionFactory.createConnection(createRemoteClusterConf(conf));
                RegionLocator locator = connection.getRegionLocator(TableName.valueOf(tableName))) {
                loc = locator.getRegionLocation(rowKey);
              } catch (Throwable e) {
                LOG.warn("Something wrong locating rowkey {} in {}", Bytes.toString(rowKey),
                  tableName, e);
                loc = null;
              }
            }
            if (null == loc) {
              LOG.trace("Failed get of location, use default writer {}", Bytes.toString(rowKey));
            } else {
              LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey));
              InetSocketAddress initialIsa =
                new InetSocketAddress(loc.getHostname(), loc.getPort());
              if (initialIsa.isUnresolved()) {
                LOG.trace("Failed resolve address {}, use default writer", loc.getHostnamePort());
              } else {
                LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString());
                favoredNodes = new InetSocketAddress[] { initialIsa };
              }
            }
          }
          wl = getNewWriter(tableNameBytes, family, conf, favoredNodes);
        }

        // we now have the proper HFile writer. full steam ahead
        PrivateCellUtil.updateLatestStamp(kv, this.now);
        wl.writer.append(kv);
        wl.written += length;

        // Copy the row so we know when a row transitions.
        this.previousRows.put(family, rowKey);
      }

      private Path getTableRelativePath(byte[] tableNameBytes) {
        String tableName = Bytes.toString(tableNameBytes);
        String[] tableNameParts = tableName.split(":");
        Path tableRelPath = new Path(tableNameParts[0]);
        if (tableNameParts.length > 1) {
          tableRelPath = new Path(tableRelPath, tableNameParts[1]);
        }
        return tableRelPath;
      }

      private void rollWriters(WriterLength writerLength) throws IOException {
        if (writerLength != null) {
          closeWriter(writerLength);
        } else {
          for (WriterLength wl : this.writers.values()) {
            closeWriter(wl);
          }
        }
      }

      private void closeWriter(WriterLength wl) throws IOException {
        if (wl.writer != null) {
          LOG.info(
            "Writer=" + wl.writer.getPath() + ((wl.written == 0) ? "" : ", wrote=" + wl.written));
          close(wl.writer);
          wl.writer = null;
        }
        wl.written = 0;
      }

      private Configuration createRemoteClusterConf(Configuration conf) {
        final Configuration newConf = new Configuration(conf);

        final String quorum = conf.get(REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY);
        final String clientPort = conf.get(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY);
        final String parent = conf.get(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY);

        if (quorum != null && clientPort != null && parent != null) {
          newConf.set(HConstants.ZOOKEEPER_QUORUM, quorum);
          newConf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, Integer.parseInt(clientPort));
          newConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parent);
        }

        for (Entry<String, String> entry : conf) {
          String key = entry.getKey();
          if (
            REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY.equals(key)
              || REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY.equals(key)
              || REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY.equals(key)
          ) {
            // Handled them above
            continue;
          }

          if (entry.getKey().startsWith(REMOTE_CLUSTER_CONF_PREFIX)) {
            String originalKey = entry.getKey().substring(REMOTE_CLUSTER_CONF_PREFIX.length());
            if (!originalKey.isEmpty()) {
              newConf.set(originalKey, entry.getValue());
            }
          }
        }

        return newConf;
      }

      /*
       * Create a new StoreFile.Writer.
       * @return A WriterLength, containing a new StoreFile.Writer.
       */
      @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BX_UNBOXING_IMMEDIATELY_REBOXED",
          justification = "Not important")
      private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration conf,
        InetSocketAddress[] favoredNodes) throws IOException {
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family);
        Path familydir = new Path(outputDir, Bytes.toString(family));
        if (writeMultipleTables) {
          familydir =
            new Path(outputDir, new Path(getTableRelativePath(tableName), Bytes.toString(family)));
        }
        WriterLength wl = new WriterLength();
        Algorithm compression = overriddenCompression;
        compression = compression == null ? compressionMap.get(tableAndFamily) : compression;
        compression = compression == null ? defaultCompression : compression;
        BloomType bloomType = bloomTypeMap.get(tableAndFamily);
        bloomType = bloomType == null ? BloomType.NONE : bloomType;
        String bloomParam = bloomParamMap.get(tableAndFamily);
        if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
          conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, bloomParam);
        }
        Integer blockSize = blockSizeMap.get(tableAndFamily);
        blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
        DataBlockEncoding encoding = overriddenEncoding;
        encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
        encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
        HFileContextBuilder contextBuilder = new HFileContextBuilder().withCompression(compression)
          .withDataBlockEncoding(encoding).withChecksumType(StoreUtils.getChecksumType(conf))
          .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)).withBlockSize(blockSize)
          .withColumnFamily(family).withTableName(tableName)
          .withCreateTime(EnvironmentEdgeManager.currentTime());

        if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
          contextBuilder.withIncludesTags(true);
        }

        HFileContext hFileContext = contextBuilder.build();
        if (null == favoredNodes) {
          wl.writer =
            new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs).withOutputDir(familydir)
              .withBloomType(bloomType).withFileContext(hFileContext).build();
        } else {
          wl.writer = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, new HFileSystem(fs))
            .withOutputDir(familydir).withBloomType(bloomType).withFileContext(hFileContext)
            .withFavoredNodes(favoredNodes).build();
        }

        this.writers.put(tableAndFamily, wl);
        return wl;
      }

      private void close(final StoreFileWriter w) throws IOException {
        if (w != null) {
          w.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(EnvironmentEdgeManager.currentTime()));
          w.appendFileInfo(BULKLOAD_TASK_KEY, Bytes.toBytes(context.getTaskAttemptID().toString()));
          w.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(true));
          w.appendFileInfo(EXCLUDE_FROM_MINOR_COMPACTION_KEY, Bytes.toBytes(compactionExclude));
          w.appendTrackedTimestampsToMetadata();
          w.close();
        }
      }

      @Override
      public void close(TaskAttemptContext c) throws IOException, InterruptedException {
        for (WriterLength wl : this.writers.values()) {
          close(wl.writer);
        }
      }
    };
  }

  /**
   * Configure block storage policy for CF after the directory is created.
   */
  static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
    byte[] tableAndFamily, Path cfPath) {
    if (null == conf || null == fs || null == tableAndFamily || null == cfPath) {
      return;
    }

    String policy = conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(tableAndFamily),
      conf.get(STORAGE_POLICY_PROPERTY));
    CommonFSUtils.setStoragePolicy(fs, cfPath, policy);
  }

  /*
   * Data structure to hold a Writer and amount of data written on it.
   */
  static class WriterLength {
    long written = 0;
    StoreFileWriter writer = null;
  }

  /**
   * Return the start keys of all regions in the given tables, as a list of ImmutableBytesWritable.
   */
  private static List<ImmutableBytesWritable> getRegionStartKeys(List<RegionLocator> regionLocators,
    boolean writeMultipleTables) throws IOException {

    ArrayList<ImmutableBytesWritable> ret = new ArrayList<>();
    for (RegionLocator regionLocator : regionLocators) {
      TableName tableName = regionLocator.getName();
      LOG.info("Looking up current regions for table " + tableName);
      byte[][] byteKeys = regionLocator.getStartKeys();
      for (byte[] byteKey : byteKeys) {
        byte[] fullKey = byteKey; // HFileOutputFormat2 use case
        if (writeMultipleTables) {
          // MultiTableHFileOutputFormat use case
          fullKey = combineTableNameSuffix(tableName.getName(), byteKey);
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("SplitPoint startkey for " + tableName + ": " + Bytes.toStringBinary(fullKey));
        }
        ret.add(new ImmutableBytesWritable(fullKey));
      }
    }
    return ret;
  }

  /**
   * Write out a {@link SequenceFile} containing the split points in startKeys, in a form that
   * {@link TotalOrderPartitioner} can read.
   */
  @SuppressWarnings("deprecation")
  private static void writePartitions(Configuration conf, Path partitionsPath,
    List<ImmutableBytesWritable> startKeys, boolean writeMultipleTables) throws IOException {
    LOG.info("Writing partition information to " + partitionsPath);
    if (startKeys.isEmpty()) {
      throw new IllegalArgumentException("No regions passed");
    }

    // We're generating a list of split points, and we don't ever
    // have keys < the first region (which has an empty start key)
    // so we need to remove it. Otherwise we would end up with an
    // empty reducer with index 0
    TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys);
    ImmutableBytesWritable first = sorted.first();
    if (writeMultipleTables) {
      first =
        new ImmutableBytesWritable(MultiTableHFileOutputFormat.getSuffix(sorted.first().get()));
    }
    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IllegalArgumentException(
        "First region of table should have empty start key. Instead has: "
          + Bytes.toStringBinary(first.get()));
    }
    sorted.remove(sorted.first());

    // Write the actual file
    FileSystem fs = partitionsPath.getFileSystem(conf);
    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, partitionsPath,
      ImmutableBytesWritable.class, NullWritable.class);

    try {
      for (ImmutableBytesWritable startKey : sorted) {
        writer.append(startKey, NullWritable.get());
      }
    } finally {
      writer.close();
    }
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given table. This
   * <ul>
   * <li>Inspects the table to configure a total order partitioner</li>
   * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   * <li>Sets the number of reduce tasks to match the current number of regions</li>
   * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   * <li>Sets the reducer up to perform the appropriate sorting (CellSortReducer, PutSortReducer or
   * TextSortReducer, depending on the map output value class)</li>
   * <li>Sets the HBase cluster key so region locations can be looked up for locality-sensitive
   * writing</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
   */
  public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
    throws IOException {
    configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
    configureRemoteCluster(job, table.getConfiguration());
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given table. This
   * <ul>
   * <li>Inspects the table to configure a total order partitioner</li>
   * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   * <li>Sets the number of reduce tasks to match the current number of regions</li>
   * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   * <li>Sets the reducer up to perform the appropriate sorting (CellSortReducer, PutSortReducer or
   * TextSortReducer, depending on the map output value class)</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
   */
  public static void configureIncrementalLoad(Job job, TableDescriptor tableDescriptor,
    RegionLocator regionLocator) throws IOException {
    ArrayList<TableInfo> singleTableInfo = new ArrayList<>();
    singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator));
    configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
  }

  static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
    Class<? extends OutputFormat<?, ?>> cls) throws IOException {
    Configuration conf = job.getConfiguration();
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(MapReduceExtendedCell.class);
    job.setOutputFormatClass(cls);

    if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) {
      throw new IllegalArgumentException("Duplicate entries found in TableInfo argument");
    }
    boolean writeMultipleTables = false;
    if (MultiTableHFileOutputFormat.class.equals(cls)) {
      writeMultipleTables = true;
      conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true);
    }
    // Based on the configured map output class, set the correct reducer to properly
    // sort the incoming values.
    // TODO it would be nice to pick one or the other of these formats.
    if (
      KeyValue.class.equals(job.getMapOutputValueClass())
        || MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())
    ) {
      job.setReducerClass(CellSortReducer.class);
    } else if (Put.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(PutSortReducer.class);
    } else if (Text.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(TextSortReducer.class);
    } else {
      LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
    }

    mergeSerializations(conf);

    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
      LOG.info("bulkload locality sensitive enabled");
    }

    /* Now get the region start keys for every table required */
    List<String> allTableNames = new ArrayList<>(multiTableInfo.size());
    List<RegionLocator> regionLocators = new ArrayList<>(multiTableInfo.size());
    List<TableDescriptor> tableDescriptors = new ArrayList<>(multiTableInfo.size());

    for (TableInfo tableInfo : multiTableInfo) {
      regionLocators.add(tableInfo.getRegionLocator());
      String tn = writeMultipleTables
        ? tableInfo.getRegionLocator().getName().getNameWithNamespaceInclAsString()
        : tableInfo.getRegionLocator().getName().getNameAsString();
      allTableNames.add(tn);
      tableDescriptors.add(tableInfo.getTableDescriptor());
    }
    // Record table names so writers can be created against favored nodes, and so compression,
    // block size and other per-table column family attributes can be decoded in the reducer
    conf.set(OUTPUT_TABLE_NAME_CONF_KEY,
      StringUtils.join(allTableNames, Bytes.toString(tableSeparator)));
    List<ImmutableBytesWritable> startKeys =
      getRegionStartKeys(regionLocators, writeMultipleTables);
    // Use table's region boundaries for TOP split points.
    LOG.info("Configuring " + startKeys.size() + " reduce partitions "
      + "to match current region count for all tables");
    job.setNumReduceTasks(startKeys.size());

    configurePartitioner(job, startKeys, writeMultipleTables);
    // Set compression algorithms based on column families

    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(compressionDetails, tableDescriptors));
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(blockSizeDetails, tableDescriptors));
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(bloomTypeDetails, tableDescriptors));
    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(bloomParamDetails, tableDescriptors));
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
  }

  private static void mergeSerializations(Configuration conf) {
    List<String> serializations = new ArrayList<>();

    // add any existing values that have been set
    String[] existing = conf.getStrings("io.serializations");
    if (existing != null) {
      Collections.addAll(serializations, existing);
    }

    serializations.add(MutationSerialization.class.getName());
    serializations.add(ResultSerialization.class.getName());

    // Add ExtendedCellSerialization, if configured. Order matters here. Hadoop's
    // SerializationFactory runs through serializations in the order they are registered.
    // We want to register ExtendedCellSerialization before CellSerialization because both
    // work for ExtendedCells but only ExtendedCellSerialization handles them properly.
    if (
      conf.getBoolean(EXTENDED_CELL_SERIALIZATION_ENABLED_KEY,
        EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT)
    ) {
      serializations.add(ExtendedCellSerialization.class.getName());
    }
    serializations.add(CellSerialization.class.getName());

    conf.setStrings("io.serializations", serializations.toArray(new String[0]));
  }

  public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor)
    throws IOException {
    Configuration conf = job.getConfiguration();

    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(MapReduceExtendedCell.class);
    job.setOutputFormatClass(HFileOutputFormat2.class);

    ArrayList<TableDescriptor> singleTableDescriptor = new ArrayList<>(1);
    singleTableDescriptor.add(tableDescriptor);

    conf.set(OUTPUT_TABLE_NAME_CONF_KEY, tableDescriptor.getTableName().getNameAsString());
    // Set compression algorithms based on column families
    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(compressionDetails, singleTableDescriptor));
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor));
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor));
    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(bloomParamDetails, singleTableDescriptor));
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured.");
  }

  /**
   * Configure the HBase cluster key of a remote cluster so region locations can be loaded for
   * locality-sensitive writing, if that feature is enabled. It's not necessary to call this method
   * explicitly when the cluster key of the HBase cluster used to load region locations is already
   * configured in the job configuration. Call this method when a different HBase cluster key is
   * configured in the job configuration; for example, you should call it when you load data from
   * HBase cluster A using {@link TableInputFormat} and generate hfiles for HBase cluster B.
   * Otherwise, HFileOutputFormat2 fetches locations from cluster A and locality-sensitive writing
   * won't work correctly.
   * {@link #configureIncrementalLoad(Job, Table, RegionLocator)} calls this method using
   * {@link Table#getConfiguration} as clusterConf. See HBASE-25608.
   * @param job         which has configuration to be updated
   * @param clusterConf which contains the cluster key of the HBase cluster to be locality-sensitive
   * @see #configureIncrementalLoad(Job, Table, RegionLocator)
   * @see #LOCALITY_SENSITIVE_CONF_KEY
   * @see #REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY
   * @see #REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY
   * @see #REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY
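   * <p>
   * A hedged sketch of the cluster-A-to-cluster-B case described above ({@code clusterBConf},
   * {@code tableBDescriptor} and {@code tableBRegionLocator} are placeholders for whatever the
   * caller already has for cluster B):
   *
   * <pre>
   * HFileOutputFormat2.configureIncrementalLoad(job, tableBDescriptor, tableBRegionLocator);
   * HFileOutputFormat2.configureRemoteCluster(job, clusterBConf);
   * </pre>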
   */
  public static void configureRemoteCluster(Job job, Configuration clusterConf) {
    Configuration conf = job.getConfiguration();

    if (!conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
      return;
    }

    final String quorum = clusterConf.get(HConstants.ZOOKEEPER_QUORUM);
    final int clientPort = clusterConf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT,
      HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT);
    final String parent =
      clusterConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);

    conf.set(REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY, quorum);
    conf.setInt(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY, clientPort);
    conf.set(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY, parent);

    LOG.info("ZK configs for remote cluster of bulkload are configured: " + quorum + ":" + clientPort
      + "/" + parent);
  }

  /**
   * Runs inside the task to deserialize column family to compression algorithm map from the
   * configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured compression algorithm
   */
  @InterfaceAudience.Private
  static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, COMPRESSION_FAMILIES_CONF_KEY);
    Map<byte[], Algorithm> compressionMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue());
      compressionMap.put(e.getKey(), algorithm);
    }
    return compressionMap;
  }

  /**
   * Runs inside the task to deserialize column family to bloom filter type map from the
   * configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter type
   */
  @InterfaceAudience.Private
  static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, BLOOM_TYPE_FAMILIES_CONF_KEY);
    Map<byte[], BloomType> bloomTypeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      BloomType bloomType = BloomType.valueOf(e.getValue());
      bloomTypeMap.put(e.getKey(), bloomType);
    }
    return bloomTypeMap;
  }

  /**
   * Runs inside the task to deserialize column family to bloom filter param map from the
   * configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter param
   */
  @InterfaceAudience.Private
  static Map<byte[], String> createFamilyBloomParamMap(Configuration conf) {
    return createFamilyConfValueMap(conf, BLOOM_PARAM_FAMILIES_CONF_KEY);
  }

  /**
   * Runs inside the task to deserialize column family to block size map from the configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured block size
   */
  @InterfaceAudience.Private
  static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, BLOCK_SIZE_FAMILIES_CONF_KEY);
    Map<byte[], Integer> blockSizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Integer blockSize = Integer.parseInt(e.getValue());
      blockSizeMap.put(e.getKey(), blockSize);
    }
    return blockSizeMap;
  }

  /**
   * Runs inside the task to deserialize column family to data block encoding type map from the
   * configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured data block encoding for the family
   */
  @InterfaceAudience.Private
  static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(Configuration conf) {
    Map<byte[], String> stringMap =
      createFamilyConfValueMap(conf, DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
    Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      encoderMap.put(e.getKey(), DataBlockEncoding.valueOf(e.getValue()));
    }
    return encoderMap;
  }

  /**
   * Runs inside the task to deserialize a column family to conf value map.
   * @param conf     to read the serialized values from
   * @param confName conf key to read from the configuration
   * @return a map of column family to the given configuration value
   */
  private static Map<byte[], String> createFamilyConfValueMap(Configuration conf, String confName) {
    Map<byte[], String> confValMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    String confVal = conf.get(confName, "");
    for (String familyConf : confVal.split("&")) {
      String[] familySplit = familyConf.split("=");
      if (familySplit.length != 2) {
        continue;
      }
      try {
        confValMap.put(Bytes.toBytes(URLDecoder.decode(familySplit[0], "UTF-8")),
          URLDecoder.decode(familySplit[1], "UTF-8"));
      } catch (UnsupportedEncodingException e) {
        // will not happen with UTF-8 encoding
        throw new AssertionError(e);
      }
    }
    return confValMap;
  }

  /**
   * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
   * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
   */
  static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints,
    boolean writeMultipleTables) throws IOException {
    Configuration conf = job.getConfiguration();
    // create the partitions file
    FileSystem fs = FileSystem.get(conf);
    String hbaseTmpFsDir =
      conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, fs.getHomeDirectory() + "/hbase-staging");
    Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
    fs.makeQualified(partitionsPath);
    writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables);
    fs.deleteOnExit(partitionsPath);

    // configure job to use it
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
      value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
  @InterfaceAudience.Private
  static String serializeColumnFamilyAttribute(Function<ColumnFamilyDescriptor, String> fn,
    List<TableDescriptor> allTables) throws UnsupportedEncodingException {
    StringBuilder attributeValue = new StringBuilder();
    int i = 0;
    for (TableDescriptor tableDescriptor : allTables) {
      if (tableDescriptor == null) {
        // could happen with mock table instance
        // CODEREVIEW: Can I set an empty string in conf if mock table instance?
        return "";
      }
      for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
        if (i++ > 0) {
          attributeValue.append('&');
        }
        attributeValue.append(URLEncoder
          .encode(Bytes.toString(combineTableNameSuffix(tableDescriptor.getTableName().getName(),
            familyDescriptor.getName())), "UTF-8"));
        attributeValue.append('=');
        attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8"));
      }
    }
    // No trailing separator to strip: '&' is only appended before each subsequent entry
    return attributeValue.toString();
  }
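
  // A hedged illustration of the serialized format produced above and parsed back by
  // createFamilyConfValueMap(): URL-encoded "<table><separator><family>=<value>" pairs joined by
  // '&'. For a hypothetical table "ns:tbl" with families "cf1" and "cf2", the compression entry
  // could look like this (shown before URL encoding):
  //
  //   ns:tbl;cf1=gz&ns:tbl;cf2=none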

  /**
   * Serialize column family to compression algorithm map to configuration. Invoked while
   * configuring the MR job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> compressionDetails =
    familyDescriptor -> familyDescriptor.getCompressionType().getName();

  /**
   * Serialize column family to block size map to configuration. Invoked while configuring the MR
   * job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> blockSizeDetails =
    familyDescriptor -> String.valueOf(familyDescriptor.getBlocksize());

  /**
   * Serialize column family to bloom type map to configuration. Invoked while configuring the MR
   * job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> bloomTypeDetails = familyDescriptor -> {
    String bloomType = familyDescriptor.getBloomFilterType().toString();
    if (bloomType == null) {
      bloomType = ColumnFamilyDescriptorBuilder.DEFAULT_BLOOMFILTER.name();
    }
    return bloomType;
  };

  /**
   * Serialize column family to bloom param map to configuration. Invoked while configuring the MR
   * job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> bloomParamDetails = familyDescriptor -> {
    BloomType bloomType = familyDescriptor.getBloomFilterType();
    String bloomParam = "";
    if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
      bloomParam = familyDescriptor.getConfigurationValue(BloomFilterUtil.PREFIX_LENGTH_KEY);
    }
    return bloomParam;
  };

  /**
   * Serialize column family to data block encoding map to configuration. Invoked while configuring
   * the MR job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> dataBlockEncodingDetails = familyDescriptor -> {
    DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
    if (encoding == null) {
      encoding = DataBlockEncoding.NONE;
    }
    return encoding.toString();
  };

}