View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   * http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.ipc;
19  
20  import java.io.ByteArrayInputStream;
21  import java.io.DataInput;
22  import java.io.IOException;
23  import java.io.InputStream;
24  import java.io.OutputStream;
25  import java.nio.ByteBuffer;
26  
27  import org.apache.commons.io.IOUtils;
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.hbase.classification.InterfaceAudience;
31  import org.apache.hadoop.conf.Configurable;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.hbase.CellScanner;
34  import org.apache.hadoop.hbase.HBaseIOException;
35  import org.apache.hadoop.hbase.codec.Codec;
36  import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
37  import org.apache.hadoop.hbase.io.HeapSize;
38  import org.apache.hadoop.hbase.util.Bytes;
39  import org.apache.hadoop.hbase.util.ClassSize;
40  import org.apache.hadoop.io.compress.CodecPool;
41  import org.apache.hadoop.io.compress.CompressionCodec;
42  import org.apache.hadoop.io.compress.CompressionInputStream;
43  import org.apache.hadoop.io.compress.Compressor;
44  import org.apache.hadoop.io.compress.Decompressor;
45  
46  import com.google.common.base.Preconditions;
47  import com.google.protobuf.CodedOutputStream;
48  import com.google.protobuf.Message;
49  
50  /**
51   * Utility to help ipc'ing.
52   */
53  @InterfaceAudience.Private
54  public class IPCUtil {
55    public static final Log LOG = LogFactory.getLog(IPCUtil.class);
56    /**
57     * How much we think the decompressor will expand the original compressed content.
58     */
59    private final int cellBlockDecompressionMultiplier;
60    private final int cellBlockBuildingInitialBufferSize;
61    private final Configuration conf;
62  
63    public IPCUtil(final Configuration conf) {
64      super();
65      this.conf = conf;
66      this.cellBlockDecompressionMultiplier =
67          conf.getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3);
68      // Guess that 16k is a good size for rpc buffer.  Could go bigger.  See the TODO below in
69      // #buildCellBlock.
70      this.cellBlockBuildingInitialBufferSize =
71        ClassSize.align(conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024));
72    }
73  
74    /**
75     * Thrown if a cellscanner but no codec to encode it with.
76     */
77    public static class CellScannerButNoCodecException extends HBaseIOException {};
78  
79    /**
80     * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
81     * <code>compressor</code>.
82     * @param codec
83     * @param compressor
84     * @param cellScanner
85     * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
86     * passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has been
87     * flipped and is ready for reading.  Use limit to find total size.
88     * @throws IOException
89     */
90    @SuppressWarnings("resource")
91    public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
92      final CellScanner cellScanner)
93    throws IOException {
94      if (cellScanner == null) return null;
95      if (codec == null) throw new CellScannerButNoCodecException();
96      int bufferSize = this.cellBlockBuildingInitialBufferSize;
97      if (cellScanner instanceof HeapSize) {
98        long longSize = ((HeapSize)cellScanner).heapSize();
99        // Just make sure we don't have a size bigger than an int.
100       if (longSize > Integer.MAX_VALUE) {
101         throw new IOException("Size " + longSize + " > " + Integer.MAX_VALUE);
102       }
103       bufferSize = ClassSize.align((int)longSize);
104     } // TODO: Else, get estimate on size of buffer rather than have the buffer resize.
105     // See TestIPCUtil main for experiment where we spin through the Cells getting estimate of
106     // total size before creating the buffer.  It costs somw small percentage.  If we are usually
107     // within the estimated buffer size, then the cost is not worth it.  If we are often well
108     // outside the guesstimated buffer size, the processing can be done in half the time if we
109     // go w/ the estimated size rather than let the buffer resize.
110     ByteBufferOutputStream baos = new ByteBufferOutputStream(bufferSize);
111     OutputStream os = baos;
112     Compressor poolCompressor = null;
113     try {
114       if (compressor != null) {
115         if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf);
116         poolCompressor = CodecPool.getCompressor(compressor);
117         os = compressor.createOutputStream(os, poolCompressor);
118       }
119       Codec.Encoder encoder = codec.getEncoder(os);
120       int count = 0;
121       while (cellScanner.advance()) {
122         encoder.write(cellScanner.current());
123         count++;
124       }
125       encoder.flush();
126       // If no cells, don't mess around.  Just return null (could be a bunch of existence checking
127       // gets or something -- stuff that does not return a cell).
128       if (count == 0) return null;
129     } finally {
130       os.close();
131       if (poolCompressor != null) CodecPool.returnCompressor(poolCompressor);
132     }
133     if (LOG.isTraceEnabled()) {
134       if (bufferSize < baos.size()) {
135         LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + baos.size() +
136           "; up hbase.ipc.cellblock.building.initial.buffersize?");
137       }
138     }
139     return baos.getByteBuffer();
140   }
141 
142   /**
143    * @param codec
144    * @param cellBlock
145    * @return CellScanner to work against the content of <code>cellBlock</code>
146    * @throws IOException
147    */
148   public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
149       final byte [] cellBlock)
150   throws IOException {
151     return createCellScanner(codec, compressor, cellBlock, 0, cellBlock.length);
152   }
153 
154   /**
155    * @param codec
156    * @param cellBlock
157    * @param offset
158    * @param length
159    * @return CellScanner to work against the content of <code>cellBlock</code>
160    * @throws IOException
161    */
162   public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
163       final byte [] cellBlock, final int offset, final int length)
164   throws IOException {
165     // If compressed, decompress it first before passing it on else we will leak compression
166     // resources if the stream is not closed properly after we let it out.
167     InputStream is = null;
168     if (compressor != null) {
169       // GZIPCodec fails w/ NPE if no configuration.
170       if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf);
171       Decompressor poolDecompressor = CodecPool.getDecompressor(compressor);
172       CompressionInputStream cis =
173         compressor.createInputStream(new ByteArrayInputStream(cellBlock, offset, length),
174         poolDecompressor);
175       ByteBufferOutputStream bbos = null;
176       try {
177         // TODO: This is ugly.  The buffer will be resized on us if we guess wrong.
178         // TODO: Reuse buffers.
179         bbos = new ByteBufferOutputStream((length - offset) *
180           this.cellBlockDecompressionMultiplier);
181         IOUtils.copy(cis, bbos);
182         bbos.close();
183         ByteBuffer bb = bbos.getByteBuffer();
184         is = new ByteArrayInputStream(bb.array(), 0, bb.limit());
185       } finally {
186         if (is != null) is.close();
187         if (bbos != null) bbos.close();
188 
189         CodecPool.returnDecompressor(poolDecompressor);
190       }
191     } else {
192       is = new ByteArrayInputStream(cellBlock, offset, length);
193     }
194     return codec.getDecoder(is);
195   }
196 
197   /**
198    * @param m Message to serialize delimited; i.e. w/ a vint of its size preceeding its
199    * serialization.
200    * @return The passed in Message serialized with delimiter.  Return null if <code>m</code> is null
201    * @throws IOException
202    */
203   public static ByteBuffer getDelimitedMessageAsByteBuffer(final Message m) throws IOException {
204     if (m == null) return null;
205     int serializedSize = m.getSerializedSize();
206     int vintSize = CodedOutputStream.computeRawVarint32Size(serializedSize);
207     byte [] buffer = new byte[serializedSize + vintSize];
208     // Passing in a byte array saves COS creating a buffer which it does when using streams.
209     CodedOutputStream cos = CodedOutputStream.newInstance(buffer);
210     // This will write out the vint preamble and the message serialized.
211     cos.writeMessageNoTag(m);
212     cos.flush();
213     cos.checkNoSpaceLeft();
214     return ByteBuffer.wrap(buffer);
215   }
216 
217   /**
218    * Write out header, param, and cell block if there is one.
219    * @param dos
220    * @param header
221    * @param param
222    * @param cellBlock
223    * @return Total number of bytes written.
224    * @throws IOException
225    */
226   public static int write(final OutputStream dos, final Message header, final Message param,
227       final ByteBuffer cellBlock)
228   throws IOException {
229     // Must calculate total size and write that first so other side can read it all in in one
230     // swoop.  This is dictated by how the server is currently written.  Server needs to change
231     // if we are to be able to write without the length prefixing.
232     int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param);
233     if (cellBlock != null) totalSize += cellBlock.remaining();
234     return write(dos, header, param, cellBlock, totalSize);
235   }
236 
237   private static int write(final OutputStream dos, final Message header, final Message param,
238     final ByteBuffer cellBlock, final int totalSize)
239   throws IOException {
240     // I confirmed toBytes does same as DataOutputStream#writeInt.
241     dos.write(Bytes.toBytes(totalSize));
242     // This allocates a buffer that is the size of the message internally.
243     header.writeDelimitedTo(dos);
244     if (param != null) param.writeDelimitedTo(dos);
245     if (cellBlock != null) dos.write(cellBlock.array(), 0, cellBlock.remaining());
246     dos.flush();
247     return totalSize;
248   }
249 
250   /**
251    * Read in chunks of 8K (HBASE-7239)
252    * @param in
253    * @param dest
254    * @param offset
255    * @param len
256    * @throws IOException
257    */
258   public static void readChunked(final DataInput in, byte[] dest, int offset, int len)
259       throws IOException {
260     int maxRead = 8192;
261 
262     for (; offset < len; offset += maxRead) {
263       in.readFully(dest, offset, Math.min(len - offset, maxRead));
264     }
265   }
266 
267   /**
268    * @return Size on the wire when the two messages are written with writeDelimitedTo
269    */
270   public static int getTotalSizeWhenWrittenDelimited(Message ... messages) {
271     int totalSize = 0;
272     for (Message m: messages) {
273       if (m == null) continue;
274       totalSize += m.getSerializedSize();
275       totalSize += CodedOutputStream.computeRawVarint32Size(m.getSerializedSize());
276     }
277     Preconditions.checkArgument(totalSize < Integer.MAX_VALUE);
278     return totalSize;
279   }
280 }