1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.util.Collection;
24 import java.util.List;
25 import java.util.Map;
26
27 import org.apache.hadoop.hbase.classification.InterfaceAudience;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.HBaseConfiguration;
30 import org.apache.hadoop.hbase.HTableDescriptor;
31 import org.apache.hadoop.hbase.TableName;
32 import org.apache.hadoop.hbase.client.coprocessor.Batch;
33 import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
34 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
35 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
36 import org.apache.hadoop.hbase.util.Bytes;
37 import org.apache.hadoop.hbase.util.PoolMap;
38 import org.apache.hadoop.hbase.util.PoolMap.PoolType;
39
40 import com.google.protobuf.Descriptors;
41 import com.google.protobuf.Message;
42 import com.google.protobuf.Service;
43 import com.google.protobuf.ServiceException;
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68 @InterfaceAudience.Private
69 @Deprecated
70 public class HTablePool implements Closeable {
71 private final PoolMap<String, HTableInterface> tables;
72 private final int maxSize;
73 private final PoolType poolType;
74 private final Configuration config;
75 private final HTableInterfaceFactory tableFactory;
76
77
78
79
80 public HTablePool() {
81 this(HBaseConfiguration.create(), Integer.MAX_VALUE);
82 }
83
84
85
86
87
88
89
90
91
92 public HTablePool(final Configuration config, final int maxSize) {
93 this(config, maxSize, null, null);
94 }
95
96
97
98
99
100
101
102
103
104
105
106
107 public HTablePool(final Configuration config, final int maxSize,
108 final HTableInterfaceFactory tableFactory) {
109 this(config, maxSize, tableFactory, PoolType.Reusable);
110 }
111
112
113
114
115
116
117
118
119
120
121
122
123
124 public HTablePool(final Configuration config, final int maxSize,
125 final PoolType poolType) {
126 this(config, maxSize, null, poolType);
127 }
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146 public HTablePool(final Configuration config, final int maxSize,
147 final HTableInterfaceFactory tableFactory, PoolType poolType) {
148
149
150 this.config = config == null ? HBaseConfiguration.create() : config;
151 this.maxSize = maxSize;
152 this.tableFactory = tableFactory == null ? new HTableFactory()
153 : tableFactory;
154 if (poolType == null) {
155 this.poolType = PoolType.Reusable;
156 } else {
157 switch (poolType) {
158 case Reusable:
159 case ThreadLocal:
160 this.poolType = poolType;
161 break;
162 default:
163 this.poolType = PoolType.Reusable;
164 break;
165 }
166 }
167 this.tables = new PoolMap<String, HTableInterface>(this.poolType,
168 this.maxSize);
169 }
170
171
172
173
174
175
176
177
178
179
180
181
182 public HTableInterface getTable(String tableName) {
183
184 HTableInterface table = findOrCreateTable(tableName);
185
186
187 return new PooledHTable(table);
188 }
189
190
191
192
193
194
195
196
197
198
199
200
201
202 private HTableInterface findOrCreateTable(String tableName) {
203 HTableInterface table = tables.get(tableName);
204 if (table == null) {
205 table = createHTable(tableName);
206 }
207 return table;
208 }
209
210
211
212
213
214
215
216
217
218
219
220
221
222 public HTableInterface getTable(byte[] tableName) {
223 return getTable(Bytes.toString(tableName));
224 }
225
226
227
228
229
230
231
232
233
234 public void putTable(HTableInterface table) throws IOException {
235
236
237
238
239
240
241 if (table instanceof PooledHTable) {
242 returnTable(((PooledHTable) table).getWrappedTable());
243 } else {
244
245
246
247
248 throw new IllegalArgumentException("not a pooled table: " + table);
249 }
250 }
251
252
253
254
255
256
257
258
259
260
261
262 private void returnTable(HTableInterface table) throws IOException {
263
264 String tableName = Bytes.toString(table.getTableName());
265 if (tables.size(tableName) >= maxSize) {
266
267 this.tables.removeValue(tableName, table);
268 this.tableFactory.releaseHTableInterface(table);
269 return;
270 }
271 tables.put(tableName, table);
272 }
273
274 protected HTableInterface createHTable(String tableName) {
275 return this.tableFactory.createHTableInterface(config,
276 Bytes.toBytes(tableName));
277 }
278
279
280
281
282
283
284
285
286
287
288
289 public void closeTablePool(final String tableName) throws IOException {
290 Collection<HTableInterface> tables = this.tables.values(tableName);
291 if (tables != null) {
292 for (HTableInterface table : tables) {
293 this.tableFactory.releaseHTableInterface(table);
294 }
295 }
296 this.tables.remove(tableName);
297 }
298
299
300
301
302
303
304 public void closeTablePool(final byte[] tableName) throws IOException {
305 closeTablePool(Bytes.toString(tableName));
306 }
307
308
309
310
311
312
313
314 public void close() throws IOException {
315 for (String tableName : tables.keySet()) {
316 closeTablePool(tableName);
317 }
318 this.tables.clear();
319 }
320
321 public int getCurrentPoolSize(String tableName) {
322 return tables.size(tableName);
323 }
324
325
326
327
328
329
330 class PooledHTable implements HTableInterface {
331
332 private boolean open = false;
333
334 private HTableInterface table;
335
336 public PooledHTable(HTableInterface table) {
337 this.table = table;
338 this.open = true;
339 }
340
341 @Override
342 public byte[] getTableName() {
343 checkState();
344 return table.getTableName();
345 }
346
347 @Override
348 public TableName getName() {
349 return table.getName();
350 }
351
352 @Override
353 public Configuration getConfiguration() {
354 checkState();
355 return table.getConfiguration();
356 }
357
358 @Override
359 public HTableDescriptor getTableDescriptor() throws IOException {
360 checkState();
361 return table.getTableDescriptor();
362 }
363
364 @Override
365 public boolean exists(Get get) throws IOException {
366 checkState();
367 return table.exists(get);
368 }
369
370 @Override
371 public boolean[] existsAll(List<Get> gets) throws IOException {
372 checkState();
373 return table.existsAll(gets);
374 }
375
376 @Override
377 public Boolean[] exists(List<Get> gets) throws IOException {
378 checkState();
379 return table.exists(gets);
380 }
381
382 @Override
383 public void batch(List<? extends Row> actions, Object[] results) throws IOException,
384 InterruptedException {
385 checkState();
386 table.batch(actions, results);
387 }
388
389
390
391
392
393
394 @Override
395 public Object[] batch(List<? extends Row> actions) throws IOException,
396 InterruptedException {
397 checkState();
398 return table.batch(actions);
399 }
400
401 @Override
402 public Result get(Get get) throws IOException {
403 checkState();
404 return table.get(get);
405 }
406
407 @Override
408 public Result[] get(List<Get> gets) throws IOException {
409 checkState();
410 return table.get(gets);
411 }
412
413 @Override
414 @SuppressWarnings("deprecation")
415 @Deprecated
416 public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
417 checkState();
418 return table.getRowOrBefore(row, family);
419 }
420
421 @Override
422 public ResultScanner getScanner(Scan scan) throws IOException {
423 checkState();
424 return table.getScanner(scan);
425 }
426
427 @Override
428 public ResultScanner getScanner(byte[] family) throws IOException {
429 checkState();
430 return table.getScanner(family);
431 }
432
433 @Override
434 public ResultScanner getScanner(byte[] family, byte[] qualifier)
435 throws IOException {
436 checkState();
437 return table.getScanner(family, qualifier);
438 }
439
440 @Override
441 public void put(Put put) throws IOException {
442 checkState();
443 table.put(put);
444 }
445
446 @Override
447 public void put(List<Put> puts) throws IOException {
448 checkState();
449 table.put(puts);
450 }
451
452 @Override
453 public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
454 byte[] value, Put put) throws IOException {
455 checkState();
456 return table.checkAndPut(row, family, qualifier, value, put);
457 }
458
459 @Override
460 public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
461 CompareOp compareOp, byte[] value, Put put) throws IOException {
462 checkState();
463 return table.checkAndPut(row, family, qualifier, compareOp, value, put);
464 }
465
466 @Override
467 public void delete(Delete delete) throws IOException {
468 checkState();
469 table.delete(delete);
470 }
471
472 @Override
473 public void delete(List<Delete> deletes) throws IOException {
474 checkState();
475 table.delete(deletes);
476 }
477
478 @Override
479 public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
480 byte[] value, Delete delete) throws IOException {
481 checkState();
482 return table.checkAndDelete(row, family, qualifier, value, delete);
483 }
484
485 @Override
486 public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
487 CompareOp compareOp, byte[] value, Delete delete) throws IOException {
488 checkState();
489 return table.checkAndDelete(row, family, qualifier, compareOp, value, delete);
490 }
491
492 @Override
493 public Result increment(Increment increment) throws IOException {
494 checkState();
495 return table.increment(increment);
496 }
497
498 @Override
499 public long incrementColumnValue(byte[] row, byte[] family,
500 byte[] qualifier, long amount) throws IOException {
501 checkState();
502 return table.incrementColumnValue(row, family, qualifier, amount);
503 }
504
505 @Override
506 public long incrementColumnValue(byte[] row, byte[] family,
507 byte[] qualifier, long amount, Durability durability) throws IOException {
508 checkState();
509 return table.incrementColumnValue(row, family, qualifier, amount,
510 durability);
511 }
512
513 @Override
514 public boolean isAutoFlush() {
515 checkState();
516 return table.isAutoFlush();
517 }
518
519 @Override
520 public void flushCommits() throws IOException {
521 checkState();
522 table.flushCommits();
523 }
524
525
526
527
528
529
530 public void close() throws IOException {
531 checkState();
532 open = false;
533 returnTable(table);
534 }
535
536 @Override
537 public CoprocessorRpcChannel coprocessorService(byte[] row) {
538 checkState();
539 return table.coprocessorService(row);
540 }
541
542 @Override
543 public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
544 byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
545 throws ServiceException, Throwable {
546 checkState();
547 return table.coprocessorService(service, startKey, endKey, callable);
548 }
549
550 @Override
551 public <T extends Service, R> void coprocessorService(Class<T> service,
552 byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Callback<R> callback)
553 throws ServiceException, Throwable {
554 checkState();
555 table.coprocessorService(service, startKey, endKey, callable, callback);
556 }
557
558 @Override
559 public String toString() {
560 return "PooledHTable{" + ", table=" + table + '}';
561 }
562
563
564
565
566
567
568 HTableInterface getWrappedTable() {
569 return table;
570 }
571
572 @Override
573 public <R> void batchCallback(List<? extends Row> actions,
574 Object[] results, Callback<R> callback) throws IOException,
575 InterruptedException {
576 checkState();
577 table.batchCallback(actions, results, callback);
578 }
579
580
581
582
583
584
585
586
587 @Override
588 public <R> Object[] batchCallback(List<? extends Row> actions,
589 Callback<R> callback) throws IOException, InterruptedException {
590 checkState();
591 return table.batchCallback(actions, callback);
592 }
593
594 @Override
595 public void mutateRow(RowMutations rm) throws IOException {
596 checkState();
597 table.mutateRow(rm);
598 }
599
600 @Override
601 public Result append(Append append) throws IOException {
602 checkState();
603 return table.append(append);
604 }
605
606 @Override
607 public void setAutoFlush(boolean autoFlush) {
608 checkState();
609 table.setAutoFlush(autoFlush, autoFlush);
610 }
611
612 @Override
613 public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
614 checkState();
615 table.setAutoFlush(autoFlush, clearBufferOnFail);
616 }
617
618 @Override
619 public void setAutoFlushTo(boolean autoFlush) {
620 table.setAutoFlushTo(autoFlush);
621 }
622
623 @Override
624 public long getWriteBufferSize() {
625 checkState();
626 return table.getWriteBufferSize();
627 }
628
629 @Override
630 public void setWriteBufferSize(long writeBufferSize) throws IOException {
631 checkState();
632 table.setWriteBufferSize(writeBufferSize);
633 }
634
635 boolean isOpen() {
636 return open;
637 }
638
639 private void checkState() {
640 if (!isOpen()) {
641 throw new IllegalStateException("Table=" + new String(table.getTableName()) + " already closed");
642 }
643 }
644
645 @Override
646 public long incrementColumnValue(byte[] row, byte[] family,
647 byte[] qualifier, long amount, boolean writeToWAL) throws IOException {
648 return table.incrementColumnValue(row, family, qualifier, amount, writeToWAL);
649 }
650
651 @Override
652 public <R extends Message> Map<byte[], R> batchCoprocessorService(
653 Descriptors.MethodDescriptor method, Message request,
654 byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
655 checkState();
656 return table.batchCoprocessorService(method, request, startKey, endKey,
657 responsePrototype);
658 }
659
660 @Override
661 public <R extends Message> void batchCoprocessorService(
662 Descriptors.MethodDescriptor method, Message request,
663 byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback)
664 throws ServiceException, Throwable {
665 checkState();
666 table.batchCoprocessorService(method, request, startKey, endKey, responsePrototype, callback);
667 }
668
669 @Override
670 public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp,
671 byte[] value, RowMutations mutation) throws IOException {
672 checkState();
673 return table.checkAndMutate(row, family, qualifier, compareOp, value, mutation);
674 }
675
676 }
677 }