001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.hamcrest.CoreMatchers.instanceOf; 021import static org.hamcrest.MatcherAssert.assertThat; 022import static org.junit.jupiter.api.Assertions.assertArrayEquals; 023import static org.junit.jupiter.api.Assertions.assertTrue; 024import static org.junit.jupiter.api.Assertions.fail; 025 026import java.io.IOException; 027import java.util.concurrent.CompletableFuture; 028import java.util.concurrent.ExecutionException; 029import java.util.concurrent.TimeUnit; 030import java.util.concurrent.atomic.AtomicBoolean; 031import java.util.concurrent.atomic.AtomicInteger; 032import org.apache.hadoop.hbase.HBaseTestingUtil; 033import org.apache.hadoop.hbase.HRegionLocation; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.security.User; 036import org.apache.hadoop.hbase.testclassification.ClientTests; 037import org.apache.hadoop.hbase.testclassification.MediumTests; 038import org.apache.hadoop.hbase.util.Bytes; 039import org.junit.jupiter.api.AfterAll; 040import org.junit.jupiter.api.BeforeAll; 041import org.junit.jupiter.api.Tag; 042import org.junit.jupiter.api.Test; 043 044import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 045 046@Tag(MediumTests.TAG) 047@Tag(ClientTests.TAG) 048public class TestAsyncSingleRequestRpcRetryingCaller { 049 050 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 051 052 private static TableName TABLE_NAME = TableName.valueOf("async"); 053 054 private static byte[] FAMILY = Bytes.toBytes("cf"); 055 056 private static byte[] QUALIFIER = Bytes.toBytes("cq"); 057 058 private static byte[] ROW = Bytes.toBytes("row"); 059 060 private static byte[] VALUE = Bytes.toBytes("value"); 061 062 private static AsyncConnectionImpl CONN; 063 064 @BeforeAll 065 public static void setUpBeforeClass() throws Exception { 066 TEST_UTIL.startMiniCluster(2); 067 TEST_UTIL.getAdmin().balancerSwitch(false, true); 068 TEST_UTIL.createTable(TABLE_NAME, FAMILY); 069 TEST_UTIL.waitTableAvailable(TABLE_NAME); 070 ConnectionRegistry registry = 071 ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(), User.getCurrent()); 072 CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry, 073 registry.getClusterId().get(), null, User.getCurrent()); 074 } 075 076 @AfterAll 077 public static void tearDownAfterClass() throws Exception { 078 Closeables.close(CONN, true); 079 TEST_UTIL.shutdownMiniCluster(); 080 } 081 082 @Test 083 public void testRegionMove() throws InterruptedException, ExecutionException, IOException { 084 // This will leave a cached entry in location cache 085 HRegionLocation loc = CONN.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get(); 086 int index = TEST_UTIL.getHBaseCluster().getServerWith(loc.getRegion().getRegionName()); 087 TEST_UTIL.getAdmin().move(loc.getRegion().getEncodedNameAsBytes(), 088 TEST_UTIL.getHBaseCluster().getRegionServer(1 - index).getServerName()); 089 AsyncTable<?> table = CONN.getTableBuilder(TABLE_NAME).setRetryPause(100, TimeUnit.MILLISECONDS) 090 .setMaxRetries(30).build(); 091 table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get(); 092 093 // move back 094 TEST_UTIL.getAdmin().move(loc.getRegion().getEncodedNameAsBytes(), loc.getServerName()); 095 Result result = table.get(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get(); 096 assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER)); 097 } 098 099 private <T> CompletableFuture<T> failedFuture() { 100 CompletableFuture<T> future = new CompletableFuture<>(); 101 future.completeExceptionally(new RuntimeException("Inject error!")); 102 return future; 103 } 104 105 @Test 106 public void testMaxRetries() throws IOException, InterruptedException { 107 try { 108 CONN.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.DAYS) 109 .maxAttempts(3).pause(10, TimeUnit.MILLISECONDS) 110 .action((controller, loc, stub) -> failedFuture()).call().get(); 111 fail(); 112 } catch (ExecutionException e) { 113 assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class)); 114 } 115 } 116 117 @Test 118 public void testOperationTimeout() throws IOException, InterruptedException { 119 long startNs = System.nanoTime(); 120 try { 121 CONN.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.SECONDS) 122 .pause(100, TimeUnit.MILLISECONDS).maxAttempts(Integer.MAX_VALUE) 123 .action((controller, loc, stub) -> failedFuture()).call().get(); 124 fail(); 125 } catch (ExecutionException e) { 126 e.printStackTrace(); 127 assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class)); 128 } 129 long costNs = System.nanoTime() - startNs; 130 assertTrue(costNs >= TimeUnit.SECONDS.toNanos(1)); 131 assertTrue(costNs < TimeUnit.SECONDS.toNanos(2)); 132 } 133 134 @Test 135 public void testLocateError() throws IOException, InterruptedException, ExecutionException { 136 AtomicBoolean errorTriggered = new AtomicBoolean(false); 137 AtomicInteger count = new AtomicInteger(0); 138 HRegionLocation loc = CONN.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get(); 139 AsyncRegionLocator mockedLocator = 140 new AsyncRegionLocator(CONN, AsyncConnectionImpl.RETRY_TIMER) { 141 @Override 142 CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row, 143 int replicaId, RegionLocateType locateType, long timeoutNs) { 144 if (tableName.equals(TABLE_NAME)) { 145 CompletableFuture<HRegionLocation> future = new CompletableFuture<>(); 146 if (count.getAndIncrement() == 0) { 147 errorTriggered.set(true); 148 future.completeExceptionally(new RuntimeException("Inject error!")); 149 } else { 150 future.complete(loc); 151 } 152 return future; 153 } else { 154 return super.getRegionLocation(tableName, row, replicaId, locateType, timeoutNs); 155 } 156 } 157 158 @Override 159 void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) { 160 } 161 }; 162 try (AsyncConnectionImpl mockedConn = new AsyncConnectionImpl(CONN.getConfiguration(), 163 CONN.registry, CONN.registry.getClusterId().get(), null, User.getCurrent()) { 164 165 @Override 166 AsyncRegionLocator getLocator() { 167 return mockedLocator; 168 } 169 }) { 170 AsyncTable<?> table = mockedConn.getTableBuilder(TABLE_NAME) 171 .setRetryPause(100, TimeUnit.MILLISECONDS).setMaxRetries(5).build(); 172 table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get(); 173 assertTrue(errorTriggered.get()); 174 errorTriggered.set(false); 175 count.set(0); 176 Result result = table.get(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get(); 177 assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER)); 178 assertTrue(errorTriggered.get()); 179 } 180 } 181}