1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.apache.hadoop.hbase.client;
17
18 import com.google.common.annotations.VisibleForTesting;
19 import org.apache.commons.logging.Log;
20 import org.apache.commons.logging.LogFactory;
21 import org.apache.hadoop.conf.Configuration;
22 import org.apache.hadoop.hbase.TableName;
23 import org.apache.hadoop.hbase.classification.InterfaceAudience;
24 import org.apache.hadoop.hbase.classification.InterfaceStability;
25 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
26
27 import java.io.IOException;
28 import java.io.InterruptedIOException;
29 import java.util.Arrays;
30 import java.util.LinkedList;
31 import java.util.List;
32 import java.util.concurrent.ConcurrentLinkedQueue;
33 import java.util.concurrent.ExecutorService;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.atomic.AtomicLong;
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54 @InterfaceAudience.Private
55 @InterfaceStability.Evolving
56 public class BufferedMutatorImpl implements BufferedMutator {
57
58 private static final Log LOG = LogFactory.getLog(BufferedMutatorImpl.class);
59
60 private final ExceptionListener listener;
61
62 protected ClusterConnection connection;
63 private final TableName tableName;
64 private volatile Configuration conf;
65 @VisibleForTesting
66 final ConcurrentLinkedQueue<Mutation> writeAsyncBuffer = new ConcurrentLinkedQueue<Mutation>();
67 @VisibleForTesting
68 AtomicLong currentWriteBufferSize = new AtomicLong(0);
69
70 private long writeBufferSize;
71 private final int maxKeyValueSize;
72 private boolean closed = false;
73 private final ExecutorService pool;
74
75 @VisibleForTesting
76 protected AsyncProcess ap;
77
78 BufferedMutatorImpl(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory,
79 RpcControllerFactory rpcFactory, BufferedMutatorParams params) {
80 if (conn == null || conn.isClosed()) {
81 throw new IllegalArgumentException("Connection is null or closed.");
82 }
83
84 this.tableName = params.getTableName();
85 this.connection = conn;
86 this.conf = connection.getConfiguration();
87 this.pool = params.getPool();
88 this.listener = params.getListener();
89
90 ConnectionConfiguration tableConf = new ConnectionConfiguration(conf);
91 this.writeBufferSize = params.getWriteBufferSize() != BufferedMutatorParams.UNSET ?
92 params.getWriteBufferSize() : tableConf.getWriteBufferSize();
93 this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ?
94 params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize();
95
96
97 ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory);
98 }
99
100 @Override
101 public TableName getName() {
102 return tableName;
103 }
104
105 @Override
106 public Configuration getConfiguration() {
107 return conf;
108 }
109
110 @Override
111 public void mutate(Mutation m) throws InterruptedIOException,
112 RetriesExhaustedWithDetailsException {
113 mutate(Arrays.asList(m));
114 }
115
116 @Override
117 public void mutate(List<? extends Mutation> ms) throws InterruptedIOException,
118 RetriesExhaustedWithDetailsException {
119
120 if (closed) {
121 throw new IllegalStateException("Cannot put when the BufferedMutator is closed.");
122 }
123
124 long toAddSize = 0;
125 for (Mutation m : ms) {
126 if (m instanceof Put) {
127 validatePut((Put) m);
128 }
129 toAddSize += m.heapSize();
130 }
131
132
133
134
135 if (ap.hasError()) {
136 currentWriteBufferSize.addAndGet(toAddSize);
137 writeAsyncBuffer.addAll(ms);
138 backgroundFlushCommits(true);
139 } else {
140 currentWriteBufferSize.addAndGet(toAddSize);
141 writeAsyncBuffer.addAll(ms);
142 }
143
144
145 while (currentWriteBufferSize.get() > writeBufferSize) {
146 backgroundFlushCommits(false);
147 }
148 }
149
150
151 public void validatePut(final Put put) throws IllegalArgumentException {
152 HTable.validatePut(put, maxKeyValueSize);
153 }
154
155 @Override
156 public synchronized void close() throws IOException {
157 try {
158 if (this.closed) {
159 return;
160 }
161
162
163 backgroundFlushCommits(true);
164 this.pool.shutdown();
165 boolean terminated;
166 int loopCnt = 0;
167 do {
168
169 terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
170 loopCnt += 1;
171 if (loopCnt >= 10) {
172 LOG.warn("close() failed to terminate pool after 10 minutes. Abandoning pool.");
173 break;
174 }
175 } while (!terminated);
176
177 } catch (InterruptedException e) {
178 LOG.warn("waitForTermination interrupted");
179
180 } finally {
181 this.closed = true;
182 }
183 }
184
185 @Override
186 public synchronized void flush() throws InterruptedIOException,
187 RetriesExhaustedWithDetailsException {
188
189
190 backgroundFlushCommits(true);
191 }
192
193
194
195
196
197
198
199
200
201 private void backgroundFlushCommits(boolean synchronous) throws
202 InterruptedIOException,
203 RetriesExhaustedWithDetailsException {
204
205 LinkedList<Mutation> buffer = new LinkedList<>();
206
207 long dequeuedSize = 0;
208
209 try {
210
211 Mutation m;
212
213
214
215
216 while (
217 (writeBufferSize <= 0 || dequeuedSize < (writeBufferSize * 2) || synchronous)
218 && (m = writeAsyncBuffer.poll()) != null) {
219 buffer.add(m);
220 long size = m.heapSize();
221 dequeuedSize += size;
222 currentWriteBufferSize.addAndGet(-size);
223 }
224
225 if (!synchronous && dequeuedSize == 0) {
226 return;
227 }
228
229 if (!synchronous) {
230 ap.submit(tableName, buffer, true, null, false);
231 if (ap.hasError()) {
232 LOG.debug(tableName + ": One or more of the operations have failed -"
233 + " waiting for all operation in progress to finish (successfully or not)");
234 }
235 }
236 if (synchronous || ap.hasError()) {
237 while (!buffer.isEmpty()) {
238 ap.submit(tableName, buffer, true, null, false);
239 }
240 RetriesExhaustedWithDetailsException error =
241 ap.waitForAllPreviousOpsAndReset(null, tableName.getNameAsString());
242 if (error != null) {
243 if (listener == null) {
244 throw error;
245 } else {
246 this.listener.onException(error, this);
247 }
248 }
249 }
250 } finally {
251 for (Mutation mut : buffer) {
252 long size = mut.heapSize();
253 currentWriteBufferSize.addAndGet(size);
254 dequeuedSize -= size;
255 writeAsyncBuffer.add(mut);
256 }
257 }
258 }
259
260
261
262
263
264
265 @Deprecated
266 public void setWriteBufferSize(long writeBufferSize) throws RetriesExhaustedWithDetailsException,
267 InterruptedIOException {
268 this.writeBufferSize = writeBufferSize;
269 if (currentWriteBufferSize.get() > writeBufferSize) {
270 flush();
271 }
272 }
273
274
275
276
277 @Override
278 public long getWriteBufferSize() {
279 return this.writeBufferSize;
280 }
281
282
283
284
285
286
287 @Deprecated
288 public List<Row> getWriteBuffer() {
289 return Arrays.asList(writeAsyncBuffer.toArray(new Row[0]));
290 }
291 }