001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.compress.xerial; 019 020import java.io.IOException; 021import java.nio.ByteBuffer; 022import org.apache.hadoop.hbase.io.compress.CompressionUtil; 023import org.apache.hadoop.io.compress.Decompressor; 024import org.apache.yetus.audience.InterfaceAudience; 025import org.xerial.snappy.Snappy; 026 027/** 028 * Hadoop decompressor glue for Xerial Snappy. 029 */ 030@InterfaceAudience.Private 031public class SnappyDecompressor implements Decompressor { 032 033 protected ByteBuffer inBuf, outBuf; 034 protected int inLen; 035 protected boolean finished; 036 037 SnappyDecompressor(int bufferSize) { 038 this.inBuf = ByteBuffer.allocateDirect(bufferSize); 039 this.outBuf = ByteBuffer.allocateDirect(bufferSize); 040 this.outBuf.position(bufferSize); 041 } 042 043 @Override 044 public int decompress(byte[] b, int off, int len) throws IOException { 045 if (outBuf.hasRemaining()) { 046 int remaining = outBuf.remaining(), n = Math.min(remaining, len); 047 outBuf.get(b, off, n); 048 return n; 049 } 050 if (inBuf.position() > 0) { 051 inBuf.flip(); 052 int remaining = inBuf.remaining(); 053 inLen -= remaining; 054 outBuf.clear(); 055 int written = Snappy.uncompress(inBuf, outBuf); 056 inBuf.clear(); 057 int n = Math.min(written, len); 058 outBuf.get(b, off, n); 059 return n; 060 } 061 finished = true; 062 return 0; 063 } 064 065 @Override 066 public void end() { 067 } 068 069 @Override 070 public boolean finished() { 071 return finished; 072 } 073 074 @Override 075 public int getRemaining() { 076 return inLen; 077 } 078 079 @Override 080 public boolean needsDictionary() { 081 return false; 082 } 083 084 @Override 085 public void reset() { 086 inBuf.clear(); 087 inLen = 0; 088 outBuf.clear(); 089 outBuf.position(outBuf.capacity()); 090 finished = false; 091 } 092 093 @Override 094 public boolean needsInput() { 095 return inBuf.position() == 0; 096 } 097 098 @Override 099 public void setDictionary(byte[] b, int off, int len) { 100 throw new UnsupportedOperationException("setDictionary is not supported"); 101 } 102 103 @Override 104 public void setInput(byte[] b, int off, int len) { 105 if (inBuf.remaining() < len) { 106 // Get a new buffer that can accomodate the accumulated input plus the additional 107 // input that would cause a buffer overflow without reallocation. 108 // This condition should be fortunately rare, because it is expensive. 109 int needed = CompressionUtil.roundInt2(inBuf.capacity() + len); 110 ByteBuffer newBuf = ByteBuffer.allocateDirect(needed); 111 inBuf.flip(); 112 newBuf.put(inBuf); 113 inBuf = newBuf; 114 } 115 inBuf.put(b, off, len); 116 inLen += len; 117 finished = false; 118 } 119 120}