001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import java.io.IOException; 021import java.util.concurrent.BlockingQueue; 022import java.util.concurrent.ExecutorService; 023import java.util.concurrent.LinkedBlockingQueue; 024import java.util.concurrent.ThreadPoolExecutor; 025import java.util.concurrent.TimeUnit; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.DoNotRetryIOException; 028import org.apache.hadoop.hbase.HConstants; 029import org.apache.hadoop.hbase.ServerName; 030import org.apache.hadoop.hbase.TableName; 031import org.apache.hadoop.hbase.log.HBaseMarkers; 032import org.apache.hadoop.hbase.util.FutureUtils; 033import org.apache.hadoop.hbase.util.IOExceptionSupplier; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 039import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 040 041/** 042 * The connection implementation based on {@link AsyncConnection}. 043 */ 044@InterfaceAudience.Private 045class ConnectionOverAsyncConnection implements Connection { 046 047 private static final Logger LOG = LoggerFactory.getLogger(ConnectionOverAsyncConnection.class); 048 049 private volatile boolean aborted = false; 050 051 // only used for executing coprocessor calls, as users may reference the methods in the 052 // BlockingInterface of the protobuf stub so we have to execute the call in a separated thread... 053 // Will be removed in 4.0.0 along with the deprecated coprocessor methods in Table and Admin 054 // interface. 055 private volatile ExecutorService batchPool = null; 056 057 private final AsyncConnectionImpl conn; 058 059 private final ConnectionConfiguration connConf; 060 061 ConnectionOverAsyncConnection(AsyncConnectionImpl conn) { 062 this.conn = conn; 063 this.connConf = new ConnectionConfiguration(conn.getConfiguration()); 064 } 065 066 @Override 067 public void abort(String why, Throwable error) { 068 if (error != null) { 069 LOG.error(HBaseMarkers.FATAL, why, error); 070 } else { 071 LOG.error(HBaseMarkers.FATAL, why); 072 } 073 aborted = true; 074 try { 075 Closeables.close(this, true); 076 } catch (IOException e) { 077 throw new AssertionError(e); 078 } 079 } 080 081 @Override 082 public boolean isAborted() { 083 return aborted; 084 } 085 086 @Override 087 public Configuration getConfiguration() { 088 return conn.getConfiguration(); 089 } 090 091 @Override 092 public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException { 093 AsyncBufferedMutatorBuilder builder = conn.getBufferedMutatorBuilder(params.getTableName()); 094 if (params.getRpcTimeout() != BufferedMutatorParams.UNSET) { 095 builder.setRpcTimeout(params.getRpcTimeout(), TimeUnit.MILLISECONDS); 096 } 097 if (params.getOperationTimeout() != BufferedMutatorParams.UNSET) { 098 builder.setOperationTimeout(params.getOperationTimeout(), TimeUnit.MILLISECONDS); 099 } 100 if (params.getWriteBufferSize() != BufferedMutatorParams.UNSET) { 101 builder.setWriteBufferSize(params.getWriteBufferSize()); 102 } 103 if (params.getWriteBufferPeriodicFlushTimeoutMs() != BufferedMutatorParams.UNSET) { 104 builder.setWriteBufferPeriodicFlush(params.getWriteBufferPeriodicFlushTimeoutMs(), 105 TimeUnit.MILLISECONDS); 106 } 107 if (params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET) { 108 builder.setMaxKeyValueSize(params.getMaxKeyValueSize()); 109 } 110 if (!params.getRequestAttributes().isEmpty()) { 111 112 builder.setRequestAttributes(params.getRequestAttributes()); 113 } 114 return new BufferedMutatorOverAsyncBufferedMutator(builder.build(), params.getListener()); 115 } 116 117 @Override 118 public RegionLocator getRegionLocator(TableName tableName) throws IOException { 119 return new RegionLocatorOverAsyncTableRegionLocator(conn.getRegionLocator(tableName)); 120 } 121 122 @Override 123 public void clearRegionLocationCache() { 124 conn.clearRegionLocationCache(); 125 } 126 127 @Override 128 public Admin getAdmin() throws IOException { 129 return new AdminOverAsyncAdmin(this, (RawAsyncHBaseAdmin) conn.getAdmin()); 130 } 131 132 @Override 133 public void close() throws IOException { 134 conn.close(); 135 } 136 137 // will be called from AsyncConnection, to avoid infinite loop as in the above method we will call 138 // AsyncConnection.close. 139 synchronized void closePool() { 140 ExecutorService batchPool = this.batchPool; 141 if (batchPool != null) { 142 ConnectionUtils.shutdownPool(batchPool); 143 this.batchPool = null; 144 } 145 } 146 147 @Override 148 public boolean isClosed() { 149 return conn.isClosed(); 150 } 151 152 // only used for executing coprocessor calls, as users may reference the methods in the 153 // BlockingInterface of the protobuf stub so we have to execute the call in a separated thread... 154 // Will be removed in 4.0.0 along with the deprecated coprocessor methods in Table and Admin 155 // interface. 156 private ThreadPoolExecutor createThreadPool() { 157 Configuration conf = conn.getConfiguration(); 158 int threads = conf.getInt("hbase.hconnection.threads.max", 256); 159 long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60); 160 BlockingQueue<Runnable> workQueue = 161 new LinkedBlockingQueue<>(threads * conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, 162 HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS)); 163 ThreadPoolExecutor tpe = new ThreadPoolExecutor(threads, threads, keepAliveTime, 164 TimeUnit.SECONDS, workQueue, 165 new ThreadFactoryBuilder().setDaemon(true).setNameFormat(toString() + "-shared-%d").build()); 166 tpe.allowCoreThreadTimeOut(true); 167 return tpe; 168 } 169 170 // only used for executing coprocessor calls, as users may reference the methods in the 171 // BlockingInterface of the protobuf stub so we have to execute the call in a separated thread... 172 // Will be removed in 4.0.0 along with the deprecated coprocessor methods in Table and Admin 173 // interface. 174 private ExecutorService getBatchPool() throws IOException { 175 if (batchPool == null) { 176 synchronized (this) { 177 if (isClosed()) { 178 throw new DoNotRetryIOException("Connection is closed"); 179 } 180 if (batchPool == null) { 181 this.batchPool = createThreadPool(); 182 } 183 } 184 } 185 return this.batchPool; 186 } 187 188 @Override 189 public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) { 190 return new TableBuilderBase(tableName, connConf) { 191 192 @Override 193 public Table build() { 194 IOExceptionSupplier<ExecutorService> poolSupplier = 195 pool != null ? () -> pool : ConnectionOverAsyncConnection.this::getBatchPool; 196 AsyncTableBuilder<AdvancedScanResultConsumer> tableBuilder = 197 conn.getTableBuilder(tableName).setRpcTimeout(rpcTimeout, TimeUnit.MILLISECONDS) 198 .setReadRpcTimeout(readRpcTimeout, TimeUnit.MILLISECONDS) 199 .setWriteRpcTimeout(writeRpcTimeout, TimeUnit.MILLISECONDS) 200 .setOperationTimeout(operationTimeout, TimeUnit.MILLISECONDS); 201 requestAttributes.forEach(tableBuilder::setRequestAttribute); 202 return new TableOverAsyncTable(conn, tableBuilder.build(), poolSupplier); 203 } 204 }; 205 } 206 207 @Override 208 public AsyncConnection toAsyncConnection() { 209 return conn; 210 } 211 212 @Override 213 public String getClusterId() { 214 return conn.getClusterId(); 215 } 216 217 @Override 218 public Hbck getHbck() throws IOException { 219 return FutureUtils.get(conn.getHbck()); 220 } 221 222 @Override 223 public Hbck getHbck(ServerName masterServer) throws IOException { 224 return conn.getHbck(masterServer); 225 } 226 227 /** 228 * An identifier that will remain the same for a given connection. 229 */ 230 @Override 231 public String toString() { 232 return "connection-over-async-connection-0x" + Integer.toHexString(hashCode()); 233 } 234}