1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21 import com.google.protobuf.Service;
22 import com.google.protobuf.ServiceException;
23 import org.apache.hadoop.classification.InterfaceAudience;
24 import org.apache.hadoop.classification.InterfaceStability;
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.hbase.HBaseConfiguration;
27 import org.apache.hadoop.hbase.HTableDescriptor;
28 import org.apache.hadoop.hbase.client.coprocessor.Batch;
29 import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
30 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
31 import org.apache.hadoop.hbase.util.Bytes;
32 import org.apache.hadoop.hbase.util.PoolMap;
33 import org.apache.hadoop.hbase.util.PoolMap.PoolType;
34
35 import java.io.Closeable;
36 import java.io.IOException;
37 import java.util.Collection;
38 import java.util.List;
39 import java.util.Map;
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63 @InterfaceAudience.Public
64 @InterfaceStability.Stable
65 public class HTablePool implements Closeable {
66 private final PoolMap<String, HTableInterface> tables;
67 private final int maxSize;
68 private final PoolType poolType;
69 private final Configuration config;
70 private final HTableInterfaceFactory tableFactory;
71
72
73
74
75 public HTablePool() {
76 this(HBaseConfiguration.create(), Integer.MAX_VALUE);
77 }
78
79
80
81
82
83
84
85
86
87 public HTablePool(final Configuration config, final int maxSize) {
88 this(config, maxSize, null, null);
89 }
90
91
92
93
94
95
96
97
98
99
100
101
102 public HTablePool(final Configuration config, final int maxSize,
103 final HTableInterfaceFactory tableFactory) {
104 this(config, maxSize, tableFactory, PoolType.Reusable);
105 }
106
107
108
109
110
111
112
113
114
115
116
117
118
119 public HTablePool(final Configuration config, final int maxSize,
120 final PoolType poolType) {
121 this(config, maxSize, null, poolType);
122 }
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141 public HTablePool(final Configuration config, final int maxSize,
142 final HTableInterfaceFactory tableFactory, PoolType poolType) {
143
144
145 this.config = config == null ? HBaseConfiguration.create() : config;
146 this.maxSize = maxSize;
147 this.tableFactory = tableFactory == null ? new HTableFactory()
148 : tableFactory;
149 if (poolType == null) {
150 this.poolType = PoolType.Reusable;
151 } else {
152 switch (poolType) {
153 case Reusable:
154 case ThreadLocal:
155 this.poolType = poolType;
156 break;
157 default:
158 this.poolType = PoolType.Reusable;
159 break;
160 }
161 }
162 this.tables = new PoolMap<String, HTableInterface>(this.poolType,
163 this.maxSize);
164 }
165
166
167
168
169
170
171
172
173
174
175
176
177 public HTableInterface getTable(String tableName) {
178
179 HTableInterface table = findOrCreateTable(tableName);
180
181
182 return new PooledHTable(table);
183 }
184
185
186
187
188
189
190
191
192
193
194
195
196
197 private HTableInterface findOrCreateTable(String tableName) {
198 HTableInterface table = tables.get(tableName);
199 if (table == null) {
200 table = createHTable(tableName);
201 }
202 return table;
203 }
204
205
206
207
208
209
210
211
212
213
214
215
216
217 public HTableInterface getTable(byte[] tableName) {
218 return getTable(Bytes.toString(tableName));
219 }
220
221
222
223
224
225
226
227
228
229 public void putTable(HTableInterface table) throws IOException {
230
231
232
233
234
235
236 if (table instanceof PooledHTable) {
237 returnTable(((PooledHTable) table).getWrappedTable());
238 } else {
239
240
241
242
243 throw new IllegalArgumentException("not a pooled table: " + table);
244 }
245 }
246
247
248
249
250
251
252
253
254
255
256
257 private void returnTable(HTableInterface table) throws IOException {
258
259 String tableName = Bytes.toString(table.getTableName());
260 if (tables.size(tableName) >= maxSize) {
261
262 this.tables.remove(tableName, table);
263 this.tableFactory.releaseHTableInterface(table);
264 return;
265 }
266 tables.put(tableName, table);
267 }
268
269 protected HTableInterface createHTable(String tableName) {
270 return this.tableFactory.createHTableInterface(config,
271 Bytes.toBytes(tableName));
272 }
273
274
275
276
277
278
279
280
281
282
283
284 public void closeTablePool(final String tableName) throws IOException {
285 Collection<HTableInterface> tables = this.tables.values(tableName);
286 if (tables != null) {
287 for (HTableInterface table : tables) {
288 this.tableFactory.releaseHTableInterface(table);
289 }
290 }
291 this.tables.remove(tableName);
292 }
293
294
295
296
297
298
299 public void closeTablePool(final byte[] tableName) throws IOException {
300 closeTablePool(Bytes.toString(tableName));
301 }
302
303
304
305
306
307
308
309 public void close() throws IOException {
310 for (String tableName : tables.keySet()) {
311 closeTablePool(tableName);
312 }
313 this.tables.clear();
314 }
315
316 public int getCurrentPoolSize(String tableName) {
317 return tables.size(tableName);
318 }
319
320
321
322
323
324
325 class PooledHTable implements HTableInterface {
326
327 private boolean open = false;
328
329 private HTableInterface table;
330
331 public PooledHTable(HTableInterface table) {
332 this.table = table;
333 this.open = true;
334 }
335
336 @Override
337 public byte[] getTableName() {
338 checkState();
339 return table.getTableName();
340 }
341
342 @Override
343 public Configuration getConfiguration() {
344 checkState();
345 return table.getConfiguration();
346 }
347
348 @Override
349 public HTableDescriptor getTableDescriptor() throws IOException {
350 checkState();
351 return table.getTableDescriptor();
352 }
353
354 @Override
355 public boolean exists(Get get) throws IOException {
356 checkState();
357 return table.exists(get);
358 }
359
360 @Override
361 public Boolean[] exists(List<Get> gets) throws IOException {
362 checkState();
363 return table.exists(gets);
364 }
365
366 @Override
367 public void batch(List<? extends Row> actions, Object[] results) throws IOException,
368 InterruptedException {
369 checkState();
370 table.batch(actions, results);
371 }
372
373 @Override
374 public Object[] batch(List<? extends Row> actions) throws IOException,
375 InterruptedException {
376 checkState();
377 return table.batch(actions);
378 }
379
380 @Override
381 public Result get(Get get) throws IOException {
382 checkState();
383 return table.get(get);
384 }
385
386 @Override
387 public Result[] get(List<Get> gets) throws IOException {
388 checkState();
389 return table.get(gets);
390 }
391
392 @Override
393 @SuppressWarnings("deprecation")
394 public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
395 checkState();
396 return table.getRowOrBefore(row, family);
397 }
398
399 @Override
400 public ResultScanner getScanner(Scan scan) throws IOException {
401 checkState();
402 return table.getScanner(scan);
403 }
404
405 @Override
406 public ResultScanner getScanner(byte[] family) throws IOException {
407 checkState();
408 return table.getScanner(family);
409 }
410
411 @Override
412 public ResultScanner getScanner(byte[] family, byte[] qualifier)
413 throws IOException {
414 checkState();
415 return table.getScanner(family, qualifier);
416 }
417
418 @Override
419 public void put(Put put) throws IOException {
420 checkState();
421 table.put(put);
422 }
423
424 @Override
425 public void put(List<Put> puts) throws IOException {
426 checkState();
427 table.put(puts);
428 }
429
430 @Override
431 public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
432 byte[] value, Put put) throws IOException {
433 checkState();
434 return table.checkAndPut(row, family, qualifier, value, put);
435 }
436
437 @Override
438 public void delete(Delete delete) throws IOException {
439 checkState();
440 table.delete(delete);
441 }
442
443 @Override
444 public void delete(List<Delete> deletes) throws IOException {
445 checkState();
446 table.delete(deletes);
447 }
448
449 @Override
450 public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
451 byte[] value, Delete delete) throws IOException {
452 checkState();
453 return table.checkAndDelete(row, family, qualifier, value, delete);
454 }
455
456 @Override
457 public Result increment(Increment increment) throws IOException {
458 checkState();
459 return table.increment(increment);
460 }
461
462 @Override
463 public long incrementColumnValue(byte[] row, byte[] family,
464 byte[] qualifier, long amount) throws IOException {
465 checkState();
466 return table.incrementColumnValue(row, family, qualifier, amount);
467 }
468
469 @Override
470 public long incrementColumnValue(byte[] row, byte[] family,
471 byte[] qualifier, long amount, Durability durability) throws IOException {
472 checkState();
473 return table.incrementColumnValue(row, family, qualifier, amount,
474 durability);
475 }
476
477 @Override
478 public boolean isAutoFlush() {
479 checkState();
480 return table.isAutoFlush();
481 }
482
483 @Override
484 public void flushCommits() throws IOException {
485 checkState();
486 table.flushCommits();
487 }
488
489
490
491
492
493
494 public void close() throws IOException {
495 checkState();
496 open = false;
497 returnTable(table);
498 }
499
500 @Override
501 public CoprocessorRpcChannel coprocessorService(byte[] row) {
502 checkState();
503 return table.coprocessorService(row);
504 }
505
506 @Override
507 public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
508 byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
509 throws ServiceException, Throwable {
510 checkState();
511 return table.coprocessorService(service, startKey, endKey, callable);
512 }
513
514 @Override
515 public <T extends Service, R> void coprocessorService(Class<T> service,
516 byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Callback<R> callback)
517 throws ServiceException, Throwable {
518 checkState();
519 table.coprocessorService(service, startKey, endKey, callable, callback);
520 }
521
522 @Override
523 public String toString() {
524 return "PooledHTable{" + ", table=" + table + '}';
525 }
526
527
528
529
530
531
532 HTableInterface getWrappedTable() {
533 return table;
534 }
535
536 @Override
537 public <R> void batchCallback(List<? extends Row> actions,
538 Object[] results, Callback<R> callback) throws IOException,
539 InterruptedException {
540 checkState();
541 table.batchCallback(actions, results, callback);
542 }
543
544 @Override
545 public <R> Object[] batchCallback(List<? extends Row> actions,
546 Callback<R> callback) throws IOException, InterruptedException {
547 checkState();
548 return table.batchCallback(actions, callback);
549 }
550
551 @Override
552 public void mutateRow(RowMutations rm) throws IOException {
553 checkState();
554 table.mutateRow(rm);
555 }
556
557 @Override
558 public Result append(Append append) throws IOException {
559 checkState();
560 return table.append(append);
561 }
562
563 @Override
564 public void setAutoFlush(boolean autoFlush) {
565 checkState();
566 table.setAutoFlush(autoFlush);
567 }
568
569 @Override
570 public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
571 checkState();
572 table.setAutoFlush(autoFlush, clearBufferOnFail);
573 }
574
575 @Override
576 public long getWriteBufferSize() {
577 checkState();
578 return table.getWriteBufferSize();
579 }
580
581 @Override
582 public void setWriteBufferSize(long writeBufferSize) throws IOException {
583 checkState();
584 table.setWriteBufferSize(writeBufferSize);
585 }
586
587 boolean isOpen() {
588 return open;
589 }
590
591 private void checkState() {
592 if (!isOpen()) {
593 throw new IllegalStateException("Table=" + new String(table.getTableName()) + " already closed");
594 }
595 }
596 }
597 }