1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.io;
21
22 import java.io.IOException;
23 import java.io.OutputStream;
24 import java.nio.BufferOverflowException;
25 import java.nio.ByteBuffer;
26 import java.nio.channels.Channels;
27 import java.nio.channels.WritableByteChannel;
28
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.hbase.classification.InterfaceStability;
31 import org.apache.hadoop.hbase.util.Bytes;
32
33
34
35
36 @InterfaceAudience.Public
37 @InterfaceStability.Evolving
38 public class ByteBufferOutputStream extends OutputStream {
39
40
41
42 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
43
44 protected ByteBuffer buf;
45
46 public ByteBufferOutputStream(int capacity) {
47 this(capacity, false);
48 }
49
50 public ByteBufferOutputStream(int capacity, boolean useDirectByteBuffer) {
51 this(allocate(capacity, useDirectByteBuffer));
52 }
53
54
55
56
57
58
59
60
61
62
63
64 public ByteBufferOutputStream(final ByteBuffer bb) {
65 this.buf = bb;
66 this.buf.clear();
67 }
68
69 public int size() {
70 return buf.position();
71 }
72
73 private static ByteBuffer allocate(final int capacity, final boolean useDirectByteBuffer) {
74 if (capacity > MAX_ARRAY_SIZE) {
75 throw new BufferOverflowException();
76 }
77 return useDirectByteBuffer? ByteBuffer.allocateDirect(capacity): ByteBuffer.allocate(capacity);
78 }
79
80
81
82
83
84 public ByteBuffer getByteBuffer() {
85 buf.flip();
86 return buf;
87 }
88
89 private void checkSizeAndGrow(int extra) {
90 long capacityNeeded = buf.position() + (long) extra;
91 if (capacityNeeded > buf.limit()) {
92
93 if (capacityNeeded > MAX_ARRAY_SIZE) {
94 throw new BufferOverflowException();
95 }
96
97 long nextCapacity = Math.min(buf.capacity() * 2L, MAX_ARRAY_SIZE);
98
99 nextCapacity = Math.max(nextCapacity, capacityNeeded);
100 ByteBuffer newBuf = allocate((int) nextCapacity, buf.isDirect());
101 buf.flip();
102 newBuf.put(buf);
103 buf = newBuf;
104 }
105 }
106
107
108 @Override
109 public void write(int b) throws IOException {
110 checkSizeAndGrow(Bytes.SIZEOF_BYTE);
111
112 buf.put((byte)b);
113 }
114
115
116
117
118
119
120
121
122 public synchronized void writeTo(OutputStream out) throws IOException {
123 WritableByteChannel channel = Channels.newChannel(out);
124 ByteBuffer bb = buf.duplicate();
125 bb.flip();
126 channel.write(bb);
127 }
128
129 @Override
130 public void write(byte[] b) throws IOException {
131 checkSizeAndGrow(b.length);
132
133 buf.put(b);
134 }
135
136 @Override
137 public void write(byte[] b, int off, int len) throws IOException {
138 checkSizeAndGrow(len);
139
140 buf.put(b, off, len);
141 }
142
143 @Override
144 public void flush() throws IOException {
145
146 }
147
148 @Override
149 public void close() throws IOException {
150
151 }
152
153 public byte[] toByteArray(int offset, int length) {
154 ByteBuffer bb = buf.duplicate();
155 bb.flip();
156
157 byte[] chunk = new byte[length];
158
159 bb.position(offset);
160 bb.get(chunk, 0, length);
161 return chunk;
162 }
163 }