001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertFalse; 021import static org.junit.Assert.assertTrue; 022 023import java.net.SocketAddress; 024import java.net.SocketTimeoutException; 025import java.util.Random; 026import java.util.concurrent.ThreadLocalRandom; 027import java.util.concurrent.atomic.AtomicInteger; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.HBaseClassTestRule; 030import org.apache.hadoop.hbase.HBaseConfiguration; 031import org.apache.hadoop.hbase.HBaseTestingUtility; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.MasterNotRunningException; 034import org.apache.hadoop.hbase.ServerName; 035import org.apache.hadoop.hbase.ipc.AbstractRpcClient; 036import org.apache.hadoop.hbase.ipc.BlockingRpcClient; 037import org.apache.hadoop.hbase.ipc.HBaseRpcController; 038import org.apache.hadoop.hbase.ipc.RpcClientFactory; 039import org.apache.hadoop.hbase.net.Address; 040import org.apache.hadoop.hbase.security.User; 041import org.apache.hadoop.hbase.testclassification.ClientTests; 042import org.apache.hadoop.hbase.testclassification.MediumTests; 043import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 044import org.junit.AfterClass; 045import org.junit.BeforeClass; 046import org.junit.ClassRule; 047import org.junit.Test; 048import org.junit.experimental.categories.Category; 049 050import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; 051import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 052import org.apache.hbase.thirdparty.com.google.protobuf.Message; 053import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 054import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; 055import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 056import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 057 058@Category({ MediumTests.class, ClientTests.class }) 059public class TestClientTimeouts { 060 061 @ClassRule 062 public static final HBaseClassTestRule CLASS_RULE = 063 HBaseClassTestRule.forClass(TestClientTimeouts.class); 064 065 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 066 protected static int SLAVES = 1; 067 068 /** 069 * @throws java.lang.Exception 070 */ 071 @BeforeClass 072 public static void setUpBeforeClass() throws Exception { 073 TEST_UTIL.startMiniCluster(SLAVES); 074 // Set the custom RPC client with random timeouts as the client 075 TEST_UTIL.getConfiguration().set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, 076 RandomTimeoutRpcClient.class.getName()); 077 } 078 079 /** 080 * @throws java.lang.Exception 081 */ 082 @AfterClass 083 public static void tearDownAfterClass() throws Exception { 084 TEST_UTIL.shutdownMiniCluster(); 085 } 086 087 /** 088 * Test that a client that fails an RPC to the master retries properly and doesn't throw any 089 * unexpected exceptions. n 090 */ 091 @Test 092 public void testAdminTimeout() throws Exception { 093 boolean lastFailed = false; 094 int initialInvocations = RandomTimeoutBlockingRpcChannel.invokations.get(); 095 RandomTimeoutRpcClient rpcClient = (RandomTimeoutRpcClient) RpcClientFactory 096 .createClient(TEST_UTIL.getConfiguration(), TEST_UTIL.getClusterKey()); 097 098 try { 099 for (int i = 0; i < 5 || (lastFailed && i < 100); ++i) { 100 lastFailed = false; 101 // Ensure the HBaseAdmin uses a new connection by changing Configuration. 102 Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 103 conf.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1)); 104 Admin admin = null; 105 Connection connection = null; 106 try { 107 connection = ConnectionFactory.createConnection(conf); 108 admin = connection.getAdmin(); 109 // run some admin commands 110 HBaseAdmin.available(conf); 111 admin.setBalancerRunning(false, false); 112 } catch (MasterNotRunningException ex) { 113 // Since we are randomly throwing SocketTimeoutExceptions, it is possible to get 114 // a MasterNotRunningException. It's a bug if we get other exceptions. 115 lastFailed = true; 116 } finally { 117 if (admin != null) { 118 admin.close(); 119 if (admin.getConnection().isClosed()) { 120 rpcClient = (RandomTimeoutRpcClient) RpcClientFactory 121 .createClient(TEST_UTIL.getConfiguration(), TEST_UTIL.getClusterKey()); 122 } 123 } 124 if (connection != null) { 125 connection.close(); 126 } 127 } 128 } 129 // Ensure the RandomTimeoutRpcEngine is actually being used. 130 assertFalse(lastFailed); 131 assertTrue(RandomTimeoutBlockingRpcChannel.invokations.get() > initialInvocations); 132 } finally { 133 rpcClient.close(); 134 } 135 } 136 137 /** 138 * Rpc Channel implementation with RandomTimeoutBlockingRpcChannel 139 */ 140 public static class RandomTimeoutRpcClient extends BlockingRpcClient { 141 public RandomTimeoutRpcClient(Configuration conf, String clusterId, SocketAddress localAddr, 142 MetricsConnection metrics) { 143 super(conf, clusterId, localAddr, metrics); 144 } 145 146 // Return my own instance, one that does random timeouts 147 @Override 148 public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User ticket, int rpcTimeout) { 149 return new RandomTimeoutBlockingRpcChannel(this, sn, ticket, rpcTimeout); 150 } 151 152 @Override 153 public RpcChannel createRpcChannel(ServerName sn, User ticket, int rpcTimeout) { 154 return new RandomTimeoutRpcChannel(this, sn, ticket, rpcTimeout); 155 } 156 } 157 158 /** 159 * Blocking rpc channel that goes via hbase rpc. 160 */ 161 static class RandomTimeoutBlockingRpcChannel 162 extends AbstractRpcClient.BlockingRpcChannelImplementation { 163 private static final Random RANDOM = new Random(EnvironmentEdgeManager.currentTime()); 164 public static final double CHANCE_OF_TIMEOUT = 0.3; 165 private static AtomicInteger invokations = new AtomicInteger(); 166 167 RandomTimeoutBlockingRpcChannel(final BlockingRpcClient rpcClient, final ServerName sn, 168 final User ticket, final int rpcTimeout) { 169 super(rpcClient, Address.fromParts(sn.getHostname(), sn.getPort()), ticket, rpcTimeout); 170 } 171 172 @Override 173 public Message callBlockingMethod(MethodDescriptor md, RpcController controller, Message param, 174 Message returnType) throws ServiceException { 175 invokations.getAndIncrement(); 176 if (RANDOM.nextFloat() < CHANCE_OF_TIMEOUT) { 177 // throw a ServiceException, becuase that is the only exception type that 178 // {@link ProtobufRpcEngine} throws. If this RpcEngine is used with a different 179 // "actual" type, this may not properly mimic the underlying RpcEngine. 180 throw new ServiceException(new SocketTimeoutException("fake timeout")); 181 } 182 return super.callBlockingMethod(md, controller, param, returnType); 183 } 184 } 185 186 private static class RandomTimeoutRpcChannel extends AbstractRpcClient.RpcChannelImplementation { 187 188 RandomTimeoutRpcChannel(AbstractRpcClient<?> rpcClient, ServerName sn, User ticket, 189 int rpcTimeout) { 190 super(rpcClient, Address.fromParts(sn.getHostname(), sn.getPort()), ticket, rpcTimeout); 191 } 192 193 @Override 194 public void callMethod(MethodDescriptor md, RpcController controller, Message param, 195 Message returnType, RpcCallback<Message> done) { 196 RandomTimeoutBlockingRpcChannel.invokations.getAndIncrement(); 197 if ( 198 ThreadLocalRandom.current().nextFloat() < RandomTimeoutBlockingRpcChannel.CHANCE_OF_TIMEOUT 199 ) { 200 // throw a ServiceException, because that is the only exception type that 201 // {@link ProtobufRpcEngine} throws. If this RpcEngine is used with a different 202 // "actual" type, this may not properly mimic the underlying RpcEngine. 203 ((HBaseRpcController) controller).setFailed(new SocketTimeoutException("fake timeout")); 204 done.run(null); 205 return; 206 } 207 super.callMethod(md, controller, param, returnType, done); 208 } 209 } 210}