/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.compress.zstd;

import com.github.luben.zstd.ZstdDecompressCtx;
import com.github.luben.zstd.ZstdDictDecompress;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.compress.CanReinit;
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Hadoop {@link Decompressor} glue around the zstd bindings in {@code com.github.luben.zstd}.
 * <p>
 * Input handed to {@link #setInput(byte[], int, int)} is accumulated in a direct buffer. A call to
 * {@link #decompress(byte[], int, int)} passes everything accumulated so far to the decompression
 * context in one go, then serves the produced bytes from a second direct buffer.
 */
@InterfaceAudience.Private
public class ZstdDecompressor implements CanReinit, Decompressor {

  /** Accumulates input supplied through {@link #setInput(byte[], int, int)}. */
  protected ByteBuffer inBuf;
  /** Holds decompressed bytes that have not yet been copied out to the caller. */
  protected ByteBuffer outBuf;
  /** Capacity used when the two buffers are allocated. */
  protected int bufferSize;
  /** Bytes given to {@code setInput} that have not yet been handed to the context. */
  protected int inLen;
  /** Set once {@code decompress} is called with neither pending output nor pending input. */
  protected boolean finished;
  /** Identifier of the dictionary most recently stored in {@link #dict}. */
  protected int dictId;
  /** Currently configured dictionary, or {@code null} when none is in use. */
  protected ZstdDictDecompress dict;
  /** Decompression context that performs the actual work. */
  protected ZstdDecompressCtx ctx;

  /**
   * Creates a decompressor with freshly allocated buffers and, optionally, a dictionary.
   * @param bufferSize capacity, in bytes, of both the input and the output buffer
   * @param dictionary dictionary bytes to load into the context, or {@code null} for none
   */
  ZstdDecompressor(final int bufferSize, final byte[] dictionary) {
    this.bufferSize = bufferSize;
    this.inBuf = ByteBuffer.allocateDirect(bufferSize);
    this.outBuf = ByteBuffer.allocateDirect(bufferSize);
    // Park the output position at the end so the buffer initially reports nothing to read.
    this.outBuf.position(bufferSize);
    this.ctx = new ZstdDecompressCtx();
    if (dictionary == null) {
      return;
    }
    this.dictId = ZstdCodec.getDictionaryId(dictionary);
    this.dict = new ZstdDictDecompress(dictionary);
    this.ctx.loadDict(this.dict);
  }

  /**
   * Creates a decompressor without a dictionary.
   * @param bufferSize capacity, in bytes, of both the input and the output buffer
   */
  ZstdDecompressor(final int bufferSize) {
    this(bufferSize, null);
  }

  /**
   * Copies up to {@code len} decompressed bytes into {@code b}.
   * <p>
   * Leftover output from an earlier call is served first. Otherwise all accumulated input is
   * decompressed into the output buffer and the first bytes of that result are returned. When
   * there is neither leftover output nor accumulated input the decompressor is marked finished.
   * @param b destination array
   * @param off offset in {@code b} at which to start writing
   * @param len maximum number of bytes to write
   * @return number of bytes written to {@code b}; 0 when there was nothing to do
   * @throws IOException declared by the signature; this method contains no explicit throw of it
   */
  @Override
  public int decompress(final byte[] b, final int off, final int len) throws IOException {
    if (!outBuf.hasRemaining()) {
      if (inBuf.position() == 0) {
        // No buffered output and no buffered input.
        finished = true;
        return 0;
      }
      inBuf.flip();
      inLen -= inBuf.remaining();
      outBuf.clear();
      final int produced = ctx.decompress(outBuf, inBuf);
      inBuf.clear();
      outBuf.flip();
      final int count = Math.min(produced, len);
      outBuf.get(b, off, count);
      return count;
    }
    // Drain what a previous call left behind before touching any new input.
    final int count = Math.min(outBuf.remaining(), len);
    outBuf.get(b, off, count);
    return count;
  }

  /** Does nothing. */
  @Override
  public void end() {
  }

  /**
   * @return {@code true} once {@link #decompress(byte[], int, int)} has found nothing left to do
   *         and no input has been supplied, nor a reset performed, since then
   */
  @Override
  public boolean finished() {
    return finished;
  }

  /**
   * @return number of bytes supplied through {@link #setInput(byte[], int, int)} that have not yet
   *         been handed to the decompression context
   */
  @Override
  public int getRemaining() {
    return inLen;
  }

  /**
   * @return always {@code false}
   */
  @Override
  public boolean needsDictionary() {
    return false;
  }

  /**
   * Discards buffered input and output, clears the finished flag, resets the context and then
   * re-applies the configured dictionary (or clears it when none is configured).
   */
  @Override
  public void reset() {
    inLen = 0;
    finished = false;
    inBuf.clear();
    outBuf.clear();
    // Same trick as in the constructor: an output buffer positioned at its end reads as empty.
    outBuf.position(outBuf.capacity());
    ctx.reset();
    if (dict == null) {
      // loadDict(byte[]) accepts null to clear the dictionary
      ctx.loadDict((byte[]) null);
    } else {
      ctx.loadDict(dict);
    }
  }

  /**
   * @return {@code true} when no input is currently buffered
   */
  @Override
  public boolean needsInput() {
    return inBuf.position() == 0;
  }

  /**
   * Not supported.
   * @throws UnsupportedOperationException always
   */
  @Override
  public void setDictionary(final byte[] b, final int off, final int len) {
    throw new UnsupportedOperationException("setDictionary is not supported");
  }

  /**
   * Appends {@code len} bytes of {@code b}, starting at {@code off}, to the buffered input. The
   * input buffer is replaced by a larger one when the new bytes do not fit.
   * @param b source array
   * @param off offset of the first byte to read from {@code b}
   * @param len number of bytes to append
   */
  @Override
  public void setInput(final byte[] b, final int off, final int len) {
    if (len > inBuf.remaining()) {
      // Get a new buffer that can accommodate the accumulated input plus the additional
      // input that would otherwise overflow the current one. This should fortunately be
      // rare, because it is expensive.
      final int grownCapacity = CompressionUtil.roundInt2(inBuf.capacity() + len);
      final ByteBuffer larger = ByteBuffer.allocateDirect(grownCapacity);
      inBuf.flip();
      larger.put(inBuf);
      inBuf = larger;
    }
    inBuf.put(b, off, len);
    inLen += len;
    finished = false;
  }

  /**
   * Re-reads the dictionary and buffer size from {@code conf}, when one is given, and then resets
   * this decompressor.
   * @param conf configuration to read from; when {@code null} only the reset is performed
   */
  @Override
  public void reinit(final Configuration conf) {
    if (conf == null) {
      reset();
      return;
    }
    // Dictionary may have changed
    final byte[] dictBytes = ZstdCodec.getDictionary(conf);
    if (dictBytes == null) {
      dict = null;
    } else {
      // Don't casually create dictionary objects; they consume native memory
      final int configuredDictId = ZstdCodec.getDictionaryId(dictBytes);
      if (dict == null || dictId != configuredDictId) {
        dictId = configuredDictId;
        dict = new ZstdDictDecompress(dictBytes);
      }
    }
    // Buffer size might have changed
    final int configuredBufferSize = ZstdCodec.getBufferSize(conf);
    if (configuredBufferSize != bufferSize) {
      bufferSize = configuredBufferSize;
      this.inBuf = ByteBuffer.allocateDirect(bufferSize);
      this.outBuf = ByteBuffer.allocateDirect(bufferSize);
    }
    reset();
  }

}