View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   * http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.ipc;
19  
20  import java.io.DataInput;
21  import java.io.IOException;
22  import java.io.OutputStream;
23  import java.nio.BufferOverflowException;
24  import java.nio.ByteBuffer;
25  
26  import org.apache.commons.io.IOUtils;
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.hbase.classification.InterfaceAudience;
30  import org.apache.hadoop.conf.Configurable;
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.hbase.CellScanner;
33  import org.apache.hadoop.hbase.DoNotRetryIOException;
34  import org.apache.hadoop.hbase.HBaseIOException;
35  import org.apache.hadoop.hbase.codec.Codec;
36  import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
37  import org.apache.hadoop.hbase.io.ByteBufferInputStream;
38  import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
39  import org.apache.hadoop.hbase.io.HeapSize;
40  import org.apache.hadoop.hbase.util.Bytes;
41  import org.apache.hadoop.hbase.util.ClassSize;
42  import org.apache.hadoop.io.compress.CodecPool;
43  import org.apache.hadoop.io.compress.CompressionCodec;
44  import org.apache.hadoop.io.compress.CompressionInputStream;
45  import org.apache.hadoop.io.compress.Compressor;
46  import org.apache.hadoop.io.compress.Decompressor;
47  
48  import com.google.common.base.Preconditions;
49  import com.google.protobuf.CodedOutputStream;
50  import com.google.protobuf.Message;
51  
/**
 * Utility to help ipc'ing.
 */
@InterfaceAudience.Private
public class IPCUtil {
  // LOG is being used in TestIPCUtil
  public static final Log LOG = LogFactory.getLog(IPCUtil.class);
  /**
   * How much we think the decompressor will expand the original compressed content.
   */
  private final int cellBlockDecompressionMultiplier;
  // Initial capacity, in bytes, of the buffer used when building a cell block without a pool;
  // see #buildCellBlock.
  private final int cellBlockBuildingInitialBufferSize;
  // Kept so it can be handed to codecs/compressors that implement Configurable.
  private final Configuration conf;

  /**
   * @param conf configuration used to size internal buffers and to configure
   *   {@link Configurable} codecs/compressors
   */
  public IPCUtil(final Configuration conf) {
    super();
    this.conf = conf;
    // Multiplier applied to the compressed size to presize the decompression output buffer.
    this.cellBlockDecompressionMultiplier =
        conf.getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3);

    // Guess that 16k is a good size for rpc buffer.  Could go bigger.  See the TODO below in
    // #buildCellBlock.
    this.cellBlockBuildingInitialBufferSize =
      ClassSize.align(conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024));
  }
77  
78    /**
79     * Thrown if a cellscanner but no codec to encode it with.
80     */
81    public static class CellScannerButNoCodecException extends HBaseIOException {};
82  
  /**
   * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
   * <code>compressor</code>. Convenience overload that delegates to
   * {@link #buildCellBlock(Codec, CompressionCodec, CellScanner, BoundedByteBufferPool)}
   * with a null pool, so the buffer is allocated rather than pooled.
   * @param codec codec used to encode each Cell; must be non-null when cellScanner is non-null
   * @param compressor optional compression codec; may be null for no compression
   * @param cellScanner source of Cells; may be null, in which case null is returned
   * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
   * passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has been
   * flipped and is ready for reading.  Use limit to find total size.
   * @throws IOException if encoding or compression fails
   */
  @SuppressWarnings("resource")
  public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
    final CellScanner cellScanner)
  throws IOException {
    return buildCellBlock(codec, compressor, cellScanner, null);
  }
100 
  /**
   * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
   * <code>compressor</code>.
   * @param codec codec used to encode each Cell; null raises CellScannerButNoCodecException
   * @param compressor optional compression codec; may be null for no compression
   * @param cellScanner source of Cells; may be null, in which case null is returned
   * @param pool Pool of ByteBuffers to make use of. Can be null and then we'll allocate
   * our own ByteBuffer.
   * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
   * passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has been
   * flipped and is ready for reading.  Use limit to find total size. If <code>pool</code> was not
   * null, then this returned ByteBuffer came from there and should be returned to the pool when
   * done.
   * @throws IOException if encoding or compression fails
   */
  @SuppressWarnings("resource")
  public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
    final CellScanner cellScanner, final BoundedByteBufferPool pool)
  throws IOException {
    if (cellScanner == null) return null;
    if (codec == null) throw new CellScannerButNoCodecException();
    int bufferSize = this.cellBlockBuildingInitialBufferSize;
    ByteBufferOutputStream baos = null;
    if (pool != null) {
      // NOTE(review): if we later return null (count == 0) or an exception propagates, this
      // pooled buffer is never handed back to the pool — looks like a pool leak; confirm
      // whether callers compensate.
      ByteBuffer bb = pool.getBuffer();
      bufferSize = bb.capacity();
      baos = new ByteBufferOutputStream(bb);
    } else {
      // Then we need to make our own to return.
      if (cellScanner instanceof HeapSize) {
        long longSize = ((HeapSize)cellScanner).heapSize();
        // Just make sure we don't have a size bigger than an int.
        if (longSize > Integer.MAX_VALUE) {
          throw new IOException("Size " + longSize + " > " + Integer.MAX_VALUE);
        }
        bufferSize = ClassSize.align((int)longSize);
      }
      baos = new ByteBufferOutputStream(bufferSize);
    }
    OutputStream os = baos;
    Compressor poolCompressor = null;
    try {
      if (compressor != null) {
        // GZIPCodec and friends need a Configuration or they NPE; see #decompress.
        if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf);
        poolCompressor = CodecPool.getCompressor(compressor);
        os = compressor.createOutputStream(os, poolCompressor);
      }
      Codec.Encoder encoder = codec.getEncoder(os);
      int count = 0;
      while (cellScanner.advance()) {
        encoder.write(cellScanner.current());
        count++;
      }
      encoder.flush();
      // If no cells, don't mess around.  Just return null (could be a bunch of existence checking
      // gets or something -- stuff that does not return a cell).
      if (count == 0) return null;
    } catch (BufferOverflowException e) {
      throw new DoNotRetryIOException(e);
    } finally {
      os.close();
      if (poolCompressor != null) CodecPool.returnCompressor(poolCompressor);
    }
    if (LOG.isTraceEnabled()) {
      // NOTE(review): if the stream outgrew the pooled buffer, baos presumably swapped in a
      // bigger internal buffer and the pooled one is dropped — TODO confirm against
      // ByteBufferOutputStream's grow semantics.
      if (bufferSize < baos.size()) {
        LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + baos.size() +
          "; up hbase.ipc.cellblock.building.initial.buffersize?");
      }
    }
    return baos.getByteBuffer();
  }
172 
173   /**
174    * @param codec
175    * @param cellBlock
176    * @return CellScanner to work against the content of <code>cellBlock</code>
177    * @throws IOException
178    */
179   public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
180       final byte[] cellBlock) throws IOException {
181     // Use this method from Client side to create the CellScanner
182     ByteBuffer cellBlockBuf = ByteBuffer.wrap(cellBlock);
183     if (compressor != null) {
184       cellBlockBuf = decompress(compressor, cellBlockBuf);
185     }
186     // Not making the Decoder over the ByteBuffer purposefully. The Decoder over the BB will
187     // make Cells directly over the passed BB. This method is called at client side and we don't
188     // want the Cells to share the same byte[] where the RPC response is being read. Caching of any
189     // of the Cells at user's app level will make it not possible to GC the response byte[]
190     return codec.getDecoder(new ByteBufferInputStream(cellBlockBuf));
191   }
192 
193   /**
194    * @param codec
195    * @param cellBlock ByteBuffer containing the cells written by the Codec. The buffer should be
196    * position()'ed at the start of the cell block and limit()'ed at the end.
197    * @return CellScanner to work against the content of <code>cellBlock</code>.
198    * All cells created out of the CellScanner will share the same ByteBuffer being passed.
199    * @throws IOException
200    */
201   public CellScanner createCellScannerReusingBuffers(final Codec codec,
202       final CompressionCodec compressor, ByteBuffer cellBlock) throws IOException {
203     // Use this method from HRS to create the CellScanner
204     // If compressed, decompress it first before passing it on else we will leak compression
205     // resources if the stream is not closed properly after we let it out.
206     if (compressor != null) {
207       cellBlock = decompress(compressor, cellBlock);
208     }
209     return codec.getDecoder(cellBlock);
210   }
211 
212   private ByteBuffer decompress(CompressionCodec compressor, ByteBuffer cellBlock)
213       throws IOException {
214     // GZIPCodec fails w/ NPE if no configuration.
215     if (compressor instanceof Configurable) ((Configurable) compressor).setConf(this.conf);
216     Decompressor poolDecompressor = CodecPool.getDecompressor(compressor);
217     CompressionInputStream cis = compressor.createInputStream(new ByteBufferInputStream(cellBlock),
218         poolDecompressor);
219     ByteBufferOutputStream bbos = null;
220     try {
221       // TODO: This is ugly. The buffer will be resized on us if we guess wrong.
222       // TODO: Reuse buffers.
223       bbos = new ByteBufferOutputStream(
224           cellBlock.remaining() * this.cellBlockDecompressionMultiplier);
225       IOUtils.copy(cis, bbos);
226       bbos.close();
227       cellBlock = bbos.getByteBuffer();
228     } finally {
229       CodecPool.returnDecompressor(poolDecompressor);
230     }
231     return cellBlock;
232   }
233 
234   /**
235    * @param m Message to serialize delimited; i.e. w/ a vint of its size preceeding its
236    * serialization.
237    * @return The passed in Message serialized with delimiter.  Return null if <code>m</code> is null
238    * @throws IOException
239    */
240   public static ByteBuffer getDelimitedMessageAsByteBuffer(final Message m) throws IOException {
241     if (m == null) return null;
242     int serializedSize = m.getSerializedSize();
243     int vintSize = CodedOutputStream.computeRawVarint32Size(serializedSize);
244     byte [] buffer = new byte[serializedSize + vintSize];
245     // Passing in a byte array saves COS creating a buffer which it does when using streams.
246     CodedOutputStream cos = CodedOutputStream.newInstance(buffer);
247     // This will write out the vint preamble and the message serialized.
248     cos.writeMessageNoTag(m);
249     cos.flush();
250     cos.checkNoSpaceLeft();
251     return ByteBuffer.wrap(buffer);
252   }
253 
254   /**
255    * Write out header, param, and cell block if there is one.
256    * @param dos
257    * @param header
258    * @param param
259    * @param cellBlock
260    * @return Total number of bytes written.
261    * @throws IOException
262    */
263   public static int write(final OutputStream dos, final Message header, final Message param,
264       final ByteBuffer cellBlock)
265   throws IOException {
266     // Must calculate total size and write that first so other side can read it all in in one
267     // swoop.  This is dictated by how the server is currently written.  Server needs to change
268     // if we are to be able to write without the length prefixing.
269     int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param);
270     if (cellBlock != null) totalSize += cellBlock.remaining();
271     return write(dos, header, param, cellBlock, totalSize);
272   }
273 
274   private static int write(final OutputStream dos, final Message header, final Message param,
275     final ByteBuffer cellBlock, final int totalSize)
276   throws IOException {
277     // I confirmed toBytes does same as DataOutputStream#writeInt.
278     dos.write(Bytes.toBytes(totalSize));
279     // This allocates a buffer that is the size of the message internally.
280     header.writeDelimitedTo(dos);
281     if (param != null) param.writeDelimitedTo(dos);
282     if (cellBlock != null) dos.write(cellBlock.array(), 0, cellBlock.remaining());
283     dos.flush();
284     return totalSize;
285   }
286 
287   /**
288    * Read in chunks of 8K (HBASE-7239)
289    * @param in
290    * @param dest
291    * @param offset
292    * @param len
293    * @throws IOException
294    */
295   public static void readChunked(final DataInput in, byte[] dest, int offset, int len)
296       throws IOException {
297     int maxRead = 8192;
298 
299     for (; offset < len; offset += maxRead) {
300       in.readFully(dest, offset, Math.min(len - offset, maxRead));
301     }
302   }
303 
304   /**
305    * @return Size on the wire when the two messages are written with writeDelimitedTo
306    */
307   public static int getTotalSizeWhenWrittenDelimited(Message ... messages) {
308     int totalSize = 0;
309     for (Message m: messages) {
310       if (m == null) continue;
311       totalSize += m.getSerializedSize();
312       totalSize += CodedOutputStream.computeRawVarint32Size(m.getSerializedSize());
313     }
314     Preconditions.checkArgument(totalSize < Integer.MAX_VALUE);
315     return totalSize;
316   }
317 }