001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertThrows;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.concurrent.Callable;
028import java.util.concurrent.CompletableFuture;
029import java.util.concurrent.ExecutionException;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.atomic.AtomicBoolean;
032import java.util.concurrent.atomic.AtomicInteger;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseTestingUtil;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.RegionTooBusyException;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
040import org.apache.hadoop.hbase.regionserver.HRegionServer;
041import org.apache.hadoop.hbase.regionserver.RSRpcServices;
042import org.apache.hadoop.hbase.testclassification.ClientTests;
043import org.apache.hadoop.hbase.testclassification.MediumTests;
044import org.apache.hadoop.hbase.util.Bytes;
045import org.junit.AfterClass;
046import org.junit.BeforeClass;
047import org.junit.ClassRule;
048import org.junit.Test;
049import org.junit.experimental.categories.Category;
050
051import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
052import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
053import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
054
055import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
056
057@Category({ MediumTests.class, ClientTests.class })
058public class TestAsyncClientPauseForRpcThrottling {
059
060  @ClassRule
061  public static final HBaseClassTestRule CLASS_RULE =
062    HBaseClassTestRule.forClass(TestAsyncClientPauseForRpcThrottling.class);
063
064  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
065
066  private static TableName TABLE_NAME = TableName.valueOf("RpcThrottling");
067
068  private static byte[] FAMILY = Bytes.toBytes("Family");
069
070  private static byte[] QUALIFIER = Bytes.toBytes("Qualifier");
071
072  private static AsyncConnection CONN;
073  private static final AtomicBoolean THROTTLE = new AtomicBoolean(false);
074  private static final AtomicInteger FORCE_RETRIES = new AtomicInteger(0);
075  private static final long WAIT_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(1);
076  private static final int RETRY_COUNT = 3;
077  private static final int MAX_MULTIPLIER_EXPECTATION = 2;
078
079  public static final class ThrottlingRSRpcServicesForTest extends RSRpcServices {
080
081    public ThrottlingRSRpcServicesForTest(HRegionServer rs) throws IOException {
082      super(rs);
083    }
084
085    @Override
086    public ClientProtos.GetResponse get(RpcController controller, ClientProtos.GetRequest request)
087      throws ServiceException {
088      maybeForceRetry();
089      maybeThrottle();
090      return super.get(controller, request);
091    }
092
093    @Override
094    public ClientProtos.MultiResponse multi(RpcController rpcc, ClientProtos.MultiRequest request)
095      throws ServiceException {
096      maybeForceRetry();
097      maybeThrottle();
098      return super.multi(rpcc, request);
099    }
100
101    @Override
102    public ClientProtos.ScanResponse scan(RpcController controller,
103      ClientProtos.ScanRequest request) throws ServiceException {
104      maybeForceRetry();
105      maybeThrottle();
106      return super.scan(controller, request);
107    }
108
109    private void maybeForceRetry() throws ServiceException {
110      if (FORCE_RETRIES.get() > 0) {
111        FORCE_RETRIES.addAndGet(-1);
112        throw new ServiceException(new RegionTooBusyException("Retry"));
113      }
114    }
115
116    private void maybeThrottle() throws ServiceException {
117      if (THROTTLE.get()) {
118        THROTTLE.set(false);
119        throw new ServiceException(new RpcThrottlingException("number of requests exceeded - wait "
120          + TimeUnit.NANOSECONDS.toMillis(WAIT_INTERVAL_NANOS) + "ms"));
121      }
122    }
123  }
124
125  public static final class ThrottlingRegionServerForTest extends HRegionServer {
126
127    public ThrottlingRegionServerForTest(Configuration conf) throws IOException {
128      super(conf);
129    }
130
131    @Override
132    protected RSRpcServices createRpcServices() throws IOException {
133      return new ThrottlingRSRpcServicesForTest(this);
134    }
135  }
136
137  @BeforeClass
138  public static void setUp() throws Exception {
139    assertTrue(
140      "The MAX_MULTIPLIER_EXPECTATION must be less than HConstants.RETRY_BACKOFF[RETRY_COUNT] "
141        + "in order for our tests to adequately verify that we aren't "
142        + "multiplying throttled pauses based on the retry count.",
143      MAX_MULTIPLIER_EXPECTATION < HConstants.RETRY_BACKOFF[RETRY_COUNT]);
144
145    UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
146    UTIL.startMiniCluster(1);
147    UTIL.getMiniHBaseCluster().getConfiguration().setClass(HConstants.REGION_SERVER_IMPL,
148      ThrottlingRegionServerForTest.class, HRegionServer.class);
149    HRegionServer regionServer = UTIL.getMiniHBaseCluster().startRegionServer().getRegionServer();
150
151    try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) {
152      UTIL.waitTableAvailable(TABLE_NAME);
153      for (int i = 0; i < 100; i++) {
154        table.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)));
155      }
156    }
157
158    UTIL.getAdmin().move(UTIL.getAdmin().getRegions(TABLE_NAME).get(0).getEncodedNameAsBytes(),
159      regionServer.getServerName());
160    Configuration conf = new Configuration(UTIL.getConfiguration());
161    CONN = ConnectionFactory.createAsyncConnection(conf).get();
162  }
163
164  @AfterClass
165  public static void tearDown() throws Exception {
166    UTIL.getAdmin().disableTable(TABLE_NAME);
167    UTIL.getAdmin().deleteTable(TABLE_NAME);
168    Closeables.close(CONN, true);
169    UTIL.shutdownMiniCluster();
170  }
171
172  private void assertTime(Callable<Void> callable, long time, boolean isGreater) throws Exception {
173    long costNs = getCostNs(callable);
174    if (isGreater) {
175      assertTrue(costNs > time);
176    } else {
177      assertTrue(costNs <= time);
178    }
179  }
180
181  private void assertTimeBetween(Callable<Void> callable, long minNs, long maxNs) throws Exception {
182    long costNs = getCostNs(callable);
183    assertTrue(costNs > minNs);
184    assertTrue(costNs < maxNs);
185  }
186
187  private long getCostNs(Callable<Void> callable) throws Exception {
188    long startNs = System.nanoTime();
189    callable.call();
190    return System.nanoTime() - startNs;
191  }
192
193  @Test
194  public void itWaitsForThrottledGet() throws Exception {
195    boolean isThrottled = true;
196    THROTTLE.set(isThrottled);
197    AsyncTable<AdvancedScanResultConsumer> table = CONN.getTable(TABLE_NAME);
198    assertTime(() -> {
199      table.get(new Get(Bytes.toBytes(0))).get();
200      return null;
201    }, WAIT_INTERVAL_NANOS, isThrottled);
202  }
203
204  @Test
205  public void itDoesNotWaitForUnthrottledGet() throws Exception {
206    boolean isThrottled = false;
207    THROTTLE.set(isThrottled);
208    AsyncTable<AdvancedScanResultConsumer> table = CONN.getTable(TABLE_NAME);
209    assertTime(() -> {
210      table.get(new Get(Bytes.toBytes(0))).get();
211      return null;
212    }, WAIT_INTERVAL_NANOS, isThrottled);
213  }
214
215  @Test
216  public void itDoesNotWaitForThrottledGetExceedingTimeout() throws Exception {
217    AsyncTable<AdvancedScanResultConsumer> table =
218      CONN.getTableBuilder(TABLE_NAME).setOperationTimeout(1, TimeUnit.MILLISECONDS).build();
219    boolean isThrottled = true;
220    THROTTLE.set(isThrottled);
221    assertTime(() -> {
222      assertThrows(ExecutionException.class, () -> table.get(new Get(Bytes.toBytes(0))).get());
223      return null;
224    }, WAIT_INTERVAL_NANOS, false);
225  }
226
227  @Test
228  public void itDoesNotMultiplyThrottledGetWait() throws Exception {
229    THROTTLE.set(true);
230    FORCE_RETRIES.set(RETRY_COUNT);
231
232    AsyncTable<AdvancedScanResultConsumer> table =
233      CONN.getTableBuilder(TABLE_NAME).setOperationTimeout(1, TimeUnit.MINUTES)
234        .setMaxRetries(RETRY_COUNT + 1).setRetryPause(1, TimeUnit.NANOSECONDS).build();
235
236    assertTimeBetween(() -> {
237      table.get(new Get(Bytes.toBytes(0))).get();
238      return null;
239    }, WAIT_INTERVAL_NANOS, MAX_MULTIPLIER_EXPECTATION * WAIT_INTERVAL_NANOS);
240  }
241
242  @Test
243  public void itWaitsForThrottledBatch() throws Exception {
244    boolean isThrottled = true;
245    THROTTLE.set(isThrottled);
246    assertTime(() -> {
247      List<CompletableFuture<?>> futures = new ArrayList<>();
248      try (AsyncBufferedMutator mutator = CONN.getBufferedMutator(TABLE_NAME)) {
249        for (int i = 100; i < 110; i++) {
250          futures.add(mutator
251            .mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))));
252        }
253      }
254      return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
255    }, WAIT_INTERVAL_NANOS, isThrottled);
256  }
257
258  @Test
259  public void itDoesNotWaitForUnthrottledBatch() throws Exception {
260    boolean isThrottled = false;
261    THROTTLE.set(isThrottled);
262    assertTime(() -> {
263      List<CompletableFuture<?>> futures = new ArrayList<>();
264      try (AsyncBufferedMutator mutator = CONN.getBufferedMutator(TABLE_NAME)) {
265        for (int i = 100; i < 110; i++) {
266          futures.add(mutator
267            .mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))));
268        }
269      }
270      return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
271    }, WAIT_INTERVAL_NANOS, isThrottled);
272  }
273
274  @Test
275  public void itDoesNotWaitForThrottledBatchExceedingTimeout() throws Exception {
276    boolean isThrottled = true;
277    THROTTLE.set(isThrottled);
278    assertTime(() -> {
279      List<CompletableFuture<?>> futures = new ArrayList<>();
280      try (AsyncBufferedMutator mutator = CONN.getBufferedMutatorBuilder(TABLE_NAME)
281        .setOperationTimeout(1, TimeUnit.MILLISECONDS).build()) {
282        for (int i = 100; i < 110; i++) {
283          futures.add(mutator
284            .mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))));
285        }
286      }
287      assertThrows(ExecutionException.class,
288        () -> CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get());
289      return null;
290    }, WAIT_INTERVAL_NANOS, false);
291  }
292
293  @Test
294  public void itDoesNotMultiplyThrottledBatchWait() throws Exception {
295    THROTTLE.set(true);
296    FORCE_RETRIES.set(RETRY_COUNT);
297
298    assertTimeBetween(() -> {
299      List<CompletableFuture<?>> futures = new ArrayList<>();
300      try (AsyncBufferedMutator mutator =
301        CONN.getBufferedMutatorBuilder(TABLE_NAME).setOperationTimeout(1, TimeUnit.MINUTES)
302          .setMaxRetries(RETRY_COUNT + 1).setRetryPause(1, TimeUnit.NANOSECONDS).build()) {
303        for (int i = 100; i < 110; i++) {
304          futures.add(mutator
305            .mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))));
306        }
307      }
308      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
309      return null;
310    }, WAIT_INTERVAL_NANOS, MAX_MULTIPLIER_EXPECTATION * WAIT_INTERVAL_NANOS);
311  }
312
313  @Test
314  public void itWaitsForThrottledScan() throws Exception {
315    boolean isThrottled = true;
316    THROTTLE.set(isThrottled);
317    assertTime(() -> {
318      try (
319        ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(new Scan().setCaching(80))) {
320        for (int i = 0; i < 100; i++) {
321          Result result = scanner.next();
322          assertArrayEquals(Bytes.toBytes(i), result.getValue(FAMILY, QUALIFIER));
323        }
324      }
325      return null;
326    }, WAIT_INTERVAL_NANOS, isThrottled);
327  }
328
329  @Test
330  public void itDoesNotWaitForUnthrottledScan() throws Exception {
331    boolean isThrottled = false;
332    THROTTLE.set(isThrottled);
333    assertTime(() -> {
334      try (
335        ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(new Scan().setCaching(80))) {
336        for (int i = 0; i < 100; i++) {
337          Result result = scanner.next();
338          assertArrayEquals(Bytes.toBytes(i), result.getValue(FAMILY, QUALIFIER));
339        }
340      }
341      return null;
342    }, WAIT_INTERVAL_NANOS, isThrottled);
343  }
344
345  @Test
346  public void itDoesNotWaitForThrottledScanExceedingTimeout() throws Exception {
347    AsyncTable<AdvancedScanResultConsumer> table =
348      CONN.getTableBuilder(TABLE_NAME).setScanTimeout(1, TimeUnit.MILLISECONDS).build();
349    boolean isThrottled = true;
350    THROTTLE.set(isThrottled);
351    assertTime(() -> {
352      try (ResultScanner scanner = table.getScanner(new Scan().setCaching(80))) {
353        for (int i = 0; i < 100; i++) {
354          assertThrows(RetriesExhaustedException.class, scanner::next);
355        }
356      }
357      return null;
358    }, WAIT_INTERVAL_NANOS, false);
359  }
360
361  @Test
362  public void itDoesNotMultiplyThrottledScanWait() throws Exception {
363    THROTTLE.set(true);
364    FORCE_RETRIES.set(RETRY_COUNT);
365
366    AsyncTable<AdvancedScanResultConsumer> table =
367      CONN.getTableBuilder(TABLE_NAME).setOperationTimeout(1, TimeUnit.MINUTES)
368        .setMaxRetries(RETRY_COUNT + 1).setRetryPause(1, TimeUnit.NANOSECONDS).build();
369
370    assertTimeBetween(() -> {
371      try (ResultScanner scanner = table.getScanner(new Scan().setCaching(80))) {
372        for (int i = 0; i < 100; i++) {
373          Result result = scanner.next();
374          assertArrayEquals(Bytes.toBytes(i), result.getValue(FAMILY, QUALIFIER));
375        }
376      }
377      return null;
378    }, WAIT_INTERVAL_NANOS, MAX_MULTIPLIER_EXPECTATION * WAIT_INTERVAL_NANOS);
379  }
380}