001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.net.SocketAddress; 025import java.net.SocketTimeoutException; 026import java.util.Map; 027import java.util.concurrent.ThreadLocalRandom; 028import java.util.concurrent.atomic.AtomicInteger; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HBaseConfiguration; 032import org.apache.hadoop.hbase.HBaseTestingUtil; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.ServerName; 035import org.apache.hadoop.hbase.ipc.AbstractRpcClient; 036import org.apache.hadoop.hbase.ipc.BlockingRpcClient; 037import org.apache.hadoop.hbase.ipc.HBaseRpcController; 038import org.apache.hadoop.hbase.ipc.RpcClientFactory; 039import org.apache.hadoop.hbase.net.Address; 040import org.apache.hadoop.hbase.security.User; 041import org.apache.hadoop.hbase.testclassification.ClientTests; 042import org.apache.hadoop.hbase.testclassification.MediumTests; 043import org.junit.AfterClass; 044import org.junit.BeforeClass; 045import org.junit.ClassRule; 046import org.junit.Test; 047import org.junit.experimental.categories.Category; 048 049import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; 050import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 051import org.apache.hbase.thirdparty.com.google.protobuf.Message; 052import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 053import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; 054import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 055import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 056 057@Category({ MediumTests.class, ClientTests.class }) 058public class TestClientTimeouts { 059 060 @ClassRule 061 public static final HBaseClassTestRule CLASS_RULE = 062 HBaseClassTestRule.forClass(TestClientTimeouts.class); 063 064 private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 065 protected static int SLAVES = 1; 066 067 @BeforeClass 068 public static void setUpBeforeClass() throws Exception { 069 TEST_UTIL.startMiniCluster(SLAVES); 070 } 071 072 @AfterClass 073 public static void tearDownAfterClass() throws Exception { 074 TEST_UTIL.shutdownMiniCluster(); 075 } 076 077 private Connection createConnection() { 078 // Ensure the HBaseAdmin uses a new connection by changing Configuration. 079 Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 080 // Set the custom RPC client with random timeouts as the client 081 conf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, 082 RandomTimeoutRpcClient.class.getName()); 083 conf.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1)); 084 for (;;) { 085 try { 086 return ConnectionFactory.createConnection(conf); 087 } catch (IOException e) { 088 // since we randomly throw SocketTimeoutException, it is possible that we fail when creating 089 // the Connection, but this is not what we want to test here, so just ignore it and try 090 // again 091 } 092 } 093 } 094 095 /** 096 * Test that a client that fails an RPC to the master retries properly and doesn't throw any 097 * unexpected exceptions. 098 */ 099 @Test 100 public void testAdminTimeout() throws Exception { 101 try (Connection conn = createConnection(); Admin admin = conn.getAdmin()) { 102 int initialInvocations = invokations.get(); 103 boolean balanceEnabled = admin.isBalancerEnabled(); 104 for (int i = 0; i < 5; i++) { 105 assertEquals(balanceEnabled, admin.balancerSwitch(!balanceEnabled, false)); 106 balanceEnabled = !balanceEnabled; 107 } 108 // Ensure the RandomTimeoutRpcEngine is actually being used. 109 assertTrue(invokations.get() > initialInvocations); 110 } 111 } 112 113 /** 114 * Rpc Channel implementation with RandomTimeoutBlockingRpcChannel 115 */ 116 public static class RandomTimeoutRpcClient extends BlockingRpcClient { 117 public RandomTimeoutRpcClient(Configuration conf, String clusterId, SocketAddress localAddr, 118 MetricsConnection metrics, Map<String, byte[]> connectionAttributes) { 119 super(conf, clusterId, localAddr, metrics, connectionAttributes); 120 } 121 122 // Return my own instance, one that does random timeouts 123 @Override 124 public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User ticket, int rpcTimeout) { 125 return new RandomTimeoutBlockingRpcChannel(this, sn, ticket, rpcTimeout); 126 } 127 128 @Override 129 public RpcChannel createRpcChannel(ServerName sn, User ticket, int rpcTimeout) { 130 return new RandomTimeoutRpcChannel(this, sn, ticket, rpcTimeout); 131 } 132 } 133 134 private static AtomicInteger invokations = new AtomicInteger(); 135 136 private static final double CHANCE_OF_TIMEOUT = 0.3; 137 138 /** 139 * Blocking rpc channel that goes via hbase rpc. 140 */ 141 private static class RandomTimeoutBlockingRpcChannel 142 extends AbstractRpcClient.BlockingRpcChannelImplementation { 143 144 RandomTimeoutBlockingRpcChannel(BlockingRpcClient rpcClient, ServerName sn, User ticket, 145 int rpcTimeout) { 146 super(rpcClient, Address.fromParts(sn.getHostname(), sn.getPort()), ticket, rpcTimeout); 147 } 148 149 @Override 150 public Message callBlockingMethod(MethodDescriptor md, RpcController controller, Message param, 151 Message returnType) throws ServiceException { 152 invokations.getAndIncrement(); 153 if (ThreadLocalRandom.current().nextFloat() < CHANCE_OF_TIMEOUT) { 154 // throw a ServiceException, becuase that is the only exception type that 155 // {@link ProtobufRpcEngine} throws. If this RpcEngine is used with a different 156 // "actual" type, this may not properly mimic the underlying RpcEngine. 157 throw new ServiceException(new SocketTimeoutException("fake timeout")); 158 } 159 return super.callBlockingMethod(md, controller, param, returnType); 160 } 161 } 162 163 private static class RandomTimeoutRpcChannel extends AbstractRpcClient.RpcChannelImplementation { 164 165 RandomTimeoutRpcChannel(AbstractRpcClient<?> rpcClient, ServerName sn, User ticket, 166 int rpcTimeout) { 167 super(rpcClient, Address.fromParts(sn.getHostname(), sn.getPort()), ticket, rpcTimeout); 168 } 169 170 @Override 171 public void callMethod(MethodDescriptor md, RpcController controller, Message param, 172 Message returnType, RpcCallback<Message> done) { 173 invokations.getAndIncrement(); 174 if (ThreadLocalRandom.current().nextFloat() < CHANCE_OF_TIMEOUT) { 175 // throw a ServiceException, because that is the only exception type that 176 // {@link ProtobufRpcEngine} throws. If this RpcEngine is used with a different 177 // "actual" type, this may not properly mimic the underlying RpcEngine. 178 ((HBaseRpcController) controller).setFailed(new SocketTimeoutException("fake timeout")); 179 done.run(null); 180 return; 181 } 182 super.callMethod(md, controller, param, returnType, done); 183 } 184 } 185}