/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.compress.zstd;

import com.github.luben.zstd.Zstd;
import com.github.luben.zstd.ZstdCompressCtx;
import com.github.luben.zstd.ZstdDictCompress;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.compress.CanReinit;
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Hadoop compressor glue for zstd-jni.
 * <p>
 * Input handed to {@link #setInput(byte[], int, int)} is accumulated in a direct buffer and is
 * compressed in a single call once {@link #finish()} has been requested; the compressed bytes are
 * then handed out by successive calls to {@link #compress(byte[], int, int)}.
 * <p>
 * The instance owns a {@link ZstdCompressCtx} and, when a dictionary is configured, a
 * {@link ZstdDictCompress}. Both are released by {@link #end()}; {@link #reset()} obtains a fresh
 * context if the previous one has been released.
 */
@InterfaceAudience.Private
public class ZstdCompressor implements CanReinit, Compressor {

  protected int level, bufferSize;
  protected ByteBuffer inBuf, outBuf;
  protected boolean finish, finished;
  protected long bytesRead, bytesWritten;
  protected int dictId;
  protected ZstdDictCompress dict;
  protected ZstdCompressCtx ctx;

  /**
   * Creates a compressor with an optional dictionary.
   * @param level      the zstd compression level
   * @param bufferSize the initial size, in bytes, of the input and output buffers
   * @param dictionary the dictionary bytes, or {@code null} to compress without a dictionary
   */
  ZstdCompressor(final int level, final int bufferSize, final byte[] dictionary) {
    this.level = level;
    this.bufferSize = bufferSize;
    this.inBuf = ByteBuffer.allocateDirect(bufferSize);
    this.outBuf = ByteBuffer.allocateDirect(bufferSize);
    // Position at the end so the output buffer initially reports nothing remaining.
    this.outBuf.position(bufferSize);
    this.ctx = new ZstdCompressCtx();
    this.ctx.setLevel(level);
    if (dictionary != null) {
      this.dictId = ZstdCodec.getDictionaryId(dictionary);
      this.dict = new ZstdDictCompress(dictionary, level);
      this.ctx.loadDict(this.dict);
    }
  }

  /**
   * Creates a compressor that does not use a dictionary.
   * @param level      the zstd compression level
   * @param bufferSize the initial size, in bytes, of the input and output buffers
   */
  ZstdCompressor(final int level, final int bufferSize) {
    this(level, bufferSize, null);
  }

  /**
   * Copies compressed bytes into the caller's array.
   * <p>
   * Pending output from an earlier call is drained first. Otherwise, if {@link #finish()} has been
   * called and input is buffered, all buffered input is compressed at once and as much of the
   * result as fits is returned.
   * @param b   destination array
   * @param off offset in {@code b} at which to start writing
   * @param len maximum number of bytes to write
   * @return the number of bytes written to {@code b}, or 0 if nothing is available
   */
  @Override
  public int compress(final byte[] b, final int off, final int len) throws IOException {
    // If we have previously compressed our input and still have some buffered bytes
    // remaining, provide them to the caller.
    if (outBuf.hasRemaining()) {
      int n = Math.min(outBuf.remaining(), len);
      outBuf.get(b, off, n);
      return n;
    }
    // We don't actually begin compression until our caller calls finish().
    if (!finish) {
      return 0;
    }
    if (inBuf.position() == 0) {
      // Nothing was buffered, so there is nothing to compress.
      finished = true;
      return 0;
    }
    inBuf.flip();
    int uncompressed = inBuf.remaining();
    // If we don't have enough capacity in our currently allocated output buffer,
    // allocate a new one which does.
    int needed = maxCompressedLength(uncompressed);
    if (outBuf.capacity() < needed) {
      outBuf = ByteBuffer.allocateDirect(CompressionUtil.roundInt2(needed));
    } else {
      outBuf.clear();
    }
    int written = ctx.compress(outBuf, inBuf);
    bytesWritten += written;
    inBuf.clear();
    finished = true;
    outBuf.flip();
    int n = Math.min(written, len);
    outBuf.get(b, off, n);
    return n;
  }

  /**
   * Releases the native compression context and dictionary held by this instance.
   * <p>
   * After this call the compressor must be {@link #reset()} or {@link #reinit(Configuration)}
   * before it is used again; a dictionary is only restored by {@link #reinit(Configuration)}.
   */
  @Override
  public void end() {
    // Close the context before the dictionary it may still reference.
    if (ctx != null) {
      ctx.close();
      ctx = null;
    }
    if (dict != null) {
      dict.close();
      dict = null;
    }
  }

  /** Signals that no more input will be supplied, which allows compression to begin. */
  @Override
  public void finish() {
    finish = true;
  }

  /**
   * @return {@code true} once the input has been compressed and all output has been consumed
   */
  @Override
  public boolean finished() {
    return finished && !outBuf.hasRemaining();
  }

  /** @return the number of uncompressed bytes supplied since the last reset */
  @Override
  public long getBytesRead() {
    return bytesRead;
  }

  /** @return the number of compressed bytes produced since the last reset */
  @Override
  public long getBytesWritten() {
    return bytesWritten;
  }

  /** @return {@code true} unless {@link #finished()} reports {@code true} */
  @Override
  public boolean needsInput() {
    return !finished();
  }

  /**
   * Re-reads level, dictionary and buffer size from the configuration, then resets this
   * compressor. A {@code null} configuration only resets.
   * @param conf the configuration to read settings from, may be {@code null}
   */
  @Override
  public void reinit(final Configuration conf) {
    // Dictionary replaced by this call, if any; it is closed once the context stops using it.
    ZstdDictCompress staleDict = null;
    if (conf != null) {
      // Level might have changed
      boolean levelChanged = false;
      int newLevel = ZstdCodec.getLevel(conf);
      if (level != newLevel) {
        level = newLevel;
        levelChanged = true;
      }
      // Dictionary may have changed
      byte[] b = ZstdCodec.getDictionary(conf);
      if (b != null) {
        // Don't casually create dictionary objects; they consume native memory
        int thisDictId = ZstdCodec.getDictionaryId(b);
        if (dict == null || dictId != thisDictId || levelChanged) {
          staleDict = dict;
          dictId = thisDictId;
          dict = new ZstdDictCompress(b, level);
        }
      } else {
        staleDict = dict;
        dict = null;
      }
      // Buffer size might have changed
      int newBufferSize = ZstdCodec.getBufferSize(conf);
      if (bufferSize != newBufferSize) {
        bufferSize = newBufferSize;
        this.inBuf = ByteBuffer.allocateDirect(bufferSize);
        this.outBuf = ByteBuffer.allocateDirect(bufferSize);
      }
    }
    reset();
    // Only release the old dictionary after reset() has pointed the context at the new one
    // (or at none), so the context never refers to freed native memory.
    if (staleDict != null) {
      staleDict.close();
    }
  }

  /**
   * Discards buffered input and output, zeroes the byte counters, and restores the compression
   * context to the current level and dictionary.
   */
  @Override
  public void reset() {
    inBuf.clear();
    outBuf.clear();
    // Position at the end so the output buffer reports nothing remaining.
    outBuf.position(outBuf.capacity());
    bytesRead = 0;
    bytesWritten = 0;
    finish = false;
    finished = false;
    if (ctx == null) {
      // end() released the native context; obtain a fresh one so this instance is usable again.
      ctx = new ZstdCompressCtx();
      ctx.setLevel(level);
      if (dict != null) {
        ctx.loadDict(dict);
      }
      return;
    }
    ctx.reset();
    ctx.setLevel(level);
    if (dict != null) {
      ctx.loadDict(dict);
    } else {
      // loadDict(byte[]) accepts null to clear the dictionary
      ctx.loadDict((byte[]) null);
    }
  }

  /**
   * Not supported; the dictionary is supplied at construction time or through
   * {@link #reinit(Configuration)}.
   * @throws UnsupportedOperationException always
   */
  @Override
  public void setDictionary(final byte[] b, final int off, final int len) {
    throw new UnsupportedOperationException("setDictionary is not supported");
  }

  /**
   * Appends uncompressed bytes to the input buffer, growing the buffer if necessary.
   * @param b   source array
   * @param off offset in {@code b} of the first byte to read
   * @param len number of bytes to read
   */
  @Override
  public void setInput(final byte[] b, final int off, final int len) {
    if (inBuf.remaining() < len) {
      // Get a new buffer that can accommodate the accumulated input plus the additional
      // input that would cause a buffer overflow without reallocation.
      // This condition should be fortunately rare, because it is expensive.
      int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
      ByteBuffer newBuf = ByteBuffer.allocateDirect(needed);
      inBuf.flip();
      newBuf.put(inBuf);
      inBuf = newBuf;
    }
    inBuf.put(b, off, len);
    bytesRead += len;
    finished = false;
  }

  // Package private

  /**
   * @param len the uncompressed length in bytes
   * @return the value of {@link Zstd#compressBound(long)} for {@code len}, narrowed to an int
   */
  static int maxCompressedLength(final int len) {
    return (int) Zstd.compressBound(len);
  }

}