001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT; 021import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_PAUSE; 022import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER; 023import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING; 024import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE; 025import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD; 026import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_META_SCANNER_CACHING; 027import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT; 028import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT; 029import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT; 030import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT; 031import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT; 032import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE; 033import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER; 034import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_CACHING; 035import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY; 036import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD; 037import static org.apache.hadoop.hbase.HConstants.HBASE_META_SCANNER_CACHING; 038import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY; 039import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY; 040import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY; 041import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY; 042import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_SCANNER_TIMEOUT; 043import static org.apache.hadoop.hbase.client.ConnectionConfiguration.MAX_KEYVALUE_SIZE_DEFAULT; 044import static org.apache.hadoop.hbase.client.ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY; 045import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND; 046import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT; 047import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_SCAN_TIMEOUT_MICROSECOND; 048import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT; 049import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS; 050import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT; 051import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT; 052import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_KEY; 053 054import java.util.concurrent.TimeUnit; 055import org.apache.hadoop.conf.Configuration; 056import org.apache.hadoop.hbase.HConstants; 057import org.apache.yetus.audience.InterfaceAudience; 058import org.slf4j.Logger; 059import org.slf4j.LoggerFactory; 060 061/** 062 * Timeout configs. 063 */ 064@InterfaceAudience.Private 065class AsyncConnectionConfiguration { 066 067 private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionConfiguration.class); 068 069 /** 070 * Parameter name for client pause when server is overloaded, denoted by 071 * {@link org.apache.hadoop.hbase.HBaseServerException#isServerOverloaded()} 072 */ 073 public static final String HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED = 074 "hbase.client.pause.server.overloaded"; 075 076 static { 077 // This is added where the configs are referenced. It may be too late to happen before 078 // any user _sets_ the old cqtbe config onto a Configuration option. So we still need 079 // to handle checking both properties in parsing below. The benefit of calling this is 080 // that it should still cause Configuration to log a warning if we do end up falling 081 // through to the old deprecated config. 082 Configuration.addDeprecation(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, 083 HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED); 084 } 085 086 /** 087 * Configure the number of failures after which the client will start logging. A few failures is 088 * fine: region moved, then is not opened, then is overloaded. We try to have an acceptable 089 * heuristic for the number of errors we don't log. 5 was chosen because we wait for 1s at this 090 * stage. 091 */ 092 public static final String START_LOG_ERRORS_AFTER_COUNT_KEY = 093 "hbase.client.start.log.errors.counter"; 094 public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 5; 095 096 private final long metaOperationTimeoutNs; 097 098 // timeout for a whole operation such as get, put or delete. Notice that scan will not be effected 099 // by this value, see scanTimeoutNs. 100 private final long operationTimeoutNs; 101 102 // timeout for each rpc request. Can be overridden by a more specific config, such as 103 // readRpcTimeout or writeRpcTimeout. 104 private final long rpcTimeoutNs; 105 106 // timeout for each read rpc request 107 private final long readRpcTimeoutNs; 108 109 // timeout for each read rpc request against system tables 110 private final long metaReadRpcTimeoutNs; 111 112 // timeout for each write rpc request 113 private final long writeRpcTimeoutNs; 114 115 private final long pauseNs; 116 117 private final long pauseNsForServerOverloaded; 118 119 private final int maxRetries; 120 121 /** How many retries are allowed before we start to log */ 122 private final int startLogErrorsCnt; 123 124 // As now we have heartbeat support for scan, ideally a scan will never timeout unless the RS is 125 // crash. The RS will always return something before the rpc timeout or scan timeout to tell the 126 // client that it is still alive. The scan timeout is used as operation timeout for every 127 // operations in a scan, such as openScanner or next. 128 private final long scanTimeoutNs; 129 private final long metaScanTimeoutNs; 130 131 private final int scannerCaching; 132 133 private final int metaScannerCaching; 134 135 private final long scannerMaxResultSize; 136 137 private final long writeBufferSize; 138 139 private final long writeBufferPeriodicFlushTimeoutNs; 140 141 // this is for supporting region replica get, if the primary does not finished within this 142 // timeout, we will send request to secondaries. 143 private final long primaryCallTimeoutNs; 144 145 private final long primaryScanTimeoutNs; 146 147 private final long primaryMetaScanTimeoutNs; 148 149 private final int maxKeyValueSize; 150 151 AsyncConnectionConfiguration(Configuration conf) { 152 this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos( 153 conf.getLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)); 154 this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos( 155 conf.getLong(HBASE_CLIENT_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)); 156 long rpcTimeoutMs = conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT); 157 this.rpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(rpcTimeoutMs); 158 long readRpcTimeoutMillis = conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, rpcTimeoutMs); 159 this.readRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(readRpcTimeoutMillis); 160 this.metaReadRpcTimeoutNs = TimeUnit.MILLISECONDS 161 .toNanos(conf.getLong(HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY, readRpcTimeoutMillis)); 162 this.writeRpcTimeoutNs = 163 TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, rpcTimeoutMs)); 164 long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE); 165 long pauseMsForServerOverloaded = conf.getLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, 166 conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseMs)); 167 if (pauseMsForServerOverloaded < pauseMs) { 168 LOG.warn( 169 "The {} setting: {} ms is less than the {} setting: {} ms, use the greater one instead", 170 HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, pauseMsForServerOverloaded, HBASE_CLIENT_PAUSE, 171 pauseMs); 172 pauseMsForServerOverloaded = pauseMs; 173 } 174 this.pauseNs = TimeUnit.MILLISECONDS.toNanos(pauseMs); 175 this.pauseNsForServerOverloaded = TimeUnit.MILLISECONDS.toNanos(pauseMsForServerOverloaded); 176 this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); 177 this.startLogErrorsCnt = 178 conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT); 179 long scannerTimeoutMillis = conf.getLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 180 DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); 181 this.scanTimeoutNs = TimeUnit.MILLISECONDS.toNanos(scannerTimeoutMillis); 182 this.metaScanTimeoutNs = TimeUnit.MILLISECONDS 183 .toNanos(conf.getLong(HBASE_CLIENT_META_SCANNER_TIMEOUT, scannerTimeoutMillis)); 184 this.scannerCaching = 185 conf.getInt(HBASE_CLIENT_SCANNER_CACHING, DEFAULT_HBASE_CLIENT_SCANNER_CACHING); 186 this.metaScannerCaching = 187 conf.getInt(HBASE_META_SCANNER_CACHING, DEFAULT_HBASE_META_SCANNER_CACHING); 188 this.scannerMaxResultSize = conf.getLong(HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 189 DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); 190 this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT); 191 this.writeBufferPeriodicFlushTimeoutNs = 192 TimeUnit.MILLISECONDS.toNanos(conf.getLong(WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS, 193 WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT)); 194 this.primaryCallTimeoutNs = TimeUnit.MICROSECONDS.toNanos( 195 conf.getLong(PRIMARY_CALL_TIMEOUT_MICROSECOND, PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT)); 196 this.primaryScanTimeoutNs = TimeUnit.MICROSECONDS.toNanos( 197 conf.getLong(PRIMARY_SCAN_TIMEOUT_MICROSECOND, PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT)); 198 this.primaryMetaScanTimeoutNs = 199 TimeUnit.MICROSECONDS.toNanos(conf.getLong(HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT, 200 HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT)); 201 this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT); 202 } 203 204 long getMetaOperationTimeoutNs() { 205 return metaOperationTimeoutNs; 206 } 207 208 long getOperationTimeoutNs() { 209 return operationTimeoutNs; 210 } 211 212 long getRpcTimeoutNs() { 213 return rpcTimeoutNs; 214 } 215 216 long getReadRpcTimeoutNs() { 217 return readRpcTimeoutNs; 218 } 219 220 long getMetaReadRpcTimeoutNs() { 221 return metaReadRpcTimeoutNs; 222 } 223 224 long getWriteRpcTimeoutNs() { 225 return writeRpcTimeoutNs; 226 } 227 228 long getPauseNs() { 229 return pauseNs; 230 } 231 232 long getPauseNsForServerOverloaded() { 233 return pauseNsForServerOverloaded; 234 } 235 236 int getMaxRetries() { 237 return maxRetries; 238 } 239 240 int getStartLogErrorsCnt() { 241 return startLogErrorsCnt; 242 } 243 244 long getScanTimeoutNs() { 245 return scanTimeoutNs; 246 } 247 248 long getMetaScanTimeoutNs() { 249 return metaScanTimeoutNs; 250 } 251 252 int getScannerCaching() { 253 return scannerCaching; 254 } 255 256 int getMetaScannerCaching() { 257 return metaScannerCaching; 258 } 259 260 long getScannerMaxResultSize() { 261 return scannerMaxResultSize; 262 } 263 264 long getWriteBufferSize() { 265 return writeBufferSize; 266 } 267 268 long getWriteBufferPeriodicFlushTimeoutNs() { 269 return writeBufferPeriodicFlushTimeoutNs; 270 } 271 272 long getPrimaryCallTimeoutNs() { 273 return primaryCallTimeoutNs; 274 } 275 276 long getPrimaryScanTimeoutNs() { 277 return primaryScanTimeoutNs; 278 } 279 280 long getPrimaryMetaScanTimeoutNs() { 281 return primaryMetaScanTimeoutNs; 282 } 283 284 int getMaxKeyValueSize() { 285 return maxKeyValueSize; 286 } 287}