001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.compress.xerial;
019
020import java.io.IOException;
021import java.nio.ByteBuffer;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.hbase.io.compress.CanReinit;
024import org.apache.hadoop.hbase.io.compress.CompressionUtil;
025import org.apache.hadoop.io.compress.Compressor;
026import org.apache.yetus.audience.InterfaceAudience;
027import org.xerial.snappy.Snappy;
028
029/**
030 * Hadoop compressor glue for Xerial Snappy.
031 */
032@InterfaceAudience.Private
033public class SnappyCompressor implements CanReinit, Compressor {
034
035  protected ByteBuffer inBuf, outBuf;
036  protected int bufferSize;
037  protected boolean finish, finished;
038  protected long bytesRead, bytesWritten;
039
040  SnappyCompressor(int bufferSize) {
041    this.bufferSize = bufferSize;
042    this.inBuf = ByteBuffer.allocateDirect(bufferSize);
043    this.outBuf = ByteBuffer.allocateDirect(bufferSize);
044    this.outBuf.position(bufferSize);
045  }
046
047  @Override
048  public int compress(byte[] b, int off, int len) throws IOException {
049    // If we have previously compressed our input and still have some buffered bytes
050    // remaining, provide them to the caller.
051    if (outBuf.hasRemaining()) {
052      int remaining = outBuf.remaining(), n = Math.min(remaining, len);
053      outBuf.get(b, off, n);
054      return n;
055    }
056    // We don't actually begin compression until our caller calls finish().
057    if (finish) {
058      if (inBuf.position() > 0) {
059        inBuf.flip();
060        int uncompressed = inBuf.remaining();
061        // If we don't have enough capacity in our currently allocated output buffer,
062        // allocate a new one which does.
063        int needed = maxCompressedLength(uncompressed);
064        if (outBuf.capacity() < needed) {
065          needed = CompressionUtil.roundInt2(needed);
066          outBuf = ByteBuffer.allocateDirect(needed);
067        } else {
068          outBuf.clear();
069        }
070        int written = Snappy.compress(inBuf, outBuf);
071        bytesWritten += written;
072        inBuf.clear();
073        finished = true;
074        int n = Math.min(written, len);
075        outBuf.get(b, off, n);
076        return n;
077      } else {
078        finished = true;
079      }
080    }
081    return 0;
082  }
083
084  @Override
085  public void end() {
086  }
087
088  @Override
089  public void finish() {
090    finish = true;
091  }
092
093  @Override
094  public boolean finished() {
095    return finished && !outBuf.hasRemaining();
096  }
097
098  @Override
099  public long getBytesRead() {
100    return bytesRead;
101  }
102
103  @Override
104  public long getBytesWritten() {
105    return bytesWritten;
106  }
107
108  @Override
109  public boolean needsInput() {
110    return !finished();
111  }
112
113  @Override
114  public void reinit(Configuration conf) {
115    if (conf != null) {
116      // Buffer size might have changed
117      int newBufferSize = SnappyCodec.getBufferSize(conf);
118      if (bufferSize != newBufferSize) {
119        bufferSize = newBufferSize;
120        this.inBuf = ByteBuffer.allocateDirect(bufferSize);
121        this.outBuf = ByteBuffer.allocateDirect(bufferSize);
122      }
123    }
124    reset();
125  }
126
127  @Override
128  public void reset() {
129    inBuf.clear();
130    outBuf.clear();
131    outBuf.position(outBuf.capacity());
132    bytesRead = 0;
133    bytesWritten = 0;
134    finish = false;
135    finished = false;
136  }
137
138  @Override
139  public void setDictionary(byte[] b, int off, int len) {
140    throw new UnsupportedOperationException("setDictionary is not supported");
141  }
142
143  @Override
144  public void setInput(byte[] b, int off, int len) {
145    if (inBuf.remaining() < len) {
146      // Get a new buffer that can accomodate the accumulated input plus the additional
147      // input that would cause a buffer overflow without reallocation.
148      // This condition should be fortunately rare, because it is expensive.
149      int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
150      ByteBuffer newBuf = ByteBuffer.allocateDirect(needed);
151      inBuf.flip();
152      newBuf.put(inBuf);
153      inBuf = newBuf;
154    }
155    inBuf.put(b, off, len);
156    bytesRead += len;
157    finished = false;
158  }
159
160  // Package private
161
162  int maxCompressedLength(int len) {
163    return Snappy.maxCompressedLength(len);
164  }
165
166}