001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR;
021import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
022import static org.junit.Assert.fail;
023
024import java.io.IOException;
025import java.util.Arrays;
026import java.util.List;
027import java.util.Optional;
028import java.util.concurrent.ForkJoinPool;
029import java.util.concurrent.TimeUnit;
030import java.util.concurrent.atomic.AtomicLong;
031import java.util.function.Supplier;
032import org.apache.hadoop.hbase.HBaseClassTestRule;
033import org.apache.hadoop.hbase.HBaseTestingUtility;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
036import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
037import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
038import org.apache.hadoop.hbase.coprocessor.MasterObserver;
039import org.apache.hadoop.hbase.coprocessor.ObserverContext;
040import org.apache.hadoop.hbase.testclassification.ClientTests;
041import org.apache.hadoop.hbase.testclassification.LargeTests;
042import org.apache.hadoop.hbase.util.Threads;
043import org.junit.After;
044import org.junit.Before;
045import org.junit.ClassRule;
046import org.junit.Test;
047import org.junit.experimental.categories.Category;
048import org.junit.runner.RunWith;
049import org.junit.runners.Parameterized;
050import org.junit.runners.Parameterized.Parameter;
051import org.junit.runners.Parameterized.Parameters;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
056
057@RunWith(Parameterized.class)
058@Category({ LargeTests.class, ClientTests.class })
059public class TestAsyncAdminBuilder {
060
061  @ClassRule
062  public static final HBaseClassTestRule CLASS_RULE =
063    HBaseClassTestRule.forClass(TestAsyncAdminBuilder.class);
064
065  private static final Logger LOG = LoggerFactory.getLogger(TestAsyncAdminBuilder.class);
066  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
067  private static AsyncConnection ASYNC_CONN;
068
069  @Parameter
070  public Supplier<AsyncAdminBuilder> getAdminBuilder;
071
072  private static AsyncAdminBuilder getRawAsyncAdminBuilder() {
073    return ASYNC_CONN.getAdminBuilder();
074  }
075
076  private static AsyncAdminBuilder getAsyncAdminBuilder() {
077    return ASYNC_CONN.getAdminBuilder(ForkJoinPool.commonPool());
078  }
079
080  @Parameters
081  public static List<Object[]> params() {
082    return Arrays.asList(new Supplier<?>[] { TestAsyncAdminBuilder::getRawAsyncAdminBuilder },
083      new Supplier<?>[] { TestAsyncAdminBuilder::getAsyncAdminBuilder });
084  }
085
086  private static final int DEFAULT_RPC_TIMEOUT = 10000;
087  private static final int DEFAULT_OPERATION_TIMEOUT = 30000;
088  private static final int DEFAULT_RETRIES_NUMBER = 2;
089
090  @Before
091  public void setUp() throws Exception {
092    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, DEFAULT_RPC_TIMEOUT);
093    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
094      DEFAULT_OPERATION_TIMEOUT);
095    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
096      DEFAULT_RETRIES_NUMBER);
097    TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
098  }
099
100  @After
101  public void tearDown() throws Exception {
102    Closeables.close(ASYNC_CONN, true);
103    TEST_UTIL.shutdownMiniCluster();
104  }
105
106  @Test
107  public void testRpcTimeout() throws Exception {
108    TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
109      TestRpcTimeoutCoprocessor.class.getName());
110    TEST_UTIL.startMiniCluster(2);
111    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
112
113    try {
114      getAdminBuilder.get().setRpcTimeout(DEFAULT_RPC_TIMEOUT / 2, TimeUnit.MILLISECONDS).build()
115        .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get();
116      fail("We expect an exception here");
117    } catch (Exception e) {
118      // expected
119    }
120
121    try {
122      getAdminBuilder.get().setRpcTimeout(DEFAULT_RPC_TIMEOUT * 2, TimeUnit.MILLISECONDS).build()
123        .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get();
124    } catch (Exception e) {
125      fail("The Operation should succeed, unexpected exception: " + e.getMessage());
126    }
127  }
128
129  @Test
130  public void testOperationTimeout() throws Exception {
131    // set retry number to 100 to make sure that this test only be affected by operation timeout
132    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 100);
133    TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
134      TestOperationTimeoutCoprocessor.class.getName());
135    TEST_UTIL.startMiniCluster(2);
136    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
137
138    try {
139      getAdminBuilder.get()
140        .setOperationTimeout(DEFAULT_OPERATION_TIMEOUT / 2, TimeUnit.MILLISECONDS).build()
141        .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get();
142      fail("We expect an exception here");
143    } catch (Exception e) {
144      // expected
145    }
146
147    try {
148      getAdminBuilder.get()
149        .setOperationTimeout(DEFAULT_OPERATION_TIMEOUT * 2, TimeUnit.MILLISECONDS).build()
150        .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get();
151    } catch (Exception e) {
152      fail("The Operation should succeed, unexpected exception: " + e.getMessage());
153    }
154  }
155
156  @Test
157  public void testMaxRetries() throws Exception {
158    // set operation timeout to 300s to make sure that this test only be affected by retry number
159    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 300000);
160    TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
161      TestMaxRetriesCoprocessor.class.getName());
162    TEST_UTIL.startMiniCluster(2);
163    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
164
165    try {
166      getAdminBuilder.get().setMaxRetries(DEFAULT_RETRIES_NUMBER / 2).build()
167        .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get();
168      fail("We expect an exception here");
169    } catch (Exception e) {
170      // expected
171    }
172
173    try {
174      getAdminBuilder.get().setMaxRetries(DEFAULT_RETRIES_NUMBER * 2).build()
175        .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get();
176    } catch (Exception e) {
177      fail("The Operation should succeed, unexpected exception: " + e.getMessage());
178    }
179  }
180
181  public static class TestRpcTimeoutCoprocessor implements MasterCoprocessor, MasterObserver {
182    public TestRpcTimeoutCoprocessor() {
183    }
184
185    @Override
186    public Optional<MasterObserver> getMasterObserver() {
187      return Optional.of(this);
188    }
189
190    @Override
191    public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx,
192      String namespace) throws IOException {
193      Threads.sleep(DEFAULT_RPC_TIMEOUT);
194    }
195  }
196
197  public static class TestOperationTimeoutCoprocessor implements MasterCoprocessor, MasterObserver {
198    AtomicLong sleepTime = new AtomicLong(0);
199
200    public TestOperationTimeoutCoprocessor() {
201    }
202
203    @Override
204    public Optional<MasterObserver> getMasterObserver() {
205      return Optional.of(this);
206    }
207
208    @Override
209    public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx,
210      String namespace) throws IOException {
211      Threads.sleep(DEFAULT_RPC_TIMEOUT / 2);
212      if (sleepTime.addAndGet(DEFAULT_RPC_TIMEOUT / 2) < DEFAULT_OPERATION_TIMEOUT) {
213        throw new IOException("call fail");
214      }
215    }
216  }
217
218  public static class TestMaxRetriesCoprocessor implements MasterCoprocessor, MasterObserver {
219    AtomicLong retryNum = new AtomicLong(0);
220
221    public TestMaxRetriesCoprocessor() {
222    }
223
224    @Override
225    public Optional<MasterObserver> getMasterObserver() {
226      return Optional.of(this);
227    }
228
229    @Override
230    public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx,
231      String namespace) throws IOException {
232      if (retryNum.getAndIncrement() < DEFAULT_RETRIES_NUMBER) {
233        throw new IOException("call fail");
234      }
235    }
236  }
237}