View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.thrift2;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.util.Collection;
24  import java.util.List;
25  import java.util.Map;
26  
27  import org.apache.hadoop.hbase.classification.InterfaceAudience;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.hbase.HBaseConfiguration;
30  import org.apache.hadoop.hbase.HTableDescriptor;
31  import org.apache.hadoop.hbase.TableName;
32  import org.apache.hadoop.hbase.client.*;
33  import org.apache.hadoop.hbase.client.coprocessor.Batch;
34  import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
35  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
36  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
37  import org.apache.hadoop.hbase.util.Bytes;
38  import org.apache.hadoop.hbase.util.PoolMap;
39  import org.apache.hadoop.hbase.util.PoolMap.PoolType;
40  
41  import com.google.protobuf.Descriptors;
42  import com.google.protobuf.Message;
43  import com.google.protobuf.Service;
44  import com.google.protobuf.ServiceException;
45  
46  /**
47   * A simple pool of HTable instances.
48   *
49   * Each HTablePool acts as a pool for all tables. To use, instantiate an
50   * HTablePool and use {@link #getTable(String)} to get an HTable from the pool.
51   *
52   * This method is not needed anymore, clients should call
53   * HTableInterface.close() rather than returning the tables to the pool
54   *
55   * Once you are done with it, close your instance of
56   * {@link org.apache.hadoop.hbase.client.HTableInterface}
57   * by calling {@link org.apache.hadoop.hbase.client.HTableInterface#close()} rather than returning
58   * the tablesto the pool with (deprecated)
59   * {@link #putTable(org.apache.hadoop.hbase.client.HTableInterface)}.
60   *
61   * <p>
62   * A pool can be created with a <i>maxSize</i> which defines the most HTable
63   * references that will ever be retained for each table. Otherwise the default
64   * is {@link Integer#MAX_VALUE}.
65   *
66   * <p>
67   * Pool will manage its own connections to the cluster. See
68   * {@link org.apache.hadoop.hbase.client.HConnectionManager}.
69   * Was @deprecated made @InterfaceAudience.private as of 0.98.1.
70   * See {@link org.apache.hadoop.hbase.client.HConnection#getTable(String)},
71   * Moved to thrift2 module for 2.0
72   */
73  @InterfaceAudience.Private
74  public class HTablePool implements Closeable {
75    private final PoolMap<String, HTableInterface> tables;
76    private final int maxSize;
77    private final PoolType poolType;
78    private final Configuration config;
79    private final HTableInterfaceFactory tableFactory;
80  
81    /**
82     * Default Constructor. Default HBaseConfiguration and no limit on pool size.
83     */
84    public HTablePool() {
85      this(HBaseConfiguration.create(), Integer.MAX_VALUE);
86    }
87  
88    /**
89     * Constructor to set maximum versions and use the specified configuration.
90     *
91     * @param config
92     *          configuration
93     * @param maxSize
94     *          maximum number of references to keep for each table
95     */
96    public HTablePool(final Configuration config, final int maxSize) {
97      this(config, maxSize, null, null);
98    }
99  
100   /**
101    * Constructor to set maximum versions and use the specified configuration and
102    * table factory.
103    *
104    * @param config
105    *          configuration
106    * @param maxSize
107    *          maximum number of references to keep for each table
108    * @param tableFactory
109    *          table factory
110    */
111   public HTablePool(final Configuration config, final int maxSize,
112       final HTableInterfaceFactory tableFactory) {
113     this(config, maxSize, tableFactory, PoolType.Reusable);
114   }
115 
116   /**
117    * Constructor to set maximum versions and use the specified configuration and
118    * pool type.
119    *
120    * @param config
121    *          configuration
122    * @param maxSize
123    *          maximum number of references to keep for each table
124    * @param poolType
125    *          pool type which is one of {@link PoolType#Reusable} or
126    *          {@link PoolType#ThreadLocal}
127    */
128   public HTablePool(final Configuration config, final int maxSize,
129       final PoolType poolType) {
130     this(config, maxSize, null, poolType);
131   }
132 
133   /**
134    * Constructor to set maximum versions and use the specified configuration,
135    * table factory and pool type. The HTablePool supports the
136    * {@link PoolType#Reusable} and {@link PoolType#ThreadLocal}. If the pool
137    * type is null or not one of those two values, then it will default to
138    * {@link PoolType#Reusable}.
139    *
140    * @param config
141    *          configuration
142    * @param maxSize
143    *          maximum number of references to keep for each table
144    * @param tableFactory
145    *          table factory
146    * @param poolType
147    *          pool type which is one of {@link PoolType#Reusable} or
148    *          {@link PoolType#ThreadLocal}
149    */
150   public HTablePool(final Configuration config, final int maxSize,
151       final HTableInterfaceFactory tableFactory, PoolType poolType) {
152     // Make a new configuration instance so I can safely cleanup when
153     // done with the pool.
154     this.config = config == null ? HBaseConfiguration.create() : config;
155     this.maxSize = maxSize;
156     this.tableFactory = tableFactory == null ? new HTableFactory()
157         : tableFactory;
158     if (poolType == null) {
159       this.poolType = PoolType.Reusable;
160     } else {
161       switch (poolType) {
162       case Reusable:
163       case ThreadLocal:
164         this.poolType = poolType;
165         break;
166       default:
167         this.poolType = PoolType.Reusable;
168         break;
169       }
170     }
171     this.tables = new PoolMap<String, HTableInterface>(this.poolType,
172         this.maxSize);
173   }
174 
175   /**
176    * Get a reference to the specified table from the pool.
177    * <p>
178    * <p/>
179    *
180    * @param tableName
181    *          table name
182    * @return a reference to the specified table
183    * @throws RuntimeException
184    *           if there is a problem instantiating the HTable
185    */
186   public HTableInterface getTable(String tableName) {
187     // call the old getTable implementation renamed to findOrCreateTable
188     HTableInterface table = findOrCreateTable(tableName);
189     // return a proxy table so when user closes the proxy, the actual table
190     // will be returned to the pool
191     return new PooledHTable(table);
192   }
193 
194   /**
195    * Get a reference to the specified table from the pool.
196    * <p>
197    *
198    * Create a new one if one is not available.
199    *
200    * @param tableName
201    *          table name
202    * @return a reference to the specified table
203    * @throws RuntimeException
204    *           if there is a problem instantiating the HTable
205    */
206   private HTableInterface findOrCreateTable(String tableName) {
207     HTableInterface table = tables.get(tableName);
208     if (table == null) {
209       table = createHTable(tableName);
210     }
211     return table;
212   }
213 
214   /**
215    * Get a reference to the specified table from the pool.
216    * <p>
217    *
218    * Create a new one if one is not available.
219    *
220    * @param tableName
221    *          table name
222    * @return a reference to the specified table
223    * @throws RuntimeException
224    *           if there is a problem instantiating the HTable
225    */
226   public HTableInterface getTable(byte[] tableName) {
227     return getTable(Bytes.toString(tableName));
228   }
229 
230   /**
231    * This method is not needed anymore, clients should call
232    * HTableInterface.close() rather than returning the tables to the pool
233    *
234    * @param table
235    *          the proxy table user got from pool
236    * @deprecated
237    */
238   @Deprecated
239   public void putTable(HTableInterface table) throws IOException {
240     // we need to be sure nobody puts a proxy implementation in the pool
241     // but if the client code is not updated
242     // and it will continue to call putTable() instead of calling close()
243     // then we need to return the wrapped table to the pool instead of the
244     // proxy
245     // table
246     if (table instanceof PooledHTable) {
247       returnTable(((PooledHTable) table).getWrappedTable());
248     } else {
249       // normally this should not happen if clients pass back the same
250       // table
251       // object they got from the pool
252       // but if it happens then it's better to reject it
253       throw new IllegalArgumentException("not a pooled table: " + table);
254     }
255   }
256 
257   /**
258    * Puts the specified HTable back into the pool.
259    * <p>
260    *
261    * If the pool already contains <i>maxSize</i> references to the table, then
262    * the table instance gets closed after flushing buffered edits.
263    *
264    * @param table
265    *          table
266    */
267   private void returnTable(HTableInterface table) throws IOException {
268     // this is the old putTable method renamed and made private
269     String tableName = Bytes.toString(table.getTableName());
270     if (tables.size(tableName) >= maxSize) {
271       // release table instance since we're not reusing it
272       this.tables.removeValue(tableName, table);
273       this.tableFactory.releaseHTableInterface(table);
274       return;
275     }
276     tables.put(tableName, table);
277   }
278 
279   protected HTableInterface createHTable(String tableName) {
280     return this.tableFactory.createHTableInterface(config,
281         Bytes.toBytes(tableName));
282   }
283 
284   /**
285    * Closes all the HTable instances , belonging to the given table, in the
286    * table pool.
287    * <p>
288    * Note: this is a 'shutdown' of the given table pool and different from
289    * {@link #putTable(HTableInterface)}, that is used to return the table
290    * instance to the pool for future re-use.
291    *
292    * @param tableName
293    */
294   public void closeTablePool(final String tableName) throws IOException {
295     Collection<HTableInterface> tables = this.tables.values(tableName);
296     if (tables != null) {
297       for (HTableInterface table : tables) {
298         this.tableFactory.releaseHTableInterface(table);
299       }
300     }
301     this.tables.remove(tableName);
302   }
303 
304   /**
305    * See {@link #closeTablePool(String)}.
306    *
307    * @param tableName
308    */
309   public void closeTablePool(final byte[] tableName) throws IOException {
310     closeTablePool(Bytes.toString(tableName));
311   }
312 
313   /**
314    * Closes all the HTable instances , belonging to all tables in the table
315    * pool.
316    * <p>
317    * Note: this is a 'shutdown' of all the table pools.
318    */
319   public void close() throws IOException {
320     for (String tableName : tables.keySet()) {
321       closeTablePool(tableName);
322     }
323     this.tables.clear();
324   }
325 
326   public int getCurrentPoolSize(String tableName) {
327     return tables.size(tableName);
328   }
329 
330   /**
331    * A proxy class that implements HTableInterface.close method to return the
332    * wrapped table back to the table pool
333    *
334    */
335   class PooledHTable implements HTableInterface {
336 
337     private boolean open = false;
338 
339     private HTableInterface table; // actual table implementation
340 
341     public PooledHTable(HTableInterface table) {
342       this.table = table;
343       this.open = true;
344     }
345 
346     @Override
347     public byte[] getTableName() {
348       checkState();
349       return table.getTableName();
350     }
351 
352     @Override
353     public TableName getName() {
354       return table.getName();
355     }
356 
357     @Override
358     public Configuration getConfiguration() {
359       checkState();
360       return table.getConfiguration();
361     }
362 
363     @Override
364     public HTableDescriptor getTableDescriptor() throws IOException {
365       checkState();
366       return table.getTableDescriptor();
367     }
368 
369     @Override
370     public boolean exists(Get get) throws IOException {
371       checkState();
372       return table.exists(get);
373     }
374 
375     @Override
376     public boolean[] existsAll(List<Get> gets) throws IOException {
377       checkState();
378       return table.existsAll(gets);
379     }
380 
381     @Override
382     public Boolean[] exists(List<Get> gets) throws IOException {
383       checkState();
384       return table.exists(gets);
385     }
386 
387     @Override
388     public void batch(List<? extends Row> actions, Object[] results) throws IOException,
389         InterruptedException {
390       checkState();
391       table.batch(actions, results);
392     }
393 
394     /**
395      * {@inheritDoc}
396      * @deprecated If any exception is thrown by one of the actions, there is no way to
397      * retrieve the partially executed results. Use {@link #batch(List, Object[])} instead.
398      */
399     @Deprecated
400     @Override
401     public Object[] batch(List<? extends Row> actions) throws IOException,
402         InterruptedException {
403       checkState();
404       return table.batch(actions);
405     }
406 
407     @Override
408     public Result get(Get get) throws IOException {
409       checkState();
410       return table.get(get);
411     }
412 
413     @Override
414     public Result[] get(List<Get> gets) throws IOException {
415       checkState();
416       return table.get(gets);
417     }
418 
419     @Override
420     @SuppressWarnings("deprecation")
421     @Deprecated
422     public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
423       checkState();
424       return table.getRowOrBefore(row, family);
425     }
426 
427     @Override
428     public ResultScanner getScanner(Scan scan) throws IOException {
429       checkState();
430       return table.getScanner(scan);
431     }
432 
433     @Override
434     public ResultScanner getScanner(byte[] family) throws IOException {
435       checkState();
436       return table.getScanner(family);
437     }
438 
439     @Override
440     public ResultScanner getScanner(byte[] family, byte[] qualifier)
441         throws IOException {
442       checkState();
443       return table.getScanner(family, qualifier);
444     }
445 
446     @Override
447     public void put(Put put) throws IOException {
448       checkState();
449       table.put(put);
450     }
451 
452     @Override
453     public void put(List<Put> puts) throws IOException {
454       checkState();
455       table.put(puts);
456     }
457 
458     @Override
459     public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
460         byte[] value, Put put) throws IOException {
461       checkState();
462       return table.checkAndPut(row, family, qualifier, value, put);
463     }
464 
465     @Override
466     public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
467         CompareOp compareOp, byte[] value, Put put) throws IOException {
468       checkState();
469       return table.checkAndPut(row, family, qualifier, compareOp, value, put);
470     }
471 
472     @Override
473     public void delete(Delete delete) throws IOException {
474       checkState();
475       table.delete(delete);
476     }
477 
478     @Override
479     public void delete(List<Delete> deletes) throws IOException {
480       checkState();
481       table.delete(deletes);
482     }
483 
484     @Override
485     public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
486         byte[] value, Delete delete) throws IOException {
487       checkState();
488       return table.checkAndDelete(row, family, qualifier, value, delete);
489     }
490 
491     @Override
492     public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
493         CompareOp compareOp, byte[] value, Delete delete) throws IOException {
494       checkState();
495       return table.checkAndDelete(row, family, qualifier, compareOp, value, delete);
496     }
497 
498     @Override
499     public Result increment(Increment increment) throws IOException {
500       checkState();
501       return table.increment(increment);
502     }
503 
504     @Override
505     public long incrementColumnValue(byte[] row, byte[] family,
506         byte[] qualifier, long amount) throws IOException {
507       checkState();
508       return table.incrementColumnValue(row, family, qualifier, amount);
509     }
510 
511     @Override
512     public long incrementColumnValue(byte[] row, byte[] family,
513         byte[] qualifier, long amount, Durability durability) throws IOException {
514       checkState();
515       return table.incrementColumnValue(row, family, qualifier, amount,
516           durability);
517     }
518 
519     @Override
520     public boolean isAutoFlush() {
521       checkState();
522       return table.isAutoFlush();
523     }
524 
525     @Override
526     public void flushCommits() throws IOException {
527       checkState();
528       table.flushCommits();
529     }
530 
531     /**
532      * Returns the actual table back to the pool
533      *
534      * @throws IOException
535      */
536     public void close() throws IOException {
537       checkState();
538       open = false;
539       returnTable(table);
540     }
541 
542     @Override
543     public CoprocessorRpcChannel coprocessorService(byte[] row) {
544       checkState();
545       return table.coprocessorService(row);
546     }
547 
548     @Override
549     public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
550         byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
551         throws ServiceException, Throwable {
552       checkState();
553       return table.coprocessorService(service, startKey, endKey, callable);
554     }
555 
556     @Override
557     public <T extends Service, R> void coprocessorService(Class<T> service,
558         byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Callback<R> callback)
559         throws ServiceException, Throwable {
560       checkState();
561       table.coprocessorService(service, startKey, endKey, callable, callback);
562     }
563 
564     @Override
565     public String toString() {
566       return "PooledHTable{" + ", table=" + table + '}';
567     }
568 
569     /**
570      * Expose the wrapped HTable to tests in the same package
571      *
572      * @return wrapped htable
573      */
574     HTableInterface getWrappedTable() {
575       return table;
576     }
577 
578     @Override
579     public <R> void batchCallback(List<? extends Row> actions,
580         Object[] results, Callback<R> callback) throws IOException,
581         InterruptedException {
582       checkState();
583       table.batchCallback(actions, results, callback);
584     }
585 
586     /**
587      * {@inheritDoc}
588      * @deprecated If any exception is thrown by one of the actions, there is no way to
589      * retrieve the partially executed results. Use
590      * {@link #batchCallback(List, Object[], org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
591      * instead.
592      */
593     @Deprecated
594     @Override
595     public <R> Object[] batchCallback(List<? extends Row> actions,
596         Callback<R> callback) throws IOException, InterruptedException {
597       checkState();
598       return table.batchCallback(actions,  callback);
599     }
600 
601     @Override
602     public void mutateRow(RowMutations rm) throws IOException {
603       checkState();
604       table.mutateRow(rm);
605     }
606 
607     @Override
608     public Result append(Append append) throws IOException {
609       checkState();
610       return table.append(append);
611     }
612 
613     @Override
614     public void setAutoFlush(boolean autoFlush) {
615       checkState();
616       table.setAutoFlush(autoFlush, autoFlush);
617     }
618 
619     @Override
620     public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
621       checkState();
622       table.setAutoFlush(autoFlush, clearBufferOnFail);
623     }
624 
625     @Override
626     public void setAutoFlushTo(boolean autoFlush) {
627       table.setAutoFlushTo(autoFlush);
628     }
629 
630     @Override
631     public long getWriteBufferSize() {
632       checkState();
633       return table.getWriteBufferSize();
634     }
635 
636     @Override
637     public void setWriteBufferSize(long writeBufferSize) throws IOException {
638       checkState();
639       table.setWriteBufferSize(writeBufferSize);
640     }
641 
642     boolean isOpen() {
643       return open;
644     }
645 
646     private void checkState() {
647       if (!isOpen()) {
648         throw new IllegalStateException("Table=" + new String(table.getTableName())
649                 + " already closed");
650       }
651     }
652 
653     @Override
654     public long incrementColumnValue(byte[] row, byte[] family,
655         byte[] qualifier, long amount, boolean writeToWAL) throws IOException {
656       return table.incrementColumnValue(row, family, qualifier, amount, writeToWAL);
657     }
658 
659     @Override
660     public <R extends Message> Map<byte[], R> batchCoprocessorService(
661         Descriptors.MethodDescriptor method, Message request,
662         byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
663       checkState();
664       return table.batchCoprocessorService(method, request, startKey, endKey,
665           responsePrototype);
666     }
667 
668     @Override
669     public <R extends Message> void batchCoprocessorService(
670         Descriptors.MethodDescriptor method, Message request,
671         byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback)
672         throws ServiceException, Throwable {
673       checkState();
674       table.batchCoprocessorService(method, request, startKey, endKey, responsePrototype, callback);
675     }
676 
677     @Override
678     public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp,
679         byte[] value, RowMutations mutation) throws IOException {
680       checkState();
681       return table.checkAndMutate(row, family, qualifier, compareOp, value, mutation);
682     }
683   }
684 }