001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
021import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_PAUSE;
022import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER;
023import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING;
024import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE;
025import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
026import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_META_SCANNER_CACHING;
027import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
028import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT;
029import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT;
030import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT;
031import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT;
032import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE;
033import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
034import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_CACHING;
035import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY;
036import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
037import static org.apache.hadoop.hbase.HConstants.HBASE_META_SCANNER_CACHING;
038import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY;
039import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
040import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY;
041import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY;
042import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_SCANNER_TIMEOUT;
043import static org.apache.hadoop.hbase.client.ConnectionConfiguration.MAX_KEYVALUE_SIZE_DEFAULT;
044import static org.apache.hadoop.hbase.client.ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY;
045import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND;
046import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT;
047import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_SCAN_TIMEOUT_MICROSECOND;
048import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT;
049import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS;
050import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT;
051import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT;
052import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_KEY;
053
054import java.util.concurrent.TimeUnit;
055import org.apache.hadoop.conf.Configuration;
056import org.apache.hadoop.hbase.HConstants;
057import org.apache.yetus.audience.InterfaceAudience;
058import org.slf4j.Logger;
059import org.slf4j.LoggerFactory;
060
061/**
062 * Timeout configs.
063 */
064@InterfaceAudience.Private
065class AsyncConnectionConfiguration {
066
067  private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionConfiguration.class);
068
069  /**
070   * Parameter name for client pause when server is overloaded, denoted by
071   * {@link org.apache.hadoop.hbase.HBaseServerException#isServerOverloaded()}
072   */
073  public static final String HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED =
074    "hbase.client.pause.server.overloaded";
075
076  static {
077    // This is added where the configs are referenced. It may be too late to happen before
078    // any user _sets_ the old cqtbe config onto a Configuration option. So we still need
079    // to handle checking both properties in parsing below. The benefit of calling this is
080    // that it should still cause Configuration to log a warning if we do end up falling
081    // through to the old deprecated config.
082    Configuration.addDeprecation(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE,
083      HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED);
084  }
085
086  /**
087   * Configure the number of failures after which the client will start logging. A few failures is
088   * fine: region moved, then is not opened, then is overloaded. We try to have an acceptable
089   * heuristic for the number of errors we don't log. 5 was chosen because we wait for 1s at this
090   * stage.
091   */
092  public static final String START_LOG_ERRORS_AFTER_COUNT_KEY =
093    "hbase.client.start.log.errors.counter";
094  public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 5;
095
096  private final long metaOperationTimeoutNs;
097
098  // timeout for a whole operation such as get, put or delete. Notice that scan will not be effected
099  // by this value, see scanTimeoutNs.
100  private final long operationTimeoutNs;
101
102  // timeout for each rpc request. Can be overridden by a more specific config, such as
103  // readRpcTimeout or writeRpcTimeout.
104  private final long rpcTimeoutNs;
105
106  // timeout for each read rpc request
107  private final long readRpcTimeoutNs;
108
109  // timeout for each read rpc request against system tables
110  private final long metaReadRpcTimeoutNs;
111
112  // timeout for each write rpc request
113  private final long writeRpcTimeoutNs;
114
115  private final long pauseNs;
116
117  private final long pauseNsForServerOverloaded;
118
119  private final int maxRetries;
120
121  /** How many retries are allowed before we start to log */
122  private final int startLogErrorsCnt;
123
124  // As now we have heartbeat support for scan, ideally a scan will never timeout unless the RS is
125  // crash. The RS will always return something before the rpc timeout or scan timeout to tell the
126  // client that it is still alive. The scan timeout is used as operation timeout for every
127  // operations in a scan, such as openScanner or next.
128  private final long scanTimeoutNs;
129  private final long metaScanTimeoutNs;
130
131  private final int scannerCaching;
132
133  private final int metaScannerCaching;
134
135  private final long scannerMaxResultSize;
136
137  private final long writeBufferSize;
138
139  private final long writeBufferPeriodicFlushTimeoutNs;
140
141  // this is for supporting region replica get, if the primary does not finished within this
142  // timeout, we will send request to secondaries.
143  private final long primaryCallTimeoutNs;
144
145  private final long primaryScanTimeoutNs;
146
147  private final long primaryMetaScanTimeoutNs;
148
149  private final int maxKeyValueSize;
150
151  AsyncConnectionConfiguration(Configuration conf) {
152    this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
153      conf.getLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
154    this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
155      conf.getLong(HBASE_CLIENT_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
156    long rpcTimeoutMs = conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT);
157    this.rpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(rpcTimeoutMs);
158    long readRpcTimeoutMillis = conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, rpcTimeoutMs);
159    this.readRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(readRpcTimeoutMillis);
160    this.metaReadRpcTimeoutNs = TimeUnit.MILLISECONDS
161      .toNanos(conf.getLong(HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY, readRpcTimeoutMillis));
162    this.writeRpcTimeoutNs =
163      TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, rpcTimeoutMs));
164    long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE);
165    long pauseMsForServerOverloaded = conf.getLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED,
166      conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseMs));
167    if (pauseMsForServerOverloaded < pauseMs) {
168      LOG.warn(
169        "The {} setting: {} ms is less than the {} setting: {} ms, use the greater one instead",
170        HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, pauseMsForServerOverloaded, HBASE_CLIENT_PAUSE,
171        pauseMs);
172      pauseMsForServerOverloaded = pauseMs;
173    }
174    this.pauseNs = TimeUnit.MILLISECONDS.toNanos(pauseMs);
175    this.pauseNsForServerOverloaded = TimeUnit.MILLISECONDS.toNanos(pauseMsForServerOverloaded);
176    this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
177    this.startLogErrorsCnt =
178      conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
179    long scannerTimeoutMillis = conf.getLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
180      DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
181    this.scanTimeoutNs = TimeUnit.MILLISECONDS.toNanos(scannerTimeoutMillis);
182    this.metaScanTimeoutNs = TimeUnit.MILLISECONDS
183      .toNanos(conf.getLong(HBASE_CLIENT_META_SCANNER_TIMEOUT, scannerTimeoutMillis));
184    this.scannerCaching =
185      conf.getInt(HBASE_CLIENT_SCANNER_CACHING, DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
186    this.metaScannerCaching =
187      conf.getInt(HBASE_META_SCANNER_CACHING, DEFAULT_HBASE_META_SCANNER_CACHING);
188    this.scannerMaxResultSize = conf.getLong(HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
189      DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
190    this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT);
191    this.writeBufferPeriodicFlushTimeoutNs =
192      TimeUnit.MILLISECONDS.toNanos(conf.getLong(WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS,
193        WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT));
194    this.primaryCallTimeoutNs = TimeUnit.MICROSECONDS.toNanos(
195      conf.getLong(PRIMARY_CALL_TIMEOUT_MICROSECOND, PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT));
196    this.primaryScanTimeoutNs = TimeUnit.MICROSECONDS.toNanos(
197      conf.getLong(PRIMARY_SCAN_TIMEOUT_MICROSECOND, PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT));
198    this.primaryMetaScanTimeoutNs =
199      TimeUnit.MICROSECONDS.toNanos(conf.getLong(HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT,
200        HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT));
201    this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT);
202  }
203
204  long getMetaOperationTimeoutNs() {
205    return metaOperationTimeoutNs;
206  }
207
208  long getOperationTimeoutNs() {
209    return operationTimeoutNs;
210  }
211
212  long getRpcTimeoutNs() {
213    return rpcTimeoutNs;
214  }
215
216  long getReadRpcTimeoutNs() {
217    return readRpcTimeoutNs;
218  }
219
220  long getMetaReadRpcTimeoutNs() {
221    return metaReadRpcTimeoutNs;
222  }
223
224  long getWriteRpcTimeoutNs() {
225    return writeRpcTimeoutNs;
226  }
227
228  long getPauseNs() {
229    return pauseNs;
230  }
231
232  long getPauseNsForServerOverloaded() {
233    return pauseNsForServerOverloaded;
234  }
235
236  int getMaxRetries() {
237    return maxRetries;
238  }
239
240  int getStartLogErrorsCnt() {
241    return startLogErrorsCnt;
242  }
243
244  long getScanTimeoutNs() {
245    return scanTimeoutNs;
246  }
247
248  long getMetaScanTimeoutNs() {
249    return metaScanTimeoutNs;
250  }
251
252  int getScannerCaching() {
253    return scannerCaching;
254  }
255
256  int getMetaScannerCaching() {
257    return metaScannerCaching;
258  }
259
260  long getScannerMaxResultSize() {
261    return scannerMaxResultSize;
262  }
263
264  long getWriteBufferSize() {
265    return writeBufferSize;
266  }
267
268  long getWriteBufferPeriodicFlushTimeoutNs() {
269    return writeBufferPeriodicFlushTimeoutNs;
270  }
271
272  long getPrimaryCallTimeoutNs() {
273    return primaryCallTimeoutNs;
274  }
275
276  long getPrimaryScanTimeoutNs() {
277    return primaryScanTimeoutNs;
278  }
279
280  long getPrimaryMetaScanTimeoutNs() {
281    return primaryMetaScanTimeoutNs;
282  }
283
284  int getMaxKeyValueSize() {
285    return maxKeyValueSize;
286  }
287}