001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertThrows;
022
023import java.io.IOException;
024import java.net.SocketAddress;
025import java.util.Collections;
026import java.util.Set;
027import java.util.concurrent.CompletableFuture;
028import java.util.concurrent.ExecutorService;
029import java.util.concurrent.Executors;
030import java.util.concurrent.atomic.AtomicInteger;
031import java.util.stream.Collectors;
032import java.util.stream.IntStream;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.ipc.HBaseRpcController;
039import org.apache.hadoop.hbase.ipc.RpcClient;
040import org.apache.hadoop.hbase.ipc.RpcClientFactory;
041import org.apache.hadoop.hbase.security.User;
042import org.apache.hadoop.hbase.testclassification.ClientTests;
043import org.apache.hadoop.hbase.testclassification.SmallTests;
044import org.apache.hadoop.hbase.util.FutureUtils;
045import org.junit.AfterClass;
046import org.junit.Before;
047import org.junit.BeforeClass;
048import org.junit.ClassRule;
049import org.junit.Test;
050import org.junit.experimental.categories.Category;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
055import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
056import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
057import org.apache.hbase.thirdparty.com.google.protobuf.Message;
058import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
059import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
060import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
061
062import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
063
064@Category({ ClientTests.class, SmallTests.class })
065public class TestMasterRegistryHedgedReads {
066
067  @ClassRule
068  public static final HBaseClassTestRule CLASS_RULE =
069    HBaseClassTestRule.forClass(TestMasterRegistryHedgedReads.class);
070
071  private static final Logger LOG = LoggerFactory.getLogger(TestMasterRegistryHedgedReads.class);
072
073  private static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();
074
075  private static final ExecutorService EXECUTOR =
076    Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).build());
077
078  private static AtomicInteger CALLED = new AtomicInteger(0);
079
080  private static volatile int BAD_RESP_INDEX;
081
082  private static volatile Set<Integer> GOOD_RESP_INDEXS;
083
084  private static GetClusterIdResponse RESP =
085    GetClusterIdResponse.newBuilder().setClusterId("id").build();
086
087  public static final class RpcClientImpl implements RpcClient {
088
089    public RpcClientImpl(Configuration configuration, String clusterId, SocketAddress localAddress,
090      MetricsConnection metrics) {
091    }
092
093    @Override
094    public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User user, int rpcTimeout)
095      throws IOException {
096      throw new UnsupportedOperationException();
097    }
098
099    @Override
100    public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout)
101      throws IOException {
102      return new RpcChannelImpl();
103    }
104
105    @Override
106    public void cancelConnections(ServerName sn) {
107    }
108
109    @Override
110    public void close() {
111    }
112
113    @Override
114    public boolean hasCellBlockSupport() {
115      return false;
116    }
117  }
118
119  public static final class RpcChannelImpl implements RpcChannel {
120
121    @Override
122    public void callMethod(MethodDescriptor method, RpcController controller, Message request,
123      Message responsePrototype, RpcCallback<Message> done) {
124      // simulate the asynchronous behavior otherwise all logic will perform in the same thread...
125      EXECUTOR.execute(() -> {
126        int index = CALLED.getAndIncrement();
127        if (index == BAD_RESP_INDEX) {
128          done.run(GetClusterIdResponse.getDefaultInstance());
129        } else if (GOOD_RESP_INDEXS.contains(index)) {
130          done.run(RESP);
131        } else {
132          ((HBaseRpcController) controller).setFailed("inject error");
133          done.run(null);
134        }
135      });
136    }
137  }
138
139  @BeforeClass
140  public static void setUpBeforeClass() {
141    Configuration conf = UTIL.getConfiguration();
142    conf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RpcClientImpl.class,
143      RpcClient.class);
144    String masters = IntStream.range(0, 10).mapToObj(i -> "localhost:" + (10000 + 100 * i))
145      .collect(Collectors.joining(","));
146    conf.set(HConstants.MASTER_ADDRS_KEY, masters);
147  }
148
149  @AfterClass
150  public static void tearDownAfterClass() {
151    EXECUTOR.shutdownNow();
152  }
153
154  @Before
155  public void setUp() {
156    CALLED.set(0);
157    BAD_RESP_INDEX = -1;
158    GOOD_RESP_INDEXS = Collections.emptySet();
159  }
160
161  private <T> T logIfError(CompletableFuture<T> future) throws IOException {
162    try {
163      return FutureUtils.get(future);
164    } catch (Throwable t) {
165      LOG.warn("", t);
166      throw t;
167    }
168  }
169
170  @Test
171  public void testAllFailNoHedged() throws IOException {
172    Configuration conf = UTIL.getConfiguration();
173    conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 1);
174    try (MasterRegistry registry = new MasterRegistry(conf)) {
175      assertThrows(IOException.class, () -> logIfError(registry.getClusterId()));
176      assertEquals(10, CALLED.get());
177    }
178  }
179
180  @Test
181  public void testAllFailHedged3() throws IOException {
182    Configuration conf = UTIL.getConfiguration();
183    conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 3);
184    BAD_RESP_INDEX = 5;
185    try (MasterRegistry registry = new MasterRegistry(conf)) {
186      assertThrows(IOException.class, () -> logIfError(registry.getClusterId()));
187      assertEquals(10, CALLED.get());
188    }
189  }
190
191  @Test
192  public void testFirstSucceededNoHedge() throws IOException {
193    Configuration conf = UTIL.getConfiguration();
194    // will be set to 1
195    conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 0);
196    GOOD_RESP_INDEXS =
197      IntStream.range(0, 10).mapToObj(Integer::valueOf).collect(Collectors.toSet());
198    try (MasterRegistry registry = new MasterRegistry(conf)) {
199      String clusterId = logIfError(registry.getClusterId());
200      assertEquals(RESP.getClusterId(), clusterId);
201      assertEquals(1, CALLED.get());
202    }
203  }
204
205  @Test
206  public void testSecondRoundSucceededHedge4() throws IOException {
207    Configuration conf = UTIL.getConfiguration();
208    conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 4);
209    GOOD_RESP_INDEXS = Collections.singleton(6);
210    try (MasterRegistry registry = new MasterRegistry(conf)) {
211      String clusterId = logIfError(registry.getClusterId());
212      assertEquals(RESP.getClusterId(), clusterId);
213      UTIL.waitFor(5000, () -> CALLED.get() == 8);
214    }
215  }
216
217  @Test
218  public void testSucceededWithLargestHedged() throws IOException, InterruptedException {
219    Configuration conf = UTIL.getConfiguration();
220    conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, Integer.MAX_VALUE);
221    GOOD_RESP_INDEXS = Collections.singleton(5);
222    try (MasterRegistry registry = new MasterRegistry(conf)) {
223      String clusterId = logIfError(registry.getClusterId());
224      assertEquals(RESP.getClusterId(), clusterId);
225      UTIL.waitFor(5000, () -> CALLED.get() == 10);
226      Thread.sleep(1000);
227      // make sure we do not send more
228      assertEquals(10, CALLED.get());
229    }
230  }
231}