001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertThrows; 022 023import java.io.IOException; 024import java.net.SocketAddress; 025import java.util.Collections; 026import java.util.Set; 027import java.util.concurrent.CompletableFuture; 028import java.util.concurrent.ExecutorService; 029import java.util.concurrent.Executors; 030import java.util.concurrent.atomic.AtomicInteger; 031import java.util.stream.Collectors; 032import java.util.stream.IntStream; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.HBaseClassTestRule; 035import org.apache.hadoop.hbase.HBaseCommonTestingUtility; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.ServerName; 038import org.apache.hadoop.hbase.ipc.HBaseRpcController; 039import org.apache.hadoop.hbase.ipc.RpcClient; 040import org.apache.hadoop.hbase.ipc.RpcClientFactory; 041import org.apache.hadoop.hbase.security.User; 042import org.apache.hadoop.hbase.testclassification.ClientTests; 043import org.apache.hadoop.hbase.testclassification.SmallTests; 044import org.apache.hadoop.hbase.util.FutureUtils; 045import org.junit.AfterClass; 046import org.junit.Before; 047import org.junit.BeforeClass; 048import org.junit.ClassRule; 049import org.junit.Test; 050import org.junit.experimental.categories.Category; 051import org.slf4j.Logger; 052import org.slf4j.LoggerFactory; 053 054import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 055import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; 056import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 057import org.apache.hbase.thirdparty.com.google.protobuf.Message; 058import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 059import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; 060import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 061 062import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse; 063 064@Category({ ClientTests.class, SmallTests.class }) 065public class TestMasterRegistryHedgedReads { 066 067 @ClassRule 068 public static final HBaseClassTestRule CLASS_RULE = 069 HBaseClassTestRule.forClass(TestMasterRegistryHedgedReads.class); 070 071 private static final Logger LOG = LoggerFactory.getLogger(TestMasterRegistryHedgedReads.class); 072 073 private static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility(); 074 075 private static final ExecutorService EXECUTOR = 076 Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).build()); 077 078 private static AtomicInteger CALLED = new AtomicInteger(0); 079 080 private static volatile int BAD_RESP_INDEX; 081 082 private static volatile Set<Integer> GOOD_RESP_INDEXS; 083 084 private static GetClusterIdResponse RESP = 085 GetClusterIdResponse.newBuilder().setClusterId("id").build(); 086 087 public static final class RpcClientImpl implements RpcClient { 088 089 public RpcClientImpl(Configuration configuration, String clusterId, SocketAddress localAddress, 090 MetricsConnection metrics) { 091 } 092 093 @Override 094 public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User user, int rpcTimeout) 095 throws IOException { 096 throw new UnsupportedOperationException(); 097 } 098 099 @Override 100 public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout) 101 throws IOException { 102 return new RpcChannelImpl(); 103 } 104 105 @Override 106 public void cancelConnections(ServerName sn) { 107 } 108 109 @Override 110 public void close() { 111 } 112 113 @Override 114 public boolean hasCellBlockSupport() { 115 return false; 116 } 117 } 118 119 public static final class RpcChannelImpl implements RpcChannel { 120 121 @Override 122 public void callMethod(MethodDescriptor method, RpcController controller, Message request, 123 Message responsePrototype, RpcCallback<Message> done) { 124 // simulate the asynchronous behavior otherwise all logic will perform in the same thread... 125 EXECUTOR.execute(() -> { 126 int index = CALLED.getAndIncrement(); 127 if (index == BAD_RESP_INDEX) { 128 done.run(GetClusterIdResponse.getDefaultInstance()); 129 } else if (GOOD_RESP_INDEXS.contains(index)) { 130 done.run(RESP); 131 } else { 132 ((HBaseRpcController) controller).setFailed("inject error"); 133 done.run(null); 134 } 135 }); 136 } 137 } 138 139 @BeforeClass 140 public static void setUpBeforeClass() { 141 Configuration conf = UTIL.getConfiguration(); 142 conf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RpcClientImpl.class, 143 RpcClient.class); 144 String masters = IntStream.range(0, 10).mapToObj(i -> "localhost:" + (10000 + 100 * i)) 145 .collect(Collectors.joining(",")); 146 conf.set(HConstants.MASTER_ADDRS_KEY, masters); 147 } 148 149 @AfterClass 150 public static void tearDownAfterClass() { 151 EXECUTOR.shutdownNow(); 152 } 153 154 @Before 155 public void setUp() { 156 CALLED.set(0); 157 BAD_RESP_INDEX = -1; 158 GOOD_RESP_INDEXS = Collections.emptySet(); 159 } 160 161 private <T> T logIfError(CompletableFuture<T> future) throws IOException { 162 try { 163 return FutureUtils.get(future); 164 } catch (Throwable t) { 165 LOG.warn("", t); 166 throw t; 167 } 168 } 169 170 @Test 171 public void testAllFailNoHedged() throws IOException { 172 Configuration conf = UTIL.getConfiguration(); 173 conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 1); 174 try (MasterRegistry registry = new MasterRegistry(conf)) { 175 assertThrows(IOException.class, () -> logIfError(registry.getClusterId())); 176 assertEquals(10, CALLED.get()); 177 } 178 } 179 180 @Test 181 public void testAllFailHedged3() throws IOException { 182 Configuration conf = UTIL.getConfiguration(); 183 conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 3); 184 BAD_RESP_INDEX = 5; 185 try (MasterRegistry registry = new MasterRegistry(conf)) { 186 assertThrows(IOException.class, () -> logIfError(registry.getClusterId())); 187 assertEquals(10, CALLED.get()); 188 } 189 } 190 191 @Test 192 public void testFirstSucceededNoHedge() throws IOException { 193 Configuration conf = UTIL.getConfiguration(); 194 // will be set to 1 195 conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 0); 196 GOOD_RESP_INDEXS = 197 IntStream.range(0, 10).mapToObj(Integer::valueOf).collect(Collectors.toSet()); 198 try (MasterRegistry registry = new MasterRegistry(conf)) { 199 String clusterId = logIfError(registry.getClusterId()); 200 assertEquals(RESP.getClusterId(), clusterId); 201 assertEquals(1, CALLED.get()); 202 } 203 } 204 205 @Test 206 public void testSecondRoundSucceededHedge4() throws IOException { 207 Configuration conf = UTIL.getConfiguration(); 208 conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 4); 209 GOOD_RESP_INDEXS = Collections.singleton(6); 210 try (MasterRegistry registry = new MasterRegistry(conf)) { 211 String clusterId = logIfError(registry.getClusterId()); 212 assertEquals(RESP.getClusterId(), clusterId); 213 UTIL.waitFor(5000, () -> CALLED.get() == 8); 214 } 215 } 216 217 @Test 218 public void testSucceededWithLargestHedged() throws IOException, InterruptedException { 219 Configuration conf = UTIL.getConfiguration(); 220 conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, Integer.MAX_VALUE); 221 GOOD_RESP_INDEXS = Collections.singleton(5); 222 try (MasterRegistry registry = new MasterRegistry(conf)) { 223 String clusterId = logIfError(registry.getClusterId()); 224 assertEquals(RESP.getClusterId(), clusterId); 225 UTIL.waitFor(5000, () -> CALLED.get() == 10); 226 Thread.sleep(1000); 227 // make sure we do not send more 228 assertEquals(10, CALLED.get()); 229 } 230 } 231}