/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.compress.zstd;

import com.github.luben.zstd.Zstd;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hbase.io.compress.ByteBuffDecompressionCodec;
import org.apache.hadoop.hbase.io.compress.ByteBuffDecompressor;
import org.apache.hadoop.hbase.io.compress.DictionaryCache;
import org.apache.hadoop.io.compress.BlockCompressorStream;
import org.apache.hadoop.io.compress.BlockDecompressorStream;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Hadoop ZStandard codec implemented with zstd-jni.
 * <p>
 * This is data format compatible with Hadoop's native ZStandard codec.
 */
@InterfaceAudience.Private
public class ZstdCodec implements Configurable, CompressionCodec, ByteBuffDecompressionCodec {

  /** HBase-specific compression level key; falls back to Hadoop's key if unset. */
  public static final String ZSTD_LEVEL_KEY = "hbase.io.compress.zstd.level";
  /** HBase-specific buffer size key; falls back to Hadoop's key if unset. */
  public static final String ZSTD_BUFFER_SIZE_KEY = "hbase.io.compress.zstd.buffersize";
  public static final int ZSTD_BUFFER_SIZE_DEFAULT = 256 * 1024;
  /** Optional path to a ZStandard dictionary, loaded through {@link DictionaryCache}. */
  public static final String ZSTD_DICTIONARY_KEY = "hbase.io.compress.zstd.dictionary";

  private Configuration conf;
  // Settings are resolved once in init() so the factory methods below are cheap.
  private int bufferSize;
  private int level;
  private byte[] dictionary;

  public ZstdCodec() {
    conf = new Configuration();
    init();
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
    // Re-resolve level, buffer size, and dictionary from the new configuration.
    init();
  }

  @Override
  public Compressor createCompressor() {
    return new ZstdCompressor(level, bufferSize, dictionary);
  }

  @Override
  public Decompressor createDecompressor() {
    return new ZstdDecompressor(bufferSize, dictionary);
  }

  @Override
  public ByteBuffDecompressor createByteBuffDecompressor() {
    return new ZstdByteBuffDecompressor(dictionary);
  }

  @Override
  public CompressionInputStream createInputStream(InputStream in) throws IOException {
    return createInputStream(in, createDecompressor());
  }

  @Override
  public CompressionInputStream createInputStream(InputStream in, Decompressor d)
    throws IOException {
    return new BlockDecompressorStream(in, d, bufferSize);
  }

  @Override
  public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
    return createOutputStream(out, createCompressor());
  }

  @Override
  public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
    throws IOException {
    // The last argument is the worst-case per-block overhead zstd may add on
    // incompressible input, i.e. compressBound(bufferSize) - bufferSize.
    return new BlockCompressorStream(out, c, bufferSize,
      (int) Zstd.compressBound(bufferSize) - bufferSize); // overhead only
  }

  @Override
  public Class<? extends Compressor> getCompressorType() {
    return ZstdCompressor.class;
  }

  @Override
  public Class<? extends Decompressor> getDecompressorType() {
    return ZstdDecompressor.class;
  }

  @Override
  public Class<? extends ByteBuffDecompressor> getByteBuffDecompressorType() {
    return ZstdByteBuffDecompressor.class;
  }

  @Override
  public String getDefaultExtension() {
    return ".zst";
  }

  // Package private

  /**
   * Resolves the compression level, preferring the HBase key over Hadoop's.
   */
  static int getLevel(Configuration conf) {
    return conf.getInt(ZSTD_LEVEL_KEY,
      conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY,
        CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_DEFAULT));
  }

  /**
   * Resolves the block buffer size, preferring the HBase key over Hadoop's. Non-positive
   * configured values are replaced with {@link #ZSTD_BUFFER_SIZE_DEFAULT}.
   */
  static int getBufferSize(Configuration conf) {
    int size = conf.getInt(ZSTD_BUFFER_SIZE_KEY,
      conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY,
        CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT));
    return size > 0 ? size : ZSTD_BUFFER_SIZE_DEFAULT;
  }

  /**
   * Loads the configured dictionary, if any, via the {@link DictionaryCache}.
   * @return the dictionary bytes, or null when {@link #ZSTD_DICTIONARY_KEY} is unset
   * @throws RuntimeException if the configured dictionary cannot be loaded
   */
  static byte[] getDictionary(final Configuration conf) {
    String path = conf.get(ZSTD_DICTIONARY_KEY);
    try {
      return DictionaryCache.getDictionary(conf, path);
    } catch (IOException e) {
      throw new RuntimeException("Unable to load dictionary at " + path, e);
    }
  }

  // Zstandard dictionaries begin with a 32-bit magic number, 0xEC30A437 in little-endian
  // format, followed by a 32-bit identifier also in little-endian format.
  // Reference: https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md

  /**
   * Returns true if the given bytes begin with the ZStandard dictionary magic number.
   * A null or truncated (fewer than 4 bytes) input is simply not a dictionary; guard
   * the length so such input does not throw ArrayIndexOutOfBoundsException.
   */
  static boolean isDictionary(byte[] dictionary) {
    return dictionary != null && dictionary.length >= 4 && dictionary[0] == (byte) 0x37
      && dictionary[1] == (byte) 0xA4 && dictionary[2] == (byte) 0x30
      && dictionary[3] == (byte) 0xEC;
  }

  /**
   * Extracts the 32-bit dictionary ID that follows the magic number.
   * @throws IllegalArgumentException if the input is not a (complete) ZStandard dictionary
   */
  static int getDictionaryId(byte[] dictionary) {
    if (!isDictionary(dictionary)) {
      throw new IllegalArgumentException("Not a ZStandard dictionary");
    }
    if (dictionary.length < 8) {
      // The ID occupies bytes 4-7; report a clear error instead of an
      // IndexOutOfBoundsException from ByteBuffer.wrap below.
      throw new IllegalArgumentException("Truncated ZStandard dictionary: missing dictionary ID");
    }
    return ByteBuffer.wrap(dictionary, 4, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
  }

  /** Caches buffer size, level, and dictionary from the current configuration. */
  private void init() {
    this.bufferSize = getBufferSize(conf);
    this.level = getLevel(conf);
    this.dictionary = getDictionary(conf);
  }
}