001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertFalse;
021import static org.junit.Assert.assertTrue;
022
023import java.net.InetSocketAddress;
024import java.net.SocketAddress;
025import java.net.SocketTimeoutException;
026import java.net.UnknownHostException;
027import java.util.Random;
028import java.util.concurrent.atomic.AtomicInteger;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseConfiguration;
032import org.apache.hadoop.hbase.HBaseTestingUtility;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.MasterNotRunningException;
035import org.apache.hadoop.hbase.ServerName;
036import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
037import org.apache.hadoop.hbase.ipc.BlockingRpcClient;
038import org.apache.hadoop.hbase.ipc.RpcClientFactory;
039import org.apache.hadoop.hbase.security.User;
040import org.apache.hadoop.hbase.testclassification.ClientTests;
041import org.apache.hadoop.hbase.testclassification.MediumTests;
042import org.junit.AfterClass;
043import org.junit.BeforeClass;
044import org.junit.ClassRule;
045import org.junit.Test;
046import org.junit.experimental.categories.Category;
047
048import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
049import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
050import org.apache.hbase.thirdparty.com.google.protobuf.Message;
051import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
052import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
053
054@Category({MediumTests.class, ClientTests.class})
055public class TestClientTimeouts {
056
057  @ClassRule
058  public static final HBaseClassTestRule CLASS_RULE =
059      HBaseClassTestRule.forClass(TestClientTimeouts.class);
060
061  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
062  protected static int SLAVES = 1;
063
064 /**
065   * @throws java.lang.Exception
066   */
067  @BeforeClass
068  public static void setUpBeforeClass() throws Exception {
069    TEST_UTIL.startMiniCluster(SLAVES);
070    // Set the custom RPC client with random timeouts as the client
071    TEST_UTIL.getConfiguration().set(
072        RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY,
073        RandomTimeoutRpcClient.class.getName());
074  }
075
076  /**
077   * @throws java.lang.Exception
078   */
079  @AfterClass
080  public static void tearDownAfterClass() throws Exception {
081    TEST_UTIL.shutdownMiniCluster();
082  }
083
084  /**
085   * Test that a client that fails an RPC to the master retries properly and
086   * doesn't throw any unexpected exceptions.
087   * @throws Exception
088   */
089  @Test
090  public void testAdminTimeout() throws Exception {
091    boolean lastFailed = false;
092    int initialInvocations = RandomTimeoutBlockingRpcChannel.invokations.get();
093    RandomTimeoutRpcClient rpcClient = (RandomTimeoutRpcClient) RpcClientFactory
094        .createClient(TEST_UTIL.getConfiguration(), TEST_UTIL.getClusterKey());
095
096    try {
097      for (int i = 0; i < 5 || (lastFailed && i < 100); ++i) {
098        lastFailed = false;
099        // Ensure the HBaseAdmin uses a new connection by changing Configuration.
100        Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
101        conf.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
102        Admin admin = null;
103        Connection connection = null;
104        try {
105          connection = ConnectionFactory.createConnection(conf);
106          admin = connection.getAdmin();
107          // run some admin commands
108          HBaseAdmin.available(conf);
109          admin.setBalancerRunning(false, false);
110        } catch (MasterNotRunningException ex) {
111          // Since we are randomly throwing SocketTimeoutExceptions, it is possible to get
112          // a MasterNotRunningException.  It's a bug if we get other exceptions.
113          lastFailed = true;
114        } finally {
115          if(admin != null) {
116            admin.close();
117            if (admin.getConnection().isClosed()) {
118              rpcClient = (RandomTimeoutRpcClient) RpcClientFactory
119                  .createClient(TEST_UTIL.getConfiguration(), TEST_UTIL.getClusterKey());
120            }
121          }
122          if(connection != null) {
123            connection.close();
124          }
125        }
126      }
127      // Ensure the RandomTimeoutRpcEngine is actually being used.
128      assertFalse(lastFailed);
129      assertTrue(RandomTimeoutBlockingRpcChannel.invokations.get() > initialInvocations);
130    } finally {
131      rpcClient.close();
132    }
133  }
134
135  /**
136   * Rpc Channel implementation with RandomTimeoutBlockingRpcChannel
137   */
138  public static class RandomTimeoutRpcClient extends BlockingRpcClient {
139    public RandomTimeoutRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
140        MetricsConnection metrics) {
141      super(conf, clusterId, localAddr, metrics);
142    }
143
144    // Return my own instance, one that does random timeouts
145    @Override
146    public BlockingRpcChannel createBlockingRpcChannel(ServerName sn,
147        User ticket, int rpcTimeout) throws UnknownHostException {
148      return new RandomTimeoutBlockingRpcChannel(this, sn, ticket, rpcTimeout);
149    }
150  }
151
152  /**
153   * Blocking rpc channel that goes via hbase rpc.
154   */
155  static class RandomTimeoutBlockingRpcChannel
156      extends AbstractRpcClient.BlockingRpcChannelImplementation {
157    private static final Random RANDOM = new Random(System.currentTimeMillis());
158    public static final double CHANCE_OF_TIMEOUT = 0.3;
159    private static AtomicInteger invokations = new AtomicInteger();
160
161    RandomTimeoutBlockingRpcChannel(final BlockingRpcClient rpcClient, final ServerName sn,
162        final User ticket, final int rpcTimeout) {
163      super(rpcClient, new InetSocketAddress(sn.getHostname(), sn.getPort()), ticket, rpcTimeout);
164    }
165
166    @Override
167    public Message callBlockingMethod(MethodDescriptor md,
168        RpcController controller, Message param, Message returnType)
169        throws ServiceException {
170      invokations.getAndIncrement();
171      if (RANDOM.nextFloat() < CHANCE_OF_TIMEOUT) {
172        // throw a ServiceException, becuase that is the only exception type that
173        // {@link ProtobufRpcEngine} throws.  If this RpcEngine is used with a different
174        // "actual" type, this may not properly mimic the underlying RpcEngine.
175        throw new ServiceException(new SocketTimeoutException("fake timeout"));
176      }
177      return super.callBlockingMethod(md, controller, param, returnType);
178    }
179  }
180}