/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.compress.brotli;

import com.aayushatharva.brotli4j.Brotli4jLoader;
import com.aayushatharva.brotli4j.encoder.Encoder;
import com.aayushatharva.brotli4j.encoder.Encoders;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.compress.CanReinit;
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Hadoop compressor glue for Brotli4j.
 * <p>
 * Input handed to {@link #setInput(byte[], int, int)} is accumulated in an internal buffer.
 * Nothing is compressed until {@link #finish()} has been called; the next call to
 * {@link #compress(byte[], int, int)} then compresses all accumulated input in a single
 * {@code Encoders.compress} call. Output that does not fit in the caller's array is kept in an
 * internal buffer and handed out by subsequent {@code compress} calls.
 */
@InterfaceAudience.Private
public class BrotliCompressor implements CanReinit, Compressor {

  /** Accumulated uncompressed input, and compressed output not yet handed to the caller. */
  protected ByteBuffer inBuf, outBuf;
  /** Size used when (re)allocating {@link #inBuf} and {@link #outBuf}. */
  protected int bufferSize;
  /** {@code finish}: caller signalled end of input; {@code finished}: input has been compressed. */
  protected boolean finish, finished;
  /** Running totals of bytes accepted by setInput and bytes produced by the encoder. */
  protected long bytesRead, bytesWritten;
  /** Brotli encoder parameters (quality and window). */
  protected Encoder.Parameters params;

  static {
    // Ask Brotli4j to make sure it is usable before any compressor instance is created.
    Brotli4jLoader.ensureAvailability();
  }

  /**
   * Creates a compressor with the given Brotli settings and initial buffer size.
   * @param level      value passed to {@code Encoder.Parameters.setQuality}
   * @param window     value passed to {@code Encoder.Parameters.setWindow}
   * @param bufferSize initial capacity of the input and output buffers
   */
  BrotliCompressor(int level, int window, int bufferSize) {
    this.bufferSize = bufferSize;
    this.inBuf = ByteBuffer.allocate(bufferSize);
    this.outBuf = ByteBuffer.allocate(bufferSize);
    // Position at the end so that outBuf.hasRemaining() is false until we produce output.
    this.outBuf.position(bufferSize);
    params = new Encoder.Parameters();
    params.setQuality(level);
    params.setWindow(window);
  }

  /**
   * Hands compressed bytes to the caller.
   * <p>
   * If compressed bytes from an earlier call are still buffered, up to {@code len} of them are
   * copied out. Otherwise, once {@link #finish()} has been called, all accumulated input is
   * compressed and as much of the result as fits is returned.
   * @param b   destination array
   * @param off offset in {@code b} at which to start writing
   * @param len maximum number of bytes to write
   * @return the number of bytes written into {@code b}; 0 if there is nothing to emit
   * @throws IOException declared for failures of the underlying encoder call
   */
  @Override
  public int compress(byte[] b, int off, int len) throws IOException {
    // If we have previously compressed our input and still have some buffered bytes
    // remaining, provide them to the caller.
    if (outBuf.hasRemaining()) {
      int remaining = outBuf.remaining(), n = Math.min(remaining, len);
      outBuf.get(b, off, n);
      return n;
    }
    // We don't actually begin compression until our caller calls finish().
    if (finish) {
      if (inBuf.position() > 0) {
        inBuf.flip();
        int uncompressed = inBuf.remaining();
        // Output space to reserve for this input (see maxCompressedLength).
        int needed = maxCompressedLength(uncompressed);
        // Can we compress directly into the provided array? Only when it has room for
        // everything we reserved space for; otherwise the encoder could run out of room in
        // the caller's array, so we must stage the output in our own buffer instead.
        boolean direct = false;
        ByteBuffer writeBuf;
        if (len >= needed) {
          direct = true;
          writeBuf = ByteBuffer.wrap(b, off, len);
        } else {
          // If we don't have enough capacity in our currently allocated output buffer,
          // allocate a new one which does.
          if (outBuf.capacity() < needed) {
            needed = CompressionUtil.roundInt2(needed);
            outBuf = ByteBuffer.allocate(needed);
          } else {
            outBuf.clear();
          }
          writeBuf = outBuf;
        }
        // Measure output by position delta: a wrapped array starts at position 'off'.
        final int oldPos = writeBuf.position();
        Encoders.compress(inBuf, writeBuf, params);
        final int written = writeBuf.position() - oldPos;
        bytesWritten += written;
        inBuf.clear();
        finished = true;
        if (!direct) {
          // Hand over what fits now; the remainder stays in outBuf for the next call.
          outBuf.flip();
          int n = Math.min(written, len);
          outBuf.get(b, off, n);
          return n;
        } else {
          return written;
        }
      } else {
        // finish() was called with no pending input: nothing to compress.
        finished = true;
      }
    }
    return 0;
  }

  /** No-op: this class holds no resources that need explicit release. */
  @Override
  public void end() {
  }

  /** Marks the end of input; the next {@link #compress(byte[], int, int)} call will compress. */
  @Override
  public void finish() {
    finish = true;
  }

  /**
   * @return {@code true} once the input has been compressed and all buffered output has been
   *         handed to the caller
   */
  @Override
  public boolean finished() {
    return finished && !outBuf.hasRemaining();
  }

  /** @return total number of bytes accepted by {@link #setInput(byte[], int, int)} since reset */
  @Override
  public long getBytesRead() {
    return bytesRead;
  }

  /** @return total number of compressed bytes produced by the encoder since reset */
  @Override
  public long getBytesWritten() {
    return bytesWritten;
  }

  /** @return the negation of {@link #finished()} */
  @Override
  public boolean needsInput() {
    return !finished();
  }

  /**
   * Re-reads settings from {@code conf} (when non-null) and resets the compressor.
   * @param conf configuration to read quality, window and buffer size from; may be {@code null},
   *             in which case only {@link #reset()} is performed
   */
  @Override
  public void reinit(Configuration conf) {
    if (conf != null) {
      // Quality or window settings might have changed
      params.setQuality(BrotliCodec.getLevel(conf));
      params.setWindow(BrotliCodec.getWindow(conf));
      // Buffer size might have changed
      int newBufferSize = BrotliCodec.getBufferSize(conf);
      if (bufferSize != newBufferSize) {
        bufferSize = newBufferSize;
        this.inBuf = ByteBuffer.allocate(bufferSize);
        this.outBuf = ByteBuffer.allocate(bufferSize);
      }
    }
    reset();
  }

  /**
   * Discards any buffered input and output, and clears the counters and the finish/finished
   * flags.
   */
  @Override
  public void reset() {
    inBuf.clear();
    outBuf.clear();
    // Position at the end so that outBuf.hasRemaining() is false (no pending output).
    outBuf.position(outBuf.capacity());
    bytesRead = 0;
    bytesWritten = 0;
    finish = false;
    finished = false;
  }

  /**
   * Not supported.
   * @throws UnsupportedOperationException always
   */
  @Override
  public void setDictionary(byte[] b, int off, int len) {
    throw new UnsupportedOperationException("setDictionary is not supported");
  }

  /**
   * Appends {@code len} bytes from {@code b}, starting at {@code off}, to the pending input.
   * The input buffer is grown when it lacks room for the new bytes.
   * @param b   source array
   * @param off offset of the first byte to copy
   * @param len number of bytes to copy
   */
  @Override
  public void setInput(byte[] b, int off, int len) {
    if (inBuf.remaining() < len) {
      // Get a new buffer that can accommodate the accumulated input plus the additional
      // input that would cause a buffer overflow without reallocation.
      // This condition should be fortunately rare, because it is expensive.
      int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
      ByteBuffer newBuf = ByteBuffer.allocate(needed);
      inBuf.flip();
      newBuf.put(inBuf);
      inBuf = newBuf;
    }
    inBuf.put(b, off, len);
    bytesRead += len;
    // New input is pending, so we are no longer in the finished state.
    finished = false;
  }

  // Package private

  /**
   * Computes the output space to reserve for {@code len} input bytes: the input length plus
   * {@code CompressionUtil.compressionOverhead(len)}.
   * NOTE(review): compress() treats this as an upper bound on the encoder's output size;
   * confirm against CompressionUtil and Brotli's worst-case expansion.
   * @param len number of uncompressed bytes
   * @return number of output bytes to reserve
   */
  int maxCompressedLength(int len) {
    return len + CompressionUtil.compressionOverhead(len);
  }

}