1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.ipc;
19
20 import java.io.ByteArrayInputStream;
21 import java.io.DataInput;
22 import java.io.IOException;
23 import java.io.InputStream;
24 import java.io.OutputStream;
25 import java.nio.BufferOverflowException;
26 import java.nio.ByteBuffer;
27
28 import org.apache.commons.io.IOUtils;
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.hbase.classification.InterfaceAudience;
32 import org.apache.hadoop.conf.Configurable;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.hbase.CellScanner;
35 import org.apache.hadoop.hbase.DoNotRetryIOException;
36 import org.apache.hadoop.hbase.HBaseIOException;
37 import org.apache.hadoop.hbase.codec.Codec;
38 import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
39 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
40 import org.apache.hadoop.hbase.io.HeapSize;
41 import org.apache.hadoop.hbase.util.Bytes;
42 import org.apache.hadoop.hbase.util.ClassSize;
43 import org.apache.hadoop.io.compress.CodecPool;
44 import org.apache.hadoop.io.compress.CompressionCodec;
45 import org.apache.hadoop.io.compress.CompressionInputStream;
46 import org.apache.hadoop.io.compress.Compressor;
47 import org.apache.hadoop.io.compress.Decompressor;
48
49 import com.google.common.base.Preconditions;
50 import com.google.protobuf.CodedOutputStream;
51 import com.google.protobuf.Message;
52
53
54
55
56 @InterfaceAudience.Private
57 public class IPCUtil {
58 public static final Log LOG = LogFactory.getLog(IPCUtil.class);
59
60
61
62 private final int cellBlockDecompressionMultiplier;
63 private final int cellBlockBuildingInitialBufferSize;
64 private final Configuration conf;
65
66 public IPCUtil(final Configuration conf) {
67 super();
68 this.conf = conf;
69 this.cellBlockDecompressionMultiplier =
70 conf.getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3);
71
72
73
74 this.cellBlockBuildingInitialBufferSize =
75 ClassSize.align(conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024));
76 }
77
78
79
80
81 public static class CellScannerButNoCodecException extends HBaseIOException {};
82
83
84
85
86
87
88
89
90
91
92
93
94 @SuppressWarnings("resource")
95 public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
96 final CellScanner cellScanner)
97 throws IOException {
98 return buildCellBlock(codec, compressor, cellScanner, null);
99 }
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
  /**
   * Puts CellScanner Cells into a cell block using the passed in <code>codec</code> and/or
   * <code>compressor</code>.
   * @param codec encoder used to serialize each Cell; required when cellScanner is non-null
   * @param compressor optional compression codec layered over the codec output; may be null
   * @param cellScanner source of Cells; null means nothing to encode and yields a null return
   * @param pool optional pool of reusable ByteBuffers; when non-null the block is built into a
   *   pooled buffer
   * @return null if the scanner produced no Cells, else the built cell block as a ByteBuffer
   *   (obtained from {@code ByteBufferOutputStream.getByteBuffer()})
   * @throws IOException if encoding/compression fails
   * @throws CellScannerButNoCodecException if Cells are present but no codec was supplied
   */
  @SuppressWarnings("resource")
  public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
      final CellScanner cellScanner, final BoundedByteBufferPool pool)
    throws IOException {
    if (cellScanner == null) return null;
    if (codec == null) throw new CellScannerButNoCodecException();
    int bufferSize = this.cellBlockBuildingInitialBufferSize;
    ByteBufferOutputStream baos = null;
    if (pool != null) {
      // Reuse a pooled buffer; its capacity overrides the configured initial size.
      ByteBuffer bb = pool.getBuffer();
      bufferSize = bb.capacity();
      baos = new ByteBufferOutputStream(bb);
    } else {
      // No pool: if the scanner exposes a heap-size estimate, use it as the initial buffer
      // size to avoid repeated growth while encoding.
      if (cellScanner instanceof HeapSize) {
        long longSize = ((HeapSize)cellScanner).heapSize();
        // The backing buffer is int-indexed; refuse anything larger up front.
        if (longSize > Integer.MAX_VALUE) {
          throw new IOException("Size " + longSize + " > " + Integer.MAX_VALUE);
        }
        bufferSize = ClassSize.align((int)longSize);
      }
      baos = new ByteBufferOutputStream(bufferSize);
    }
    OutputStream os = baos;
    Compressor poolCompressor = null;
    try {
      if (compressor != null) {
        // Hand the compressor our conf before borrowing a Compressor from the codec pool.
        if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf);
        poolCompressor = CodecPool.getCompressor(compressor);
        os = compressor.createOutputStream(os, poolCompressor);
      }
      Codec.Encoder encoder = codec.getEncoder(os);
      int count = 0;
      while (cellScanner.advance()) {
        encoder.write(cellScanner.current());
        count++;
      }
      encoder.flush();
      // No Cells encoded: signal "no cell block" to the caller.
      // NOTE(review): on this path a pool-supplied buffer is never handed back to the pool —
      // confirm the pool/caller tolerate that, else this leaks pooled buffers.
      if (count == 0) return null;
    } catch (BufferOverflowException e) {
      // The buffer could not accommodate the data; retrying the call will not help.
      throw new DoNotRetryIOException(e);
    } finally {
      os.close();
      // Return the borrowed Compressor (if any) for reuse.
      if (poolCompressor != null) CodecPool.returnCompressor(poolCompressor);
    }
    if (LOG.isTraceEnabled()) {
      // The buffer grew past its initial size; suggest tuning the config knob.
      if (bufferSize < baos.size()) {
        LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + baos.size() +
          "; up hbase.ipc.cellblock.building.initial.buffersize?");
      }
    }
    return baos.getByteBuffer();
  }
172
173
174
175
176
177
178
  /**
   * Builds a CellScanner over the whole of <code>cellBlock</code>. Convenience overload that
   * delegates to the offset/length variant with offset 0 and the full array length.
   * @param codec decoder for the Cells in the block
   * @param compressor optional decompression applied before decoding; may be null
   * @param cellBlock serialized cell block; must be non-null
   * @return a CellScanner over the block's Cells
   * @throws IOException if decompression fails
   */
  public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
      final byte [] cellBlock)
    throws IOException {
    return createCellScanner(codec, compressor, cellBlock, 0, cellBlock.length);
  }
184
185
186
187
188
189
190
191
192
193 public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
194 final byte [] cellBlock, final int offset, final int length)
195 throws IOException {
196
197
198 InputStream is = null;
199 if (compressor != null) {
200
201 if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf);
202 Decompressor poolDecompressor = CodecPool.getDecompressor(compressor);
203 CompressionInputStream cis =
204 compressor.createInputStream(new ByteArrayInputStream(cellBlock, offset, length),
205 poolDecompressor);
206 ByteBufferOutputStream bbos = null;
207 try {
208
209
210 bbos = new ByteBufferOutputStream((length - offset) *
211 this.cellBlockDecompressionMultiplier);
212 IOUtils.copy(cis, bbos);
213 bbos.close();
214 ByteBuffer bb = bbos.getByteBuffer();
215 is = new ByteArrayInputStream(bb.array(), 0, bb.limit());
216 } finally {
217 if (is != null) is.close();
218 if (bbos != null) bbos.close();
219
220 CodecPool.returnDecompressor(poolDecompressor);
221 }
222 } else {
223 is = new ByteArrayInputStream(cellBlock, offset, length);
224 }
225 return codec.getDecoder(is);
226 }
227
228
229
230
231
232
233
234 public static ByteBuffer getDelimitedMessageAsByteBuffer(final Message m) throws IOException {
235 if (m == null) return null;
236 int serializedSize = m.getSerializedSize();
237 int vintSize = CodedOutputStream.computeRawVarint32Size(serializedSize);
238 byte [] buffer = new byte[serializedSize + vintSize];
239
240 CodedOutputStream cos = CodedOutputStream.newInstance(buffer);
241
242 cos.writeMessageNoTag(m);
243 cos.flush();
244 cos.checkNoSpaceLeft();
245 return ByteBuffer.wrap(buffer);
246 }
247
248
249
250
251
252
253
254
255
256
257 public static int write(final OutputStream dos, final Message header, final Message param,
258 final ByteBuffer cellBlock)
259 throws IOException {
260
261
262
263 int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param);
264 if (cellBlock != null) totalSize += cellBlock.remaining();
265 return write(dos, header, param, cellBlock, totalSize);
266 }
267
268 private static int write(final OutputStream dos, final Message header, final Message param,
269 final ByteBuffer cellBlock, final int totalSize)
270 throws IOException {
271
272 dos.write(Bytes.toBytes(totalSize));
273
274 header.writeDelimitedTo(dos);
275 if (param != null) param.writeDelimitedTo(dos);
276 if (cellBlock != null) dos.write(cellBlock.array(), 0, cellBlock.remaining());
277 dos.flush();
278 return totalSize;
279 }
280
281
282
283
284
285
286
287
288
289 public static void readChunked(final DataInput in, byte[] dest, int offset, int len)
290 throws IOException {
291 int maxRead = 8192;
292
293 for (; offset < len; offset += maxRead) {
294 in.readFully(dest, offset, Math.min(len - offset, maxRead));
295 }
296 }
297
298
299
300
301 public static int getTotalSizeWhenWrittenDelimited(Message ... messages) {
302 int totalSize = 0;
303 for (Message m: messages) {
304 if (m == null) continue;
305 totalSize += m.getSerializedSize();
306 totalSize += CodedOutputStream.computeRawVarint32Size(m.getSerializedSize());
307 }
308 Preconditions.checkArgument(totalSize < Integer.MAX_VALUE);
309 return totalSize;
310 }
311 }