001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.compress.xz; 019 020import java.io.IOException; 021import java.nio.BufferOverflowException; 022import java.nio.ByteBuffer; 023import org.apache.hadoop.conf.Configuration; 024import org.apache.hadoop.hbase.io.ByteBufferOutputStream; 025import org.apache.hadoop.hbase.io.compress.CompressionUtil; 026import org.apache.hadoop.io.compress.Compressor; 027import org.apache.yetus.audience.InterfaceAudience; 028import org.tukaani.xz.ArrayCache; 029import org.tukaani.xz.BasicArrayCache; 030import org.tukaani.xz.LZMA2Options; 031import org.tukaani.xz.LZMAOutputStream; 032import org.tukaani.xz.UnsupportedOptionsException; 033 034/** 035 * Hadoop compressor glue for XZ for Java. 036 */ 037@InterfaceAudience.Private 038public class LzmaCompressor implements Compressor { 039 040 protected static final ArrayCache ARRAY_CACHE = new BasicArrayCache(); 041 protected ByteBuffer inBuf; 042 protected ByteBuffer outBuf; 043 protected int bufferSize; 044 protected boolean finish, finished; 045 protected long bytesRead, bytesWritten; 046 protected LZMA2Options lzOptions; 047 048 LzmaCompressor(int level, int bufferSize) { 049 this.bufferSize = bufferSize; 050 this.inBuf = ByteBuffer.allocate(bufferSize); 051 this.outBuf = ByteBuffer.allocate(bufferSize); 052 this.outBuf.position(bufferSize); 053 this.lzOptions = new LZMA2Options(); 054 try { 055 this.lzOptions.setPreset(level); 056 } catch (UnsupportedOptionsException e) { 057 throw new RuntimeException(e); 058 } 059 } 060 061 @Override 062 public int compress(byte[] b, int off, int len) throws IOException { 063 // If we have previously compressed our input and still have some buffered bytes 064 // remaining, provide them to the caller. 065 if (outBuf.hasRemaining()) { 066 int remaining = outBuf.remaining(), n = Math.min(remaining, len); 067 outBuf.get(b, off, n); 068 return n; 069 } 070 // We don't actually begin compression until our caller calls finish(). 071 if (finish) { 072 if (inBuf.position() > 0) { 073 inBuf.flip(); 074 int uncompressed = inBuf.remaining(); 075 // If we don't have enough capacity in our currently allocated output buffer, 076 // allocate a new one which does. 077 int needed = maxCompressedLength(uncompressed); 078 // Can we decompress directly into the provided array? 079 ByteBuffer writeBuffer; 080 boolean direct = false; 081 if (len <= needed) { 082 writeBuffer = ByteBuffer.wrap(b, off, len); 083 direct = true; 084 } else { 085 if (outBuf.capacity() < needed) { 086 needed = CompressionUtil.roundInt2(needed); 087 outBuf = ByteBuffer.allocate(needed); 088 } else { 089 outBuf.clear(); 090 } 091 writeBuffer = outBuf; 092 } 093 int oldPos = writeBuffer.position(); 094 // This is pretty ugly. I don't see how to do it better. Stream to byte buffers back to 095 // stream back to byte buffers... if only XZ for Java had a public block compression 096 // API. It does not. Fortunately the algorithm is so slow, especially at higher levels, 097 // that inefficiencies here may not matter. 098 try (ByteBufferOutputStream lowerOut = new ByteBufferOutputStream(writeBuffer) { 099 @Override 100 // ByteBufferOutputStream will reallocate the output buffer if it is too small. We 101 // do not want that behavior here. 102 protected void checkSizeAndGrow(int extra) { 103 long capacityNeeded = curBuf.position() + (long) extra; 104 if (capacityNeeded > curBuf.limit()) { 105 throw new BufferOverflowException(); 106 } 107 } 108 }) { 109 try (LZMAOutputStream out = 110 new LZMAOutputStream(lowerOut, lzOptions, uncompressed, ARRAY_CACHE)) { 111 out.write(inBuf.array(), inBuf.arrayOffset(), uncompressed); 112 } 113 } 114 int written = writeBuffer.position() - oldPos; 115 bytesWritten += written; 116 inBuf.clear(); 117 finished = true; 118 outBuf.flip(); 119 if (!direct) { 120 int n = Math.min(written, len); 121 outBuf.get(b, off, n); 122 return n; 123 } else { 124 return written; 125 } 126 } else { 127 finished = true; 128 } 129 } 130 return 0; 131 } 132 133 @Override 134 public void end() { 135 } 136 137 @Override 138 public void finish() { 139 finish = true; 140 } 141 142 @Override 143 public boolean finished() { 144 return finished && !outBuf.hasRemaining(); 145 } 146 147 @Override 148 public long getBytesRead() { 149 return bytesRead; 150 } 151 152 @Override 153 public long getBytesWritten() { 154 return bytesWritten; 155 } 156 157 @Override 158 public boolean needsInput() { 159 return !finished(); 160 } 161 162 @Override 163 public void reinit(Configuration conf) { 164 if (conf != null) { 165 // Level might have changed 166 try { 167 int level = LzmaCodec.getLevel(conf); 168 this.lzOptions = new LZMA2Options(); 169 this.lzOptions.setPreset(level); 170 } catch (UnsupportedOptionsException e) { 171 throw new RuntimeException(e); 172 } 173 // Buffer size might have changed 174 int newBufferSize = LzmaCodec.getBufferSize(conf); 175 if (bufferSize != newBufferSize) { 176 bufferSize = newBufferSize; 177 this.inBuf = ByteBuffer.allocate(bufferSize); 178 this.outBuf = ByteBuffer.allocate(bufferSize); 179 } 180 } 181 reset(); 182 } 183 184 @Override 185 public void reset() { 186 inBuf.clear(); 187 outBuf.clear(); 188 outBuf.position(outBuf.capacity()); 189 bytesRead = 0; 190 bytesWritten = 0; 191 finish = false; 192 finished = false; 193 } 194 195 @Override 196 public void setDictionary(byte[] b, int off, int len) { 197 throw new UnsupportedOperationException("setDictionary is not supported"); 198 } 199 200 @Override 201 public void setInput(byte[] b, int off, int len) { 202 if (inBuf.remaining() < len) { 203 // Get a new buffer that can accomodate the accumulated input plus the additional 204 // input that would cause a buffer overflow without reallocation. 205 // This condition should be fortunately rare, because it is expensive. 206 int needed = CompressionUtil.roundInt2(inBuf.capacity() + len); 207 ByteBuffer newBuf = ByteBuffer.allocate(needed); 208 inBuf.flip(); 209 newBuf.put(inBuf); 210 inBuf = newBuf; 211 } 212 inBuf.put(b, off, len); 213 bytesRead += len; 214 finished = false; 215 } 216 217 // Package private 218 219 int maxCompressedLength(int len) { 220 return len + CompressionUtil.compressionOverhead(len); 221 } 222 223}