/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.compress.zstd;

import com.github.luben.zstd.Zstd;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hbase.io.compress.DictionaryCache;
import org.apache.hadoop.io.compress.BlockCompressorStream;
import org.apache.hadoop.io.compress.BlockDecompressorStream;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Hadoop ZStandard codec implemented with zstd-jni.
 * <p>
 * This is data format compatible with Hadoop's native ZStandard codec.
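 * <p>
 * A minimal configuration sketch, for illustration only: the keys are the constants defined on
 * this class, the values shown are placeholders, and the dictionary path must be something
 * {@link DictionaryCache} can load.
 *
 * <pre>
 * Configuration conf = new Configuration();
 * conf.setInt(ZstdCodec.ZSTD_LEVEL_KEY, 3);                // compression level
 * conf.setInt(ZstdCodec.ZSTD_BUFFER_SIZE_KEY, 256 * 1024); // block buffer size in bytes
 * // conf.set(ZstdCodec.ZSTD_DICTIONARY_KEY, "hdfs://...");   // optional shared dictionary
 * ZstdCodec codec = new ZstdCodec();
 * codec.setConf(conf);
 * </pre>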
 */
@InterfaceAudience.Private
public class ZstdCodec implements Configurable, CompressionCodec {

  public static final String ZSTD_LEVEL_KEY = "hbase.io.compress.zstd.level";
  public static final String ZSTD_BUFFER_SIZE_KEY = "hbase.io.compress.zstd.buffersize";
  public static final int ZSTD_BUFFER_SIZE_DEFAULT = 256 * 1024;
  public static final String ZSTD_DICTIONARY_KEY = "hbase.io.compress.zstd.dictionary";

  private Configuration conf;

  public ZstdCodec() {
    conf = new Configuration();
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public Compressor createCompressor() {
    return new ZstdCompressor(getLevel(conf), getBufferSize(conf), getDictionary(conf));
  }

  @Override
  public Decompressor createDecompressor() {
    return new ZstdDecompressor(getBufferSize(conf), getDictionary(conf));
  }

  @Override
  public CompressionInputStream createInputStream(InputStream in) throws IOException {
    return createInputStream(in, createDecompressor());
  }

  @Override
  public CompressionInputStream createInputStream(InputStream in, Decompressor d)
    throws IOException {
    return new BlockDecompressorStream(in, d, getBufferSize(conf));
  }

  @Override
  public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
    return createOutputStream(out, createCompressor());
  }

  @Override
  public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
    throws IOException {
    int bufferSize = getBufferSize(conf);
    return new BlockCompressorStream(out, c, bufferSize,
      (int) Zstd.compressBound(bufferSize) - bufferSize); // overhead only
  }

  @Override
  public Class<? extends Compressor> getCompressorType() {
    return ZstdCompressor.class;
  }

  @Override
  public Class<? extends Decompressor> getDecompressorType() {
    return ZstdDecompressor.class;
  }

  @Override
  public String getDefaultExtension() {
    return ".zst";
  }

  // Package private

  static int getLevel(Configuration conf) {
    return conf.getInt(ZSTD_LEVEL_KEY,
      conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY,
        CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_DEFAULT));
  }

  static int getBufferSize(Configuration conf) {
    return conf.getInt(ZSTD_BUFFER_SIZE_KEY,
      conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY,
        // IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT is 0! We can't allow that.
        ZSTD_BUFFER_SIZE_DEFAULT));
  }

  static byte[] getDictionary(final Configuration conf) {
    String path = conf.get(ZSTD_DICTIONARY_KEY);
    try {
      return DictionaryCache.getDictionary(conf, path);
    } catch (IOException e) {
      throw new RuntimeException("Unable to load dictionary at " + path, e);
    }
  }

  // Zstandard dictionaries begin with a 32-bit magic number, 0xEC30A437 in little-endian
  // format, followed by a 32-bit identifier also in little-endian format.
  // Reference: https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md

  static boolean isDictionary(byte[] dictionary) {
    return (dictionary[0] == (byte) 0x37 && dictionary[1] == (byte) 0xA4
      && dictionary[2] == (byte) 0x30 && dictionary[3] == (byte) 0xEC);
  }

  static int getDictionaryId(byte[] dictionary) {
    if (!isDictionary(dictionary)) {
      throw new IllegalArgumentException("Not a ZStandard dictionary");
    }
    return ByteBuffer.wrap(dictionary, 4, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
  }

}