001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR; 021import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY; 022import static org.junit.Assert.fail; 023 024import java.io.IOException; 025import java.util.Arrays; 026import java.util.List; 027import java.util.Optional; 028import java.util.concurrent.ForkJoinPool; 029import java.util.concurrent.TimeUnit; 030import java.util.concurrent.atomic.AtomicLong; 031import java.util.function.Supplier; 032import org.apache.commons.io.IOUtils; 033import org.apache.hadoop.hbase.HBaseClassTestRule; 034import org.apache.hadoop.hbase.HBaseTestingUtility; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 037import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor; 038import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; 039import org.apache.hadoop.hbase.coprocessor.MasterObserver; 040import org.apache.hadoop.hbase.coprocessor.ObserverContext; 041import org.apache.hadoop.hbase.testclassification.ClientTests; 042import org.apache.hadoop.hbase.testclassification.LargeTests; 043import org.apache.hadoop.hbase.util.Threads; 044import org.junit.After; 045import org.junit.Before; 046import org.junit.ClassRule; 047import org.junit.Test; 048import org.junit.experimental.categories.Category; 049import org.junit.runner.RunWith; 050import org.junit.runners.Parameterized; 051import org.junit.runners.Parameterized.Parameter; 052import org.junit.runners.Parameterized.Parameters; 053import org.slf4j.Logger; 054import org.slf4j.LoggerFactory; 055 056@RunWith(Parameterized.class) 057@Category({ LargeTests.class, ClientTests.class }) 058public class TestAsyncAdminBuilder { 059 060 @ClassRule 061 public static final HBaseClassTestRule CLASS_RULE = 062 HBaseClassTestRule.forClass(TestAsyncAdminBuilder.class); 063 064 private static final Logger LOG = LoggerFactory.getLogger(TestAsyncAdminBuilder.class); 065 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 066 private static AsyncConnection ASYNC_CONN; 067 068 @Parameter 069 public Supplier<AsyncAdminBuilder> getAdminBuilder; 070 071 private static AsyncAdminBuilder getRawAsyncAdminBuilder() { 072 return ASYNC_CONN.getAdminBuilder(); 073 } 074 075 private static AsyncAdminBuilder getAsyncAdminBuilder() { 076 return ASYNC_CONN.getAdminBuilder(ForkJoinPool.commonPool()); 077 } 078 079 @Parameters 080 public static List<Object[]> params() { 081 return Arrays.asList(new Supplier<?>[] { TestAsyncAdminBuilder::getRawAsyncAdminBuilder }, 082 new Supplier<?>[] { TestAsyncAdminBuilder::getAsyncAdminBuilder }); 083 } 084 085 private static final int DEFAULT_RPC_TIMEOUT = 10000; 086 private static final int DEFAULT_OPERATION_TIMEOUT = 30000; 087 private static final int DEFAULT_RETRIES_NUMBER = 2; 088 089 @Before 090 public void setUp() throws Exception { 091 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, DEFAULT_RPC_TIMEOUT); 092 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 093 DEFAULT_OPERATION_TIMEOUT); 094 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 095 DEFAULT_RETRIES_NUMBER); 096 TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0); 097 } 098 099 @After 100 public void tearDown() throws Exception { 101 IOUtils.closeQuietly(ASYNC_CONN); 102 TEST_UTIL.shutdownMiniCluster(); 103 } 104 105 @Test 106 public void testRpcTimeout() throws Exception { 107 TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, 108 TestRpcTimeoutCoprocessor.class.getName()); 109 TEST_UTIL.startMiniCluster(2); 110 ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 111 112 try { 113 getAdminBuilder.get().setRpcTimeout(DEFAULT_RPC_TIMEOUT / 2, TimeUnit.MILLISECONDS).build() 114 .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get(); 115 fail("We expect an exception here"); 116 } catch (Exception e) { 117 // expected 118 } 119 120 try { 121 getAdminBuilder.get().setRpcTimeout(DEFAULT_RPC_TIMEOUT * 2, TimeUnit.MILLISECONDS).build() 122 .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get(); 123 } catch (Exception e) { 124 fail("The Operation should succeed, unexpected exception: " + e.getMessage()); 125 } 126 } 127 128 @Test 129 public void testOperationTimeout() throws Exception { 130 // set retry number to 100 to make sure that this test only be affected by operation timeout 131 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 100); 132 TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, 133 TestOperationTimeoutCoprocessor.class.getName()); 134 TEST_UTIL.startMiniCluster(2); 135 ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 136 137 try { 138 getAdminBuilder.get() 139 .setOperationTimeout(DEFAULT_OPERATION_TIMEOUT / 2, TimeUnit.MILLISECONDS).build() 140 .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get(); 141 fail("We expect an exception here"); 142 } catch (Exception e) { 143 // expected 144 } 145 146 try { 147 getAdminBuilder.get() 148 .setOperationTimeout(DEFAULT_OPERATION_TIMEOUT * 2, TimeUnit.MILLISECONDS).build() 149 .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get(); 150 } catch (Exception e) { 151 fail("The Operation should succeed, unexpected exception: " + e.getMessage()); 152 } 153 } 154 155 @Test 156 public void testMaxRetries() throws Exception { 157 // set operation timeout to 300s to make sure that this test only be affected by retry number 158 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 300000); 159 TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, 160 TestMaxRetriesCoprocessor.class.getName()); 161 TEST_UTIL.startMiniCluster(2); 162 ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 163 164 try { 165 getAdminBuilder.get().setMaxRetries(DEFAULT_RETRIES_NUMBER / 2).build() 166 .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get(); 167 fail("We expect an exception here"); 168 } catch (Exception e) { 169 // expected 170 } 171 172 try { 173 getAdminBuilder.get().setMaxRetries(DEFAULT_RETRIES_NUMBER * 2).build() 174 .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get(); 175 } catch (Exception e) { 176 fail("The Operation should succeed, unexpected exception: " + e.getMessage()); 177 } 178 } 179 180 public static class TestRpcTimeoutCoprocessor implements MasterCoprocessor, MasterObserver { 181 public TestRpcTimeoutCoprocessor() { 182 } 183 184 185 @Override 186 public Optional<MasterObserver> getMasterObserver() { 187 return Optional.of(this); 188 } 189 @Override 190 public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx, 191 String namespace) throws IOException { 192 Threads.sleep(DEFAULT_RPC_TIMEOUT); 193 } 194 } 195 196 public static class TestOperationTimeoutCoprocessor implements MasterCoprocessor, MasterObserver { 197 AtomicLong sleepTime = new AtomicLong(0); 198 199 public TestOperationTimeoutCoprocessor() { 200 } 201 202 @Override 203 public Optional<MasterObserver> getMasterObserver() { 204 return Optional.of(this); 205 } 206 207 @Override 208 public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx, 209 String namespace) throws IOException { 210 Threads.sleep(DEFAULT_RPC_TIMEOUT / 2); 211 if (sleepTime.addAndGet(DEFAULT_RPC_TIMEOUT / 2) < DEFAULT_OPERATION_TIMEOUT) { 212 throw new IOException("call fail"); 213 } 214 } 215 } 216 217 public static class TestMaxRetriesCoprocessor implements MasterCoprocessor, MasterObserver { 218 AtomicLong retryNum = new AtomicLong(0); 219 220 public TestMaxRetriesCoprocessor() { 221 } 222 223 @Override 224 public Optional<MasterObserver> getMasterObserver() { 225 return Optional.of(this); 226 } 227 228 @Override 229 public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx, 230 String namespace) throws IOException { 231 if (retryNum.getAndIncrement() < DEFAULT_RETRIES_NUMBER) { 232 throw new IOException("call fail"); 233 } 234 } 235 } 236}