1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.ipc;
19
20 import java.io.ByteArrayInputStream;
21 import java.io.DataInput;
22 import java.io.IOException;
23 import java.io.InputStream;
24 import java.io.OutputStream;
25 import java.nio.BufferOverflowException;
26 import java.nio.ByteBuffer;
27
28 import org.apache.commons.io.IOUtils;
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.hbase.classification.InterfaceAudience;
32 import org.apache.hadoop.conf.Configurable;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.hbase.CellScanner;
35 import org.apache.hadoop.hbase.DoNotRetryIOException;
36 import org.apache.hadoop.hbase.HBaseIOException;
37 import org.apache.hadoop.hbase.codec.Codec;
38 import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
39 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
40 import org.apache.hadoop.hbase.io.HeapSize;
41 import org.apache.hadoop.hbase.util.Bytes;
42 import org.apache.hadoop.hbase.util.ClassSize;
43 import org.apache.hadoop.io.compress.CodecPool;
44 import org.apache.hadoop.io.compress.CompressionCodec;
45 import org.apache.hadoop.io.compress.CompressionInputStream;
46 import org.apache.hadoop.io.compress.Compressor;
47 import org.apache.hadoop.io.compress.Decompressor;
48
49 import com.google.common.base.Preconditions;
50 import com.google.protobuf.CodedOutputStream;
51 import com.google.protobuf.Message;
52
53
54
55
56 @InterfaceAudience.Private
57 public class IPCUtil {
58
/** Logger shared by this class; trace output is emitted when a cell block buffer has to grow. */
public static final Log LOG = LogFactory.getLog(IPCUtil.class);

/**
 * Factor used to size the buffer a compressed cell block is inflated into. Read from
 * <code>hbase.ipc.cellblock.decompression.buffersize.multiplier</code> (default 3).
 */
private final int cellBlockDecompressionMultiplier;
/**
 * Initial size of the buffer used when building a cell block without a buffer pool. Read from
 * <code>hbase.ipc.cellblock.building.initial.buffersize</code> (default 16KB) and aligned with
 * {@link ClassSize#align(int)}.
 */
private final int cellBlockBuildingInitialBufferSize;
/** Configuration handed to compression codecs that implement {@link Configurable}. */
private final Configuration conf;
66
67 public IPCUtil(final Configuration conf) {
68 super();
69 this.conf = conf;
70 this.cellBlockDecompressionMultiplier =
71 conf.getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3);
72
73
74
75 this.cellBlockBuildingInitialBufferSize =
76 ClassSize.align(conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024));
77 }
78
79
80
81
82 public static class CellScannerButNoCodecException extends HBaseIOException {};
83
84
85
86
87
88
89
90
91
92
93
94
95 @SuppressWarnings("resource")
96 public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
97 final CellScanner cellScanner)
98 throws IOException {
99 return buildCellBlock(codec, compressor, cellScanner, null);
100 }
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117 @SuppressWarnings("resource")
118 public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
119 final CellScanner cellScanner, final BoundedByteBufferPool pool)
120 throws IOException {
121 if (cellScanner == null) return null;
122 if (codec == null) throw new CellScannerButNoCodecException();
123 int bufferSize = this.cellBlockBuildingInitialBufferSize;
124 ByteBufferOutputStream baos = null;
125 ByteBuffer bb = null;
126 if (pool != null) {
127 bb = pool.getBuffer();
128 bufferSize = bb.capacity();
129 baos = new ByteBufferOutputStream(bb);
130 } else {
131
132 if (cellScanner instanceof HeapSize) {
133 long longSize = ((HeapSize)cellScanner).heapSize();
134
135 if (longSize > Integer.MAX_VALUE) {
136 throw new IOException("Size " + longSize + " > " + Integer.MAX_VALUE);
137 }
138 bufferSize = ClassSize.align((int)longSize);
139 }
140 baos = new ByteBufferOutputStream(bufferSize);
141 }
142 OutputStream os = baos;
143 Compressor poolCompressor = null;
144 try {
145 if (compressor != null) {
146 if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf);
147 poolCompressor = CodecPool.getCompressor(compressor);
148 os = compressor.createOutputStream(os, poolCompressor);
149 }
150 Codec.Encoder encoder = codec.getEncoder(os);
151 int count = 0;
152 while (cellScanner.advance()) {
153 encoder.write(cellScanner.current());
154 count++;
155 }
156 encoder.flush();
157
158
159 if (count == 0) {
160 if (pool != null && bb != null) {
161 pool.putBuffer(bb);
162 }
163 return null;
164 }
165 } catch (BufferOverflowException e) {
166 throw new DoNotRetryIOException(e);
167 } finally {
168 os.close();
169 if (poolCompressor != null) CodecPool.returnCompressor(poolCompressor);
170 }
171 if (LOG.isTraceEnabled()) {
172 if (bufferSize < baos.size()) {
173 LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + baos.size() +
174 "; up hbase.ipc.cellblock.building.initial.buffersize?");
175 }
176 }
177 return baos.getByteBuffer();
178 }
179
180
181
182
183
184
185
186 public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
187 final byte [] cellBlock)
188 throws IOException {
189 return createCellScanner(codec, compressor, cellBlock, 0, cellBlock.length);
190 }
191
192
193
194
195
196
197
198
199
200 public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
201 final byte [] cellBlock, final int offset, final int length)
202 throws IOException {
203
204
205 InputStream is = null;
206 if (compressor != null) {
207
208 if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf);
209 Decompressor poolDecompressor = CodecPool.getDecompressor(compressor);
210 CompressionInputStream cis =
211 compressor.createInputStream(new ByteArrayInputStream(cellBlock, offset, length),
212 poolDecompressor);
213 ByteBufferOutputStream bbos = null;
214 try {
215
216
217 bbos = new ByteBufferOutputStream((length - offset) *
218 this.cellBlockDecompressionMultiplier);
219 IOUtils.copy(cis, bbos);
220 bbos.close();
221 ByteBuffer bb = bbos.getByteBuffer();
222 is = new ByteArrayInputStream(bb.array(), 0, bb.limit());
223 } finally {
224 if (is != null) is.close();
225 if (bbos != null) bbos.close();
226
227 CodecPool.returnDecompressor(poolDecompressor);
228 }
229 } else {
230 is = new ByteArrayInputStream(cellBlock, offset, length);
231 }
232 return codec.getDecoder(is);
233 }
234
235
236
237
238
239
240
241 public static ByteBuffer getDelimitedMessageAsByteBuffer(final Message m) throws IOException {
242 if (m == null) return null;
243 int serializedSize = m.getSerializedSize();
244 int vintSize = CodedOutputStream.computeRawVarint32Size(serializedSize);
245 byte [] buffer = new byte[serializedSize + vintSize];
246
247 CodedOutputStream cos = CodedOutputStream.newInstance(buffer);
248
249 cos.writeMessageNoTag(m);
250 cos.flush();
251 cos.checkNoSpaceLeft();
252 return ByteBuffer.wrap(buffer);
253 }
254
255
256
257
258
259
260
261
262
263
264 public static int write(final OutputStream dos, final Message header, final Message param,
265 final ByteBuffer cellBlock)
266 throws IOException {
267
268
269
270 int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param);
271 if (cellBlock != null) totalSize += cellBlock.remaining();
272 return write(dos, header, param, cellBlock, totalSize);
273 }
274
275 private static int write(final OutputStream dos, final Message header, final Message param,
276 final ByteBuffer cellBlock, final int totalSize)
277 throws IOException {
278
279 dos.write(Bytes.toBytes(totalSize));
280
281 header.writeDelimitedTo(dos);
282 if (param != null) param.writeDelimitedTo(dos);
283 if (cellBlock != null) dos.write(cellBlock.array(), 0, cellBlock.remaining());
284 dos.flush();
285 return totalSize;
286 }
287
288
289
290
291
292
293
294
295
296 public static void readChunked(final DataInput in, byte[] dest, int offset, int len)
297 throws IOException {
298 int maxRead = 8192;
299
300 for (; offset < len; offset += maxRead) {
301 in.readFully(dest, offset, Math.min(len - offset, maxRead));
302 }
303 }
304
305
306
307
308 public static int getTotalSizeWhenWrittenDelimited(Message ... messages) {
309 int totalSize = 0;
310 for (Message m: messages) {
311 if (m == null) continue;
312 totalSize += m.getSerializedSize();
313 totalSize += CodedOutputStream.computeRawVarint32Size(m.getSerializedSize());
314 }
315 Preconditions.checkArgument(totalSize < Integer.MAX_VALUE);
316 return totalSize;
317 }
318
319 static IOException toIOE(Throwable t) {
320 if (t instanceof IOException) {
321 return (IOException) t;
322 } else {
323 return new IOException(t);
324 }
325 }
326 }