001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.jupiter.api.Assertions.assertTrue;
021import static org.mockito.ArgumentMatchers.any;
022import static org.mockito.ArgumentMatchers.anyInt;
023import static org.mockito.ArgumentMatchers.anyLong;
024import static org.mockito.Mockito.doAnswer;
025import static org.mockito.Mockito.mock;
026import static org.mockito.Mockito.when;
027
028import java.io.IOException;
029import java.util.Collections;
030import java.util.concurrent.CompletableFuture;
031import java.util.concurrent.TimeUnit;
032import java.util.concurrent.atomic.AtomicBoolean;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.DoNotRetryIOException;
035import org.apache.hadoop.hbase.HBaseConfiguration;
036import org.apache.hadoop.hbase.HRegionLocation;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.ipc.HBaseRpcController;
040import org.apache.hadoop.hbase.security.User;
041import org.apache.hadoop.hbase.testclassification.RegionServerTests;
042import org.apache.hadoop.hbase.testclassification.SmallTests;
043import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
044import org.junit.jupiter.api.AfterAll;
045import org.junit.jupiter.api.BeforeAll;
046import org.junit.jupiter.api.Tag;
047import org.junit.jupiter.api.Test;
048
049import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
050import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
051
052import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
053import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.Interface;
054
055/**
056 * Make sure we could fallback to use replay method if replicateToReplica method is not present,
057 * i.e, we are connecting an old region server.
058 */
059@Tag(RegionServerTests.TAG)
060@Tag(SmallTests.TAG)
061public class TestFallbackToUseReplay {
062
063  private static Configuration CONF = HBaseConfiguration.create();
064
065  private static AsyncClusterConnectionImpl CONN;
066
067  private static AsyncRegionReplicationRetryingCaller CALLER;
068
069  private static RegionInfo REPLICA =
070    RegionInfoBuilder.newBuilder(TableName.valueOf("test")).setReplicaId(1).build();
071
072  private static AtomicBoolean REPLAY_CALLED = new AtomicBoolean(false);
073
074  @BeforeAll
075  public static void setUpBeforeClass() throws IOException {
076    CONF.setInt(AsyncConnectionConfiguration.START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
077    AsyncRegionLocator locator = mock(AsyncRegionLocator.class);
078    when(locator.getRegionLocation(any(), any(), anyInt(), any(), anyLong()))
079      .thenReturn(CompletableFuture.completedFuture(new HRegionLocation(REPLICA,
080        ServerName.valueOf("localhost", 12345, EnvironmentEdgeManager.currentTime()))));
081    AdminService.Interface stub = mock(AdminService.Interface.class);
082    // fail the call to replicateToReplica
083    doAnswer(i -> {
084      HBaseRpcController controller = i.getArgument(0, HBaseRpcController.class);
085      controller.setFailed(new DoNotRetryIOException(new UnsupportedOperationException()));
086      RpcCallback<?> done = i.getArgument(2, RpcCallback.class);
087      done.run(null);
088      return null;
089    }).when(stub).replicateToReplica(any(), any(), any());
090    doAnswer(i -> {
091      REPLAY_CALLED.set(true);
092      RpcCallback<?> done = i.getArgument(2, RpcCallback.class);
093      done.run(null);
094      return null;
095    }).when(stub).replay(any(), any(), any());
096    CONN = new AsyncClusterConnectionImpl(CONF, mock(ConnectionRegistry.class), "test", null,
097      User.getCurrent()) {
098
099      @Override
100      AsyncRegionLocator getLocator() {
101        return locator;
102      }
103
104      @Override
105      Interface getAdminStub(ServerName serverName) throws IOException {
106        return stub;
107      }
108    };
109    CALLER = new AsyncRegionReplicationRetryingCaller(AsyncClusterConnectionImpl.RETRY_TIMER, CONN,
110      10, TimeUnit.SECONDS.toNanos(1), TimeUnit.SECONDS.toNanos(10), REPLICA,
111      Collections.emptyList());
112  }
113
114  @AfterAll
115  public static void tearDownAfterClass() throws IOException {
116    Closeables.close(CONN, true);
117  }
118
119  @Test
120  public void testFallback() {
121    CALLER.call().join();
122    assertTrue(REPLAY_CALLED.get());
123  }
124}