001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.hamcrest.CoreMatchers.instanceOf;
021import static org.junit.Assert.assertArrayEquals;
022import static org.junit.Assert.assertThat;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.IOException;
027import java.util.concurrent.CompletableFuture;
028import java.util.concurrent.ExecutionException;
029import java.util.concurrent.TimeUnit;
030import java.util.concurrent.atomic.AtomicBoolean;
031import java.util.concurrent.atomic.AtomicInteger;
032import org.apache.commons.io.IOUtils;
033import org.apache.hadoop.hbase.HBaseClassTestRule;
034import org.apache.hadoop.hbase.HBaseTestingUtility;
035import org.apache.hadoop.hbase.HRegionLocation;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.security.User;
038import org.apache.hadoop.hbase.testclassification.ClientTests;
039import org.apache.hadoop.hbase.testclassification.MediumTests;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.junit.AfterClass;
042import org.junit.BeforeClass;
043import org.junit.ClassRule;
044import org.junit.Test;
045import org.junit.experimental.categories.Category;
046
047@Category({ MediumTests.class, ClientTests.class })
048public class TestAsyncSingleRequestRpcRetryingCaller {
049
050  @ClassRule
051  public static final HBaseClassTestRule CLASS_RULE =
052    HBaseClassTestRule.forClass(TestAsyncSingleRequestRpcRetryingCaller.class);
053
054  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
055
056  private static TableName TABLE_NAME = TableName.valueOf("async");
057
058  private static byte[] FAMILY = Bytes.toBytes("cf");
059
060  private static byte[] QUALIFIER = Bytes.toBytes("cq");
061
062  private static byte[] ROW = Bytes.toBytes("row");
063
064  private static byte[] VALUE = Bytes.toBytes("value");
065
066  private static AsyncConnectionImpl CONN;
067
068  @BeforeClass
069  public static void setUpBeforeClass() throws Exception {
070    TEST_UTIL.startMiniCluster(2);
071    TEST_UTIL.getAdmin().balancerSwitch(false, true);
072    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
073    TEST_UTIL.waitTableAvailable(TABLE_NAME);
074    AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
075    CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
076      registry.getClusterId().get(), User.getCurrent());
077  }
078
079  @AfterClass
080  public static void tearDownAfterClass() throws Exception {
081    IOUtils.closeQuietly(CONN);
082    TEST_UTIL.shutdownMiniCluster();
083  }
084
085  @Test
086  public void testRegionMove() throws InterruptedException, ExecutionException, IOException {
087    // This will leave a cached entry in location cache
088    HRegionLocation loc = CONN.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get();
089    int index = TEST_UTIL.getHBaseCluster().getServerWith(loc.getRegion().getRegionName());
090    TEST_UTIL.getAdmin().move(loc.getRegion().getEncodedNameAsBytes(),
091      TEST_UTIL.getHBaseCluster().getRegionServer(1 - index).getServerName());
092    AsyncTable<?> table = CONN.getTableBuilder(TABLE_NAME).setRetryPause(100, TimeUnit.MILLISECONDS)
093      .setMaxRetries(30).build();
094    table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
095
096    // move back
097    TEST_UTIL.getAdmin().move(loc.getRegion().getEncodedNameAsBytes(), loc.getServerName());
098    Result result = table.get(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get();
099    assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
100  }
101
102  private <T> CompletableFuture<T> failedFuture() {
103    CompletableFuture<T> future = new CompletableFuture<>();
104    future.completeExceptionally(new RuntimeException("Inject error!"));
105    return future;
106  }
107
108  @Test
109  public void testMaxRetries() throws IOException, InterruptedException {
110    try {
111      CONN.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.DAYS)
112        .maxAttempts(3).pause(10, TimeUnit.MILLISECONDS)
113        .action((controller, loc, stub) -> failedFuture()).call().get();
114      fail();
115    } catch (ExecutionException e) {
116      assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
117    }
118  }
119
120  @Test
121  public void testOperationTimeout() throws IOException, InterruptedException {
122    long startNs = System.nanoTime();
123    try {
124      CONN.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.SECONDS)
125        .pause(100, TimeUnit.MILLISECONDS).maxAttempts(Integer.MAX_VALUE)
126        .action((controller, loc, stub) -> failedFuture()).call().get();
127      fail();
128    } catch (ExecutionException e) {
129      e.printStackTrace();
130      assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
131    }
132    long costNs = System.nanoTime() - startNs;
133    assertTrue(costNs >= TimeUnit.SECONDS.toNanos(1));
134    assertTrue(costNs < TimeUnit.SECONDS.toNanos(2));
135  }
136
137  @Test
138  public void testLocateError() throws IOException, InterruptedException, ExecutionException {
139    AtomicBoolean errorTriggered = new AtomicBoolean(false);
140    AtomicInteger count = new AtomicInteger(0);
141    HRegionLocation loc = CONN.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get();
142    AsyncRegionLocator mockedLocator =
143      new AsyncRegionLocator(CONN, AsyncConnectionImpl.RETRY_TIMER) {
144        @Override
145        CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
146            int replicaId, RegionLocateType locateType, long timeoutNs) {
147          if (tableName.equals(TABLE_NAME)) {
148            CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
149            if (count.getAndIncrement() == 0) {
150              errorTriggered.set(true);
151              future.completeExceptionally(new RuntimeException("Inject error!"));
152            } else {
153              future.complete(loc);
154            }
155            return future;
156          } else {
157            return super.getRegionLocation(tableName, row, replicaId, locateType, timeoutNs);
158          }
159        }
160
161        @Override
162        void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
163        }
164      };
165    try (AsyncConnectionImpl mockedConn = new AsyncConnectionImpl(CONN.getConfiguration(),
166      CONN.registry, CONN.registry.getClusterId().get(), User.getCurrent()) {
167
168      @Override
169      AsyncRegionLocator getLocator() {
170        return mockedLocator;
171      }
172    }) {
173      AsyncTable<?> table = mockedConn.getTableBuilder(TABLE_NAME)
174        .setRetryPause(100, TimeUnit.MILLISECONDS).setMaxRetries(5).build();
175      table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
176      assertTrue(errorTriggered.get());
177      errorTriggered.set(false);
178      count.set(0);
179      Result result = table.get(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get();
180      assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
181      assertTrue(errorTriggered.get());
182    }
183  }
184}