View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   * http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.ipc;
19  
20  import java.io.ByteArrayInputStream;
21  import java.io.DataInput;
22  import java.io.IOException;
23  import java.io.InputStream;
24  import java.io.OutputStream;
25  import java.nio.ByteBuffer;
26  
27  import org.apache.commons.io.IOUtils;
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.conf.Configurable;
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.hbase.CellScanner;
33  import org.apache.hadoop.hbase.HBaseIOException;
34  import org.apache.hadoop.hbase.codec.Codec;
35  import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
36  import org.apache.hadoop.hbase.io.HeapSize;
37  import org.apache.hadoop.hbase.util.Bytes;
38  import org.apache.hadoop.hbase.util.ClassSize;
39  import org.apache.hadoop.io.compress.CodecPool;
40  import org.apache.hadoop.io.compress.CompressionCodec;
41  import org.apache.hadoop.io.compress.CompressionInputStream;
42  import org.apache.hadoop.io.compress.Compressor;
43  import org.apache.hadoop.io.compress.Decompressor;
44  
45  import com.google.common.base.Preconditions;
46  import com.google.protobuf.CodedOutputStream;
47  import com.google.protobuf.Message;
48  
49  /**
50   * Utility to help ipc'ing.
51   */
52  class IPCUtil {
53    public static final Log LOG = LogFactory.getLog(IPCUtil.class);
54    /**
55     * How much we think the decompressor will expand the original compressed content.
56     */
57    private final int cellBlockDecompressionMultiplier;
58    private final int cellBlockBuildingInitialBufferSize;
59    private final Configuration conf;
60  
61    IPCUtil(final Configuration conf) {
62      super();
63      this.conf = conf;
64      this.cellBlockDecompressionMultiplier =
65          conf.getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3);
66      // Guess that 16k is a good size for rpc buffer.  Could go bigger.  See the TODO below in
67      // #buildCellBlock.
68      this.cellBlockBuildingInitialBufferSize =
69        ClassSize.align(conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024));
70    }
71  
72    /**
73     * Thrown if a cellscanner but no codec to encode it with.
74     */
75    public static class CellScannerButNoCodecException extends HBaseIOException {};
76  
77    /**
78     * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
79     * <code>compressor</code>.
80     * @param codec
81     * @param compressor
82     * @Param cellScanner
83     * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
84     * passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has been
85     * flipped and is ready for reading.  Use limit to find total size.
86     * @throws IOException
87     */
88    @SuppressWarnings("resource")
89    ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
90      final CellScanner cellScanner)
91    throws IOException {
92      if (cellScanner == null) return null;
93      if (codec == null) throw new CellScannerButNoCodecException();
94      int bufferSize = this.cellBlockBuildingInitialBufferSize;
95      if (cellScanner instanceof HeapSize) {
96        long longSize = ((HeapSize)cellScanner).heapSize();
97        // Just make sure we don't have a size bigger than an int.
98        if (longSize > Integer.MAX_VALUE) {
99          throw new IOException("Size " + longSize + " > " + Integer.MAX_VALUE);
100       }
101       bufferSize = ClassSize.align((int)longSize);
102     } // TODO: Else, get estimate on size of buffer rather than have the buffer resize.
103     // See TestIPCUtil main for experiment where we spin through the Cells getting estimate of
104     // total size before creating the buffer.  It costs somw small percentage.  If we are usually
105     // within the estimated buffer size, then the cost is not worth it.  If we are often well
106     // outside the guesstimated buffer size, the processing can be done in half the time if we
107     // go w/ the estimated size rather than let the buffer resize.
108     ByteBufferOutputStream baos = new ByteBufferOutputStream(bufferSize);
109     OutputStream os = baos;
110     Compressor poolCompressor = null;
111     try {
112       if (compressor != null) {
113         if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf);
114         poolCompressor = CodecPool.getCompressor(compressor);
115         os = compressor.createOutputStream(os, poolCompressor);
116       }
117       Codec.Encoder encoder = codec.getEncoder(os);
118       int count = 0;
119       while (cellScanner.advance()) {
120         encoder.write(cellScanner.current());
121         count++;
122       }
123       encoder.flush();
124       // If no cells, don't mess around.  Just return null (could be a bunch of existence checking
125       // gets or something -- stuff that does not return a cell).
126       if (count == 0) return null;
127     } finally {
128       os.close();
129       if (poolCompressor != null) CodecPool.returnCompressor(poolCompressor);
130     }
131     if (LOG.isTraceEnabled()) {
132       if (bufferSize < baos.size()) {
133         LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + baos.size() +
134           "; up hbase.ipc.cellblock.building.initial.buffersize?");
135       }
136     }
137     return baos.getByteBuffer();
138   }
139 
140   /**
141    * @param codec
142    * @param cellBlock
143    * @return CellScanner to work against the content of <code>cellBlock</code>
144    * @throws IOException
145    */
146   CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
147       final byte [] cellBlock)
148   throws IOException {
149     return createCellScanner(codec, compressor, cellBlock, 0, cellBlock.length);
150   }
151 
152   /**
153    * @param codec
154    * @param cellBlock
155    * @param offset
156    * @param length
157    * @return CellScanner to work against the content of <code>cellBlock</code>
158    * @throws IOException
159    */
160   CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
161       final byte [] cellBlock, final int offset, final int length)
162   throws IOException {
163     // If compressed, decompress it first before passing it on else we will leak compression
164     // resources if the stream is not closed properly after we let it out.
165     InputStream is = null;
166     if (compressor != null) {
167       // GZIPCodec fails w/ NPE if no configuration.
168       if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf);
169       Decompressor poolDecompressor = CodecPool.getDecompressor(compressor);
170       CompressionInputStream cis =
171         compressor.createInputStream(new ByteArrayInputStream(cellBlock, offset, length),
172         poolDecompressor);
173       ByteBufferOutputStream bbos = null;
174       try {
175         // TODO: This is ugly.  The buffer will be resized on us if we guess wrong.
176         // TODO: Reuse buffers.
177         bbos = new ByteBufferOutputStream((length - offset) *
178           this.cellBlockDecompressionMultiplier);
179         IOUtils.copy(cis, bbos);
180         bbos.close();
181         ByteBuffer bb = bbos.getByteBuffer();
182         is = new ByteArrayInputStream(bb.array(), 0, bb.limit());
183       } finally {
184         if (is != null) is.close();
185         if (bbos != null) bbos.close();
186 
187         CodecPool.returnDecompressor(poolDecompressor);
188       }
189     } else {
190       is = new ByteArrayInputStream(cellBlock, offset, length);
191     }
192     return codec.getDecoder(is);
193   }
194 
195   /**
196    * @param m Message to serialize delimited; i.e. w/ a vint of its size preceeding its
197    * serialization.
198    * @return The passed in Message serialized with delimiter.  Return null if <code>m</code> is null
199    * @throws IOException
200    */
201   static ByteBuffer getDelimitedMessageAsByteBuffer(final Message m) throws IOException {
202     if (m == null) return null;
203     int serializedSize = m.getSerializedSize();
204     int vintSize = CodedOutputStream.computeRawVarint32Size(serializedSize);
205     byte [] buffer = new byte[serializedSize + vintSize];
206     // Passing in a byte array saves COS creating a buffer which it does when using streams.
207     CodedOutputStream cos = CodedOutputStream.newInstance(buffer);
208     // This will write out the vint preamble and the message serialized.
209     cos.writeMessageNoTag(m);
210     cos.flush();
211     cos.checkNoSpaceLeft();
212     return ByteBuffer.wrap(buffer);
213   }
214 
215   /**
216    * Write out header, param, and cell block if there is one.
217    * @param dos
218    * @param header
219    * @param param
220    * @param cellBlock
221    * @return Total number of bytes written.
222    * @throws IOException
223    */
224   static int write(final OutputStream dos, final Message header, final Message param,
225       final ByteBuffer cellBlock)
226   throws IOException {
227     // Must calculate total size and write that first so other side can read it all in in one
228     // swoop.  This is dictated by how the server is currently written.  Server needs to change
229     // if we are to be able to write without the length prefixing.
230     int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param);
231     if (cellBlock != null) totalSize += cellBlock.remaining();
232     return write(dos, header, param, cellBlock, totalSize);
233   }
234 
235   private static int write(final OutputStream dos, final Message header, final Message param,
236     final ByteBuffer cellBlock, final int totalSize)
237   throws IOException {
238     // I confirmed toBytes does same as DataOutputStream#writeInt.
239     dos.write(Bytes.toBytes(totalSize));
240     // This allocates a buffer that is the size of the message internally.
241     header.writeDelimitedTo(dos);
242     if (param != null) param.writeDelimitedTo(dos);
243     if (cellBlock != null) dos.write(cellBlock.array(), 0, cellBlock.remaining());
244     dos.flush();
245     return totalSize;
246   }
247 
248   /**
249    * Read in chunks of 8K (HBASE-7239)
250    * @param in
251    * @param dest
252    * @param offset
253    * @param len
254    * @throws IOException
255    */
256   static void readChunked(final DataInput in, byte[] dest, int offset, int len)
257       throws IOException {
258     int maxRead = 8192;
259 
260     for (; offset < len; offset += maxRead) {
261       in.readFully(dest, offset, Math.min(len - offset, maxRead));
262     }
263   }
264 
265   /**
266    * @param header
267    * @param body
268    * @return Size on the wire when the two messages are written with writeDelimitedTo
269    */
270   static int getTotalSizeWhenWrittenDelimited(Message ... messages) {
271     int totalSize = 0;
272     for (Message m: messages) {
273       if (m == null) continue;
274       totalSize += m.getSerializedSize();
275       totalSize += CodedOutputStream.computeRawVarint32Size(m.getSerializedSize());
276     }
277     Preconditions.checkArgument(totalSize < Integer.MAX_VALUE);
278     return totalSize;
279   }
280 }