001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.compress.aircompressor; 019 020import io.airlift.compress.Decompressor; 021import java.io.IOException; 022import java.nio.ByteBuffer; 023import org.apache.hadoop.hbase.io.compress.CompressionUtil; 024import org.apache.yetus.audience.InterfaceAudience; 025 026/** 027 * Hadoop decompressor glue for aircompressor decompressors. 028 */ 029@InterfaceAudience.Private 030public class HadoopDecompressor<T extends Decompressor> 031 implements org.apache.hadoop.io.compress.Decompressor { 032 033 protected T decompressor; 034 protected ByteBuffer inBuf, outBuf; 035 protected int inLen; 036 protected boolean finished; 037 038 HadoopDecompressor(T decompressor, int bufferSize) { 039 this.decompressor = decompressor; 040 this.inBuf = ByteBuffer.allocate(bufferSize); 041 this.outBuf = ByteBuffer.allocate(bufferSize); 042 this.outBuf.position(bufferSize); 043 } 044 045 @Override 046 public int decompress(byte[] b, int off, int len) throws IOException { 047 if (outBuf.hasRemaining()) { 048 int remaining = outBuf.remaining(), n = Math.min(remaining, len); 049 outBuf.get(b, off, n); 050 return n; 051 } 052 if (inBuf.position() > 0) { 053 inBuf.flip(); 054 int remaining = inBuf.remaining(); 055 inLen -= remaining; 056 outBuf.rewind(); 057 outBuf.limit(outBuf.capacity()); 058 decompressor.decompress(inBuf, outBuf); 059 inBuf.rewind(); 060 inBuf.limit(inBuf.capacity()); 061 final int written = outBuf.position(); 062 outBuf.flip(); 063 int n = Math.min(written, len); 064 outBuf.get(b, off, n); 065 return n; 066 } 067 finished = true; 068 return 0; 069 } 070 071 @Override 072 public void end() { 073 } 074 075 @Override 076 public boolean finished() { 077 return finished; 078 } 079 080 @Override 081 public int getRemaining() { 082 return inLen; 083 } 084 085 @Override 086 public boolean needsDictionary() { 087 return false; 088 } 089 090 @Override 091 public void reset() { 092 inBuf.rewind(); 093 inBuf.limit(inBuf.capacity()); 094 inLen = 0; 095 outBuf.rewind(); 096 outBuf.limit(0); 097 finished = false; 098 } 099 100 @Override 101 public boolean needsInput() { 102 return inBuf.position() == 0; 103 } 104 105 @Override 106 public void setDictionary(byte[] b, int off, int len) { 107 throw new UnsupportedOperationException("setDictionary is not supported"); 108 } 109 110 @Override 111 public void setInput(byte[] b, int off, int len) { 112 if (inBuf.remaining() < len) { 113 // Get a new buffer that can accomodate the accumulated input plus the additional 114 // input that would cause a buffer overflow without reallocation. 115 // This condition should be fortunately rare, because it is expensive. 116 int needed = CompressionUtil.roundInt2(inBuf.capacity() + len); 117 ByteBuffer newBuf = ByteBuffer.allocate(needed); 118 inBuf.flip(); 119 newBuf.put(inBuf); 120 inBuf = newBuf; 121 } 122 inBuf.put(b, off, len); 123 inLen += len; 124 finished = false; 125 } 126 127}