1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import com.google.protobuf.Service;
22  import com.google.protobuf.ServiceException;
23  import org.apache.hadoop.classification.InterfaceAudience;
24  import org.apache.hadoop.classification.InterfaceStability;
25  import org.apache.hadoop.conf.Configuration;
26  import org.apache.hadoop.hbase.HBaseConfiguration;
27  import org.apache.hadoop.hbase.HTableDescriptor;
28  import org.apache.hadoop.hbase.client.coprocessor.Batch;
29  import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
30  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
31  import org.apache.hadoop.hbase.util.Bytes;
32  import org.apache.hadoop.hbase.util.PoolMap;
33  import org.apache.hadoop.hbase.util.PoolMap.PoolType;
34  
35  import java.io.Closeable;
36  import java.io.IOException;
37  import java.util.Collection;
38  import java.util.List;
39  import java.util.Map;
40  
41  /**
42   * A simple pool of HTable instances.
43   *
44   * Each HTablePool acts as a pool for all tables. To use, instantiate an
45   * HTablePool and use {@link #getTable(String)} to get an HTable from the pool.
46   *
47     * This method is not needed anymore, clients should call
48     * HTableInterface.close() rather than returning the tables to the pool
49     *
50   * Once you are done with it, close your instance of {@link HTableInterface}
51   * by calling {@link HTableInterface#close()} rather than returning the tables
52   * to the pool with (deprecated) {@link #putTable(HTableInterface)}.
53   *
54   * <p>
55   * A pool can be created with a <i>maxSize</i> which defines the most HTable
56   * references that will ever be retained for each table. Otherwise the default
57   * is {@link Integer#MAX_VALUE}.
58   *
59   * <p>
60   * Pool will manage its own connections to the cluster. See
61   * {@link HConnectionManager}.
62   */
63  @InterfaceAudience.Public
64  @InterfaceStability.Stable
65  public class HTablePool implements Closeable {
66    private final PoolMap<String, HTableInterface> tables;
67    private final int maxSize;
68    private final PoolType poolType;
69    private final Configuration config;
70    private final HTableInterfaceFactory tableFactory;
71  
72    /**
73     * Default Constructor. Default HBaseConfiguration and no limit on pool size.
74     */
75    public HTablePool() {
76      this(HBaseConfiguration.create(), Integer.MAX_VALUE);
77    }
78  
79    /**
80     * Constructor to set maximum versions and use the specified configuration.
81     *
82     * @param config
83     *          configuration
84     * @param maxSize
85     *          maximum number of references to keep for each table
86     */
87    public HTablePool(final Configuration config, final int maxSize) {
88      this(config, maxSize, null, null);
89    }
90  
91    /**
92     * Constructor to set maximum versions and use the specified configuration and
93     * table factory.
94     *
95     * @param config
96     *          configuration
97     * @param maxSize
98     *          maximum number of references to keep for each table
99     * @param tableFactory
100    *          table factory
101    */
102   public HTablePool(final Configuration config, final int maxSize,
103       final HTableInterfaceFactory tableFactory) {
104     this(config, maxSize, tableFactory, PoolType.Reusable);
105   }
106 
107   /**
108    * Constructor to set maximum versions and use the specified configuration and
109    * pool type.
110    *
111    * @param config
112    *          configuration
113    * @param maxSize
114    *          maximum number of references to keep for each table
115    * @param poolType
116    *          pool type which is one of {@link PoolType#Reusable} or
117    *          {@link PoolType#ThreadLocal}
118    */
119   public HTablePool(final Configuration config, final int maxSize,
120       final PoolType poolType) {
121     this(config, maxSize, null, poolType);
122   }
123 
124   /**
125    * Constructor to set maximum versions and use the specified configuration,
126    * table factory and pool type. The HTablePool supports the
127    * {@link PoolType#Reusable} and {@link PoolType#ThreadLocal}. If the pool
128    * type is null or not one of those two values, then it will default to
129    * {@link PoolType#Reusable}.
130    *
131    * @param config
132    *          configuration
133    * @param maxSize
134    *          maximum number of references to keep for each table
135    * @param tableFactory
136    *          table factory
137    * @param poolType
138    *          pool type which is one of {@link PoolType#Reusable} or
139    *          {@link PoolType#ThreadLocal}
140    */
141   public HTablePool(final Configuration config, final int maxSize,
142       final HTableInterfaceFactory tableFactory, PoolType poolType) {
143     // Make a new configuration instance so I can safely cleanup when
144     // done with the pool.
145     this.config = config == null ? HBaseConfiguration.create() : config;
146     this.maxSize = maxSize;
147     this.tableFactory = tableFactory == null ? new HTableFactory()
148         : tableFactory;
149     if (poolType == null) {
150       this.poolType = PoolType.Reusable;
151     } else {
152       switch (poolType) {
153       case Reusable:
154       case ThreadLocal:
155         this.poolType = poolType;
156         break;
157       default:
158         this.poolType = PoolType.Reusable;
159         break;
160       }
161     }
162     this.tables = new PoolMap<String, HTableInterface>(this.poolType,
163         this.maxSize);
164   }
165 
166   /**
167    * Get a reference to the specified table from the pool.
168    * <p>
169    * <p/>
170    *
171    * @param tableName
172    *          table name
173    * @return a reference to the specified table
174    * @throws RuntimeException
175    *           if there is a problem instantiating the HTable
176    */
177   public HTableInterface getTable(String tableName) {
178     // call the old getTable implementation renamed to findOrCreateTable
179     HTableInterface table = findOrCreateTable(tableName);
180     // return a proxy table so when user closes the proxy, the actual table
181     // will be returned to the pool
182     return new PooledHTable(table);
183   }
184 
185   /**
186    * Get a reference to the specified table from the pool.
187    * <p>
188    *
189    * Create a new one if one is not available.
190    *
191    * @param tableName
192    *          table name
193    * @return a reference to the specified table
194    * @throws RuntimeException
195    *           if there is a problem instantiating the HTable
196    */
197   private HTableInterface findOrCreateTable(String tableName) {
198     HTableInterface table = tables.get(tableName);
199     if (table == null) {
200       table = createHTable(tableName);
201     }
202     return table;
203   }
204 
205   /**
206    * Get a reference to the specified table from the pool.
207    * <p>
208    *
209    * Create a new one if one is not available.
210    *
211    * @param tableName
212    *          table name
213    * @return a reference to the specified table
214    * @throws RuntimeException
215    *           if there is a problem instantiating the HTable
216    */
217   public HTableInterface getTable(byte[] tableName) {
218     return getTable(Bytes.toString(tableName));
219   }
220 
221   /**
222    * This method is not needed anymore, clients should call
223    * HTableInterface.close() rather than returning the tables to the pool
224    *
225    * @param table
226    *          the proxy table user got from pool
227    * @deprecated
228    */
229   public void putTable(HTableInterface table) throws IOException {
230     // we need to be sure nobody puts a proxy implementation in the pool
231     // but if the client code is not updated
232     // and it will continue to call putTable() instead of calling close()
233     // then we need to return the wrapped table to the pool instead of the
234     // proxy
235     // table
236     if (table instanceof PooledHTable) {
237       returnTable(((PooledHTable) table).getWrappedTable());
238     } else {
239       // normally this should not happen if clients pass back the same
240       // table
241       // object they got from the pool
242       // but if it happens then it's better to reject it
243       throw new IllegalArgumentException("not a pooled table: " + table);
244     }
245   }
246 
247   /**
248    * Puts the specified HTable back into the pool.
249    * <p>
250    *
251    * If the pool already contains <i>maxSize</i> references to the table, then
252    * the table instance gets closed after flushing buffered edits.
253    *
254    * @param table
255    *          table
256    */
257   private void returnTable(HTableInterface table) throws IOException {
258     // this is the old putTable method renamed and made private
259     String tableName = Bytes.toString(table.getTableName());
260     if (tables.size(tableName) >= maxSize) {
261       // release table instance since we're not reusing it
262       this.tables.remove(tableName, table);
263       this.tableFactory.releaseHTableInterface(table);
264       return;
265     }
266     tables.put(tableName, table);
267   }
268 
269   protected HTableInterface createHTable(String tableName) {
270     return this.tableFactory.createHTableInterface(config,
271         Bytes.toBytes(tableName));
272   }
273 
274   /**
275    * Closes all the HTable instances , belonging to the given table, in the
276    * table pool.
277    * <p>
278    * Note: this is a 'shutdown' of the given table pool and different from
279    * {@link #putTable(HTableInterface)}, that is used to return the table
280    * instance to the pool for future re-use.
281    *
282    * @param tableName
283    */
284   public void closeTablePool(final String tableName) throws IOException {
285     Collection<HTableInterface> tables = this.tables.values(tableName);
286     if (tables != null) {
287       for (HTableInterface table : tables) {
288         this.tableFactory.releaseHTableInterface(table);
289       }
290     }
291     this.tables.remove(tableName);
292   }
293 
294   /**
295    * See {@link #closeTablePool(String)}.
296    *
297    * @param tableName
298    */
299   public void closeTablePool(final byte[] tableName) throws IOException {
300     closeTablePool(Bytes.toString(tableName));
301   }
302 
303   /**
304    * Closes all the HTable instances , belonging to all tables in the table
305    * pool.
306    * <p>
307    * Note: this is a 'shutdown' of all the table pools.
308    */
309   public void close() throws IOException {
310     for (String tableName : tables.keySet()) {
311       closeTablePool(tableName);
312     }
313     this.tables.clear();
314   }
315 
316   public int getCurrentPoolSize(String tableName) {
317     return tables.size(tableName);
318   }
319 
320   /**
321    * A proxy class that implements HTableInterface.close method to return the
322    * wrapped table back to the table pool
323    *
324    */
325   class PooledHTable implements HTableInterface {
326 
327     private boolean open = false;
328 
329     private HTableInterface table; // actual table implementation
330 
331     public PooledHTable(HTableInterface table) {
332       this.table = table;
333       this.open = true;
334     }
335 
336     @Override
337     public byte[] getTableName() {
338       checkState();
339       return table.getTableName();
340     }
341 
342     @Override
343     public Configuration getConfiguration() {
344       checkState();
345       return table.getConfiguration();
346     }
347 
348     @Override
349     public HTableDescriptor getTableDescriptor() throws IOException {
350       checkState();
351       return table.getTableDescriptor();
352     }
353 
354     @Override
355     public boolean exists(Get get) throws IOException {
356       checkState();
357       return table.exists(get);
358     }
359 
360     @Override
361     public Boolean[] exists(List<Get> gets) throws IOException {
362       checkState();
363       return table.exists(gets);
364     }
365 
366     @Override
367     public void batch(List<? extends Row> actions, Object[] results) throws IOException,
368         InterruptedException {
369       checkState();
370       table.batch(actions, results);
371     }
372 
373     @Override
374     public Object[] batch(List<? extends Row> actions) throws IOException,
375         InterruptedException {
376       checkState();
377       return table.batch(actions);
378     }
379 
380     @Override
381     public Result get(Get get) throws IOException {
382       checkState();
383       return table.get(get);
384     }
385 
386     @Override
387     public Result[] get(List<Get> gets) throws IOException {
388       checkState();
389       return table.get(gets);
390     }
391 
392     @Override
393     @SuppressWarnings("deprecation")
394     public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
395       checkState();
396       return table.getRowOrBefore(row, family);
397     }
398 
399     @Override
400     public ResultScanner getScanner(Scan scan) throws IOException {
401       checkState();
402       return table.getScanner(scan);
403     }
404 
405     @Override
406     public ResultScanner getScanner(byte[] family) throws IOException {
407       checkState();
408       return table.getScanner(family);
409     }
410 
411     @Override
412     public ResultScanner getScanner(byte[] family, byte[] qualifier)
413         throws IOException {
414       checkState();
415       return table.getScanner(family, qualifier);
416     }
417 
418     @Override
419     public void put(Put put) throws IOException {
420       checkState();
421       table.put(put);
422     }
423 
424     @Override
425     public void put(List<Put> puts) throws IOException {
426       checkState();
427       table.put(puts);
428     }
429 
430     @Override
431     public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
432         byte[] value, Put put) throws IOException {
433       checkState();
434       return table.checkAndPut(row, family, qualifier, value, put);
435     }
436 
437     @Override
438     public void delete(Delete delete) throws IOException {
439       checkState();
440       table.delete(delete);
441     }
442 
443     @Override
444     public void delete(List<Delete> deletes) throws IOException {
445       checkState();
446       table.delete(deletes);
447     }
448 
449     @Override
450     public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
451         byte[] value, Delete delete) throws IOException {
452       checkState();
453       return table.checkAndDelete(row, family, qualifier, value, delete);
454     }
455 
456     @Override
457     public Result increment(Increment increment) throws IOException {
458       checkState();
459       return table.increment(increment);
460     }
461 
462     @Override
463     public long incrementColumnValue(byte[] row, byte[] family,
464         byte[] qualifier, long amount) throws IOException {
465       checkState();
466       return table.incrementColumnValue(row, family, qualifier, amount);
467     }
468 
469     @Override
470     public long incrementColumnValue(byte[] row, byte[] family,
471         byte[] qualifier, long amount, Durability durability) throws IOException {
472       checkState();
473       return table.incrementColumnValue(row, family, qualifier, amount,
474           durability);
475     }
476 
477     @Override
478     public boolean isAutoFlush() {
479       checkState();
480       return table.isAutoFlush();
481     }
482 
483     @Override
484     public void flushCommits() throws IOException {
485       checkState();
486       table.flushCommits();
487     }
488 
489     /**
490      * Returns the actual table back to the pool
491      *
492      * @throws IOException
493      */
494     public void close() throws IOException {
495       checkState();
496       open = false;
497       returnTable(table);
498     }
499 
500     @Override
501     public CoprocessorRpcChannel coprocessorService(byte[] row) {
502       checkState();
503       return table.coprocessorService(row);
504     }
505 
506     @Override
507     public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
508         byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
509         throws ServiceException, Throwable {
510       checkState();
511       return table.coprocessorService(service, startKey, endKey, callable);
512     }
513 
514     @Override
515     public <T extends Service, R> void coprocessorService(Class<T> service,
516         byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Callback<R> callback)
517         throws ServiceException, Throwable {
518       checkState();
519       table.coprocessorService(service, startKey, endKey, callable, callback);
520     }
521 
522     @Override
523     public String toString() {
524       return "PooledHTable{" + ", table=" + table + '}';
525     }
526 
527     /**
528      * Expose the wrapped HTable to tests in the same package
529      *
530      * @return wrapped htable
531      */
532     HTableInterface getWrappedTable() {
533       return table;
534     }
535 
536     @Override
537     public <R> void batchCallback(List<? extends Row> actions,
538         Object[] results, Callback<R> callback) throws IOException,
539         InterruptedException {
540       checkState();
541       table.batchCallback(actions, results, callback);
542     }
543 
544     @Override
545     public <R> Object[] batchCallback(List<? extends Row> actions,
546         Callback<R> callback) throws IOException, InterruptedException {
547       checkState();
548       return table.batchCallback(actions,  callback);
549     }
550 
551     @Override
552     public void mutateRow(RowMutations rm) throws IOException {
553       checkState();
554       table.mutateRow(rm);
555     }
556 
557     @Override
558     public Result append(Append append) throws IOException {
559       checkState();
560       return table.append(append);
561     }
562 
563     @Override
564     public void setAutoFlush(boolean autoFlush) {
565       checkState();
566       table.setAutoFlush(autoFlush);
567     }
568 
569     @Override
570     public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
571       checkState();
572       table.setAutoFlush(autoFlush, clearBufferOnFail);
573     }
574 
575     @Override
576     public long getWriteBufferSize() {
577       checkState();
578       return table.getWriteBufferSize();
579     }
580 
581     @Override
582     public void setWriteBufferSize(long writeBufferSize) throws IOException {
583       checkState();
584       table.setWriteBufferSize(writeBufferSize);
585     }
586 
587     boolean isOpen() {
588       return open;
589     }
590 
591     private void checkState() {
592       if (!isOpen()) {
593         throw new IllegalStateException("Table=" + new String(table.getTableName()) + " already closed");
594       }
595     }
596   }
597 }