001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.net.SocketAddress;
025import java.net.SocketTimeoutException;
026import java.util.Map;
027import java.util.concurrent.ThreadLocalRandom;
028import java.util.concurrent.atomic.AtomicInteger;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseConfiguration;
032import org.apache.hadoop.hbase.HBaseTestingUtil;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.ServerName;
035import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
036import org.apache.hadoop.hbase.ipc.BlockingRpcClient;
037import org.apache.hadoop.hbase.ipc.HBaseRpcController;
038import org.apache.hadoop.hbase.ipc.RpcClientFactory;
039import org.apache.hadoop.hbase.net.Address;
040import org.apache.hadoop.hbase.security.User;
041import org.apache.hadoop.hbase.testclassification.ClientTests;
042import org.apache.hadoop.hbase.testclassification.MediumTests;
043import org.junit.AfterClass;
044import org.junit.BeforeClass;
045import org.junit.ClassRule;
046import org.junit.Test;
047import org.junit.experimental.categories.Category;
048
049import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
050import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
051import org.apache.hbase.thirdparty.com.google.protobuf.Message;
052import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
053import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
054import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
055import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
056
057@Category({ MediumTests.class, ClientTests.class })
058public class TestClientTimeouts {
059
060  @ClassRule
061  public static final HBaseClassTestRule CLASS_RULE =
062    HBaseClassTestRule.forClass(TestClientTimeouts.class);
063
064  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
065  protected static int SLAVES = 1;
066
067  @BeforeClass
068  public static void setUpBeforeClass() throws Exception {
069    TEST_UTIL.startMiniCluster(SLAVES);
070  }
071
072  @AfterClass
073  public static void tearDownAfterClass() throws Exception {
074    TEST_UTIL.shutdownMiniCluster();
075  }
076
077  private Connection createConnection() {
078    // Ensure the HBaseAdmin uses a new connection by changing Configuration.
079    Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
080    // Set the custom RPC client with random timeouts as the client
081    conf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY,
082      RandomTimeoutRpcClient.class.getName());
083    conf.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
084    for (;;) {
085      try {
086        return ConnectionFactory.createConnection(conf);
087      } catch (IOException e) {
088        // since we randomly throw SocketTimeoutException, it is possible that we fail when creating
089        // the Connection, but this is not what we want to test here, so just ignore it and try
090        // again
091      }
092    }
093  }
094
095  /**
096   * Test that a client that fails an RPC to the master retries properly and doesn't throw any
097   * unexpected exceptions.
098   */
099  @Test
100  public void testAdminTimeout() throws Exception {
101    try (Connection conn = createConnection(); Admin admin = conn.getAdmin()) {
102      int initialInvocations = invokations.get();
103      boolean balanceEnabled = admin.isBalancerEnabled();
104      for (int i = 0; i < 5; i++) {
105        assertEquals(balanceEnabled, admin.balancerSwitch(!balanceEnabled, false));
106        balanceEnabled = !balanceEnabled;
107      }
108      // Ensure the RandomTimeoutRpcEngine is actually being used.
109      assertTrue(invokations.get() > initialInvocations);
110    }
111  }
112
113  /**
114   * Rpc Channel implementation with RandomTimeoutBlockingRpcChannel
115   */
116  public static class RandomTimeoutRpcClient extends BlockingRpcClient {
117    public RandomTimeoutRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
118      MetricsConnection metrics, Map<String, byte[]> connectionAttributes) {
119      super(conf, clusterId, localAddr, metrics, connectionAttributes);
120    }
121
122    // Return my own instance, one that does random timeouts
123    @Override
124    public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User ticket, int rpcTimeout) {
125      return new RandomTimeoutBlockingRpcChannel(this, sn, ticket, rpcTimeout);
126    }
127
128    @Override
129    public RpcChannel createRpcChannel(ServerName sn, User ticket, int rpcTimeout) {
130      return new RandomTimeoutRpcChannel(this, sn, ticket, rpcTimeout);
131    }
132  }
133
134  private static AtomicInteger invokations = new AtomicInteger();
135
136  private static final double CHANCE_OF_TIMEOUT = 0.3;
137
138  /**
139   * Blocking rpc channel that goes via hbase rpc.
140   */
141  private static class RandomTimeoutBlockingRpcChannel
142    extends AbstractRpcClient.BlockingRpcChannelImplementation {
143
144    RandomTimeoutBlockingRpcChannel(BlockingRpcClient rpcClient, ServerName sn, User ticket,
145      int rpcTimeout) {
146      super(rpcClient, Address.fromParts(sn.getHostname(), sn.getPort()), ticket, rpcTimeout);
147    }
148
149    @Override
150    public Message callBlockingMethod(MethodDescriptor md, RpcController controller, Message param,
151      Message returnType) throws ServiceException {
152      invokations.getAndIncrement();
153      if (ThreadLocalRandom.current().nextFloat() < CHANCE_OF_TIMEOUT) {
154        // throw a ServiceException, becuase that is the only exception type that
155        // {@link ProtobufRpcEngine} throws. If this RpcEngine is used with a different
156        // "actual" type, this may not properly mimic the underlying RpcEngine.
157        throw new ServiceException(new SocketTimeoutException("fake timeout"));
158      }
159      return super.callBlockingMethod(md, controller, param, returnType);
160    }
161  }
162
163  private static class RandomTimeoutRpcChannel extends AbstractRpcClient.RpcChannelImplementation {
164
165    RandomTimeoutRpcChannel(AbstractRpcClient<?> rpcClient, ServerName sn, User ticket,
166      int rpcTimeout) {
167      super(rpcClient, Address.fromParts(sn.getHostname(), sn.getPort()), ticket, rpcTimeout);
168    }
169
170    @Override
171    public void callMethod(MethodDescriptor md, RpcController controller, Message param,
172      Message returnType, RpcCallback<Message> done) {
173      invokations.getAndIncrement();
174      if (ThreadLocalRandom.current().nextFloat() < CHANCE_OF_TIMEOUT) {
175        // throw a ServiceException, because that is the only exception type that
176        // {@link ProtobufRpcEngine} throws. If this RpcEngine is used with a different
177        // "actual" type, this may not properly mimic the underlying RpcEngine.
178        ((HBaseRpcController) controller).setFailed(new SocketTimeoutException("fake timeout"));
179        done.run(null);
180        return;
181      }
182      super.callMethod(md, controller, param, returnType, done);
183    }
184  }
185}