001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.compress.zstd;
019
020import com.github.luben.zstd.ZstdDecompressCtx;
021import com.github.luben.zstd.ZstdDictDecompress;
022import java.io.IOException;
023import java.nio.ByteBuffer;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.io.compress.CanReinit;
026import org.apache.hadoop.hbase.io.compress.CompressionUtil;
027import org.apache.hadoop.io.compress.Decompressor;
028import org.apache.yetus.audience.InterfaceAudience;
029
030/**
031 * Hadoop decompressor glue for zstd-java.
032 */
033@InterfaceAudience.Private
034public class ZstdDecompressor implements CanReinit, Decompressor {
035
036  protected ByteBuffer inBuf, outBuf;
037  protected int bufferSize;
038  protected int inLen;
039  protected boolean finished;
040  protected int dictId;
041  protected ZstdDictDecompress dict;
042  protected ZstdDecompressCtx ctx;
043
044  ZstdDecompressor(final int bufferSize, final byte[] dictionary) {
045    this.bufferSize = bufferSize;
046    this.inBuf = ByteBuffer.allocateDirect(bufferSize);
047    this.outBuf = ByteBuffer.allocateDirect(bufferSize);
048    this.outBuf.position(bufferSize);
049    this.ctx = new ZstdDecompressCtx();
050    if (dictionary != null) {
051      this.dictId = ZstdCodec.getDictionaryId(dictionary);
052      this.dict = new ZstdDictDecompress(dictionary);
053      this.ctx.loadDict(this.dict);
054    }
055  }
056
057  ZstdDecompressor(final int bufferSize) {
058    this(bufferSize, null);
059  }
060
061  @Override
062  public int decompress(final byte[] b, final int off, final int len) throws IOException {
063    if (outBuf.hasRemaining()) {
064      int remaining = outBuf.remaining(), n = Math.min(remaining, len);
065      outBuf.get(b, off, n);
066      return n;
067    }
068    if (inBuf.position() > 0) {
069      inBuf.flip();
070      int remaining = inBuf.remaining();
071      inLen -= remaining;
072      outBuf.clear();
073      int written = ctx.decompress(outBuf, inBuf);
074      inBuf.clear();
075      outBuf.flip();
076      int n = Math.min(written, len);
077      outBuf.get(b, off, n);
078      return n;
079    }
080    finished = true;
081    return 0;
082  }
083
084  @Override
085  public void end() {
086  }
087
088  @Override
089  public boolean finished() {
090    return finished;
091  }
092
093  @Override
094  public int getRemaining() {
095    return inLen;
096  }
097
098  @Override
099  public boolean needsDictionary() {
100    return false;
101  }
102
103  @Override
104  public void reset() {
105    inBuf.clear();
106    inLen = 0;
107    outBuf.clear();
108    outBuf.position(outBuf.capacity());
109    finished = false;
110    ctx.reset();
111    if (dict != null) {
112      ctx.loadDict(dict);
113    } else {
114      // loadDict((byte[]) accepts null to clear the dictionary
115      ctx.loadDict((byte[]) null);
116    }
117  }
118
119  @Override
120  public boolean needsInput() {
121    return (inBuf.position() == 0);
122  }
123
124  @Override
125  public void setDictionary(final byte[] b, final int off, final int len) {
126    throw new UnsupportedOperationException("setDictionary is not supported");
127  }
128
129  @Override
130  public void setInput(final byte[] b, final int off, final int len) {
131    if (inBuf.remaining() < len) {
132      // Get a new buffer that can accomodate the accumulated input plus the additional
133      // input that would cause a buffer overflow without reallocation.
134      // This condition should be fortunately rare, because it is expensive.
135      final int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
136      ByteBuffer newBuf = ByteBuffer.allocateDirect(needed);
137      inBuf.flip();
138      newBuf.put(inBuf);
139      inBuf = newBuf;
140    }
141    inBuf.put(b, off, len);
142    inLen += len;
143    finished = false;
144  }
145
146  @Override
147  public void reinit(final Configuration conf) {
148    if (conf != null) {
149      // Dictionary may have changed
150      byte[] b = ZstdCodec.getDictionary(conf);
151      if (b != null) {
152        // Don't casually create dictionary objects; they consume native memory
153        int thisDictId = ZstdCodec.getDictionaryId(b);
154        if (dict == null || dictId != thisDictId) {
155          dictId = thisDictId;
156          dict = new ZstdDictDecompress(b);
157        }
158      } else {
159        dict = null;
160      }
161      // Buffer size might have changed
162      int newBufferSize = ZstdCodec.getBufferSize(conf);
163      if (bufferSize != newBufferSize) {
164        bufferSize = newBufferSize;
165        this.inBuf = ByteBuffer.allocateDirect(bufferSize);
166        this.outBuf = ByteBuffer.allocateDirect(bufferSize);
167      }
168    }
169    reset();
170  }
171
172}