001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.io.IOException; 022import java.util.concurrent.BlockingQueue; 023import java.util.concurrent.ExecutorService; 024import java.util.concurrent.LinkedBlockingQueue; 025import java.util.concurrent.ThreadPoolExecutor; 026import java.util.concurrent.TimeUnit; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.DoNotRetryIOException; 029import org.apache.hadoop.hbase.HConstants; 030import org.apache.hadoop.hbase.ServerName; 031import org.apache.hadoop.hbase.TableName; 032import org.apache.hadoop.hbase.log.HBaseMarkers; 033import org.apache.hadoop.hbase.util.FutureUtils; 034import org.apache.hadoop.hbase.util.IOExceptionSupplier; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 040import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 041 042/** 043 * The connection implementation based on {@link AsyncConnection}. 044 */ 045@InterfaceAudience.Private 046class ConnectionOverAsyncConnection implements Connection { 047 048 private static final Logger LOG = LoggerFactory.getLogger(ConnectionOverAsyncConnection.class); 049 050 private volatile boolean aborted = false; 051 052 // only used for executing coprocessor calls, as users may reference the methods in the 053 // BlockingInterface of the protobuf stub so we have to execute the call in a separated thread... 054 // Will be removed in 4.0.0 along with the deprecated coprocessor methods in Table and Admin 055 // interface. 056 private volatile ExecutorService batchPool = null; 057 058 private final AsyncConnectionImpl conn; 059 060 private final ConnectionConfiguration connConf; 061 062 ConnectionOverAsyncConnection(AsyncConnectionImpl conn) { 063 this.conn = conn; 064 this.connConf = new ConnectionConfiguration(conn.getConfiguration()); 065 } 066 067 @RestrictedApi(explanation = "Should only be called in tests", link = "", 068 allowedOnPath = ".*/src/test/.*") 069 AsyncConnectionImpl getAsyncConnection() { 070 return conn; 071 } 072 073 @Override 074 public void abort(String why, Throwable error) { 075 if (error != null) { 076 LOG.error(HBaseMarkers.FATAL, why, error); 077 } else { 078 LOG.error(HBaseMarkers.FATAL, why); 079 } 080 aborted = true; 081 try { 082 Closeables.close(this, true); 083 } catch (IOException e) { 084 throw new AssertionError(e); 085 } 086 } 087 088 @Override 089 public boolean isAborted() { 090 return aborted; 091 } 092 093 @Override 094 public Configuration getConfiguration() { 095 return conn.getConfiguration(); 096 } 097 098 @Override 099 public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException { 100 AsyncBufferedMutatorBuilder builder = conn.getBufferedMutatorBuilder(params.getTableName()); 101 if (params.getRpcTimeout() != BufferedMutatorParams.UNSET) { 102 builder.setRpcTimeout(params.getRpcTimeout(), TimeUnit.MILLISECONDS); 103 } 104 if (params.getOperationTimeout() != BufferedMutatorParams.UNSET) { 105 builder.setOperationTimeout(params.getOperationTimeout(), TimeUnit.MILLISECONDS); 106 } 107 if (params.getWriteBufferSize() != BufferedMutatorParams.UNSET) { 108 builder.setWriteBufferSize(params.getWriteBufferSize()); 109 } 110 if (params.getWriteBufferPeriodicFlushTimeoutMs() != BufferedMutatorParams.UNSET) { 111 builder.setWriteBufferPeriodicFlush(params.getWriteBufferPeriodicFlushTimeoutMs(), 112 TimeUnit.MILLISECONDS); 113 } 114 if (params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET) { 115 builder.setMaxKeyValueSize(params.getMaxKeyValueSize()); 116 } 117 if (params.getMaxMutations() != BufferedMutatorParams.UNSET) { 118 builder.setMaxMutations(params.getMaxMutations()); 119 } 120 if (!params.getRequestAttributes().isEmpty()) { 121 122 builder.setRequestAttributes(params.getRequestAttributes()); 123 } 124 return new BufferedMutatorOverAsyncBufferedMutator(builder.build(), params.getListener()); 125 } 126 127 @Override 128 public RegionLocator getRegionLocator(TableName tableName) throws IOException { 129 return new RegionLocatorOverAsyncTableRegionLocator(conn.getRegionLocator(tableName)); 130 } 131 132 @Override 133 public void clearRegionLocationCache() { 134 conn.clearRegionLocationCache(); 135 } 136 137 @Override 138 public Admin getAdmin() throws IOException { 139 return new AdminOverAsyncAdmin(this, (RawAsyncHBaseAdmin) conn.getAdmin()); 140 } 141 142 @Override 143 public void close() throws IOException { 144 conn.close(); 145 } 146 147 // will be called from AsyncConnection, to avoid infinite loop as in the above method we will call 148 // AsyncConnection.close. 149 synchronized void closePool() { 150 ExecutorService batchPool = this.batchPool; 151 if (batchPool != null) { 152 ConnectionUtils.shutdownPool(batchPool); 153 this.batchPool = null; 154 } 155 } 156 157 @Override 158 public boolean isClosed() { 159 return conn.isClosed(); 160 } 161 162 // only used for executing coprocessor calls, as users may reference the methods in the 163 // BlockingInterface of the protobuf stub so we have to execute the call in a separated thread... 164 // Will be removed in 4.0.0 along with the deprecated coprocessor methods in Table and Admin 165 // interface. 166 private ThreadPoolExecutor createThreadPool() { 167 Configuration conf = conn.getConfiguration(); 168 int threads = conf.getInt("hbase.hconnection.threads.max", 256); 169 long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60); 170 BlockingQueue<Runnable> workQueue = 171 new LinkedBlockingQueue<>(threads * conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, 172 HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS)); 173 ThreadPoolExecutor tpe = new ThreadPoolExecutor(threads, threads, keepAliveTime, 174 TimeUnit.SECONDS, workQueue, 175 new ThreadFactoryBuilder().setDaemon(true).setNameFormat(toString() + "-shared-%d").build()); 176 tpe.allowCoreThreadTimeOut(true); 177 return tpe; 178 } 179 180 // only used for executing coprocessor calls, as users may reference the methods in the 181 // BlockingInterface of the protobuf stub so we have to execute the call in a separated thread... 182 // Will be removed in 4.0.0 along with the deprecated coprocessor methods in Table and Admin 183 // interface. 184 private ExecutorService getBatchPool() throws IOException { 185 if (batchPool == null) { 186 synchronized (this) { 187 if (isClosed()) { 188 throw new DoNotRetryIOException("Connection is closed"); 189 } 190 if (batchPool == null) { 191 this.batchPool = createThreadPool(); 192 } 193 } 194 } 195 return this.batchPool; 196 } 197 198 @Override 199 public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) { 200 return new TableBuilderBase(tableName, connConf) { 201 202 @Override 203 public Table build() { 204 IOExceptionSupplier<ExecutorService> poolSupplier = 205 pool != null ? () -> pool : ConnectionOverAsyncConnection.this::getBatchPool; 206 AsyncTableBuilder<AdvancedScanResultConsumer> tableBuilder = 207 conn.getTableBuilder(tableName).setRpcTimeout(rpcTimeout, TimeUnit.MILLISECONDS) 208 .setReadRpcTimeout(readRpcTimeout, TimeUnit.MILLISECONDS) 209 .setWriteRpcTimeout(writeRpcTimeout, TimeUnit.MILLISECONDS) 210 .setOperationTimeout(operationTimeout, TimeUnit.MILLISECONDS); 211 requestAttributes.forEach(tableBuilder::setRequestAttribute); 212 return new TableOverAsyncTable(conn, tableBuilder.build(), poolSupplier); 213 } 214 }; 215 } 216 217 @Override 218 public AsyncConnection toAsyncConnection() { 219 return conn; 220 } 221 222 @Override 223 public String getClusterId() { 224 return conn.getClusterId(); 225 } 226 227 @Override 228 public Hbck getHbck() throws IOException { 229 return FutureUtils.get(conn.getHbck()); 230 } 231 232 @Override 233 public Hbck getHbck(ServerName masterServer) throws IOException { 234 return conn.getHbck(masterServer); 235 } 236 237 /** 238 * An identifier that will remain the same for a given connection. 239 */ 240 @Override 241 public String toString() { 242 return "connection-over-async-connection-0x" + Integer.toHexString(hashCode()); 243 } 244}