001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.compress.lz4;
019
020import java.io.IOException;
021import java.nio.ByteBuffer;
022import net.jpountz.lz4.LZ4Factory;
023import net.jpountz.lz4.LZ4SafeDecompressor;
024import org.apache.hadoop.hbase.io.compress.CompressionUtil;
025import org.apache.hadoop.io.compress.Decompressor;
026import org.apache.yetus.audience.InterfaceAudience;
027
028/**
029 * Hadoop decompressor glue for lz4-java.
030 */
031@InterfaceAudience.Private
032public class Lz4Decompressor implements Decompressor {
033
034  protected LZ4SafeDecompressor decompressor;
035  protected ByteBuffer inBuf, outBuf;
036  protected int bufferSize, inLen;
037  protected boolean finished;
038
039  Lz4Decompressor(int bufferSize) {
040    this.decompressor = LZ4Factory.fastestInstance().safeDecompressor();
041    this.bufferSize = bufferSize;
042    this.inBuf = ByteBuffer.allocate(bufferSize);
043    this.outBuf = ByteBuffer.allocate(bufferSize);
044    this.outBuf.position(bufferSize);
045  }
046
047  @Override
048  public int decompress(byte[] b, int off, int len) throws IOException {
049    if (outBuf.hasRemaining()) {
050      int remaining = outBuf.remaining(), n = Math.min(remaining, len);
051      outBuf.get(b, off, n);
052      return n;
053    }
054    if (inBuf.position() > 0) {
055      inBuf.flip();
056      int remaining = inBuf.remaining();
057      inLen -= remaining;
058      outBuf.clear();
059      decompressor.decompress(inBuf, outBuf);
060      inBuf.clear();
061      final int written = outBuf.position();
062      outBuf.flip();
063      int n = Math.min(written, len);
064      outBuf.get(b, off, n);
065      return n;
066    }
067    finished = true;
068    return 0;
069  }
070
071  @Override
072  public void end() {
073  }
074
075  @Override
076  public boolean finished() {
077    return finished;
078  }
079
080  @Override
081  public int getRemaining() {
082    return inLen;
083  }
084
085  @Override
086  public boolean needsDictionary() {
087    return false;
088  }
089
090  @Override
091  public void reset() {
092    inBuf.clear();
093    inLen = 0;
094    outBuf.clear();
095    outBuf.position(outBuf.capacity());
096    finished = false;
097  }
098
099  @Override
100  public boolean needsInput() {
101    return inBuf.position() == 0;
102  }
103
104  @Override
105  public void setDictionary(byte[] b, int off, int len) {
106    throw new UnsupportedOperationException("setDictionary is not supported");
107  }
108
109  @Override
110  public void setInput(byte[] b, int off, int len) {
111    if (inBuf.remaining() < len) {
112      // Get a new buffer that can accomodate the accumulated input plus the additional
113      // input that would cause a buffer overflow without reallocation.
114      // This condition should be fortunately rare, because it is expensive.
115      int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
116      ByteBuffer newBuf = ByteBuffer.allocate(needed);
117      inBuf.flip();
118      newBuf.put(inBuf);
119      inBuf = newBuf;
120    }
121    inBuf.put(b, off, len);
122    inLen += len;
123    finished = false;
124  }
125
126}