001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.hamcrest.CoreMatchers.instanceOf;
021import static org.junit.Assert.assertArrayEquals;
022import static org.junit.Assert.assertThat;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.IOException;
027import java.util.concurrent.CompletableFuture;
028import java.util.concurrent.ExecutionException;
029import java.util.concurrent.TimeUnit;
030import java.util.concurrent.atomic.AtomicBoolean;
031import java.util.concurrent.atomic.AtomicInteger;
032import org.apache.commons.io.IOUtils;
033import org.apache.hadoop.hbase.HBaseClassTestRule;
034import org.apache.hadoop.hbase.HBaseTestingUtility;
035import org.apache.hadoop.hbase.HRegionLocation;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.security.User;
038import org.apache.hadoop.hbase.testclassification.ClientTests;
039import org.apache.hadoop.hbase.testclassification.MediumTests;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.junit.AfterClass;
042import org.junit.BeforeClass;
043import org.junit.ClassRule;
044import org.junit.Test;
045import org.junit.experimental.categories.Category;
046
047@Category({ MediumTests.class, ClientTests.class })
048public class TestAsyncSingleRequestRpcRetryingCaller {
049
050  @ClassRule
051  public static final HBaseClassTestRule CLASS_RULE =
052    HBaseClassTestRule.forClass(TestAsyncSingleRequestRpcRetryingCaller.class);
053
054  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
055
056  private static TableName TABLE_NAME = TableName.valueOf("async");
057
058  private static byte[] FAMILY = Bytes.toBytes("cf");
059
060  private static byte[] QUALIFIER = Bytes.toBytes("cq");
061
062  private static byte[] ROW = Bytes.toBytes("row");
063
064  private static byte[] VALUE = Bytes.toBytes("value");
065
066  private static AsyncConnectionImpl CONN;
067
068  @BeforeClass
069  public static void setUpBeforeClass() throws Exception {
070    TEST_UTIL.startMiniCluster(2);
071    TEST_UTIL.getAdmin().balancerSwitch(false, true);
072    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
073    TEST_UTIL.waitTableAvailable(TABLE_NAME);
074    ConnectionRegistry registry =
075        ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
076    CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
077      registry.getClusterId().get(), User.getCurrent());
078  }
079
080  @AfterClass
081  public static void tearDownAfterClass() throws Exception {
082    IOUtils.closeQuietly(CONN);
083    TEST_UTIL.shutdownMiniCluster();
084  }
085
086  @Test
087  public void testRegionMove() throws InterruptedException, ExecutionException, IOException {
088    // This will leave a cached entry in location cache
089    HRegionLocation loc = CONN.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get();
090    int index = TEST_UTIL.getHBaseCluster().getServerWith(loc.getRegion().getRegionName());
091    TEST_UTIL.getAdmin().move(loc.getRegion().getEncodedNameAsBytes(),
092      TEST_UTIL.getHBaseCluster().getRegionServer(1 - index).getServerName());
093    AsyncTable<?> table = CONN.getTableBuilder(TABLE_NAME).setRetryPause(100, TimeUnit.MILLISECONDS)
094      .setMaxRetries(30).build();
095    table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
096
097    // move back
098    TEST_UTIL.getAdmin().move(loc.getRegion().getEncodedNameAsBytes(), loc.getServerName());
099    Result result = table.get(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get();
100    assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
101  }
102
103  private <T> CompletableFuture<T> failedFuture() {
104    CompletableFuture<T> future = new CompletableFuture<>();
105    future.completeExceptionally(new RuntimeException("Inject error!"));
106    return future;
107  }
108
109  @Test
110  public void testMaxRetries() throws IOException, InterruptedException {
111    try {
112      CONN.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.DAYS)
113        .maxAttempts(3).pause(10, TimeUnit.MILLISECONDS)
114        .action((controller, loc, stub) -> failedFuture()).call().get();
115      fail();
116    } catch (ExecutionException e) {
117      assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
118    }
119  }
120
121  @Test
122  public void testOperationTimeout() throws IOException, InterruptedException {
123    long startNs = System.nanoTime();
124    try {
125      CONN.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.SECONDS)
126        .pause(100, TimeUnit.MILLISECONDS).maxAttempts(Integer.MAX_VALUE)
127        .action((controller, loc, stub) -> failedFuture()).call().get();
128      fail();
129    } catch (ExecutionException e) {
130      e.printStackTrace();
131      assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
132    }
133    long costNs = System.nanoTime() - startNs;
134    assertTrue(costNs >= TimeUnit.SECONDS.toNanos(1));
135    assertTrue(costNs < TimeUnit.SECONDS.toNanos(2));
136  }
137
138  @Test
139  public void testLocateError() throws IOException, InterruptedException, ExecutionException {
140    AtomicBoolean errorTriggered = new AtomicBoolean(false);
141    AtomicInteger count = new AtomicInteger(0);
142    HRegionLocation loc = CONN.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get();
143    AsyncRegionLocator mockedLocator =
144      new AsyncRegionLocator(CONN, AsyncConnectionImpl.RETRY_TIMER) {
145        @Override
146        CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
147            int replicaId, RegionLocateType locateType, long timeoutNs) {
148          if (tableName.equals(TABLE_NAME)) {
149            CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
150            if (count.getAndIncrement() == 0) {
151              errorTriggered.set(true);
152              future.completeExceptionally(new RuntimeException("Inject error!"));
153            } else {
154              future.complete(loc);
155            }
156            return future;
157          } else {
158            return super.getRegionLocation(tableName, row, replicaId, locateType, timeoutNs);
159          }
160        }
161
162        @Override
163        void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
164        }
165      };
166    try (AsyncConnectionImpl mockedConn = new AsyncConnectionImpl(CONN.getConfiguration(),
167      CONN.registry, CONN.registry.getClusterId().get(), User.getCurrent()) {
168
169      @Override
170      AsyncRegionLocator getLocator() {
171        return mockedLocator;
172      }
173    }) {
174      AsyncTable<?> table = mockedConn.getTableBuilder(TABLE_NAME)
175        .setRetryPause(100, TimeUnit.MILLISECONDS).setMaxRetries(5).build();
176      table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
177      assertTrue(errorTriggered.get());
178      errorTriggered.set(false);
179      count.set(0);
180      Result result = table.get(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get();
181      assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
182      assertTrue(errorTriggered.get());
183    }
184  }
185}