001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.compress.zstd; 019 020import com.github.luben.zstd.Zstd; 021import com.github.luben.zstd.ZstdDictDecompress; 022import java.io.IOException; 023import java.nio.ByteBuffer; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.hbase.io.compress.CanReinit; 026import org.apache.hadoop.hbase.io.compress.CompressionUtil; 027import org.apache.hadoop.io.compress.Decompressor; 028import org.apache.yetus.audience.InterfaceAudience; 029 030/** 031 * Hadoop decompressor glue for zstd-java. 032 */ 033@InterfaceAudience.Private 034public class ZstdDecompressor implements CanReinit, Decompressor { 035 036 protected ByteBuffer inBuf, outBuf; 037 protected int bufferSize; 038 protected int inLen; 039 protected boolean finished; 040 protected int dictId; 041 protected ZstdDictDecompress dict; 042 043 ZstdDecompressor(final int bufferSize, final byte[] dictionary) { 044 this.bufferSize = bufferSize; 045 this.inBuf = ByteBuffer.allocateDirect(bufferSize); 046 this.outBuf = ByteBuffer.allocateDirect(bufferSize); 047 this.outBuf.position(bufferSize); 048 if (dictionary != null) { 049 this.dictId = ZstdCodec.getDictionaryId(dictionary); 050 this.dict = new ZstdDictDecompress(dictionary); 051 } 052 } 053 054 ZstdDecompressor(final int bufferSize) { 055 this(bufferSize, null); 056 } 057 058 @Override 059 public int decompress(final byte[] b, final int off, final int len) throws IOException { 060 if (outBuf.hasRemaining()) { 061 int remaining = outBuf.remaining(), n = Math.min(remaining, len); 062 outBuf.get(b, off, n); 063 return n; 064 } 065 if (inBuf.position() > 0) { 066 inBuf.flip(); 067 int remaining = inBuf.remaining(); 068 inLen -= remaining; 069 outBuf.clear(); 070 int written; 071 if (dict != null) { 072 written = Zstd.decompress(outBuf, inBuf, dict); 073 } else { 074 written = Zstd.decompress(outBuf, inBuf); 075 } 076 inBuf.clear(); 077 outBuf.flip(); 078 int n = Math.min(written, len); 079 outBuf.get(b, off, n); 080 return n; 081 } 082 finished = true; 083 return 0; 084 } 085 086 @Override 087 public void end() { 088 } 089 090 @Override 091 public boolean finished() { 092 return finished; 093 } 094 095 @Override 096 public int getRemaining() { 097 return inLen; 098 } 099 100 @Override 101 public boolean needsDictionary() { 102 return false; 103 } 104 105 @Override 106 public void reset() { 107 inBuf.clear(); 108 inLen = 0; 109 outBuf.clear(); 110 outBuf.position(outBuf.capacity()); 111 finished = false; 112 } 113 114 @Override 115 public boolean needsInput() { 116 return (inBuf.position() == 0); 117 } 118 119 @Override 120 public void setDictionary(final byte[] b, final int off, final int len) { 121 throw new UnsupportedOperationException("setDictionary is not supported"); 122 } 123 124 @Override 125 public void setInput(final byte[] b, final int off, final int len) { 126 if (inBuf.remaining() < len) { 127 // Get a new buffer that can accomodate the accumulated input plus the additional 128 // input that would cause a buffer overflow without reallocation. 129 // This condition should be fortunately rare, because it is expensive. 130 final int needed = CompressionUtil.roundInt2(inBuf.capacity() + len); 131 ByteBuffer newBuf = ByteBuffer.allocateDirect(needed); 132 inBuf.flip(); 133 newBuf.put(inBuf); 134 inBuf = newBuf; 135 } 136 inBuf.put(b, off, len); 137 inLen += len; 138 finished = false; 139 } 140 141 @Override 142 public void reinit(final Configuration conf) { 143 if (conf != null) { 144 // Dictionary may have changed 145 byte[] b = ZstdCodec.getDictionary(conf); 146 if (b != null) { 147 // Don't casually create dictionary objects; they consume native memory 148 int thisDictId = ZstdCodec.getDictionaryId(b); 149 if (dict == null || dictId != thisDictId) { 150 dictId = thisDictId; 151 dict = new ZstdDictDecompress(b); 152 } 153 } else { 154 dict = null; 155 } 156 // Buffer size might have changed 157 int newBufferSize = ZstdCodec.getBufferSize(conf); 158 if (bufferSize != newBufferSize) { 159 bufferSize = newBufferSize; 160 this.inBuf = ByteBuffer.allocateDirect(bufferSize); 161 this.outBuf = ByteBuffer.allocateDirect(bufferSize); 162 } 163 } 164 reset(); 165 } 166 167}