001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.ByteArrayInputStream;
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.OutputStream;
024import java.nio.BufferOverflowException;
025import java.nio.ByteBuffer;
026import org.apache.commons.io.IOUtils;
027import org.apache.hadoop.conf.Configurable;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.CellScanner;
030import org.apache.hadoop.hbase.DoNotRetryIOException;
031import org.apache.hadoop.hbase.codec.Codec;
032import org.apache.hadoop.hbase.io.ByteBuffAllocator;
033import org.apache.hadoop.hbase.io.ByteBuffInputStream;
034import org.apache.hadoop.hbase.io.ByteBufferInputStream;
035import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
036import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
037import org.apache.hadoop.hbase.nio.ByteBuff;
038import org.apache.hadoop.hbase.nio.SingleByteBuff;
039import org.apache.hadoop.hbase.util.ClassSize;
040import org.apache.hadoop.io.compress.CodecPool;
041import org.apache.hadoop.io.compress.CompressionCodec;
042import org.apache.hadoop.io.compress.CompressionInputStream;
043import org.apache.hadoop.io.compress.Compressor;
044import org.apache.hadoop.io.compress.Decompressor;
045import org.apache.yetus.audience.InterfaceAudience;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
050import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
051import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
052
053/**
054 * Helper class for building cell block.
055 */
056@InterfaceAudience.Private
057class CellBlockBuilder {
058
059  // LOG is being used in TestCellBlockBuilder
060  static final Logger LOG = LoggerFactory.getLogger(CellBlockBuilder.class);
061
062  private final Configuration conf;
063
064  /**
065   * How much we think the decompressor will expand the original compressed content.
066   */
067  private final int cellBlockDecompressionMultiplier;
068
069  private final int cellBlockBuildingInitialBufferSize;
070
071  public CellBlockBuilder(Configuration conf) {
072    this.conf = conf;
073    this.cellBlockDecompressionMultiplier =
074      conf.getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3);
075
076    // Guess that 16k is a good size for rpc buffer. Could go bigger. See the TODO below in
077    // #buildCellBlock.
078    this.cellBlockBuildingInitialBufferSize =
079      ClassSize.align(conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024));
080  }
081
082  private interface OutputStreamSupplier {
083
084    OutputStream get(int expectedSize);
085
086    int size();
087  }
088
089  private static final class ByteBufferOutputStreamSupplier implements OutputStreamSupplier {
090
091    private ByteBufferOutputStream baos;
092
093    @Override
094    public OutputStream get(int expectedSize) {
095      baos = new ByteBufferOutputStream(expectedSize);
096      return baos;
097    }
098
099    @Override
100    public int size() {
101      return baos.size();
102    }
103  }
104
105  /**
106   * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
107   * <code>compressor</code>. nnn * @return Null or byte buffer filled with a cellblock filled with
108   * passed-in Cells encoded using passed in <code>codec</code> and/or <code>compressor</code>; the
109   * returned buffer has been flipped and is ready for reading. Use limit to find total size. n
110   */
111  public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
112    final CellScanner cellScanner) throws IOException {
113    ByteBufferOutputStreamSupplier supplier = new ByteBufferOutputStreamSupplier();
114    if (buildCellBlock(codec, compressor, cellScanner, supplier)) {
115      ByteBuffer bb = supplier.baos.getByteBuffer();
116      // If no cells, don't mess around. Just return null (could be a bunch of existence checking
117      // gets or something -- stuff that does not return a cell).
118      return bb.hasRemaining() ? bb : null;
119    } else {
120      return null;
121    }
122  }
123
124  private static final class ByteBufOutputStreamSupplier implements OutputStreamSupplier {
125
126    private final ByteBufAllocator alloc;
127
128    private ByteBuf buf;
129
130    public ByteBufOutputStreamSupplier(ByteBufAllocator alloc) {
131      this.alloc = alloc;
132    }
133
134    @Override
135    public OutputStream get(int expectedSize) {
136      buf = alloc.buffer(expectedSize);
137      return new ByteBufOutputStream(buf);
138    }
139
140    @Override
141    public int size() {
142      return buf.writerIndex();
143    }
144  }
145
146  public ByteBuf buildCellBlock(Codec codec, CompressionCodec compressor, CellScanner cellScanner,
147    ByteBufAllocator alloc) throws IOException {
148    ByteBufOutputStreamSupplier supplier = new ByteBufOutputStreamSupplier(alloc);
149    if (buildCellBlock(codec, compressor, cellScanner, supplier)) {
150      return supplier.buf;
151    } else {
152      return null;
153    }
154  }
155
156  private boolean buildCellBlock(final Codec codec, final CompressionCodec compressor,
157    final CellScanner cellScanner, OutputStreamSupplier supplier) throws IOException {
158    if (cellScanner == null) {
159      return false;
160    }
161    if (codec == null) {
162      throw new CellScannerButNoCodecException();
163    }
164    int bufferSize = cellBlockBuildingInitialBufferSize;
165    encodeCellsTo(supplier.get(bufferSize), cellScanner, codec, compressor);
166    if (LOG.isTraceEnabled() && bufferSize < supplier.size()) {
167      LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + supplier.size()
168        + "; up hbase.ipc.cellblock.building.initial.buffersize?");
169    }
170    return true;
171  }
172
173  private void encodeCellsTo(OutputStream os, CellScanner cellScanner, Codec codec,
174    CompressionCodec compressor) throws IOException {
175    Compressor poolCompressor = null;
176    try {
177      if (compressor != null) {
178        if (compressor instanceof Configurable) {
179          ((Configurable) compressor).setConf(this.conf);
180        }
181        poolCompressor = CodecPool.getCompressor(compressor);
182        os = compressor.createOutputStream(os, poolCompressor);
183      }
184      Codec.Encoder encoder = codec.getEncoder(os);
185      while (cellScanner.advance()) {
186        encoder.write(cellScanner.current());
187      }
188      encoder.flush();
189    } catch (BufferOverflowException | IndexOutOfBoundsException e) {
190      throw new DoNotRetryIOException(e);
191    } finally {
192      os.close();
193      if (poolCompressor != null) {
194        CodecPool.returnCompressor(poolCompressor);
195      }
196    }
197  }
198
199  /**
200   * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
201   * <code>compressor</code>.
202   * @param codec       to use for encoding
203   * @param compressor  to use for encoding
204   * @param cellScanner to encode
205   * @param allocator   to allocate the {@link ByteBuff}.
206   * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
207   *         passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has
208   *         been flipped and is ready for reading. Use limit to find total size. If
209   *         <code>pool</code> was not null, then this returned ByteBuffer came from there and
210   *         should be returned to the pool when done.
211   * @throws IOException if encoding the cells fail
212   */
213  public ByteBufferListOutputStream buildCellBlockStream(Codec codec, CompressionCodec compressor,
214    CellScanner cellScanner, ByteBuffAllocator allocator) throws IOException {
215    if (cellScanner == null) {
216      return null;
217    }
218    if (codec == null) {
219      throw new CellScannerButNoCodecException();
220    }
221    ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(allocator);
222    encodeCellsTo(bbos, cellScanner, codec, compressor);
223    if (bbos.size() == 0) {
224      bbos.releaseResources();
225      return null;
226    }
227    return bbos;
228  }
229
230  /**
231   * Create a cell scanner.
232   * @param codec     to use for cellblock
233   * @param cellBlock to encode
234   * @return CellScanner to work against the content of <code>cellBlock</code>
235   * @throws IOException if encoding fails
236   */
237  public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
238    final byte[] cellBlock) throws IOException {
239    // Use this method from Client side to create the CellScanner
240    if (compressor != null) {
241      ByteBuffer cellBlockBuf = decompress(compressor, cellBlock);
242      return codec.getDecoder(new ByteBufferInputStream(cellBlockBuf));
243    }
244    // Not making the Decoder over the ByteBuffer purposefully. The Decoder over the BB will
245    // make Cells directly over the passed BB. This method is called at client side and we don't
246    // want the Cells to share the same byte[] where the RPC response is being read. Caching of any
247    // of the Cells at user's app level will make it not possible to GC the response byte[]
248    return codec.getDecoder(new ByteArrayInputStream(cellBlock));
249  }
250
251  /**
252   * Create a cell scanner using an existing bytebuff.
253   * @param codec     to use for cellblock
254   * @param cellBlock ByteBuffer containing the cells written by the Codec. The buffer should be
255   *                  position()'ed at the start of the cell block and limit()'ed at the end.
256   * @return CellScanner to work against the content of <code>cellBlock</code>. All cells created
257   *         out of the CellScanner will share the same ByteBuffer being passed.
258   * @throws IOException if cell encoding fails
259   */
260  public CellScanner createCellScannerReusingBuffers(final Codec codec,
261    final CompressionCodec compressor, ByteBuff cellBlock) throws IOException {
262    // Use this method from HRS to create the CellScanner
263    // If compressed, decompress it first before passing it on else we will leak compression
264    // resources if the stream is not closed properly after we let it out.
265    if (compressor != null) {
266      cellBlock = decompress(compressor, cellBlock);
267    }
268    return codec.getDecoder(cellBlock);
269  }
270
271  private ByteBuffer decompress(CompressionCodec compressor, byte[] compressedCellBlock)
272    throws IOException {
273    ByteBuffer cellBlock = decompress(compressor, new ByteArrayInputStream(compressedCellBlock),
274      compressedCellBlock.length * this.cellBlockDecompressionMultiplier);
275    return cellBlock;
276  }
277
278  private ByteBuff decompress(CompressionCodec compressor, ByteBuff compressedCellBlock)
279    throws IOException {
280    ByteBuffer cellBlock = decompress(compressor, new ByteBuffInputStream(compressedCellBlock),
281      compressedCellBlock.remaining() * this.cellBlockDecompressionMultiplier);
282    return new SingleByteBuff(cellBlock);
283  }
284
285  private ByteBuffer decompress(CompressionCodec compressor, InputStream cellBlockStream,
286    int osInitialSize) throws IOException {
287    // GZIPCodec fails w/ NPE if no configuration.
288    if (compressor instanceof Configurable) {
289      ((Configurable) compressor).setConf(this.conf);
290    }
291    Decompressor poolDecompressor = CodecPool.getDecompressor(compressor);
292    CompressionInputStream cis = compressor.createInputStream(cellBlockStream, poolDecompressor);
293    ByteBufferOutputStream bbos;
294    try {
295      // TODO: This is ugly. The buffer will be resized on us if we guess wrong.
296      // TODO: Reuse buffers.
297      bbos = new ByteBufferOutputStream(osInitialSize);
298      IOUtils.copy(cis, bbos);
299      bbos.close();
300      return bbos.getByteBuffer();
301    } finally {
302      CodecPool.returnDecompressor(poolDecompressor);
303    }
304  }
305}