001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
021import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
022import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
023
024import java.io.ByteArrayInputStream;
025import java.io.IOException;
026import java.io.InputStream;
027import java.io.OutputStream;
028import java.nio.BufferOverflowException;
029import java.nio.ByteBuffer;
030
031import org.apache.commons.io.IOUtils;
032import org.apache.hadoop.conf.Configurable;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.CellScanner;
035import org.apache.hadoop.hbase.DoNotRetryIOException;
036import org.apache.hadoop.hbase.io.ByteBuffAllocator;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040import org.apache.hadoop.hbase.codec.Codec;
041import org.apache.hadoop.hbase.io.ByteBuffInputStream;
042import org.apache.hadoop.hbase.io.ByteBufferInputStream;
043import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
044import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
045import org.apache.hadoop.hbase.nio.ByteBuff;
046import org.apache.hadoop.hbase.nio.SingleByteBuff;
047import org.apache.hadoop.hbase.util.ClassSize;
048import org.apache.hadoop.io.compress.CodecPool;
049import org.apache.hadoop.io.compress.CompressionCodec;
050import org.apache.hadoop.io.compress.CompressionInputStream;
051import org.apache.hadoop.io.compress.Compressor;
052import org.apache.hadoop.io.compress.Decompressor;
053
054/**
055 * Helper class for building cell block.
056 */
057@InterfaceAudience.Private
058class CellBlockBuilder {
059
060  // LOG is being used in TestCellBlockBuilder
061  static final Logger LOG = LoggerFactory.getLogger(CellBlockBuilder.class);
062
063  private final Configuration conf;
064
065  /**
066   * How much we think the decompressor will expand the original compressed content.
067   */
068  private final int cellBlockDecompressionMultiplier;
069
070  private final int cellBlockBuildingInitialBufferSize;
071
072  public CellBlockBuilder(Configuration conf) {
073    this.conf = conf;
074    this.cellBlockDecompressionMultiplier = conf
075        .getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3);
076
077    // Guess that 16k is a good size for rpc buffer. Could go bigger. See the TODO below in
078    // #buildCellBlock.
079    this.cellBlockBuildingInitialBufferSize = ClassSize
080        .align(conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024));
081  }
082
083  private interface OutputStreamSupplier {
084
085    OutputStream get(int expectedSize);
086
087    int size();
088  }
089
090  private static final class ByteBufferOutputStreamSupplier implements OutputStreamSupplier {
091
092    private ByteBufferOutputStream baos;
093
094    @Override
095    public OutputStream get(int expectedSize) {
096      baos = new ByteBufferOutputStream(expectedSize);
097      return baos;
098    }
099
100    @Override
101    public int size() {
102      return baos.size();
103    }
104  }
105
106  /**
107   * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
108   * <code>compressor</code>.
109   * @param codec
110   * @param compressor
111   * @param cellScanner
112   * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
113   *         passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has
114   *         been flipped and is ready for reading. Use limit to find total size.
115   * @throws IOException
116   */
117  public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
118      final CellScanner cellScanner) throws IOException {
119    ByteBufferOutputStreamSupplier supplier = new ByteBufferOutputStreamSupplier();
120    if (buildCellBlock(codec, compressor, cellScanner, supplier)) {
121      ByteBuffer bb = supplier.baos.getByteBuffer();
122      // If no cells, don't mess around. Just return null (could be a bunch of existence checking
123      // gets or something -- stuff that does not return a cell).
124      return bb.hasRemaining() ? bb : null;
125    } else {
126      return null;
127    }
128  }
129
130  private static final class ByteBufOutputStreamSupplier implements OutputStreamSupplier {
131
132    private final ByteBufAllocator alloc;
133
134    private ByteBuf buf;
135
136    public ByteBufOutputStreamSupplier(ByteBufAllocator alloc) {
137      this.alloc = alloc;
138    }
139
140    @Override
141    public OutputStream get(int expectedSize) {
142      buf = alloc.buffer(expectedSize);
143      return new ByteBufOutputStream(buf);
144    }
145
146    @Override
147    public int size() {
148      return buf.writerIndex();
149    }
150  }
151
152  public ByteBuf buildCellBlock(Codec codec, CompressionCodec compressor, CellScanner cellScanner,
153      ByteBufAllocator alloc) throws IOException {
154    ByteBufOutputStreamSupplier supplier = new ByteBufOutputStreamSupplier(alloc);
155    if (buildCellBlock(codec, compressor, cellScanner, supplier)) {
156      return supplier.buf;
157    } else {
158      return null;
159    }
160  }
161
162  private boolean buildCellBlock(final Codec codec, final CompressionCodec compressor,
163      final CellScanner cellScanner, OutputStreamSupplier supplier) throws IOException {
164    if (cellScanner == null) {
165      return false;
166    }
167    if (codec == null) {
168      throw new CellScannerButNoCodecException();
169    }
170    int bufferSize = cellBlockBuildingInitialBufferSize;
171    encodeCellsTo(supplier.get(bufferSize), cellScanner, codec, compressor);
172    if (LOG.isTraceEnabled() && bufferSize < supplier.size()) {
173      LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + supplier.size()
174          + "; up hbase.ipc.cellblock.building.initial.buffersize?");
175    }
176    return true;
177  }
178
179  private void encodeCellsTo(OutputStream os, CellScanner cellScanner, Codec codec,
180      CompressionCodec compressor) throws IOException {
181    Compressor poolCompressor = null;
182    try {
183      if (compressor != null) {
184        if (compressor instanceof Configurable) {
185          ((Configurable) compressor).setConf(this.conf);
186        }
187        poolCompressor = CodecPool.getCompressor(compressor);
188        os = compressor.createOutputStream(os, poolCompressor);
189      }
190      Codec.Encoder encoder = codec.getEncoder(os);
191      while (cellScanner.advance()) {
192        encoder.write(cellScanner.current());
193      }
194      encoder.flush();
195    } catch (BufferOverflowException | IndexOutOfBoundsException e) {
196      throw new DoNotRetryIOException(e);
197    } finally {
198      os.close();
199      if (poolCompressor != null) {
200        CodecPool.returnCompressor(poolCompressor);
201      }
202    }
203  }
204
205  /**
206   * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
207   * <code>compressor</code>.
208   * @param codec to use for encoding
209   * @param compressor to use for encoding
210   * @param cellScanner to encode
211   * @param allocator to allocate the {@link ByteBuff}.
212   * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
213   *         passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has
214   *         been flipped and is ready for reading. Use limit to find total size. If
215   *         <code>pool</code> was not null, then this returned ByteBuffer came from there and
216   *         should be returned to the pool when done.
217   * @throws IOException if encoding the cells fail
218   */
219  public ByteBufferListOutputStream buildCellBlockStream(Codec codec, CompressionCodec compressor,
220      CellScanner cellScanner, ByteBuffAllocator allocator) throws IOException {
221    if (cellScanner == null) {
222      return null;
223    }
224    if (codec == null) {
225      throw new CellScannerButNoCodecException();
226    }
227    ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(allocator);
228    encodeCellsTo(bbos, cellScanner, codec, compressor);
229    if (bbos.size() == 0) {
230      bbos.releaseResources();
231      return null;
232    }
233    return bbos;
234  }
235
236  /**
237   * @param codec to use for cellblock
238   * @param cellBlock to encode
239   * @return CellScanner to work against the content of <code>cellBlock</code>
240   * @throws IOException if encoding fails
241   */
242  public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
243      final byte[] cellBlock) throws IOException {
244    // Use this method from Client side to create the CellScanner
245    if (compressor != null) {
246      ByteBuffer cellBlockBuf = decompress(compressor, cellBlock);
247      return codec.getDecoder(new ByteBufferInputStream(cellBlockBuf));
248    }
249    // Not making the Decoder over the ByteBuffer purposefully. The Decoder over the BB will
250    // make Cells directly over the passed BB. This method is called at client side and we don't
251    // want the Cells to share the same byte[] where the RPC response is being read. Caching of any
252    // of the Cells at user's app level will make it not possible to GC the response byte[]
253    return codec.getDecoder(new ByteArrayInputStream(cellBlock));
254  }
255
256  /**
257   * @param codec to use for cellblock
258   * @param cellBlock ByteBuffer containing the cells written by the Codec. The buffer should be
259   *          position()'ed at the start of the cell block and limit()'ed at the end.
260   * @return CellScanner to work against the content of <code>cellBlock</code>. All cells created
261   *         out of the CellScanner will share the same ByteBuffer being passed.
262   * @throws IOException if cell encoding fails
263   */
264  public CellScanner createCellScannerReusingBuffers(final Codec codec,
265      final CompressionCodec compressor, ByteBuff cellBlock) throws IOException {
266    // Use this method from HRS to create the CellScanner
267    // If compressed, decompress it first before passing it on else we will leak compression
268    // resources if the stream is not closed properly after we let it out.
269    if (compressor != null) {
270      cellBlock = decompress(compressor, cellBlock);
271    }
272    return codec.getDecoder(cellBlock);
273  }
274
275  private ByteBuffer decompress(CompressionCodec compressor, byte[] compressedCellBlock)
276      throws IOException {
277    ByteBuffer cellBlock = decompress(compressor, new ByteArrayInputStream(compressedCellBlock),
278        compressedCellBlock.length * this.cellBlockDecompressionMultiplier);
279    return cellBlock;
280  }
281
282  private ByteBuff decompress(CompressionCodec compressor, ByteBuff compressedCellBlock)
283      throws IOException {
284    ByteBuffer cellBlock = decompress(compressor, new ByteBuffInputStream(compressedCellBlock),
285        compressedCellBlock.remaining() * this.cellBlockDecompressionMultiplier);
286    return new SingleByteBuff(cellBlock);
287  }
288
289  private ByteBuffer decompress(CompressionCodec compressor, InputStream cellBlockStream,
290      int osInitialSize) throws IOException {
291    // GZIPCodec fails w/ NPE if no configuration.
292    if (compressor instanceof Configurable) {
293      ((Configurable) compressor).setConf(this.conf);
294    }
295    Decompressor poolDecompressor = CodecPool.getDecompressor(compressor);
296    CompressionInputStream cis = compressor.createInputStream(cellBlockStream, poolDecompressor);
297    ByteBufferOutputStream bbos;
298    try {
299      // TODO: This is ugly. The buffer will be resized on us if we guess wrong.
300      // TODO: Reuse buffers.
301      bbos = new ByteBufferOutputStream(osInitialSize);
302      IOUtils.copy(cis, bbos);
303      bbos.close();
304      return bbos.getByteBuffer();
305    } finally {
306      CodecPool.returnDecompressor(poolDecompressor);
307    }
308  }
309}