/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TASK_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Writes HFiles. Passed Cells must arrive in order. Writes current time as the sequence id for the
 * file. Sets the major compacted attribute on created {@link HFile}s. Calling write(null,null) will
 * forcibly roll all HFiles being written.
 * <p>
 * Using this class as part of a MapReduce job is best done using
 * {@link #configureIncrementalLoad(Job, TableDescriptor, RegionLocator)}.
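 * <p>
 * A minimal job-setup sketch (the connection, table name, and output path below are illustrative
 * placeholders, not values required by this class):
 *
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * Job job = Job.getInstance(conf, "hfile-prepare");
 * try (Connection connection = ConnectionFactory.createConnection(conf);
 *   Table table = connection.getTable(TableName.valueOf("exampleTable"));
 *   RegionLocator locator = connection.getRegionLocator(table.getName())) {
 *   // The job's map output value class should already be set to KeyValue or Put.
 *   HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
 *   FileOutputFormat.setOutputPath(job, new Path("/tmp/example-hfiles"));
 * }
 * </pre>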
 */
@InterfaceAudience.Public
public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable, Cell> {
  private static final Logger LOG = LoggerFactory.getLogger(HFileOutputFormat2.class);

  static class TableInfo {
    private TableDescriptor tableDescriptor;
    private RegionLocator regionLocator;

    public TableInfo(TableDescriptor tableDescriptor, RegionLocator regionLocator) {
      this.tableDescriptor = tableDescriptor;
      this.regionLocator = regionLocator;
    }

    public TableDescriptor getTableDescriptor() {
      return tableDescriptor;
    }

    public RegionLocator getRegionLocator() {
      return regionLocator;
    }
  }

  protected static final byte[] tableSeparator = Bytes.toBytes(";");

  protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) {
    return Bytes.add(tableName, tableSeparator, suffix);
  }

  // The following constants are private since these are used by
  // HFileOutputFormat2 to internally transfer data between job setup and
  // reducer run using conf.
  // These should not be changed by the client.
  static final String COMPRESSION_FAMILIES_CONF_KEY =
    "hbase.hfileoutputformat.families.compression";
  static final String BLOOM_TYPE_FAMILIES_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype";
  static final String BLOOM_PARAM_FAMILIES_CONF_KEY = "hbase.hfileoutputformat.families.bloomparam";
  static final String BLOCK_SIZE_FAMILIES_CONF_KEY = "hbase.mapreduce.hfileoutputformat.blocksize";
  static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
    "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";

  // This constant is public since the client can modify this when setting
  // up their conf object and thus refer to this symbol.
  // It is present for backwards compatibility reasons. Use it only to
  // override the auto-detection of datablock encoding and compression,
  // as sketched in the example below.
  public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
    "hbase.mapreduce.hfileoutputformat.datablock.encoding";
  public static final String COMPRESSION_OVERRIDE_CONF_KEY =
    "hbase.mapreduce.hfileoutputformat.compression";

  /**
   * Keep locality while generating HFiles for bulkload. See HBASE-12596
   */
  public static final String LOCALITY_SENSITIVE_CONF_KEY =
    "hbase.bulkload.locality.sensitive.enabled";
  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
  static final String OUTPUT_TABLE_NAME_CONF_KEY = "hbase.mapreduce.hfileoutputformat.table.name";
  static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
    "hbase.mapreduce.use.multi.table.hfileoutputformat";

  /**
   * ExtendedCell and ExtendedCellSerialization are InterfaceAudience.Private. We expose this config
   * for internal usage in jobs like WALPlayer which need to use features of ExtendedCell.
   */
  @InterfaceAudience.Private
  public static final String EXTENDED_CELL_SERIALIZATION_ENABLED_KEY =
    "hbase.mapreduce.hfileoutputformat.extendedcell.enabled";
  static final boolean EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT = false;

  public static final String REMOTE_CLUSTER_CONF_PREFIX = "hbase.hfileoutputformat.remote.cluster.";
  public static final String REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY =
    REMOTE_CLUSTER_CONF_PREFIX + "zookeeper.quorum";
  public static final String REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY =
    REMOTE_CLUSTER_CONF_PREFIX + "zookeeper." + HConstants.CLIENT_PORT_STR;
  public static final String REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY =
    REMOTE_CLUSTER_CONF_PREFIX + HConstants.ZOOKEEPER_ZNODE_PARENT;

  public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY;
  public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";

  @Override
  public RecordWriter<ImmutableBytesWritable, Cell>
    getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException {
    return createRecordWriter(context, this.getOutputCommitter(context));
  }

  protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[] family) {
    return combineTableNameSuffix(tableName, family);
  }

  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> createRecordWriter(
    final TaskAttemptContext context, final OutputCommitter committer) throws IOException {

    // Get the path of the temporary output file
    final Path outputDir = ((FileOutputCommitter) committer).getWorkPath();
    final Configuration conf = context.getConfiguration();
    final boolean writeMultipleTables =
      conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
    final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
    if (writeTableNames == null || writeTableNames.isEmpty()) {
      throw new IllegalArgumentException(OUTPUT_TABLE_NAME_CONF_KEY + " cannot be empty");
    }
    final FileSystem fs = outputDir.getFileSystem(conf);
    // These configs. are from hbase-*.xml
    final long maxsize =
      conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE);
    // Invented config. Add to hbase-*.xml if other than default compression.
    final String defaultCompressionStr =
      conf.get("hfile.compression", Compression.Algorithm.NONE.getName());
    final Algorithm defaultCompression = HFileWriterImpl.compressionByName(defaultCompressionStr);
    String compressionStr = conf.get(COMPRESSION_OVERRIDE_CONF_KEY);
    final Algorithm overriddenCompression =
      compressionStr != null ? Compression.getCompressionAlgorithmByName(compressionStr) : null;
    final boolean compactionExclude =
      conf.getBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
    final Set<String> allTableNames = Arrays
      .stream(writeTableNames.split(Bytes.toString(tableSeparator))).collect(Collectors.toSet());

    // create a map from column family to the compression algorithm
    final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
    final Map<byte[], String> bloomParamMap = createFamilyBloomParamMap(conf);
    final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);

    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
    final Map<byte[], DataBlockEncoding> datablockEncodingMap =
      createFamilyDataBlockEncodingMap(conf);
    final DataBlockEncoding overriddenEncoding =
      dataBlockEncodingStr != null ? DataBlockEncoding.valueOf(dataBlockEncodingStr) : null;

    return new RecordWriter<ImmutableBytesWritable, V>() {
      // Map of families to writers and how much has been output on the writer.
      private final Map<byte[], WriterLength> writers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      private final Map<byte[], byte[]> previousRows = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      private final long now = EnvironmentEdgeManager.currentTime();
      private byte[] tableNameBytes = writeMultipleTables ? null : Bytes.toBytes(writeTableNames);

      @Override
      public void write(ImmutableBytesWritable row, V cell) throws IOException {
        Cell kv = cell;
        // null input == user explicitly wants to flush
        if (row == null && kv == null) {
          rollWriters(null);
          return;
        }

        byte[] rowKey = CellUtil.cloneRow(kv);
        int length = (PrivateCellUtil.estimatedSerializedSizeOf(kv)) - Bytes.SIZEOF_INT;
        byte[] family = CellUtil.cloneFamily(kv);
        if (writeMultipleTables) {
          tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get());
          tableNameBytes = TableName.valueOf(tableNameBytes).getNameWithNamespaceInclAsString()
            .getBytes(Charset.defaultCharset());
          if (!allTableNames.contains(Bytes.toString(tableNameBytes))) {
            throw new IllegalArgumentException(
              "TableName " + Bytes.toString(tableNameBytes) + " not expected");
          }
        }
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);

        WriterLength wl = this.writers.get(tableAndFamily);

        // If this is a new column family, create its output directory and set its storage policy.
        if (wl == null) {
          Path writerPath = null;
          if (writeMultipleTables) {
            Path tableRelPath = getTableRelativePath(tableNameBytes);
            writerPath = new Path(outputDir, new Path(tableRelPath, Bytes.toString(family)));
          } else {
            writerPath = new Path(outputDir, Bytes.toString(family));
          }
          fs.mkdirs(writerPath);
          configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
        }

        // Roll the writer once it exceeds the maximum file size, but only at a row boundary.
        if (
          wl != null && wl.written + length >= maxsize
            && Bytes.compareTo(this.previousRows.get(family), rowKey) != 0
        ) {
          rollWriters(wl);
        }

        // create a new HFile writer, if necessary
        if (wl == null || wl.writer == null) {
          InetSocketAddress[] favoredNodes = null;
          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
            HRegionLocation loc = null;
            String tableName = Bytes.toString(tableNameBytes);
            if (tableName != null) {
              try (
                Connection connection =
                  ConnectionFactory.createConnection(createRemoteClusterConf(conf));
                RegionLocator locator = connection.getRegionLocator(TableName.valueOf(tableName))) {
                loc = locator.getRegionLocation(rowKey);
              } catch (Throwable e) {
                LOG.warn("Failed to locate row key {} in {}", Bytes.toString(rowKey),
                  tableName, e);
                loc = null;
              }
            }
            if (null == loc) {
              LOG.trace("Failed to get location for row {}, using default writer",
                Bytes.toString(rowKey));
            } else {
              LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey));
              InetSocketAddress initialIsa =
                new InetSocketAddress(loc.getHostname(), loc.getPort());
              if (initialIsa.isUnresolved()) {
                LOG.trace("Failed to resolve address {}, using default writer",
                  loc.getHostnamePort());
              } else {
                LOG.debug("Using favored nodes writer: {}", initialIsa.getHostString());
                favoredNodes = new InetSocketAddress[] { initialIsa };
              }
            }
          }
          wl = getNewWriter(tableNameBytes, family, conf, favoredNodes);

        }

        // we now have the proper HFile writer; full steam ahead
        PrivateCellUtil.updateLatestStamp(cell, this.now);
        wl.writer.append(kv);
        wl.written += length;

        // Copy the row so we know when the row transitions.
        this.previousRows.put(family, rowKey);
      }

      private Path getTableRelativePath(byte[] tableNameBytes) {
        String tableName = Bytes.toString(tableNameBytes);
        String[] tableNameParts = tableName.split(":");
        Path tableRelPath = new Path(tableNameParts[0]);
        if (tableNameParts.length > 1) {
          tableRelPath = new Path(tableRelPath, tableNameParts[1]);
        }
        return tableRelPath;
      }

      private void rollWriters(WriterLength writerLength) throws IOException {
        if (writerLength != null) {
          closeWriter(writerLength);
        } else {
          for (WriterLength wl : this.writers.values()) {
            closeWriter(wl);
          }
        }
      }

      private void closeWriter(WriterLength wl) throws IOException {
        if (wl.writer != null) {
          LOG.info(
            "Writer=" + wl.writer.getPath() + ((wl.written == 0) ? "" : ", wrote=" + wl.written));
          close(wl.writer);
          wl.writer = null;
        }
        wl.written = 0;
      }

      private Configuration createRemoteClusterConf(Configuration conf) {
        final Configuration newConf = new Configuration(conf);

        final String quorum = conf.get(REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY);
        final String clientPort = conf.get(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY);
        final String parent = conf.get(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY);

        if (quorum != null && clientPort != null && parent != null) {
          newConf.set(HConstants.ZOOKEEPER_QUORUM, quorum);
          newConf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, Integer.parseInt(clientPort));
          newConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parent);
        }

        for (Entry<String, String> entry : conf) {
          String key = entry.getKey();
          if (
            REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY.equals(key)
              || REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY.equals(key)
              || REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY.equals(key)
          ) {
            // Handled them above
            continue;
          }

          if (entry.getKey().startsWith(REMOTE_CLUSTER_CONF_PREFIX)) {
            String originalKey = entry.getKey().substring(REMOTE_CLUSTER_CONF_PREFIX.length());
            if (!originalKey.isEmpty()) {
              newConf.set(originalKey, entry.getValue());
            }
          }
        }

        return newConf;
      }

      /*
       * Create a new StoreFileWriter.
       * @return A WriterLength, containing a new StoreFileWriter.
       */
      @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BX_UNBOXING_IMMEDIATELY_REBOXED",
          justification = "Not important")
      private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration conf,
        InetSocketAddress[] favoredNodes) throws IOException {
        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family);
        Path familydir = new Path(outputDir, Bytes.toString(family));
        if (writeMultipleTables) {
          familydir =
            new Path(outputDir, new Path(getTableRelativePath(tableName), Bytes.toString(family)));
        }
        WriterLength wl = new WriterLength();
        Algorithm compression = overriddenCompression;
        compression = compression == null ? compressionMap.get(tableAndFamily) : compression;
        compression = compression == null ? defaultCompression : compression;
        BloomType bloomType = bloomTypeMap.get(tableAndFamily);
        bloomType = bloomType == null ? BloomType.NONE : bloomType;
        String bloomParam = bloomParamMap.get(tableAndFamily);
        if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
          conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, bloomParam);
        }
        Integer blockSize = blockSizeMap.get(tableAndFamily);
        blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
        DataBlockEncoding encoding = overriddenEncoding;
        encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
        encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
        HFileContextBuilder contextBuilder = new HFileContextBuilder().withCompression(compression)
          .withDataBlockEncoding(encoding).withChecksumType(StoreUtils.getChecksumType(conf))
          .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)).withBlockSize(blockSize)
          .withColumnFamily(family).withTableName(tableName)
          .withCreateTime(EnvironmentEdgeManager.currentTime());

        if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
          contextBuilder.withIncludesTags(true);
        }

        HFileContext hFileContext = contextBuilder.build();
        if (null == favoredNodes) {
          wl.writer =
            new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs).withOutputDir(familydir)
              .withBloomType(bloomType).withFileContext(hFileContext).build();
        } else {
          wl.writer = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, new HFileSystem(fs))
            .withOutputDir(familydir).withBloomType(bloomType).withFileContext(hFileContext)
            .withFavoredNodes(favoredNodes).build();
        }

        this.writers.put(tableAndFamily, wl);
        return wl;
      }

      private void close(final StoreFileWriter w) throws IOException {
        if (w != null) {
          w.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(EnvironmentEdgeManager.currentTime()));
          w.appendFileInfo(BULKLOAD_TASK_KEY, Bytes.toBytes(context.getTaskAttemptID().toString()));
          w.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(true));
          w.appendFileInfo(EXCLUDE_FROM_MINOR_COMPACTION_KEY, Bytes.toBytes(compactionExclude));
          w.appendTrackedTimestampsToMetadata();
          w.close();
        }
      }

      @Override
      public void close(TaskAttemptContext c) throws IOException, InterruptedException {
        for (WriterLength wl : this.writers.values()) {
          close(wl.writer);
        }
      }
    };
  }

  /**
   * Configure block storage policy for CF after the directory is created.
   */
  static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
    byte[] tableAndFamily, Path cfPath) {
    if (null == conf || null == fs || null == tableAndFamily || null == cfPath) {
      return;
    }

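    // Prefer the per-family policy (keyed by the "<table>;<family>" string built by
    // combineTableNameSuffix); fall back to the global STORAGE_POLICY_PROPERTY value.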
    String policy = conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(tableAndFamily),
      conf.get(STORAGE_POLICY_PROPERTY));
    CommonFSUtils.setStoragePolicy(fs, cfPath, policy);
  }

  /*
   * Data structure to hold a Writer and amount of data written on it.
   */
  static class WriterLength {
    long written = 0;
    StoreFileWriter writer = null;
  }

  /**
   * Return the start keys of all of the regions in the given tables, as a list of
   * ImmutableBytesWritable.
   */
  private static List<ImmutableBytesWritable> getRegionStartKeys(List<RegionLocator> regionLocators,
    boolean writeMultipleTables) throws IOException {

    ArrayList<ImmutableBytesWritable> ret = new ArrayList<>();
    for (RegionLocator regionLocator : regionLocators) {
      TableName tableName = regionLocator.getName();
      LOG.info("Looking up current regions for table " + tableName);
      byte[][] byteKeys = regionLocator.getStartKeys();
      for (byte[] byteKey : byteKeys) {
        byte[] fullKey = byteKey; // HFileOutputFormat2 use case
        if (writeMultipleTables) {
          // MultiTableHFileOutputFormat use case
          fullKey = combineTableNameSuffix(tableName.getName(), byteKey);
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("SplitPoint startkey for " + tableName + ": " + Bytes.toStringBinary(fullKey));
        }
        ret.add(new ImmutableBytesWritable(fullKey));
      }
    }
    return ret;
  }

  /**
   * Write out a {@link SequenceFile} that can be read by {@link TotalOrderPartitioner} that
   * contains the split points in startKeys.
   */
  @SuppressWarnings("deprecation")
  private static void writePartitions(Configuration conf, Path partitionsPath,
    List<ImmutableBytesWritable> startKeys, boolean writeMultipleTables) throws IOException {
    LOG.info("Writing partition information to " + partitionsPath);
    if (startKeys.isEmpty()) {
      throw new IllegalArgumentException("No regions passed");
    }

    // We're generating a list of split points, and we don't ever
    // have keys < the first region (which has an empty start key)
    // so we need to remove it. Otherwise we would end up with an
    // empty reducer with index 0
    TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys);
    ImmutableBytesWritable first = sorted.first();
    if (writeMultipleTables) {
      first =
        new ImmutableBytesWritable(MultiTableHFileOutputFormat.getSuffix(sorted.first().get()));
    }
    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IllegalArgumentException(
        "First region of table should have empty start key. Instead has: "
          + Bytes.toStringBinary(first.get()));
    }
    sorted.remove(sorted.first());

    // Write the actual file
    FileSystem fs = partitionsPath.getFileSystem(conf);
    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, partitionsPath,
      ImmutableBytesWritable.class, NullWritable.class);

    try {
      for (ImmutableBytesWritable startKey : sorted) {
        writer.append(startKey, NullWritable.get());
      }
    } finally {
      writer.close();
    }
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given table. This
   * <ul>
   * <li>Inspects the table to configure a total order partitioner</li>
   * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   * <li>Sets the number of reduce tasks to match the current number of regions</li>
   * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   * <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
   * PutSortReducer)</li>
   * <li>Sets the HBase cluster key used to load region locations for locality-sensitive
   * processing</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
   */
  public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
    throws IOException {
    configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
    configureRemoteCluster(job, table.getConfiguration());
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given table. This
   * <ul>
   * <li>Inspects the table to configure a total order partitioner</li>
   * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   * <li>Sets the number of reduce tasks to match the current number of regions</li>
   * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
   * <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
   * PutSortReducer)</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
   */
  public static void configureIncrementalLoad(Job job, TableDescriptor tableDescriptor,
    RegionLocator regionLocator) throws IOException {
    ArrayList<TableInfo> singleTableInfo = new ArrayList<>();
    singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator));
    configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
  }

  static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
    Class<? extends OutputFormat<?, ?>> cls) throws IOException {
    Configuration conf = job.getConfiguration();
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(MapReduceExtendedCell.class);
    job.setOutputFormatClass(cls);

    if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) {
      throw new IllegalArgumentException("Duplicate entries found in TableInfo argument");
    }
    boolean writeMultipleTables = false;
    if (MultiTableHFileOutputFormat.class.equals(cls)) {
      writeMultipleTables = true;
      conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true);
    }
    // Based on the configured map output class, set the correct reducer to properly
    // sort the incoming values.
    // TODO it would be nice to pick one or the other of these formats.
    if (
      KeyValue.class.equals(job.getMapOutputValueClass())
        || MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())
    ) {
      job.setReducerClass(CellSortReducer.class);
    } else if (Put.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(PutSortReducer.class);
    } else if (Text.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(TextSortReducer.class);
    } else {
      LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
    }

    mergeSerializations(conf);

    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
      LOG.info("bulkload locality sensitive enabled");
    }

    /* Now get the region start keys for every table required */
    List<String> allTableNames = new ArrayList<>(multiTableInfo.size());
    List<RegionLocator> regionLocators = new ArrayList<>(multiTableInfo.size());
    List<TableDescriptor> tableDescriptors = new ArrayList<>(multiTableInfo.size());

    for (TableInfo tableInfo : multiTableInfo) {
      regionLocators.add(tableInfo.getRegionLocator());
      String tn = writeMultipleTables
        ? tableInfo.getRegionLocator().getName().getNameWithNamespaceInclAsString()
        : tableInfo.getRegionLocator().getName().getNameAsString();
      allTableNames.add(tn);
      tableDescriptors.add(tableInfo.getTableDescriptor());
    }
    // Record table names so writers can be created with favored nodes, and so per-table
    // column family attributes (compression, block size, etc.) can be decoded.
    conf.set(OUTPUT_TABLE_NAME_CONF_KEY,
      StringUtils.join(allTableNames, Bytes.toString(tableSeparator)));
    List<ImmutableBytesWritable> startKeys =
      getRegionStartKeys(regionLocators, writeMultipleTables);
    // Use the tables' region boundaries as the TotalOrderPartitioner split points.
    LOG.info("Configuring " + startKeys.size() + " reduce partitions "
      + "to match current region count for all tables");
    job.setNumReduceTasks(startKeys.size());

    configurePartitioner(job, startKeys, writeMultipleTables);
    // Serialize per-column-family attributes (compression, block size, bloom type/param,
    // data block encoding) into the conf so the reducers can read them back.
    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(compressionDetails, tableDescriptors));
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(blockSizeDetails, tableDescriptors));
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(bloomTypeDetails, tableDescriptors));
    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(bloomParamDetails, tableDescriptors));
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
  }

  private static void mergeSerializations(Configuration conf) {
    List<String> serializations = new ArrayList<>();

    // add any existing values that have been set
    String[] existing = conf.getStrings("io.serializations");
    if (existing != null) {
      Collections.addAll(serializations, existing);
    }

    serializations.add(MutationSerialization.class.getName());
    serializations.add(ResultSerialization.class.getName());

    // Add ExtendedCellSerialization, if configured. Order matters here. Hadoop's
    // SerializationFactory runs through serializations in the order they are registered.
    // We want to register ExtendedCellSerialization before CellSerialization because both
    // work for ExtendedCells but only ExtendedCellSerialization handles them properly.
    if (
      conf.getBoolean(EXTENDED_CELL_SERIALIZATION_ENABLED_KEY,
        EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT)
    ) {
      serializations.add(ExtendedCellSerialization.class.getName());
    }
    serializations.add(CellSerialization.class.getName());

    conf.setStrings("io.serializations", serializations.toArray(new String[0]));
  }

  public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor)
    throws IOException {
    Configuration conf = job.getConfiguration();

    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(MapReduceExtendedCell.class);
    job.setOutputFormatClass(HFileOutputFormat2.class);

    ArrayList<TableDescriptor> singleTableDescriptor = new ArrayList<>(1);
    singleTableDescriptor.add(tableDescriptor);

    conf.set(OUTPUT_TABLE_NAME_CONF_KEY, tableDescriptor.getTableName().getNameAsString());
    // Set compression algorithms based on column families
    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(compressionDetails, singleTableDescriptor));
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor));
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor));
    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(bloomParamDetails, singleTableDescriptor));
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
      serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured.");
  }

  /**
   * Configure the HBase cluster key of the remote cluster used to load region locations when
   * locality-sensitive processing is enabled. It's not necessary to call this method explicitly
   * when the cluster key of the HBase cluster used to load region locations is already configured
   * in the job configuration. Call this method when a different HBase cluster key is configured in
   * the job configuration; for example, you should call it when you load data from HBase cluster A
   * using {@link TableInputFormat} and generate hfiles for HBase cluster B. Otherwise,
   * HFileOutputFormat2 fetches locations from cluster A and locality-sensitive processing won't
   * work correctly.
   * {@link #configureIncrementalLoad(Job, Table, RegionLocator)} calls this method using
   * {@link Table#getConfiguration} as clusterConf. See HBASE-25608.
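   * <p>
   * A minimal sketch of the cross-cluster case ({@code job}, {@code tableDescriptorOfB},
   * {@code regionLocatorOfB} and {@code clusterBConf} are placeholders for the job being set up
   * and for cluster B's table metadata and configuration):
   *
   * <pre>
   * // The job reads from cluster A; HFiles are generated for cluster B.
   * HFileOutputFormat2.configureIncrementalLoad(job, tableDescriptorOfB, regionLocatorOfB);
   * HFileOutputFormat2.configureRemoteCluster(job, clusterBConf);
   * </pre>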
   * @param job         which has configuration to be updated
   * @param clusterConf which contains cluster key of the HBase cluster to be locality-sensitive
   * @see #configureIncrementalLoad(Job, Table, RegionLocator)
   * @see #LOCALITY_SENSITIVE_CONF_KEY
   * @see #REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY
   * @see #REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY
   * @see #REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY
   */
  public static void configureRemoteCluster(Job job, Configuration clusterConf) {
    Configuration conf = job.getConfiguration();

    if (!conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
      return;
    }

    final String quorum = clusterConf.get(HConstants.ZOOKEEPER_QUORUM);
    final int clientPort = clusterConf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT,
      HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT);
    final String parent =
      clusterConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);

    conf.set(REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY, quorum);
    conf.setInt(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY, clientPort);
    conf.set(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY, parent);

772    LOG.info("ZK configs for remote cluster of bulkload is configured: " + quorum + ":" + clientPort
773      + "/" + parent);
774  }

  /**
   * Runs inside the task to deserialize column family to compression algorithm map from the
   * configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured compression algorithm
   */
  @InterfaceAudience.Private
  static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, COMPRESSION_FAMILIES_CONF_KEY);
    Map<byte[], Algorithm> compressionMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue());
      compressionMap.put(e.getKey(), algorithm);
    }
    return compressionMap;
  }

  /**
   * Runs inside the task to deserialize column family to bloom filter type map from the
   * configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter type
   */
  @InterfaceAudience.Private
  static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, BLOOM_TYPE_FAMILIES_CONF_KEY);
    Map<byte[], BloomType> bloomTypeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      BloomType bloomType = BloomType.valueOf(e.getValue());
      bloomTypeMap.put(e.getKey(), bloomType);
    }
    return bloomTypeMap;
  }

  /**
   * Runs inside the task to deserialize column family to bloom filter param map from the
   * configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured bloom filter param
   */
  @InterfaceAudience.Private
  static Map<byte[], String> createFamilyBloomParamMap(Configuration conf) {
    return createFamilyConfValueMap(conf, BLOOM_PARAM_FAMILIES_CONF_KEY);
  }

  /**
   * Runs inside the task to deserialize column family to block size map from the configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured block size
   */
  @InterfaceAudience.Private
  static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
    Map<byte[], String> stringMap = createFamilyConfValueMap(conf, BLOCK_SIZE_FAMILIES_CONF_KEY);
    Map<byte[], Integer> blockSizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      Integer blockSize = Integer.parseInt(e.getValue());
      blockSizeMap.put(e.getKey(), blockSize);
    }
    return blockSizeMap;
  }

  /**
   * Runs inside the task to deserialize column family to data block encoding type map from the
   * configuration.
   * @param conf to read the serialized values from
   * @return a map from column family to the configured data block encoding for the family
   */
  @InterfaceAudience.Private
  static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(Configuration conf) {
    Map<byte[], String> stringMap =
      createFamilyConfValueMap(conf, DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
    Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
      encoderMap.put(e.getKey(), DataBlockEncoding.valueOf(e.getValue()));
    }
    return encoderMap;
  }

  /**
   * Runs inside the task to deserialize column family to given conf value map.
   * @param conf     to read the serialized values from
   * @param confName conf key to read from the configuration
   * @return a map of column family to the given configuration value
   */
  private static Map<byte[], String> createFamilyConfValueMap(Configuration conf, String confName) {
    Map<byte[], String> confValMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    String confVal = conf.get(confName, "");
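    // Entries are URL-encoded "<table>;<family>=<value>" pairs joined by '&', as produced by
    // serializeColumnFamilyAttribute().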
    for (String familyConf : confVal.split("&")) {
      String[] familySplit = familyConf.split("=");
      if (familySplit.length != 2) {
        continue;
      }
      try {
        confValMap.put(Bytes.toBytes(URLDecoder.decode(familySplit[0], "UTF-8")),
          URLDecoder.decode(familySplit[1], "UTF-8"));
      } catch (UnsupportedEncodingException e) {
        // will not happen with UTF-8 encoding
        throw new AssertionError(e);
      }
    }
    return confValMap;
  }

  /**
   * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
   * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
   */
  static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints,
    boolean writeMultipleTables) throws IOException {
    Configuration conf = job.getConfiguration();
    // create the partitions file
    FileSystem fs = FileSystem.get(conf);
    String hbaseTmpFsDir =
      conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, fs.getHomeDirectory() + "/hbase-staging");
    Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
    fs.makeQualified(partitionsPath);
    writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables);
    fs.deleteOnExit(partitionsPath);

    // configure job to use it
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
      value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
  @InterfaceAudience.Private
  static String serializeColumnFamilyAttribute(Function<ColumnFamilyDescriptor, String> fn,
    List<TableDescriptor> allTables) throws UnsupportedEncodingException {
    StringBuilder attributeValue = new StringBuilder();
    int i = 0;
    for (TableDescriptor tableDescriptor : allTables) {
      if (tableDescriptor == null) {
        // could happen with mock table instance
        // CODEREVIEW: Can I set an empty string in conf if mock table instance?
        return "";
      }
      for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
        if (i++ > 0) {
          attributeValue.append('&');
        }
        attributeValue.append(URLEncoder
          .encode(Bytes.toString(combineTableNameSuffix(tableDescriptor.getTableName().getName(),
            familyDescriptor.getName())), "UTF-8"));
        attributeValue.append('=');
        attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8"));
      }
    }
    // The '&' separator is only appended before each entry after the first, so there is no
    // trailing ampersand to strip.
    return attributeValue.toString();
  }

  /**
   * Serialize column family to compression algorithm map to configuration. Invoked while
   * configuring the MR job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> compressionDetails =
    familyDescriptor -> familyDescriptor.getCompressionType().getName();

  /**
   * Serialize column family to block size map to configuration. Invoked while configuring the MR
   * job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> blockSizeDetails =
    familyDescriptor -> String.valueOf(familyDescriptor.getBlocksize());

  /**
   * Serialize column family to bloom type map to configuration. Invoked while configuring the MR
   * job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> bloomTypeDetails = familyDescriptor -> {
    String bloomType = familyDescriptor.getBloomFilterType().toString();
    if (bloomType == null) {
      bloomType = ColumnFamilyDescriptorBuilder.DEFAULT_BLOOMFILTER.name();
    }
    return bloomType;
  };

  /**
   * Serialize column family to bloom param map to configuration. Invoked while configuring the MR
   * job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> bloomParamDetails = familyDescriptor -> {
    BloomType bloomType = familyDescriptor.getBloomFilterType();
    String bloomParam = "";
    if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
      bloomParam = familyDescriptor.getConfigurationValue(BloomFilterUtil.PREFIX_LENGTH_KEY);
    }
    return bloomParam;
  };

  /**
   * Serialize column family to data block encoding map to configuration. Invoked while configuring
   * the MR job for incremental load.
   */
  @InterfaceAudience.Private
  static Function<ColumnFamilyDescriptor, String> dataBlockEncodingDetails = familyDescriptor -> {
    DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
    if (encoding == null) {
      encoding = DataBlockEncoding.NONE;
    }
    return encoding.toString();
  };

}