/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.compress.zstd;

import com.github.luben.zstd.Zstd;
import com.github.luben.zstd.ZstdDictDecompress;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hbase.io.compress.ByteBuffDecompressionCodec;
import org.apache.hadoop.hbase.io.compress.ByteBuffDecompressor;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.DictionaryCache;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.compress.BlockCompressorStream;
import org.apache.hadoop.io.compress.BlockDecompressorStream;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;

/**
 * Hadoop ZStandard codec implemented with zstd-jni.
 * <p>
 * This is data format compatible with Hadoop's native ZStandard codec.
 */
@InterfaceAudience.Private
public class ZstdCodec implements Configurable, CompressionCodec, ByteBuffDecompressionCodec {

  public static final String ZSTD_LEVEL_KEY = "hbase.io.compress.zstd.level";
  public static final String ZSTD_BUFFER_SIZE_KEY = "hbase.io.compress.zstd.buffersize";
  public static final int ZSTD_BUFFER_SIZE_DEFAULT = 256 * 1024;
  public static final String ZSTD_DICTIONARY_KEY = "hbase.io.compress.zstd.dictionary";

  // Caches loaded decompression dictionaries by path so each dictionary is read and
  // parsed at most once while it remains in active use.
  private static final Cache<String, Pair<ZstdDictDecompress, Integer>> DECOMPRESS_DICT_CACHE =
    CacheBuilder.newBuilder().maximumSize(100).expireAfterAccess(10, TimeUnit.MINUTES).build();

  private Configuration conf;
  private int bufferSize;
  private int level;
  private byte[] dictionary;

  public ZstdCodec() {
    conf = new Configuration();
    init();
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
    init();
  }

  @Override
  public Compressor createCompressor() {
    return new ZstdCompressor(level, bufferSize, dictionary);
  }

  @Override
  public Decompressor createDecompressor() {
    return new ZstdDecompressor(bufferSize, dictionary);
  }

  @Override
  public ByteBuffDecompressor createByteBuffDecompressor() {
    return new ZstdByteBuffDecompressor(dictionary);
  }

  @Override
  public CompressionInputStream createInputStream(InputStream in) throws IOException {
    return createInputStream(in, createDecompressor());
  }

  @Override
  public CompressionInputStream createInputStream(InputStream in, Decompressor d)
    throws IOException {
    return new BlockDecompressorStream(in, d, bufferSize);
  }

  @Override
  public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
    return createOutputStream(out, createCompressor());
  }

  @Override
  public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
    throws IOException {
    return new BlockCompressorStream(out, c, bufferSize,
      (int) Zstd.compressBound(bufferSize) - bufferSize); // worst-case overhead only
  }

  @Override
  public Class<? extends Compressor> getCompressorType() {
    return ZstdCompressor.class;
  }

  @Override
  public Class<? extends Decompressor> getDecompressorType() {
    return ZstdDecompressor.class;
  }

  @Override
  public Class<? extends ByteBuffDecompressor> getByteBuffDecompressorType() {
    return ZstdByteBuffDecompressor.class;
  }

  @Override
  public Compression.HFileDecompressionContext
    getDecompressionContextFromConfiguration(Configuration conf) {
    return ZstdHFileDecompressionContext.fromConfiguration(conf);
  }

  @Override
  public String getDefaultExtension() {
    return ".zst";
  }

  // Package private

  // The HBase-specific key takes precedence over Hadoop's generic zstd level key.
  static int getLevel(Configuration conf) {
    return conf.getInt(ZSTD_LEVEL_KEY,
      conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY,
        CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_DEFAULT));
  }

  static int getBufferSize(Configuration conf) {
    int size = conf.getInt(ZSTD_BUFFER_SIZE_KEY,
      conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY,
        CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT));
    return size > 0 ? size : ZSTD_BUFFER_SIZE_DEFAULT;
  }

  @Nullable
  static byte[] getDictionary(final Configuration conf) {
    String path = conf.get(ZSTD_DICTIONARY_KEY);
    return DictionaryCache.getDictionary(conf, path);
  }

  /**
   * Returns the dictionary and its ID number, useful for comparing to other dictionaries for
   * equality.
   */
  @Nullable
  static Pair<ZstdDictDecompress, Integer> getDecompressDictionary(final Configuration conf) {
    String path = conf.get(ZSTD_DICTIONARY_KEY);
    if (path == null) {
      return null;
    }

    try {
      return DECOMPRESS_DICT_CACHE.get(path, () -> {
        byte[] dictBytes = DictionaryCache.getDictionary(conf, path);
        int dictId = getDictionaryId(dictBytes);
        return new Pair<>(new ZstdDictDecompress(dictBytes), dictId);
      });
    } catch (ExecutionException e) {
      throw new RuntimeException("Unable to load ZSTD dictionary", e);
    }
  }

  // Zstandard dictionaries begin with a 32-bit magic number, 0xEC30A437 in little-endian
  // format, followed by a 32-bit identifier also in little-endian format.
  // Reference: https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md

  static boolean isDictionary(byte[] dictionary) {
    return (dictionary[0] == (byte) 0x37 && dictionary[1] == (byte) 0xA4
      && dictionary[2] == (byte) 0x30 && dictionary[3] == (byte) 0xEC);
  }

  static int getDictionaryId(byte[] dictionary) {
    if (!isDictionary(dictionary)) {
      throw new IllegalArgumentException("Not a ZStandard dictionary");
    }
    return ByteBuffer.wrap(dictionary, 4, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
  }

  private void init() {
    this.bufferSize = getBufferSize(conf);
    this.level = getLevel(conf);
    this.dictionary = getDictionary(conf);
  }
}
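
// ---------------------------------------------------------------------------
// Usage sketch (illustrative only, not part of the upstream codec): a minimal
// example of configuring this codec through the HBase-specific keys and
// driving it through Hadoop's streaming API. It assumes zstd-jni is on the
// classpath; the level, block size, and payload below are arbitrary example
// values. A second package-private class in this compilation unit is legal
// Java, so this compiles alongside ZstdCodec.
class ZstdCodecUsageExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    conf.setInt(ZstdCodec.ZSTD_LEVEL_KEY, 6); // example compression level
    conf.setInt(ZstdCodec.ZSTD_BUFFER_SIZE_KEY, 64 * 1024); // 64 KB blocks

    ZstdCodec codec = new ZstdCodec();
    codec.setConf(conf); // re-runs init(), picking up level, buffer size, dictionary

    byte[] payload = "hello zstd".getBytes(java.nio.charset.StandardCharsets.UTF_8);
    java.io.ByteArrayOutputStream sink = new java.io.ByteArrayOutputStream();
    try (CompressionOutputStream out = codec.createOutputStream(sink)) {
      out.write(payload);
      out.finish(); // flush the final block; close() would also finish it
    }
    System.out.println("compressed " + payload.length + " -> " + sink.size() + " bytes");
  }
}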