001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.compress.zstd; 019 020import com.github.luben.zstd.Zstd; 021import java.io.IOException; 022import java.io.InputStream; 023import java.io.OutputStream; 024import java.nio.ByteBuffer; 025import java.nio.ByteOrder; 026import org.apache.hadoop.conf.Configurable; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.fs.CommonConfigurationKeys; 029import org.apache.hadoop.hbase.io.compress.DictionaryCache; 030import org.apache.hadoop.io.compress.BlockCompressorStream; 031import org.apache.hadoop.io.compress.BlockDecompressorStream; 032import org.apache.hadoop.io.compress.CompressionCodec; 033import org.apache.hadoop.io.compress.CompressionInputStream; 034import org.apache.hadoop.io.compress.CompressionOutputStream; 035import org.apache.hadoop.io.compress.Compressor; 036import org.apache.hadoop.io.compress.Decompressor; 037import org.apache.yetus.audience.InterfaceAudience; 038 039/** 040 * Hadoop ZStandard codec implemented with zstd-jni. 041 * <p> 042 * This is data format compatible with Hadoop's native ZStandard codec. 043 */ 044@InterfaceAudience.Private 045public class ZstdCodec implements Configurable, CompressionCodec { 046 047 public static final String ZSTD_LEVEL_KEY = "hbase.io.compress.zstd.level"; 048 public static final String ZSTD_BUFFER_SIZE_KEY = "hbase.io.compress.zstd.buffersize"; 049 public static final int ZSTD_BUFFER_SIZE_DEFAULT = 256 * 1024; 050 public static final String ZSTD_DICTIONARY_KEY = "hbase.io.compress.zstd.dictionary"; 051 052 private Configuration conf; 053 private int bufferSize; 054 private int level; 055 private byte[] dictionary; 056 057 public ZstdCodec() { 058 conf = new Configuration(); 059 init(); 060 } 061 062 @Override 063 public Configuration getConf() { 064 return conf; 065 } 066 067 @Override 068 public void setConf(Configuration conf) { 069 this.conf = conf; 070 init(); 071 } 072 073 @Override 074 public Compressor createCompressor() { 075 return new ZstdCompressor(level, bufferSize, dictionary); 076 } 077 078 @Override 079 public Decompressor createDecompressor() { 080 return new ZstdDecompressor(bufferSize, dictionary); 081 } 082 083 @Override 084 public CompressionInputStream createInputStream(InputStream in) throws IOException { 085 return createInputStream(in, createDecompressor()); 086 } 087 088 @Override 089 public CompressionInputStream createInputStream(InputStream in, Decompressor d) 090 throws IOException { 091 return new BlockDecompressorStream(in, d, bufferSize); 092 } 093 094 @Override 095 public CompressionOutputStream createOutputStream(OutputStream out) throws IOException { 096 return createOutputStream(out, createCompressor()); 097 } 098 099 @Override 100 public CompressionOutputStream createOutputStream(OutputStream out, Compressor c) 101 throws IOException { 102 return new BlockCompressorStream(out, c, bufferSize, 103 (int) Zstd.compressBound(bufferSize) - bufferSize); // overhead only 104 } 105 106 @Override 107 public Class<? extends Compressor> getCompressorType() { 108 return ZstdCompressor.class; 109 } 110 111 @Override 112 public Class<? extends Decompressor> getDecompressorType() { 113 return ZstdDecompressor.class; 114 } 115 116 @Override 117 public String getDefaultExtension() { 118 return ".zst"; 119 } 120 121 // Package private 122 123 static int getLevel(Configuration conf) { 124 return conf.getInt(ZSTD_LEVEL_KEY, 125 conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY, 126 CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_DEFAULT)); 127 } 128 129 static int getBufferSize(Configuration conf) { 130 int size = conf.getInt(ZSTD_BUFFER_SIZE_KEY, 131 conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY, 132 CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT)); 133 return size > 0 ? size : ZSTD_BUFFER_SIZE_DEFAULT; 134 } 135 136 static byte[] getDictionary(final Configuration conf) { 137 String path = conf.get(ZSTD_DICTIONARY_KEY); 138 try { 139 return DictionaryCache.getDictionary(conf, path); 140 } catch (IOException e) { 141 throw new RuntimeException("Unable to load dictionary at " + path, e); 142 } 143 } 144 145 // Zstandard dictionaries begin with a 32-bit magic number, 0xEC30A437 in little-endian 146 // format, followed by a 32-bit identifier also in little-endian format. 147 // Reference: https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md 148 149 static boolean isDictionary(byte[] dictionary) { 150 return (dictionary[0] == (byte) 0x37 && dictionary[1] == (byte) 0xA4 151 && dictionary[2] == (byte) 0x30 && dictionary[3] == (byte) 0xEC); 152 } 153 154 static int getDictionaryId(byte[] dictionary) { 155 if (!isDictionary(dictionary)) { 156 throw new IllegalArgumentException("Not a ZStandard dictionary"); 157 } 158 return ByteBuffer.wrap(dictionary, 4, 4).order(ByteOrder.LITTLE_ENDIAN).getInt(); 159 } 160 161 private void init() { 162 this.bufferSize = getBufferSize(conf); 163 this.level = getLevel(conf); 164 this.dictionary = getDictionary(conf); 165 } 166}