/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.compress.zstd;

import com.github.luben.zstd.Zstd;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hbase.io.compress.DictionaryCache;
import org.apache.hadoop.io.compress.BlockCompressorStream;
import org.apache.hadoop.io.compress.BlockDecompressorStream;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Hadoop ZStandard codec implemented with zstd-jni.
 * <p>
 * This codec is data format compatible with Hadoop's native ZStandard codec.
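 * <p>
 * A minimal usage sketch (caller-side code, not part of this class; {@code data} is an
 * arbitrary byte array and the compression level shown is illustrative only):
 *
 * <pre>{@code
 * Configuration conf = new Configuration();
 * conf.setInt(ZstdCodec.ZSTD_LEVEL_KEY, 3);
 * ZstdCodec codec = new ZstdCodec();
 * codec.setConf(conf);
 * ByteArrayOutputStream baos = new ByteArrayOutputStream();
 * try (CompressionOutputStream out = codec.createOutputStream(baos)) {
 *   out.write(data); // buffered and compressed block by block; close() finishes the stream
 * }
 * try (CompressionInputStream in =
 *     codec.createInputStream(new ByteArrayInputStream(baos.toByteArray()))) {
 *   // read the decompressed bytes back from 'in'
 * }
 * }</pre>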
 */
@InterfaceAudience.Private
public class ZstdCodec implements Configurable, CompressionCodec {

  public static final String ZSTD_LEVEL_KEY = "hbase.io.compress.zstd.level";
  public static final String ZSTD_BUFFER_SIZE_KEY = "hbase.io.compress.zstd.buffersize";
  public static final int ZSTD_BUFFER_SIZE_DEFAULT = 256 * 1024;
  public static final String ZSTD_DICTIONARY_KEY = "hbase.io.compress.zstd.dictionary";

  private Configuration conf;

  public ZstdCodec() {
    conf = new Configuration();
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public Compressor createCompressor() {
    return new ZstdCompressor(getLevel(conf), getBufferSize(conf), getDictionary(conf));
  }

  @Override
  public Decompressor createDecompressor() {
    return new ZstdDecompressor(getBufferSize(conf), getDictionary(conf));
  }

  @Override
  public CompressionInputStream createInputStream(InputStream in) throws IOException {
    return createInputStream(in, createDecompressor());
  }

  @Override
  public CompressionInputStream createInputStream(InputStream in, Decompressor d)
    throws IOException {
    return new BlockDecompressorStream(in, d, getBufferSize(conf));
  }

  @Override
  public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
    return createOutputStream(out, createCompressor());
  }

  @Override
  public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
    throws IOException {
    int bufferSize = getBufferSize(conf);
    return new BlockCompressorStream(out, c, bufferSize,
      (int) Zstd.compressBound(bufferSize) - bufferSize); // overhead only
  }

  @Override
  public Class<? extends Compressor> getCompressorType() {
    return ZstdCompressor.class;
  }

  @Override
  public Class<? extends Decompressor> getDecompressorType() {
    return ZstdDecompressor.class;
  }

  @Override
  public String getDefaultExtension() {
    return ".zst";
  }

  // Package private

  static int getLevel(Configuration conf) {
    return conf.getInt(ZSTD_LEVEL_KEY,
      conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY,
        CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_DEFAULT));
  }

  static int getBufferSize(Configuration conf) {
    return conf.getInt(ZSTD_BUFFER_SIZE_KEY,
      conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY,
        // IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT is 0, which we can't use as a
        // buffer size here, so fall back to our own default.
        ZSTD_BUFFER_SIZE_DEFAULT));
  }

  static byte[] getDictionary(final Configuration conf) {
    String path = conf.get(ZSTD_DICTIONARY_KEY);
    try {
      return DictionaryCache.getDictionary(conf, path);
    } catch (IOException e) {
      throw new RuntimeException("Unable to load dictionary at " + path, e);
    }
  }

  // Zstandard dictionaries begin with a 32-bit magic number, 0xEC30A437 in little-endian
  // format, followed by a 32-bit identifier also in little-endian format.
  // Reference: https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md
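  // For example, a dictionary whose ID is 0x01020304 begins with the bytes
  // 0x37 0xA4 0x30 0xEC 0x04 0x03 0x02 0x01 (magic, then ID, least-significant byte first).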

  static boolean isDictionary(byte[] dictionary) {
    return (dictionary[0] == (byte) 0x37 && dictionary[1] == (byte) 0xA4
      && dictionary[2] == (byte) 0x30 && dictionary[3] == (byte) 0xEC);
  }

  static int getDictionaryId(byte[] dictionary) {
    if (!isDictionary(dictionary)) {
      throw new IllegalArgumentException("Not a ZStandard dictionary");
    }
    return ByteBuffer.wrap(dictionary, 4, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
  }

}