/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.util;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.coprocessor.Batch;

/**
 * Provides ability to create multiple Connection instances and allows to process a batch of
 * actions using {@code HTable.doBatchWithCallback()}.
 */
@InterfaceAudience.Private
public class MultiHConnection {
  private static final Logger LOG = LoggerFactory.getLogger(MultiHConnection.class);
  private Connection[] connections;
  private final Object connectionsLock = new Object();
  private final int noOfConnections;
  private ExecutorService batchPool;

  /**
   * Create multiple Connection instances and initialize a thread pool executor.
   * <p>
   * If any connection or the batch pool cannot be created, the connections that were already
   * opened are closed before the failure is propagated.
   * @param conf configuration
   * @param noOfConnections total no of Connections to create
   * @throws IOException if IO failure occurs
   */
  public MultiHConnection(Configuration conf, int noOfConnections)
      throws IOException {
    this.noOfConnections = noOfConnections;
    boolean initialized = false;
    try {
      synchronized (this.connectionsLock) {
        connections = new Connection[noOfConnections];
        for (int i = 0; i < noOfConnections; i++) {
          connections[i] = ConnectionFactory.createConnection(conf);
        }
      }
      createBatchPool(conf);
      initialized = true;
    } finally {
      if (!initialized) {
        // Do not leak the connections opened so far. A private helper is used rather than
        // close() so that no overridable method runs on a partially constructed instance.
        closeConnections();
      }
    }
  }

  /**
   * Close the open connections and shutdown the batchpool. The pool is given 10 seconds to
   * drain before it is shut down forcibly. If the calling thread is interrupted while waiting,
   * the pool is shut down forcibly and the thread's interrupt status is restored.
   */
  public void close() {
    closeConnections();
    ExecutorService pool = this.batchPool;
    if (pool != null && !pool.isShutdown()) {
      pool.shutdown();
      try {
        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
          pool.shutdownNow();
        }
      } catch (InterruptedException e) {
        pool.shutdownNow();
        // Preserve the interrupt so callers further up the stack can observe it.
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Close every non-null connection under {@link #connectionsLock} and drop the array.
   * An IOException from an individual close is logged and does not stop the remaining closes.
   */
  private void closeConnections() {
    synchronized (connectionsLock) {
      if (connections == null) {
        return;
      }
      for (Connection conn : connections) {
        if (conn == null) {
          continue;
        }
        try {
          conn.close();
        } catch (IOException e) {
          LOG.info("Got exception in closing connection", e);
        }
      }
      connections = null;
    }
  }

  /**
   * Randomly pick a connection and process the batch of actions for a given table
   * @param actions the actions
   * @param tableName table name
   * @param results the results array
   * @param callback callback to run when results are in
   * @throws IOException If IO failure occurs, or if this instance has already been closed
   */
  @SuppressWarnings("deprecation")
  public <R> void processBatchCallback(List<? extends Row> actions, TableName tableName,
      Object[] results, Batch.Callback<R> callback) throws IOException {
    // Currently used by RegionStateStore
    // Read the array under the same lock close() uses to null it out, so a concurrent or
    // earlier close() surfaces as an IOException instead of a NullPointerException.
    Connection[] snapshot;
    synchronized (connectionsLock) {
      snapshot = connections;
    }
    if (snapshot == null) {
      throw new IOException("MultiHConnection is closed");
    }
    ClusterConnection conn =
      (ClusterConnection) snapshot[ThreadLocalRandom.current().nextInt(noOfConnections)];

    HTable.doBatchWithCallback(actions, results, callback, conn, batchPool, tableName);
  }

  // Copied from ConnectionImplementation.getBatchPool()
  // We should get rid of this when Connection.processBatchCallback is un-deprecated and provides
  // an API to manage a batch pool
  /**
   * Build the shared executor used for batch calls and store it in {@link #batchPool}.
   * <p>
   * Reads {@code hbase.multihconnection.threads.max} (default 256; a value of 0 means
   * 8 threads per available processor) and
   * {@code hbase.multihconnection.threads.keepalivetime} (seconds, default 60). The work queue
   * is bounded at max threads multiplied by {@link HConstants#HBASE_CLIENT_MAX_TOTAL_TASKS}.
   * Core threads are allowed to time out.
   * @param conf configuration to read the pool settings from
   */
  private void createBatchPool(Configuration conf) {
    int maxThreads = conf.getInt("hbase.multihconnection.threads.max", 256);
    if (maxThreads == 0) {
      maxThreads = Runtime.getRuntime().availableProcessors() * 8;
    }
    long keepAliveTime = conf.getLong("hbase.multihconnection.threads.keepalivetime", 60);
    int maxTotalTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
      HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
    // Multiply in long and clamp so that large configured values cannot overflow int and
    // produce a bogus queue capacity.
    int queueCapacity = (int) Math.min(Integer.MAX_VALUE, (long) maxThreads * maxTotalTasks);
    LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(queueCapacity);
    ThreadPoolExecutor tpe =
      new ThreadPoolExecutor(maxThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue,
        new ThreadFactoryBuilder().setNameFormat("MultiHConnection-shared-pool-%d")
          .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
    tpe.allowCoreThreadTimeOut(true);
    this.batchPool = tpe;
  }

}