001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.compress.xerial;
019
020import java.io.IOException;
021import java.nio.ByteBuffer;
022import org.apache.hadoop.hbase.io.compress.CompressionUtil;
023import org.apache.hadoop.io.compress.Decompressor;
024import org.apache.yetus.audience.InterfaceAudience;
025import org.xerial.snappy.Snappy;
026
027/**
028 * Hadoop decompressor glue for Xerial Snappy.
029 */
030@InterfaceAudience.Private
031public class SnappyDecompressor implements Decompressor {
032
033  protected ByteBuffer inBuf, outBuf;
034  protected int inLen;
035  protected boolean finished;
036
037  SnappyDecompressor(int bufferSize) {
038    this.inBuf = ByteBuffer.allocateDirect(bufferSize);
039    this.outBuf = ByteBuffer.allocateDirect(bufferSize);
040    this.outBuf.position(bufferSize);
041  }
042
043  @Override
044  public int decompress(byte[] b, int off, int len) throws IOException {
045    if (outBuf.hasRemaining()) {
046      int remaining = outBuf.remaining(), n = Math.min(remaining, len);
047      outBuf.get(b, off, n);
048      return n;
049    }
050    if (inBuf.position() > 0) {
051      inBuf.flip();
052      int remaining = inBuf.remaining();
053      inLen -= remaining;
054      outBuf.clear();
055      int written = Snappy.uncompress(inBuf, outBuf);
056      inBuf.clear();
057      int n = Math.min(written, len);
058      outBuf.get(b, off, n);
059      return n;
060    }
061    finished = true;
062    return 0;
063  }
064
065  @Override
066  public void end() {
067  }
068
069  @Override
070  public boolean finished() {
071    return finished;
072  }
073
074  @Override
075  public int getRemaining() {
076    return inLen;
077  }
078
079  @Override
080  public boolean needsDictionary() {
081    return false;
082  }
083
084  @Override
085  public void reset() {
086    inBuf.clear();
087    inLen = 0;
088    outBuf.clear();
089    outBuf.position(outBuf.capacity());
090    finished = false;
091  }
092
093  @Override
094  public boolean needsInput() {
095    return inBuf.position() == 0;
096  }
097
098  @Override
099  public void setDictionary(byte[] b, int off, int len) {
100    throw new UnsupportedOperationException("setDictionary is not supported");
101  }
102
103  @Override
104  public void setInput(byte[] b, int off, int len) {
105    if (inBuf.remaining() < len) {
106      // Get a new buffer that can accomodate the accumulated input plus the additional
107      // input that would cause a buffer overflow without reallocation.
108      // This condition should be fortunately rare, because it is expensive.
109      int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
110      ByteBuffer newBuf = ByteBuffer.allocateDirect(needed);
111      inBuf.flip();
112      newBuf.put(inBuf);
113      inBuf = newBuf;
114    }
115    inBuf.put(b, off, len);
116    inLen += len;
117    finished = false;
118  }
119
120}