001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertFalse; 021import static org.junit.Assert.assertTrue; 022 023import java.net.InetSocketAddress; 024import java.net.SocketAddress; 025import java.net.SocketTimeoutException; 026import java.net.UnknownHostException; 027import java.util.Random; 028import java.util.concurrent.ThreadLocalRandom; 029import java.util.concurrent.atomic.AtomicInteger; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.HBaseClassTestRule; 032import org.apache.hadoop.hbase.HBaseConfiguration; 033import org.apache.hadoop.hbase.HBaseTestingUtility; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.MasterNotRunningException; 036import org.apache.hadoop.hbase.ServerName; 037import org.apache.hadoop.hbase.ipc.AbstractRpcClient; 038import org.apache.hadoop.hbase.ipc.BlockingRpcClient; 039import org.apache.hadoop.hbase.ipc.HBaseRpcController; 040import org.apache.hadoop.hbase.ipc.RpcClientFactory; 041import org.apache.hadoop.hbase.security.User; 042import org.apache.hadoop.hbase.testclassification.ClientTests; 043import org.apache.hadoop.hbase.testclassification.MediumTests; 044import org.junit.AfterClass; 045import org.junit.BeforeClass; 046import org.junit.ClassRule; 047import org.junit.Test; 048import org.junit.experimental.categories.Category; 049 050import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; 051import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 052import org.apache.hbase.thirdparty.com.google.protobuf.Message; 053import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 054import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; 055import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 056import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 057 058@Category({MediumTests.class, ClientTests.class}) 059public class TestClientTimeouts { 060 061 @ClassRule 062 public static final HBaseClassTestRule CLASS_RULE = 063 HBaseClassTestRule.forClass(TestClientTimeouts.class); 064 065 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 066 protected static int SLAVES = 1; 067 068 /** 069 * @throws java.lang.Exception 070 */ 071 @BeforeClass 072 public static void setUpBeforeClass() throws Exception { 073 TEST_UTIL.startMiniCluster(SLAVES); 074 // Set the custom RPC client with random timeouts as the client 075 TEST_UTIL.getConfiguration().set( 076 RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, 077 RandomTimeoutRpcClient.class.getName()); 078 } 079 080 /** 081 * @throws java.lang.Exception 082 */ 083 @AfterClass 084 public static void tearDownAfterClass() throws Exception { 085 TEST_UTIL.shutdownMiniCluster(); 086 } 087 088 /** 089 * Test that a client that fails an RPC to the master retries properly and 090 * doesn't throw any unexpected exceptions. 091 * @throws Exception 092 */ 093 @Test 094 public void testAdminTimeout() throws Exception { 095 boolean lastFailed = false; 096 int initialInvocations = RandomTimeoutBlockingRpcChannel.invokations.get(); 097 RandomTimeoutRpcClient rpcClient = (RandomTimeoutRpcClient) RpcClientFactory 098 .createClient(TEST_UTIL.getConfiguration(), TEST_UTIL.getClusterKey()); 099 100 try { 101 for (int i = 0; i < 5 || (lastFailed && i < 100); ++i) { 102 lastFailed = false; 103 // Ensure the HBaseAdmin uses a new connection by changing Configuration. 104 Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 105 conf.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1)); 106 Admin admin = null; 107 Connection connection = null; 108 try { 109 connection = ConnectionFactory.createConnection(conf); 110 admin = connection.getAdmin(); 111 // run some admin commands 112 HBaseAdmin.available(conf); 113 admin.setBalancerRunning(false, false); 114 } catch (MasterNotRunningException ex) { 115 // Since we are randomly throwing SocketTimeoutExceptions, it is possible to get 116 // a MasterNotRunningException. It's a bug if we get other exceptions. 117 lastFailed = true; 118 } finally { 119 if(admin != null) { 120 admin.close(); 121 if (admin.getConnection().isClosed()) { 122 rpcClient = (RandomTimeoutRpcClient) RpcClientFactory 123 .createClient(TEST_UTIL.getConfiguration(), TEST_UTIL.getClusterKey()); 124 } 125 } 126 if(connection != null) { 127 connection.close(); 128 } 129 } 130 } 131 // Ensure the RandomTimeoutRpcEngine is actually being used. 132 assertFalse(lastFailed); 133 assertTrue(RandomTimeoutBlockingRpcChannel.invokations.get() > initialInvocations); 134 } finally { 135 rpcClient.close(); 136 } 137 } 138 139 /** 140 * Rpc Channel implementation with RandomTimeoutBlockingRpcChannel 141 */ 142 public static class RandomTimeoutRpcClient extends BlockingRpcClient { 143 public RandomTimeoutRpcClient(Configuration conf, String clusterId, SocketAddress localAddr, 144 MetricsConnection metrics) { 145 super(conf, clusterId, localAddr, metrics); 146 } 147 148 // Return my own instance, one that does random timeouts 149 @Override 150 public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, 151 User ticket, int rpcTimeout) throws UnknownHostException { 152 return new RandomTimeoutBlockingRpcChannel(this, sn, ticket, rpcTimeout); 153 } 154 155 @Override 156 public RpcChannel createRpcChannel(ServerName sn, User ticket, int rpcTimeout) 157 throws UnknownHostException { 158 return new RandomTimeoutRpcChannel(this, sn, ticket, rpcTimeout); 159 } 160 } 161 162 /** 163 * Blocking rpc channel that goes via hbase rpc. 164 */ 165 static class RandomTimeoutBlockingRpcChannel 166 extends AbstractRpcClient.BlockingRpcChannelImplementation { 167 private static final Random RANDOM = new Random(System.currentTimeMillis()); 168 public static final double CHANCE_OF_TIMEOUT = 0.3; 169 private static AtomicInteger invokations = new AtomicInteger(); 170 171 RandomTimeoutBlockingRpcChannel(final BlockingRpcClient rpcClient, final ServerName sn, 172 final User ticket, final int rpcTimeout) { 173 super(rpcClient, new InetSocketAddress(sn.getHostname(), sn.getPort()), ticket, rpcTimeout); 174 } 175 176 @Override 177 public Message callBlockingMethod(MethodDescriptor md, 178 RpcController controller, Message param, Message returnType) 179 throws ServiceException { 180 invokations.getAndIncrement(); 181 if (RANDOM.nextFloat() < CHANCE_OF_TIMEOUT) { 182 // throw a ServiceException, becuase that is the only exception type that 183 // {@link ProtobufRpcEngine} throws. If this RpcEngine is used with a different 184 // "actual" type, this may not properly mimic the underlying RpcEngine. 185 throw new ServiceException(new SocketTimeoutException("fake timeout")); 186 } 187 return super.callBlockingMethod(md, controller, param, returnType); 188 } 189 } 190 191 private static class RandomTimeoutRpcChannel extends AbstractRpcClient.RpcChannelImplementation { 192 193 RandomTimeoutRpcChannel(AbstractRpcClient<?> rpcClient, ServerName sn, User ticket, 194 int rpcTimeout) throws UnknownHostException { 195 super(rpcClient, new InetSocketAddress(sn.getHostname(), sn.getPort()), ticket, rpcTimeout); 196 } 197 198 @Override 199 public void callMethod(MethodDescriptor md, RpcController controller, Message param, 200 Message returnType, RpcCallback<Message> done) { 201 RandomTimeoutBlockingRpcChannel.invokations.getAndIncrement(); 202 if (ThreadLocalRandom.current().nextFloat() < 203 RandomTimeoutBlockingRpcChannel.CHANCE_OF_TIMEOUT) { 204 // throw a ServiceException, because that is the only exception type that 205 // {@link ProtobufRpcEngine} throws. If this RpcEngine is used with a different 206 // "actual" type, this may not properly mimic the underlying RpcEngine. 207 ((HBaseRpcController) controller).setFailed(new SocketTimeoutException("fake timeout")); 208 done.run(null); 209 return; 210 } 211 super.callMethod(md, controller, param, returnType, done); 212 } 213 } 214}