001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.compress.xz;
019
020import java.io.IOException;
021import java.nio.ByteBuffer;
022import org.apache.hadoop.hbase.io.ByteBufferInputStream;
023import org.apache.hadoop.hbase.io.compress.CompressionUtil;
024import org.apache.hadoop.io.compress.Decompressor;
025import org.apache.yetus.audience.InterfaceAudience;
026import org.tukaani.xz.ArrayCache;
027import org.tukaani.xz.BasicArrayCache;
028import org.tukaani.xz.LZMAInputStream;
029
030/**
031 * Hadoop decompressor glue for XZ for Java.
032 */
033@InterfaceAudience.Private
034public class LzmaDecompressor implements Decompressor {
035
036  protected static final ArrayCache ARRAY_CACHE = new BasicArrayCache() {
037    @Override
038    public byte[] getByteArray(int size, boolean fillWithZeros) {
039      // Work around a bug in XZ decompression if cached byte arrays are not cleared by
040      // always clearing them.
041      return super.getByteArray(size, true);
042    }
043  };
044  protected ByteBuffer inBuf, outBuf;
045  protected int inLen;
046  protected boolean finished;
047
048  LzmaDecompressor(int bufferSize) {
049    this.inBuf = ByteBuffer.allocate(bufferSize);
050    this.outBuf = ByteBuffer.allocate(bufferSize);
051    this.outBuf.position(bufferSize);
052  }
053
054  @Override
055  public int decompress(byte[] b, int off, int len) throws IOException {
056    if (outBuf.hasRemaining()) {
057      int remaining = outBuf.remaining(), n = Math.min(remaining, len);
058      outBuf.get(b, off, n);
059      return n;
060    }
061    if (inBuf.position() > 0) {
062      inBuf.flip();
063      int remaining = inBuf.remaining();
064      inLen -= remaining;
065      // This is pretty ugly. I don't see how to do it better. Stream to byte buffers back to
066      // stream back to byte buffers... if only XZ for Java had a public block compression API.
067      // It does not. LZMA decompression speed is reasonably good, so inefficiency here is a
068      // shame.
069      // Perhaps we could look at using reflection to make package protected classes for block
070      // compression in XZ for Java accessible here, that library can be expected to rarely
071      // change, if at all.
072      outBuf.clear();
073      try (ByteBufferInputStream lowerIn = new ByteBufferInputStream(inBuf)) {
074        final byte[] buf = new byte[8192];
075        try (LZMAInputStream in = new LZMAInputStream(lowerIn, ARRAY_CACHE)) {
076          int read;
077          do {
078            read = in.read(buf);
079            if (read > 0) {
080              outBuf.put(buf, 0, read);
081            }
082          } while (read > 0);
083        }
084      }
085      int written = outBuf.position();
086      outBuf.flip();
087      inBuf.clear();
088      int n = Math.min(written, len);
089      outBuf.get(b, off, n);
090      return n;
091    }
092    finished = true;
093    return 0;
094  }
095
096  @Override
097  public void end() {
098  }
099
100  @Override
101  public boolean finished() {
102    return finished;
103  }
104
105  @Override
106  public int getRemaining() {
107    return inLen;
108  }
109
110  @Override
111  public boolean needsDictionary() {
112    return false;
113  }
114
115  @Override
116  public void reset() {
117    inBuf.clear();
118    inLen = 0;
119    outBuf.clear();
120    outBuf.position(outBuf.capacity());
121    finished = false;
122  }
123
124  @Override
125  public boolean needsInput() {
126    return inBuf.position() == 0;
127  }
128
129  @Override
130  public void setDictionary(byte[] b, int off, int len) {
131    throw new UnsupportedOperationException("setDictionary is not supported");
132  }
133
134  @Override
135  public void setInput(byte[] b, int off, int len) {
136    if (inBuf.remaining() < len) {
137      // Get a new buffer that can accomodate the accumulated input plus the additional
138      // input that would cause a buffer overflow without reallocation.
139      // This condition should be fortunately rare, because it is expensive.
140      int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
141      ByteBuffer newBuf = ByteBuffer.allocate(needed);
142      inBuf.flip();
143      newBuf.put(inBuf);
144      inBuf = newBuf;
145    }
146    inBuf.put(b, off, len);
147    inLen += len;
148    finished = false;
149  }
150
151}