001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.compress.zstd;
019
020import com.github.luben.zstd.Zstd;
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.OutputStream;
024import java.nio.ByteBuffer;
025import java.nio.ByteOrder;
026import org.apache.hadoop.conf.Configurable;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.fs.CommonConfigurationKeys;
029import org.apache.hadoop.hbase.io.compress.DictionaryCache;
030import org.apache.hadoop.io.compress.BlockCompressorStream;
031import org.apache.hadoop.io.compress.BlockDecompressorStream;
032import org.apache.hadoop.io.compress.CompressionCodec;
033import org.apache.hadoop.io.compress.CompressionInputStream;
034import org.apache.hadoop.io.compress.CompressionOutputStream;
035import org.apache.hadoop.io.compress.Compressor;
036import org.apache.hadoop.io.compress.Decompressor;
037import org.apache.yetus.audience.InterfaceAudience;
038
039/**
040 * Hadoop ZStandard codec implemented with zstd-jni.
041 * <p>
042 * This is data format compatible with Hadoop's native ZStandard codec.
043 */
044@InterfaceAudience.Private
045public class ZstdCodec implements Configurable, CompressionCodec {
046
047  public static final String ZSTD_LEVEL_KEY = "hbase.io.compress.zstd.level";
048  public static final String ZSTD_BUFFER_SIZE_KEY = "hbase.io.compress.zstd.buffersize";
049  public static final int ZSTD_BUFFER_SIZE_DEFAULT = 256 * 1024;
050  public static final String ZSTD_DICTIONARY_KEY = "hbase.io.compress.zstd.dictionary";
051
052  private Configuration conf;
053  private int bufferSize;
054  private int level;
055  private byte[] dictionary;
056
057  public ZstdCodec() {
058    conf = new Configuration();
059    init();
060  }
061
062  @Override
063  public Configuration getConf() {
064    return conf;
065  }
066
067  @Override
068  public void setConf(Configuration conf) {
069    this.conf = conf;
070    init();
071  }
072
073  @Override
074  public Compressor createCompressor() {
075    return new ZstdCompressor(level, bufferSize, dictionary);
076  }
077
078  @Override
079  public Decompressor createDecompressor() {
080    return new ZstdDecompressor(bufferSize, dictionary);
081  }
082
083  @Override
084  public CompressionInputStream createInputStream(InputStream in) throws IOException {
085    return createInputStream(in, createDecompressor());
086  }
087
088  @Override
089  public CompressionInputStream createInputStream(InputStream in, Decompressor d)
090    throws IOException {
091    return new BlockDecompressorStream(in, d, bufferSize);
092  }
093
094  @Override
095  public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
096    return createOutputStream(out, createCompressor());
097  }
098
099  @Override
100  public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
101    throws IOException {
102    return new BlockCompressorStream(out, c, bufferSize,
103      (int) Zstd.compressBound(bufferSize) - bufferSize); // overhead only
104  }
105
106  @Override
107  public Class<? extends Compressor> getCompressorType() {
108    return ZstdCompressor.class;
109  }
110
111  @Override
112  public Class<? extends Decompressor> getDecompressorType() {
113    return ZstdDecompressor.class;
114  }
115
116  @Override
117  public String getDefaultExtension() {
118    return ".zst";
119  }
120
121  // Package private
122
123  static int getLevel(Configuration conf) {
124    return conf.getInt(ZSTD_LEVEL_KEY,
125      conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY,
126        CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_DEFAULT));
127  }
128
129  static int getBufferSize(Configuration conf) {
130    return conf.getInt(ZSTD_BUFFER_SIZE_KEY,
131      conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY,
132        // IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT is 0! We can't allow that.
133        ZSTD_BUFFER_SIZE_DEFAULT));
134  }
135
136  static byte[] getDictionary(final Configuration conf) {
137    String path = conf.get(ZSTD_DICTIONARY_KEY);
138    try {
139      return DictionaryCache.getDictionary(conf, path);
140    } catch (IOException e) {
141      throw new RuntimeException("Unable to load dictionary at " + path, e);
142    }
143  }
144
145  // Zstandard dictionaries begin with a 32-bit magic number, 0xEC30A437 in little-endian
146  // format, followed by a 32-bit identifier also in little-endian format.
147  // Reference: https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md
148
149  static boolean isDictionary(byte[] dictionary) {
150    return (dictionary[0] == (byte) 0x37 && dictionary[1] == (byte) 0xA4
151      && dictionary[2] == (byte) 0x30 && dictionary[3] == (byte) 0xEC);
152  }
153
154  static int getDictionaryId(byte[] dictionary) {
155    if (!isDictionary(dictionary)) {
156      throw new IllegalArgumentException("Not a ZStandard dictionary");
157    }
158    return ByteBuffer.wrap(dictionary, 4, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
159  }
160
161  private void init() {
162    this.bufferSize = getBufferSize(conf);
163    this.level = getLevel(conf);
164    this.dictionary = getDictionary(conf);
165  }
166}