001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertTrue;
021import static org.mockito.ArgumentMatchers.any;
022import static org.mockito.ArgumentMatchers.anyInt;
023import static org.mockito.ArgumentMatchers.anyLong;
024import static org.mockito.Mockito.doAnswer;
025import static org.mockito.Mockito.mock;
026import static org.mockito.Mockito.when;
027
028import java.io.IOException;
029import java.util.Collections;
030import java.util.concurrent.CompletableFuture;
031import java.util.concurrent.TimeUnit;
032import java.util.concurrent.atomic.AtomicBoolean;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.DoNotRetryIOException;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HBaseConfiguration;
037import org.apache.hadoop.hbase.HRegionLocation;
038import org.apache.hadoop.hbase.ServerName;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.ipc.HBaseRpcController;
041import org.apache.hadoop.hbase.security.User;
042import org.apache.hadoop.hbase.testclassification.RegionServerTests;
043import org.apache.hadoop.hbase.testclassification.SmallTests;
044import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
045import org.junit.AfterClass;
046import org.junit.BeforeClass;
047import org.junit.ClassRule;
048import org.junit.Test;
049import org.junit.experimental.categories.Category;
050
051import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
052import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
053
054import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
055import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.Interface;
056
057/**
058 * Make sure we could fallback to use replay method if replicateToReplica method is not present,
059 * i.e, we are connecting an old region server.
060 */
061@Category({ RegionServerTests.class, SmallTests.class })
062public class TestFallbackToUseReplay {
063
064  @ClassRule
065  public static final HBaseClassTestRule CLASS_RULE =
066    HBaseClassTestRule.forClass(TestFallbackToUseReplay.class);
067
068  private static Configuration CONF = HBaseConfiguration.create();
069
070  private static AsyncClusterConnectionImpl CONN;
071
072  private static AsyncRegionReplicationRetryingCaller CALLER;
073
074  private static RegionInfo REPLICA =
075    RegionInfoBuilder.newBuilder(TableName.valueOf("test")).setReplicaId(1).build();
076
077  private static AtomicBoolean REPLAY_CALLED = new AtomicBoolean(false);
078
079  @BeforeClass
080  public static void setUpBeforeClass() throws IOException {
081    CONF.setInt(AsyncConnectionConfiguration.START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
082    AsyncRegionLocator locator = mock(AsyncRegionLocator.class);
083    when(locator.getRegionLocation(any(), any(), anyInt(), any(), anyLong()))
084      .thenReturn(CompletableFuture.completedFuture(new HRegionLocation(REPLICA,
085        ServerName.valueOf("localhost", 12345, EnvironmentEdgeManager.currentTime()))));
086    AdminService.Interface stub = mock(AdminService.Interface.class);
087    // fail the call to replicateToReplica
088    doAnswer(i -> {
089      HBaseRpcController controller = i.getArgument(0, HBaseRpcController.class);
090      controller.setFailed(new DoNotRetryIOException(new UnsupportedOperationException()));
091      RpcCallback<?> done = i.getArgument(2, RpcCallback.class);
092      done.run(null);
093      return null;
094    }).when(stub).replicateToReplica(any(), any(), any());
095    doAnswer(i -> {
096      REPLAY_CALLED.set(true);
097      RpcCallback<?> done = i.getArgument(2, RpcCallback.class);
098      done.run(null);
099      return null;
100    }).when(stub).replay(any(), any(), any());
101    CONN = new AsyncClusterConnectionImpl(CONF, mock(ConnectionRegistry.class), "test", null,
102      User.getCurrent()) {
103
104      @Override
105      AsyncRegionLocator getLocator() {
106        return locator;
107      }
108
109      @Override
110      Interface getAdminStub(ServerName serverName) throws IOException {
111        return stub;
112      }
113    };
114    CALLER = new AsyncRegionReplicationRetryingCaller(AsyncClusterConnectionImpl.RETRY_TIMER, CONN,
115      10, TimeUnit.SECONDS.toNanos(1), TimeUnit.SECONDS.toNanos(10), REPLICA,
116      Collections.emptyList());
117  }
118
119  @AfterClass
120  public static void tearDownAfterClass() throws IOException {
121    Closeables.close(CONN, true);
122  }
123
124  @Test
125  public void testFallback() {
126    CALLER.call().join();
127    assertTrue(REPLAY_CALLED.get());
128  }
129}