001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertFalse; 021import static org.junit.Assert.assertTrue; 022 023import java.net.InetSocketAddress; 024import java.net.SocketAddress; 025import java.net.SocketTimeoutException; 026import java.net.UnknownHostException; 027import java.util.Random; 028import java.util.concurrent.atomic.AtomicInteger; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HBaseConfiguration; 032import org.apache.hadoop.hbase.HBaseTestingUtility; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.MasterNotRunningException; 035import org.apache.hadoop.hbase.ServerName; 036import org.apache.hadoop.hbase.ipc.AbstractRpcClient; 037import org.apache.hadoop.hbase.ipc.BlockingRpcClient; 038import org.apache.hadoop.hbase.ipc.RpcClientFactory; 039import org.apache.hadoop.hbase.security.User; 040import org.apache.hadoop.hbase.testclassification.ClientTests; 041import org.apache.hadoop.hbase.testclassification.MediumTests; 042import org.junit.AfterClass; 043import org.junit.BeforeClass; 044import org.junit.ClassRule; 045import org.junit.Test; 046import org.junit.experimental.categories.Category; 047 048import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; 049import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 050import org.apache.hbase.thirdparty.com.google.protobuf.Message; 051import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 052import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 053 054@Category({MediumTests.class, ClientTests.class}) 055public class TestClientTimeouts { 056 057 @ClassRule 058 public static final HBaseClassTestRule CLASS_RULE = 059 HBaseClassTestRule.forClass(TestClientTimeouts.class); 060 061 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 062 protected static int SLAVES = 1; 063 064 /** 065 * @throws java.lang.Exception 066 */ 067 @BeforeClass 068 public static void setUpBeforeClass() throws Exception { 069 TEST_UTIL.startMiniCluster(SLAVES); 070 // Set the custom RPC client with random timeouts as the client 071 TEST_UTIL.getConfiguration().set( 072 RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, 073 RandomTimeoutRpcClient.class.getName()); 074 } 075 076 /** 077 * @throws java.lang.Exception 078 */ 079 @AfterClass 080 public static void tearDownAfterClass() throws Exception { 081 TEST_UTIL.shutdownMiniCluster(); 082 } 083 084 /** 085 * Test that a client that fails an RPC to the master retries properly and 086 * doesn't throw any unexpected exceptions. 087 * @throws Exception 088 */ 089 @Test 090 public void testAdminTimeout() throws Exception { 091 boolean lastFailed = false; 092 int initialInvocations = RandomTimeoutBlockingRpcChannel.invokations.get(); 093 RandomTimeoutRpcClient rpcClient = (RandomTimeoutRpcClient) RpcClientFactory 094 .createClient(TEST_UTIL.getConfiguration(), TEST_UTIL.getClusterKey()); 095 096 try { 097 for (int i = 0; i < 5 || (lastFailed && i < 100); ++i) { 098 lastFailed = false; 099 // Ensure the HBaseAdmin uses a new connection by changing Configuration. 100 Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 101 conf.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1)); 102 Admin admin = null; 103 Connection connection = null; 104 try { 105 connection = ConnectionFactory.createConnection(conf); 106 admin = connection.getAdmin(); 107 // run some admin commands 108 HBaseAdmin.available(conf); 109 admin.setBalancerRunning(false, false); 110 } catch (MasterNotRunningException ex) { 111 // Since we are randomly throwing SocketTimeoutExceptions, it is possible to get 112 // a MasterNotRunningException. It's a bug if we get other exceptions. 113 lastFailed = true; 114 } finally { 115 if(admin != null) { 116 admin.close(); 117 if (admin.getConnection().isClosed()) { 118 rpcClient = (RandomTimeoutRpcClient) RpcClientFactory 119 .createClient(TEST_UTIL.getConfiguration(), TEST_UTIL.getClusterKey()); 120 } 121 } 122 if(connection != null) { 123 connection.close(); 124 } 125 } 126 } 127 // Ensure the RandomTimeoutRpcEngine is actually being used. 128 assertFalse(lastFailed); 129 assertTrue(RandomTimeoutBlockingRpcChannel.invokations.get() > initialInvocations); 130 } finally { 131 rpcClient.close(); 132 } 133 } 134 135 /** 136 * Rpc Channel implementation with RandomTimeoutBlockingRpcChannel 137 */ 138 public static class RandomTimeoutRpcClient extends BlockingRpcClient { 139 public RandomTimeoutRpcClient(Configuration conf, String clusterId, SocketAddress localAddr, 140 MetricsConnection metrics) { 141 super(conf, clusterId, localAddr, metrics); 142 } 143 144 // Return my own instance, one that does random timeouts 145 @Override 146 public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, 147 User ticket, int rpcTimeout) throws UnknownHostException { 148 return new RandomTimeoutBlockingRpcChannel(this, sn, ticket, rpcTimeout); 149 } 150 } 151 152 /** 153 * Blocking rpc channel that goes via hbase rpc. 154 */ 155 static class RandomTimeoutBlockingRpcChannel 156 extends AbstractRpcClient.BlockingRpcChannelImplementation { 157 private static final Random RANDOM = new Random(System.currentTimeMillis()); 158 public static final double CHANCE_OF_TIMEOUT = 0.3; 159 private static AtomicInteger invokations = new AtomicInteger(); 160 161 RandomTimeoutBlockingRpcChannel(final BlockingRpcClient rpcClient, final ServerName sn, 162 final User ticket, final int rpcTimeout) { 163 super(rpcClient, new InetSocketAddress(sn.getHostname(), sn.getPort()), ticket, rpcTimeout); 164 } 165 166 @Override 167 public Message callBlockingMethod(MethodDescriptor md, 168 RpcController controller, Message param, Message returnType) 169 throws ServiceException { 170 invokations.getAndIncrement(); 171 if (RANDOM.nextFloat() < CHANCE_OF_TIMEOUT) { 172 // throw a ServiceException, becuase that is the only exception type that 173 // {@link ProtobufRpcEngine} throws. If this RpcEngine is used with a different 174 // "actual" type, this may not properly mimic the underlying RpcEngine. 175 throw new ServiceException(new SocketTimeoutException("fake timeout")); 176 } 177 return super.callBlockingMethod(md, controller, param, returnType); 178 } 179 } 180}