001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertThrows;
022
023import java.io.IOException;
024import java.net.SocketAddress;
025import java.util.Collections;
026import java.util.Set;
027import java.util.concurrent.CompletableFuture;
028import java.util.concurrent.ExecutorService;
029import java.util.concurrent.Executors;
030import java.util.concurrent.atomic.AtomicInteger;
031import java.util.stream.Collectors;
032import java.util.stream.IntStream;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.ipc.RpcClient;
039import org.apache.hadoop.hbase.ipc.RpcClientFactory;
040import org.apache.hadoop.hbase.security.User;
041import org.apache.hadoop.hbase.testclassification.ClientTests;
042import org.apache.hadoop.hbase.testclassification.SmallTests;
043import org.apache.hadoop.hbase.util.FutureUtils;
044import org.junit.AfterClass;
045import org.junit.Before;
046import org.junit.BeforeClass;
047import org.junit.ClassRule;
048import org.junit.Test;
049import org.junit.experimental.categories.Category;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
054import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
055import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
056import org.apache.hbase.thirdparty.com.google.protobuf.Message;
057import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
058import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
059import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
060
061import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
062
063@Category({ ClientTests.class, SmallTests.class })
064public class TestMasterRegistryHedgedReads {
065
066  @ClassRule
067  public static final HBaseClassTestRule CLASS_RULE =
068    HBaseClassTestRule.forClass(TestMasterRegistryHedgedReads.class);
069
070  private static final Logger LOG = LoggerFactory.getLogger(TestMasterRegistryHedgedReads.class);
071
072  private static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();
073
074  private static final ExecutorService EXECUTOR =
075    Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).build());
076
077  private static AtomicInteger CALLED = new AtomicInteger(0);
078
079  private static volatile int BAD_RESP_INDEX;
080
081  private static volatile Set<Integer> GOOD_RESP_INDEXS;
082
083  private static GetClusterIdResponse RESP =
084    GetClusterIdResponse.newBuilder().setClusterId("id").build();
085
086  public static final class RpcClientImpl implements RpcClient {
087
088    public RpcClientImpl(Configuration configuration, String clusterId, SocketAddress localAddress,
089      MetricsConnection metrics) {
090    }
091
092    @Override
093    public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User user, int rpcTimeout)
094      throws IOException {
095      throw new UnsupportedOperationException();
096    }
097
098    @Override
099    public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout)
100      throws IOException {
101      return new RpcChannelImpl();
102    }
103
104    @Override
105    public void cancelConnections(ServerName sn) {
106    }
107
108    @Override
109    public void close() {
110    }
111
112    @Override
113    public boolean hasCellBlockSupport() {
114      return false;
115    }
116  }
117
118  /**
119   * A dummy RpcChannel implementation that intercepts the GetClusterId() RPC calls and injects
120   * errors. All other RPCs are ignored.
121   */
122  public static final class RpcChannelImpl implements RpcChannel {
123
124    @Override
125    public void callMethod(MethodDescriptor method, RpcController controller, Message request,
126      Message responsePrototype, RpcCallback<Message> done) {
127      if (!method.getName().equals("GetClusterId")) {
128        // On RPC failures, MasterRegistry internally runs getMasters() RPC to keep the master list
129        // fresh. We do not want to intercept those RPCs here and double count.
130        return;
131      }
132      // simulate the asynchronous behavior otherwise all logic will perform in the same thread...
133      EXECUTOR.execute(() -> {
134        int index = CALLED.getAndIncrement();
135        if (index == BAD_RESP_INDEX) {
136          done.run(GetClusterIdResponse.getDefaultInstance());
137        } else if (GOOD_RESP_INDEXS.contains(index)) {
138          done.run(RESP);
139        } else {
140          controller.setFailed("inject error");
141          done.run(null);
142        }
143      });
144    }
145  }
146
147  @BeforeClass
148  public static void setUpBeforeClass() {
149    Configuration conf = UTIL.getConfiguration();
150    conf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RpcClientImpl.class,
151      RpcClient.class);
152    String masters = IntStream.range(0, 10).mapToObj(i -> "localhost:" + (10000 + 100 * i))
153      .collect(Collectors.joining(","));
154    conf.set(HConstants.MASTER_ADDRS_KEY, masters);
155  }
156
  /**
   * Runs once after all tests in this class and stops the shared executor with
   * {@code shutdownNow()}.
   */
  @AfterClass
  public static void tearDownAfterClass() {
    EXECUTOR.shutdownNow();
  }
161
162  @Before
163  public void setUp() {
164    CALLED.set(0);
165    BAD_RESP_INDEX = -1;
166    GOOD_RESP_INDEXS = Collections.emptySet();
167  }
168
169  private <T> T logIfError(CompletableFuture<T> future) throws IOException {
170    try {
171      return FutureUtils.get(future);
172    } catch (Throwable t) {
173      LOG.warn("", t);
174      throw t;
175    }
176  }
177
178  @Test
179  public void testAllFailNoHedged() throws IOException {
180    Configuration conf = UTIL.getConfiguration();
181    conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 1);
182    try (MasterRegistry registry = new MasterRegistry(conf)) {
183      assertThrows(IOException.class, () -> logIfError(registry.getClusterId()));
184      assertEquals(10, CALLED.get());
185    }
186  }
187
188  @Test
189  public void testAllFailHedged3() throws IOException {
190    Configuration conf = UTIL.getConfiguration();
191    conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 3);
192    BAD_RESP_INDEX = 5;
193    try (MasterRegistry registry = new MasterRegistry(conf)) {
194      assertThrows(IOException.class, () -> logIfError(registry.getClusterId()));
195      assertEquals(10, CALLED.get());
196    }
197  }
198
199  @Test
200  public void testFirstSucceededNoHedge() throws IOException {
201    Configuration conf = UTIL.getConfiguration();
202    // will be set to 1
203    conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 0);
204    GOOD_RESP_INDEXS =
205      IntStream.range(0, 10).mapToObj(Integer::valueOf).collect(Collectors.toSet());
206    try (MasterRegistry registry = new MasterRegistry(conf)) {
207      String clusterId = logIfError(registry.getClusterId());
208      assertEquals(RESP.getClusterId(), clusterId);
209      assertEquals(1, CALLED.get());
210    }
211  }
212
213  @Test
214  public void testSecondRoundSucceededHedge4() throws IOException {
215    Configuration conf = UTIL.getConfiguration();
216    conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 4);
217    GOOD_RESP_INDEXS = Collections.singleton(6);
218    try (MasterRegistry registry = new MasterRegistry(conf)) {
219      String clusterId = logIfError(registry.getClusterId());
220      assertEquals(RESP.getClusterId(), clusterId);
221      UTIL.waitFor(5000, () -> CALLED.get() == 8);
222    }
223  }
224
225  @Test
226  public void testSucceededWithLargestHedged() throws IOException, InterruptedException {
227    Configuration conf = UTIL.getConfiguration();
228    conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, Integer.MAX_VALUE);
229    GOOD_RESP_INDEXS = Collections.singleton(5);
230    try (MasterRegistry registry = new MasterRegistry(conf)) {
231      String clusterId = logIfError(registry.getClusterId());
232      assertEquals(RESP.getClusterId(), clusterId);
233      UTIL.waitFor(5000, () -> CALLED.get() == 10);
234      Thread.sleep(1000);
235      // make sure we do not send more
236      assertEquals(10, CALLED.get());
237    }
238  }
239}