001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.compress.xz;
019
020import java.io.IOException;
021import java.nio.BufferOverflowException;
022import java.nio.ByteBuffer;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
025import org.apache.hadoop.hbase.io.compress.CompressionUtil;
026import org.apache.hadoop.io.compress.Compressor;
027import org.apache.yetus.audience.InterfaceAudience;
028import org.tukaani.xz.ArrayCache;
029import org.tukaani.xz.BasicArrayCache;
030import org.tukaani.xz.LZMA2Options;
031import org.tukaani.xz.LZMAOutputStream;
032import org.tukaani.xz.UnsupportedOptionsException;
033
034/**
035 * Hadoop compressor glue for XZ for Java.
036 */
037@InterfaceAudience.Private
038public class LzmaCompressor implements Compressor {
039
040  protected static final ArrayCache ARRAY_CACHE = new BasicArrayCache();
041  protected ByteBuffer inBuf;
042  protected ByteBuffer outBuf;
043  protected int bufferSize;
044  protected boolean finish, finished;
045  protected long bytesRead, bytesWritten;
046  protected LZMA2Options lzOptions;
047
048  LzmaCompressor(int level, int bufferSize) {
049    this.bufferSize = bufferSize;
050    this.inBuf = ByteBuffer.allocate(bufferSize);
051    this.outBuf = ByteBuffer.allocate(bufferSize);
052    this.outBuf.position(bufferSize);
053    this.lzOptions = new LZMA2Options();
054    try {
055      this.lzOptions.setPreset(level);
056    } catch (UnsupportedOptionsException e) {
057      throw new RuntimeException(e);
058    }
059  }
060
061  @Override
062  public int compress(byte[] b, int off, int len) throws IOException {
063    // If we have previously compressed our input and still have some buffered bytes
064    // remaining, provide them to the caller.
065    if (outBuf.hasRemaining()) {
066      int remaining = outBuf.remaining(), n = Math.min(remaining, len);
067      outBuf.get(b, off, n);
068      return n;
069    }
070    // We don't actually begin compression until our caller calls finish().
071    if (finish) {
072      if (inBuf.position() > 0) {
073        inBuf.flip();
074        int uncompressed = inBuf.remaining();
075        // If we don't have enough capacity in our currently allocated output buffer,
076        // allocate a new one which does.
077        int needed = maxCompressedLength(uncompressed);
078        // Can we decompress directly into the provided array?
079        ByteBuffer writeBuffer;
080        boolean direct = false;
081        if (len <= needed) {
082          writeBuffer = ByteBuffer.wrap(b, off, len);
083          direct = true;
084        } else {
085          if (outBuf.capacity() < needed) {
086            needed = CompressionUtil.roundInt2(needed);
087            outBuf = ByteBuffer.allocate(needed);
088          } else {
089            outBuf.clear();
090          }
091          writeBuffer = outBuf;
092        }
093        int oldPos = writeBuffer.position();
094        // This is pretty ugly. I don't see how to do it better. Stream to byte buffers back to
095        // stream back to byte buffers... if only XZ for Java had a public block compression
096        // API. It does not. Fortunately the algorithm is so slow, especially at higher levels,
097        // that inefficiencies here may not matter.
098        try (ByteBufferOutputStream lowerOut = new ByteBufferOutputStream(writeBuffer) {
099          @Override
100          // ByteBufferOutputStream will reallocate the output buffer if it is too small. We
101          // do not want that behavior here.
102          protected void checkSizeAndGrow(int extra) {
103            long capacityNeeded = curBuf.position() + (long) extra;
104            if (capacityNeeded > curBuf.limit()) {
105              throw new BufferOverflowException();
106            }
107          }
108        }) {
109          try (LZMAOutputStream out =
110            new LZMAOutputStream(lowerOut, lzOptions, uncompressed, ARRAY_CACHE)) {
111            out.write(inBuf.array(), inBuf.arrayOffset(), uncompressed);
112          }
113        }
114        int written = writeBuffer.position() - oldPos;
115        bytesWritten += written;
116        inBuf.clear();
117        finished = true;
118        outBuf.flip();
119        if (!direct) {
120          int n = Math.min(written, len);
121          outBuf.get(b, off, n);
122          return n;
123        } else {
124          return written;
125        }
126      } else {
127        finished = true;
128      }
129    }
130    return 0;
131  }
132
133  @Override
134  public void end() {
135  }
136
137  @Override
138  public void finish() {
139    finish = true;
140  }
141
142  @Override
143  public boolean finished() {
144    return finished && !outBuf.hasRemaining();
145  }
146
147  @Override
148  public long getBytesRead() {
149    return bytesRead;
150  }
151
152  @Override
153  public long getBytesWritten() {
154    return bytesWritten;
155  }
156
157  @Override
158  public boolean needsInput() {
159    return !finished();
160  }
161
162  @Override
163  public void reinit(Configuration conf) {
164    if (conf != null) {
165      // Level might have changed
166      try {
167        int level = LzmaCodec.getLevel(conf);
168        this.lzOptions = new LZMA2Options();
169        this.lzOptions.setPreset(level);
170      } catch (UnsupportedOptionsException e) {
171        throw new RuntimeException(e);
172      }
173      // Buffer size might have changed
174      int newBufferSize = LzmaCodec.getBufferSize(conf);
175      if (bufferSize != newBufferSize) {
176        bufferSize = newBufferSize;
177        this.inBuf = ByteBuffer.allocate(bufferSize);
178        this.outBuf = ByteBuffer.allocate(bufferSize);
179      }
180    }
181    reset();
182  }
183
184  @Override
185  public void reset() {
186    inBuf.clear();
187    outBuf.clear();
188    outBuf.position(outBuf.capacity());
189    bytesRead = 0;
190    bytesWritten = 0;
191    finish = false;
192    finished = false;
193  }
194
195  @Override
196  public void setDictionary(byte[] b, int off, int len) {
197    throw new UnsupportedOperationException("setDictionary is not supported");
198  }
199
200  @Override
201  public void setInput(byte[] b, int off, int len) {
202    if (inBuf.remaining() < len) {
203      // Get a new buffer that can accomodate the accumulated input plus the additional
204      // input that would cause a buffer overflow without reallocation.
205      // This condition should be fortunately rare, because it is expensive.
206      int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
207      ByteBuffer newBuf = ByteBuffer.allocate(needed);
208      inBuf.flip();
209      newBuf.put(inBuf);
210      inBuf = newBuf;
211    }
212    inBuf.put(b, off, len);
213    bytesRead += len;
214    finished = false;
215  }
216
217  // Package private
218
219  int maxCompressedLength(int len) {
220    return len + CompressionUtil.compressionOverhead(len);
221  }
222
223}