/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;

import java.io.IOException;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;

import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ConnectionRegistryService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryResponse;

/**
 * Exercises {@code getClusterId()} on an {@link AbstractRpcBasedConnectionRegistry} configured with
 * different hedged-request fan-outs. No real RPC is performed: a fake {@link RpcClient} hands out a
 * channel whose reply to each {@code GetClusterId} call is decided by the order in which the call
 * arrives (see {@link RpcChannelImpl}).
 */
@Category({ ClientTests.class, SmallTests.class })
public class TestRpcBasedRegistryHedgedReads {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRpcBasedRegistryHedgedReads.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestRpcBasedRegistryHedgedReads.class);

  // Configuration keys handed to the registry under test; they are private to this test.
  private static final String HEDGED_REQS_FANOUT_CONFIG_NAME = "hbase.test.hedged.reqs.fanout";
  private static final String INITIAL_DELAY_SECS_CONFIG_NAME =
    "hbase.test.refresh.initial.delay.secs";
  private static final String REFRESH_INTERVAL_SECS_CONFIG_NAME =
    "hbase.test.refresh.interval.secs";
  private static final String MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME =
    "hbase.test.min.refresh.interval.secs";

  private static final HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil();

  /** Runs the fake RPC completions off the caller thread; daemon threads so the JVM can exit. */
  private static final ExecutorService EXECUTOR =
    Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).build());

  /** The ten fake servers returned as both bootstrap nodes and fetched endpoints. */
  private static Set<ServerName> BOOTSTRAP_NODES;

  /** Number of {@code GetClusterId} calls handled by {@link RpcChannelImpl} in the current test. */
  private static final AtomicInteger CALLED = new AtomicInteger(0);

  /** Call index answered with an empty (default) response; {@code -1} means no such call. */
  private static volatile int BAD_RESP_INDEX;

  /** Call indexes answered with {@link #RESP}; every other index fails with an injected error. */
  private static volatile Set<Integer> GOOD_RESP_INDEXES;

  /** The successful response served by the fake channel. */
  private static final GetClusterIdResponse RESP =
    GetClusterIdResponse.newBuilder().setClusterId("id").build();

  /**
   * Minimal {@link RpcClient} registered as the custom rpc client implementation in
   * {@link #setUpBeforeClass()}. Only {@link #createRpcChannel} is functional.
   */
  public static final class RpcClientImpl implements RpcClient {

    public RpcClientImpl(Configuration configuration, String clusterId, SocketAddress localAddress,
      MetricsConnection metrics, Map<String, byte[]> attributes) {
      // All arguments are intentionally ignored; the fake keeps no state.
    }

    @Override
    public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User user, int rpcTimeout) {
      throw new UnsupportedOperationException();
    }

    @Override
    public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout) {
      return new RpcChannelImpl();
    }

    @Override
    public void cancelConnections(ServerName sn) {
    }

    @Override
    public void close() {
    }

    @Override
    public boolean hasCellBlockSupport() {
      return false;
    }
  }

  /**
   * A dummy RpcChannel implementation that intercepts the GetClusterId() RPC calls and injects
   * errors. All other RPCs are ignored.
   * <p>
   * Each intercepted call takes the next value of {@link #CALLED} as its index and is answered
   * with an empty response (index equals {@link #BAD_RESP_INDEX}), with {@link #RESP} (index is in
   * {@link #GOOD_RESP_INDEXES}), or with a failure otherwise.
   */
  public static final class RpcChannelImpl implements RpcChannel {

    @Override
    public void callMethod(MethodDescriptor method, RpcController controller, Message request,
      Message responsePrototype, RpcCallback<Message> done) {
      if (method.getService().equals(ConnectionRegistryService.getDescriptor())) {
        // this is for setting up the rpc client
        done.run(
          GetConnectionRegistryResponse.newBuilder().setClusterId(RESP.getClusterId()).build());
        return;
      }
      if (!method.getName().equals("GetClusterId")) {
        // On RPC failures, MasterRegistry internally runs getMasters() RPC to keep the master list
        // fresh. We do not want to intercept those RPCs here and double count.
        return;
      }
      // simulate the asynchronous behavior otherwise all logic will perform in the same thread...
      EXECUTOR.execute(() -> {
        int index = CALLED.getAndIncrement();
        if (index == BAD_RESP_INDEX) {
          // A response that carries no cluster id.
          done.run(GetClusterIdResponse.getDefaultInstance());
        } else if (GOOD_RESP_INDEXES.contains(index)) {
          done.run(RESP);
        } else {
          controller.setFailed("inject error");
          done.run(null);
        }
      });
    }
  }

  /**
   * Builds a registry whose bootstrap nodes and fetched endpoints are both
   * {@link #BOOTSTRAP_NODES}.
   * @param hedged value stored under {@link #HEDGED_REQS_FANOUT_CONFIG_NAME} before the registry is
   *               constructed
   * @return a new registry; the caller is responsible for closing it
   * @throws IOException if the registry constructor or {@code User.getCurrent()} fails
   */
  private AbstractRpcBasedConnectionRegistry createRegistry(int hedged) throws IOException {
    Configuration conf = UTIL.getConfiguration();
    conf.setInt(HEDGED_REQS_FANOUT_CONFIG_NAME, hedged);
    return new AbstractRpcBasedConnectionRegistry(conf, User.getCurrent(),
      HEDGED_REQS_FANOUT_CONFIG_NAME, INITIAL_DELAY_SECS_CONFIG_NAME,
      REFRESH_INTERVAL_SECS_CONFIG_NAME, MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME) {

      @Override
      protected Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException {
        return BOOTSTRAP_NODES;
      }

      @Override
      protected CompletableFuture<Set<ServerName>> fetchEndpoints() {
        return CompletableFuture.completedFuture(BOOTSTRAP_NODES);
      }

      @Override
      public String getConnectionString() {
        return "unimplemented";
      }
    };
  }

  /** Installs the fake rpc client, disables refresh and creates the ten fake server names. */
  @BeforeClass
  public static void setUpBeforeClass() {
    Configuration conf = UTIL.getConfiguration();
    conf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RpcClientImpl.class,
      RpcClient.class);
    // disable refresh, we do not need to refresh in this test
    conf.setLong(INITIAL_DELAY_SECS_CONFIG_NAME, Integer.MAX_VALUE);
    conf.setLong(REFRESH_INTERVAL_SECS_CONFIG_NAME, Integer.MAX_VALUE);
    conf.setLong(MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME, Integer.MAX_VALUE - 1);
    BOOTSTRAP_NODES = IntStream.range(0, 10)
      .mapToObj(i -> ServerName.valueOf("localhost", (10000 + 100 * i), ServerName.NON_STARTCODE))
      .collect(Collectors.toSet());
  }

  @AfterClass
  public static void tearDownAfterClass() {
    EXECUTOR.shutdownNow();
  }

  /** Resets the call counter and the scripted responses so that, by default, every call fails. */
  @Before
  public void setUp() {
    CALLED.set(0);
    BAD_RESP_INDEX = -1;
    GOOD_RESP_INDEXES = Collections.emptySet();
  }

  /**
   * Waits for {@code future} via {@link FutureUtils#get}, logging any failure before rethrowing it
   * unchanged.
   * @param future the future to wait on
   * @return the value the future completed with
   * @throws IOException if getting the result fails
   */
  private <T> T logIfError(CompletableFuture<T> future) throws IOException {
    try {
      return FutureUtils.get(future);
    } catch (Throwable t) {
      LOG.warn("Failed to get result from registry future", t);
      throw t;
    }
  }

  /** With a fan-out of 1 and every call failing, the test expects all ten calls to be made. */
  @Test
  public void testAllFailNoHedged() throws IOException {
    try (AbstractRpcBasedConnectionRegistry registry = createRegistry(1)) {
      assertThrows(IOException.class, () -> logIfError(registry.getClusterId()));
      assertEquals(10, CALLED.get());
    }
  }

  /**
   * With a fan-out of 3, one empty response (index 5) and all other calls failing, the request
   * must still fail after ten calls.
   */
  @Test
  public void testAllFailHedged3() throws IOException {
    BAD_RESP_INDEX = 5;
    try (AbstractRpcBasedConnectionRegistry registry = createRegistry(3)) {
      assertThrows(IOException.class, () -> logIfError(registry.getClusterId()));
      assertEquals(10, CALLED.get());
    }
  }

  /** When every call succeeds and there is no hedging, exactly one call is expected. */
  @Test
  public void testFirstSucceededNoHedge() throws IOException {
    GOOD_RESP_INDEXES = IntStream.range(0, 10).boxed().collect(Collectors.toSet());
    // will be set to 1
    try (AbstractRpcBasedConnectionRegistry registry = createRegistry(0)) {
      String clusterId = logIfError(registry.getClusterId());
      assertEquals(RESP.getClusterId(), clusterId);
      assertEquals(1, CALLED.get());
    }
  }

  /**
   * With a fan-out of 4 and only call index 6 succeeding, the request succeeds and the call
   * counter is expected to reach 8.
   */
  @Test
  public void testSecondRoundSucceededHedge4() throws IOException {
    GOOD_RESP_INDEXES = Collections.singleton(6);
    try (AbstractRpcBasedConnectionRegistry registry = createRegistry(4)) {
      String clusterId = logIfError(registry.getClusterId());
      assertEquals(RESP.getClusterId(), clusterId);
      // The fake completes calls asynchronously, so wait for the counter instead of asserting.
      UTIL.waitFor(5000, () -> CALLED.get() == 8);
    }
  }

  /**
   * With the largest possible fan-out and only call index 5 succeeding, the request succeeds, the
   * call counter reaches 10 and does not grow afterwards.
   */
  @Test
  public void testSucceededWithLargestHedged() throws IOException, InterruptedException {
    GOOD_RESP_INDEXES = Collections.singleton(5);
    try (AbstractRpcBasedConnectionRegistry registry = createRegistry(Integer.MAX_VALUE)) {
      String clusterId = logIfError(registry.getClusterId());
      assertEquals(RESP.getClusterId(), clusterId);
      UTIL.waitFor(5000, () -> CALLED.get() == 10);
      Thread.sleep(1000);
      // make sure we do not send more
      assertEquals(10, CALLED.get());
    }
  }
}