001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertThrows;
022
023import java.io.IOException;
024import java.net.SocketAddress;
025import java.util.Collections;
026import java.util.Map;
027import java.util.Set;
028import java.util.concurrent.CompletableFuture;
029import java.util.concurrent.ExecutorService;
030import java.util.concurrent.Executors;
031import java.util.concurrent.atomic.AtomicInteger;
032import java.util.stream.Collectors;
033import java.util.stream.IntStream;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.ipc.RpcClient;
039import org.apache.hadoop.hbase.ipc.RpcClientFactory;
040import org.apache.hadoop.hbase.security.User;
041import org.apache.hadoop.hbase.testclassification.ClientTests;
042import org.apache.hadoop.hbase.testclassification.SmallTests;
043import org.apache.hadoop.hbase.util.FutureUtils;
044import org.junit.AfterClass;
045import org.junit.Before;
046import org.junit.BeforeClass;
047import org.junit.ClassRule;
048import org.junit.Test;
049import org.junit.experimental.categories.Category;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
054import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
055import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
056import org.apache.hbase.thirdparty.com.google.protobuf.Message;
057import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
058import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
059import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
060
061import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ConnectionRegistryService;
062import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse;
063import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryResponse;
064
065@Category({ ClientTests.class, SmallTests.class })
066public class TestRpcBasedRegistryHedgedReads {
067
068  @ClassRule
069  public static final HBaseClassTestRule CLASS_RULE =
070    HBaseClassTestRule.forClass(TestRpcBasedRegistryHedgedReads.class);
071
072  private static final Logger LOG = LoggerFactory.getLogger(TestRpcBasedRegistryHedgedReads.class);
073
074  private static final String HEDGED_REQS_FANOUT_CONFIG_NAME = "hbase.test.hedged.reqs.fanout";
075  private static final String INITIAL_DELAY_SECS_CONFIG_NAME =
076    "hbase.test.refresh.initial.delay.secs";
077  private static final String REFRESH_INTERVAL_SECS_CONFIG_NAME =
078    "hbase.test.refresh.interval.secs";
079  private static final String MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME =
080    "hbase.test.min.refresh.interval.secs";
081
082  private static final HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil();
083
084  private static final ExecutorService EXECUTOR =
085    Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).build());
086
087  private static Set<ServerName> BOOTSTRAP_NODES;
088
089  private static AtomicInteger CALLED = new AtomicInteger(0);
090
091  private static volatile int BAD_RESP_INDEX;
092
093  private static volatile Set<Integer> GOOD_RESP_INDEXS;
094
095  private static GetClusterIdResponse RESP =
096    GetClusterIdResponse.newBuilder().setClusterId("id").build();
097
098  public static final class RpcClientImpl implements RpcClient {
099
100    public RpcClientImpl(Configuration configuration, String clusterId, SocketAddress localAddress,
101      MetricsConnection metrics, Map<String, byte[]> attributes) {
102    }
103
104    @Override
105    public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User user, int rpcTimeout) {
106      throw new UnsupportedOperationException();
107    }
108
109    @Override
110    public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout) {
111      return new RpcChannelImpl();
112    }
113
114    @Override
115    public void cancelConnections(ServerName sn) {
116    }
117
118    @Override
119    public void close() {
120    }
121
122    @Override
123    public boolean hasCellBlockSupport() {
124      return false;
125    }
126  }
127
128  /**
129   * A dummy RpcChannel implementation that intercepts the GetClusterId() RPC calls and injects
130   * errors. All other RPCs are ignored.
131   */
132  public static final class RpcChannelImpl implements RpcChannel {
133
134    @Override
135    public void callMethod(MethodDescriptor method, RpcController controller, Message request,
136      Message responsePrototype, RpcCallback<Message> done) {
137      if (method.getService().equals(ConnectionRegistryService.getDescriptor())) {
138        // this is for setting up the rpc client
139        done.run(
140          GetConnectionRegistryResponse.newBuilder().setClusterId(RESP.getClusterId()).build());
141        return;
142      }
143      if (!method.getName().equals("GetClusterId")) {
144        // On RPC failures, MasterRegistry internally runs getMasters() RPC to keep the master list
145        // fresh. We do not want to intercept those RPCs here and double count.
146        return;
147      }
148      // simulate the asynchronous behavior otherwise all logic will perform in the same thread...
149      EXECUTOR.execute(() -> {
150        int index = CALLED.getAndIncrement();
151        if (index == BAD_RESP_INDEX) {
152          done.run(GetClusterIdResponse.getDefaultInstance());
153        } else if (GOOD_RESP_INDEXS.contains(index)) {
154          done.run(RESP);
155        } else {
156          controller.setFailed("inject error");
157          done.run(null);
158        }
159      });
160    }
161  }
162
163  private AbstractRpcBasedConnectionRegistry createRegistry(int hedged) throws IOException {
164    Configuration conf = UTIL.getConfiguration();
165    conf.setInt(HEDGED_REQS_FANOUT_CONFIG_NAME, hedged);
166    return new AbstractRpcBasedConnectionRegistry(conf, User.getCurrent(),
167      HEDGED_REQS_FANOUT_CONFIG_NAME, INITIAL_DELAY_SECS_CONFIG_NAME,
168      REFRESH_INTERVAL_SECS_CONFIG_NAME, MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME) {
169
170      @Override
171      protected Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException {
172        return BOOTSTRAP_NODES;
173      }
174
175      @Override
176      protected CompletableFuture<Set<ServerName>> fetchEndpoints() {
177        return CompletableFuture.completedFuture(BOOTSTRAP_NODES);
178      }
179
180      @Override
181      public String getConnectionString() {
182        return "unimplemented";
183      }
184    };
185  }
186
187  @BeforeClass
188  public static void setUpBeforeClass() {
189    Configuration conf = UTIL.getConfiguration();
190    conf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RpcClientImpl.class,
191      RpcClient.class);
192    // disable refresh, we do not need to refresh in this test
193    conf.setLong(INITIAL_DELAY_SECS_CONFIG_NAME, Integer.MAX_VALUE);
194    conf.setLong(REFRESH_INTERVAL_SECS_CONFIG_NAME, Integer.MAX_VALUE);
195    conf.setLong(MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME, Integer.MAX_VALUE - 1);
196    BOOTSTRAP_NODES = IntStream.range(0, 10)
197      .mapToObj(i -> ServerName.valueOf("localhost", (10000 + 100 * i), ServerName.NON_STARTCODE))
198      .collect(Collectors.toSet());
199  }
200
201  @AfterClass
202  public static void tearDownAfterClass() {
203    EXECUTOR.shutdownNow();
204  }
205
206  @Before
207  public void setUp() {
208    CALLED.set(0);
209    BAD_RESP_INDEX = -1;
210    GOOD_RESP_INDEXS = Collections.emptySet();
211  }
212
213  private <T> T logIfError(CompletableFuture<T> future) throws IOException {
214    try {
215      return FutureUtils.get(future);
216    } catch (Throwable t) {
217      LOG.warn("", t);
218      throw t;
219    }
220  }
221
222  @Test
223  public void testAllFailNoHedged() throws IOException {
224    try (AbstractRpcBasedConnectionRegistry registry = createRegistry(1)) {
225      assertThrows(IOException.class, () -> logIfError(registry.getClusterId()));
226      assertEquals(10, CALLED.get());
227    }
228  }
229
230  @Test
231  public void testAllFailHedged3() throws IOException {
232    BAD_RESP_INDEX = 5;
233    try (AbstractRpcBasedConnectionRegistry registry = createRegistry(3)) {
234      assertThrows(IOException.class, () -> logIfError(registry.getClusterId()));
235      assertEquals(10, CALLED.get());
236    }
237  }
238
239  @Test
240  public void testFirstSucceededNoHedge() throws IOException {
241    GOOD_RESP_INDEXS =
242      IntStream.range(0, 10).mapToObj(Integer::valueOf).collect(Collectors.toSet());
243    // will be set to 1
244    try (AbstractRpcBasedConnectionRegistry registry = createRegistry(0)) {
245      String clusterId = logIfError(registry.getClusterId());
246      assertEquals(RESP.getClusterId(), clusterId);
247      assertEquals(1, CALLED.get());
248    }
249  }
250
251  @Test
252  public void testSecondRoundSucceededHedge4() throws IOException {
253    GOOD_RESP_INDEXS = Collections.singleton(6);
254    try (AbstractRpcBasedConnectionRegistry registry = createRegistry(4)) {
255      String clusterId = logIfError(registry.getClusterId());
256      assertEquals(RESP.getClusterId(), clusterId);
257      UTIL.waitFor(5000, () -> CALLED.get() == 8);
258    }
259  }
260
261  @Test
262  public void testSucceededWithLargestHedged() throws IOException, InterruptedException {
263    GOOD_RESP_INDEXS = Collections.singleton(5);
264    try (AbstractRpcBasedConnectionRegistry registry = createRegistry(Integer.MAX_VALUE)) {
265      String clusterId = logIfError(registry.getClusterId());
266      assertEquals(RESP.getClusterId(), clusterId);
267      UTIL.waitFor(5000, () -> CALLED.get() == 10);
268      Thread.sleep(1000);
269      // make sure we do not send more
270      assertEquals(10, CALLED.get());
271    }
272  }
273}