001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.compress.zstd;
019
020import com.github.luben.zstd.Zstd;
021import com.github.luben.zstd.ZstdCompressCtx;
022import com.github.luben.zstd.ZstdDictCompress;
023import java.io.IOException;
024import java.nio.ByteBuffer;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.io.compress.CanReinit;
027import org.apache.hadoop.hbase.io.compress.CompressionUtil;
028import org.apache.hadoop.io.compress.Compressor;
029import org.apache.yetus.audience.InterfaceAudience;
030
031/**
032 * Hadoop compressor glue for zstd-jni.
033 */
034@InterfaceAudience.Private
035public class ZstdCompressor implements CanReinit, Compressor {
036
037  protected int level, bufferSize;
038  protected ByteBuffer inBuf, outBuf;
039  protected boolean finish, finished;
040  protected long bytesRead, bytesWritten;
041  protected int dictId;
042  protected ZstdDictCompress dict;
043  protected ZstdCompressCtx ctx;
044
045  ZstdCompressor(final int level, final int bufferSize, final byte[] dictionary) {
046    this.level = level;
047    this.bufferSize = bufferSize;
048    this.inBuf = ByteBuffer.allocateDirect(bufferSize);
049    this.outBuf = ByteBuffer.allocateDirect(bufferSize);
050    this.outBuf.position(bufferSize);
051    this.ctx = new ZstdCompressCtx();
052    this.ctx.setLevel(level);
053    if (dictionary != null) {
054      this.dictId = ZstdCodec.getDictionaryId(dictionary);
055      this.dict = new ZstdDictCompress(dictionary, level);
056      this.ctx.loadDict(this.dict);
057    }
058  }
059
060  ZstdCompressor(final int level, final int bufferSize) {
061    this(level, bufferSize, null);
062  }
063
064  @Override
065  public int compress(final byte[] b, final int off, final int len) throws IOException {
066    // If we have previously compressed our input and still have some buffered bytes
067    // remaining, provide them to the caller.
068    if (outBuf.hasRemaining()) {
069      int remaining = outBuf.remaining(), n = Math.min(remaining, len);
070      outBuf.get(b, off, n);
071      return n;
072    }
073    // We don't actually begin compression until our caller calls finish().
074    if (finish) {
075      if (inBuf.position() > 0) {
076        inBuf.flip();
077        int uncompressed = inBuf.remaining();
078        // If we don't have enough capacity in our currently allocated output buffer,
079        // allocate a new one which does.
080        int needed = maxCompressedLength(uncompressed);
081        if (outBuf.capacity() < needed) {
082          needed = CompressionUtil.roundInt2(needed);
083          outBuf = ByteBuffer.allocateDirect(needed);
084        } else {
085          outBuf.clear();
086        }
087        int written = ctx.compress(outBuf, inBuf);
088        bytesWritten += written;
089        inBuf.clear();
090        finished = true;
091        outBuf.flip();
092        int n = Math.min(written, len);
093        outBuf.get(b, off, n);
094        return n;
095      } else {
096        finished = true;
097      }
098    }
099    return 0;
100  }
101
102  @Override
103  public void end() {
104  }
105
106  @Override
107  public void finish() {
108    finish = true;
109  }
110
111  @Override
112  public boolean finished() {
113    return finished && !outBuf.hasRemaining();
114  }
115
116  @Override
117  public long getBytesRead() {
118    return bytesRead;
119  }
120
121  @Override
122  public long getBytesWritten() {
123    return bytesWritten;
124  }
125
126  @Override
127  public boolean needsInput() {
128    return !finished();
129  }
130
131  @Override
132  public void reinit(final Configuration conf) {
133    if (conf != null) {
134      // Level might have changed
135      boolean levelChanged = false;
136      int newLevel = ZstdCodec.getLevel(conf);
137      if (level != newLevel) {
138        level = newLevel;
139        levelChanged = true;
140      }
141      // Dictionary may have changed
142      byte[] b = ZstdCodec.getDictionary(conf);
143      if (b != null) {
144        // Don't casually create dictionary objects; they consume native memory
145        int thisDictId = ZstdCodec.getDictionaryId(b);
146        if (dict == null || dictId != thisDictId || levelChanged) {
147          dictId = thisDictId;
148          dict = new ZstdDictCompress(b, level);
149        }
150      } else {
151        dict = null;
152      }
153      // Buffer size might have changed
154      int newBufferSize = ZstdCodec.getBufferSize(conf);
155      if (bufferSize != newBufferSize) {
156        bufferSize = newBufferSize;
157        this.inBuf = ByteBuffer.allocateDirect(bufferSize);
158        this.outBuf = ByteBuffer.allocateDirect(bufferSize);
159      }
160    }
161    reset();
162  }
163
164  @Override
165  public void reset() {
166    inBuf.clear();
167    outBuf.clear();
168    outBuf.position(outBuf.capacity());
169    bytesRead = 0;
170    bytesWritten = 0;
171    finish = false;
172    finished = false;
173    ctx.reset();
174    ctx.setLevel(level);
175    if (dict != null) {
176      ctx.loadDict(dict);
177    } else {
178      // loadDict((byte[]) accepts null to clear the dictionary
179      ctx.loadDict((byte[]) null);
180    }
181  }
182
183  @Override
184  public void setDictionary(final byte[] b, final int off, final int len) {
185    throw new UnsupportedOperationException("setDictionary is not supported");
186  }
187
188  @Override
189  public void setInput(final byte[] b, final int off, final int len) {
190    if (inBuf.remaining() < len) {
191      // Get a new buffer that can accomodate the accumulated input plus the additional
192      // input that would cause a buffer overflow without reallocation.
193      // This condition should be fortunately rare, because it is expensive.
194      int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
195      ByteBuffer newBuf = ByteBuffer.allocateDirect(needed);
196      inBuf.flip();
197      newBuf.put(inBuf);
198      inBuf = newBuf;
199    }
200    inBuf.put(b, off, len);
201    bytesRead += len;
202    finished = false;
203  }
204
205  // Package private
206
207  static int maxCompressedLength(final int len) {
208    return (int) Zstd.compressBound(len);
209  }
210
211}