001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.compress.aircompressor;
019
020import io.airlift.compress.zstd.ZstdCompressor;
021import io.airlift.compress.zstd.ZstdDecompressor;
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.OutputStream;
025import org.apache.hadoop.conf.Configurable;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.CommonConfigurationKeys;
028import org.apache.hadoop.hbase.io.compress.CompressionUtil;
029import org.apache.hadoop.io.compress.BlockCompressorStream;
030import org.apache.hadoop.io.compress.BlockDecompressorStream;
031import org.apache.hadoop.io.compress.CompressionCodec;
032import org.apache.hadoop.io.compress.CompressionInputStream;
033import org.apache.hadoop.io.compress.CompressionOutputStream;
034import org.apache.hadoop.io.compress.Compressor;
035import org.apache.hadoop.io.compress.Decompressor;
036import org.apache.yetus.audience.InterfaceAudience;
037
038/**
039 * Hadoop codec implementation for Zstandard, implemented with aircompressor.
040 * <p>
041 * Unlike the other codecs this one should be considered as under development and unstable (as in
042 * changing), reflecting the status of aircompressor's zstandard implementation.
043 * <p>
044 * NOTE: This codec is NOT data format compatible with the Hadoop native zstandard codec. There are
045 * issues with both framing and limitations of the aircompressor zstandard compressor. This codec
046 * can be used as an alternative to the native codec, if the native codec cannot be made available
047 * and/or an eventual migration will never be necessary (i.e. this codec's performance meets
048 * anticipated requirements). Once you begin using this alternative you will be locked into it.
049 */
050@InterfaceAudience.Private
051public class ZstdCodec implements Configurable, CompressionCodec {
052
053  public static final String ZSTD_BUFFER_SIZE_KEY = "hbase.io.compress.zstd.buffersize";
054  public static final int ZSTD_BUFFER_SIZE_DEFAULT = 256 * 1024;
055
056  private Configuration conf;
057  private int bufferSize;
058
059  public ZstdCodec() {
060    conf = new Configuration();
061    bufferSize = getBufferSize(conf);
062  }
063
064  @Override
065  public void setConf(Configuration conf) {
066    this.conf = conf;
067    this.bufferSize = getBufferSize(conf);
068  }
069
070  @Override
071  public Configuration getConf() {
072    return conf;
073  }
074
075  @Override
076  public Compressor createCompressor() {
077    return new HadoopZstdCompressor();
078  }
079
080  @Override
081  public Decompressor createDecompressor() {
082    return new HadoopZstdDecompressor();
083  }
084
085  @Override
086  public CompressionInputStream createInputStream(InputStream in) throws IOException {
087    return createInputStream(in, createDecompressor());
088  }
089
090  @Override
091  public CompressionInputStream createInputStream(InputStream in, Decompressor d)
092    throws IOException {
093    return new BlockDecompressorStream(in, d, bufferSize);
094  }
095
096  @Override
097  public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
098    return createOutputStream(out, createCompressor());
099  }
100
101  @Override
102  public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
103    throws IOException {
104    return new BlockCompressorStream(out, c, bufferSize,
105      CompressionUtil.compressionOverhead(bufferSize));
106  }
107
108  @Override
109  public Class<? extends Compressor> getCompressorType() {
110    return HadoopZstdCompressor.class;
111  }
112
113  @Override
114  public Class<? extends Decompressor> getDecompressorType() {
115    return HadoopZstdDecompressor.class;
116  }
117
118  @Override
119  public String getDefaultExtension() {
120    return ".zst";
121  }
122
123  @InterfaceAudience.Private
124  public class HadoopZstdCompressor extends HadoopCompressor<ZstdCompressor> {
125
126    HadoopZstdCompressor(ZstdCompressor compressor) {
127      super(compressor, ZstdCodec.getBufferSize(conf));
128    }
129
130    HadoopZstdCompressor() {
131      this(new ZstdCompressor());
132    }
133
134    @Override
135    int getLevel(Configuration conf) {
136      return 0;
137    }
138
139    @Override
140    int getBufferSize(Configuration conf) {
141      return ZstdCodec.getBufferSize(conf);
142    }
143
144  }
145
146  @InterfaceAudience.Private
147  public class HadoopZstdDecompressor extends HadoopDecompressor<ZstdDecompressor> {
148
149    HadoopZstdDecompressor(ZstdDecompressor decompressor) {
150      super(decompressor, getBufferSize(conf));
151    }
152
153    HadoopZstdDecompressor() {
154      this(new ZstdDecompressor());
155    }
156
157  }
158
159  // Package private
160
161  static int getBufferSize(Configuration conf) {
162    int size = conf.getInt(ZSTD_BUFFER_SIZE_KEY,
163      conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY,
164        CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT));
165    return size > 0 ? size : ZSTD_BUFFER_SIZE_DEFAULT;
166  }
167
168}