1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.apache.hadoop.hbase.client;
17
18 import org.apache.commons.logging.Log;
19 import org.apache.commons.logging.LogFactory;
20 import org.apache.hadoop.conf.Configuration;
21 import org.apache.hadoop.hbase.TableName;
22 import org.apache.hadoop.hbase.classification.InterfaceAudience;
23 import org.apache.hadoop.hbase.classification.InterfaceStability;
24 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
25
26 import java.io.IOException;
27 import java.io.InterruptedIOException;
28 import java.util.LinkedList;
29 import java.util.List;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.TimeUnit;
32
33
34
35
36
37
38
39
40
41
42
43
44 @InterfaceAudience.Private
45 @InterfaceStability.Evolving
46 public class BufferedMutatorImpl implements BufferedMutator {
47
48 private static final Log LOG = LogFactory.getLog(BufferedMutatorImpl.class);
49
50 private final ExceptionListener listener;
51
52 protected ClusterConnection connection;
53 private final TableName tableName;
54 private volatile Configuration conf;
55 private List<Row> writeAsyncBuffer = new LinkedList<>();
56 private long writeBufferSize;
57 private final int maxKeyValueSize;
58 protected long currentWriteBufferSize = 0;
59 private boolean closed = false;
60 private final ExecutorService pool;
61 protected AsyncProcess ap;
62
63 BufferedMutatorImpl(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory,
64 RpcControllerFactory rpcFactory, BufferedMutatorParams params) {
65 if (conn == null || conn.isClosed()) {
66 throw new IllegalArgumentException("Connection is null or closed.");
67 }
68
69 this.tableName = params.getTableName();
70 this.connection = conn;
71 this.conf = connection.getConfiguration();
72 this.pool = params.getPool();
73 this.listener = params.getListener();
74
75 ConnectionConfiguration tableConf = new ConnectionConfiguration(conf);
76 this.writeBufferSize = params.getWriteBufferSize() != BufferedMutatorParams.UNSET ?
77 params.getWriteBufferSize() : tableConf.getWriteBufferSize();
78 this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ?
79 params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize();
80
81
82 ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory);
83 }
84
85 @Override
86 public TableName getName() {
87 return tableName;
88 }
89
90 @Override
91 public Configuration getConfiguration() {
92 return conf;
93 }
94
95 @Override
96 public synchronized void mutate(Mutation m) throws InterruptedIOException,
97 RetriesExhaustedWithDetailsException {
98 doMutate(m);
99 }
100
101 @Override
102 public synchronized void mutate(List<? extends Mutation> ms) throws InterruptedIOException,
103 RetriesExhaustedWithDetailsException {
104 for (Mutation m : ms) {
105 doMutate(m);
106 }
107 }
108
109
110
111
112
113
114
115
116 private void doMutate(Mutation m) throws InterruptedIOException,
117 RetriesExhaustedWithDetailsException {
118 if (closed) {
119 throw new IllegalStateException("Cannot put when the BufferedMutator is closed.");
120 }
121 if (!(m instanceof Put) && !(m instanceof Delete)) {
122 throw new IllegalArgumentException("Pass a Delete or a Put");
123 }
124
125
126
127
128 if (ap.hasError()) {
129 writeAsyncBuffer.add(m);
130 backgroundFlushCommits(true);
131 }
132
133 if (m instanceof Put) {
134 validatePut((Put) m);
135 }
136
137 currentWriteBufferSize += m.heapSize();
138 writeAsyncBuffer.add(m);
139
140 while (currentWriteBufferSize > writeBufferSize) {
141 backgroundFlushCommits(false);
142 }
143 }
144
145
146 public void validatePut(final Put put) throws IllegalArgumentException {
147 HTable.validatePut(put, maxKeyValueSize);
148 }
149
150 @Override
151 public synchronized void close() throws IOException {
152 if (this.closed) {
153 return;
154 }
155 try {
156
157
158 backgroundFlushCommits(true);
159 this.pool.shutdown();
160 boolean terminated = false;
161 int loopCnt = 0;
162 do {
163
164 terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
165 loopCnt += 1;
166 if (loopCnt >= 10) {
167 LOG.warn("close() failed to terminate pool after 10 minutes. Abandoning pool.");
168 break;
169 }
170 } while (!terminated);
171 } catch (InterruptedException e) {
172 LOG.warn("waitForTermination interrupted");
173 } finally {
174 this.closed = true;
175 }
176 }
177
178 @Override
179 public synchronized void flush() throws InterruptedIOException,
180 RetriesExhaustedWithDetailsException {
181
182
183 backgroundFlushCommits(true);
184 }
185
186
187
188
189
190
191
192
193
194 private void backgroundFlushCommits(boolean synchronous) throws InterruptedIOException,
195 RetriesExhaustedWithDetailsException {
196 try {
197 if (!synchronous) {
198 ap.submit(tableName, writeAsyncBuffer, true, null, false);
199 if (ap.hasError()) {
200 LOG.debug(tableName + ": One or more of the operations have failed -"
201 + " waiting for all operation in progress to finish (successfully or not)");
202 }
203 }
204 if (synchronous || ap.hasError()) {
205 while (!writeAsyncBuffer.isEmpty()) {
206 ap.submit(tableName, writeAsyncBuffer, true, null, false);
207 }
208 RetriesExhaustedWithDetailsException error =
209 ap.waitForAllPreviousOpsAndReset(null, tableName.getNameAsString());
210 if (error != null) {
211 if (listener == null) {
212 throw error;
213 } else {
214 this.listener.onException(error, this);
215 }
216 }
217 }
218 } finally {
219 currentWriteBufferSize = 0;
220 for (Row mut : writeAsyncBuffer) {
221 if (mut instanceof Mutation) {
222 currentWriteBufferSize += ((Mutation) mut).heapSize();
223 }
224 }
225 }
226 }
227
228
229
230
231
232
233 @Deprecated
234 public void setWriteBufferSize(long writeBufferSize) throws RetriesExhaustedWithDetailsException,
235 InterruptedIOException {
236 this.writeBufferSize = writeBufferSize;
237 if (currentWriteBufferSize > writeBufferSize) {
238 flush();
239 }
240 }
241
242
243
244
245 @Override
246 public long getWriteBufferSize() {
247 return this.writeBufferSize;
248 }
249
250
251
252
253
254
255 @Deprecated
256 public List<Row> getWriteBuffer() {
257 return this.writeAsyncBuffer;
258 }
259 }