001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 021import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator; 022import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream; 023 024import java.io.ByteArrayInputStream; 025import java.io.IOException; 026import java.io.InputStream; 027import java.io.OutputStream; 028import java.nio.BufferOverflowException; 029import java.nio.ByteBuffer; 030 031import org.apache.commons.io.IOUtils; 032import org.apache.hadoop.conf.Configurable; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.CellScanner; 035import org.apache.hadoop.hbase.DoNotRetryIOException; 036import org.apache.yetus.audience.InterfaceAudience; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039import org.apache.hadoop.hbase.codec.Codec; 040import org.apache.hadoop.hbase.io.ByteBuffInputStream; 041import org.apache.hadoop.hbase.io.ByteBufferInputStream; 042import org.apache.hadoop.hbase.io.ByteBufferListOutputStream; 043import org.apache.hadoop.hbase.io.ByteBufferOutputStream; 044import org.apache.hadoop.hbase.io.ByteBufferPool; 045import org.apache.hadoop.hbase.nio.ByteBuff; 046import org.apache.hadoop.hbase.nio.SingleByteBuff; 047import org.apache.hadoop.hbase.util.ClassSize; 048import org.apache.hadoop.io.compress.CodecPool; 049import org.apache.hadoop.io.compress.CompressionCodec; 050import org.apache.hadoop.io.compress.CompressionInputStream; 051import org.apache.hadoop.io.compress.Compressor; 052import org.apache.hadoop.io.compress.Decompressor; 053 054/** 055 * Helper class for building cell block. 056 */ 057@InterfaceAudience.Private 058class CellBlockBuilder { 059 060 // LOG is being used in TestCellBlockBuilder 061 static final Logger LOG = LoggerFactory.getLogger(CellBlockBuilder.class); 062 063 private final Configuration conf; 064 065 /** 066 * How much we think the decompressor will expand the original compressed content. 067 */ 068 private final int cellBlockDecompressionMultiplier; 069 070 private final int cellBlockBuildingInitialBufferSize; 071 072 public CellBlockBuilder(Configuration conf) { 073 this.conf = conf; 074 this.cellBlockDecompressionMultiplier = conf 075 .getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3); 076 077 // Guess that 16k is a good size for rpc buffer. Could go bigger. See the TODO below in 078 // #buildCellBlock. 079 this.cellBlockBuildingInitialBufferSize = ClassSize 080 .align(conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024)); 081 } 082 083 private interface OutputStreamSupplier { 084 085 OutputStream get(int expectedSize); 086 087 int size(); 088 } 089 090 private static final class ByteBufferOutputStreamSupplier implements OutputStreamSupplier { 091 092 private ByteBufferOutputStream baos; 093 094 @Override 095 public OutputStream get(int expectedSize) { 096 baos = new ByteBufferOutputStream(expectedSize); 097 return baos; 098 } 099 100 @Override 101 public int size() { 102 return baos.size(); 103 } 104 } 105 106 /** 107 * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or 108 * <code>compressor</code>. 109 * @param codec 110 * @param compressor 111 * @param cellScanner 112 * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using 113 * passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has 114 * been flipped and is ready for reading. Use limit to find total size. 115 * @throws IOException 116 */ 117 public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor, 118 final CellScanner cellScanner) throws IOException { 119 ByteBufferOutputStreamSupplier supplier = new ByteBufferOutputStreamSupplier(); 120 if (buildCellBlock(codec, compressor, cellScanner, supplier)) { 121 ByteBuffer bb = supplier.baos.getByteBuffer(); 122 // If no cells, don't mess around. Just return null (could be a bunch of existence checking 123 // gets or something -- stuff that does not return a cell). 124 return bb.hasRemaining() ? bb : null; 125 } else { 126 return null; 127 } 128 } 129 130 private static final class ByteBufOutputStreamSupplier implements OutputStreamSupplier { 131 132 private final ByteBufAllocator alloc; 133 134 private ByteBuf buf; 135 136 public ByteBufOutputStreamSupplier(ByteBufAllocator alloc) { 137 this.alloc = alloc; 138 } 139 140 @Override 141 public OutputStream get(int expectedSize) { 142 buf = alloc.buffer(expectedSize); 143 return new ByteBufOutputStream(buf); 144 } 145 146 @Override 147 public int size() { 148 return buf.writerIndex(); 149 } 150 } 151 152 public ByteBuf buildCellBlock(Codec codec, CompressionCodec compressor, CellScanner cellScanner, 153 ByteBufAllocator alloc) throws IOException { 154 ByteBufOutputStreamSupplier supplier = new ByteBufOutputStreamSupplier(alloc); 155 if (buildCellBlock(codec, compressor, cellScanner, supplier)) { 156 return supplier.buf; 157 } else { 158 return null; 159 } 160 } 161 162 private boolean buildCellBlock(final Codec codec, final CompressionCodec compressor, 163 final CellScanner cellScanner, OutputStreamSupplier supplier) throws IOException { 164 if (cellScanner == null) { 165 return false; 166 } 167 if (codec == null) { 168 throw new CellScannerButNoCodecException(); 169 } 170 int bufferSize = cellBlockBuildingInitialBufferSize; 171 encodeCellsTo(supplier.get(bufferSize), cellScanner, codec, compressor); 172 if (LOG.isTraceEnabled() && bufferSize < supplier.size()) { 173 LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + supplier.size() 174 + "; up hbase.ipc.cellblock.building.initial.buffersize?"); 175 } 176 return true; 177 } 178 179 private void encodeCellsTo(OutputStream os, CellScanner cellScanner, Codec codec, 180 CompressionCodec compressor) throws IOException { 181 Compressor poolCompressor = null; 182 try { 183 if (compressor != null) { 184 if (compressor instanceof Configurable) { 185 ((Configurable) compressor).setConf(this.conf); 186 } 187 poolCompressor = CodecPool.getCompressor(compressor); 188 os = compressor.createOutputStream(os, poolCompressor); 189 } 190 Codec.Encoder encoder = codec.getEncoder(os); 191 while (cellScanner.advance()) { 192 encoder.write(cellScanner.current()); 193 } 194 encoder.flush(); 195 } catch (BufferOverflowException | IndexOutOfBoundsException e) { 196 throw new DoNotRetryIOException(e); 197 } finally { 198 os.close(); 199 if (poolCompressor != null) { 200 CodecPool.returnCompressor(poolCompressor); 201 } 202 } 203 } 204 205 /** 206 * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or 207 * <code>compressor</code>. 208 * @param codec to use for encoding 209 * @param compressor to use for encoding 210 * @param cellScanner to encode 211 * @param pool Pool of ByteBuffers to make use of. 212 * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using 213 * passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has 214 * been flipped and is ready for reading. Use limit to find total size. If 215 * <code>pool</code> was not null, then this returned ByteBuffer came from there and 216 * should be returned to the pool when done. 217 * @throws IOException if encoding the cells fail 218 */ 219 public ByteBufferListOutputStream buildCellBlockStream(Codec codec, CompressionCodec compressor, 220 CellScanner cellScanner, ByteBufferPool pool) throws IOException { 221 if (cellScanner == null) { 222 return null; 223 } 224 if (codec == null) { 225 throw new CellScannerButNoCodecException(); 226 } 227 assert pool != null; 228 ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(pool); 229 encodeCellsTo(bbos, cellScanner, codec, compressor); 230 if (bbos.size() == 0) { 231 bbos.releaseResources(); 232 return null; 233 } 234 return bbos; 235 } 236 237 /** 238 * @param codec to use for cellblock 239 * @param cellBlock to encode 240 * @return CellScanner to work against the content of <code>cellBlock</code> 241 * @throws IOException if encoding fails 242 */ 243 public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor, 244 final byte[] cellBlock) throws IOException { 245 // Use this method from Client side to create the CellScanner 246 if (compressor != null) { 247 ByteBuffer cellBlockBuf = decompress(compressor, cellBlock); 248 return codec.getDecoder(new ByteBufferInputStream(cellBlockBuf)); 249 } 250 // Not making the Decoder over the ByteBuffer purposefully. The Decoder over the BB will 251 // make Cells directly over the passed BB. This method is called at client side and we don't 252 // want the Cells to share the same byte[] where the RPC response is being read. Caching of any 253 // of the Cells at user's app level will make it not possible to GC the response byte[] 254 return codec.getDecoder(new ByteArrayInputStream(cellBlock)); 255 } 256 257 /** 258 * @param codec to use for cellblock 259 * @param cellBlock ByteBuffer containing the cells written by the Codec. The buffer should be 260 * position()'ed at the start of the cell block and limit()'ed at the end. 261 * @return CellScanner to work against the content of <code>cellBlock</code>. All cells created 262 * out of the CellScanner will share the same ByteBuffer being passed. 263 * @throws IOException if cell encoding fails 264 */ 265 public CellScanner createCellScannerReusingBuffers(final Codec codec, 266 final CompressionCodec compressor, ByteBuff cellBlock) throws IOException { 267 // Use this method from HRS to create the CellScanner 268 // If compressed, decompress it first before passing it on else we will leak compression 269 // resources if the stream is not closed properly after we let it out. 270 if (compressor != null) { 271 cellBlock = decompress(compressor, cellBlock); 272 } 273 return codec.getDecoder(cellBlock); 274 } 275 276 private ByteBuffer decompress(CompressionCodec compressor, byte[] compressedCellBlock) 277 throws IOException { 278 ByteBuffer cellBlock = decompress(compressor, new ByteArrayInputStream(compressedCellBlock), 279 compressedCellBlock.length * this.cellBlockDecompressionMultiplier); 280 return cellBlock; 281 } 282 283 private ByteBuff decompress(CompressionCodec compressor, ByteBuff compressedCellBlock) 284 throws IOException { 285 ByteBuffer cellBlock = decompress(compressor, new ByteBuffInputStream(compressedCellBlock), 286 compressedCellBlock.remaining() * this.cellBlockDecompressionMultiplier); 287 return new SingleByteBuff(cellBlock); 288 } 289 290 private ByteBuffer decompress(CompressionCodec compressor, InputStream cellBlockStream, 291 int osInitialSize) throws IOException { 292 // GZIPCodec fails w/ NPE if no configuration. 293 if (compressor instanceof Configurable) { 294 ((Configurable) compressor).setConf(this.conf); 295 } 296 Decompressor poolDecompressor = CodecPool.getDecompressor(compressor); 297 CompressionInputStream cis = compressor.createInputStream(cellBlockStream, poolDecompressor); 298 ByteBufferOutputStream bbos; 299 try { 300 // TODO: This is ugly. The buffer will be resized on us if we guess wrong. 301 // TODO: Reuse buffers. 302 bbos = new ByteBufferOutputStream(osInitialSize); 303 IOUtils.copy(cis, bbos); 304 bbos.close(); 305 return bbos.getByteBuffer(); 306 } finally { 307 CodecPool.returnDecompressor(poolDecompressor); 308 } 309 } 310}