001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.io.ByteArrayInputStream; 021import java.io.IOException; 022import java.io.InputStream; 023import java.io.OutputStream; 024import java.nio.BufferOverflowException; 025import java.nio.ByteBuffer; 026import org.apache.commons.io.IOUtils; 027import org.apache.hadoop.conf.Configurable; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.DoNotRetryIOException; 030import org.apache.hadoop.hbase.ExtendedCellScanner; 031import org.apache.hadoop.hbase.codec.Codec; 032import org.apache.hadoop.hbase.io.ByteBuffAllocator; 033import org.apache.hadoop.hbase.io.ByteBuffInputStream; 034import org.apache.hadoop.hbase.io.ByteBufferInputStream; 035import org.apache.hadoop.hbase.io.ByteBufferListOutputStream; 036import org.apache.hadoop.hbase.io.ByteBufferOutputStream; 037import org.apache.hadoop.hbase.nio.ByteBuff; 038import org.apache.hadoop.hbase.nio.SingleByteBuff; 039import org.apache.hadoop.hbase.util.ClassSize; 040import org.apache.hadoop.io.compress.CodecPool; 041import org.apache.hadoop.io.compress.CompressionCodec; 042import org.apache.hadoop.io.compress.CompressionInputStream; 043import org.apache.hadoop.io.compress.Compressor; 044import org.apache.hadoop.io.compress.Decompressor; 045import org.apache.yetus.audience.InterfaceAudience; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 050import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator; 051import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream; 052 053/** 054 * Helper class for building cell block. 055 */ 056@InterfaceAudience.Private 057class CellBlockBuilder { 058 059 // LOG is being used in TestCellBlockBuilder 060 static final Logger LOG = LoggerFactory.getLogger(CellBlockBuilder.class); 061 062 private final Configuration conf; 063 064 /** 065 * How much we think the decompressor will expand the original compressed content. 066 */ 067 private final int cellBlockDecompressionMultiplier; 068 069 private final int cellBlockBuildingInitialBufferSize; 070 071 public CellBlockBuilder(Configuration conf) { 072 this.conf = conf; 073 this.cellBlockDecompressionMultiplier = 074 conf.getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3); 075 076 // Guess that 16k is a good size for rpc buffer. Could go bigger. See the TODO below in 077 // #buildCellBlock. 078 this.cellBlockBuildingInitialBufferSize = 079 ClassSize.align(conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024)); 080 } 081 082 private interface OutputStreamSupplier { 083 084 OutputStream get(int expectedSize); 085 086 int size(); 087 } 088 089 private static final class ByteBufferOutputStreamSupplier implements OutputStreamSupplier { 090 091 private ByteBufferOutputStream baos; 092 093 @Override 094 public OutputStream get(int expectedSize) { 095 baos = new ByteBufferOutputStream(expectedSize); 096 return baos; 097 } 098 099 @Override 100 public int size() { 101 return baos.size(); 102 } 103 } 104 105 /** 106 * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or 107 * <code>compressor</code>. 108 * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using 109 * passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has 110 * been flipped and is ready for reading. Use limit to find total size. 111 */ 112 public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor, 113 final ExtendedCellScanner cellScanner) throws IOException { 114 ByteBufferOutputStreamSupplier supplier = new ByteBufferOutputStreamSupplier(); 115 if (buildCellBlock(codec, compressor, cellScanner, supplier)) { 116 ByteBuffer bb = supplier.baos.getByteBuffer(); 117 // If no cells, don't mess around. Just return null (could be a bunch of existence checking 118 // gets or something -- stuff that does not return a cell). 119 return bb.hasRemaining() ? bb : null; 120 } else { 121 return null; 122 } 123 } 124 125 private static final class ByteBufOutputStreamSupplier implements OutputStreamSupplier { 126 127 private final ByteBufAllocator alloc; 128 129 private ByteBuf buf; 130 131 public ByteBufOutputStreamSupplier(ByteBufAllocator alloc) { 132 this.alloc = alloc; 133 } 134 135 @Override 136 public OutputStream get(int expectedSize) { 137 buf = alloc.buffer(expectedSize); 138 return new ByteBufOutputStream(buf); 139 } 140 141 @Override 142 public int size() { 143 return buf.writerIndex(); 144 } 145 } 146 147 public ByteBuf buildCellBlock(Codec codec, CompressionCodec compressor, 148 ExtendedCellScanner cellScanner, ByteBufAllocator alloc) throws IOException { 149 ByteBufOutputStreamSupplier supplier = new ByteBufOutputStreamSupplier(alloc); 150 if (buildCellBlock(codec, compressor, cellScanner, supplier)) { 151 return supplier.buf; 152 } else { 153 return null; 154 } 155 } 156 157 private boolean buildCellBlock(final Codec codec, final CompressionCodec compressor, 158 final ExtendedCellScanner cellScanner, OutputStreamSupplier supplier) throws IOException { 159 if (cellScanner == null) { 160 return false; 161 } 162 if (codec == null) { 163 throw new CellScannerButNoCodecException(); 164 } 165 int bufferSize = cellBlockBuildingInitialBufferSize; 166 encodeCellsTo(supplier.get(bufferSize), cellScanner, codec, compressor); 167 if (LOG.isTraceEnabled() && bufferSize < supplier.size()) { 168 LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + supplier.size() 169 + "; up hbase.ipc.cellblock.building.initial.buffersize?"); 170 } 171 return true; 172 } 173 174 private void encodeCellsTo(OutputStream os, ExtendedCellScanner cellScanner, Codec codec, 175 CompressionCodec compressor) throws IOException { 176 Compressor poolCompressor = null; 177 try { 178 if (compressor != null) { 179 if (compressor instanceof Configurable) { 180 ((Configurable) compressor).setConf(this.conf); 181 } 182 poolCompressor = CodecPool.getCompressor(compressor); 183 os = compressor.createOutputStream(os, poolCompressor); 184 } 185 Codec.Encoder encoder = codec.getEncoder(os); 186 while (cellScanner.advance()) { 187 encoder.write(cellScanner.current()); 188 } 189 encoder.flush(); 190 } catch (BufferOverflowException | IndexOutOfBoundsException e) { 191 throw new DoNotRetryIOException(e); 192 } finally { 193 os.close(); 194 if (poolCompressor != null) { 195 CodecPool.returnCompressor(poolCompressor); 196 } 197 } 198 } 199 200 /** 201 * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or 202 * <code>compressor</code>. 203 * @param codec to use for encoding 204 * @param compressor to use for encoding 205 * @param cellScanner to encode 206 * @param allocator to allocate the {@link ByteBuff}. 207 * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using 208 * passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has 209 * been flipped and is ready for reading. Use limit to find total size. If 210 * <code>pool</code> was not null, then this returned ByteBuffer came from there and 211 * should be returned to the pool when done. 212 * @throws IOException if encoding the cells fail 213 */ 214 public ByteBufferListOutputStream buildCellBlockStream(Codec codec, CompressionCodec compressor, 215 ExtendedCellScanner cellScanner, ByteBuffAllocator allocator) throws IOException { 216 if (cellScanner == null) { 217 return null; 218 } 219 if (codec == null) { 220 throw new CellScannerButNoCodecException(); 221 } 222 ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(allocator); 223 encodeCellsTo(bbos, cellScanner, codec, compressor); 224 if (bbos.size() == 0) { 225 bbos.releaseResources(); 226 return null; 227 } 228 return bbos; 229 } 230 231 /** 232 * Create a cell scanner. 233 * @param codec to use for cellblock 234 * @param cellBlock to encode 235 * @return CellScanner to work against the content of <code>cellBlock</code> 236 * @throws IOException if encoding fails 237 */ 238 public ExtendedCellScanner createCellScanner(final Codec codec, final CompressionCodec compressor, 239 final byte[] cellBlock) throws IOException { 240 // Use this method from Client side to create the CellScanner 241 if (compressor != null) { 242 ByteBuffer cellBlockBuf = decompress(compressor, cellBlock); 243 return codec.getDecoder(new ByteBufferInputStream(cellBlockBuf)); 244 } 245 // Not making the Decoder over the ByteBuffer purposefully. The Decoder over the BB will 246 // make Cells directly over the passed BB. This method is called at client side and we don't 247 // want the Cells to share the same byte[] where the RPC response is being read. Caching of any 248 // of the Cells at user's app level will make it not possible to GC the response byte[] 249 return codec.getDecoder(new ByteArrayInputStream(cellBlock)); 250 } 251 252 /** 253 * Create a cell scanner using an existing bytebuff. 254 * @param codec to use for cellblock 255 * @param cellBlock ByteBuffer containing the cells written by the Codec. The buffer should be 256 * position()'ed at the start of the cell block and limit()'ed at the end. 257 * @return CellScanner to work against the content of <code>cellBlock</code>. All cells created 258 * out of the CellScanner will share the same ByteBuffer being passed. 259 * @throws IOException if cell encoding fails 260 */ 261 public ExtendedCellScanner createCellScannerReusingBuffers(final Codec codec, 262 final CompressionCodec compressor, ByteBuff cellBlock) throws IOException { 263 // Use this method from HRS to create the CellScanner 264 // If compressed, decompress it first before passing it on else we will leak compression 265 // resources if the stream is not closed properly after we let it out. 266 if (compressor != null) { 267 cellBlock = decompress(compressor, cellBlock); 268 } 269 return codec.getDecoder(cellBlock); 270 } 271 272 private ByteBuffer decompress(CompressionCodec compressor, byte[] compressedCellBlock) 273 throws IOException { 274 ByteBuffer cellBlock = decompress(compressor, new ByteArrayInputStream(compressedCellBlock), 275 compressedCellBlock.length * this.cellBlockDecompressionMultiplier); 276 return cellBlock; 277 } 278 279 private ByteBuff decompress(CompressionCodec compressor, ByteBuff compressedCellBlock) 280 throws IOException { 281 ByteBuffer cellBlock = decompress(compressor, new ByteBuffInputStream(compressedCellBlock), 282 compressedCellBlock.remaining() * this.cellBlockDecompressionMultiplier); 283 return new SingleByteBuff(cellBlock); 284 } 285 286 private ByteBuffer decompress(CompressionCodec compressor, InputStream cellBlockStream, 287 int osInitialSize) throws IOException { 288 // GZIPCodec fails w/ NPE if no configuration. 289 if (compressor instanceof Configurable) { 290 ((Configurable) compressor).setConf(this.conf); 291 } 292 Decompressor poolDecompressor = CodecPool.getDecompressor(compressor); 293 CompressionInputStream cis = compressor.createInputStream(cellBlockStream, poolDecompressor); 294 ByteBufferOutputStream bbos; 295 try { 296 // TODO: This is ugly. The buffer will be resized on us if we guess wrong. 297 // TODO: Reuse buffers. 298 bbos = new ByteBufferOutputStream(osInitialSize); 299 IOUtils.copy(cis, bbos); 300 bbos.close(); 301 return bbos.getByteBuffer(); 302 } finally { 303 CodecPool.returnDecompressor(poolDecompressor); 304 } 305 } 306}