001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.jupiter.api.Assertions.assertTrue; 021import static org.mockito.ArgumentMatchers.any; 022import static org.mockito.ArgumentMatchers.anyInt; 023import static org.mockito.ArgumentMatchers.anyLong; 024import static org.mockito.Mockito.doAnswer; 025import static org.mockito.Mockito.mock; 026import static org.mockito.Mockito.when; 027 028import java.io.IOException; 029import java.util.Collections; 030import java.util.concurrent.CompletableFuture; 031import java.util.concurrent.TimeUnit; 032import java.util.concurrent.atomic.AtomicBoolean; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.DoNotRetryIOException; 035import org.apache.hadoop.hbase.HBaseConfiguration; 036import org.apache.hadoop.hbase.HRegionLocation; 037import org.apache.hadoop.hbase.ServerName; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.ipc.HBaseRpcController; 040import org.apache.hadoop.hbase.security.User; 041import org.apache.hadoop.hbase.testclassification.RegionServerTests; 042import org.apache.hadoop.hbase.testclassification.SmallTests; 043import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 044import org.junit.jupiter.api.AfterAll; 045import org.junit.jupiter.api.BeforeAll; 046import org.junit.jupiter.api.Tag; 047import org.junit.jupiter.api.Test; 048 049import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 050import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 051 052import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 053import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.Interface; 054 055/** 056 * Make sure we could fallback to use replay method if replicateToReplica method is not present, 057 * i.e, we are connecting an old region server. 058 */ 059@Tag(RegionServerTests.TAG) 060@Tag(SmallTests.TAG) 061public class TestFallbackToUseReplay { 062 063 private static Configuration CONF = HBaseConfiguration.create(); 064 065 private static AsyncClusterConnectionImpl CONN; 066 067 private static AsyncRegionReplicationRetryingCaller CALLER; 068 069 private static RegionInfo REPLICA = 070 RegionInfoBuilder.newBuilder(TableName.valueOf("test")).setReplicaId(1).build(); 071 072 private static AtomicBoolean REPLAY_CALLED = new AtomicBoolean(false); 073 074 @BeforeAll 075 public static void setUpBeforeClass() throws IOException { 076 CONF.setInt(AsyncConnectionConfiguration.START_LOG_ERRORS_AFTER_COUNT_KEY, 0); 077 AsyncRegionLocator locator = mock(AsyncRegionLocator.class); 078 when(locator.getRegionLocation(any(), any(), anyInt(), any(), anyLong())) 079 .thenReturn(CompletableFuture.completedFuture(new HRegionLocation(REPLICA, 080 ServerName.valueOf("localhost", 12345, EnvironmentEdgeManager.currentTime())))); 081 AdminService.Interface stub = mock(AdminService.Interface.class); 082 // fail the call to replicateToReplica 083 doAnswer(i -> { 084 HBaseRpcController controller = i.getArgument(0, HBaseRpcController.class); 085 controller.setFailed(new DoNotRetryIOException(new UnsupportedOperationException())); 086 RpcCallback<?> done = i.getArgument(2, RpcCallback.class); 087 done.run(null); 088 return null; 089 }).when(stub).replicateToReplica(any(), any(), any()); 090 doAnswer(i -> { 091 REPLAY_CALLED.set(true); 092 RpcCallback<?> done = i.getArgument(2, RpcCallback.class); 093 done.run(null); 094 return null; 095 }).when(stub).replay(any(), any(), any()); 096 CONN = new AsyncClusterConnectionImpl(CONF, mock(ConnectionRegistry.class), "test", null, 097 User.getCurrent()) { 098 099 @Override 100 AsyncRegionLocator getLocator() { 101 return locator; 102 } 103 104 @Override 105 Interface getAdminStub(ServerName serverName) throws IOException { 106 return stub; 107 } 108 }; 109 CALLER = new AsyncRegionReplicationRetryingCaller(AsyncClusterConnectionImpl.RETRY_TIMER, CONN, 110 10, TimeUnit.SECONDS.toNanos(1), TimeUnit.SECONDS.toNanos(10), REPLICA, 111 Collections.emptyList()); 112 } 113 114 @AfterAll 115 public static void tearDownAfterClass() throws IOException { 116 Closeables.close(CONN, true); 117 } 118 119 @Test 120 public void testFallback() { 121 CALLER.call().join(); 122 assertTrue(REPLAY_CALLED.get()); 123 } 124}