View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.thrift2;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.util.Collection;
24  import java.util.List;
25  import java.util.Map;
26  
27  import org.apache.hadoop.classification.InterfaceAudience;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.hbase.HBaseConfiguration;
30  import org.apache.hadoop.hbase.HTableDescriptor;
31  import org.apache.hadoop.hbase.TableName;
32  import org.apache.hadoop.hbase.client.*;
33  import org.apache.hadoop.hbase.client.coprocessor.Batch;
34  import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
35  import org.apache.hadoop.hbase.filter.BinaryComparator;
36  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
37  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
38  import org.apache.hadoop.hbase.util.Bytes;
39  import org.apache.hadoop.hbase.util.PoolMap;
40  import org.apache.hadoop.hbase.util.PoolMap.PoolType;
41  
42  import com.google.protobuf.Descriptors;
43  import com.google.protobuf.Message;
44  import com.google.protobuf.Service;
45  import com.google.protobuf.ServiceException;
46  
47  /**
48   * A simple pool of HTable instances.
49   *
50   * Each HTablePool acts as a pool for all tables. To use, instantiate an
51   * HTablePool and use {@link #getTable(String)} to get an HTable from the pool.
52   *
53   * This method is not needed anymore, clients should call
54   * HTableInterface.close() rather than returning the tables to the pool
55   *
56   * Once you are done with it, close your instance of
57   * {@link org.apache.hadoop.hbase.client.HTableInterface}
58   * by calling {@link org.apache.hadoop.hbase.client.HTableInterface#close()} rather than returning
59   * the tablesto the pool with (deprecated)
60   * {@link #putTable(org.apache.hadoop.hbase.client.HTableInterface)}.
61   *
62   * <p>
63   * A pool can be created with a <i>maxSize</i> which defines the most HTable
64   * references that will ever be retained for each table. Otherwise the default
65   * is {@link Integer#MAX_VALUE}.
66   *
67   * <p>
68   * Pool will manage its own connections to the cluster. See
69   * {@link org.apache.hadoop.hbase.client.HConnectionManager}.
70   * Was @deprecated made @InterfaceAudience.private as of 0.98.1.
71   * See {@link org.apache.hadoop.hbase.client.HConnection#getTable(String)},
72   * Moved to thrift2 module for 2.0
73   */
74  @InterfaceAudience.Private
75  public class HTablePool implements Closeable {
76    private final PoolMap<String, HTableInterface> tables;
77    private final int maxSize;
78    private final PoolType poolType;
79    private final Configuration config;
80    private final HTableInterfaceFactory tableFactory;
81  
82    /**
83     * Default Constructor. Default HBaseConfiguration and no limit on pool size.
84     */
85    public HTablePool() {
86      this(HBaseConfiguration.create(), Integer.MAX_VALUE);
87    }
88  
89    /**
90     * Constructor to set maximum versions and use the specified configuration.
91     *
92     * @param config
93     *          configuration
94     * @param maxSize
95     *          maximum number of references to keep for each table
96     */
97    public HTablePool(final Configuration config, final int maxSize) {
98      this(config, maxSize, null, null);
99    }
100 
101   /**
102    * Constructor to set maximum versions and use the specified configuration and
103    * table factory.
104    *
105    * @param config
106    *          configuration
107    * @param maxSize
108    *          maximum number of references to keep for each table
109    * @param tableFactory
110    *          table factory
111    */
112   public HTablePool(final Configuration config, final int maxSize,
113       final HTableInterfaceFactory tableFactory) {
114     this(config, maxSize, tableFactory, PoolType.Reusable);
115   }
116 
117   /**
118    * Constructor to set maximum versions and use the specified configuration and
119    * pool type.
120    *
121    * @param config
122    *          configuration
123    * @param maxSize
124    *          maximum number of references to keep for each table
125    * @param poolType
126    *          pool type which is one of {@link PoolType#Reusable} or
127    *          {@link PoolType#ThreadLocal}
128    */
129   public HTablePool(final Configuration config, final int maxSize,
130       final PoolType poolType) {
131     this(config, maxSize, null, poolType);
132   }
133 
134   /**
135    * Constructor to set maximum versions and use the specified configuration,
136    * table factory and pool type. The HTablePool supports the
137    * {@link PoolType#Reusable} and {@link PoolType#ThreadLocal}. If the pool
138    * type is null or not one of those two values, then it will default to
139    * {@link PoolType#Reusable}.
140    *
141    * @param config
142    *          configuration
143    * @param maxSize
144    *          maximum number of references to keep for each table
145    * @param tableFactory
146    *          table factory
147    * @param poolType
148    *          pool type which is one of {@link PoolType#Reusable} or
149    *          {@link PoolType#ThreadLocal}
150    */
151   public HTablePool(final Configuration config, final int maxSize,
152       final HTableInterfaceFactory tableFactory, PoolType poolType) {
153     // Make a new configuration instance so I can safely cleanup when
154     // done with the pool.
155     this.config = config == null ? HBaseConfiguration.create() : config;
156     this.maxSize = maxSize;
157     this.tableFactory = tableFactory == null ? new HTableFactory()
158         : tableFactory;
159     if (poolType == null) {
160       this.poolType = PoolType.Reusable;
161     } else {
162       switch (poolType) {
163       case Reusable:
164       case ThreadLocal:
165         this.poolType = poolType;
166         break;
167       default:
168         this.poolType = PoolType.Reusable;
169         break;
170       }
171     }
172     this.tables = new PoolMap<String, HTableInterface>(this.poolType,
173         this.maxSize);
174   }
175 
176   /**
177    * Get a reference to the specified table from the pool.
178    * <p>
179    * <p/>
180    *
181    * @param tableName
182    *          table name
183    * @return a reference to the specified table
184    * @throws RuntimeException
185    *           if there is a problem instantiating the HTable
186    */
187   public HTableInterface getTable(String tableName) {
188     // call the old getTable implementation renamed to findOrCreateTable
189     HTableInterface table = findOrCreateTable(tableName);
190     // return a proxy table so when user closes the proxy, the actual table
191     // will be returned to the pool
192     return new PooledHTable(table);
193   }
194 
195   /**
196    * Get a reference to the specified table from the pool.
197    * <p>
198    *
199    * Create a new one if one is not available.
200    *
201    * @param tableName
202    *          table name
203    * @return a reference to the specified table
204    * @throws RuntimeException
205    *           if there is a problem instantiating the HTable
206    */
207   private HTableInterface findOrCreateTable(String tableName) {
208     HTableInterface table = tables.get(tableName);
209     if (table == null) {
210       table = createHTable(tableName);
211     }
212     return table;
213   }
214 
215   /**
216    * Get a reference to the specified table from the pool.
217    * <p>
218    *
219    * Create a new one if one is not available.
220    *
221    * @param tableName
222    *          table name
223    * @return a reference to the specified table
224    * @throws RuntimeException
225    *           if there is a problem instantiating the HTable
226    */
227   public HTableInterface getTable(byte[] tableName) {
228     return getTable(Bytes.toString(tableName));
229   }
230 
231   /**
232    * This method is not needed anymore, clients should call
233    * HTableInterface.close() rather than returning the tables to the pool
234    *
235    * @param table
236    *          the proxy table user got from pool
237    * @deprecated
238    */
239   public void putTable(HTableInterface table) throws IOException {
240     // we need to be sure nobody puts a proxy implementation in the pool
241     // but if the client code is not updated
242     // and it will continue to call putTable() instead of calling close()
243     // then we need to return the wrapped table to the pool instead of the
244     // proxy
245     // table
246     if (table instanceof PooledHTable) {
247       returnTable(((PooledHTable) table).getWrappedTable());
248     } else {
249       // normally this should not happen if clients pass back the same
250       // table
251       // object they got from the pool
252       // but if it happens then it's better to reject it
253       throw new IllegalArgumentException("not a pooled table: " + table);
254     }
255   }
256 
257   /**
258    * Puts the specified HTable back into the pool.
259    * <p>
260    *
261    * If the pool already contains <i>maxSize</i> references to the table, then
262    * the table instance gets closed after flushing buffered edits.
263    *
264    * @param table
265    *          table
266    */
267   private void returnTable(HTableInterface table) throws IOException {
268     // this is the old putTable method renamed and made private
269     String tableName = Bytes.toString(table.getTableName());
270     if (tables.size(tableName) >= maxSize) {
271       // release table instance since we're not reusing it
272       this.tables.removeValue(tableName, table);
273       this.tableFactory.releaseHTableInterface(table);
274       return;
275     }
276     tables.put(tableName, table);
277   }
278 
279   protected HTableInterface createHTable(String tableName) {
280     return this.tableFactory.createHTableInterface(config,
281         Bytes.toBytes(tableName));
282   }
283 
284   /**
285    * Closes all the HTable instances , belonging to the given table, in the
286    * table pool.
287    * <p>
288    * Note: this is a 'shutdown' of the given table pool and different from
289    * {@link #putTable(HTableInterface)}, that is used to return the table
290    * instance to the pool for future re-use.
291    *
292    * @param tableName
293    */
294   public void closeTablePool(final String tableName) throws IOException {
295     Collection<HTableInterface> tables = this.tables.values(tableName);
296     if (tables != null) {
297       for (HTableInterface table : tables) {
298         this.tableFactory.releaseHTableInterface(table);
299       }
300     }
301     this.tables.remove(tableName);
302   }
303 
304   /**
305    * See {@link #closeTablePool(String)}.
306    *
307    * @param tableName
308    */
309   public void closeTablePool(final byte[] tableName) throws IOException {
310     closeTablePool(Bytes.toString(tableName));
311   }
312 
313   /**
314    * Closes all the HTable instances , belonging to all tables in the table
315    * pool.
316    * <p>
317    * Note: this is a 'shutdown' of all the table pools.
318    */
319   public void close() throws IOException {
320     for (String tableName : tables.keySet()) {
321       closeTablePool(tableName);
322     }
323     this.tables.clear();
324   }
325 
326   public int getCurrentPoolSize(String tableName) {
327     return tables.size(tableName);
328   }
329 
330   /**
331    * A proxy class that implements HTableInterface.close method to return the
332    * wrapped table back to the table pool
333    *
334    */
335   class PooledHTable implements HTableInterface {
336 
337     private boolean open = false;
338 
339     private HTableInterface table; // actual table implementation
340 
341     public PooledHTable(HTableInterface table) {
342       this.table = table;
343       this.open = true;
344     }
345 
346     @Override
347     public byte[] getTableName() {
348       checkState();
349       return table.getTableName();
350     }
351 
352     @Override
353     public TableName getName() {
354       return table.getName();
355     }
356 
357     @Override
358     public Configuration getConfiguration() {
359       checkState();
360       return table.getConfiguration();
361     }
362 
363     @Override
364     public HTableDescriptor getTableDescriptor() throws IOException {
365       checkState();
366       return table.getTableDescriptor();
367     }
368 
369     @Override
370     public boolean exists(Get get) throws IOException {
371       checkState();
372       return table.exists(get);
373     }
374 
375     @Override
376     public boolean[] existsAll(List<Get> gets) throws IOException {
377       checkState();
378       return table.existsAll(gets);
379     }
380 
381     @Override
382     public Boolean[] exists(List<Get> gets) throws IOException {
383       checkState();
384       return table.exists(gets);
385     }
386 
387     @Override
388     public void batch(List<? extends Row> actions, Object[] results) throws IOException,
389         InterruptedException {
390       checkState();
391       table.batch(actions, results);
392     }
393 
394     /**
395      * {@inheritDoc}
396      * @deprecated If any exception is thrown by one of the actions, there is no way to
397      * retrieve the partially executed results. Use {@link #batch(List, Object[])} instead.
398      */
399     @Override
400     public Object[] batch(List<? extends Row> actions) throws IOException,
401         InterruptedException {
402       checkState();
403       return table.batch(actions);
404     }
405 
406     @Override
407     public Result get(Get get) throws IOException {
408       checkState();
409       return table.get(get);
410     }
411 
412     @Override
413     public Result[] get(List<Get> gets) throws IOException {
414       checkState();
415       return table.get(gets);
416     }
417 
418     @Override
419     @SuppressWarnings("deprecation")
420     @Deprecated
421     public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
422       checkState();
423       return table.getRowOrBefore(row, family);
424     }
425 
426     @Override
427     public ResultScanner getScanner(Scan scan) throws IOException {
428       checkState();
429       return table.getScanner(scan);
430     }
431 
432     @Override
433     public ResultScanner getScanner(byte[] family) throws IOException {
434       checkState();
435       return table.getScanner(family);
436     }
437 
438     @Override
439     public ResultScanner getScanner(byte[] family, byte[] qualifier)
440         throws IOException {
441       checkState();
442       return table.getScanner(family, qualifier);
443     }
444 
445     @Override
446     public void put(Put put) throws IOException {
447       checkState();
448       table.put(put);
449     }
450 
451     @Override
452     public void put(List<Put> puts) throws IOException {
453       checkState();
454       table.put(puts);
455     }
456 
457     @Override
458     public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
459         byte[] value, Put put) throws IOException {
460       checkState();
461       return table.checkAndPut(row, family, qualifier, value, put);
462     }
463 
464     @Override
465     public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
466         CompareOp compareOp, byte[] value, Put put) throws IOException {
467       checkState();
468       return table.checkAndPut(row, family, qualifier, compareOp, value, put);
469     }
470 
471     @Override
472     public void delete(Delete delete) throws IOException {
473       checkState();
474       table.delete(delete);
475     }
476 
477     @Override
478     public void delete(List<Delete> deletes) throws IOException {
479       checkState();
480       table.delete(deletes);
481     }
482 
483     @Override
484     public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
485         byte[] value, Delete delete) throws IOException {
486       checkState();
487       return table.checkAndDelete(row, family, qualifier, value, delete);
488     }
489 
490     @Override
491     public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
492         CompareOp compareOp, byte[] value, Delete delete) throws IOException {
493       checkState();
494       return table.checkAndDelete(row, family, qualifier, compareOp, value, delete);
495     }
496 
497     @Override
498     public Result increment(Increment increment) throws IOException {
499       checkState();
500       return table.increment(increment);
501     }
502 
503     @Override
504     public long incrementColumnValue(byte[] row, byte[] family,
505         byte[] qualifier, long amount) throws IOException {
506       checkState();
507       return table.incrementColumnValue(row, family, qualifier, amount);
508     }
509 
510     @Override
511     public long incrementColumnValue(byte[] row, byte[] family,
512         byte[] qualifier, long amount, Durability durability) throws IOException {
513       checkState();
514       return table.incrementColumnValue(row, family, qualifier, amount,
515           durability);
516     }
517 
518     @Override
519     public boolean isAutoFlush() {
520       checkState();
521       return table.isAutoFlush();
522     }
523 
524     @Override
525     public void flushCommits() throws IOException {
526       checkState();
527       table.flushCommits();
528     }
529 
530     /**
531      * Returns the actual table back to the pool
532      *
533      * @throws IOException
534      */
535     public void close() throws IOException {
536       checkState();
537       open = false;
538       returnTable(table);
539     }
540 
541     @Override
542     public CoprocessorRpcChannel coprocessorService(byte[] row) {
543       checkState();
544       return table.coprocessorService(row);
545     }
546 
547     @Override
548     public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
549         byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
550         throws ServiceException, Throwable {
551       checkState();
552       return table.coprocessorService(service, startKey, endKey, callable);
553     }
554 
555     @Override
556     public <T extends Service, R> void coprocessorService(Class<T> service,
557         byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Callback<R> callback)
558         throws ServiceException, Throwable {
559       checkState();
560       table.coprocessorService(service, startKey, endKey, callable, callback);
561     }
562 
563     @Override
564     public String toString() {
565       return "PooledHTable{" + ", table=" + table + '}';
566     }
567 
568     /**
569      * Expose the wrapped HTable to tests in the same package
570      *
571      * @return wrapped htable
572      */
573     HTableInterface getWrappedTable() {
574       return table;
575     }
576 
577     @Override
578     public <R> void batchCallback(List<? extends Row> actions,
579         Object[] results, Callback<R> callback) throws IOException,
580         InterruptedException {
581       checkState();
582       table.batchCallback(actions, results, callback);
583     }
584 
585     /**
586      * {@inheritDoc}
587      * @deprecated If any exception is thrown by one of the actions, there is no way to
588      * retrieve the partially executed results. Use
589      * {@link #batchCallback(List, Object[], org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
590      * instead.
591      */
592     @Override
593     public <R> Object[] batchCallback(List<? extends Row> actions,
594         Callback<R> callback) throws IOException, InterruptedException {
595       checkState();
596       return table.batchCallback(actions,  callback);
597     }
598 
599     @Override
600     public void mutateRow(RowMutations rm) throws IOException {
601       checkState();
602       table.mutateRow(rm);
603     }
604 
605     @Override
606     public Result append(Append append) throws IOException {
607       checkState();
608       return table.append(append);
609     }
610 
611     @Override
612     public void setAutoFlush(boolean autoFlush) {
613       checkState();
614       table.setAutoFlush(autoFlush, autoFlush);
615     }
616 
617     @Override
618     public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
619       checkState();
620       table.setAutoFlush(autoFlush, clearBufferOnFail);
621     }
622 
623     @Override
624     public void setAutoFlushTo(boolean autoFlush) {
625       table.setAutoFlushTo(autoFlush);
626     }
627 
628     @Override
629     public long getWriteBufferSize() {
630       checkState();
631       return table.getWriteBufferSize();
632     }
633 
634     @Override
635     public void setWriteBufferSize(long writeBufferSize) throws IOException {
636       checkState();
637       table.setWriteBufferSize(writeBufferSize);
638     }
639 
640     boolean isOpen() {
641       return open;
642     }
643 
644     private void checkState() {
645       if (!isOpen()) {
646         throw new IllegalStateException("Table=" + new String(table.getTableName())
647                 + " already closed");
648       }
649     }
650 
651     @Override
652     public long incrementColumnValue(byte[] row, byte[] family,
653         byte[] qualifier, long amount, boolean writeToWAL) throws IOException {
654       return table.incrementColumnValue(row, family, qualifier, amount, writeToWAL);
655     }
656 
657     @Override
658     public <R extends Message> Map<byte[], R> batchCoprocessorService(
659         Descriptors.MethodDescriptor method, Message request,
660         byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
661       checkState();
662       return table.batchCoprocessorService(method, request, startKey, endKey,
663           responsePrototype);
664     }
665 
666     @Override
667     public <R extends Message> void batchCoprocessorService(
668         Descriptors.MethodDescriptor method, Message request,
669         byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback)
670         throws ServiceException, Throwable {
671       checkState();
672       table.batchCoprocessorService(method, request, startKey, endKey, responsePrototype, callback);
673     }
674   }
675 }