001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertThrows; 022 023import java.io.IOException; 024import java.net.SocketAddress; 025import java.util.Collections; 026import java.util.Set; 027import java.util.concurrent.CompletableFuture; 028import java.util.concurrent.ExecutorService; 029import java.util.concurrent.Executors; 030import java.util.concurrent.atomic.AtomicInteger; 031import java.util.stream.Collectors; 032import java.util.stream.IntStream; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.HBaseClassTestRule; 035import org.apache.hadoop.hbase.HBaseCommonTestingUtility; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.ServerName; 038import org.apache.hadoop.hbase.ipc.RpcClient; 039import org.apache.hadoop.hbase.ipc.RpcClientFactory; 040import org.apache.hadoop.hbase.security.User; 041import org.apache.hadoop.hbase.testclassification.ClientTests; 042import org.apache.hadoop.hbase.testclassification.SmallTests; 043import org.apache.hadoop.hbase.util.FutureUtils; 044import org.junit.AfterClass; 045import org.junit.Before; 046import org.junit.BeforeClass; 047import org.junit.ClassRule; 048import org.junit.Test; 049import org.junit.experimental.categories.Category; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 054import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; 055import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 056import org.apache.hbase.thirdparty.com.google.protobuf.Message; 057import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 058import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; 059import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 060 061import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse; 062 063@Category({ ClientTests.class, SmallTests.class }) 064public class TestMasterRegistryHedgedReads { 065 066 @ClassRule 067 public static final HBaseClassTestRule CLASS_RULE = 068 HBaseClassTestRule.forClass(TestMasterRegistryHedgedReads.class); 069 070 private static final Logger LOG = LoggerFactory.getLogger(TestMasterRegistryHedgedReads.class); 071 072 private static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility(); 073 074 private static final ExecutorService EXECUTOR = 075 Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).build()); 076 077 private static AtomicInteger CALLED = new AtomicInteger(0); 078 079 private static volatile int BAD_RESP_INDEX; 080 081 private static volatile Set<Integer> GOOD_RESP_INDEXS; 082 083 private static GetClusterIdResponse RESP = 084 GetClusterIdResponse.newBuilder().setClusterId("id").build(); 085 086 public static final class RpcClientImpl implements RpcClient { 087 088 public RpcClientImpl(Configuration configuration, String clusterId, SocketAddress localAddress, 089 MetricsConnection metrics) { 090 } 091 092 @Override 093 public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User user, int rpcTimeout) 094 throws IOException { 095 throw new UnsupportedOperationException(); 096 } 097 098 @Override 099 public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout) 100 throws IOException { 101 return new RpcChannelImpl(); 102 } 103 104 @Override 105 public void cancelConnections(ServerName sn) { 106 } 107 108 @Override 109 public void close() { 110 } 111 112 @Override 113 public boolean hasCellBlockSupport() { 114 return false; 115 } 116 } 117 118 /** 119 * A dummy RpcChannel implementation that intercepts the GetClusterId() RPC calls and injects 120 * errors. All other RPCs are ignored. 121 */ 122 public static final class RpcChannelImpl implements RpcChannel { 123 124 @Override 125 public void callMethod(MethodDescriptor method, RpcController controller, Message request, 126 Message responsePrototype, RpcCallback<Message> done) { 127 if (!method.getName().equals("GetClusterId")) { 128 // On RPC failures, MasterRegistry internally runs getMasters() RPC to keep the master list 129 // fresh. We do not want to intercept those RPCs here and double count. 130 return; 131 } 132 // simulate the asynchronous behavior otherwise all logic will perform in the same thread... 133 EXECUTOR.execute(() -> { 134 int index = CALLED.getAndIncrement(); 135 if (index == BAD_RESP_INDEX) { 136 done.run(GetClusterIdResponse.getDefaultInstance()); 137 } else if (GOOD_RESP_INDEXS.contains(index)) { 138 done.run(RESP); 139 } else { 140 controller.setFailed("inject error"); 141 done.run(null); 142 } 143 }); 144 } 145 } 146 147 @BeforeClass 148 public static void setUpBeforeClass() { 149 Configuration conf = UTIL.getConfiguration(); 150 conf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RpcClientImpl.class, 151 RpcClient.class); 152 String masters = IntStream.range(0, 10).mapToObj(i -> "localhost:" + (10000 + 100 * i)) 153 .collect(Collectors.joining(",")); 154 conf.set(HConstants.MASTER_ADDRS_KEY, masters); 155 } 156 157 @AfterClass 158 public static void tearDownAfterClass() { 159 EXECUTOR.shutdownNow(); 160 } 161 162 @Before 163 public void setUp() { 164 CALLED.set(0); 165 BAD_RESP_INDEX = -1; 166 GOOD_RESP_INDEXS = Collections.emptySet(); 167 } 168 169 private <T> T logIfError(CompletableFuture<T> future) throws IOException { 170 try { 171 return FutureUtils.get(future); 172 } catch (Throwable t) { 173 LOG.warn("", t); 174 throw t; 175 } 176 } 177 178 @Test 179 public void testAllFailNoHedged() throws IOException { 180 Configuration conf = UTIL.getConfiguration(); 181 conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 1); 182 try (MasterRegistry registry = new MasterRegistry(conf)) { 183 assertThrows(IOException.class, () -> logIfError(registry.getClusterId())); 184 assertEquals(10, CALLED.get()); 185 } 186 } 187 188 @Test 189 public void testAllFailHedged3() throws IOException { 190 Configuration conf = UTIL.getConfiguration(); 191 conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 3); 192 BAD_RESP_INDEX = 5; 193 try (MasterRegistry registry = new MasterRegistry(conf)) { 194 assertThrows(IOException.class, () -> logIfError(registry.getClusterId())); 195 assertEquals(10, CALLED.get()); 196 } 197 } 198 199 @Test 200 public void testFirstSucceededNoHedge() throws IOException { 201 Configuration conf = UTIL.getConfiguration(); 202 // will be set to 1 203 conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 0); 204 GOOD_RESP_INDEXS = 205 IntStream.range(0, 10).mapToObj(Integer::valueOf).collect(Collectors.toSet()); 206 try (MasterRegistry registry = new MasterRegistry(conf)) { 207 String clusterId = logIfError(registry.getClusterId()); 208 assertEquals(RESP.getClusterId(), clusterId); 209 assertEquals(1, CALLED.get()); 210 } 211 } 212 213 @Test 214 public void testSecondRoundSucceededHedge4() throws IOException { 215 Configuration conf = UTIL.getConfiguration(); 216 conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 4); 217 GOOD_RESP_INDEXS = Collections.singleton(6); 218 try (MasterRegistry registry = new MasterRegistry(conf)) { 219 String clusterId = logIfError(registry.getClusterId()); 220 assertEquals(RESP.getClusterId(), clusterId); 221 UTIL.waitFor(5000, () -> CALLED.get() == 8); 222 } 223 } 224 225 @Test 226 public void testSucceededWithLargestHedged() throws IOException, InterruptedException { 227 Configuration conf = UTIL.getConfiguration(); 228 conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, Integer.MAX_VALUE); 229 GOOD_RESP_INDEXS = Collections.singleton(5); 230 try (MasterRegistry registry = new MasterRegistry(conf)) { 231 String clusterId = logIfError(registry.getClusterId()); 232 assertEquals(RESP.getClusterId(), clusterId); 233 UTIL.waitFor(5000, () -> CALLED.get() == 10); 234 Thread.sleep(1000); 235 // make sure we do not send more 236 assertEquals(10, CALLED.get()); 237 } 238 } 239}