View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.util.Collection;
24  import java.util.List;
25  import java.util.Map;
26  
27  import org.apache.hadoop.classification.InterfaceAudience;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.hbase.HBaseConfiguration;
30  import org.apache.hadoop.hbase.HTableDescriptor;
31  import org.apache.hadoop.hbase.TableName;
32  import org.apache.hadoop.hbase.client.coprocessor.Batch;
33  import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
34  import org.apache.hadoop.hbase.filter.BinaryComparator;
35  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
36  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
37  import org.apache.hadoop.hbase.util.Bytes;
38  import org.apache.hadoop.hbase.util.PoolMap;
39  import org.apache.hadoop.hbase.util.PoolMap.PoolType;
40  
41  import com.google.protobuf.Descriptors;
42  import com.google.protobuf.Message;
43  import com.google.protobuf.Service;
44  import com.google.protobuf.ServiceException;
45  
46  /**
47   * A simple pool of HTable instances.
48   *
49   * Each HTablePool acts as a pool for all tables. To use, instantiate an
50   * HTablePool and use {@link #getTable(String)} to get an HTable from the pool.
51   *
52     * This method is not needed anymore, clients should call
53     * HTableInterface.close() rather than returning the tables to the pool
54     *
55   * Once you are done with it, close your instance of {@link HTableInterface}
56   * by calling {@link HTableInterface#close()} rather than returning the tables
57   * to the pool with (deprecated) {@link #putTable(HTableInterface)}.
58   *
59   * <p>
60   * A pool can be created with a <i>maxSize</i> which defines the most HTable
61   * references that will ever be retained for each table. Otherwise the default
62   * is {@link Integer#MAX_VALUE}.
63   *
64   * <p>
65   * Pool will manage its own connections to the cluster. See
66   * {@link HConnectionManager}.
67   * @deprecated as of 0.98.1. See {@link HConnection#getTable(String)}.
68   */
69  @InterfaceAudience.Private
70  @Deprecated
71  public class HTablePool implements Closeable {
72    private final PoolMap<String, HTableInterface> tables;
73    private final int maxSize;
74    private final PoolType poolType;
75    private final Configuration config;
76    private final HTableInterfaceFactory tableFactory;
77  
78    /**
79     * Default Constructor. Default HBaseConfiguration and no limit on pool size.
80     */
81    public HTablePool() {
82      this(HBaseConfiguration.create(), Integer.MAX_VALUE);
83    }
84  
85    /**
86     * Constructor to set maximum versions and use the specified configuration.
87     *
88     * @param config
89     *          configuration
90     * @param maxSize
91     *          maximum number of references to keep for each table
92     */
93    public HTablePool(final Configuration config, final int maxSize) {
94      this(config, maxSize, null, null);
95    }
96  
97    /**
98     * Constructor to set maximum versions and use the specified configuration and
99     * table factory.
100    *
101    * @param config
102    *          configuration
103    * @param maxSize
104    *          maximum number of references to keep for each table
105    * @param tableFactory
106    *          table factory
107    */
108   public HTablePool(final Configuration config, final int maxSize,
109       final HTableInterfaceFactory tableFactory) {
110     this(config, maxSize, tableFactory, PoolType.Reusable);
111   }
112 
113   /**
114    * Constructor to set maximum versions and use the specified configuration and
115    * pool type.
116    *
117    * @param config
118    *          configuration
119    * @param maxSize
120    *          maximum number of references to keep for each table
121    * @param poolType
122    *          pool type which is one of {@link PoolType#Reusable} or
123    *          {@link PoolType#ThreadLocal}
124    */
125   public HTablePool(final Configuration config, final int maxSize,
126       final PoolType poolType) {
127     this(config, maxSize, null, poolType);
128   }
129 
130   /**
131    * Constructor to set maximum versions and use the specified configuration,
132    * table factory and pool type. The HTablePool supports the
133    * {@link PoolType#Reusable} and {@link PoolType#ThreadLocal}. If the pool
134    * type is null or not one of those two values, then it will default to
135    * {@link PoolType#Reusable}.
136    *
137    * @param config
138    *          configuration
139    * @param maxSize
140    *          maximum number of references to keep for each table
141    * @param tableFactory
142    *          table factory
143    * @param poolType
144    *          pool type which is one of {@link PoolType#Reusable} or
145    *          {@link PoolType#ThreadLocal}
146    */
147   public HTablePool(final Configuration config, final int maxSize,
148       final HTableInterfaceFactory tableFactory, PoolType poolType) {
149     // Make a new configuration instance so I can safely cleanup when
150     // done with the pool.
151     this.config = config == null ? HBaseConfiguration.create() : config;
152     this.maxSize = maxSize;
153     this.tableFactory = tableFactory == null ? new HTableFactory()
154         : tableFactory;
155     if (poolType == null) {
156       this.poolType = PoolType.Reusable;
157     } else {
158       switch (poolType) {
159       case Reusable:
160       case ThreadLocal:
161         this.poolType = poolType;
162         break;
163       default:
164         this.poolType = PoolType.Reusable;
165         break;
166       }
167     }
168     this.tables = new PoolMap<String, HTableInterface>(this.poolType,
169         this.maxSize);
170   }
171 
172   /**
173    * Get a reference to the specified table from the pool.
174    * <p>
175    * <p/>
176    *
177    * @param tableName
178    *          table name
179    * @return a reference to the specified table
180    * @throws RuntimeException
181    *           if there is a problem instantiating the HTable
182    */
183   public HTableInterface getTable(String tableName) {
184     // call the old getTable implementation renamed to findOrCreateTable
185     HTableInterface table = findOrCreateTable(tableName);
186     // return a proxy table so when user closes the proxy, the actual table
187     // will be returned to the pool
188     return new PooledHTable(table);
189   }
190 
191   /**
192    * Get a reference to the specified table from the pool.
193    * <p>
194    *
195    * Create a new one if one is not available.
196    *
197    * @param tableName
198    *          table name
199    * @return a reference to the specified table
200    * @throws RuntimeException
201    *           if there is a problem instantiating the HTable
202    */
203   private HTableInterface findOrCreateTable(String tableName) {
204     HTableInterface table = tables.get(tableName);
205     if (table == null) {
206       table = createHTable(tableName);
207     }
208     return table;
209   }
210 
211   /**
212    * Get a reference to the specified table from the pool.
213    * <p>
214    *
215    * Create a new one if one is not available.
216    *
217    * @param tableName
218    *          table name
219    * @return a reference to the specified table
220    * @throws RuntimeException
221    *           if there is a problem instantiating the HTable
222    */
223   public HTableInterface getTable(byte[] tableName) {
224     return getTable(Bytes.toString(tableName));
225   }
226 
227   /**
228    * This method is not needed anymore, clients should call
229    * HTableInterface.close() rather than returning the tables to the pool
230    *
231    * @param table
232    *          the proxy table user got from pool
233    * @deprecated
234    */
235   public void putTable(HTableInterface table) throws IOException {
236     // we need to be sure nobody puts a proxy implementation in the pool
237     // but if the client code is not updated
238     // and it will continue to call putTable() instead of calling close()
239     // then we need to return the wrapped table to the pool instead of the
240     // proxy
241     // table
242     if (table instanceof PooledHTable) {
243       returnTable(((PooledHTable) table).getWrappedTable());
244     } else {
245       // normally this should not happen if clients pass back the same
246       // table
247       // object they got from the pool
248       // but if it happens then it's better to reject it
249       throw new IllegalArgumentException("not a pooled table: " + table);
250     }
251   }
252 
253   /**
254    * Puts the specified HTable back into the pool.
255    * <p>
256    *
257    * If the pool already contains <i>maxSize</i> references to the table, then
258    * the table instance gets closed after flushing buffered edits.
259    *
260    * @param table
261    *          table
262    */
263   private void returnTable(HTableInterface table) throws IOException {
264     // this is the old putTable method renamed and made private
265     String tableName = Bytes.toString(table.getTableName());
266     if (tables.size(tableName) >= maxSize) {
267       // release table instance since we're not reusing it
268       this.tables.removeValue(tableName, table);
269       this.tableFactory.releaseHTableInterface(table);
270       return;
271     }
272     tables.put(tableName, table);
273   }
274 
275   protected HTableInterface createHTable(String tableName) {
276     return this.tableFactory.createHTableInterface(config,
277         Bytes.toBytes(tableName));
278   }
279 
280   /**
281    * Closes all the HTable instances , belonging to the given table, in the
282    * table pool.
283    * <p>
284    * Note: this is a 'shutdown' of the given table pool and different from
285    * {@link #putTable(HTableInterface)}, that is used to return the table
286    * instance to the pool for future re-use.
287    *
288    * @param tableName
289    */
290   public void closeTablePool(final String tableName) throws IOException {
291     Collection<HTableInterface> tables = this.tables.values(tableName);
292     if (tables != null) {
293       for (HTableInterface table : tables) {
294         this.tableFactory.releaseHTableInterface(table);
295       }
296     }
297     this.tables.remove(tableName);
298   }
299 
300   /**
301    * See {@link #closeTablePool(String)}.
302    *
303    * @param tableName
304    */
305   public void closeTablePool(final byte[] tableName) throws IOException {
306     closeTablePool(Bytes.toString(tableName));
307   }
308 
309   /**
310    * Closes all the HTable instances , belonging to all tables in the table
311    * pool.
312    * <p>
313    * Note: this is a 'shutdown' of all the table pools.
314    */
315   public void close() throws IOException {
316     for (String tableName : tables.keySet()) {
317       closeTablePool(tableName);
318     }
319     this.tables.clear();
320   }
321 
322   public int getCurrentPoolSize(String tableName) {
323     return tables.size(tableName);
324   }
325 
326   /**
327    * A proxy class that implements HTableInterface.close method to return the
328    * wrapped table back to the table pool
329    *
330    */
331   class PooledHTable implements HTableInterface {
332 
333     private boolean open = false;
334 
335     private HTableInterface table; // actual table implementation
336 
337     public PooledHTable(HTableInterface table) {
338       this.table = table;
339       this.open = true;
340     }
341 
342     @Override
343     public byte[] getTableName() {
344       checkState();
345       return table.getTableName();
346     }
347 
348     @Override
349     public TableName getName() {
350       return table.getName();
351     }
352 
353     @Override
354     public Configuration getConfiguration() {
355       checkState();
356       return table.getConfiguration();
357     }
358 
359     @Override
360     public HTableDescriptor getTableDescriptor() throws IOException {
361       checkState();
362       return table.getTableDescriptor();
363     }
364 
365     @Override
366     public boolean exists(Get get) throws IOException {
367       checkState();
368       return table.exists(get);
369     }
370 
371     @Override
372     public Boolean[] exists(List<Get> gets) throws IOException {
373       checkState();
374       return table.exists(gets);
375     }
376 
377     @Override
378     public void batch(List<? extends Row> actions, Object[] results) throws IOException,
379         InterruptedException {
380       checkState();
381       table.batch(actions, results);
382     }
383 
384     /**
385      * {@inheritDoc}
386      * @deprecated If any exception is thrown by one of the actions, there is no way to
387      * retrieve the partially executed results. Use {@link #batch(List, Object[])} instead.
388      */
389     @Override
390     public Object[] batch(List<? extends Row> actions) throws IOException,
391         InterruptedException {
392       checkState();
393       return table.batch(actions);
394     }
395 
396     @Override
397     public Result get(Get get) throws IOException {
398       checkState();
399       return table.get(get);
400     }
401 
402     @Override
403     public Result[] get(List<Get> gets) throws IOException {
404       checkState();
405       return table.get(gets);
406     }
407 
408     @Override
409     @SuppressWarnings("deprecation")
410     @Deprecated
411     public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
412       checkState();
413       return table.getRowOrBefore(row, family);
414     }
415 
416     @Override
417     public ResultScanner getScanner(Scan scan) throws IOException {
418       checkState();
419       return table.getScanner(scan);
420     }
421 
422     @Override
423     public ResultScanner getScanner(byte[] family) throws IOException {
424       checkState();
425       return table.getScanner(family);
426     }
427 
428     @Override
429     public ResultScanner getScanner(byte[] family, byte[] qualifier)
430         throws IOException {
431       checkState();
432       return table.getScanner(family, qualifier);
433     }
434 
435     @Override
436     public void put(Put put) throws IOException {
437       checkState();
438       table.put(put);
439     }
440 
441     @Override
442     public void put(List<Put> puts) throws IOException {
443       checkState();
444       table.put(puts);
445     }
446 
447     @Override
448     public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
449         byte[] value, Put put) throws IOException {
450       checkState();
451       return table.checkAndPut(row, family, qualifier, value, put);
452     }
453 
454     @Override
455     public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
456         CompareOp compareOp, byte[] value, Put put) throws IOException {
457       checkState();
458       return table.checkAndPut(row, family, qualifier, compareOp, value, put);
459     }
460 
461     @Override
462     public void delete(Delete delete) throws IOException {
463       checkState();
464       table.delete(delete);
465     }
466 
467     @Override
468     public void delete(List<Delete> deletes) throws IOException {
469       checkState();
470       table.delete(deletes);
471     }
472 
473     @Override
474     public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
475         byte[] value, Delete delete) throws IOException {
476       checkState();
477       return table.checkAndDelete(row, family, qualifier, value, delete);
478     }
479 
480     @Override
481     public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
482         CompareOp compareOp, byte[] value, Delete delete) throws IOException {
483       checkState();
484       return table.checkAndDelete(row, family, qualifier, compareOp, value, delete);
485     }
486 
487     @Override
488     public Result increment(Increment increment) throws IOException {
489       checkState();
490       return table.increment(increment);
491     }
492 
493     @Override
494     public long incrementColumnValue(byte[] row, byte[] family,
495         byte[] qualifier, long amount) throws IOException {
496       checkState();
497       return table.incrementColumnValue(row, family, qualifier, amount);
498     }
499 
500     @Override
501     public long incrementColumnValue(byte[] row, byte[] family,
502         byte[] qualifier, long amount, Durability durability) throws IOException {
503       checkState();
504       return table.incrementColumnValue(row, family, qualifier, amount,
505           durability);
506     }
507 
508     @Override
509     public boolean isAutoFlush() {
510       checkState();
511       return table.isAutoFlush();
512     }
513 
514     @Override
515     public void flushCommits() throws IOException {
516       checkState();
517       table.flushCommits();
518     }
519 
520     /**
521      * Returns the actual table back to the pool
522      *
523      * @throws IOException
524      */
525     public void close() throws IOException {
526       checkState();
527       open = false;
528       returnTable(table);
529     }
530 
531     @Override
532     public CoprocessorRpcChannel coprocessorService(byte[] row) {
533       checkState();
534       return table.coprocessorService(row);
535     }
536 
537     @Override
538     public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
539         byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
540         throws ServiceException, Throwable {
541       checkState();
542       return table.coprocessorService(service, startKey, endKey, callable);
543     }
544 
545     @Override
546     public <T extends Service, R> void coprocessorService(Class<T> service,
547         byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Callback<R> callback)
548         throws ServiceException, Throwable {
549       checkState();
550       table.coprocessorService(service, startKey, endKey, callable, callback);
551     }
552 
553     @Override
554     public String toString() {
555       return "PooledHTable{" + ", table=" + table + '}';
556     }
557 
558     /**
559      * Expose the wrapped HTable to tests in the same package
560      *
561      * @return wrapped htable
562      */
563     HTableInterface getWrappedTable() {
564       return table;
565     }
566 
567     @Override
568     public <R> void batchCallback(List<? extends Row> actions,
569         Object[] results, Callback<R> callback) throws IOException,
570         InterruptedException {
571       checkState();
572       table.batchCallback(actions, results, callback);
573     }
574 
575     /**
576      * {@inheritDoc}
577      * @deprecated If any exception is thrown by one of the actions, there is no way to
578      * retrieve the partially executed results. Use
579      * {@link #batchCallback(List, Object[], org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
580      * instead.
581      */
582     @Override
583     public <R> Object[] batchCallback(List<? extends Row> actions,
584         Callback<R> callback) throws IOException, InterruptedException {
585       checkState();
586       return table.batchCallback(actions,  callback);
587     }
588 
589     @Override
590     public void mutateRow(RowMutations rm) throws IOException {
591       checkState();
592       table.mutateRow(rm);
593     }
594 
595     @Override
596     public Result append(Append append) throws IOException {
597       checkState();
598       return table.append(append);
599     }
600 
601     @Override
602     public void setAutoFlush(boolean autoFlush) {
603       checkState();
604       table.setAutoFlush(autoFlush, autoFlush);
605     }
606 
607     @Override
608     public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
609       checkState();
610       table.setAutoFlush(autoFlush, clearBufferOnFail);
611     }
612 
613     @Override
614     public void setAutoFlushTo(boolean autoFlush) {
615       table.setAutoFlushTo(autoFlush);
616     }
617 
618     @Override
619     public long getWriteBufferSize() {
620       checkState();
621       return table.getWriteBufferSize();
622     }
623 
624     @Override
625     public void setWriteBufferSize(long writeBufferSize) throws IOException {
626       checkState();
627       table.setWriteBufferSize(writeBufferSize);
628     }
629 
630     boolean isOpen() {
631       return open;
632     }
633 
634     private void checkState() {
635       if (!isOpen()) {
636         throw new IllegalStateException("Table=" + new String(table.getTableName()) + " already closed");
637       }
638     }
639 
640     @Override
641     public long incrementColumnValue(byte[] row, byte[] family,
642         byte[] qualifier, long amount, boolean writeToWAL) throws IOException {
643       return table.incrementColumnValue(row, family, qualifier, amount, writeToWAL);
644     }
645 
646     @Override
647     public <R extends Message> Map<byte[], R> batchCoprocessorService(
648         Descriptors.MethodDescriptor method, Message request,
649         byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
650       checkState();
651       return table.batchCoprocessorService(method, request, startKey, endKey,
652           responsePrototype);
653     }
654 
655     @Override
656     public <R extends Message> void batchCoprocessorService(
657         Descriptors.MethodDescriptor method, Message request,
658         byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback)
659         throws ServiceException, Throwable {
660       checkState();
661       table.batchCoprocessorService(method, request, startKey, endKey, responsePrototype, callback);
662     }
663   }
664 }