View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   * http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.ipc;
19  
20  import java.io.ByteArrayInputStream;
21  import java.io.DataInput;
22  import java.io.IOException;
23  import java.io.InputStream;
24  import java.io.OutputStream;
25  import java.nio.ByteBuffer;
26  
27  import org.apache.commons.io.IOUtils;
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.hbase.classification.InterfaceAudience;
31  import org.apache.hadoop.conf.Configurable;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.hbase.CellScanner;
34  import org.apache.hadoop.hbase.HBaseIOException;
35  import org.apache.hadoop.hbase.codec.Codec;
36  import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
37  import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
38  import org.apache.hadoop.hbase.io.HeapSize;
39  import org.apache.hadoop.hbase.util.Bytes;
40  import org.apache.hadoop.hbase.util.ClassSize;
41  import org.apache.hadoop.io.compress.CodecPool;
42  import org.apache.hadoop.io.compress.CompressionCodec;
43  import org.apache.hadoop.io.compress.CompressionInputStream;
44  import org.apache.hadoop.io.compress.Compressor;
45  import org.apache.hadoop.io.compress.Decompressor;
46  
47  import com.google.common.base.Preconditions;
48  import com.google.protobuf.CodedOutputStream;
49  import com.google.protobuf.Message;
50  
51  /**
52   * Utility to help ipc'ing.
53   */
54  @InterfaceAudience.Private
55  public class IPCUtil {
  // LOG is being used in TestIPCUtil
  public static final Log LOG = LogFactory.getLog(IPCUtil.class);
  /**
   * How much we think the decompressor will expand the original compressed content.
   * Read from "hbase.ipc.cellblock.decompression.buffersize.multiplier" in the
   * constructor (default 3); used to size the buffer receiving a decompressed cell block.
   */
  private final int cellBlockDecompressionMultiplier;
  /**
   * Initial size, in bytes, of the buffer used when building a cell block without a pool.
   * Read from "hbase.ipc.cellblock.building.initial.buffersize" in the constructor
   * (default 16k, aligned via ClassSize.align).
   */
  private final int cellBlockBuildingInitialBufferSize;
  /** Configuration handed to compression codecs that implement {@link Configurable}. */
  private final Configuration conf;
64  
65    public IPCUtil(final Configuration conf) {
66      super();
67      this.conf = conf;
68      this.cellBlockDecompressionMultiplier =
69          conf.getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3);
70  
71      // Guess that 16k is a good size for rpc buffer.  Could go bigger.  See the TODO below in
72      // #buildCellBlock.
73      this.cellBlockBuildingInitialBufferSize =
74        ClassSize.align(conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024));
75    }
76  
77    /**
78     * Thrown if a cellscanner but no codec to encode it with.
79     */
80    public static class CellScannerButNoCodecException extends HBaseIOException {};
81  
82    /**
83     * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
84     * <code>compressor</code>.
85     * @param codec
86     * @param compressor
87     * @param cellScanner
88     * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
89     * passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has been
90     * flipped and is ready for reading.  Use limit to find total size.
91     * @throws IOException
92     */
93    @SuppressWarnings("resource")
94    public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
95      final CellScanner cellScanner)
96    throws IOException {
97      return buildCellBlock(codec, compressor, cellScanner, null);
98    }
99  
  /**
   * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
   * <code>compressor</code>.
   * @param codec codec used to encode the cells; required when cellScanner is non-null
   * @param compressor optional compression codec; may be null
   * @param cellScanner cells to encode; may be null
   * @param pool Pool of ByteBuffers to make use of. Can be null and then we'll allocate
   * our own ByteBuffer.
   * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
   * passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has been
   * flipped and is ready for reading.  Use limit to find total size. If <code>pool</code> was not
   * null, then this returned ByteBuffer came from there and should be returned to the pool when
   * done.
   * @throws IOException on encode/compress/write failure
   * @throws CellScannerButNoCodecException if cellScanner is non-null but codec is null
   */
  @SuppressWarnings("resource")
  public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
    final CellScanner cellScanner, final BoundedByteBufferPool pool)
  throws IOException {
    if (cellScanner == null) return null;
    // A scanner with no codec to serialize it with is a caller error.
    if (codec == null) throw new CellScannerButNoCodecException();
    int bufferSize = this.cellBlockBuildingInitialBufferSize;
    ByteBufferOutputStream baos = null;
    if (pool != null) {
      // Pooled buffer: its capacity overrides the configured initial size.
      ByteBuffer bb = pool.getBuffer();
      bufferSize = bb.capacity();
      baos = new ByteBufferOutputStream(bb);
    } else {
      // Then we need to make our own to return.
      if (cellScanner instanceof HeapSize) {
        // Use the scanner's heap size as a better initial guess than the configured default.
        long longSize = ((HeapSize)cellScanner).heapSize();
        // Just make sure we don't have a size bigger than an int.
        if (longSize > Integer.MAX_VALUE) {
          throw new IOException("Size " + longSize + " > " + Integer.MAX_VALUE);
        }
        bufferSize = ClassSize.align((int)longSize);
      }
      baos = new ByteBufferOutputStream(bufferSize);
    }
    OutputStream os = baos;
    Compressor poolCompressor = null;
    try {
      if (compressor != null) {
        // GZIPCodec NPEs without a configuration; see the matching check in createCellScanner.
        if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf);
        poolCompressor = CodecPool.getCompressor(compressor);
        os = compressor.createOutputStream(os, poolCompressor);
      }
      Codec.Encoder encoder = codec.getEncoder(os);
      int count = 0;
      while (cellScanner.advance()) {
        encoder.write(cellScanner.current());
        count++;
      }
      encoder.flush();
      // If no cells, don't mess around.  Just return null (could be a bunch of existence checking
      // gets or something -- stuff that does not return a cell).
      // NOTE(review): on this path a pool-sourced buffer is simply dropped, never handed back to
      // the pool -- confirm BoundedByteBufferPool tolerates lost buffers.
      if (count == 0) return null;
    } finally {
      // Close the (possibly compression-wrapping) stream and return any pooled compressor.
      os.close();
      if (poolCompressor != null) CodecPool.returnCompressor(poolCompressor);
    }
    if (LOG.isTraceEnabled()) {
      if (bufferSize < baos.size()) {
        LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + baos.size() +
          "; up hbase.ipc.cellblock.building.initial.buffersize?");
      }
    }
    return baos.getByteBuffer();
  }
169 
170   /**
171    * @param codec
172    * @param cellBlock
173    * @return CellScanner to work against the content of <code>cellBlock</code>
174    * @throws IOException
175    */
176   public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
177       final byte [] cellBlock)
178   throws IOException {
179     return createCellScanner(codec, compressor, cellBlock, 0, cellBlock.length);
180   }
181 
182   /**
183    * @param codec
184    * @param cellBlock
185    * @param offset
186    * @param length
187    * @return CellScanner to work against the content of <code>cellBlock</code>
188    * @throws IOException
189    */
190   public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
191       final byte [] cellBlock, final int offset, final int length)
192   throws IOException {
193     // If compressed, decompress it first before passing it on else we will leak compression
194     // resources if the stream is not closed properly after we let it out.
195     InputStream is = null;
196     if (compressor != null) {
197       // GZIPCodec fails w/ NPE if no configuration.
198       if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf);
199       Decompressor poolDecompressor = CodecPool.getDecompressor(compressor);
200       CompressionInputStream cis =
201         compressor.createInputStream(new ByteArrayInputStream(cellBlock, offset, length),
202         poolDecompressor);
203       ByteBufferOutputStream bbos = null;
204       try {
205         // TODO: This is ugly.  The buffer will be resized on us if we guess wrong.
206         // TODO: Reuse buffers.
207         bbos = new ByteBufferOutputStream((length - offset) *
208           this.cellBlockDecompressionMultiplier);
209         IOUtils.copy(cis, bbos);
210         bbos.close();
211         ByteBuffer bb = bbos.getByteBuffer();
212         is = new ByteArrayInputStream(bb.array(), 0, bb.limit());
213       } finally {
214         if (is != null) is.close();
215         if (bbos != null) bbos.close();
216 
217         CodecPool.returnDecompressor(poolDecompressor);
218       }
219     } else {
220       is = new ByteArrayInputStream(cellBlock, offset, length);
221     }
222     return codec.getDecoder(is);
223   }
224 
225   /**
226    * @param m Message to serialize delimited; i.e. w/ a vint of its size preceeding its
227    * serialization.
228    * @return The passed in Message serialized with delimiter.  Return null if <code>m</code> is null
229    * @throws IOException
230    */
231   public static ByteBuffer getDelimitedMessageAsByteBuffer(final Message m) throws IOException {
232     if (m == null) return null;
233     int serializedSize = m.getSerializedSize();
234     int vintSize = CodedOutputStream.computeRawVarint32Size(serializedSize);
235     byte [] buffer = new byte[serializedSize + vintSize];
236     // Passing in a byte array saves COS creating a buffer which it does when using streams.
237     CodedOutputStream cos = CodedOutputStream.newInstance(buffer);
238     // This will write out the vint preamble and the message serialized.
239     cos.writeMessageNoTag(m);
240     cos.flush();
241     cos.checkNoSpaceLeft();
242     return ByteBuffer.wrap(buffer);
243   }
244 
245   /**
246    * Write out header, param, and cell block if there is one.
247    * @param dos
248    * @param header
249    * @param param
250    * @param cellBlock
251    * @return Total number of bytes written.
252    * @throws IOException
253    */
254   public static int write(final OutputStream dos, final Message header, final Message param,
255       final ByteBuffer cellBlock)
256   throws IOException {
257     // Must calculate total size and write that first so other side can read it all in in one
258     // swoop.  This is dictated by how the server is currently written.  Server needs to change
259     // if we are to be able to write without the length prefixing.
260     int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param);
261     if (cellBlock != null) totalSize += cellBlock.remaining();
262     return write(dos, header, param, cellBlock, totalSize);
263   }
264 
265   private static int write(final OutputStream dos, final Message header, final Message param,
266     final ByteBuffer cellBlock, final int totalSize)
267   throws IOException {
268     // I confirmed toBytes does same as DataOutputStream#writeInt.
269     dos.write(Bytes.toBytes(totalSize));
270     // This allocates a buffer that is the size of the message internally.
271     header.writeDelimitedTo(dos);
272     if (param != null) param.writeDelimitedTo(dos);
273     if (cellBlock != null) dos.write(cellBlock.array(), 0, cellBlock.remaining());
274     dos.flush();
275     return totalSize;
276   }
277 
278   /**
279    * Read in chunks of 8K (HBASE-7239)
280    * @param in
281    * @param dest
282    * @param offset
283    * @param len
284    * @throws IOException
285    */
286   public static void readChunked(final DataInput in, byte[] dest, int offset, int len)
287       throws IOException {
288     int maxRead = 8192;
289 
290     for (; offset < len; offset += maxRead) {
291       in.readFully(dest, offset, Math.min(len - offset, maxRead));
292     }
293   }
294 
295   /**
296    * @return Size on the wire when the two messages are written with writeDelimitedTo
297    */
298   public static int getTotalSizeWhenWrittenDelimited(Message ... messages) {
299     int totalSize = 0;
300     for (Message m: messages) {
301       if (m == null) continue;
302       totalSize += m.getSerializedSize();
303       totalSize += CodedOutputStream.computeRawVarint32Size(m.getSerializedSize());
304     }
305     Preconditions.checkArgument(totalSize < Integer.MAX_VALUE);
306     return totalSize;
307   }
308 }