001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.compress.lz4;
019
020import java.io.IOException;
021import java.nio.ByteBuffer;
022import net.jpountz.lz4.LZ4Compressor;
023import net.jpountz.lz4.LZ4Factory;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.io.compress.CanReinit;
026import org.apache.hadoop.hbase.io.compress.CompressionUtil;
027import org.apache.hadoop.io.compress.Compressor;
028import org.apache.yetus.audience.InterfaceAudience;
029
030/**
031 * Hadoop compressor glue for lz4-java.
032 */
033@InterfaceAudience.Private
034public class Lz4Compressor implements CanReinit, Compressor {
035
036  protected LZ4Compressor compressor;
037  protected ByteBuffer inBuf, outBuf;
038  protected int bufferSize;
039  protected boolean finish, finished;
040  protected long bytesRead, bytesWritten;
041
042  Lz4Compressor(int bufferSize) {
043    compressor = LZ4Factory.fastestInstance().fastCompressor();
044    this.bufferSize = bufferSize;
045    this.inBuf = ByteBuffer.allocate(bufferSize);
046    this.outBuf = ByteBuffer.allocate(bufferSize);
047    this.outBuf.position(bufferSize);
048  }
049
050  @Override
051  public int compress(byte[] b, int off, int len) throws IOException {
052    // If we have previously compressed our input and still have some buffered bytes
053    // remaining, provide them to the caller.
054    if (outBuf.hasRemaining()) {
055      int remaining = outBuf.remaining(), n = Math.min(remaining, len);
056      outBuf.get(b, off, n);
057      return n;
058    }
059    // We don't actually begin compression until our caller calls finish().
060    if (finish) {
061      if (inBuf.position() > 0) {
062        inBuf.flip();
063        int uncompressed = inBuf.remaining();
064        int needed = maxCompressedLength(uncompressed);
065        // Can we decompress directly into the provided array?
066        ByteBuffer writeBuffer;
067        boolean direct = false;
068        if (len <= needed) {
069          writeBuffer = ByteBuffer.wrap(b, off, len);
070          direct = true;
071        } else {
072          // If we don't have enough capacity in our currently allocated output buffer,
073          // allocate a new one which does.
074          if (outBuf.capacity() < needed) {
075            needed = CompressionUtil.roundInt2(needed);
076            outBuf = ByteBuffer.allocate(needed);
077          } else {
078            outBuf.clear();
079          }
080          writeBuffer = outBuf;
081        }
082        final int oldPos = writeBuffer.position();
083        compressor.compress(inBuf, writeBuffer);
084        final int written = writeBuffer.position() - oldPos;
085        bytesWritten += written;
086        inBuf.clear();
087        finished = true;
088        if (!direct) {
089          outBuf.flip();
090          int n = Math.min(written, len);
091          outBuf.get(b, off, n);
092          return n;
093        } else {
094          return written;
095        }
096      } else {
097        finished = true;
098      }
099    }
100    return 0;
101  }
102
103  @Override
104  public void end() {
105  }
106
107  @Override
108  public void finish() {
109    finish = true;
110  }
111
112  @Override
113  public boolean finished() {
114    return finished && !outBuf.hasRemaining();
115  }
116
117  @Override
118  public long getBytesRead() {
119    return bytesRead;
120  }
121
122  @Override
123  public long getBytesWritten() {
124    return bytesWritten;
125  }
126
127  @Override
128  public boolean needsInput() {
129    return !finished();
130  }
131
132  @Override
133  public void reinit(Configuration conf) {
134    if (conf != null) {
135      // Buffer size might have changed
136      int newBufferSize = Lz4Codec.getBufferSize(conf);
137      if (bufferSize != newBufferSize) {
138        bufferSize = newBufferSize;
139        this.inBuf = ByteBuffer.allocate(bufferSize);
140        this.outBuf = ByteBuffer.allocate(bufferSize);
141      }
142    }
143    reset();
144  }
145
146  @Override
147  public void reset() {
148    inBuf.clear();
149    outBuf.clear();
150    outBuf.position(outBuf.capacity());
151    bytesRead = 0;
152    bytesWritten = 0;
153    finish = false;
154    finished = false;
155  }
156
157  @Override
158  public void setDictionary(byte[] b, int off, int len) {
159    throw new UnsupportedOperationException("setDictionary is not supported");
160  }
161
162  @Override
163  public void setInput(byte[] b, int off, int len) {
164    if (inBuf.remaining() < len) {
165      // Get a new buffer that can accomodate the accumulated input plus the additional
166      // input that would cause a buffer overflow without reallocation.
167      // This condition should be fortunately rare, because it is expensive.
168      int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
169      ByteBuffer newBuf = ByteBuffer.allocate(needed);
170      inBuf.flip();
171      newBuf.put(inBuf);
172      inBuf = newBuf;
173    }
174    inBuf.put(b, off, len);
175    bytesRead += len;
176    finished = false;
177  }
178
179  // Package private
180
181  int maxCompressedLength(int len) {
182    return compressor.maxCompressedLength(len);
183  }
184
185}