001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.compress.aircompressor; 019 020import io.airlift.compress.zstd.ZstdCompressor; 021import io.airlift.compress.zstd.ZstdDecompressor; 022import java.io.IOException; 023import java.io.InputStream; 024import java.io.OutputStream; 025import org.apache.hadoop.conf.Configurable; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.CommonConfigurationKeys; 028import org.apache.hadoop.hbase.io.compress.CompressionUtil; 029import org.apache.hadoop.io.compress.BlockCompressorStream; 030import org.apache.hadoop.io.compress.BlockDecompressorStream; 031import org.apache.hadoop.io.compress.CompressionCodec; 032import org.apache.hadoop.io.compress.CompressionInputStream; 033import org.apache.hadoop.io.compress.CompressionOutputStream; 034import org.apache.hadoop.io.compress.Compressor; 035import org.apache.hadoop.io.compress.Decompressor; 036import org.apache.yetus.audience.InterfaceAudience; 037 038/** 039 * Hadoop codec implementation for Zstandard, implemented with aircompressor. 040 * <p> 041 * Unlike the other codecs this one should be considered as under development and unstable (as in 042 * changing), reflecting the status of aircompressor's zstandard implementation. 043 * <p> 044 * NOTE: This codec is NOT data format compatible with the Hadoop native zstandard codec. There are 045 * issues with both framing and limitations of the aircompressor zstandard compressor. This codec 046 * can be used as an alternative to the native codec, if the native codec cannot be made available 047 * and/or an eventual migration will never be necessary (i.e. this codec's performance meets 048 * anticipated requirements). Once you begin using this alternative you will be locked into it. 049 */ 050@InterfaceAudience.Private 051public class ZstdCodec implements Configurable, CompressionCodec { 052 053 public static final String ZSTD_BUFFER_SIZE_KEY = "hbase.io.compress.zstd.buffersize"; 054 public static final int ZSTD_BUFFER_SIZE_DEFAULT = 256 * 1024; 055 056 private Configuration conf; 057 058 public ZstdCodec() { 059 conf = new Configuration(); 060 } 061 062 @Override 063 public void setConf(Configuration conf) { 064 this.conf = conf; 065 } 066 067 @Override 068 public Configuration getConf() { 069 return conf; 070 } 071 072 @Override 073 public Compressor createCompressor() { 074 return new HadoopZstdCompressor(); 075 } 076 077 @Override 078 public Decompressor createDecompressor() { 079 return new HadoopZstdDecompressor(); 080 } 081 082 @Override 083 public CompressionInputStream createInputStream(InputStream in) throws IOException { 084 return createInputStream(in, createDecompressor()); 085 } 086 087 @Override 088 public CompressionInputStream createInputStream(InputStream in, Decompressor d) 089 throws IOException { 090 return new BlockDecompressorStream(in, d, getBufferSize(conf)); 091 } 092 093 @Override 094 public CompressionOutputStream createOutputStream(OutputStream out) throws IOException { 095 return createOutputStream(out, createCompressor()); 096 } 097 098 @Override 099 public CompressionOutputStream createOutputStream(OutputStream out, Compressor c) 100 throws IOException { 101 int bufferSize = getBufferSize(conf); 102 return new BlockCompressorStream(out, c, bufferSize, 103 CompressionUtil.compressionOverhead(bufferSize)); 104 } 105 106 @Override 107 public Class<? extends Compressor> getCompressorType() { 108 return HadoopZstdCompressor.class; 109 } 110 111 @Override 112 public Class<? extends Decompressor> getDecompressorType() { 113 return HadoopZstdDecompressor.class; 114 } 115 116 @Override 117 public String getDefaultExtension() { 118 return ".zst"; 119 } 120 121 @InterfaceAudience.Private 122 public class HadoopZstdCompressor extends HadoopCompressor<ZstdCompressor> { 123 124 HadoopZstdCompressor(ZstdCompressor compressor) { 125 super(compressor, ZstdCodec.getBufferSize(conf)); 126 } 127 128 HadoopZstdCompressor() { 129 this(new ZstdCompressor()); 130 } 131 132 @Override 133 int getLevel(Configuration conf) { 134 return 0; 135 } 136 137 @Override 138 int getBufferSize(Configuration conf) { 139 return ZstdCodec.getBufferSize(conf); 140 } 141 142 } 143 144 @InterfaceAudience.Private 145 public class HadoopZstdDecompressor extends HadoopDecompressor<ZstdDecompressor> { 146 147 HadoopZstdDecompressor(ZstdDecompressor decompressor) { 148 super(decompressor, getBufferSize(conf)); 149 } 150 151 HadoopZstdDecompressor() { 152 this(new ZstdDecompressor()); 153 } 154 155 } 156 157 // Package private 158 159 static int getBufferSize(Configuration conf) { 160 return conf.getInt(ZSTD_BUFFER_SIZE_KEY, 161 conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY, 162 // IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT is 0! We can't allow that. 163 ZSTD_BUFFER_SIZE_DEFAULT)); 164 } 165 166}