View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   * http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.ipc;
19  
20  import java.io.ByteArrayInputStream;
21  import java.io.DataInput;
22  import java.io.IOException;
23  import java.io.InputStream;
24  import java.io.OutputStream;
25  import java.nio.BufferOverflowException;
26  import java.nio.ByteBuffer;
27  
28  import org.apache.commons.io.IOUtils;
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.hbase.classification.InterfaceAudience;
32  import org.apache.hadoop.conf.Configurable;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.hbase.CellScanner;
35  import org.apache.hadoop.hbase.DoNotRetryIOException;
36  import org.apache.hadoop.hbase.HBaseIOException;
37  import org.apache.hadoop.hbase.codec.Codec;
38  import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
39  import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
40  import org.apache.hadoop.hbase.io.HeapSize;
41  import org.apache.hadoop.hbase.util.Bytes;
42  import org.apache.hadoop.hbase.util.ClassSize;
43  import org.apache.hadoop.io.compress.CodecPool;
44  import org.apache.hadoop.io.compress.CompressionCodec;
45  import org.apache.hadoop.io.compress.CompressionInputStream;
46  import org.apache.hadoop.io.compress.Compressor;
47  import org.apache.hadoop.io.compress.Decompressor;
48  
49  import com.google.common.base.Preconditions;
50  import com.google.protobuf.CodedOutputStream;
51  import com.google.protobuf.Message;
52  
53  /**
54   * Utility to help ipc'ing.
55   */
56  @InterfaceAudience.Private
57  public class IPCUtil {
58    // LOG is being used in TestIPCUtil
59    public static final Log LOG = LogFactory.getLog(IPCUtil.class);
60    /**
61     * How much we think the decompressor will expand the original compressed content.
62     */
63    private final int cellBlockDecompressionMultiplier;
64    private final int cellBlockBuildingInitialBufferSize;
65    private final Configuration conf;
66  
67    public IPCUtil(final Configuration conf) {
68      super();
69      this.conf = conf;
70      this.cellBlockDecompressionMultiplier =
71          conf.getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3);
72  
73      // Guess that 16k is a good size for rpc buffer.  Could go bigger.  See the TODO below in
74      // #buildCellBlock.
75      this.cellBlockBuildingInitialBufferSize =
76        ClassSize.align(conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024));
77    }
78  
79    /**
80     * Thrown if a cellscanner but no codec to encode it with.
81     */
82    public static class CellScannerButNoCodecException extends HBaseIOException {};
83  
84    /**
85     * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
86     * <code>compressor</code>.
87     * @param codec
88     * @param compressor
89     * @param cellScanner
90     * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
91     * passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has been
92     * flipped and is ready for reading.  Use limit to find total size.
93     * @throws IOException
94     */
95    @SuppressWarnings("resource")
96    public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
97      final CellScanner cellScanner)
98    throws IOException {
99      return buildCellBlock(codec, compressor, cellScanner, null);
100   }
101 
102   /**
103    * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
104    * <code>compressor</code>.
105    * @param codec
106    * @param compressor
107    * @param cellScanner
108    * @param pool Pool of ByteBuffers to make use of. Can be null and then we'll allocate
109    * our own ByteBuffer.
110    * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
111    * passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has been
112    * flipped and is ready for reading.  Use limit to find total size. If <code>pool</code> was not
113    * null, then this returned ByteBuffer came from there and should be returned to the pool when
114    * done.
115    * @throws IOException
116    */
117   @SuppressWarnings("resource")
118   public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
119     final CellScanner cellScanner, final BoundedByteBufferPool pool)
120   throws IOException {
121     if (cellScanner == null) return null;
122     if (codec == null) throw new CellScannerButNoCodecException();
123     int bufferSize = this.cellBlockBuildingInitialBufferSize;
124     ByteBufferOutputStream baos = null;
125     ByteBuffer bb = null;
126     if (pool != null) {
127       bb = pool.getBuffer();
128       bufferSize = bb.capacity();
129       baos = new ByteBufferOutputStream(bb);
130     } else {
131       // Then we need to make our own to return.
132       if (cellScanner instanceof HeapSize) {
133         long longSize = ((HeapSize)cellScanner).heapSize();
134         // Just make sure we don't have a size bigger than an int.
135         if (longSize > Integer.MAX_VALUE) {
136           throw new IOException("Size " + longSize + " > " + Integer.MAX_VALUE);
137         }
138         bufferSize = ClassSize.align((int)longSize);
139       }
140       baos = new ByteBufferOutputStream(bufferSize);
141     }
142     OutputStream os = baos;
143     Compressor poolCompressor = null;
144     try {
145       if (compressor != null) {
146         if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf);
147         poolCompressor = CodecPool.getCompressor(compressor);
148         os = compressor.createOutputStream(os, poolCompressor);
149       }
150       Codec.Encoder encoder = codec.getEncoder(os);
151       int count = 0;
152       while (cellScanner.advance()) {
153         encoder.write(cellScanner.current());
154         count++;
155       }
156       encoder.flush();
157       // If no cells, don't mess around.  Just return null (could be a bunch of existence checking
158       // gets or something -- stuff that does not return a cell).
159       if (count == 0) {
160         if (pool != null && bb != null) {
161           pool.putBuffer(bb);
162         }
163         return null;
164       }
165     } catch (BufferOverflowException e) {
166       throw new DoNotRetryIOException(e);
167     } finally {
168       os.close();
169       if (poolCompressor != null) CodecPool.returnCompressor(poolCompressor);
170     }
171     if (LOG.isTraceEnabled()) {
172       if (bufferSize < baos.size()) {
173         LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + baos.size() +
174           "; up hbase.ipc.cellblock.building.initial.buffersize?");
175       }
176     }
177     return baos.getByteBuffer();
178   }
179 
180   /**
181    * @param codec
182    * @param cellBlock
183    * @return CellScanner to work against the content of <code>cellBlock</code>
184    * @throws IOException
185    */
186   public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
187       final byte [] cellBlock)
188   throws IOException {
189     return createCellScanner(codec, compressor, cellBlock, 0, cellBlock.length);
190   }
191 
192   /**
193    * @param codec
194    * @param cellBlock
195    * @param offset
196    * @param length
197    * @return CellScanner to work against the content of <code>cellBlock</code>
198    * @throws IOException
199    */
200   public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
201       final byte [] cellBlock, final int offset, final int length)
202   throws IOException {
203     // If compressed, decompress it first before passing it on else we will leak compression
204     // resources if the stream is not closed properly after we let it out.
205     InputStream is = null;
206     if (compressor != null) {
207       // GZIPCodec fails w/ NPE if no configuration.
208       if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf);
209       Decompressor poolDecompressor = CodecPool.getDecompressor(compressor);
210       CompressionInputStream cis =
211         compressor.createInputStream(new ByteArrayInputStream(cellBlock, offset, length),
212         poolDecompressor);
213       ByteBufferOutputStream bbos = null;
214       try {
215         // TODO: This is ugly.  The buffer will be resized on us if we guess wrong.
216         // TODO: Reuse buffers.
217         bbos = new ByteBufferOutputStream((length - offset) *
218           this.cellBlockDecompressionMultiplier);
219         IOUtils.copy(cis, bbos);
220         bbos.close();
221         ByteBuffer bb = bbos.getByteBuffer();
222         is = new ByteArrayInputStream(bb.array(), 0, bb.limit());
223       } finally {
224         if (is != null) is.close();
225         if (bbos != null) bbos.close();
226 
227         CodecPool.returnDecompressor(poolDecompressor);
228       }
229     } else {
230       is = new ByteArrayInputStream(cellBlock, offset, length);
231     }
232     return codec.getDecoder(is);
233   }
234 
235   /**
236    * @param m Message to serialize delimited; i.e. w/ a vint of its size preceeding its
237    * serialization.
238    * @return The passed in Message serialized with delimiter.  Return null if <code>m</code> is null
239    * @throws IOException
240    */
241   public static ByteBuffer getDelimitedMessageAsByteBuffer(final Message m) throws IOException {
242     if (m == null) return null;
243     int serializedSize = m.getSerializedSize();
244     int vintSize = CodedOutputStream.computeRawVarint32Size(serializedSize);
245     byte [] buffer = new byte[serializedSize + vintSize];
246     // Passing in a byte array saves COS creating a buffer which it does when using streams.
247     CodedOutputStream cos = CodedOutputStream.newInstance(buffer);
248     // This will write out the vint preamble and the message serialized.
249     cos.writeMessageNoTag(m);
250     cos.flush();
251     cos.checkNoSpaceLeft();
252     return ByteBuffer.wrap(buffer);
253   }
254 
255   /**
256    * Write out header, param, and cell block if there is one.
257    * @param dos
258    * @param header
259    * @param param
260    * @param cellBlock
261    * @return Total number of bytes written.
262    * @throws IOException
263    */
264   public static int write(final OutputStream dos, final Message header, final Message param,
265       final ByteBuffer cellBlock)
266   throws IOException {
267     // Must calculate total size and write that first so other side can read it all in in one
268     // swoop.  This is dictated by how the server is currently written.  Server needs to change
269     // if we are to be able to write without the length prefixing.
270     int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param);
271     if (cellBlock != null) totalSize += cellBlock.remaining();
272     return write(dos, header, param, cellBlock, totalSize);
273   }
274 
275   private static int write(final OutputStream dos, final Message header, final Message param,
276     final ByteBuffer cellBlock, final int totalSize)
277   throws IOException {
278     // I confirmed toBytes does same as DataOutputStream#writeInt.
279     dos.write(Bytes.toBytes(totalSize));
280     // This allocates a buffer that is the size of the message internally.
281     header.writeDelimitedTo(dos);
282     if (param != null) param.writeDelimitedTo(dos);
283     if (cellBlock != null) dos.write(cellBlock.array(), 0, cellBlock.remaining());
284     dos.flush();
285     return totalSize;
286   }
287 
288   /**
289    * Read in chunks of 8K (HBASE-7239)
290    * @param in
291    * @param dest
292    * @param offset
293    * @param len
294    * @throws IOException
295    */
296   public static void readChunked(final DataInput in, byte[] dest, int offset, int len)
297       throws IOException {
298     int maxRead = 8192;
299 
300     for (; offset < len; offset += maxRead) {
301       in.readFully(dest, offset, Math.min(len - offset, maxRead));
302     }
303   }
304 
305   /**
306    * @return Size on the wire when the two messages are written with writeDelimitedTo
307    */
308   public static int getTotalSizeWhenWrittenDelimited(Message ... messages) {
309     int totalSize = 0;
310     for (Message m: messages) {
311       if (m == null) continue;
312       totalSize += m.getSerializedSize();
313       totalSize += CodedOutputStream.computeRawVarint32Size(m.getSerializedSize());
314     }
315     Preconditions.checkArgument(totalSize < Integer.MAX_VALUE);
316     return totalSize;
317   }
318 
319   static IOException toIOE(Throwable t) {
320     if (t instanceof IOException) {
321       return (IOException) t;
322     } else {
323       return new IOException(t);
324     }
325   }
326 }