001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR; 021import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY; 022import static org.junit.Assert.fail; 023 024import java.io.IOException; 025import java.util.Arrays; 026import java.util.List; 027import java.util.Optional; 028import java.util.concurrent.ForkJoinPool; 029import java.util.concurrent.TimeUnit; 030import java.util.concurrent.atomic.AtomicLong; 031import java.util.function.Supplier; 032import org.apache.hadoop.hbase.HBaseClassTestRule; 033import org.apache.hadoop.hbase.HBaseTestingUtility; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 036import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor; 037import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; 038import org.apache.hadoop.hbase.coprocessor.MasterObserver; 039import org.apache.hadoop.hbase.coprocessor.ObserverContext; 040import org.apache.hadoop.hbase.testclassification.ClientTests; 041import org.apache.hadoop.hbase.testclassification.LargeTests; 042import org.apache.hadoop.hbase.util.Threads; 043import org.junit.After; 044import org.junit.Before; 045import org.junit.ClassRule; 046import org.junit.Test; 047import org.junit.experimental.categories.Category; 048import org.junit.runner.RunWith; 049import org.junit.runners.Parameterized; 050import org.junit.runners.Parameterized.Parameter; 051import org.junit.runners.Parameterized.Parameters; 052import org.slf4j.Logger; 053import org.slf4j.LoggerFactory; 054 055import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 056 057@RunWith(Parameterized.class) 058@Category({ LargeTests.class, ClientTests.class }) 059public class TestAsyncAdminBuilder { 060 061 @ClassRule 062 public static final HBaseClassTestRule CLASS_RULE = 063 HBaseClassTestRule.forClass(TestAsyncAdminBuilder.class); 064 065 private static final Logger LOG = LoggerFactory.getLogger(TestAsyncAdminBuilder.class); 066 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 067 private static AsyncConnection ASYNC_CONN; 068 069 @Parameter 070 public Supplier<AsyncAdminBuilder> getAdminBuilder; 071 072 private static AsyncAdminBuilder getRawAsyncAdminBuilder() { 073 return ASYNC_CONN.getAdminBuilder(); 074 } 075 076 private static AsyncAdminBuilder getAsyncAdminBuilder() { 077 return ASYNC_CONN.getAdminBuilder(ForkJoinPool.commonPool()); 078 } 079 080 @Parameters 081 public static List<Object[]> params() { 082 return Arrays.asList(new Supplier<?>[] { TestAsyncAdminBuilder::getRawAsyncAdminBuilder }, 083 new Supplier<?>[] { TestAsyncAdminBuilder::getAsyncAdminBuilder }); 084 } 085 086 private static final int DEFAULT_RPC_TIMEOUT = 10000; 087 private static final int DEFAULT_OPERATION_TIMEOUT = 30000; 088 private static final int DEFAULT_RETRIES_NUMBER = 2; 089 090 @Before 091 public void setUp() throws Exception { 092 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, DEFAULT_RPC_TIMEOUT); 093 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 094 DEFAULT_OPERATION_TIMEOUT); 095 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 096 DEFAULT_RETRIES_NUMBER); 097 TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0); 098 } 099 100 @After 101 public void tearDown() throws Exception { 102 Closeables.close(ASYNC_CONN, true); 103 TEST_UTIL.shutdownMiniCluster(); 104 } 105 106 @Test 107 public void testRpcTimeout() throws Exception { 108 TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, 109 TestRpcTimeoutCoprocessor.class.getName()); 110 TEST_UTIL.startMiniCluster(2); 111 ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 112 113 try { 114 getAdminBuilder.get().setRpcTimeout(DEFAULT_RPC_TIMEOUT / 2, TimeUnit.MILLISECONDS).build() 115 .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get(); 116 fail("We expect an exception here"); 117 } catch (Exception e) { 118 // expected 119 } 120 121 try { 122 getAdminBuilder.get().setRpcTimeout(DEFAULT_RPC_TIMEOUT * 2, TimeUnit.MILLISECONDS).build() 123 .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get(); 124 } catch (Exception e) { 125 fail("The Operation should succeed, unexpected exception: " + e.getMessage()); 126 } 127 } 128 129 @Test 130 public void testOperationTimeout() throws Exception { 131 // set retry number to 100 to make sure that this test only be affected by operation timeout 132 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 100); 133 TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, 134 TestOperationTimeoutCoprocessor.class.getName()); 135 TEST_UTIL.startMiniCluster(2); 136 ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 137 138 try { 139 getAdminBuilder.get() 140 .setOperationTimeout(DEFAULT_OPERATION_TIMEOUT / 2, TimeUnit.MILLISECONDS).build() 141 .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get(); 142 fail("We expect an exception here"); 143 } catch (Exception e) { 144 // expected 145 } 146 147 try { 148 getAdminBuilder.get() 149 .setOperationTimeout(DEFAULT_OPERATION_TIMEOUT * 2, TimeUnit.MILLISECONDS).build() 150 .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get(); 151 } catch (Exception e) { 152 fail("The Operation should succeed, unexpected exception: " + e.getMessage()); 153 } 154 } 155 156 @Test 157 public void testMaxRetries() throws Exception { 158 // set operation timeout to 300s to make sure that this test only be affected by retry number 159 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 300000); 160 TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, 161 TestMaxRetriesCoprocessor.class.getName()); 162 TEST_UTIL.startMiniCluster(2); 163 ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 164 165 try { 166 getAdminBuilder.get().setMaxRetries(DEFAULT_RETRIES_NUMBER / 2).build() 167 .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get(); 168 fail("We expect an exception here"); 169 } catch (Exception e) { 170 // expected 171 } 172 173 try { 174 getAdminBuilder.get().setMaxRetries(DEFAULT_RETRIES_NUMBER * 2).build() 175 .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get(); 176 } catch (Exception e) { 177 fail("The Operation should succeed, unexpected exception: " + e.getMessage()); 178 } 179 } 180 181 public static class TestRpcTimeoutCoprocessor implements MasterCoprocessor, MasterObserver { 182 public TestRpcTimeoutCoprocessor() { 183 } 184 185 @Override 186 public Optional<MasterObserver> getMasterObserver() { 187 return Optional.of(this); 188 } 189 190 @Override 191 public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx, 192 String namespace) throws IOException { 193 Threads.sleep(DEFAULT_RPC_TIMEOUT); 194 } 195 } 196 197 public static class TestOperationTimeoutCoprocessor implements MasterCoprocessor, MasterObserver { 198 AtomicLong sleepTime = new AtomicLong(0); 199 200 public TestOperationTimeoutCoprocessor() { 201 } 202 203 @Override 204 public Optional<MasterObserver> getMasterObserver() { 205 return Optional.of(this); 206 } 207 208 @Override 209 public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx, 210 String namespace) throws IOException { 211 Threads.sleep(DEFAULT_RPC_TIMEOUT / 2); 212 if (sleepTime.addAndGet(DEFAULT_RPC_TIMEOUT / 2) < DEFAULT_OPERATION_TIMEOUT) { 213 throw new IOException("call fail"); 214 } 215 } 216 } 217 218 public static class TestMaxRetriesCoprocessor implements MasterCoprocessor, MasterObserver { 219 AtomicLong retryNum = new AtomicLong(0); 220 221 public TestMaxRetriesCoprocessor() { 222 } 223 224 @Override 225 public Optional<MasterObserver> getMasterObserver() { 226 return Optional.of(this); 227 } 228 229 @Override 230 public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx, 231 String namespace) throws IOException { 232 if (retryNum.getAndIncrement() < DEFAULT_RETRIES_NUMBER) { 233 throw new IOException("call fail"); 234 } 235 } 236 } 237}