View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.util.Collection;
24  import java.util.List;
25  import java.util.Map;
26  
27  import org.apache.hadoop.hbase.classification.InterfaceAudience;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.hbase.HBaseConfiguration;
30  import org.apache.hadoop.hbase.HTableDescriptor;
31  import org.apache.hadoop.hbase.TableName;
32  import org.apache.hadoop.hbase.client.coprocessor.Batch;
33  import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
34  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
35  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
36  import org.apache.hadoop.hbase.util.Bytes;
37  import org.apache.hadoop.hbase.util.PoolMap;
38  import org.apache.hadoop.hbase.util.PoolMap.PoolType;
39  
40  import com.google.protobuf.Descriptors;
41  import com.google.protobuf.Message;
42  import com.google.protobuf.Service;
43  import com.google.protobuf.ServiceException;
44  
45  /**
46   * A simple pool of HTable instances.
47   *
48   * Each HTablePool acts as a pool for all tables. To use, instantiate an
49   * HTablePool and use {@link #getTable(String)} to get an HTable from the pool.
50   *
51     * This method is not needed anymore, clients should call
52     * HTableInterface.close() rather than returning the tables to the pool
53     *
54   * Once you are done with it, close your instance of {@link HTableInterface}
55   * by calling {@link HTableInterface#close()} rather than returning the tables
56   * to the pool with (deprecated) {@link #putTable(HTableInterface)}.
57   *
58   * <p>
59   * A pool can be created with a <i>maxSize</i> which defines the most HTable
60   * references that will ever be retained for each table. Otherwise the default
61   * is {@link Integer#MAX_VALUE}.
62   *
63   * <p>
64   * Pool will manage its own connections to the cluster. See
65   * {@link HConnectionManager}.
66   * @deprecated as of 0.98.1. See {@link HConnection#getTable(String)}.
67   */
68  @InterfaceAudience.Private
69  @Deprecated
70  public class HTablePool implements Closeable {
71    private final PoolMap<String, HTableInterface> tables;
72    private final int maxSize;
73    private final PoolType poolType;
74    private final Configuration config;
75    private final HTableInterfaceFactory tableFactory;
76  
77    /**
78     * Default Constructor. Default HBaseConfiguration and no limit on pool size.
79     */
80    public HTablePool() {
81      this(HBaseConfiguration.create(), Integer.MAX_VALUE);
82    }
83  
84    /**
85     * Constructor to set maximum versions and use the specified configuration.
86     *
87     * @param config
88     *          configuration
89     * @param maxSize
90     *          maximum number of references to keep for each table
91     */
92    public HTablePool(final Configuration config, final int maxSize) {
93      this(config, maxSize, null, null);
94    }
95  
96    /**
97     * Constructor to set maximum versions and use the specified configuration and
98     * table factory.
99     *
100    * @param config
101    *          configuration
102    * @param maxSize
103    *          maximum number of references to keep for each table
104    * @param tableFactory
105    *          table factory
106    */
107   public HTablePool(final Configuration config, final int maxSize,
108       final HTableInterfaceFactory tableFactory) {
109     this(config, maxSize, tableFactory, PoolType.Reusable);
110   }
111 
112   /**
113    * Constructor to set maximum versions and use the specified configuration and
114    * pool type.
115    *
116    * @param config
117    *          configuration
118    * @param maxSize
119    *          maximum number of references to keep for each table
120    * @param poolType
121    *          pool type which is one of {@link PoolType#Reusable} or
122    *          {@link PoolType#ThreadLocal}
123    */
124   public HTablePool(final Configuration config, final int maxSize,
125       final PoolType poolType) {
126     this(config, maxSize, null, poolType);
127   }
128 
129   /**
130    * Constructor to set maximum versions and use the specified configuration,
131    * table factory and pool type. The HTablePool supports the
132    * {@link PoolType#Reusable} and {@link PoolType#ThreadLocal}. If the pool
133    * type is null or not one of those two values, then it will default to
134    * {@link PoolType#Reusable}.
135    *
136    * @param config
137    *          configuration
138    * @param maxSize
139    *          maximum number of references to keep for each table
140    * @param tableFactory
141    *          table factory
142    * @param poolType
143    *          pool type which is one of {@link PoolType#Reusable} or
144    *          {@link PoolType#ThreadLocal}
145    */
146   public HTablePool(final Configuration config, final int maxSize,
147       final HTableInterfaceFactory tableFactory, PoolType poolType) {
148     // Make a new configuration instance so I can safely cleanup when
149     // done with the pool.
150     this.config = config == null ? HBaseConfiguration.create() : config;
151     this.maxSize = maxSize;
152     this.tableFactory = tableFactory == null ? new HTableFactory()
153         : tableFactory;
154     if (poolType == null) {
155       this.poolType = PoolType.Reusable;
156     } else {
157       switch (poolType) {
158       case Reusable:
159       case ThreadLocal:
160         this.poolType = poolType;
161         break;
162       default:
163         this.poolType = PoolType.Reusable;
164         break;
165       }
166     }
167     this.tables = new PoolMap<String, HTableInterface>(this.poolType,
168         this.maxSize);
169   }
170 
171   /**
172    * Get a reference to the specified table from the pool.
173    *
174    * @param tableName
175    *          table name
176    * @return a reference to the specified table
177    * @throws RuntimeException
178    *           if there is a problem instantiating the HTable
179    */
180   public HTableInterface getTable(String tableName) {
181     // call the old getTable implementation renamed to findOrCreateTable
182     HTableInterface table = findOrCreateTable(tableName);
183     // return a proxy table so when user closes the proxy, the actual table
184     // will be returned to the pool
185     return new PooledHTable(table);
186   }
187 
188   /**
189    * Get a reference to the specified table from the pool.
190    *
191    * Create a new one if one is not available.
192    *
193    * @param tableName
194    *          table name
195    * @return a reference to the specified table
196    * @throws RuntimeException
197    *           if there is a problem instantiating the HTable
198    */
199   private HTableInterface findOrCreateTable(String tableName) {
200     HTableInterface table = tables.get(tableName);
201     if (table == null) {
202       table = createHTable(tableName);
203     }
204     return table;
205   }
206 
207   /**
208    * Get a reference to the specified table from the pool.
209    * <p>
210    *
211    * Create a new one if one is not available.
212    *
213    * @param tableName
214    *          table name
215    * @return a reference to the specified table
216    * @throws RuntimeException
217    *           if there is a problem instantiating the HTable
218    */
219   public HTableInterface getTable(byte[] tableName) {
220     return getTable(Bytes.toString(tableName));
221   }
222 
223   /**
224    * This method is not needed anymore, clients should call
225    * HTableInterface.close() rather than returning the tables to the pool
226    *
227    * @param table
228    *          the proxy table user got from pool
229    * @deprecated
230    */
231   public void putTable(HTableInterface table) throws IOException {
232     // we need to be sure nobody puts a proxy implementation in the pool
233     // but if the client code is not updated
234     // and it will continue to call putTable() instead of calling close()
235     // then we need to return the wrapped table to the pool instead of the
236     // proxy
237     // table
238     if (table instanceof PooledHTable) {
239       returnTable(((PooledHTable) table).getWrappedTable());
240     } else {
241       // normally this should not happen if clients pass back the same
242       // table
243       // object they got from the pool
244       // but if it happens then it's better to reject it
245       throw new IllegalArgumentException("not a pooled table: " + table);
246     }
247   }
248 
249   /**
250    * Puts the specified HTable back into the pool.
251    * <p>
252    *
253    * If the pool already contains <i>maxSize</i> references to the table, then
254    * the table instance gets closed after flushing buffered edits.
255    *
256    * @param table
257    *          table
258    */
259   private void returnTable(HTableInterface table) throws IOException {
260     // this is the old putTable method renamed and made private
261     String tableName = Bytes.toString(table.getTableName());
262     if (tables.size(tableName) >= maxSize) {
263       // release table instance since we're not reusing it
264       this.tables.removeValue(tableName, table);
265       this.tableFactory.releaseHTableInterface(table);
266       return;
267     }
268     tables.put(tableName, table);
269   }
270 
271   protected HTableInterface createHTable(String tableName) {
272     return this.tableFactory.createHTableInterface(config,
273         Bytes.toBytes(tableName));
274   }
275 
276   /**
277    * Closes all the HTable instances , belonging to the given table, in the
278    * table pool.
279    * <p>
280    * Note: this is a 'shutdown' of the given table pool and different from
281    * {@link #putTable(HTableInterface)}, that is used to return the table
282    * instance to the pool for future re-use.
283    *
284    * @param tableName
285    */
286   public void closeTablePool(final String tableName) throws IOException {
287     Collection<HTableInterface> tables = this.tables.values(tableName);
288     if (tables != null) {
289       for (HTableInterface table : tables) {
290         this.tableFactory.releaseHTableInterface(table);
291       }
292     }
293     this.tables.remove(tableName);
294   }
295 
296   /**
297    * See {@link #closeTablePool(String)}.
298    *
299    * @param tableName
300    */
301   public void closeTablePool(final byte[] tableName) throws IOException {
302     closeTablePool(Bytes.toString(tableName));
303   }
304 
305   /**
306    * Closes all the HTable instances , belonging to all tables in the table
307    * pool.
308    * <p>
309    * Note: this is a 'shutdown' of all the table pools.
310    */
311   public void close() throws IOException {
312     for (String tableName : tables.keySet()) {
313       closeTablePool(tableName);
314     }
315     this.tables.clear();
316   }
317 
318   public int getCurrentPoolSize(String tableName) {
319     return tables.size(tableName);
320   }
321 
322   /**
323    * A proxy class that implements HTableInterface.close method to return the
324    * wrapped table back to the table pool
325    *
326    */
327   class PooledHTable implements HTableInterface {
328 
329     private boolean open = false;
330 
331     private HTableInterface table; // actual table implementation
332 
333     public PooledHTable(HTableInterface table) {
334       this.table = table;
335       this.open = true;
336     }
337 
338     @Override
339     public byte[] getTableName() {
340       checkState();
341       return table.getTableName();
342     }
343 
344     @Override
345     public TableName getName() {
346       return table.getName();
347     }
348 
349     @Override
350     public Configuration getConfiguration() {
351       checkState();
352       return table.getConfiguration();
353     }
354 
355     @Override
356     public HTableDescriptor getTableDescriptor() throws IOException {
357       checkState();
358       return table.getTableDescriptor();
359     }
360 
361     @Override
362     public boolean exists(Get get) throws IOException {
363       checkState();
364       return table.exists(get);
365     }
366 
367     @Override
368     public boolean[] existsAll(List<Get> gets) throws IOException {
369       checkState();
370       return table.existsAll(gets);
371     }
372 
373     @Override
374     public Boolean[] exists(List<Get> gets) throws IOException {
375       checkState();
376       return table.exists(gets);
377     }
378 
379     @Override
380     public void batch(List<? extends Row> actions, Object[] results) throws IOException,
381         InterruptedException {
382       checkState();
383       table.batch(actions, results);
384     }
385 
386     /**
387      * {@inheritDoc}
388      * @deprecated If any exception is thrown by one of the actions, there is no way to
389      * retrieve the partially executed results. Use {@link #batch(List, Object[])} instead.
390      */
391     @Override
392     public Object[] batch(List<? extends Row> actions) throws IOException,
393         InterruptedException {
394       checkState();
395       return table.batch(actions);
396     }
397 
398     @Override
399     public Result get(Get get) throws IOException {
400       checkState();
401       return table.get(get);
402     }
403 
404     @Override
405     public Result[] get(List<Get> gets) throws IOException {
406       checkState();
407       return table.get(gets);
408     }
409 
410     @Override
411     @SuppressWarnings("deprecation")
412     @Deprecated
413     public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
414       checkState();
415       return table.getRowOrBefore(row, family);
416     }
417 
418     @Override
419     public ResultScanner getScanner(Scan scan) throws IOException {
420       checkState();
421       return table.getScanner(scan);
422     }
423 
424     @Override
425     public ResultScanner getScanner(byte[] family) throws IOException {
426       checkState();
427       return table.getScanner(family);
428     }
429 
430     @Override
431     public ResultScanner getScanner(byte[] family, byte[] qualifier)
432         throws IOException {
433       checkState();
434       return table.getScanner(family, qualifier);
435     }
436 
437     @Override
438     public void put(Put put) throws IOException {
439       checkState();
440       table.put(put);
441     }
442 
443     @Override
444     public void put(List<Put> puts) throws IOException {
445       checkState();
446       table.put(puts);
447     }
448 
449     @Override
450     public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
451         byte[] value, Put put) throws IOException {
452       checkState();
453       return table.checkAndPut(row, family, qualifier, value, put);
454     }
455 
456     @Override
457     public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
458         CompareOp compareOp, byte[] value, Put put) throws IOException {
459       checkState();
460       return table.checkAndPut(row, family, qualifier, compareOp, value, put);
461     }
462 
463     @Override
464     public void delete(Delete delete) throws IOException {
465       checkState();
466       table.delete(delete);
467     }
468 
469     @Override
470     public void delete(List<Delete> deletes) throws IOException {
471       checkState();
472       table.delete(deletes);
473     }
474 
475     @Override
476     public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
477         byte[] value, Delete delete) throws IOException {
478       checkState();
479       return table.checkAndDelete(row, family, qualifier, value, delete);
480     }
481 
482     @Override
483     public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
484         CompareOp compareOp, byte[] value, Delete delete) throws IOException {
485       checkState();
486       return table.checkAndDelete(row, family, qualifier, compareOp, value, delete);
487     }
488 
489     @Override
490     public Result increment(Increment increment) throws IOException {
491       checkState();
492       return table.increment(increment);
493     }
494 
495     @Override
496     public long incrementColumnValue(byte[] row, byte[] family,
497         byte[] qualifier, long amount) throws IOException {
498       checkState();
499       return table.incrementColumnValue(row, family, qualifier, amount);
500     }
501 
502     @Override
503     public long incrementColumnValue(byte[] row, byte[] family,
504         byte[] qualifier, long amount, Durability durability) throws IOException {
505       checkState();
506       return table.incrementColumnValue(row, family, qualifier, amount,
507           durability);
508     }
509 
510     @Override
511     public boolean isAutoFlush() {
512       checkState();
513       return table.isAutoFlush();
514     }
515 
516     @Override
517     public void flushCommits() throws IOException {
518       checkState();
519       table.flushCommits();
520     }
521 
522     /**
523      * Returns the actual table back to the pool
524      *
525      * @throws IOException
526      */
527     public void close() throws IOException {
528       checkState();
529       open = false;
530       returnTable(table);
531     }
532 
533     @Override
534     public CoprocessorRpcChannel coprocessorService(byte[] row) {
535       checkState();
536       return table.coprocessorService(row);
537     }
538 
539     @Override
540     public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
541         byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
542         throws ServiceException, Throwable {
543       checkState();
544       return table.coprocessorService(service, startKey, endKey, callable);
545     }
546 
547     @Override
548     public <T extends Service, R> void coprocessorService(Class<T> service,
549         byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Callback<R> callback)
550         throws ServiceException, Throwable {
551       checkState();
552       table.coprocessorService(service, startKey, endKey, callable, callback);
553     }
554 
555     @Override
556     public String toString() {
557       return "PooledHTable{" + ", table=" + table + '}';
558     }
559 
560     /**
561      * Expose the wrapped HTable to tests in the same package
562      *
563      * @return wrapped htable
564      */
565     HTableInterface getWrappedTable() {
566       return table;
567     }
568 
569     @Override
570     public <R> void batchCallback(List<? extends Row> actions,
571         Object[] results, Callback<R> callback) throws IOException,
572         InterruptedException {
573       checkState();
574       table.batchCallback(actions, results, callback);
575     }
576 
577     /**
578      * {@inheritDoc}
579      * @deprecated If any exception is thrown by one of the actions, there is no way to
580      * retrieve the partially executed results. Use
581      * {@link #batchCallback(List, Object[], org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
582      * instead.
583      */
584     @Override
585     public <R> Object[] batchCallback(List<? extends Row> actions,
586         Callback<R> callback) throws IOException, InterruptedException {
587       checkState();
588       return table.batchCallback(actions,  callback);
589     }
590 
591     @Override
592     public void mutateRow(RowMutations rm) throws IOException {
593       checkState();
594       table.mutateRow(rm);
595     }
596 
597     @Override
598     public Result append(Append append) throws IOException {
599       checkState();
600       return table.append(append);
601     }
602 
603     @Override
604     public void setAutoFlush(boolean autoFlush) {
605       checkState();
606       table.setAutoFlush(autoFlush, autoFlush);
607     }
608 
609     @Override
610     public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
611       checkState();
612       table.setAutoFlush(autoFlush, clearBufferOnFail);
613     }
614 
615     @Override
616     public void setAutoFlushTo(boolean autoFlush) {
617       table.setAutoFlushTo(autoFlush);
618     }
619 
620     @Override
621     public long getWriteBufferSize() {
622       checkState();
623       return table.getWriteBufferSize();
624     }
625 
626     @Override
627     public void setWriteBufferSize(long writeBufferSize) throws IOException {
628       checkState();
629       table.setWriteBufferSize(writeBufferSize);
630     }
631 
632     boolean isOpen() {
633       return open;
634     }
635 
636     private void checkState() {
637       if (!isOpen()) {
638         throw new IllegalStateException("Table=" + new String(table.getTableName()) + " already closed");
639       }
640     }
641 
642     @Override
643     public long incrementColumnValue(byte[] row, byte[] family,
644         byte[] qualifier, long amount, boolean writeToWAL) throws IOException {
645       return table.incrementColumnValue(row, family, qualifier, amount, writeToWAL);
646     }
647 
648     @Override
649     public <R extends Message> Map<byte[], R> batchCoprocessorService(
650         Descriptors.MethodDescriptor method, Message request,
651         byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
652       checkState();
653       return table.batchCoprocessorService(method, request, startKey, endKey,
654           responsePrototype);
655     }
656 
657     @Override
658     public <R extends Message> void batchCoprocessorService(
659         Descriptors.MethodDescriptor method, Message request,
660         byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback)
661         throws ServiceException, Throwable {
662       checkState();
663       table.batchCoprocessorService(method, request, startKey, endKey, responsePrototype, callback);
664     }
665 
666     @Override
667     public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp,
668         byte[] value, RowMutations mutation) throws IOException {
669       checkState();
670       return table.checkAndMutate(row, family, qualifier, compareOp, value, mutation);
671     }
672 
673   }
674 }