001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertFalse; 021import static org.junit.Assert.assertTrue; 022 023import java.net.SocketAddress; 024import java.net.SocketTimeoutException; 025import java.util.Map; 026import java.util.concurrent.ThreadLocalRandom; 027import java.util.concurrent.atomic.AtomicInteger; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.HBaseClassTestRule; 030import org.apache.hadoop.hbase.HBaseConfiguration; 031import org.apache.hadoop.hbase.HBaseTestingUtil; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.ServerName; 034import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException; 035import org.apache.hadoop.hbase.ipc.AbstractRpcClient; 036import org.apache.hadoop.hbase.ipc.BlockingRpcClient; 037import org.apache.hadoop.hbase.ipc.HBaseRpcController; 038import org.apache.hadoop.hbase.ipc.RpcClientFactory; 039import org.apache.hadoop.hbase.net.Address; 040import org.apache.hadoop.hbase.security.User; 041import org.apache.hadoop.hbase.testclassification.ClientTests; 042import org.apache.hadoop.hbase.testclassification.MediumTests; 043import org.junit.AfterClass; 044import org.junit.BeforeClass; 045import org.junit.ClassRule; 046import org.junit.Test; 047import org.junit.experimental.categories.Category; 048 049import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; 050import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 051import org.apache.hbase.thirdparty.com.google.protobuf.Message; 052import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 053import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; 054import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 055import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 056 057@Category({ MediumTests.class, ClientTests.class }) 058public class TestClientTimeouts { 059 060 @ClassRule 061 public static final HBaseClassTestRule CLASS_RULE = 062 HBaseClassTestRule.forClass(TestClientTimeouts.class); 063 064 private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 065 protected static int SLAVES = 1; 066 067 @BeforeClass 068 public static void setUpBeforeClass() throws Exception { 069 TEST_UTIL.startMiniCluster(SLAVES); 070 // Set the custom RPC client with random timeouts as the client 071 TEST_UTIL.getConfiguration().set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, 072 RandomTimeoutRpcClient.class.getName()); 073 } 074 075 @AfterClass 076 public static void tearDownAfterClass() throws Exception { 077 TEST_UTIL.shutdownMiniCluster(); 078 } 079 080 /** 081 * Test that a client that fails an RPC to the master retries properly and doesn't throw any 082 * unexpected exceptions. 083 */ 084 @Test 085 public void testAdminTimeout() throws Exception { 086 boolean lastFailed = false; 087 int initialInvocations = invokations.get(); 088 RandomTimeoutRpcClient rpcClient = (RandomTimeoutRpcClient) RpcClientFactory 089 .createClient(TEST_UTIL.getConfiguration(), TEST_UTIL.getClusterKey()); 090 091 try { 092 for (int i = 0; i < 5 || (lastFailed && i < 100); ++i) { 093 lastFailed = false; 094 // Ensure the HBaseAdmin uses a new connection by changing Configuration. 095 Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 096 conf.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1)); 097 Admin admin = null; 098 Connection connection = null; 099 try { 100 connection = ConnectionFactory.createConnection(conf); 101 admin = connection.getAdmin(); 102 admin.balancerSwitch(false, false); 103 } catch (MasterRegistryFetchException ex) { 104 // Since we are randomly throwing SocketTimeoutExceptions, it is possible to get 105 // a MasterRegistryFetchException. It's a bug if we get other exceptions. 106 lastFailed = true; 107 } finally { 108 if (admin != null) { 109 admin.close(); 110 if (admin.getConnection().isClosed()) { 111 rpcClient = (RandomTimeoutRpcClient) RpcClientFactory 112 .createClient(TEST_UTIL.getConfiguration(), TEST_UTIL.getClusterKey()); 113 } 114 } 115 if (connection != null) { 116 connection.close(); 117 } 118 } 119 } 120 // Ensure the RandomTimeoutRpcEngine is actually being used. 121 assertFalse(lastFailed); 122 assertTrue(invokations.get() > initialInvocations); 123 } finally { 124 rpcClient.close(); 125 } 126 } 127 128 /** 129 * Rpc Channel implementation with RandomTimeoutBlockingRpcChannel 130 */ 131 public static class RandomTimeoutRpcClient extends BlockingRpcClient { 132 public RandomTimeoutRpcClient(Configuration conf, String clusterId, SocketAddress localAddr, 133 MetricsConnection metrics, Map<String, byte[]> connectionAttributes) { 134 super(conf, clusterId, localAddr, metrics, connectionAttributes); 135 } 136 137 // Return my own instance, one that does random timeouts 138 @Override 139 public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User ticket, int rpcTimeout) { 140 return new RandomTimeoutBlockingRpcChannel(this, sn, ticket, rpcTimeout); 141 } 142 143 @Override 144 public RpcChannel createRpcChannel(ServerName sn, User ticket, int rpcTimeout) { 145 return new RandomTimeoutRpcChannel(this, sn, ticket, rpcTimeout); 146 } 147 } 148 149 private static AtomicInteger invokations = new AtomicInteger(); 150 151 private static final double CHANCE_OF_TIMEOUT = 0.3; 152 153 /** 154 * Blocking rpc channel that goes via hbase rpc. 155 */ 156 private static class RandomTimeoutBlockingRpcChannel 157 extends AbstractRpcClient.BlockingRpcChannelImplementation { 158 159 RandomTimeoutBlockingRpcChannel(BlockingRpcClient rpcClient, ServerName sn, User ticket, 160 int rpcTimeout) { 161 super(rpcClient, Address.fromParts(sn.getHostname(), sn.getPort()), ticket, rpcTimeout); 162 } 163 164 @Override 165 public Message callBlockingMethod(MethodDescriptor md, RpcController controller, Message param, 166 Message returnType) throws ServiceException { 167 invokations.getAndIncrement(); 168 if (ThreadLocalRandom.current().nextFloat() < CHANCE_OF_TIMEOUT) { 169 // throw a ServiceException, becuase that is the only exception type that 170 // {@link ProtobufRpcEngine} throws. If this RpcEngine is used with a different 171 // "actual" type, this may not properly mimic the underlying RpcEngine. 172 throw new ServiceException(new SocketTimeoutException("fake timeout")); 173 } 174 return super.callBlockingMethod(md, controller, param, returnType); 175 } 176 } 177 178 private static class RandomTimeoutRpcChannel extends AbstractRpcClient.RpcChannelImplementation { 179 180 RandomTimeoutRpcChannel(AbstractRpcClient<?> rpcClient, ServerName sn, User ticket, 181 int rpcTimeout) { 182 super(rpcClient, Address.fromParts(sn.getHostname(), sn.getPort()), ticket, rpcTimeout); 183 } 184 185 @Override 186 public void callMethod(MethodDescriptor md, RpcController controller, Message param, 187 Message returnType, RpcCallback<Message> done) { 188 invokations.getAndIncrement(); 189 if (ThreadLocalRandom.current().nextFloat() < CHANCE_OF_TIMEOUT) { 190 // throw a ServiceException, because that is the only exception type that 191 // {@link ProtobufRpcEngine} throws. If this RpcEngine is used with a different 192 // "actual" type, this may not properly mimic the underlying RpcEngine. 193 ((HBaseRpcController) controller).setFailed(new SocketTimeoutException("fake timeout")); 194 done.run(null); 195 return; 196 } 197 super.callMethod(md, controller, param, returnType, done); 198 } 199 } 200}