001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.compress.zstd;
019
020import com.github.luben.zstd.Zstd;
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.OutputStream;
024import java.nio.ByteBuffer;
025import java.nio.ByteOrder;
026import org.apache.hadoop.conf.Configurable;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.fs.CommonConfigurationKeys;
029import org.apache.hadoop.hbase.io.compress.ByteBuffDecompressionCodec;
030import org.apache.hadoop.hbase.io.compress.ByteBuffDecompressor;
031import org.apache.hadoop.hbase.io.compress.DictionaryCache;
032import org.apache.hadoop.io.compress.BlockCompressorStream;
033import org.apache.hadoop.io.compress.BlockDecompressorStream;
034import org.apache.hadoop.io.compress.CompressionCodec;
035import org.apache.hadoop.io.compress.CompressionInputStream;
036import org.apache.hadoop.io.compress.CompressionOutputStream;
037import org.apache.hadoop.io.compress.Compressor;
038import org.apache.hadoop.io.compress.Decompressor;
039import org.apache.yetus.audience.InterfaceAudience;
040
041/**
042 * Hadoop ZStandard codec implemented with zstd-jni.
043 * <p>
044 * This is data format compatible with Hadoop's native ZStandard codec.
045 */
046@InterfaceAudience.Private
047public class ZstdCodec implements Configurable, CompressionCodec, ByteBuffDecompressionCodec {
048
049  public static final String ZSTD_LEVEL_KEY = "hbase.io.compress.zstd.level";
050  public static final String ZSTD_BUFFER_SIZE_KEY = "hbase.io.compress.zstd.buffersize";
051  public static final int ZSTD_BUFFER_SIZE_DEFAULT = 256 * 1024;
052  public static final String ZSTD_DICTIONARY_KEY = "hbase.io.compress.zstd.dictionary";
053
054  private Configuration conf;
055  private int bufferSize;
056  private int level;
057  private byte[] dictionary;
058
059  public ZstdCodec() {
060    conf = new Configuration();
061    init();
062  }
063
064  @Override
065  public Configuration getConf() {
066    return conf;
067  }
068
069  @Override
070  public void setConf(Configuration conf) {
071    this.conf = conf;
072    init();
073  }
074
075  @Override
076  public Compressor createCompressor() {
077    return new ZstdCompressor(level, bufferSize, dictionary);
078  }
079
080  @Override
081  public Decompressor createDecompressor() {
082    return new ZstdDecompressor(bufferSize, dictionary);
083  }
084
085  @Override
086  public ByteBuffDecompressor createByteBuffDecompressor() {
087    return new ZstdByteBuffDecompressor(dictionary);
088  }
089
090  @Override
091  public CompressionInputStream createInputStream(InputStream in) throws IOException {
092    return createInputStream(in, createDecompressor());
093  }
094
095  @Override
096  public CompressionInputStream createInputStream(InputStream in, Decompressor d)
097    throws IOException {
098    return new BlockDecompressorStream(in, d, bufferSize);
099  }
100
101  @Override
102  public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
103    return createOutputStream(out, createCompressor());
104  }
105
106  @Override
107  public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
108    throws IOException {
109    return new BlockCompressorStream(out, c, bufferSize,
110      (int) Zstd.compressBound(bufferSize) - bufferSize); // overhead only
111  }
112
113  @Override
114  public Class<? extends Compressor> getCompressorType() {
115    return ZstdCompressor.class;
116  }
117
118  @Override
119  public Class<? extends Decompressor> getDecompressorType() {
120    return ZstdDecompressor.class;
121  }
122
123  @Override
124  public Class<? extends ByteBuffDecompressor> getByteBuffDecompressorType() {
125    return ZstdByteBuffDecompressor.class;
126  }
127
128  @Override
129  public String getDefaultExtension() {
130    return ".zst";
131  }
132
133  // Package private
134
135  static int getLevel(Configuration conf) {
136    return conf.getInt(ZSTD_LEVEL_KEY,
137      conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY,
138        CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_DEFAULT));
139  }
140
141  static int getBufferSize(Configuration conf) {
142    int size = conf.getInt(ZSTD_BUFFER_SIZE_KEY,
143      conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY,
144        CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT));
145    return size > 0 ? size : ZSTD_BUFFER_SIZE_DEFAULT;
146  }
147
148  static byte[] getDictionary(final Configuration conf) {
149    String path = conf.get(ZSTD_DICTIONARY_KEY);
150    try {
151      return DictionaryCache.getDictionary(conf, path);
152    } catch (IOException e) {
153      throw new RuntimeException("Unable to load dictionary at " + path, e);
154    }
155  }
156
157  // Zstandard dictionaries begin with a 32-bit magic number, 0xEC30A437 in little-endian
158  // format, followed by a 32-bit identifier also in little-endian format.
159  // Reference: https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md
160
161  static boolean isDictionary(byte[] dictionary) {
162    return (dictionary[0] == (byte) 0x37 && dictionary[1] == (byte) 0xA4
163      && dictionary[2] == (byte) 0x30 && dictionary[3] == (byte) 0xEC);
164  }
165
166  static int getDictionaryId(byte[] dictionary) {
167    if (!isDictionary(dictionary)) {
168      throw new IllegalArgumentException("Not a ZStandard dictionary");
169    }
170    return ByteBuffer.wrap(dictionary, 4, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
171  }
172
173  private void init() {
174    this.bufferSize = getBufferSize(conf);
175    this.level = getLevel(conf);
176    this.dictionary = getDictionary(conf);
177  }
178}