001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.io.ByteArrayInputStream; 021import java.io.IOException; 022import java.io.InputStream; 023import java.io.OutputStream; 024import java.nio.BufferOverflowException; 025import java.nio.ByteBuffer; 026import org.apache.commons.io.IOUtils; 027import org.apache.hadoop.conf.Configurable; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.CellScanner; 030import org.apache.hadoop.hbase.DoNotRetryIOException; 031import org.apache.hadoop.hbase.codec.Codec; 032import org.apache.hadoop.hbase.io.ByteBuffAllocator; 033import org.apache.hadoop.hbase.io.ByteBuffInputStream; 034import org.apache.hadoop.hbase.io.ByteBufferInputStream; 035import org.apache.hadoop.hbase.io.ByteBufferListOutputStream; 036import org.apache.hadoop.hbase.io.ByteBufferOutputStream; 037import org.apache.hadoop.hbase.nio.ByteBuff; 038import org.apache.hadoop.hbase.nio.SingleByteBuff; 039import org.apache.hadoop.hbase.util.ClassSize; 040import org.apache.hadoop.io.compress.CodecPool; 041import org.apache.hadoop.io.compress.CompressionCodec; 042import org.apache.hadoop.io.compress.CompressionInputStream; 043import org.apache.hadoop.io.compress.Compressor; 044import org.apache.hadoop.io.compress.Decompressor; 045import org.apache.yetus.audience.InterfaceAudience; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 050import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator; 051import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream; 052 053/** 054 * Helper class for building cell block. 055 */ 056@InterfaceAudience.Private 057class CellBlockBuilder { 058 059 // LOG is being used in TestCellBlockBuilder 060 static final Logger LOG = LoggerFactory.getLogger(CellBlockBuilder.class); 061 062 private final Configuration conf; 063 064 /** 065 * How much we think the decompressor will expand the original compressed content. 066 */ 067 private final int cellBlockDecompressionMultiplier; 068 069 private final int cellBlockBuildingInitialBufferSize; 070 071 public CellBlockBuilder(Configuration conf) { 072 this.conf = conf; 073 this.cellBlockDecompressionMultiplier = 074 conf.getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3); 075 076 // Guess that 16k is a good size for rpc buffer. Could go bigger. See the TODO below in 077 // #buildCellBlock. 078 this.cellBlockBuildingInitialBufferSize = 079 ClassSize.align(conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024)); 080 } 081 082 private interface OutputStreamSupplier { 083 084 OutputStream get(int expectedSize); 085 086 int size(); 087 } 088 089 private static final class ByteBufferOutputStreamSupplier implements OutputStreamSupplier { 090 091 private ByteBufferOutputStream baos; 092 093 @Override 094 public OutputStream get(int expectedSize) { 095 baos = new ByteBufferOutputStream(expectedSize); 096 return baos; 097 } 098 099 @Override 100 public int size() { 101 return baos.size(); 102 } 103 } 104 105 /** 106 * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or 107 * <code>compressor</code>. nnn * @return Null or byte buffer filled with a cellblock filled with 108 * passed-in Cells encoded using passed in <code>codec</code> and/or <code>compressor</code>; the 109 * returned buffer has been flipped and is ready for reading. Use limit to find total size. n 110 */ 111 public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor, 112 final CellScanner cellScanner) throws IOException { 113 ByteBufferOutputStreamSupplier supplier = new ByteBufferOutputStreamSupplier(); 114 if (buildCellBlock(codec, compressor, cellScanner, supplier)) { 115 ByteBuffer bb = supplier.baos.getByteBuffer(); 116 // If no cells, don't mess around. Just return null (could be a bunch of existence checking 117 // gets or something -- stuff that does not return a cell). 118 return bb.hasRemaining() ? bb : null; 119 } else { 120 return null; 121 } 122 } 123 124 private static final class ByteBufOutputStreamSupplier implements OutputStreamSupplier { 125 126 private final ByteBufAllocator alloc; 127 128 private ByteBuf buf; 129 130 public ByteBufOutputStreamSupplier(ByteBufAllocator alloc) { 131 this.alloc = alloc; 132 } 133 134 @Override 135 public OutputStream get(int expectedSize) { 136 buf = alloc.buffer(expectedSize); 137 return new ByteBufOutputStream(buf); 138 } 139 140 @Override 141 public int size() { 142 return buf.writerIndex(); 143 } 144 } 145 146 public ByteBuf buildCellBlock(Codec codec, CompressionCodec compressor, CellScanner cellScanner, 147 ByteBufAllocator alloc) throws IOException { 148 ByteBufOutputStreamSupplier supplier = new ByteBufOutputStreamSupplier(alloc); 149 if (buildCellBlock(codec, compressor, cellScanner, supplier)) { 150 return supplier.buf; 151 } else { 152 return null; 153 } 154 } 155 156 private boolean buildCellBlock(final Codec codec, final CompressionCodec compressor, 157 final CellScanner cellScanner, OutputStreamSupplier supplier) throws IOException { 158 if (cellScanner == null) { 159 return false; 160 } 161 if (codec == null) { 162 throw new CellScannerButNoCodecException(); 163 } 164 int bufferSize = cellBlockBuildingInitialBufferSize; 165 encodeCellsTo(supplier.get(bufferSize), cellScanner, codec, compressor); 166 if (LOG.isTraceEnabled() && bufferSize < supplier.size()) { 167 LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + supplier.size() 168 + "; up hbase.ipc.cellblock.building.initial.buffersize?"); 169 } 170 return true; 171 } 172 173 private void encodeCellsTo(OutputStream os, CellScanner cellScanner, Codec codec, 174 CompressionCodec compressor) throws IOException { 175 Compressor poolCompressor = null; 176 try { 177 if (compressor != null) { 178 if (compressor instanceof Configurable) { 179 ((Configurable) compressor).setConf(this.conf); 180 } 181 poolCompressor = CodecPool.getCompressor(compressor); 182 os = compressor.createOutputStream(os, poolCompressor); 183 } 184 Codec.Encoder encoder = codec.getEncoder(os); 185 while (cellScanner.advance()) { 186 encoder.write(cellScanner.current()); 187 } 188 encoder.flush(); 189 } catch (BufferOverflowException | IndexOutOfBoundsException e) { 190 throw new DoNotRetryIOException(e); 191 } finally { 192 os.close(); 193 if (poolCompressor != null) { 194 CodecPool.returnCompressor(poolCompressor); 195 } 196 } 197 } 198 199 /** 200 * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or 201 * <code>compressor</code>. 202 * @param codec to use for encoding 203 * @param compressor to use for encoding 204 * @param cellScanner to encode 205 * @param allocator to allocate the {@link ByteBuff}. 206 * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using 207 * passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has 208 * been flipped and is ready for reading. Use limit to find total size. If 209 * <code>pool</code> was not null, then this returned ByteBuffer came from there and 210 * should be returned to the pool when done. 211 * @throws IOException if encoding the cells fail 212 */ 213 public ByteBufferListOutputStream buildCellBlockStream(Codec codec, CompressionCodec compressor, 214 CellScanner cellScanner, ByteBuffAllocator allocator) throws IOException { 215 if (cellScanner == null) { 216 return null; 217 } 218 if (codec == null) { 219 throw new CellScannerButNoCodecException(); 220 } 221 ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(allocator); 222 encodeCellsTo(bbos, cellScanner, codec, compressor); 223 if (bbos.size() == 0) { 224 bbos.releaseResources(); 225 return null; 226 } 227 return bbos; 228 } 229 230 /** 231 * Create a cell scanner. 232 * @param codec to use for cellblock 233 * @param cellBlock to encode 234 * @return CellScanner to work against the content of <code>cellBlock</code> 235 * @throws IOException if encoding fails 236 */ 237 public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor, 238 final byte[] cellBlock) throws IOException { 239 // Use this method from Client side to create the CellScanner 240 if (compressor != null) { 241 ByteBuffer cellBlockBuf = decompress(compressor, cellBlock); 242 return codec.getDecoder(new ByteBufferInputStream(cellBlockBuf)); 243 } 244 // Not making the Decoder over the ByteBuffer purposefully. The Decoder over the BB will 245 // make Cells directly over the passed BB. This method is called at client side and we don't 246 // want the Cells to share the same byte[] where the RPC response is being read. Caching of any 247 // of the Cells at user's app level will make it not possible to GC the response byte[] 248 return codec.getDecoder(new ByteArrayInputStream(cellBlock)); 249 } 250 251 /** 252 * Create a cell scanner using an existing bytebuff. 253 * @param codec to use for cellblock 254 * @param cellBlock ByteBuffer containing the cells written by the Codec. The buffer should be 255 * position()'ed at the start of the cell block and limit()'ed at the end. 256 * @return CellScanner to work against the content of <code>cellBlock</code>. All cells created 257 * out of the CellScanner will share the same ByteBuffer being passed. 258 * @throws IOException if cell encoding fails 259 */ 260 public CellScanner createCellScannerReusingBuffers(final Codec codec, 261 final CompressionCodec compressor, ByteBuff cellBlock) throws IOException { 262 // Use this method from HRS to create the CellScanner 263 // If compressed, decompress it first before passing it on else we will leak compression 264 // resources if the stream is not closed properly after we let it out. 265 if (compressor != null) { 266 cellBlock = decompress(compressor, cellBlock); 267 } 268 return codec.getDecoder(cellBlock); 269 } 270 271 private ByteBuffer decompress(CompressionCodec compressor, byte[] compressedCellBlock) 272 throws IOException { 273 ByteBuffer cellBlock = decompress(compressor, new ByteArrayInputStream(compressedCellBlock), 274 compressedCellBlock.length * this.cellBlockDecompressionMultiplier); 275 return cellBlock; 276 } 277 278 private ByteBuff decompress(CompressionCodec compressor, ByteBuff compressedCellBlock) 279 throws IOException { 280 ByteBuffer cellBlock = decompress(compressor, new ByteBuffInputStream(compressedCellBlock), 281 compressedCellBlock.remaining() * this.cellBlockDecompressionMultiplier); 282 return new SingleByteBuff(cellBlock); 283 } 284 285 private ByteBuffer decompress(CompressionCodec compressor, InputStream cellBlockStream, 286 int osInitialSize) throws IOException { 287 // GZIPCodec fails w/ NPE if no configuration. 288 if (compressor instanceof Configurable) { 289 ((Configurable) compressor).setConf(this.conf); 290 } 291 Decompressor poolDecompressor = CodecPool.getDecompressor(compressor); 292 CompressionInputStream cis = compressor.createInputStream(cellBlockStream, poolDecompressor); 293 ByteBufferOutputStream bbos; 294 try { 295 // TODO: This is ugly. The buffer will be resized on us if we guess wrong. 296 // TODO: Reuse buffers. 297 bbos = new ByteBufferOutputStream(osInitialSize); 298 IOUtils.copy(cis, bbos); 299 bbos.close(); 300 return bbos.getByteBuffer(); 301 } finally { 302 CodecPool.returnDecompressor(poolDecompressor); 303 } 304 } 305}