/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ExtendedCellScanner;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.ByteBuffInputStream;
import org.apache.hadoop.hbase.io.ByteBufferInputStream;
import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;

/**
 * Helper class for building cell blocks.
 */
@InterfaceAudience.Private
class CellBlockBuilder {

  // LOG is being used in TestCellBlockBuilder
  static final Logger LOG = LoggerFactory.getLogger(CellBlockBuilder.class);

  private final Configuration conf;

  /**
   * How much we think the decompressor will expand the original compressed content.
   */
  private final int cellBlockDecompressionMultiplier;

  private final int cellBlockBuildingInitialBufferSize;

  public CellBlockBuilder(Configuration conf) {
    this.conf = conf;
    this.cellBlockDecompressionMultiplier =
      conf.getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3);

    // Guess that 16k is a good size for the rpc buffer. Could go bigger. See the TODOs in
    // #decompress below.
    this.cellBlockBuildingInitialBufferSize =
      ClassSize.align(conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024));
  }
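
  // A minimal usage sketch; the values shown are this class's defaults, set explicitly here
  // only for illustration:
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3);
  //   conf.setInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024);
  //   CellBlockBuilder builder = new CellBlockBuilder(conf);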

  private interface OutputStreamSupplier {

    OutputStream get(int expectedSize);

    int size();
  }

  private static final class ByteBufferOutputStreamSupplier implements OutputStreamSupplier {

    private ByteBufferOutputStream baos;

    @Override
    public OutputStream get(int expectedSize) {
      baos = new ByteBufferOutputStream(expectedSize);
      return baos;
    }

    @Override
    public int size() {
      return baos.size();
    }
  }

  /**
   * Puts CellScanner Cells into a cell block using the passed in <code>codec</code> and/or
   * <code>compressor</code>.
   * @return Null, or a byte buffer containing a cellblock of the passed-in Cells encoded using
   *         the passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer
   *         has been flipped and is ready for reading. Use limit to find total size.
   */
  public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
    final ExtendedCellScanner cellScanner) throws IOException {
    ByteBufferOutputStreamSupplier supplier = new ByteBufferOutputStreamSupplier();
    if (buildCellBlock(codec, compressor, cellScanner, supplier)) {
      ByteBuffer bb = supplier.baos.getByteBuffer();
      // If no cells, don't mess around. Just return null (could be a bunch of existence checking
      // gets or something -- stuff that does not return a cell).
      return bb.hasRemaining() ? bb : null;
    } else {
      return null;
    }
  }
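
  // Caller sketch (assumes a KeyValueCodec and a non-null scanner; pass a null compressor to
  // skip compression):
  //
  //   ByteBuffer block = builder.buildCellBlock(new KeyValueCodec(), null, scanner);
  //   if (block != null) {
  //     int totalSize = block.limit(); // the buffer arrives flipped, ready for reading
  //   }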

  private static final class ByteBufOutputStreamSupplier implements OutputStreamSupplier {

    private final ByteBufAllocator alloc;

    private ByteBuf buf;

    public ByteBufOutputStreamSupplier(ByteBufAllocator alloc) {
      this.alloc = alloc;
    }

    @Override
    public OutputStream get(int expectedSize) {
      buf = alloc.buffer(expectedSize);
      return new ByteBufOutputStream(buf);
    }

    @Override
    public int size() {
      return buf.writerIndex();
    }
  }

  public ByteBuf buildCellBlock(Codec codec, CompressionCodec compressor,
    ExtendedCellScanner cellScanner, ByteBufAllocator alloc) throws IOException {
    ByteBufOutputStreamSupplier supplier = new ByteBufOutputStreamSupplier(alloc);
    if (buildCellBlock(codec, compressor, cellScanner, supplier)) {
      return supplier.buf;
    } else {
      return null;
    }
  }
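
  // Caller sketch (assumption: the caller owns the returned ByteBuf and must release it after
  // writing it out; PooledByteBufAllocator.DEFAULT is Netty's stock pooled allocator):
  //
  //   ByteBuf buf = builder.buildCellBlock(codec, null, scanner, PooledByteBufAllocator.DEFAULT);
  //   if (buf != null) {
  //     try {
  //       // write buf to the wire
  //     } finally {
  //       buf.release();
  //     }
  //   }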

  private boolean buildCellBlock(final Codec codec, final CompressionCodec compressor,
    final ExtendedCellScanner cellScanner, OutputStreamSupplier supplier) throws IOException {
    if (cellScanner == null) {
      return false;
    }
    if (codec == null) {
      throw new CellScannerButNoCodecException();
    }
    int bufferSize = cellBlockBuildingInitialBufferSize;
    encodeCellsTo(supplier.get(bufferSize), cellScanner, codec, compressor);
    if (LOG.isTraceEnabled() && bufferSize < supplier.size()) {
      LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + supplier.size()
        + "; consider increasing hbase.ipc.cellblock.building.initial.buffersize");
    }
    return true;
  }

  private void encodeCellsTo(OutputStream os, ExtendedCellScanner cellScanner, Codec codec,
    CompressionCodec compressor) throws IOException {
    Compressor poolCompressor = null;
    try {
      if (compressor != null) {
        if (compressor instanceof Configurable) {
          ((Configurable) compressor).setConf(this.conf);
        }
        poolCompressor = CodecPool.getCompressor(compressor);
        os = compressor.createOutputStream(os, poolCompressor);
      }
      Codec.Encoder encoder = codec.getEncoder(os);
      while (cellScanner.advance()) {
        encoder.write(cellScanner.current());
      }
      encoder.flush();
    } catch (BufferOverflowException | IndexOutOfBoundsException e) {
      throw new DoNotRetryIOException(e);
    } finally {
      os.close();
      if (poolCompressor != null) {
        CodecPool.returnCompressor(poolCompressor);
      }
    }
  }

  /**
   * Puts CellScanner Cells into a cell block using the passed in <code>codec</code> and/or
   * <code>compressor</code>.
   * @param codec       to use for encoding
   * @param compressor  to use for encoding
   * @param cellScanner to encode
   * @param allocator   to allocate the {@link ByteBuff}.
   * @return Null, or a {@link ByteBufferListOutputStream} whose buffers hold a cellblock of the
   *         passed-in Cells encoded using the passed in <code>codec</code> and/or
   *         <code>compressor</code>. The buffers come from the passed <code>allocator</code>, so
   *         call {@link ByteBufferListOutputStream#releaseResources()} when done with the stream.
   * @throws IOException if encoding the cells fails
   */
  public ByteBufferListOutputStream buildCellBlockStream(Codec codec, CompressionCodec compressor,
    ExtendedCellScanner cellScanner, ByteBuffAllocator allocator) throws IOException {
    if (cellScanner == null) {
      return null;
    }
    if (codec == null) {
      throw new CellScannerButNoCodecException();
    }
    ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(allocator);
    encodeCellsTo(bbos, cellScanner, codec, compressor);
    if (bbos.size() == 0) {
      bbos.releaseResources();
      return null;
    }
    return bbos;
  }
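
  // Server-side sketch (assumes the caller hands the filled buffers to the wire and then
  // releases them; getByteBuffers() is ByteBufferListOutputStream's accessor for those buffers):
  //
  //   ByteBufferListOutputStream stream =
  //     builder.buildCellBlockStream(codec, null, scanner, ByteBuffAllocator.HEAP);
  //   if (stream != null) {
  //     try {
  //       List<ByteBuffer> buffers = stream.getByteBuffers(); // send these
  //     } finally {
  //       stream.releaseResources();
  //     }
  //   }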

  /**
   * Create a cell scanner.
   * @param codec      to use for the cellblock
   * @param compressor to use for decompressing the cellblock, or null if it is not compressed
   * @param cellBlock  to decode
   * @return CellScanner to work against the content of <code>cellBlock</code>
   * @throws IOException if decoding fails
   */
  public ExtendedCellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
    final byte[] cellBlock) throws IOException {
    // Use this method from Client side to create the CellScanner
    if (compressor != null) {
      ByteBuffer cellBlockBuf = decompress(compressor, cellBlock);
      return codec.getDecoder(new ByteBufferInputStream(cellBlockBuf));
    }
    // Purposefully not making the Decoder over the ByteBuffer. A Decoder over the BB would make
    // Cells directly over the passed BB. This method is called client-side and we don't want the
    // Cells to share the byte[] the RPC response was read into; if the user's app cached any of
    // those Cells, the response byte[] could never be GC'd.
    return codec.getDecoder(new ByteArrayInputStream(cellBlock));
  }
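
  // Client-side round-trip sketch (assumes cellBlockBytes was produced by buildCellBlock with
  // the same codec/compressor pair):
  //
  //   ExtendedCellScanner results = builder.createCellScanner(codec, compressor, cellBlockBytes);
  //   while (results.advance()) {
  //     ExtendedCell cell = results.current();
  //   }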

  /**
   * Create a cell scanner using an existing bytebuff.
   * @param codec      to use for the cellblock
   * @param compressor to use for decompressing the cellblock, or null if it is not compressed
   * @param cellBlock  ByteBuffer containing the cells written by the Codec. The buffer should be
   *                   position()'ed at the start of the cell block and limit()'ed at the end.
   * @return CellScanner to work against the content of <code>cellBlock</code>. All cells created
   *         out of the CellScanner will share the same ByteBuffer being passed.
   * @throws IOException if cell decoding fails
   */
  public ExtendedCellScanner createCellScannerReusingBuffers(final Codec codec,
    final CompressionCodec compressor, ByteBuff cellBlock) throws IOException {
    // Use this method from HRS to create the CellScanner
    // If compressed, decompress it first before passing it on else we will leak compression
    // resources if the stream is not closed properly after we let it out.
    if (compressor != null) {
      cellBlock = decompress(compressor, cellBlock);
    }
    return codec.getDecoder(cellBlock);
  }
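
  // Server-side sketch (note: Cells decoded this way share memory with the passed ByteBuff, so
  // the backing buffers must stay alive for as long as the Cells are referenced):
  //
  //   ByteBuff block = new SingleByteBuff(requestBuffer); // positioned/limited at the cellblock
  //   ExtendedCellScanner scanner = builder.createCellScannerReusingBuffers(codec, null, block);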

  private ByteBuffer decompress(CompressionCodec compressor, byte[] compressedCellBlock)
    throws IOException {
    return decompress(compressor, new ByteArrayInputStream(compressedCellBlock),
      compressedCellBlock.length * this.cellBlockDecompressionMultiplier);
  }

  private ByteBuff decompress(CompressionCodec compressor, ByteBuff compressedCellBlock)
    throws IOException {
    ByteBuffer cellBlock = decompress(compressor, new ByteBuffInputStream(compressedCellBlock),
      compressedCellBlock.remaining() * this.cellBlockDecompressionMultiplier);
    return new SingleByteBuff(cellBlock);
  }
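
  // Sizing example for the multiplier heuristic above: with the default multiplier of 3, a
  // 64 KB compressed block starts decompressing into a 192 KB output buffer; if that guess is
  // low, the ByteBufferOutputStream below grows as needed (see its TODOs).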

  private ByteBuffer decompress(CompressionCodec compressor, InputStream cellBlockStream,
    int osInitialSize) throws IOException {
    // GZIPCodec fails w/ NPE if no configuration.
    if (compressor instanceof Configurable) {
      ((Configurable) compressor).setConf(this.conf);
    }
    Decompressor poolDecompressor = CodecPool.getDecompressor(compressor);
    CompressionInputStream cis = compressor.createInputStream(cellBlockStream, poolDecompressor);
    ByteBufferOutputStream bbos;
    try {
      // TODO: This is ugly. The buffer will be resized on us if we guess wrong.
      // TODO: Reuse buffers.
      bbos = new ByteBufferOutputStream(osInitialSize);
      IOUtils.copy(cis, bbos);
      bbos.close();
      return bbos.getByteBuffer();
    } finally {
      CodecPool.returnDecompressor(poolDecompressor);
    }
  }
}