001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertFalse;
021import static org.junit.Assert.assertTrue;
022
023import java.net.InetSocketAddress;
024import java.net.SocketAddress;
025import java.net.SocketTimeoutException;
026import java.net.UnknownHostException;
027import java.util.Random;
028import java.util.concurrent.ThreadLocalRandom;
029import java.util.concurrent.atomic.AtomicInteger;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.HBaseClassTestRule;
032import org.apache.hadoop.hbase.HBaseConfiguration;
033import org.apache.hadoop.hbase.HBaseTestingUtility;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.MasterNotRunningException;
036import org.apache.hadoop.hbase.ServerName;
037import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
038import org.apache.hadoop.hbase.ipc.BlockingRpcClient;
039import org.apache.hadoop.hbase.ipc.HBaseRpcController;
040import org.apache.hadoop.hbase.ipc.RpcClientFactory;
041import org.apache.hadoop.hbase.security.User;
042import org.apache.hadoop.hbase.testclassification.ClientTests;
043import org.apache.hadoop.hbase.testclassification.MediumTests;
044import org.junit.AfterClass;
045import org.junit.BeforeClass;
046import org.junit.ClassRule;
047import org.junit.Test;
048import org.junit.experimental.categories.Category;
049
050import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
051import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
052import org.apache.hbase.thirdparty.com.google.protobuf.Message;
053import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
054import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
055import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
056import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
057
058@Category({MediumTests.class, ClientTests.class})
059public class TestClientTimeouts {
060
061  @ClassRule
062  public static final HBaseClassTestRule CLASS_RULE =
063      HBaseClassTestRule.forClass(TestClientTimeouts.class);
064
065  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
066  protected static int SLAVES = 1;
067
068 /**
069   * @throws java.lang.Exception
070   */
071  @BeforeClass
072  public static void setUpBeforeClass() throws Exception {
073    TEST_UTIL.startMiniCluster(SLAVES);
074    // Set the custom RPC client with random timeouts as the client
075    TEST_UTIL.getConfiguration().set(
076        RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY,
077        RandomTimeoutRpcClient.class.getName());
078  }
079
080  /**
081   * @throws java.lang.Exception
082   */
083  @AfterClass
084  public static void tearDownAfterClass() throws Exception {
085    TEST_UTIL.shutdownMiniCluster();
086  }
087
088  /**
089   * Test that a client that fails an RPC to the master retries properly and
090   * doesn't throw any unexpected exceptions.
091   * @throws Exception
092   */
093  @Test
094  public void testAdminTimeout() throws Exception {
095    boolean lastFailed = false;
096    int initialInvocations = RandomTimeoutBlockingRpcChannel.invokations.get();
097    RandomTimeoutRpcClient rpcClient = (RandomTimeoutRpcClient) RpcClientFactory
098        .createClient(TEST_UTIL.getConfiguration(), TEST_UTIL.getClusterKey());
099
100    try {
101      for (int i = 0; i < 5 || (lastFailed && i < 100); ++i) {
102        lastFailed = false;
103        // Ensure the HBaseAdmin uses a new connection by changing Configuration.
104        Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
105        conf.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
106        Admin admin = null;
107        Connection connection = null;
108        try {
109          connection = ConnectionFactory.createConnection(conf);
110          admin = connection.getAdmin();
111          // run some admin commands
112          HBaseAdmin.available(conf);
113          admin.setBalancerRunning(false, false);
114        } catch (MasterNotRunningException ex) {
115          // Since we are randomly throwing SocketTimeoutExceptions, it is possible to get
116          // a MasterNotRunningException.  It's a bug if we get other exceptions.
117          lastFailed = true;
118        } finally {
119          if(admin != null) {
120            admin.close();
121            if (admin.getConnection().isClosed()) {
122              rpcClient = (RandomTimeoutRpcClient) RpcClientFactory
123                  .createClient(TEST_UTIL.getConfiguration(), TEST_UTIL.getClusterKey());
124            }
125          }
126          if(connection != null) {
127            connection.close();
128          }
129        }
130      }
131      // Ensure the RandomTimeoutRpcEngine is actually being used.
132      assertFalse(lastFailed);
133      assertTrue(RandomTimeoutBlockingRpcChannel.invokations.get() > initialInvocations);
134    } finally {
135      rpcClient.close();
136    }
137  }
138
139  /**
140   * Rpc Channel implementation with RandomTimeoutBlockingRpcChannel
141   */
142  public static class RandomTimeoutRpcClient extends BlockingRpcClient {
143    public RandomTimeoutRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
144        MetricsConnection metrics) {
145      super(conf, clusterId, localAddr, metrics);
146    }
147
148    // Return my own instance, one that does random timeouts
149    @Override
150    public BlockingRpcChannel createBlockingRpcChannel(ServerName sn,
151        User ticket, int rpcTimeout) throws UnknownHostException {
152      return new RandomTimeoutBlockingRpcChannel(this, sn, ticket, rpcTimeout);
153    }
154
155    @Override
156    public RpcChannel createRpcChannel(ServerName sn, User ticket, int rpcTimeout)
157        throws UnknownHostException {
158      return new RandomTimeoutRpcChannel(this, sn, ticket, rpcTimeout);
159    }
160  }
161
162  /**
163   * Blocking rpc channel that goes via hbase rpc.
164   */
165  static class RandomTimeoutBlockingRpcChannel
166      extends AbstractRpcClient.BlockingRpcChannelImplementation {
167    private static final Random RANDOM = new Random(System.currentTimeMillis());
168    public static final double CHANCE_OF_TIMEOUT = 0.3;
169    private static AtomicInteger invokations = new AtomicInteger();
170
171    RandomTimeoutBlockingRpcChannel(final BlockingRpcClient rpcClient, final ServerName sn,
172        final User ticket, final int rpcTimeout) {
173      super(rpcClient, new InetSocketAddress(sn.getHostname(), sn.getPort()), ticket, rpcTimeout);
174    }
175
176    @Override
177    public Message callBlockingMethod(MethodDescriptor md,
178        RpcController controller, Message param, Message returnType)
179        throws ServiceException {
180      invokations.getAndIncrement();
181      if (RANDOM.nextFloat() < CHANCE_OF_TIMEOUT) {
182        // throw a ServiceException, becuase that is the only exception type that
183        // {@link ProtobufRpcEngine} throws.  If this RpcEngine is used with a different
184        // "actual" type, this may not properly mimic the underlying RpcEngine.
185        throw new ServiceException(new SocketTimeoutException("fake timeout"));
186      }
187      return super.callBlockingMethod(md, controller, param, returnType);
188    }
189  }
190
191  private static class RandomTimeoutRpcChannel extends AbstractRpcClient.RpcChannelImplementation {
192
193    RandomTimeoutRpcChannel(AbstractRpcClient<?> rpcClient, ServerName sn, User ticket,
194                            int rpcTimeout) throws UnknownHostException {
195      super(rpcClient, new InetSocketAddress(sn.getHostname(), sn.getPort()), ticket, rpcTimeout);
196    }
197
198    @Override
199    public void callMethod(MethodDescriptor md, RpcController controller, Message param,
200                           Message returnType, RpcCallback<Message> done) {
201      RandomTimeoutBlockingRpcChannel.invokations.getAndIncrement();
202      if (ThreadLocalRandom.current().nextFloat() <
203          RandomTimeoutBlockingRpcChannel.CHANCE_OF_TIMEOUT) {
204        // throw a ServiceException, because that is the only exception type that
205        // {@link ProtobufRpcEngine} throws. If this RpcEngine is used with a different
206        // "actual" type, this may not properly mimic the underlying RpcEngine.
207        ((HBaseRpcController) controller).setFailed(new SocketTimeoutException("fake timeout"));
208        done.run(null);
209        return;
210      }
211      super.callMethod(md, controller, param, returnType, done);
212    }
213  }
214}