/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.compress.zstd;

import com.github.luben.zstd.Zstd;
import com.github.luben.zstd.ZstdDictDecompress;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hbase.io.compress.ByteBuffDecompressionCodec;
import org.apache.hadoop.hbase.io.compress.ByteBuffDecompressor;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.DictionaryCache;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.compress.BlockCompressorStream;
import org.apache.hadoop.io.compress.BlockDecompressorStream;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;

/**
 * Hadoop ZStandard codec implemented with zstd-jni.
 * <p>
 * This codec is data-format compatible with Hadoop's native ZStandard codec.
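 * <p>
 * A minimal usage sketch (the values and stream wiring below are illustrative; the codec
 * follows the standard Hadoop {@link CompressionCodec} stream contract):
 *
 * <pre>{@code
 * Configuration conf = new Configuration();
 * conf.setInt(ZstdCodec.ZSTD_LEVEL_KEY, 6); // compression level
 * ZstdCodec codec = new ZstdCodec();
 * codec.setConf(conf);
 *
 * ByteArrayOutputStream baos = new ByteArrayOutputStream();
 * try (CompressionOutputStream out = codec.createOutputStream(baos)) {
 *   out.write("hello zstd".getBytes(StandardCharsets.UTF_8));
 * }
 * try (CompressionInputStream in =
 *     codec.createInputStream(new ByteArrayInputStream(baos.toByteArray()))) {
 *   byte[] buf = new byte[64];
 *   int n = in.read(buf); // decompressed bytes: "hello zstd"
 * }
 * }</pre>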
 */
@InterfaceAudience.Private
public class ZstdCodec implements Configurable, CompressionCodec, ByteBuffDecompressionCodec {

  public static final String ZSTD_LEVEL_KEY = "hbase.io.compress.zstd.level";
  public static final String ZSTD_BUFFER_SIZE_KEY = "hbase.io.compress.zstd.buffersize";
  public static final int ZSTD_BUFFER_SIZE_DEFAULT = 256 * 1024;
  public static final String ZSTD_DICTIONARY_KEY = "hbase.io.compress.zstd.dictionary";

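  // Decompression dictionaries are expensive to construct, so cache them keyed by path,
  // holding at most 100 entries, each expiring 10 minutes after its last access.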
  private static final Cache<String, Pair<ZstdDictDecompress, Integer>> DECOMPRESS_DICT_CACHE =
    CacheBuilder.newBuilder().maximumSize(100).expireAfterAccess(10, TimeUnit.MINUTES).build();

  private Configuration conf;
  private int bufferSize;
  private int level;
  private byte[] dictionary;

  public ZstdCodec() {
    conf = new Configuration();
    init();
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
    init();
  }

  @Override
  public Compressor createCompressor() {
    return new ZstdCompressor(level, bufferSize, dictionary);
  }

  @Override
  public Decompressor createDecompressor() {
    return new ZstdDecompressor(bufferSize, dictionary);
  }

  @Override
  public ByteBuffDecompressor createByteBuffDecompressor() {
    return new ZstdByteBuffDecompressor(dictionary);
  }

  @Override
  public CompressionInputStream createInputStream(InputStream in) throws IOException {
    return createInputStream(in, createDecompressor());
  }

  @Override
  public CompressionInputStream createInputStream(InputStream in, Decompressor d)
    throws IOException {
    return new BlockDecompressorStream(in, d, bufferSize);
  }

  @Override
  public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
    return createOutputStream(out, createCompressor());
  }

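  // The fourth argument to BlockCompressorStream is the worst-case per-block expansion,
  // i.e. Zstd.compressBound(bufferSize) minus the buffer size itself. For the default
  // 256 KiB buffer this overhead works out to about 1 KiB (roughly bufferSize/256).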
  @Override
  public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
    throws IOException {
    return new BlockCompressorStream(out, c, bufferSize,
      (int) Zstd.compressBound(bufferSize) - bufferSize); // overhead only
  }

  @Override
  public Class<? extends Compressor> getCompressorType() {
    return ZstdCompressor.class;
  }

  @Override
  public Class<? extends Decompressor> getDecompressorType() {
    return ZstdDecompressor.class;
  }

  @Override
  public Class<? extends ByteBuffDecompressor> getByteBuffDecompressorType() {
    return ZstdByteBuffDecompressor.class;
  }

  @Override
  public Compression.HFileDecompressionContext
    getDecompressionContextFromConfiguration(Configuration conf) {
    return ZstdHFileDecompressionContext.fromConfiguration(conf);
  }

  @Override
  public String getDefaultExtension() {
    return ".zst";
  }

  // Package private

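  // The HBase-specific level key takes precedence over Hadoop's generic zstd level key,
  // which in turn falls back to the Hadoop default.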
  static int getLevel(Configuration conf) {
    return conf.getInt(ZSTD_LEVEL_KEY,
      conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY,
        CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_DEFAULT));
  }

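  // Same precedence as getLevel: the HBase key overrides the Hadoop key. A non-positive
  // configured size is replaced with the 256 KiB default.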
  static int getBufferSize(Configuration conf) {
    int size = conf.getInt(ZSTD_BUFFER_SIZE_KEY,
      conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY,
        CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT));
    return size > 0 ? size : ZSTD_BUFFER_SIZE_DEFAULT;
  }

  @Nullable
  static byte[] getDictionary(final Configuration conf) {
    String path = conf.get(ZSTD_DICTIONARY_KEY);
    return DictionaryCache.getDictionary(conf, path);
  }

  /**
   * Returns the dictionary and its ID number; the ID is useful for comparing dictionaries for
   * equality.
   */
  @Nullable
  static Pair<ZstdDictDecompress, Integer> getDecompressDictionary(final Configuration conf) {
    String path = conf.get(ZSTD_DICTIONARY_KEY);
    if (path == null) {
      return null;
    }

    try {
      return DECOMPRESS_DICT_CACHE.get(path, () -> {
        byte[] dictBytes = DictionaryCache.getDictionary(conf, path);
        int dictId = getDictionaryId(dictBytes);
        return new Pair<>(new ZstdDictDecompress(dictBytes), dictId);
      });
    } catch (ExecutionException e) {
      throw new RuntimeException("Unable to load ZSTD dictionary", e);
    }
  }

  // Zstandard dictionaries begin with a 32-bit magic number, 0xEC30A437 in little-endian
  // format, followed by the 32-bit dictionary ID, also little-endian.
  // Reference: https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md
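  // For example, a dictionary beginning with the bytes 37 A4 30 EC 2A 00 00 00 carries the
  // magic number in its first four bytes and, read little-endian, the ID 42 in the next four.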

  static boolean isDictionary(byte[] dictionary) {
    return (dictionary[0] == (byte) 0x37 && dictionary[1] == (byte) 0xA4
      && dictionary[2] == (byte) 0x30 && dictionary[3] == (byte) 0xEC);
  }

  static int getDictionaryId(byte[] dictionary) {
    if (!isDictionary(dictionary)) {
      throw new IllegalArgumentException("Not a ZStandard dictionary");
    }
    return ByteBuffer.wrap(dictionary, 4, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
  }

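  // Re-read cached settings from the current configuration; called from the constructor and
  // from setConf().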
  private void init() {
    this.bufferSize = getBufferSize(conf);
    this.level = getLevel(conf);
    this.dictionary = getDictionary(conf);
  }
}