001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.compress.brotli;
019
020import com.aayushatharva.brotli4j.Brotli4jLoader;
021import com.aayushatharva.brotli4j.encoder.Encoder;
022import com.aayushatharva.brotli4j.encoder.Encoders;
023import java.io.IOException;
024import java.nio.ByteBuffer;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.io.compress.CanReinit;
027import org.apache.hadoop.hbase.io.compress.CompressionUtil;
028import org.apache.hadoop.io.compress.Compressor;
029import org.apache.yetus.audience.InterfaceAudience;
030
031/**
032 * Hadoop compressor glue for Brotli4j
033 */
034@InterfaceAudience.Private
035public class BrotliCompressor implements CanReinit, Compressor {
036
037  protected ByteBuffer inBuf, outBuf;
038  protected int bufferSize;
039  protected boolean finish, finished;
040  protected long bytesRead, bytesWritten;
041  protected Encoder.Parameters params;
042
043  static {
044    Brotli4jLoader.ensureAvailability();
045  }
046
047  BrotliCompressor(int level, int window, int bufferSize) {
048    this.bufferSize = bufferSize;
049    this.inBuf = ByteBuffer.allocate(bufferSize);
050    this.outBuf = ByteBuffer.allocate(bufferSize);
051    this.outBuf.position(bufferSize);
052    params = new Encoder.Parameters();
053    params.setQuality(level);
054    params.setWindow(window);
055  }
056
057  @Override
058  public int compress(byte[] b, int off, int len) throws IOException {
059    // If we have previously compressed our input and still have some buffered bytes
060    // remaining, provide them to the caller.
061    if (outBuf.hasRemaining()) {
062      int remaining = outBuf.remaining(), n = Math.min(remaining, len);
063      outBuf.get(b, off, n);
064      return n;
065    }
066    // We don't actually begin compression until our caller calls finish().
067    if (finish) {
068      if (inBuf.position() > 0) {
069        inBuf.flip();
070        int uncompressed = inBuf.remaining();
071        // If we don't have enough capacity in our currently allocated output buffer,
072        // allocate a new one which does.
073        int needed = maxCompressedLength(uncompressed);
074        // Can we compress directly into the provided array?
075        boolean direct = false;
076        ByteBuffer writeBuf;
077        if (len <= needed) {
078          direct = true;
079          writeBuf = ByteBuffer.wrap(b, off, len);
080        } else {
081          if (outBuf.capacity() < needed) {
082            needed = CompressionUtil.roundInt2(needed);
083            outBuf = ByteBuffer.allocate(needed);
084          } else {
085            outBuf.clear();
086          }
087          writeBuf = outBuf;
088        }
089        final int oldPos = writeBuf.position();
090        Encoders.compress(inBuf, writeBuf, params);
091        final int written = writeBuf.position() - oldPos;
092        bytesWritten += written;
093        inBuf.clear();
094        finished = true;
095        if (!direct) {
096          outBuf.flip();
097          int n = Math.min(written, len);
098          outBuf.get(b, off, n);
099          return n;
100        } else {
101          return written;
102        }
103      } else {
104        finished = true;
105      }
106    }
107    return 0;
108  }
109
110  @Override
111  public void end() {
112  }
113
114  @Override
115  public void finish() {
116    finish = true;
117  }
118
119  @Override
120  public boolean finished() {
121    return finished && !outBuf.hasRemaining();
122  }
123
124  @Override
125  public long getBytesRead() {
126    return bytesRead;
127  }
128
129  @Override
130  public long getBytesWritten() {
131    return bytesWritten;
132  }
133
134  @Override
135  public boolean needsInput() {
136    return !finished();
137  }
138
139  @Override
140  public void reinit(Configuration conf) {
141    if (conf != null) {
142      // Quality or window settings might have changed
143      params.setQuality(BrotliCodec.getLevel(conf));
144      params.setWindow(BrotliCodec.getWindow(conf));
145      // Buffer size might have changed
146      int newBufferSize = BrotliCodec.getBufferSize(conf);
147      if (bufferSize != newBufferSize) {
148        bufferSize = newBufferSize;
149        this.inBuf = ByteBuffer.allocate(bufferSize);
150        this.outBuf = ByteBuffer.allocate(bufferSize);
151      }
152    }
153    reset();
154  }
155
156  @Override
157  public void reset() {
158    inBuf.clear();
159    outBuf.clear();
160    outBuf.position(outBuf.capacity());
161    bytesRead = 0;
162    bytesWritten = 0;
163    finish = false;
164    finished = false;
165  }
166
167  @Override
168  public void setDictionary(byte[] b, int off, int len) {
169    throw new UnsupportedOperationException("setDictionary is not supported");
170  }
171
172  @Override
173  public void setInput(byte[] b, int off, int len) {
174    if (inBuf.remaining() < len) {
175      // Get a new buffer that can accomodate the accumulated input plus the additional
176      // input that would cause a buffer overflow without reallocation.
177      // This condition should be fortunately rare, because it is expensive.
178      int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
179      ByteBuffer newBuf = ByteBuffer.allocate(needed);
180      inBuf.flip();
181      newBuf.put(inBuf);
182      inBuf = newBuf;
183    }
184    inBuf.put(b, off, len);
185    bytesRead += len;
186    finished = false;
187  }
188
189  // Package private
190
191  int maxCompressedLength(int len) {
192    return len + CompressionUtil.compressionOverhead(len);
193  }
194
195}