001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.compress.zstd; 019 020import com.github.luben.zstd.Zstd; 021import com.github.luben.zstd.ZstdDictCompress; 022import java.io.IOException; 023import java.nio.ByteBuffer; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.hbase.io.compress.CanReinit; 026import org.apache.hadoop.hbase.io.compress.CompressionUtil; 027import org.apache.hadoop.io.compress.Compressor; 028import org.apache.yetus.audience.InterfaceAudience; 029 030/** 031 * Hadoop compressor glue for zstd-jni. 032 */ 033@InterfaceAudience.Private 034public class ZstdCompressor implements CanReinit, Compressor { 035 036 protected int level, bufferSize; 037 protected ByteBuffer inBuf, outBuf; 038 protected boolean finish, finished; 039 protected long bytesRead, bytesWritten; 040 protected int dictId; 041 protected ZstdDictCompress dict; 042 043 ZstdCompressor(final int level, final int bufferSize, final byte[] dictionary) { 044 this.level = level; 045 this.bufferSize = bufferSize; 046 this.inBuf = ByteBuffer.allocateDirect(bufferSize); 047 this.outBuf = ByteBuffer.allocateDirect(bufferSize); 048 this.outBuf.position(bufferSize); 049 if (dictionary != null) { 050 this.dictId = ZstdCodec.getDictionaryId(dictionary); 051 this.dict = new ZstdDictCompress(dictionary, level); 052 } 053 } 054 055 ZstdCompressor(final int level, final int bufferSize) { 056 this(level, bufferSize, null); 057 } 058 059 @Override 060 public int compress(final byte[] b, final int off, final int len) throws IOException { 061 // If we have previously compressed our input and still have some buffered bytes 062 // remaining, provide them to the caller. 063 if (outBuf.hasRemaining()) { 064 int remaining = outBuf.remaining(), n = Math.min(remaining, len); 065 outBuf.get(b, off, n); 066 return n; 067 } 068 // We don't actually begin compression until our caller calls finish(). 069 if (finish) { 070 if (inBuf.position() > 0) { 071 inBuf.flip(); 072 int uncompressed = inBuf.remaining(); 073 // If we don't have enough capacity in our currently allocated output buffer, 074 // allocate a new one which does. 075 int needed = maxCompressedLength(uncompressed); 076 if (outBuf.capacity() < needed) { 077 needed = CompressionUtil.roundInt2(needed); 078 outBuf = ByteBuffer.allocateDirect(needed); 079 } else { 080 outBuf.clear(); 081 } 082 int written; 083 if (dict != null) { 084 written = Zstd.compress(outBuf, inBuf, dict); 085 } else { 086 written = Zstd.compress(outBuf, inBuf, level); 087 } 088 bytesWritten += written; 089 inBuf.clear(); 090 finished = true; 091 outBuf.flip(); 092 int n = Math.min(written, len); 093 outBuf.get(b, off, n); 094 return n; 095 } else { 096 finished = true; 097 } 098 } 099 return 0; 100 } 101 102 @Override 103 public void end() { 104 } 105 106 @Override 107 public void finish() { 108 finish = true; 109 } 110 111 @Override 112 public boolean finished() { 113 return finished && !outBuf.hasRemaining(); 114 } 115 116 @Override 117 public long getBytesRead() { 118 return bytesRead; 119 } 120 121 @Override 122 public long getBytesWritten() { 123 return bytesWritten; 124 } 125 126 @Override 127 public boolean needsInput() { 128 return !finished(); 129 } 130 131 @Override 132 public void reinit(final Configuration conf) { 133 if (conf != null) { 134 // Level might have changed 135 boolean levelChanged = false; 136 int newLevel = ZstdCodec.getLevel(conf); 137 if (level != newLevel) { 138 level = newLevel; 139 levelChanged = true; 140 } 141 // Dictionary may have changed 142 byte[] b = ZstdCodec.getDictionary(conf); 143 if (b != null) { 144 // Don't casually create dictionary objects; they consume native memory 145 int thisDictId = ZstdCodec.getDictionaryId(b); 146 if (dict == null || dictId != thisDictId || levelChanged) { 147 dictId = thisDictId; 148 dict = new ZstdDictCompress(b, level); 149 } 150 } else { 151 dict = null; 152 } 153 // Buffer size might have changed 154 int newBufferSize = ZstdCodec.getBufferSize(conf); 155 if (bufferSize != newBufferSize) { 156 bufferSize = newBufferSize; 157 this.inBuf = ByteBuffer.allocateDirect(bufferSize); 158 this.outBuf = ByteBuffer.allocateDirect(bufferSize); 159 } 160 } 161 reset(); 162 } 163 164 @Override 165 public void reset() { 166 inBuf.clear(); 167 outBuf.clear(); 168 outBuf.position(outBuf.capacity()); 169 bytesRead = 0; 170 bytesWritten = 0; 171 finish = false; 172 finished = false; 173 } 174 175 @Override 176 public void setDictionary(final byte[] b, final int off, final int len) { 177 throw new UnsupportedOperationException("setDictionary is not supported"); 178 } 179 180 @Override 181 public void setInput(final byte[] b, final int off, final int len) { 182 if (inBuf.remaining() < len) { 183 // Get a new buffer that can accomodate the accumulated input plus the additional 184 // input that would cause a buffer overflow without reallocation. 185 // This condition should be fortunately rare, because it is expensive. 186 int needed = CompressionUtil.roundInt2(inBuf.capacity() + len); 187 ByteBuffer newBuf = ByteBuffer.allocateDirect(needed); 188 inBuf.flip(); 189 newBuf.put(inBuf); 190 inBuf = newBuf; 191 } 192 inBuf.put(b, off, len); 193 bytesRead += len; 194 finished = false; 195 } 196 197 // Package private 198 199 static int maxCompressedLength(final int len) { 200 return (int) Zstd.compressBound(len); 201 } 202 203}