001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
021import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_PAUSE;
022import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER;
023import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING;
024import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE;
025import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
026import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_META_SCANNER_CACHING;
027import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
028import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT;
029import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT;
030import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT;
031import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT;
032import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE;
033import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE;
034import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
035import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_CACHING;
036import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY;
037import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
038import static org.apache.hadoop.hbase.HConstants.HBASE_META_SCANNER_CACHING;
039import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY;
040import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
041import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY;
042import static org.apache.hadoop.hbase.client.ConnectionConfiguration.MAX_KEYVALUE_SIZE_DEFAULT;
043import static org.apache.hadoop.hbase.client.ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY;
044import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND;
045import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT;
046import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_SCAN_TIMEOUT_MICROSECOND;
047import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT;
048import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS;
049import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT;
050import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT;
051import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_KEY;
052
053import java.util.concurrent.TimeUnit;
054import org.apache.hadoop.conf.Configuration;
055import org.apache.yetus.audience.InterfaceAudience;
056import org.slf4j.Logger;
057import org.slf4j.LoggerFactory;
058
059/**
060 * Timeout configs.
061 */
062@InterfaceAudience.Private
063class AsyncConnectionConfiguration {
064
065  private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionConfiguration.class);
066
067  /**
068   * Configure the number of failures after which the client will start logging. A few failures
069   * is fine: region moved, then is not opened, then is overloaded. We try to have an acceptable
070   * heuristic for the number of errors we don't log. 5 was chosen because we wait for 1s at
071   * this stage.
072   */
073  public static final String START_LOG_ERRORS_AFTER_COUNT_KEY =
074      "hbase.client.start.log.errors.counter";
075  public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 5;
076
077  private final long metaOperationTimeoutNs;
078
079  // timeout for a whole operation such as get, put or delete. Notice that scan will not be effected
080  // by this value, see scanTimeoutNs.
081  private final long operationTimeoutNs;
082
083  // timeout for each rpc request. Can be overridden by a more specific config, such as
084  // readRpcTimeout or writeRpcTimeout.
085  private final long rpcTimeoutNs;
086
087  // timeout for each read rpc request
088  private final long readRpcTimeoutNs;
089
090  // timeout for each write rpc request
091  private final long writeRpcTimeoutNs;
092
093  private final long pauseNs;
094
095  private final long pauseForCQTBENs;
096
097  private final int maxRetries;
098
099  /** How many retries are allowed before we start to log */
100  private final int startLogErrorsCnt;
101
102  // As now we have heartbeat support for scan, ideally a scan will never timeout unless the RS is
103  // crash. The RS will always return something before the rpc timeout or scan timeout to tell the
104  // client that it is still alive. The scan timeout is used as operation timeout for every
105  // operations in a scan, such as openScanner or next.
106  private final long scanTimeoutNs;
107
108  private final int scannerCaching;
109
110  private final int metaScannerCaching;
111
112  private final long scannerMaxResultSize;
113
114  private final long writeBufferSize;
115
116  private final long writeBufferPeriodicFlushTimeoutNs;
117
118  // this is for supporting region replica get, if the primary does not finished within this
119  // timeout, we will send request to secondaries.
120  private final long primaryCallTimeoutNs;
121
122  private final long primaryScanTimeoutNs;
123
124  private final long primaryMetaScanTimeoutNs;
125
126  private final int maxKeyValueSize;
127
128  AsyncConnectionConfiguration(Configuration conf) {
129    this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
130      conf.getLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
131    this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
132      conf.getLong(HBASE_CLIENT_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
133    long rpcTimeoutMs = conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT);
134    this.rpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(rpcTimeoutMs);
135    this.readRpcTimeoutNs =
136      TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, rpcTimeoutMs));
137    this.writeRpcTimeoutNs =
138      TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, rpcTimeoutMs));
139    long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE);
140    long pauseForCQTBEMs = conf.getLong(HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseMs);
141    if (pauseForCQTBEMs < pauseMs) {
142      LOG.warn(
143        "The {} setting: {} ms is less than the {} setting: {} ms, use the greater one instead",
144        HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseForCQTBEMs, HBASE_CLIENT_PAUSE, pauseMs);
145      pauseForCQTBEMs = pauseMs;
146    }
147    this.pauseNs = TimeUnit.MILLISECONDS.toNanos(pauseMs);
148    this.pauseForCQTBENs = TimeUnit.MILLISECONDS.toNanos(pauseForCQTBEMs);
149    this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
150    this.startLogErrorsCnt =
151      conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
152    this.scanTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
153        conf.getInt(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
154            DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD));
155    this.scannerCaching =
156      conf.getInt(HBASE_CLIENT_SCANNER_CACHING, DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
157    this.metaScannerCaching =
158      conf.getInt(HBASE_META_SCANNER_CACHING, DEFAULT_HBASE_META_SCANNER_CACHING);
159    this.scannerMaxResultSize = conf.getLong(HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
160      DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
161    this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT);
162    this.writeBufferPeriodicFlushTimeoutNs =
163      TimeUnit.MILLISECONDS.toNanos(conf.getLong(WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS,
164        WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT));
165    this.primaryCallTimeoutNs = TimeUnit.MICROSECONDS.toNanos(
166      conf.getLong(PRIMARY_CALL_TIMEOUT_MICROSECOND, PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT));
167    this.primaryScanTimeoutNs = TimeUnit.MICROSECONDS.toNanos(
168      conf.getLong(PRIMARY_SCAN_TIMEOUT_MICROSECOND, PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT));
169    this.primaryMetaScanTimeoutNs =
170      TimeUnit.MICROSECONDS.toNanos(conf.getLong(HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT,
171        HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT));
172    this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT);
173  }
174
175  long getMetaOperationTimeoutNs() {
176    return metaOperationTimeoutNs;
177  }
178
179  long getOperationTimeoutNs() {
180    return operationTimeoutNs;
181  }
182
183  long getRpcTimeoutNs() {
184    return rpcTimeoutNs;
185  }
186
187  long getReadRpcTimeoutNs() {
188    return readRpcTimeoutNs;
189  }
190
191  long getWriteRpcTimeoutNs() {
192    return writeRpcTimeoutNs;
193  }
194
195  long getPauseNs() {
196    return pauseNs;
197  }
198
199  long getPauseForCQTBENs() {
200    return pauseForCQTBENs;
201  }
202
203  int getMaxRetries() {
204    return maxRetries;
205  }
206
207  int getStartLogErrorsCnt() {
208    return startLogErrorsCnt;
209  }
210
211  long getScanTimeoutNs() {
212    return scanTimeoutNs;
213  }
214
215  int getScannerCaching() {
216    return scannerCaching;
217  }
218
219  int getMetaScannerCaching() {
220    return metaScannerCaching;
221  }
222
223  long getScannerMaxResultSize() {
224    return scannerMaxResultSize;
225  }
226
227  long getWriteBufferSize() {
228    return writeBufferSize;
229  }
230
231  long getWriteBufferPeriodicFlushTimeoutNs() {
232    return writeBufferPeriodicFlushTimeoutNs;
233  }
234
235  long getPrimaryCallTimeoutNs() {
236    return primaryCallTimeoutNs;
237  }
238
239  long getPrimaryScanTimeoutNs() {
240    return primaryScanTimeoutNs;
241  }
242
243  long getPrimaryMetaScanTimeoutNs() {
244    return primaryMetaScanTimeoutNs;
245  }
246
247  int getMaxKeyValueSize() {
248    return maxKeyValueSize;
249  }
250}