001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertFalse;
021import static org.junit.Assert.assertTrue;
022
023import java.net.SocketAddress;
024import java.net.SocketTimeoutException;
025import java.util.Map;
026import java.util.concurrent.ThreadLocalRandom;
027import java.util.concurrent.atomic.AtomicInteger;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseConfiguration;
031import org.apache.hadoop.hbase.HBaseTestingUtil;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.ServerName;
034import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException;
035import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
036import org.apache.hadoop.hbase.ipc.BlockingRpcClient;
037import org.apache.hadoop.hbase.ipc.HBaseRpcController;
038import org.apache.hadoop.hbase.ipc.RpcClientFactory;
039import org.apache.hadoop.hbase.net.Address;
040import org.apache.hadoop.hbase.security.User;
041import org.apache.hadoop.hbase.testclassification.ClientTests;
042import org.apache.hadoop.hbase.testclassification.MediumTests;
043import org.junit.AfterClass;
044import org.junit.BeforeClass;
045import org.junit.ClassRule;
046import org.junit.Test;
047import org.junit.experimental.categories.Category;
048
049import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
050import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
051import org.apache.hbase.thirdparty.com.google.protobuf.Message;
052import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
053import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
054import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
055import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
056
057@Category({ MediumTests.class, ClientTests.class })
058public class TestClientTimeouts {
059
060  @ClassRule
061  public static final HBaseClassTestRule CLASS_RULE =
062    HBaseClassTestRule.forClass(TestClientTimeouts.class);
063
064  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
065  protected static int SLAVES = 1;
066
067  @BeforeClass
068  public static void setUpBeforeClass() throws Exception {
069    TEST_UTIL.startMiniCluster(SLAVES);
070    // Set the custom RPC client with random timeouts as the client
071    TEST_UTIL.getConfiguration().set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY,
072      RandomTimeoutRpcClient.class.getName());
073  }
074
075  @AfterClass
076  public static void tearDownAfterClass() throws Exception {
077    TEST_UTIL.shutdownMiniCluster();
078  }
079
080  /**
081   * Test that a client that fails an RPC to the master retries properly and doesn't throw any
082   * unexpected exceptions.
083   */
084  @Test
085  public void testAdminTimeout() throws Exception {
086    boolean lastFailed = false;
087    int initialInvocations = invokations.get();
088    RandomTimeoutRpcClient rpcClient = (RandomTimeoutRpcClient) RpcClientFactory
089      .createClient(TEST_UTIL.getConfiguration(), TEST_UTIL.getClusterKey());
090
091    try {
092      for (int i = 0; i < 5 || (lastFailed && i < 100); ++i) {
093        lastFailed = false;
094        // Ensure the HBaseAdmin uses a new connection by changing Configuration.
095        Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
096        conf.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
097        Admin admin = null;
098        Connection connection = null;
099        try {
100          connection = ConnectionFactory.createConnection(conf);
101          admin = connection.getAdmin();
102          admin.balancerSwitch(false, false);
103        } catch (MasterRegistryFetchException ex) {
104          // Since we are randomly throwing SocketTimeoutExceptions, it is possible to get
105          // a MasterRegistryFetchException. It's a bug if we get other exceptions.
106          lastFailed = true;
107        } finally {
108          if (admin != null) {
109            admin.close();
110            if (admin.getConnection().isClosed()) {
111              rpcClient = (RandomTimeoutRpcClient) RpcClientFactory
112                .createClient(TEST_UTIL.getConfiguration(), TEST_UTIL.getClusterKey());
113            }
114          }
115          if (connection != null) {
116            connection.close();
117          }
118        }
119      }
120      // Ensure the RandomTimeoutRpcEngine is actually being used.
121      assertFalse(lastFailed);
122      assertTrue(invokations.get() > initialInvocations);
123    } finally {
124      rpcClient.close();
125    }
126  }
127
128  /**
129   * Rpc Channel implementation with RandomTimeoutBlockingRpcChannel
130   */
131  public static class RandomTimeoutRpcClient extends BlockingRpcClient {
132    public RandomTimeoutRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
133      MetricsConnection metrics, Map<String, byte[]> connectionAttributes) {
134      super(conf, clusterId, localAddr, metrics, connectionAttributes);
135    }
136
137    // Return my own instance, one that does random timeouts
138    @Override
139    public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User ticket, int rpcTimeout) {
140      return new RandomTimeoutBlockingRpcChannel(this, sn, ticket, rpcTimeout);
141    }
142
143    @Override
144    public RpcChannel createRpcChannel(ServerName sn, User ticket, int rpcTimeout) {
145      return new RandomTimeoutRpcChannel(this, sn, ticket, rpcTimeout);
146    }
147  }
148
149  private static AtomicInteger invokations = new AtomicInteger();
150
151  private static final double CHANCE_OF_TIMEOUT = 0.3;
152
153  /**
154   * Blocking rpc channel that goes via hbase rpc.
155   */
156  private static class RandomTimeoutBlockingRpcChannel
157    extends AbstractRpcClient.BlockingRpcChannelImplementation {
158
159    RandomTimeoutBlockingRpcChannel(BlockingRpcClient rpcClient, ServerName sn, User ticket,
160      int rpcTimeout) {
161      super(rpcClient, Address.fromParts(sn.getHostname(), sn.getPort()), ticket, rpcTimeout);
162    }
163
164    @Override
165    public Message callBlockingMethod(MethodDescriptor md, RpcController controller, Message param,
166      Message returnType) throws ServiceException {
167      invokations.getAndIncrement();
168      if (ThreadLocalRandom.current().nextFloat() < CHANCE_OF_TIMEOUT) {
169        // throw a ServiceException, becuase that is the only exception type that
170        // {@link ProtobufRpcEngine} throws. If this RpcEngine is used with a different
171        // "actual" type, this may not properly mimic the underlying RpcEngine.
172        throw new ServiceException(new SocketTimeoutException("fake timeout"));
173      }
174      return super.callBlockingMethod(md, controller, param, returnType);
175    }
176  }
177
178  private static class RandomTimeoutRpcChannel extends AbstractRpcClient.RpcChannelImplementation {
179
180    RandomTimeoutRpcChannel(AbstractRpcClient<?> rpcClient, ServerName sn, User ticket,
181      int rpcTimeout) {
182      super(rpcClient, Address.fromParts(sn.getHostname(), sn.getPort()), ticket, rpcTimeout);
183    }
184
185    @Override
186    public void callMethod(MethodDescriptor md, RpcController controller, Message param,
187      Message returnType, RpcCallback<Message> done) {
188      invokations.getAndIncrement();
189      if (ThreadLocalRandom.current().nextFloat() < CHANCE_OF_TIMEOUT) {
190        // throw a ServiceException, because that is the only exception type that
191        // {@link ProtobufRpcEngine} throws. If this RpcEngine is used with a different
192        // "actual" type, this may not properly mimic the underlying RpcEngine.
193        ((HBaseRpcController) controller).setFailed(new SocketTimeoutException("fake timeout"));
194        done.run(null);
195        return;
196      }
197      super.callMethod(md, controller, param, returnType, done);
198    }
199  }
200}