001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.compress.aircompressor; 019 020import io.airlift.compress.zstd.ZstdCompressor; 021import io.airlift.compress.zstd.ZstdDecompressor; 022import java.io.IOException; 023import java.io.InputStream; 024import java.io.OutputStream; 025import org.apache.hadoop.conf.Configurable; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.CommonConfigurationKeys; 028import org.apache.hadoop.hbase.io.compress.CompressionUtil; 029import org.apache.hadoop.io.compress.BlockCompressorStream; 030import org.apache.hadoop.io.compress.BlockDecompressorStream; 031import org.apache.hadoop.io.compress.CompressionCodec; 032import org.apache.hadoop.io.compress.CompressionInputStream; 033import org.apache.hadoop.io.compress.CompressionOutputStream; 034import org.apache.hadoop.io.compress.Compressor; 035import org.apache.hadoop.io.compress.Decompressor; 036import org.apache.yetus.audience.InterfaceAudience; 037 038/** 039 * Hadoop codec implementation for Zstandard, implemented with aircompressor. 040 * <p> 041 * Unlike the other codecs this one should be considered as under development and unstable (as in 042 * changing), reflecting the status of aircompressor's zstandard implementation. 043 * <p> 044 * NOTE: This codec is NOT data format compatible with the Hadoop native zstandard codec. There are 045 * issues with both framing and limitations of the aircompressor zstandard compressor. This codec 046 * can be used as an alternative to the native codec, if the native codec cannot be made available 047 * and/or an eventual migration will never be necessary (i.e. this codec's performance meets 048 * anticipated requirements). Once you begin using this alternative you will be locked into it. 049 */ 050@InterfaceAudience.Private 051public class ZstdCodec implements Configurable, CompressionCodec { 052 053 public static final String ZSTD_BUFFER_SIZE_KEY = "hbase.io.compress.zstd.buffersize"; 054 public static final int ZSTD_BUFFER_SIZE_DEFAULT = 256 * 1024; 055 056 private Configuration conf; 057 private int bufferSize; 058 059 public ZstdCodec() { 060 conf = new Configuration(); 061 bufferSize = getBufferSize(conf); 062 } 063 064 @Override 065 public void setConf(Configuration conf) { 066 this.conf = conf; 067 this.bufferSize = getBufferSize(conf); 068 } 069 070 @Override 071 public Configuration getConf() { 072 return conf; 073 } 074 075 @Override 076 public Compressor createCompressor() { 077 return new HadoopZstdCompressor(); 078 } 079 080 @Override 081 public Decompressor createDecompressor() { 082 return new HadoopZstdDecompressor(); 083 } 084 085 @Override 086 public CompressionInputStream createInputStream(InputStream in) throws IOException { 087 return createInputStream(in, createDecompressor()); 088 } 089 090 @Override 091 public CompressionInputStream createInputStream(InputStream in, Decompressor d) 092 throws IOException { 093 return new BlockDecompressorStream(in, d, bufferSize); 094 } 095 096 @Override 097 public CompressionOutputStream createOutputStream(OutputStream out) throws IOException { 098 return createOutputStream(out, createCompressor()); 099 } 100 101 @Override 102 public CompressionOutputStream createOutputStream(OutputStream out, Compressor c) 103 throws IOException { 104 return new BlockCompressorStream(out, c, bufferSize, 105 CompressionUtil.compressionOverhead(bufferSize)); 106 } 107 108 @Override 109 public Class<? extends Compressor> getCompressorType() { 110 return HadoopZstdCompressor.class; 111 } 112 113 @Override 114 public Class<? extends Decompressor> getDecompressorType() { 115 return HadoopZstdDecompressor.class; 116 } 117 118 @Override 119 public String getDefaultExtension() { 120 return ".zst"; 121 } 122 123 @InterfaceAudience.Private 124 public class HadoopZstdCompressor extends HadoopCompressor<ZstdCompressor> { 125 126 HadoopZstdCompressor(ZstdCompressor compressor) { 127 super(compressor, ZstdCodec.getBufferSize(conf)); 128 } 129 130 HadoopZstdCompressor() { 131 this(new ZstdCompressor()); 132 } 133 134 @Override 135 int getLevel(Configuration conf) { 136 return 0; 137 } 138 139 @Override 140 int getBufferSize(Configuration conf) { 141 return ZstdCodec.getBufferSize(conf); 142 } 143 144 } 145 146 @InterfaceAudience.Private 147 public class HadoopZstdDecompressor extends HadoopDecompressor<ZstdDecompressor> { 148 149 HadoopZstdDecompressor(ZstdDecompressor decompressor) { 150 super(decompressor, getBufferSize(conf)); 151 } 152 153 HadoopZstdDecompressor() { 154 this(new ZstdDecompressor()); 155 } 156 157 } 158 159 // Package private 160 161 static int getBufferSize(Configuration conf) { 162 int size = conf.getInt(ZSTD_BUFFER_SIZE_KEY, 163 conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY, 164 CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT)); 165 return size > 0 ? size : ZSTD_BUFFER_SIZE_DEFAULT; 166 } 167 168}