1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.util.Collection;
24 import java.util.List;
25 import java.util.Map;
26
27 import org.apache.hadoop.hbase.classification.InterfaceAudience;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.HBaseConfiguration;
30 import org.apache.hadoop.hbase.HTableDescriptor;
31 import org.apache.hadoop.hbase.TableName;
32 import org.apache.hadoop.hbase.client.coprocessor.Batch;
33 import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
34 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
35 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
36 import org.apache.hadoop.hbase.util.Bytes;
37 import org.apache.hadoop.hbase.util.PoolMap;
38 import org.apache.hadoop.hbase.util.PoolMap.PoolType;
39
40 import com.google.protobuf.Descriptors;
41 import com.google.protobuf.Message;
42 import com.google.protobuf.Service;
43 import com.google.protobuf.ServiceException;
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68 @InterfaceAudience.Private
69 @Deprecated
70 public class HTablePool implements Closeable {
71 private final PoolMap<String, HTableInterface> tables;
72 private final int maxSize;
73 private final PoolType poolType;
74 private final Configuration config;
75 private final HTableInterfaceFactory tableFactory;
76
77
78
79
80 public HTablePool() {
81 this(HBaseConfiguration.create(), Integer.MAX_VALUE);
82 }
83
84
85
86
87
88
89
90
91
92 public HTablePool(final Configuration config, final int maxSize) {
93 this(config, maxSize, null, null);
94 }
95
96
97
98
99
100
101
102
103
104
105
106
107 public HTablePool(final Configuration config, final int maxSize,
108 final HTableInterfaceFactory tableFactory) {
109 this(config, maxSize, tableFactory, PoolType.Reusable);
110 }
111
112
113
114
115
116
117
118
119
120
121
122
123
124 public HTablePool(final Configuration config, final int maxSize,
125 final PoolType poolType) {
126 this(config, maxSize, null, poolType);
127 }
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146 public HTablePool(final Configuration config, final int maxSize,
147 final HTableInterfaceFactory tableFactory, PoolType poolType) {
148
149
150 this.config = config == null ? HBaseConfiguration.create() : config;
151 this.maxSize = maxSize;
152 this.tableFactory = tableFactory == null ? new HTableFactory()
153 : tableFactory;
154 if (poolType == null) {
155 this.poolType = PoolType.Reusable;
156 } else {
157 switch (poolType) {
158 case Reusable:
159 case ThreadLocal:
160 this.poolType = poolType;
161 break;
162 default:
163 this.poolType = PoolType.Reusable;
164 break;
165 }
166 }
167 this.tables = new PoolMap<String, HTableInterface>(this.poolType,
168 this.maxSize);
169 }
170
171
172
173
174
175
176
177
178
179
180 public HTableInterface getTable(String tableName) {
181
182 HTableInterface table = findOrCreateTable(tableName);
183
184
185 return new PooledHTable(table);
186 }
187
188
189
190
191
192
193
194
195
196
197
198
199 private HTableInterface findOrCreateTable(String tableName) {
200 HTableInterface table = tables.get(tableName);
201 if (table == null) {
202 table = createHTable(tableName);
203 }
204 return table;
205 }
206
207
208
209
210
211
212
213
214
215
216
217
218
219 public HTableInterface getTable(byte[] tableName) {
220 return getTable(Bytes.toString(tableName));
221 }
222
223
224
225
226
227
228
229
230
231 public void putTable(HTableInterface table) throws IOException {
232
233
234
235
236
237
238 if (table instanceof PooledHTable) {
239 returnTable(((PooledHTable) table).getWrappedTable());
240 } else {
241
242
243
244
245 throw new IllegalArgumentException("not a pooled table: " + table);
246 }
247 }
248
249
250
251
252
253
254
255
256
257
258
259 private void returnTable(HTableInterface table) throws IOException {
260
261 String tableName = Bytes.toString(table.getTableName());
262 if (tables.size(tableName) >= maxSize) {
263
264 this.tables.removeValue(tableName, table);
265 this.tableFactory.releaseHTableInterface(table);
266 return;
267 }
268 tables.put(tableName, table);
269 }
270
271 protected HTableInterface createHTable(String tableName) {
272 return this.tableFactory.createHTableInterface(config,
273 Bytes.toBytes(tableName));
274 }
275
276
277
278
279
280
281
282
283
284
285
286 public void closeTablePool(final String tableName) throws IOException {
287 Collection<HTableInterface> tables = this.tables.values(tableName);
288 if (tables != null) {
289 for (HTableInterface table : tables) {
290 this.tableFactory.releaseHTableInterface(table);
291 }
292 }
293 this.tables.remove(tableName);
294 }
295
296
297
298
299
300
301 public void closeTablePool(final byte[] tableName) throws IOException {
302 closeTablePool(Bytes.toString(tableName));
303 }
304
305
306
307
308
309
310
311 public void close() throws IOException {
312 for (String tableName : tables.keySet()) {
313 closeTablePool(tableName);
314 }
315 this.tables.clear();
316 }
317
318 public int getCurrentPoolSize(String tableName) {
319 return tables.size(tableName);
320 }
321
322
323
324
325
326
327 class PooledHTable implements HTableInterface {
328
329 private boolean open = false;
330
331 private HTableInterface table;
332
333 public PooledHTable(HTableInterface table) {
334 this.table = table;
335 this.open = true;
336 }
337
338 @Override
339 public byte[] getTableName() {
340 checkState();
341 return table.getTableName();
342 }
343
344 @Override
345 public TableName getName() {
346 return table.getName();
347 }
348
349 @Override
350 public Configuration getConfiguration() {
351 checkState();
352 return table.getConfiguration();
353 }
354
355 @Override
356 public HTableDescriptor getTableDescriptor() throws IOException {
357 checkState();
358 return table.getTableDescriptor();
359 }
360
361 @Override
362 public boolean exists(Get get) throws IOException {
363 checkState();
364 return table.exists(get);
365 }
366
367 @Override
368 public boolean[] existsAll(List<Get> gets) throws IOException {
369 checkState();
370 return table.existsAll(gets);
371 }
372
373 @Override
374 public Boolean[] exists(List<Get> gets) throws IOException {
375 checkState();
376 return table.exists(gets);
377 }
378
379 @Override
380 public void batch(List<? extends Row> actions, Object[] results) throws IOException,
381 InterruptedException {
382 checkState();
383 table.batch(actions, results);
384 }
385
386
387
388
389
390
391 @Override
392 public Object[] batch(List<? extends Row> actions) throws IOException,
393 InterruptedException {
394 checkState();
395 return table.batch(actions);
396 }
397
398 @Override
399 public Result get(Get get) throws IOException {
400 checkState();
401 return table.get(get);
402 }
403
404 @Override
405 public Result[] get(List<Get> gets) throws IOException {
406 checkState();
407 return table.get(gets);
408 }
409
410 @Override
411 @SuppressWarnings("deprecation")
412 @Deprecated
413 public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
414 checkState();
415 return table.getRowOrBefore(row, family);
416 }
417
418 @Override
419 public ResultScanner getScanner(Scan scan) throws IOException {
420 checkState();
421 return table.getScanner(scan);
422 }
423
424 @Override
425 public ResultScanner getScanner(byte[] family) throws IOException {
426 checkState();
427 return table.getScanner(family);
428 }
429
430 @Override
431 public ResultScanner getScanner(byte[] family, byte[] qualifier)
432 throws IOException {
433 checkState();
434 return table.getScanner(family, qualifier);
435 }
436
437 @Override
438 public void put(Put put) throws IOException {
439 checkState();
440 table.put(put);
441 }
442
443 @Override
444 public void put(List<Put> puts) throws IOException {
445 checkState();
446 table.put(puts);
447 }
448
449 @Override
450 public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
451 byte[] value, Put put) throws IOException {
452 checkState();
453 return table.checkAndPut(row, family, qualifier, value, put);
454 }
455
456 @Override
457 public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
458 CompareOp compareOp, byte[] value, Put put) throws IOException {
459 checkState();
460 return table.checkAndPut(row, family, qualifier, compareOp, value, put);
461 }
462
463 @Override
464 public void delete(Delete delete) throws IOException {
465 checkState();
466 table.delete(delete);
467 }
468
469 @Override
470 public void delete(List<Delete> deletes) throws IOException {
471 checkState();
472 table.delete(deletes);
473 }
474
475 @Override
476 public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
477 byte[] value, Delete delete) throws IOException {
478 checkState();
479 return table.checkAndDelete(row, family, qualifier, value, delete);
480 }
481
482 @Override
483 public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
484 CompareOp compareOp, byte[] value, Delete delete) throws IOException {
485 checkState();
486 return table.checkAndDelete(row, family, qualifier, compareOp, value, delete);
487 }
488
489 @Override
490 public Result increment(Increment increment) throws IOException {
491 checkState();
492 return table.increment(increment);
493 }
494
495 @Override
496 public long incrementColumnValue(byte[] row, byte[] family,
497 byte[] qualifier, long amount) throws IOException {
498 checkState();
499 return table.incrementColumnValue(row, family, qualifier, amount);
500 }
501
502 @Override
503 public long incrementColumnValue(byte[] row, byte[] family,
504 byte[] qualifier, long amount, Durability durability) throws IOException {
505 checkState();
506 return table.incrementColumnValue(row, family, qualifier, amount,
507 durability);
508 }
509
510 @Override
511 public boolean isAutoFlush() {
512 checkState();
513 return table.isAutoFlush();
514 }
515
516 @Override
517 public void flushCommits() throws IOException {
518 checkState();
519 table.flushCommits();
520 }
521
522
523
524
525
526
527 public void close() throws IOException {
528 checkState();
529 open = false;
530 returnTable(table);
531 }
532
533 @Override
534 public CoprocessorRpcChannel coprocessorService(byte[] row) {
535 checkState();
536 return table.coprocessorService(row);
537 }
538
539 @Override
540 public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
541 byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
542 throws ServiceException, Throwable {
543 checkState();
544 return table.coprocessorService(service, startKey, endKey, callable);
545 }
546
547 @Override
548 public <T extends Service, R> void coprocessorService(Class<T> service,
549 byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Callback<R> callback)
550 throws ServiceException, Throwable {
551 checkState();
552 table.coprocessorService(service, startKey, endKey, callable, callback);
553 }
554
555 @Override
556 public String toString() {
557 return "PooledHTable{" + ", table=" + table + '}';
558 }
559
560
561
562
563
564
565 HTableInterface getWrappedTable() {
566 return table;
567 }
568
569 @Override
570 public <R> void batchCallback(List<? extends Row> actions,
571 Object[] results, Callback<R> callback) throws IOException,
572 InterruptedException {
573 checkState();
574 table.batchCallback(actions, results, callback);
575 }
576
577
578
579
580
581
582
583
584 @Override
585 public <R> Object[] batchCallback(List<? extends Row> actions,
586 Callback<R> callback) throws IOException, InterruptedException {
587 checkState();
588 return table.batchCallback(actions, callback);
589 }
590
591 @Override
592 public void mutateRow(RowMutations rm) throws IOException {
593 checkState();
594 table.mutateRow(rm);
595 }
596
597 @Override
598 public Result append(Append append) throws IOException {
599 checkState();
600 return table.append(append);
601 }
602
603 @Override
604 public void setAutoFlush(boolean autoFlush) {
605 checkState();
606 table.setAutoFlush(autoFlush, autoFlush);
607 }
608
609 @Override
610 public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
611 checkState();
612 table.setAutoFlush(autoFlush, clearBufferOnFail);
613 }
614
615 @Override
616 public void setAutoFlushTo(boolean autoFlush) {
617 table.setAutoFlushTo(autoFlush);
618 }
619
620 @Override
621 public long getWriteBufferSize() {
622 checkState();
623 return table.getWriteBufferSize();
624 }
625
626 @Override
627 public void setWriteBufferSize(long writeBufferSize) throws IOException {
628 checkState();
629 table.setWriteBufferSize(writeBufferSize);
630 }
631
632 boolean isOpen() {
633 return open;
634 }
635
636 private void checkState() {
637 if (!isOpen()) {
638 throw new IllegalStateException("Table=" + new String(table.getTableName()) + " already closed");
639 }
640 }
641
642 @Override
643 public long incrementColumnValue(byte[] row, byte[] family,
644 byte[] qualifier, long amount, boolean writeToWAL) throws IOException {
645 return table.incrementColumnValue(row, family, qualifier, amount, writeToWAL);
646 }
647
648 @Override
649 public <R extends Message> Map<byte[], R> batchCoprocessorService(
650 Descriptors.MethodDescriptor method, Message request,
651 byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
652 checkState();
653 return table.batchCoprocessorService(method, request, startKey, endKey,
654 responsePrototype);
655 }
656
657 @Override
658 public <R extends Message> void batchCoprocessorService(
659 Descriptors.MethodDescriptor method, Message request,
660 byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback)
661 throws ServiceException, Throwable {
662 checkState();
663 table.batchCoprocessorService(method, request, startKey, endKey, responsePrototype, callback);
664 }
665
666 @Override
667 public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp,
668 byte[] value, RowMutations mutation) throws IOException {
669 checkState();
670 return table.checkAndMutate(row, family, qualifier, compareOp, value, mutation);
671 }
672
673 }
674 }