001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.compress.aircompressor;
019
020import io.airlift.compress.Compressor;
021import java.io.IOException;
022import java.nio.ByteBuffer;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.hbase.io.compress.CanReinit;
025import org.apache.hadoop.hbase.io.compress.CompressionUtil;
026import org.apache.yetus.audience.InterfaceAudience;
027
028/**
029 * Hadoop compressor glue for aircompressor compressors.
030 */
031@InterfaceAudience.Private
032public abstract class HadoopCompressor<T extends Compressor>
033  implements CanReinit, org.apache.hadoop.io.compress.Compressor {
034
035  protected T compressor;
036  protected ByteBuffer inBuf, outBuf;
037  protected int bufferSize;
038  protected boolean finish, finished;
039  protected long bytesRead, bytesWritten;
040
041  HadoopCompressor(T compressor, int bufferSize) {
042    this.compressor = compressor;
043    this.bufferSize = bufferSize;
044    this.inBuf = ByteBuffer.allocate(bufferSize);
045    this.outBuf = ByteBuffer.allocate(bufferSize);
046    this.outBuf.position(bufferSize);
047  }
048
049  @Override
050  public int compress(byte[] b, int off, int len) throws IOException {
051    // If we have previously compressed our input and still have some buffered bytes
052    // remaining, provide them to the caller.
053    if (outBuf.hasRemaining()) {
054      int remaining = outBuf.remaining(), n = Math.min(remaining, len);
055      outBuf.get(b, off, n);
056      return n;
057    }
058    // We don't actually begin compression until our caller calls finish().
059    // The aircompressor compressors operate over a range of input in one shot.
060    if (finish) {
061      if (inBuf.position() > 0) {
062        inBuf.flip();
063        int uncompressed = inBuf.remaining();
064        // If we don't have enough capacity in our currently allocated output buffer,
065        // allocate a new one which does.
066        int needed = maxCompressedLength(uncompressed);
067        // Can we decompress directly into the provided array?
068        ByteBuffer writeBuffer;
069        boolean direct = false;
070        if (len <= needed) {
071          writeBuffer = ByteBuffer.wrap(b, off, len);
072          direct = true;
073        } else {
074          if (outBuf.capacity() < needed) {
075            needed = CompressionUtil.roundInt2(needed);
076            outBuf = ByteBuffer.allocate(needed);
077          } else {
078            outBuf.clear();
079          }
080          writeBuffer = outBuf;
081        }
082        final int oldPos = writeBuffer.position();
083        compressor.compress(inBuf, writeBuffer);
084        final int written = writeBuffer.position() - oldPos;
085        bytesWritten += written;
086        inBuf.clear();
087        finished = true;
088        if (!direct) {
089          outBuf.flip();
090          int n = Math.min(written, len);
091          outBuf.get(b, off, n);
092          return n;
093        } else {
094          return written;
095        }
096      } else {
097        finished = true;
098      }
099    }
100    return 0;
101  }
102
103  @Override
104  public void end() {
105  }
106
107  @Override
108  public void finish() {
109    finish = true;
110  }
111
112  @Override
113  public boolean finished() {
114    return finished && !outBuf.hasRemaining();
115  }
116
117  @Override
118  public long getBytesRead() {
119    return bytesRead;
120  }
121
122  @Override
123  public long getBytesWritten() {
124    return bytesWritten;
125  }
126
127  @Override
128  public boolean needsInput() {
129    return !finished();
130  }
131
132  @Override
133  public void reinit(Configuration conf) {
134    if (conf != null) {
135      // Buffer size might have changed
136      int newBufferSize = getBufferSize(conf);
137      if (bufferSize != newBufferSize) {
138        bufferSize = newBufferSize;
139        this.inBuf = ByteBuffer.allocate(bufferSize);
140        this.outBuf = ByteBuffer.allocate(bufferSize);
141      }
142    }
143    reset();
144  }
145
146  @Override
147  public void reset() {
148    inBuf.clear();
149    outBuf.clear();
150    outBuf.position(outBuf.capacity());
151    bytesRead = 0;
152    bytesWritten = 0;
153    finish = false;
154    finished = false;
155  }
156
157  @Override
158  public void setDictionary(byte[] b, int off, int len) {
159    throw new UnsupportedOperationException("setDictionary is not supported");
160  }
161
162  @Override
163  public void setInput(byte[] b, int off, int len) {
164    if (inBuf.remaining() < len) {
165      // Get a new buffer that can accomodate the accumulated input plus the additional
166      // input that would cause a buffer overflow without reallocation.
167      // This condition should be fortunately rare, because it is expensive.
168      int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
169      ByteBuffer newBuf = ByteBuffer.allocate(needed);
170      inBuf.flip();
171      newBuf.put(inBuf);
172      inBuf = newBuf;
173    }
174    inBuf.put(b, off, len);
175    bytesRead += len;
176    finished = false;
177  }
178
179  // Package private
180
181  int maxCompressedLength(int len) {
182    return compressor.maxCompressedLength(len);
183  }
184
185  abstract int getLevel(Configuration conf);
186
187  abstract int getBufferSize(Configuration conf);
188
189}