/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.util;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Provides the ability to create multiple Connection instances and to process a batch of
 * actions for a given table using HTable.doBatchWithCallback().
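 * <p>A minimal lifecycle sketch; the configuration source and the connection count of 3 are
 * arbitrary example values, not requirements of this class:
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * MultiHConnection multiConnection = new MultiHConnection(conf, 3); // IOException handling omitted
 * try {
 *   // submit work with processBatchCallback(...)
 * } finally {
 *   multiConnection.close();
 * }
 * }</pre>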
 */
@InterfaceAudience.Private
public class MultiHConnection {
  private static final Logger LOG = LoggerFactory.getLogger(MultiHConnection.class);
  private Connection[] connections;
  private final Object connectionsLock = new Object();
  private final int noOfConnections;
  private ExecutorService batchPool;

  /**
   * Create multiple Connection instances and initialize a thread pool executor
   * @param conf configuration
   * @param noOfConnections total number of Connections to create
   * @throws IOException if IO failure occurs
   */
  public MultiHConnection(Configuration conf, int noOfConnections)
      throws IOException {
    this.noOfConnections = noOfConnections;
    synchronized (this.connectionsLock) {
      connections = new Connection[noOfConnections];
      for (int i = 0; i < noOfConnections; i++) {
        Connection conn = ConnectionFactory.createConnection(conf);
        connections[i] = conn;
      }
    }
    createBatchPool(conf);
  }

  /**
   * Close the open connections and shut down the batch pool
   */
  public void close() {
    synchronized (connectionsLock) {
      if (connections != null) {
        for (Connection conn : connections) {
          if (conn != null) {
            try {
              conn.close();
            } catch (IOException e) {
              LOG.info("Got exception while closing connection", e);
            }
          }
        }
        connections = null;
      }
    }
    if (this.batchPool != null && !this.batchPool.isShutdown()) {
      this.batchPool.shutdown();
      try {
        if (!this.batchPool.awaitTermination(10, TimeUnit.SECONDS)) {
          this.batchPool.shutdownNow();
        }
      } catch (InterruptedException e) {
        this.batchPool.shutdownNow();
        // Preserve the interrupt status for callers instead of swallowing it.
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Randomly pick a connection and process the batch of actions for a given table
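   * <p>A rough usage sketch; the table name, cells, and the caller-side logger used in the
   * callback are hypothetical examples, not part of this class:
   * <pre>{@code
   * List<Put> actions = Collections.singletonList(
   *     new Put(Bytes.toBytes("row1"))
   *         .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v")));
   * Object[] results = new Object[actions.size()];
   * multiConnection.processBatchCallback(actions, TableName.valueOf("example_table"), results,
   *     (region, row, result) -> LOG.debug("Got result for row {}", Bytes.toString(row)));
   * }</pre>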
   * @param actions the actions
   * @param tableName table name
   * @param results the results array
   * @param callback the callback to run when results are in
   * @throws IOException if IO failure occurs
   */
  @SuppressWarnings("deprecation")
  public <R> void processBatchCallback(List<? extends Row> actions, TableName tableName,
      Object[] results, Batch.Callback<R> callback) throws IOException {
    // Currently used by RegionStateStore
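    // Pick one of the pooled connections at random so concurrent callers spread their batches
    // across the available connections.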
    ClusterConnection conn =
      (ClusterConnection) connections[ThreadLocalRandom.current().nextInt(noOfConnections)];

    HTable.doBatchWithCallback(actions, results, callback, conn, batchPool, tableName);
  }

  // Copied from ConnectionImplementation.getBatchPool()
  // We should get rid of this when Connection.processBatchCallback is un-deprecated and provides
  // an API to manage a batch pool
  private void createBatchPool(Configuration conf) {
    // Use the same config for keep alive as in ConnectionImplementation.getBatchPool();
    int maxThreads = conf.getInt("hbase.multihconnection.threads.max", 256);
    if (maxThreads == 0) {
      maxThreads = Runtime.getRuntime().availableProcessors() * 8;
    }
    long keepAliveTime = conf.getLong("hbase.multihconnection.threads.keepalivetime", 60);
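    // Bound the work queue at maxThreads * max-total-tasks; once the queue is full and all
    // threads are busy, further submissions are rejected rather than queued without limit.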
    LinkedBlockingQueue<Runnable> workQueue =
        new LinkedBlockingQueue<>(maxThreads
            * conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
              HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
    ThreadPoolExecutor tpe =
        new ThreadPoolExecutor(maxThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue,
            Threads.newDaemonThreadFactory("MultiHConnection-shared-"));
    tpe.allowCoreThreadTimeOut(true);
    this.batchPool = tpe;
  }
}