View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.util.Collection;
24  import java.util.List;
25  import java.util.Map;
26  
27  import org.apache.hadoop.hbase.classification.InterfaceAudience;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.hbase.HBaseConfiguration;
30  import org.apache.hadoop.hbase.HTableDescriptor;
31  import org.apache.hadoop.hbase.TableName;
32  import org.apache.hadoop.hbase.client.coprocessor.Batch;
33  import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
34  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
35  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
36  import org.apache.hadoop.hbase.util.Bytes;
37  import org.apache.hadoop.hbase.util.PoolMap;
38  import org.apache.hadoop.hbase.util.PoolMap.PoolType;
39  
40  import com.google.protobuf.Descriptors;
41  import com.google.protobuf.Message;
42  import com.google.protobuf.Service;
43  import com.google.protobuf.ServiceException;
44  
45  /**
46   * A simple pool of HTable instances.
47   *
48   * Each HTablePool acts as a pool for all tables. To use, instantiate an
49   * HTablePool and use {@link #getTable(String)} to get an HTable from the pool.
50   *
51     * This method is not needed anymore, clients should call
52     * HTableInterface.close() rather than returning the tables to the pool
53     *
54   * Once you are done with it, close your instance of {@link HTableInterface}
55   * by calling {@link HTableInterface#close()} rather than returning the tables
56   * to the pool with (deprecated) {@link #putTable(HTableInterface)}.
57   *
58   * <p>
59   * A pool can be created with a <i>maxSize</i> which defines the most HTable
60   * references that will ever be retained for each table. Otherwise the default
61   * is {@link Integer#MAX_VALUE}.
62   *
63   * <p>
64   * Pool will manage its own connections to the cluster. See
65   * {@link HConnectionManager}.
66   * @deprecated as of 0.98.1. See {@link HConnection#getTable(String)}.
67   */
68  @InterfaceAudience.Private
69  @Deprecated
70  public class HTablePool implements Closeable {
71    private final PoolMap<String, HTableInterface> tables;
72    private final int maxSize;
73    private final PoolType poolType;
74    private final Configuration config;
75    private final HTableInterfaceFactory tableFactory;
76  
77    /**
78     * Default Constructor. Default HBaseConfiguration and no limit on pool size.
79     */
80    public HTablePool() {
81      this(HBaseConfiguration.create(), Integer.MAX_VALUE);
82    }
83  
84    /**
85     * Constructor to set maximum versions and use the specified configuration.
86     *
87     * @param config
88     *          configuration
89     * @param maxSize
90     *          maximum number of references to keep for each table
91     */
92    public HTablePool(final Configuration config, final int maxSize) {
93      this(config, maxSize, null, null);
94    }
95  
96    /**
97     * Constructor to set maximum versions and use the specified configuration and
98     * table factory.
99     *
100    * @param config
101    *          configuration
102    * @param maxSize
103    *          maximum number of references to keep for each table
104    * @param tableFactory
105    *          table factory
106    */
107   public HTablePool(final Configuration config, final int maxSize,
108       final HTableInterfaceFactory tableFactory) {
109     this(config, maxSize, tableFactory, PoolType.Reusable);
110   }
111 
112   /**
113    * Constructor to set maximum versions and use the specified configuration and
114    * pool type.
115    *
116    * @param config
117    *          configuration
118    * @param maxSize
119    *          maximum number of references to keep for each table
120    * @param poolType
121    *          pool type which is one of {@link PoolType#Reusable} or
122    *          {@link PoolType#ThreadLocal}
123    */
124   public HTablePool(final Configuration config, final int maxSize,
125       final PoolType poolType) {
126     this(config, maxSize, null, poolType);
127   }
128 
129   /**
130    * Constructor to set maximum versions and use the specified configuration,
131    * table factory and pool type. The HTablePool supports the
132    * {@link PoolType#Reusable} and {@link PoolType#ThreadLocal}. If the pool
133    * type is null or not one of those two values, then it will default to
134    * {@link PoolType#Reusable}.
135    *
136    * @param config
137    *          configuration
138    * @param maxSize
139    *          maximum number of references to keep for each table
140    * @param tableFactory
141    *          table factory
142    * @param poolType
143    *          pool type which is one of {@link PoolType#Reusable} or
144    *          {@link PoolType#ThreadLocal}
145    */
146   public HTablePool(final Configuration config, final int maxSize,
147       final HTableInterfaceFactory tableFactory, PoolType poolType) {
148     // Make a new configuration instance so I can safely cleanup when
149     // done with the pool.
150     this.config = config == null ? HBaseConfiguration.create() : config;
151     this.maxSize = maxSize;
152     this.tableFactory = tableFactory == null ? new HTableFactory()
153         : tableFactory;
154     if (poolType == null) {
155       this.poolType = PoolType.Reusable;
156     } else {
157       switch (poolType) {
158       case Reusable:
159       case ThreadLocal:
160         this.poolType = poolType;
161         break;
162       default:
163         this.poolType = PoolType.Reusable;
164         break;
165       }
166     }
167     this.tables = new PoolMap<String, HTableInterface>(this.poolType,
168         this.maxSize);
169   }
170 
171   /**
172    * Get a reference to the specified table from the pool.
173    * <p>
174    * <p/>
175    *
176    * @param tableName
177    *          table name
178    * @return a reference to the specified table
179    * @throws RuntimeException
180    *           if there is a problem instantiating the HTable
181    */
182   public HTableInterface getTable(String tableName) {
183     // call the old getTable implementation renamed to findOrCreateTable
184     HTableInterface table = findOrCreateTable(tableName);
185     // return a proxy table so when user closes the proxy, the actual table
186     // will be returned to the pool
187     return new PooledHTable(table);
188   }
189 
190   /**
191    * Get a reference to the specified table from the pool.
192    * <p>
193    *
194    * Create a new one if one is not available.
195    *
196    * @param tableName
197    *          table name
198    * @return a reference to the specified table
199    * @throws RuntimeException
200    *           if there is a problem instantiating the HTable
201    */
202   private HTableInterface findOrCreateTable(String tableName) {
203     HTableInterface table = tables.get(tableName);
204     if (table == null) {
205       table = createHTable(tableName);
206     }
207     return table;
208   }
209 
210   /**
211    * Get a reference to the specified table from the pool.
212    * <p>
213    *
214    * Create a new one if one is not available.
215    *
216    * @param tableName
217    *          table name
218    * @return a reference to the specified table
219    * @throws RuntimeException
220    *           if there is a problem instantiating the HTable
221    */
222   public HTableInterface getTable(byte[] tableName) {
223     return getTable(Bytes.toString(tableName));
224   }
225 
226   /**
227    * This method is not needed anymore, clients should call
228    * HTableInterface.close() rather than returning the tables to the pool
229    *
230    * @param table
231    *          the proxy table user got from pool
232    * @deprecated
233    */
234   public void putTable(HTableInterface table) throws IOException {
235     // we need to be sure nobody puts a proxy implementation in the pool
236     // but if the client code is not updated
237     // and it will continue to call putTable() instead of calling close()
238     // then we need to return the wrapped table to the pool instead of the
239     // proxy
240     // table
241     if (table instanceof PooledHTable) {
242       returnTable(((PooledHTable) table).getWrappedTable());
243     } else {
244       // normally this should not happen if clients pass back the same
245       // table
246       // object they got from the pool
247       // but if it happens then it's better to reject it
248       throw new IllegalArgumentException("not a pooled table: " + table);
249     }
250   }
251 
252   /**
253    * Puts the specified HTable back into the pool.
254    * <p>
255    *
256    * If the pool already contains <i>maxSize</i> references to the table, then
257    * the table instance gets closed after flushing buffered edits.
258    *
259    * @param table
260    *          table
261    */
262   private void returnTable(HTableInterface table) throws IOException {
263     // this is the old putTable method renamed and made private
264     String tableName = Bytes.toString(table.getTableName());
265     if (tables.size(tableName) >= maxSize) {
266       // release table instance since we're not reusing it
267       this.tables.removeValue(tableName, table);
268       this.tableFactory.releaseHTableInterface(table);
269       return;
270     }
271     tables.put(tableName, table);
272   }
273 
274   protected HTableInterface createHTable(String tableName) {
275     return this.tableFactory.createHTableInterface(config,
276         Bytes.toBytes(tableName));
277   }
278 
279   /**
280    * Closes all the HTable instances , belonging to the given table, in the
281    * table pool.
282    * <p>
283    * Note: this is a 'shutdown' of the given table pool and different from
284    * {@link #putTable(HTableInterface)}, that is used to return the table
285    * instance to the pool for future re-use.
286    *
287    * @param tableName
288    */
289   public void closeTablePool(final String tableName) throws IOException {
290     Collection<HTableInterface> tables = this.tables.values(tableName);
291     if (tables != null) {
292       for (HTableInterface table : tables) {
293         this.tableFactory.releaseHTableInterface(table);
294       }
295     }
296     this.tables.remove(tableName);
297   }
298 
299   /**
300    * See {@link #closeTablePool(String)}.
301    *
302    * @param tableName
303    */
304   public void closeTablePool(final byte[] tableName) throws IOException {
305     closeTablePool(Bytes.toString(tableName));
306   }
307 
308   /**
309    * Closes all the HTable instances , belonging to all tables in the table
310    * pool.
311    * <p>
312    * Note: this is a 'shutdown' of all the table pools.
313    */
314   public void close() throws IOException {
315     for (String tableName : tables.keySet()) {
316       closeTablePool(tableName);
317     }
318     this.tables.clear();
319   }
320 
321   public int getCurrentPoolSize(String tableName) {
322     return tables.size(tableName);
323   }
324 
325   /**
326    * A proxy class that implements HTableInterface.close method to return the
327    * wrapped table back to the table pool
328    *
329    */
330   class PooledHTable implements HTableInterface {
331 
332     private boolean open = false;
333 
334     private HTableInterface table; // actual table implementation
335 
336     public PooledHTable(HTableInterface table) {
337       this.table = table;
338       this.open = true;
339     }
340 
341     @Override
342     public byte[] getTableName() {
343       checkState();
344       return table.getTableName();
345     }
346 
347     @Override
348     public TableName getName() {
349       return table.getName();
350     }
351 
352     @Override
353     public Configuration getConfiguration() {
354       checkState();
355       return table.getConfiguration();
356     }
357 
358     @Override
359     public HTableDescriptor getTableDescriptor() throws IOException {
360       checkState();
361       return table.getTableDescriptor();
362     }
363 
364     @Override
365     public boolean exists(Get get) throws IOException {
366       checkState();
367       return table.exists(get);
368     }
369 
370     @Override
371     public boolean[] existsAll(List<Get> gets) throws IOException {
372       checkState();
373       return table.existsAll(gets);
374     }
375 
376     @Override
377     public Boolean[] exists(List<Get> gets) throws IOException {
378       checkState();
379       return table.exists(gets);
380     }
381 
382     @Override
383     public void batch(List<? extends Row> actions, Object[] results) throws IOException,
384         InterruptedException {
385       checkState();
386       table.batch(actions, results);
387     }
388 
389     /**
390      * {@inheritDoc}
391      * @deprecated If any exception is thrown by one of the actions, there is no way to
392      * retrieve the partially executed results. Use {@link #batch(List, Object[])} instead.
393      */
394     @Override
395     public Object[] batch(List<? extends Row> actions) throws IOException,
396         InterruptedException {
397       checkState();
398       return table.batch(actions);
399     }
400 
401     @Override
402     public Result get(Get get) throws IOException {
403       checkState();
404       return table.get(get);
405     }
406 
407     @Override
408     public Result[] get(List<Get> gets) throws IOException {
409       checkState();
410       return table.get(gets);
411     }
412 
413     @Override
414     @SuppressWarnings("deprecation")
415     @Deprecated
416     public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
417       checkState();
418       return table.getRowOrBefore(row, family);
419     }
420 
421     @Override
422     public ResultScanner getScanner(Scan scan) throws IOException {
423       checkState();
424       return table.getScanner(scan);
425     }
426 
427     @Override
428     public ResultScanner getScanner(byte[] family) throws IOException {
429       checkState();
430       return table.getScanner(family);
431     }
432 
433     @Override
434     public ResultScanner getScanner(byte[] family, byte[] qualifier)
435         throws IOException {
436       checkState();
437       return table.getScanner(family, qualifier);
438     }
439 
440     @Override
441     public void put(Put put) throws IOException {
442       checkState();
443       table.put(put);
444     }
445 
446     @Override
447     public void put(List<Put> puts) throws IOException {
448       checkState();
449       table.put(puts);
450     }
451 
452     @Override
453     public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
454         byte[] value, Put put) throws IOException {
455       checkState();
456       return table.checkAndPut(row, family, qualifier, value, put);
457     }
458 
459     @Override
460     public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
461         CompareOp compareOp, byte[] value, Put put) throws IOException {
462       checkState();
463       return table.checkAndPut(row, family, qualifier, compareOp, value, put);
464     }
465 
466     @Override
467     public void delete(Delete delete) throws IOException {
468       checkState();
469       table.delete(delete);
470     }
471 
472     @Override
473     public void delete(List<Delete> deletes) throws IOException {
474       checkState();
475       table.delete(deletes);
476     }
477 
478     @Override
479     public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
480         byte[] value, Delete delete) throws IOException {
481       checkState();
482       return table.checkAndDelete(row, family, qualifier, value, delete);
483     }
484 
485     @Override
486     public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
487         CompareOp compareOp, byte[] value, Delete delete) throws IOException {
488       checkState();
489       return table.checkAndDelete(row, family, qualifier, compareOp, value, delete);
490     }
491 
492     @Override
493     public Result increment(Increment increment) throws IOException {
494       checkState();
495       return table.increment(increment);
496     }
497 
498     @Override
499     public long incrementColumnValue(byte[] row, byte[] family,
500         byte[] qualifier, long amount) throws IOException {
501       checkState();
502       return table.incrementColumnValue(row, family, qualifier, amount);
503     }
504 
505     @Override
506     public long incrementColumnValue(byte[] row, byte[] family,
507         byte[] qualifier, long amount, Durability durability) throws IOException {
508       checkState();
509       return table.incrementColumnValue(row, family, qualifier, amount,
510           durability);
511     }
512 
513     @Override
514     public boolean isAutoFlush() {
515       checkState();
516       return table.isAutoFlush();
517     }
518 
519     @Override
520     public void flushCommits() throws IOException {
521       checkState();
522       table.flushCommits();
523     }
524 
525     /**
526      * Returns the actual table back to the pool
527      *
528      * @throws IOException
529      */
530     public void close() throws IOException {
531       checkState();
532       open = false;
533       returnTable(table);
534     }
535 
536     @Override
537     public CoprocessorRpcChannel coprocessorService(byte[] row) {
538       checkState();
539       return table.coprocessorService(row);
540     }
541 
542     @Override
543     public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
544         byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
545         throws ServiceException, Throwable {
546       checkState();
547       return table.coprocessorService(service, startKey, endKey, callable);
548     }
549 
550     @Override
551     public <T extends Service, R> void coprocessorService(Class<T> service,
552         byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Callback<R> callback)
553         throws ServiceException, Throwable {
554       checkState();
555       table.coprocessorService(service, startKey, endKey, callable, callback);
556     }
557 
558     @Override
559     public String toString() {
560       return "PooledHTable{" + ", table=" + table + '}';
561     }
562 
563     /**
564      * Expose the wrapped HTable to tests in the same package
565      *
566      * @return wrapped htable
567      */
568     HTableInterface getWrappedTable() {
569       return table;
570     }
571 
572     @Override
573     public <R> void batchCallback(List<? extends Row> actions,
574         Object[] results, Callback<R> callback) throws IOException,
575         InterruptedException {
576       checkState();
577       table.batchCallback(actions, results, callback);
578     }
579 
580     /**
581      * {@inheritDoc}
582      * @deprecated If any exception is thrown by one of the actions, there is no way to
583      * retrieve the partially executed results. Use
584      * {@link #batchCallback(List, Object[], org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
585      * instead.
586      */
587     @Override
588     public <R> Object[] batchCallback(List<? extends Row> actions,
589         Callback<R> callback) throws IOException, InterruptedException {
590       checkState();
591       return table.batchCallback(actions,  callback);
592     }
593 
594     @Override
595     public void mutateRow(RowMutations rm) throws IOException {
596       checkState();
597       table.mutateRow(rm);
598     }
599 
600     @Override
601     public Result append(Append append) throws IOException {
602       checkState();
603       return table.append(append);
604     }
605 
606     @Override
607     public void setAutoFlush(boolean autoFlush) {
608       checkState();
609       table.setAutoFlush(autoFlush, autoFlush);
610     }
611 
612     @Override
613     public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
614       checkState();
615       table.setAutoFlush(autoFlush, clearBufferOnFail);
616     }
617 
618     @Override
619     public void setAutoFlushTo(boolean autoFlush) {
620       table.setAutoFlushTo(autoFlush);
621     }
622 
623     @Override
624     public long getWriteBufferSize() {
625       checkState();
626       return table.getWriteBufferSize();
627     }
628 
629     @Override
630     public void setWriteBufferSize(long writeBufferSize) throws IOException {
631       checkState();
632       table.setWriteBufferSize(writeBufferSize);
633     }
634 
635     boolean isOpen() {
636       return open;
637     }
638 
639     private void checkState() {
640       if (!isOpen()) {
641         throw new IllegalStateException("Table=" + new String(table.getTableName()) + " already closed");
642       }
643     }
644 
645     @Override
646     public long incrementColumnValue(byte[] row, byte[] family,
647         byte[] qualifier, long amount, boolean writeToWAL) throws IOException {
648       return table.incrementColumnValue(row, family, qualifier, amount, writeToWAL);
649     }
650 
651     @Override
652     public <R extends Message> Map<byte[], R> batchCoprocessorService(
653         Descriptors.MethodDescriptor method, Message request,
654         byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
655       checkState();
656       return table.batchCoprocessorService(method, request, startKey, endKey,
657           responsePrototype);
658     }
659 
660     @Override
661     public <R extends Message> void batchCoprocessorService(
662         Descriptors.MethodDescriptor method, Message request,
663         byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback)
664         throws ServiceException, Throwable {
665       checkState();
666       table.batchCoprocessorService(method, request, startKey, endKey, responsePrototype, callback);
667     }
668 
669     @Override
670     public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp,
671         byte[] value, RowMutations mutation) throws IOException {
672       checkState();
673       return table.checkAndMutate(row, family, qualifier, compareOp, value, mutation);
674     }
675 
676   }
677 }