001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.compress.lz4; 019 020import java.io.IOException; 021import java.nio.ByteBuffer; 022import net.jpountz.lz4.LZ4Factory; 023import net.jpountz.lz4.LZ4SafeDecompressor; 024import org.apache.hadoop.hbase.io.compress.CompressionUtil; 025import org.apache.hadoop.io.compress.Decompressor; 026import org.apache.yetus.audience.InterfaceAudience; 027 028/** 029 * Hadoop decompressor glue for lz4-java. 030 */ 031@InterfaceAudience.Private 032public class Lz4Decompressor implements Decompressor { 033 034 protected LZ4SafeDecompressor decompressor; 035 protected ByteBuffer inBuf, outBuf; 036 protected int bufferSize, inLen; 037 protected boolean finished; 038 039 Lz4Decompressor(int bufferSize) { 040 this.decompressor = LZ4Factory.fastestInstance().safeDecompressor(); 041 this.bufferSize = bufferSize; 042 this.inBuf = ByteBuffer.allocate(bufferSize); 043 this.outBuf = ByteBuffer.allocate(bufferSize); 044 this.outBuf.position(bufferSize); 045 } 046 047 @Override 048 public int decompress(byte[] b, int off, int len) throws IOException { 049 if (outBuf.hasRemaining()) { 050 int remaining = outBuf.remaining(), n = Math.min(remaining, len); 051 outBuf.get(b, off, n); 052 return n; 053 } 054 if (inBuf.position() > 0) { 055 inBuf.flip(); 056 int remaining = inBuf.remaining(); 057 inLen -= remaining; 058 outBuf.clear(); 059 decompressor.decompress(inBuf, outBuf); 060 inBuf.clear(); 061 final int written = outBuf.position(); 062 outBuf.flip(); 063 int n = Math.min(written, len); 064 outBuf.get(b, off, n); 065 return n; 066 } 067 finished = true; 068 return 0; 069 } 070 071 @Override 072 public void end() { 073 } 074 075 @Override 076 public boolean finished() { 077 return finished; 078 } 079 080 @Override 081 public int getRemaining() { 082 return inLen; 083 } 084 085 @Override 086 public boolean needsDictionary() { 087 return false; 088 } 089 090 @Override 091 public void reset() { 092 inBuf.clear(); 093 inLen = 0; 094 outBuf.clear(); 095 outBuf.position(outBuf.capacity()); 096 finished = false; 097 } 098 099 @Override 100 public boolean needsInput() { 101 return inBuf.position() == 0; 102 } 103 104 @Override 105 public void setDictionary(byte[] b, int off, int len) { 106 throw new UnsupportedOperationException("setDictionary is not supported"); 107 } 108 109 @Override 110 public void setInput(byte[] b, int off, int len) { 111 if (inBuf.remaining() < len) { 112 // Get a new buffer that can accomodate the accumulated input plus the additional 113 // input that would cause a buffer overflow without reallocation. 114 // This condition should be fortunately rare, because it is expensive. 115 int needed = CompressionUtil.roundInt2(inBuf.capacity() + len); 116 ByteBuffer newBuf = ByteBuffer.allocate(needed); 117 inBuf.flip(); 118 newBuf.put(inBuf); 119 inBuf = newBuf; 120 } 121 inBuf.put(b, off, len); 122 inLen += len; 123 finished = false; 124 } 125 126}