001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.compress.zstd;
019
020import com.github.luben.zstd.Zstd;
021import com.github.luben.zstd.ZstdDictCompress;
022import java.io.IOException;
023import java.nio.ByteBuffer;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.io.compress.CanReinit;
026import org.apache.hadoop.hbase.io.compress.CompressionUtil;
027import org.apache.hadoop.io.compress.Compressor;
028import org.apache.yetus.audience.InterfaceAudience;
029
030/**
031 * Hadoop compressor glue for zstd-jni.
032 */
033@InterfaceAudience.Private
034public class ZstdCompressor implements CanReinit, Compressor {
035
036  protected int level, bufferSize;
037  protected ByteBuffer inBuf, outBuf;
038  protected boolean finish, finished;
039  protected long bytesRead, bytesWritten;
040  protected int dictId;
041  protected ZstdDictCompress dict;
042
043  ZstdCompressor(final int level, final int bufferSize, final byte[] dictionary) {
044    this.level = level;
045    this.bufferSize = bufferSize;
046    this.inBuf = ByteBuffer.allocateDirect(bufferSize);
047    this.outBuf = ByteBuffer.allocateDirect(bufferSize);
048    this.outBuf.position(bufferSize);
049    if (dictionary != null) {
050      this.dictId = ZstdCodec.getDictionaryId(dictionary);
051      this.dict = new ZstdDictCompress(dictionary, level);
052    }
053  }
054
055  ZstdCompressor(final int level, final int bufferSize) {
056    this(level, bufferSize, null);
057  }
058
059  @Override
060  public int compress(final byte[] b, final int off, final int len) throws IOException {
061    // If we have previously compressed our input and still have some buffered bytes
062    // remaining, provide them to the caller.
063    if (outBuf.hasRemaining()) {
064      int remaining = outBuf.remaining(), n = Math.min(remaining, len);
065      outBuf.get(b, off, n);
066      return n;
067    }
068    // We don't actually begin compression until our caller calls finish().
069    if (finish) {
070      if (inBuf.position() > 0) {
071        inBuf.flip();
072        int uncompressed = inBuf.remaining();
073        // If we don't have enough capacity in our currently allocated output buffer,
074        // allocate a new one which does.
075        int needed = maxCompressedLength(uncompressed);
076        if (outBuf.capacity() < needed) {
077          needed = CompressionUtil.roundInt2(needed);
078          outBuf = ByteBuffer.allocateDirect(needed);
079        } else {
080          outBuf.clear();
081        }
082        int written;
083        if (dict != null) {
084          written = Zstd.compress(outBuf, inBuf, dict);
085        } else {
086          written = Zstd.compress(outBuf, inBuf, level);
087        }
088        bytesWritten += written;
089        inBuf.clear();
090        finished = true;
091        outBuf.flip();
092        int n = Math.min(written, len);
093        outBuf.get(b, off, n);
094        return n;
095      } else {
096        finished = true;
097      }
098    }
099    return 0;
100  }
101
102  @Override
103  public void end() {
104  }
105
106  @Override
107  public void finish() {
108    finish = true;
109  }
110
111  @Override
112  public boolean finished() {
113    return finished && !outBuf.hasRemaining();
114  }
115
116  @Override
117  public long getBytesRead() {
118    return bytesRead;
119  }
120
121  @Override
122  public long getBytesWritten() {
123    return bytesWritten;
124  }
125
126  @Override
127  public boolean needsInput() {
128    return !finished();
129  }
130
131  @Override
132  public void reinit(final Configuration conf) {
133    if (conf != null) {
134      // Level might have changed
135      boolean levelChanged = false;
136      int newLevel = ZstdCodec.getLevel(conf);
137      if (level != newLevel) {
138        level = newLevel;
139        levelChanged = true;
140      }
141      // Dictionary may have changed
142      byte[] b = ZstdCodec.getDictionary(conf);
143      if (b != null) {
144        // Don't casually create dictionary objects; they consume native memory
145        int thisDictId = ZstdCodec.getDictionaryId(b);
146        if (dict == null || dictId != thisDictId || levelChanged) {
147          dictId = thisDictId;
148          dict = new ZstdDictCompress(b, level);
149        }
150      } else {
151        dict = null;
152      }
153      // Buffer size might have changed
154      int newBufferSize = ZstdCodec.getBufferSize(conf);
155      if (bufferSize != newBufferSize) {
156        bufferSize = newBufferSize;
157        this.inBuf = ByteBuffer.allocateDirect(bufferSize);
158        this.outBuf = ByteBuffer.allocateDirect(bufferSize);
159      }
160    }
161    reset();
162  }
163
164  @Override
165  public void reset() {
166    inBuf.clear();
167    outBuf.clear();
168    outBuf.position(outBuf.capacity());
169    bytesRead = 0;
170    bytesWritten = 0;
171    finish = false;
172    finished = false;
173  }
174
175  @Override
176  public void setDictionary(final byte[] b, final int off, final int len) {
177    throw new UnsupportedOperationException("setDictionary is not supported");
178  }
179
180  @Override
181  public void setInput(final byte[] b, final int off, final int len) {
182    if (inBuf.remaining() < len) {
183      // Get a new buffer that can accomodate the accumulated input plus the additional
184      // input that would cause a buffer overflow without reallocation.
185      // This condition should be fortunately rare, because it is expensive.
186      int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
187      ByteBuffer newBuf = ByteBuffer.allocateDirect(needed);
188      inBuf.flip();
189      newBuf.put(inBuf);
190      inBuf = newBuf;
191    }
192    inBuf.put(b, off, len);
193    bytesRead += len;
194    finished = false;
195  }
196
197  // Package private
198
199  static int maxCompressedLength(final int len) {
200    return (int) Zstd.compressBound(len);
201  }
202
203}