View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.thrift2;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.util.Collection;
24  import java.util.List;
25  import java.util.Map;
26  
27  import org.apache.hadoop.hbase.classification.InterfaceAudience;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.hbase.HBaseConfiguration;
30  import org.apache.hadoop.hbase.HTableDescriptor;
31  import org.apache.hadoop.hbase.TableName;
32  import org.apache.hadoop.hbase.client.Append;
33  import org.apache.hadoop.hbase.client.Delete;
34  import org.apache.hadoop.hbase.client.Durability;
35  import org.apache.hadoop.hbase.client.Get;
36  import org.apache.hadoop.hbase.client.HTableFactory;
37  import org.apache.hadoop.hbase.client.HTableInterface;
38  import org.apache.hadoop.hbase.client.HTableInterfaceFactory;
39  import org.apache.hadoop.hbase.client.Increment;
40  import org.apache.hadoop.hbase.client.Put;
41  import org.apache.hadoop.hbase.client.Result;
42  import org.apache.hadoop.hbase.client.ResultScanner;
43  import org.apache.hadoop.hbase.client.Row;
44  import org.apache.hadoop.hbase.client.RowMutations;
45  import org.apache.hadoop.hbase.client.Scan;
46  import org.apache.hadoop.hbase.client.coprocessor.Batch;
47  import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
48  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
49  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
50  import org.apache.hadoop.hbase.util.Bytes;
51  import org.apache.hadoop.hbase.util.PoolMap;
52  import org.apache.hadoop.hbase.util.PoolMap.PoolType;
53  
54  import com.google.protobuf.Descriptors;
55  import com.google.protobuf.Message;
56  import com.google.protobuf.Service;
57  import com.google.protobuf.ServiceException;
58  
59  /**
60   * A simple pool of HTable instances.
61   *
62   * Each HTablePool acts as a pool for all tables. To use, instantiate an
63   * HTablePool and use {@link #getTable(String)} to get an HTable from the pool.
64   *
65   * This method is not needed anymore, clients should call
66   * HTableInterface.close() rather than returning the tables to the pool
67   *
68   * Once you are done with it, close your instance of
69   * {@link org.apache.hadoop.hbase.client.HTableInterface}
70   * by calling {@link org.apache.hadoop.hbase.client.HTableInterface#close()} rather than returning
71   * the tablesto the pool with (deprecated)
72   * {@link #putTable(org.apache.hadoop.hbase.client.HTableInterface)}.
73   *
74   * <p>
75   * A pool can be created with a <i>maxSize</i> which defines the most HTable
76   * references that will ever be retained for each table. Otherwise the default
77   * is {@link Integer#MAX_VALUE}.
78   *
79   * <p>
80   * Pool will manage its own connections to the cluster. See
81   * {@link org.apache.hadoop.hbase.client.HConnectionManager}.
82   * Was @deprecated made @InterfaceAudience.private as of 0.98.1.
83   * See {@link org.apache.hadoop.hbase.client.HConnection#getTable(String)},
84   * Moved to thrift2 module for 2.0
85   */
86  @InterfaceAudience.Private
87  public class HTablePool implements Closeable {
88    private final PoolMap<String, HTableInterface> tables;
89    private final int maxSize;
90    private final PoolType poolType;
91    private final Configuration config;
92    private final HTableInterfaceFactory tableFactory;
93  
94    /**
95     * Default Constructor. Default HBaseConfiguration and no limit on pool size.
96     */
97    public HTablePool() {
98      this(HBaseConfiguration.create(), Integer.MAX_VALUE);
99    }
100 
101   /**
102    * Constructor to set maximum versions and use the specified configuration.
103    *
104    * @param config
105    *          configuration
106    * @param maxSize
107    *          maximum number of references to keep for each table
108    */
109   public HTablePool(final Configuration config, final int maxSize) {
110     this(config, maxSize, null, null);
111   }
112 
113   /**
114    * Constructor to set maximum versions and use the specified configuration and
115    * table factory.
116    *
117    * @param config
118    *          configuration
119    * @param maxSize
120    *          maximum number of references to keep for each table
121    * @param tableFactory
122    *          table factory
123    */
124   public HTablePool(final Configuration config, final int maxSize,
125       final HTableInterfaceFactory tableFactory) {
126     this(config, maxSize, tableFactory, PoolType.Reusable);
127   }
128 
129   /**
130    * Constructor to set maximum versions and use the specified configuration and
131    * pool type.
132    *
133    * @param config
134    *          configuration
135    * @param maxSize
136    *          maximum number of references to keep for each table
137    * @param poolType
138    *          pool type which is one of {@link PoolType#Reusable} or
139    *          {@link PoolType#ThreadLocal}
140    */
141   public HTablePool(final Configuration config, final int maxSize,
142       final PoolType poolType) {
143     this(config, maxSize, null, poolType);
144   }
145 
146   /**
147    * Constructor to set maximum versions and use the specified configuration,
148    * table factory and pool type. The HTablePool supports the
149    * {@link PoolType#Reusable} and {@link PoolType#ThreadLocal}. If the pool
150    * type is null or not one of those two values, then it will default to
151    * {@link PoolType#Reusable}.
152    *
153    * @param config
154    *          configuration
155    * @param maxSize
156    *          maximum number of references to keep for each table
157    * @param tableFactory
158    *          table factory
159    * @param poolType
160    *          pool type which is one of {@link PoolType#Reusable} or
161    *          {@link PoolType#ThreadLocal}
162    */
163   public HTablePool(final Configuration config, final int maxSize,
164       final HTableInterfaceFactory tableFactory, PoolType poolType) {
165     // Make a new configuration instance so I can safely cleanup when
166     // done with the pool.
167     this.config = config == null ? HBaseConfiguration.create() : config;
168     this.maxSize = maxSize;
169     this.tableFactory = tableFactory == null ? new HTableFactory()
170         : tableFactory;
171     if (poolType == null) {
172       this.poolType = PoolType.Reusable;
173     } else {
174       switch (poolType) {
175       case Reusable:
176       case ThreadLocal:
177         this.poolType = poolType;
178         break;
179       default:
180         this.poolType = PoolType.Reusable;
181         break;
182       }
183     }
184     this.tables = new PoolMap<String, HTableInterface>(this.poolType,
185         this.maxSize);
186   }
187 
188   /**
189    * Get a reference to the specified table from the pool.
190    * <p>
191    * <p/>
192    *
193    * @param tableName
194    *          table name
195    * @return a reference to the specified table
196    * @throws RuntimeException
197    *           if there is a problem instantiating the HTable
198    */
199   public HTableInterface getTable(String tableName) {
200     // call the old getTable implementation renamed to findOrCreateTable
201     HTableInterface table = findOrCreateTable(tableName);
202     // return a proxy table so when user closes the proxy, the actual table
203     // will be returned to the pool
204     return new PooledHTable(table);
205   }
206 
207   /**
208    * Get a reference to the specified table from the pool.
209    * <p>
210    *
211    * Create a new one if one is not available.
212    *
213    * @param tableName
214    *          table name
215    * @return a reference to the specified table
216    * @throws RuntimeException
217    *           if there is a problem instantiating the HTable
218    */
219   private HTableInterface findOrCreateTable(String tableName) {
220     HTableInterface table = tables.get(tableName);
221     if (table == null) {
222       table = createHTable(tableName);
223     }
224     return table;
225   }
226 
227   /**
228    * Get a reference to the specified table from the pool.
229    * <p>
230    *
231    * Create a new one if one is not available.
232    *
233    * @param tableName
234    *          table name
235    * @return a reference to the specified table
236    * @throws RuntimeException if there is a problem instantiating the HTable
237    */
238   public HTableInterface getTable(byte[] tableName) {
239     return getTable(Bytes.toString(tableName));
240   }
241 
242   /**
243    * This method is not needed anymore, clients should call
244    * HTableInterface.close() rather than returning the tables to the pool
245    *
246    * @param table
247    *          the proxy table user got from pool
248    * @deprecated
249    */
250   @Deprecated
251   public void putTable(HTableInterface table) throws IOException {
252     // we need to be sure nobody puts a proxy implementation in the pool
253     // but if the client code is not updated
254     // and it will continue to call putTable() instead of calling close()
255     // then we need to return the wrapped table to the pool instead of the
256     // proxy
257     // table
258     if (table instanceof PooledHTable) {
259       returnTable(((PooledHTable) table).getWrappedTable());
260     } else {
261       // normally this should not happen if clients pass back the same
262       // table
263       // object they got from the pool
264       // but if it happens then it's better to reject it
265       throw new IllegalArgumentException("not a pooled table: " + table);
266     }
267   }
268 
269   /**
270    * Puts the specified HTable back into the pool.
271    * <p>
272    *
273    * If the pool already contains <i>maxSize</i> references to the table, then
274    * the table instance gets closed after flushing buffered edits.
275    *
276    * @param table
277    *          table
278    */
279   private void returnTable(HTableInterface table) throws IOException {
280     // this is the old putTable method renamed and made private
281     String tableName = Bytes.toString(table.getTableName());
282     if (tables.size(tableName) >= maxSize) {
283       // release table instance since we're not reusing it
284       this.tables.removeValue(tableName, table);
285       this.tableFactory.releaseHTableInterface(table);
286       return;
287     }
288     tables.put(tableName, table);
289   }
290 
291   protected HTableInterface createHTable(String tableName) {
292     return this.tableFactory.createHTableInterface(config,
293         Bytes.toBytes(tableName));
294   }
295 
296   /**
297    * Closes all the HTable instances , belonging to the given table, in the
298    * table pool.
299    * <p>
300    * Note: this is a 'shutdown' of the given table pool and different from
301    * {@link #putTable(HTableInterface)}, that is used to return the table
302    * instance to the pool for future re-use.
303    *
304    * @param tableName
305    */
306   public void closeTablePool(final String tableName) throws IOException {
307     Collection<HTableInterface> tables = this.tables.values(tableName);
308     if (tables != null) {
309       for (HTableInterface table : tables) {
310         this.tableFactory.releaseHTableInterface(table);
311       }
312     }
313     this.tables.remove(tableName);
314   }
315 
316   /**
317    * See {@link #closeTablePool(String)}.
318    *
319    * @param tableName
320    */
321   public void closeTablePool(final byte[] tableName) throws IOException {
322     closeTablePool(Bytes.toString(tableName));
323   }
324 
325   /**
326    * Closes all the HTable instances , belonging to all tables in the table
327    * pool.
328    * <p>
329    * Note: this is a 'shutdown' of all the table pools.
330    */
331   public void close() throws IOException {
332     for (String tableName : tables.keySet()) {
333       closeTablePool(tableName);
334     }
335     this.tables.clear();
336   }
337 
338   public int getCurrentPoolSize(String tableName) {
339     return tables.size(tableName);
340   }
341 
342   /**
343    * A proxy class that implements HTableInterface.close method to return the
344    * wrapped table back to the table pool
345    *
346    */
347   class PooledHTable implements HTableInterface {
348 
349     private boolean open = false;
350 
351     private HTableInterface table; // actual table implementation
352 
353     public PooledHTable(HTableInterface table) {
354       this.table = table;
355       this.open = true;
356     }
357 
358     @Override
359     public byte[] getTableName() {
360       checkState();
361       return table.getTableName();
362     }
363 
364     @Override
365     public TableName getName() {
366       return table.getName();
367     }
368 
369     @Override
370     public Configuration getConfiguration() {
371       checkState();
372       return table.getConfiguration();
373     }
374 
375     @Override
376     public HTableDescriptor getTableDescriptor() throws IOException {
377       checkState();
378       return table.getTableDescriptor();
379     }
380 
381     @Override
382     public boolean exists(Get get) throws IOException {
383       checkState();
384       return table.exists(get);
385     }
386 
387     @Override
388     public boolean[] existsAll(List<Get> gets) throws IOException {
389       checkState();
390       return table.existsAll(gets);
391     }
392 
393     @Override
394     public Boolean[] exists(List<Get> gets) throws IOException {
395       checkState();
396       return table.exists(gets);
397     }
398 
399     @Override
400     public void batch(List<? extends Row> actions, Object[] results) throws IOException,
401         InterruptedException {
402       checkState();
403       table.batch(actions, results);
404     }
405 
406     /**
407      * {@inheritDoc}
408      * @deprecated If any exception is thrown by one of the actions, there is no way to
409      * retrieve the partially executed results. Use {@link #batch(List, Object[])} instead.
410      */
411     @Deprecated
412     @Override
413     public Object[] batch(List<? extends Row> actions) throws IOException,
414         InterruptedException {
415       checkState();
416       return table.batch(actions);
417     }
418 
419     @Override
420     public Result get(Get get) throws IOException {
421       checkState();
422       return table.get(get);
423     }
424 
425     @Override
426     public Result[] get(List<Get> gets) throws IOException {
427       checkState();
428       return table.get(gets);
429     }
430 
431     @Override
432     @SuppressWarnings("deprecation")
433     @Deprecated
434     public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
435       checkState();
436       return table.getRowOrBefore(row, family);
437     }
438 
439     @Override
440     public ResultScanner getScanner(Scan scan) throws IOException {
441       checkState();
442       return table.getScanner(scan);
443     }
444 
445     @Override
446     public ResultScanner getScanner(byte[] family) throws IOException {
447       checkState();
448       return table.getScanner(family);
449     }
450 
451     @Override
452     public ResultScanner getScanner(byte[] family, byte[] qualifier)
453         throws IOException {
454       checkState();
455       return table.getScanner(family, qualifier);
456     }
457 
458     @Override
459     public void put(Put put) throws IOException {
460       checkState();
461       table.put(put);
462     }
463 
464     @Override
465     public void put(List<Put> puts) throws IOException {
466       checkState();
467       table.put(puts);
468     }
469 
470     @Override
471     public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
472         byte[] value, Put put) throws IOException {
473       checkState();
474       return table.checkAndPut(row, family, qualifier, value, put);
475     }
476 
477     @Override
478     public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
479         CompareOp compareOp, byte[] value, Put put) throws IOException {
480       checkState();
481       return table.checkAndPut(row, family, qualifier, compareOp, value, put);
482     }
483 
484     @Override
485     public void delete(Delete delete) throws IOException {
486       checkState();
487       table.delete(delete);
488     }
489 
490     @Override
491     public void delete(List<Delete> deletes) throws IOException {
492       checkState();
493       table.delete(deletes);
494     }
495 
496     @Override
497     public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
498         byte[] value, Delete delete) throws IOException {
499       checkState();
500       return table.checkAndDelete(row, family, qualifier, value, delete);
501     }
502 
503     @Override
504     public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
505         CompareOp compareOp, byte[] value, Delete delete) throws IOException {
506       checkState();
507       return table.checkAndDelete(row, family, qualifier, compareOp, value, delete);
508     }
509 
510     @Override
511     public Result increment(Increment increment) throws IOException {
512       checkState();
513       return table.increment(increment);
514     }
515 
516     @Override
517     public long incrementColumnValue(byte[] row, byte[] family,
518         byte[] qualifier, long amount) throws IOException {
519       checkState();
520       return table.incrementColumnValue(row, family, qualifier, amount);
521     }
522 
523     @Override
524     public long incrementColumnValue(byte[] row, byte[] family,
525         byte[] qualifier, long amount, Durability durability) throws IOException {
526       checkState();
527       return table.incrementColumnValue(row, family, qualifier, amount,
528           durability);
529     }
530 
531     @Override
532     public boolean isAutoFlush() {
533       checkState();
534       return table.isAutoFlush();
535     }
536 
537     @Override
538     public void flushCommits() throws IOException {
539       checkState();
540       table.flushCommits();
541     }
542 
543     /**
544      * Returns the actual table back to the pool
545      *
546      * @throws IOException
547      */
548     public void close() throws IOException {
549       checkState();
550       open = false;
551       returnTable(table);
552     }
553 
554     @Override
555     public CoprocessorRpcChannel coprocessorService(byte[] row) {
556       checkState();
557       return table.coprocessorService(row);
558     }
559 
560     @Override
561     public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
562         byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
563         throws ServiceException, Throwable {
564       checkState();
565       return table.coprocessorService(service, startKey, endKey, callable);
566     }
567 
568     @Override
569     public <T extends Service, R> void coprocessorService(Class<T> service,
570         byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Callback<R> callback)
571         throws ServiceException, Throwable {
572       checkState();
573       table.coprocessorService(service, startKey, endKey, callable, callback);
574     }
575 
576     @Override
577     public String toString() {
578       return "PooledHTable{" + ", table=" + table + '}';
579     }
580 
581     /**
582      * Expose the wrapped HTable to tests in the same package
583      *
584      * @return wrapped htable
585      */
586     HTableInterface getWrappedTable() {
587       return table;
588     }
589 
590     @Override
591     public <R> void batchCallback(List<? extends Row> actions,
592         Object[] results, Callback<R> callback) throws IOException,
593         InterruptedException {
594       checkState();
595       table.batchCallback(actions, results, callback);
596     }
597 
598     /**
599      * {@inheritDoc}
600      * @deprecated If any exception is thrown by one of the actions, there is no way to
601      * retrieve the partially executed results. Use
602      * {@link #batchCallback(List, Object[], org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
603      * instead.
604      */
605     @Deprecated
606     @Override
607     public <R> Object[] batchCallback(List<? extends Row> actions,
608         Callback<R> callback) throws IOException, InterruptedException {
609       checkState();
610       return table.batchCallback(actions,  callback);
611     }
612 
613     @Override
614     public void mutateRow(RowMutations rm) throws IOException {
615       checkState();
616       table.mutateRow(rm);
617     }
618 
619     @Override
620     public Result append(Append append) throws IOException {
621       checkState();
622       return table.append(append);
623     }
624 
625     @Override
626     public void setAutoFlush(boolean autoFlush) {
627       checkState();
628       table.setAutoFlush(autoFlush, autoFlush);
629     }
630 
631     @Override
632     public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
633       checkState();
634       table.setAutoFlush(autoFlush, clearBufferOnFail);
635     }
636 
637     @Override
638     public void setAutoFlushTo(boolean autoFlush) {
639       table.setAutoFlushTo(autoFlush);
640     }
641 
642     @Override
643     public long getWriteBufferSize() {
644       checkState();
645       return table.getWriteBufferSize();
646     }
647 
648     @Override
649     public void setWriteBufferSize(long writeBufferSize) throws IOException {
650       checkState();
651       table.setWriteBufferSize(writeBufferSize);
652     }
653 
654     boolean isOpen() {
655       return open;
656     }
657 
658     private void checkState() {
659       if (!isOpen()) {
660         throw new IllegalStateException("Table=" + table.getName()
661                 + " already closed");
662       }
663     }
664 
665     @Override
666     public long incrementColumnValue(byte[] row, byte[] family,
667         byte[] qualifier, long amount, boolean writeToWAL) throws IOException {
668       return table.incrementColumnValue(row, family, qualifier, amount, writeToWAL);
669     }
670 
671     @Override
672     public <R extends Message> Map<byte[], R> batchCoprocessorService(
673         Descriptors.MethodDescriptor method, Message request,
674         byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
675       checkState();
676       return table.batchCoprocessorService(method, request, startKey, endKey,
677           responsePrototype);
678     }
679 
680     @Override
681     public <R extends Message> void batchCoprocessorService(
682         Descriptors.MethodDescriptor method, Message request,
683         byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback)
684         throws ServiceException, Throwable {
685       checkState();
686       table.batchCoprocessorService(method, request, startKey, endKey, responsePrototype, callback);
687     }
688 
689     @Override
690     public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp,
691         byte[] value, RowMutations mutation) throws IOException {
692       checkState();
693       return table.checkAndMutate(row, family, qualifier, compareOp, value, mutation);
694     }
695   }
696 }