/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import com.google.common.base.Preconditions;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Message;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
import org.apache.hadoop.hbase.io.ByteBufferInputStream;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;

/**
 * Utility to help RPC'ing: building and decoding cell blocks, and writing
 * length-prefixed protobuf messages.
 */
@InterfaceAudience.Private
public class IPCUtil {
  // LOG is being used in TestIPCUtil
  public static final Log LOG = LogFactory.getLog(IPCUtil.class);
  /**
   * How much we think the decompressor will expand the original compressed content.
   */
  private final int cellBlockDecompressionMultiplier;
  private final int cellBlockBuildingInitialBufferSize;
  private final Configuration conf;

  public IPCUtil(final Configuration conf) {
    super();
    this.conf = conf;
    this.cellBlockDecompressionMultiplier =
        conf.getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3);

    // Guess that 16k is a good size for rpc buffer.  Could go bigger.  See the TODO below in
    // #buildCellBlock.
    this.cellBlockBuildingInitialBufferSize =
      ClassSize.align(conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024));
  }
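
  // Illustrative sketch (not part of the original class): tuning the two buffer
  // knobs read in the constructor above. The keys are the ones used there; the
  // values shown are arbitrary assumptions, not recommended settings.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt("hbase.ipc.cellblock.building.initial.buffersize", 64 * 1024);
  //   conf.setInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 4);
  //   IPCUtil util = new IPCUtil(conf);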

  /**
   * Thrown if a CellScanner is passed in but there is no codec to encode it with.
   */
  public static class CellScannerButNoCodecException extends HBaseIOException {}

  /**
   * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
   * <code>compressor</code>.
   * @param codec to use for encoding
   * @param compressor to use for encoding
   * @param cellScanner to encode
   * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
   *   passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has been
   *   flipped and is ready for reading.  Use limit to find total size.
   * @throws IOException if encoding the cells fails
   */
  public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
    final CellScanner cellScanner)
  throws IOException {
    return buildCellBlock(codec, compressor, cellScanner, null);
  }

  /**
   * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
   * <code>compressor</code>.
   * @param codec to use for encoding
   * @param compressor to use for encoding
   * @param cellScanner to encode
   * @param pool Pool of ByteBuffers to make use of. Can be null and then we'll allocate
   *   our own ByteBuffer.
   * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
   *   passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has been
   *   flipped and is ready for reading.  Use limit to find total size. If <code>pool</code> was not
   *   null, then this returned ByteBuffer came from there and should be returned to the pool when
   *   done.
   * @throws IOException if encoding the cells fails
   */
  @SuppressWarnings("resource")
  public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
    final CellScanner cellScanner, final BoundedByteBufferPool pool)
  throws IOException {
    if (cellScanner == null) {
      return null;
    }
    if (codec == null) {
      throw new CellScannerButNoCodecException();
    }
    int bufferSize = this.cellBlockBuildingInitialBufferSize;
    ByteBufferOutputStream baos;
    if (pool != null) {
      ByteBuffer bb = pool.getBuffer();
      bufferSize = bb.capacity();
      baos = new ByteBufferOutputStream(bb);
    } else {
      // Then we need to make our own to return.
      if (cellScanner instanceof HeapSize) {
        long longSize = ((HeapSize)cellScanner).heapSize();
        // Just make sure we don't have a size bigger than an int.
        if (longSize > Integer.MAX_VALUE) {
          throw new IOException("Size " + longSize + " > " + Integer.MAX_VALUE);
        }
        bufferSize = ClassSize.align((int)longSize);
      }
      baos = new ByteBufferOutputStream(bufferSize);
    }
    Compressor poolCompressor = null;
    OutputStream os = baos;
    try {
      if (compressor != null) {
        if (compressor instanceof Configurable) {
          ((Configurable) compressor).setConf(this.conf);
        }
        poolCompressor = CodecPool.getCompressor(compressor);
        os = compressor.createOutputStream(os, poolCompressor);
      }
      Codec.Encoder encoder = codec.getEncoder(os);
      int count = 0;
      while (cellScanner.advance()) {
        encoder.write(cellScanner.current());
        count++;
      }
      encoder.flush();
      // If no cells, don't mess around.  Just return null (could be a bunch of existence checking
      // gets or something -- stuff that does not return a cell).
      if (count == 0) {
        return null;
      }
    } catch (BufferOverflowException e) {
      throw new DoNotRetryIOException(e);
    } finally {
      os.close();

      if (poolCompressor != null) {
        CodecPool.returnCompressor(poolCompressor);
      }
    }
    if (LOG.isTraceEnabled()) {
      if (bufferSize < baos.size()) {
        LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + baos.size() +
          "; up hbase.ipc.cellblock.building.initial.buffersize?");
      }
    }
    return baos.getByteBuffer();
  }
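
  // Illustrative usage sketch (assumptions: a client-side CellScanner named
  // "cells" and the KeyValueCodec; any Codec/CompressionCodec pair works).
  //
  //   IPCUtil util = new IPCUtil(HBaseConfiguration.create());
  //   ByteBuffer cellBlock = util.buildCellBlock(new KeyValueCodec(), null, cells);
  //   if (cellBlock != null) {
  //     int sizeOnWire = cellBlock.limit(); // buffer comes back flipped
  //   }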

  /**
   * @param codec to use for cellblock
   * @param compressor to use for decompression; may be null
   * @param cellBlock bytes to decode
   * @return CellScanner to work against the content of <code>cellBlock</code>
   * @throws IOException if decoding fails
   */
  public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
      final byte[] cellBlock) throws IOException {
    // Use this method from Client side to create the CellScanner
    ByteBuffer cellBlockBuf = ByteBuffer.wrap(cellBlock);
    if (compressor != null) {
      cellBlockBuf = decompress(compressor, cellBlockBuf);
    }
    // Not making the Decoder over the ByteBuffer purposefully. The Decoder over the BB will
    // make Cells directly over the passed BB. This method is called at client side and we don't
    // want the Cells to share the same byte[] where the RPC response is being read. Caching of any
    // of the Cells at user's app level will make it not possible to GC the response byte[]
    return codec.getDecoder(new ByteBufferInputStream(cellBlockBuf));
  }
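
  // Illustrative usage sketch for the client-side decode path (assumptions:
  // "cellBlockBytes" is the cell block portion of an RPC response, and the codec
  // matches the one the sender used in buildCellBlock).
  //
  //   CellScanner scanner = util.createCellScanner(new KeyValueCodec(), null, cellBlockBytes);
  //   while (scanner.advance()) {
  //     Cell cell = scanner.current(); // does not share the response byte[] (see above)
  //   }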

  /**
   * @param codec to use for cellblock
   * @param compressor to use for decompression; may be null
   * @param cellBlock ByteBuffer containing the cells written by the Codec. The buffer should be
   *   position()'ed at the start of the cell block and limit()'ed at the end.
   * @return CellScanner to work against the content of <code>cellBlock</code>.
   *   All cells created out of the CellScanner will share the same ByteBuffer being passed.
   * @throws IOException if cell decoding fails
   */
  public CellScanner createCellScannerReusingBuffers(final Codec codec,
      final CompressionCodec compressor, ByteBuffer cellBlock) throws IOException {
    // Use this method from HRS to create the CellScanner
    // If compressed, decompress it first before passing it on else we will leak compression
    // resources if the stream is not closed properly after we let it out.
    if (compressor != null) {
      cellBlock = decompress(compressor, cellBlock);
    }
    return codec.getDecoder(cellBlock);
  }

  private ByteBuffer decompress(CompressionCodec compressor, ByteBuffer cellBlock)
      throws IOException {
    // GZIPCodec fails w/ NPE if no configuration.
    if (compressor instanceof Configurable) {
      ((Configurable) compressor).setConf(this.conf);
    }
    Decompressor poolDecompressor = CodecPool.getDecompressor(compressor);
    CompressionInputStream cis = compressor.createInputStream(new ByteBufferInputStream(cellBlock),
        poolDecompressor);
    ByteBufferOutputStream bbos;
    try {
      // TODO: This is ugly. The buffer will be resized on us if we guess wrong.
      // TODO: Reuse buffers.
      bbos = new ByteBufferOutputStream(
          cellBlock.remaining() * this.cellBlockDecompressionMultiplier);
      IOUtils.copy(cis, bbos);
      bbos.close();
      cellBlock = bbos.getByteBuffer();
    } finally {
      CodecPool.returnDecompressor(poolDecompressor);
    }
    return cellBlock;
  }

  /**
   * Write out header, param, and cell block if there is one.
   * @param dos Stream to write into
   * @param header to write
   * @param param to write
   * @param cellBlock to write
   * @return Total number of bytes written.
   * @throws IOException if the write fails
   */
  public static int write(final OutputStream dos, final Message header, final Message param,
      final ByteBuffer cellBlock)
  throws IOException {
    // Must calculate total size and write that first so the other side can read it all in one
    // swoop.  This is dictated by how the server is currently written.  The server would need to
    // change if we are to be able to write without the length prefixing.
    int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param);
    if (cellBlock != null) {
      totalSize += cellBlock.remaining();
    }
    return write(dos, header, param, cellBlock, totalSize);
  }

  private static int write(final OutputStream dos, final Message header, final Message param,
    final ByteBuffer cellBlock, final int totalSize)
  throws IOException {
    // I confirmed toBytes does same as DataOutputStream#writeInt.
    dos.write(Bytes.toBytes(totalSize));
    // This allocates a buffer that is the size of the message internally.
    header.writeDelimitedTo(dos);
    if (param != null) {
      param.writeDelimitedTo(dos);
    }
    if (cellBlock != null) {
      // Use arrayOffset() + position() so sliced or partially-consumed buffers write correctly.
      dos.write(cellBlock.array(), cellBlock.arrayOffset() + cellBlock.position(),
        cellBlock.remaining());
    }
    dos.flush();
    return totalSize;
  }
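
  // Illustrative sketch of the reading side implied by this wire format: a 4-byte
  // big-endian length (Bytes.toBytes(int) matches DataOutputStream#writeInt), then
  // the varint-delimited header and param, then raw cell block bytes.
  // "RequestHeader" is an assumed generated protobuf class, not part of this file.
  //
  //   DataInputStream in = new DataInputStream(socketInputStream);
  //   int totalSize = in.readInt();
  //   RequestHeader header = RequestHeader.parseDelimitedFrom(in);
  //   // ... parse the param the same way, then read the remaining bytes as the cell block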

  /**
   * @return Size on the wire when the two messages are written with writeDelimitedTo
   */
  public static int getTotalSizeWhenWrittenDelimited(Message ... messages) {
    int totalSize = 0;
    for (Message m: messages) {
      if (m == null) {
        continue;
      }
      totalSize += m.getSerializedSize();
      totalSize += CodedOutputStream.computeRawVarint32Size(m.getSerializedSize());
    }
    // An int overflow above would show up as a negative total; guard against that too.
    Preconditions.checkArgument(totalSize >= 0 && totalSize < Integer.MAX_VALUE);
    return totalSize;
  }
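
  // Worked example for the delimited size above: writeDelimitedTo prefixes each
  // message with a varint32 of its serialized size. A 300-byte message needs a
  // 2-byte varint (128 <= 300 < 16384), so it occupies 302 bytes on the wire;
  // two such messages written delimited total 604 bytes.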
}