001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.hamcrest.CoreMatchers.instanceOf;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.junit.jupiter.api.Assertions.assertArrayEquals;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024import static org.junit.jupiter.api.Assertions.fail;
025
026import java.io.IOException;
027import java.util.concurrent.CompletableFuture;
028import java.util.concurrent.ExecutionException;
029import java.util.concurrent.TimeUnit;
030import java.util.concurrent.atomic.AtomicBoolean;
031import java.util.concurrent.atomic.AtomicInteger;
032import org.apache.hadoop.hbase.HBaseTestingUtil;
033import org.apache.hadoop.hbase.HRegionLocation;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.security.User;
036import org.apache.hadoop.hbase.testclassification.ClientTests;
037import org.apache.hadoop.hbase.testclassification.MediumTests;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.junit.jupiter.api.AfterAll;
040import org.junit.jupiter.api.BeforeAll;
041import org.junit.jupiter.api.Tag;
042import org.junit.jupiter.api.Test;
043
044import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
045
046@Tag(MediumTests.TAG)
047@Tag(ClientTests.TAG)
048public class TestAsyncSingleRequestRpcRetryingCaller {
049
050  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
051
052  private static TableName TABLE_NAME = TableName.valueOf("async");
053
054  private static byte[] FAMILY = Bytes.toBytes("cf");
055
056  private static byte[] QUALIFIER = Bytes.toBytes("cq");
057
058  private static byte[] ROW = Bytes.toBytes("row");
059
060  private static byte[] VALUE = Bytes.toBytes("value");
061
062  private static AsyncConnectionImpl CONN;
063
064  @BeforeAll
065  public static void setUpBeforeClass() throws Exception {
066    TEST_UTIL.startMiniCluster(2);
067    TEST_UTIL.getAdmin().balancerSwitch(false, true);
068    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
069    TEST_UTIL.waitTableAvailable(TABLE_NAME);
070    ConnectionRegistry registry =
071      ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(), User.getCurrent());
072    CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
073      registry.getClusterId().get(), null, User.getCurrent());
074  }
075
076  @AfterAll
077  public static void tearDownAfterClass() throws Exception {
078    Closeables.close(CONN, true);
079    TEST_UTIL.shutdownMiniCluster();
080  }
081
082  @Test
083  public void testRegionMove() throws InterruptedException, ExecutionException, IOException {
084    // This will leave a cached entry in location cache
085    HRegionLocation loc = CONN.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get();
086    int index = TEST_UTIL.getHBaseCluster().getServerWith(loc.getRegion().getRegionName());
087    TEST_UTIL.getAdmin().move(loc.getRegion().getEncodedNameAsBytes(),
088      TEST_UTIL.getHBaseCluster().getRegionServer(1 - index).getServerName());
089    AsyncTable<?> table = CONN.getTableBuilder(TABLE_NAME).setRetryPause(100, TimeUnit.MILLISECONDS)
090      .setMaxRetries(30).build();
091    table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
092
093    // move back
094    TEST_UTIL.getAdmin().move(loc.getRegion().getEncodedNameAsBytes(), loc.getServerName());
095    Result result = table.get(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get();
096    assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
097  }
098
099  private <T> CompletableFuture<T> failedFuture() {
100    CompletableFuture<T> future = new CompletableFuture<>();
101    future.completeExceptionally(new RuntimeException("Inject error!"));
102    return future;
103  }
104
105  @Test
106  public void testMaxRetries() throws IOException, InterruptedException {
107    try {
108      CONN.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.DAYS)
109        .maxAttempts(3).pause(10, TimeUnit.MILLISECONDS)
110        .action((controller, loc, stub) -> failedFuture()).call().get();
111      fail();
112    } catch (ExecutionException e) {
113      assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
114    }
115  }
116
117  @Test
118  public void testOperationTimeout() throws IOException, InterruptedException {
119    long startNs = System.nanoTime();
120    try {
121      CONN.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.SECONDS)
122        .pause(100, TimeUnit.MILLISECONDS).maxAttempts(Integer.MAX_VALUE)
123        .action((controller, loc, stub) -> failedFuture()).call().get();
124      fail();
125    } catch (ExecutionException e) {
126      e.printStackTrace();
127      assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
128    }
129    long costNs = System.nanoTime() - startNs;
130    assertTrue(costNs >= TimeUnit.SECONDS.toNanos(1));
131    assertTrue(costNs < TimeUnit.SECONDS.toNanos(2));
132  }
133
134  @Test
135  public void testLocateError() throws IOException, InterruptedException, ExecutionException {
136    AtomicBoolean errorTriggered = new AtomicBoolean(false);
137    AtomicInteger count = new AtomicInteger(0);
138    HRegionLocation loc = CONN.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get();
139    AsyncRegionLocator mockedLocator =
140      new AsyncRegionLocator(CONN, AsyncConnectionImpl.RETRY_TIMER) {
141        @Override
142        CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
143          int replicaId, RegionLocateType locateType, long timeoutNs) {
144          if (tableName.equals(TABLE_NAME)) {
145            CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
146            if (count.getAndIncrement() == 0) {
147              errorTriggered.set(true);
148              future.completeExceptionally(new RuntimeException("Inject error!"));
149            } else {
150              future.complete(loc);
151            }
152            return future;
153          } else {
154            return super.getRegionLocation(tableName, row, replicaId, locateType, timeoutNs);
155          }
156        }
157
158        @Override
159        void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
160        }
161      };
162    try (AsyncConnectionImpl mockedConn = new AsyncConnectionImpl(CONN.getConfiguration(),
163      CONN.registry, CONN.registry.getClusterId().get(), null, User.getCurrent()) {
164
165      @Override
166      AsyncRegionLocator getLocator() {
167        return mockedLocator;
168      }
169    }) {
170      AsyncTable<?> table = mockedConn.getTableBuilder(TABLE_NAME)
171        .setRetryPause(100, TimeUnit.MILLISECONDS).setMaxRetries(5).build();
172      table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
173      assertTrue(errorTriggered.get());
174      errorTriggered.set(false);
175      count.set(0);
176      Result result = table.get(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get();
177      assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
178      assertTrue(errorTriggered.get());
179    }
180  }
181}