001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.jupiter.api.Assertions.assertArrayEquals;
021import static org.junit.jupiter.api.Assertions.assertThrows;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.concurrent.Callable;
028import java.util.concurrent.CompletableFuture;
029import java.util.concurrent.ExecutionException;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.atomic.AtomicBoolean;
032import java.util.concurrent.atomic.AtomicInteger;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.HBaseTestingUtil;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.RegionTooBusyException;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
039import org.apache.hadoop.hbase.regionserver.HRegionServer;
040import org.apache.hadoop.hbase.regionserver.RSRpcServices;
041import org.apache.hadoop.hbase.testclassification.ClientTests;
042import org.apache.hadoop.hbase.testclassification.MediumTests;
043import org.apache.hadoop.hbase.util.Bytes;
044import org.junit.jupiter.api.AfterAll;
045import org.junit.jupiter.api.BeforeAll;
046import org.junit.jupiter.api.Tag;
047import org.junit.jupiter.api.Test;
048
049import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
050import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
051import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
052
053import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
054
055@Tag(MediumTests.TAG)
056@Tag(ClientTests.TAG)
057public class TestAsyncClientPauseForRpcThrottling {
058
059  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
060
061  private static TableName TABLE_NAME = TableName.valueOf("RpcThrottling");
062
063  private static byte[] FAMILY = Bytes.toBytes("Family");
064
065  private static byte[] QUALIFIER = Bytes.toBytes("Qualifier");
066
067  private static AsyncConnection CONN;
068  private static final AtomicBoolean THROTTLE = new AtomicBoolean(false);
069  private static final AtomicInteger FORCE_RETRIES = new AtomicInteger(0);
070  private static final long WAIT_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(1);
071  private static final int RETRY_COUNT = 3;
072  private static final int MAX_MULTIPLIER_EXPECTATION = 2;
073
074  public static final class ThrottlingRSRpcServicesForTest extends RSRpcServices {
075
076    public ThrottlingRSRpcServicesForTest(HRegionServer rs) throws IOException {
077      super(rs);
078    }
079
080    @Override
081    public ClientProtos.GetResponse get(RpcController controller, ClientProtos.GetRequest request)
082      throws ServiceException {
083      maybeForceRetry();
084      maybeThrottle();
085      return super.get(controller, request);
086    }
087
088    @Override
089    public ClientProtos.MultiResponse multi(RpcController rpcc, ClientProtos.MultiRequest request)
090      throws ServiceException {
091      maybeForceRetry();
092      maybeThrottle();
093      return super.multi(rpcc, request);
094    }
095
096    @Override
097    public ClientProtos.ScanResponse scan(RpcController controller,
098      ClientProtos.ScanRequest request) throws ServiceException {
099      maybeForceRetry();
100      maybeThrottle();
101      return super.scan(controller, request);
102    }
103
104    private void maybeForceRetry() throws ServiceException {
105      if (FORCE_RETRIES.get() > 0) {
106        FORCE_RETRIES.addAndGet(-1);
107        throw new ServiceException(new RegionTooBusyException("Retry"));
108      }
109    }
110
111    private void maybeThrottle() throws ServiceException {
112      if (THROTTLE.get()) {
113        THROTTLE.set(false);
114        throw new ServiceException(new RpcThrottlingException("number of requests exceeded - wait "
115          + TimeUnit.NANOSECONDS.toMillis(WAIT_INTERVAL_NANOS) + "ms"));
116      }
117    }
118  }
119
120  public static final class ThrottlingRegionServerForTest extends HRegionServer {
121
122    public ThrottlingRegionServerForTest(Configuration conf) throws IOException {
123      super(conf);
124    }
125
126    @Override
127    protected RSRpcServices createRpcServices() throws IOException {
128      return new ThrottlingRSRpcServicesForTest(this);
129    }
130  }
131
132  @BeforeAll
133  public static void setUp() throws Exception {
134    assertTrue(MAX_MULTIPLIER_EXPECTATION < HConstants.RETRY_BACKOFF[RETRY_COUNT],
135      "The MAX_MULTIPLIER_EXPECTATION must be less than HConstants.RETRY_BACKOFF[RETRY_COUNT] "
136        + "in order for our tests to adequately verify that we aren't "
137        + "multiplying throttled pauses based on the retry count.");
138
139    UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
140    UTIL.startMiniCluster(1);
141    UTIL.getMiniHBaseCluster().getConfiguration().setClass(HConstants.REGION_SERVER_IMPL,
142      ThrottlingRegionServerForTest.class, HRegionServer.class);
143    HRegionServer regionServer = UTIL.getMiniHBaseCluster().startRegionServer().getRegionServer();
144
145    try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) {
146      UTIL.waitTableAvailable(TABLE_NAME);
147      for (int i = 0; i < 100; i++) {
148        table.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)));
149      }
150    }
151
152    UTIL.getAdmin().move(UTIL.getAdmin().getRegions(TABLE_NAME).get(0).getEncodedNameAsBytes(),
153      regionServer.getServerName());
154    Configuration conf = new Configuration(UTIL.getConfiguration());
155    CONN = ConnectionFactory.createAsyncConnection(conf).get();
156  }
157
158  @AfterAll
159  public static void tearDown() throws Exception {
160    UTIL.getAdmin().disableTable(TABLE_NAME);
161    UTIL.getAdmin().deleteTable(TABLE_NAME);
162    Closeables.close(CONN, true);
163    UTIL.shutdownMiniCluster();
164  }
165
166  private void assertTime(Callable<Void> callable, long time, boolean isGreater) throws Exception {
167    long costNs = getCostNs(callable);
168    if (isGreater) {
169      assertTrue(costNs > time);
170    } else {
171      assertTrue(costNs <= time);
172    }
173  }
174
175  private void assertTimeBetween(Callable<Void> callable, long minNs, long maxNs) throws Exception {
176    long costNs = getCostNs(callable);
177    assertTrue(costNs > minNs);
178    assertTrue(costNs < maxNs);
179  }
180
181  private long getCostNs(Callable<Void> callable) throws Exception {
182    long startNs = System.nanoTime();
183    callable.call();
184    return System.nanoTime() - startNs;
185  }
186
187  @Test
188  public void itWaitsForThrottledGet() throws Exception {
189    boolean isThrottled = true;
190    THROTTLE.set(isThrottled);
191    AsyncTable<AdvancedScanResultConsumer> table = CONN.getTable(TABLE_NAME);
192    assertTime(() -> {
193      table.get(new Get(Bytes.toBytes(0))).get();
194      return null;
195    }, WAIT_INTERVAL_NANOS, isThrottled);
196  }
197
198  @Test
199  public void itDoesNotWaitForUnthrottledGet() throws Exception {
200    boolean isThrottled = false;
201    THROTTLE.set(isThrottled);
202    AsyncTable<AdvancedScanResultConsumer> table = CONN.getTable(TABLE_NAME);
203    assertTime(() -> {
204      table.get(new Get(Bytes.toBytes(0))).get();
205      return null;
206    }, WAIT_INTERVAL_NANOS, isThrottled);
207  }
208
209  @Test
210  public void itDoesNotWaitForThrottledGetExceedingTimeout() throws Exception {
211    AsyncTable<AdvancedScanResultConsumer> table =
212      CONN.getTableBuilder(TABLE_NAME).setOperationTimeout(1, TimeUnit.MILLISECONDS).build();
213    boolean isThrottled = true;
214    THROTTLE.set(isThrottled);
215    assertTime(() -> {
216      assertThrows(ExecutionException.class, () -> table.get(new Get(Bytes.toBytes(0))).get());
217      return null;
218    }, WAIT_INTERVAL_NANOS, false);
219  }
220
221  @Test
222  public void itDoesNotMultiplyThrottledGetWait() throws Exception {
223    THROTTLE.set(true);
224    FORCE_RETRIES.set(RETRY_COUNT);
225
226    AsyncTable<AdvancedScanResultConsumer> table =
227      CONN.getTableBuilder(TABLE_NAME).setOperationTimeout(1, TimeUnit.MINUTES)
228        .setMaxRetries(RETRY_COUNT + 1).setRetryPause(1, TimeUnit.NANOSECONDS).build();
229
230    assertTimeBetween(() -> {
231      table.get(new Get(Bytes.toBytes(0))).get();
232      return null;
233    }, WAIT_INTERVAL_NANOS, MAX_MULTIPLIER_EXPECTATION * WAIT_INTERVAL_NANOS);
234  }
235
236  @Test
237  public void itWaitsForThrottledBatch() throws Exception {
238    boolean isThrottled = true;
239    THROTTLE.set(isThrottled);
240    assertTime(() -> {
241      List<CompletableFuture<?>> futures = new ArrayList<>();
242      try (AsyncBufferedMutator mutator = CONN.getBufferedMutator(TABLE_NAME)) {
243        for (int i = 100; i < 110; i++) {
244          futures.add(mutator
245            .mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))));
246        }
247      }
248      return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
249    }, WAIT_INTERVAL_NANOS, isThrottled);
250  }
251
252  @Test
253  public void itDoesNotWaitForUnthrottledBatch() throws Exception {
254    boolean isThrottled = false;
255    THROTTLE.set(isThrottled);
256    assertTime(() -> {
257      List<CompletableFuture<?>> futures = new ArrayList<>();
258      try (AsyncBufferedMutator mutator = CONN.getBufferedMutator(TABLE_NAME)) {
259        for (int i = 100; i < 110; i++) {
260          futures.add(mutator
261            .mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))));
262        }
263      }
264      return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
265    }, WAIT_INTERVAL_NANOS, isThrottled);
266  }
267
268  @Test
269  public void itDoesNotWaitForThrottledBatchExceedingTimeout() throws Exception {
270    boolean isThrottled = true;
271    THROTTLE.set(isThrottled);
272    assertTime(() -> {
273      List<CompletableFuture<?>> futures = new ArrayList<>();
274      try (AsyncBufferedMutator mutator = CONN.getBufferedMutatorBuilder(TABLE_NAME)
275        .setOperationTimeout(1, TimeUnit.MILLISECONDS).build()) {
276        for (int i = 100; i < 110; i++) {
277          futures.add(mutator
278            .mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))));
279        }
280      }
281      assertThrows(ExecutionException.class,
282        () -> CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get());
283      return null;
284    }, WAIT_INTERVAL_NANOS, false);
285  }
286
287  @Test
288  public void itDoesNotMultiplyThrottledBatchWait() throws Exception {
289    THROTTLE.set(true);
290    FORCE_RETRIES.set(RETRY_COUNT);
291
292    assertTimeBetween(() -> {
293      List<CompletableFuture<?>> futures = new ArrayList<>();
294      try (AsyncBufferedMutator mutator =
295        CONN.getBufferedMutatorBuilder(TABLE_NAME).setOperationTimeout(1, TimeUnit.MINUTES)
296          .setMaxRetries(RETRY_COUNT + 1).setRetryPause(1, TimeUnit.NANOSECONDS).build()) {
297        for (int i = 100; i < 110; i++) {
298          futures.add(mutator
299            .mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))));
300        }
301      }
302      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
303      return null;
304    }, WAIT_INTERVAL_NANOS, MAX_MULTIPLIER_EXPECTATION * WAIT_INTERVAL_NANOS);
305  }
306
307  @Test
308  public void itWaitsForThrottledScan() throws Exception {
309    boolean isThrottled = true;
310    THROTTLE.set(isThrottled);
311    assertTime(() -> {
312      try (
313        ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(new Scan().setCaching(80))) {
314        for (int i = 0; i < 100; i++) {
315          Result result = scanner.next();
316          assertArrayEquals(Bytes.toBytes(i), result.getValue(FAMILY, QUALIFIER));
317        }
318      }
319      return null;
320    }, WAIT_INTERVAL_NANOS, isThrottled);
321  }
322
323  @Test
324  public void itDoesNotWaitForUnthrottledScan() throws Exception {
325    boolean isThrottled = false;
326    THROTTLE.set(isThrottled);
327    assertTime(() -> {
328      try (
329        ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(new Scan().setCaching(80))) {
330        for (int i = 0; i < 100; i++) {
331          Result result = scanner.next();
332          assertArrayEquals(Bytes.toBytes(i), result.getValue(FAMILY, QUALIFIER));
333        }
334      }
335      return null;
336    }, WAIT_INTERVAL_NANOS, isThrottled);
337  }
338
339  @Test
340  public void itDoesNotWaitForThrottledScanExceedingTimeout() throws Exception {
341    AsyncTable<AdvancedScanResultConsumer> table =
342      CONN.getTableBuilder(TABLE_NAME).setScanTimeout(1, TimeUnit.MILLISECONDS).build();
343    boolean isThrottled = true;
344    THROTTLE.set(isThrottled);
345    assertTime(() -> {
346      try (ResultScanner scanner = table.getScanner(new Scan().setCaching(80))) {
347        for (int i = 0; i < 100; i++) {
348          assertThrows(RetriesExhaustedException.class, scanner::next);
349        }
350      }
351      return null;
352    }, WAIT_INTERVAL_NANOS, false);
353  }
354
355  @Test
356  public void itDoesNotMultiplyThrottledScanWait() throws Exception {
357    THROTTLE.set(true);
358    FORCE_RETRIES.set(RETRY_COUNT);
359
360    AsyncTable<AdvancedScanResultConsumer> table =
361      CONN.getTableBuilder(TABLE_NAME).setOperationTimeout(1, TimeUnit.MINUTES)
362        .setMaxRetries(RETRY_COUNT + 1).setRetryPause(1, TimeUnit.NANOSECONDS).build();
363
364    assertTimeBetween(() -> {
365      try (ResultScanner scanner = table.getScanner(new Scan().setCaching(80))) {
366        for (int i = 0; i < 100; i++) {
367          Result result = scanner.next();
368          assertArrayEquals(Bytes.toBytes(i), result.getValue(FAMILY, QUALIFIER));
369        }
370      }
371      return null;
372    }, WAIT_INTERVAL_NANOS, MAX_MULTIPLIER_EXPECTATION * WAIT_INTERVAL_NANOS);
373  }
374}