/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MultiActionResultTooLarge;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.RetryImmediatelyException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests how the client-side meta (region location) cache reacts to exceptions returned by a
 * region server. The mini cluster is started with a region server whose RPC services are wrapped
 * by {@link FakeRSRpcServices}, which lets each test plug in an {@link ExceptionInjector} that
 * decides what to throw for get, mutate and scan requests against the test table.
 */
@Category({MediumTests.class, ClientTests.class})
public class TestMetaCache {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestMetaCache.class);

  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static final TableName TABLE_NAME = TableName.valueOf("test_table");
  private static final byte[] FAMILY = Bytes.toBytes("fam1");
  private static final byte[] QUALIFIER = Bytes.toBytes("qual");
  private static HRegionServer badRS;
  private static final Logger LOG = LoggerFactory.getLogger(TestMetaCache.class);

  /**
   * Starts a single-node mini cluster whose region server uses {@link FakeRSRpcServices}, waits
   * for the master and the meta table to be ready, and creates the test table with one family.
   *
   * @throws Exception if the cluster cannot be started or the table cannot be created
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setStrings(HConstants.REGION_SERVER_IMPL,
        RegionServerWithFakeRpcServices.class.getName());
    TEST_UTIL.startMiniCluster(1);
    TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
    // META_TABLE_NAME is a static constant; access it through the class, not through TABLE_NAME.
    TEST_UTIL.waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME);
    badRS = TEST_UTIL.getHBaseCluster().getRegionServer(0);
    assertTrue(badRS.getRSRpcServices() instanceof FakeRSRpcServices);
    HTableDescriptor table = new HTableDescriptor(TABLE_NAME);
    HColumnDescriptor fam = new HColumnDescriptor(FAMILY);
    fam.setMaxVersions(2);
    table.addFamily(fam);
    TEST_UTIL.createTable(table, null);
  }

  /**
   * Shuts the mini cluster down once all tests in this class have run.
   *
   * @throws Exception if the cluster fails to shut down
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Runs a series of operations against a server that rotates through successes, meta-clearing
   * exceptions and meta-preserving exceptions (see {@link RoundRobinExceptionInjector}), and
   * checks after each iteration that the cached region location was dropped only when the last
   * relevant exception was a meta-clearing one.
   */
  @Test
  public void testPreserveMetaCacheOnException() throws Exception {
    ((FakeRSRpcServices) badRS.getRSRpcServices()).setExceptionInjector(
        new RoundRobinExceptionInjector());
    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
    conf.set("hbase.client.retries.number", "1");
    // try-with-resources closes the table first and then the connection, even on failure.
    try (ConnectionImplementation conn =
        (ConnectionImplementation) ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(TABLE_NAME)) {
      byte[] row = Bytes.toBytes("row1");

      Put put = new Put(row);
      put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10));
      Get get = new Get(row);
      Append append = new Append(row);
      append.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(11));
      Increment increment = new Increment(row);
      increment.addColumn(FAMILY, QUALIFIER, 10);
      Delete delete = new Delete(row);
      delete.addColumn(FAMILY, QUALIFIER);
      RowMutations mutations = new RowMutations(row);
      mutations.add(put);
      mutations.add(delete);

      for (int i = 0; i < 50; i++) {
        Exception exp = null;
        boolean success = false;
        try {
          table.put(put);
          // If at least one operation succeeded, we should have cached the region location.
          success = true;
          table.get(get);
          table.append(append);
          table.increment(increment);
          table.delete(delete);
          table.mutateRow(mutations);
        } catch (IOException ex) {
          // Only keep track of the last exception that updated the meta cache
          if (ClientExceptionsUtil.isMetaClearingException(ex) || success) {
            exp = ex;
          }
        }
        // Do not test if we did not touch the meta cache in this iteration.
        if (exp != null && ClientExceptionsUtil.isMetaClearingException(exp)) {
          assertNull(conn.getCachedLocation(TABLE_NAME, row));
        } else if (success) {
          assertNotNull(conn.getCachedLocation(TABLE_NAME, row));
        }
      }
    }
  }

  /**
   * Verifies that a {@link CallQueueTooBigException} returned for a get does not bump the
   * client-side "meta cache cleared" metrics, i.e. no region or server cache entry is cleared.
   */
  @Test
  public void testCacheClearingOnCallQueueTooBig() throws Exception {
    ((FakeRSRpcServices) badRS.getRSRpcServices()).setExceptionInjector(
        new CallQueueTooBigExceptionInjector());
    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
    conf.set("hbase.client.retries.number", "2");
    conf.set(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, "true");
    // try-with-resources closes the table first and then the connection, even on failure.
    try (ConnectionImplementation conn =
        (ConnectionImplementation) ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(TABLE_NAME)) {
      byte[] row = Bytes.toBytes("row1");

      // The injector only fails gets, so this put goes through.
      Put put = new Put(row);
      put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10));
      table.put(put);

      // obtain the client metrics
      MetricsConnection metrics = conn.getConnectionMetrics();
      long preGetRegionClears = metrics.metaCacheNumClearRegion.getCount();
      long preGetServerClears = metrics.metaCacheNumClearServer.getCount();

      // attempt a get on the test table
      Get get = new Get(row);
      try {
        table.get(get);
        fail("Expected CallQueueTooBigException");
      } catch (RetriesExhaustedException ree) {
        // expected
      }

      // verify that no cache clearing took place
      long postGetRegionClears = metrics.metaCacheNumClearRegion.getCount();
      long postGetServerClears = metrics.metaCacheNumClearServer.getCount();
      assertEquals(preGetRegionClears, postGetRegionClears);
      assertEquals(preGetServerClears, postGetServerClears);
    }
  }

  /**
   * Builds the exceptions that {@link RoundRobinExceptionInjector} rotates through as its
   * "meta cache preserving" set.
   *
   * @return a new mutable list containing one fresh instance of each exception type
   */
  public static List<Throwable> metaCachePreservingExceptions() {
    List<Throwable> exceptions = new ArrayList<>();
    exceptions.add(new RegionOpeningException(" "));
    exceptions.add(new RegionTooBusyException("Some old message"));
    exceptions.add(new RpcThrottlingException(" "));
    exceptions.add(new MultiActionResultTooLarge(" "));
    exceptions.add(new RetryImmediatelyException(" "));
    exceptions.add(new CallQueueTooBigException());
    return exceptions;
  }

  /**
   * Region server that installs {@link FakeRSRpcServices} as its RPC services so tests can
   * inject exceptions into client requests.
   */
  public static class RegionServerWithFakeRpcServices extends HRegionServer {
    // Deliberately neither final nor initialized here: it is assigned in createRpcServices().
    // NOTE(review): presumably that hook runs during HRegionServer construction, in which case a
    // field initializer would overwrite the assigned value -- confirm before changing.
    private FakeRSRpcServices rsRpcServices;

    public RegionServerWithFakeRpcServices(Configuration conf)
        throws IOException, InterruptedException {
      super(conf);
    }

    @Override
    protected RSRpcServices createRpcServices() throws IOException {
      this.rsRpcServices = new FakeRSRpcServices(this);
      return rsRpcServices;
    }

    /** Replaces the injector consulted by the fake RPC services. */
    public void setExceptionInjector(ExceptionInjector injector) {
      rsRpcServices.setExceptionInjector(injector);
    }
  }

  /**
   * RPC services that consult the current {@link ExceptionInjector} before delegating get, mutate
   * and scan calls to the real implementation.
   */
  public static class FakeRSRpcServices extends RSRpcServices {

    private ExceptionInjector exceptions;

    public FakeRSRpcServices(HRegionServer rs) throws IOException {
      super(rs);
      exceptions = new RoundRobinExceptionInjector();
    }

    /** Replaces the injector consulted before each get, mutate and scan call. */
    public void setExceptionInjector(ExceptionInjector injector) {
      this.exceptions = injector;
    }

    @Override
    public GetResponse get(final RpcController controller,
        final ClientProtos.GetRequest request) throws ServiceException {
      exceptions.throwOnGet(this, request);
      return super.get(controller, request);
    }

    @Override
    public ClientProtos.MutateResponse mutate(final RpcController controller,
        final ClientProtos.MutateRequest request) throws ServiceException {
      exceptions.throwOnMutate(this, request);
      return super.mutate(controller, request);
    }

    @Override
    public ClientProtos.ScanResponse scan(final RpcController controller,
        final ClientProtos.ScanRequest request) throws ServiceException {
      exceptions.throwOnScan(this, request);
      return super.scan(controller, request);
    }
  }

  /**
   * Strategy invoked by {@link FakeRSRpcServices} before each get, mutate and scan call; an
   * implementation may throw a {@link ServiceException} to simulate a server-side failure.
   */
  public static abstract class ExceptionInjector {
    /**
     * Tells whether the region addressed by {@code regionSpec} belongs to the test table.
     *
     * @throws ServiceException wrapping the {@link IOException} raised when the region cannot be
     *         resolved
     */
    protected boolean isTestTable(FakeRSRpcServices rpcServices,
        HBaseProtos.RegionSpecifier regionSpec) throws ServiceException {
      try {
        return TABLE_NAME.equals(
            rpcServices.getRegion(regionSpec).getTableDescriptor().getTableName());
      } catch (IOException ioe) {
        throw new ServiceException(ioe);
      }
    }

    public abstract void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request)
        throws ServiceException;

    public abstract void throwOnMutate(FakeRSRpcServices rpcServices,
        ClientProtos.MutateRequest request) throws ServiceException;

    public abstract void throwOnScan(FakeRSRpcServices rpcServices,
        ClientProtos.ScanRequest request) throws ServiceException;
  }

  /**
   * Rotates through the possible cache clearing and non-cache clearing exceptions
   * for requests.
   */
  public static class RoundRobinExceptionInjector extends ExceptionInjector {
    // Both counters start at -1 because they are incremented before use, so the first request
    // is number 0 and the first preserving exception is index 0.
    private int numReqs = -1;
    private int expCount = -1;
    private final List<Throwable> metaCachePreservingExceptions = metaCachePreservingExceptions();

    @Override
    public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request)
        throws ServiceException {
      throwSomeExceptions(rpcServices, request.getRegion());
    }

    @Override
    public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request)
        throws ServiceException {
      throwSomeExceptions(rpcServices, request.getRegion());
    }

    @Override
    public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request)
        throws ServiceException {
      if (!request.hasScannerId()) {
        // only handle initial scan requests
        throwSomeExceptions(rpcServices, request.getRegion());
      }
    }

    /**
     * Throw some exceptions. Mostly throw exceptions which do not clear meta cache.
     * Periodically throw NotServingRegionException which clears the meta cache.
     * Requests for tables other than the test table are never failed.
     *
     * @throws ServiceException wrapping the exception chosen for this request
     */
    private void throwSomeExceptions(FakeRSRpcServices rpcServices,
        HBaseProtos.RegionSpecifier regionSpec) throws ServiceException {
      if (!isTestTable(rpcServices, regionSpec)) {
        return;
      }

      numReqs++;
      // Out of every 5 requests: the first succeeds, the next two throw a cache clearing
      // exception, and the remaining two throw meta cache preserving exceptions.
      if (numReqs % 5 == 0) {
        return;
      } else if (numReqs % 5 == 1 || numReqs % 5 == 2) {
        throw new ServiceException(new NotServingRegionException());
      }
      // Round robin between different special exceptions.
      // This is not ideal since exception types are not tied to the operation performed here,
      // But, we don't really care here if we throw MultiActionTooLargeException while doing
      // single Gets.
      expCount++;
      Throwable t = metaCachePreservingExceptions.get(
          expCount % metaCachePreservingExceptions.size());
      throw new ServiceException(t);
    }
  }

  /**
   * Throws CallQueueTooBigException for all gets on the test table; mutates and scans pass.
   */
  public static class CallQueueTooBigExceptionInjector extends ExceptionInjector {
    @Override
    public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request)
        throws ServiceException {
      if (isTestTable(rpcServices, request.getRegion())) {
        throw new ServiceException(new CallQueueTooBigException());
      }
    }

    @Override
    public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request)
        throws ServiceException {
    }

    @Override
    public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request)
        throws ServiceException {
    }
  }

  /**
   * Starts two threads that look up the same region location through one connection while the
   * server stalls every scan, and checks that both fail and that exactly one of them fails with
   * a {@link LockTimeoutException}.
   */
  @Test
  public void testUserRegionLockThrowsException() throws IOException, InterruptedException {
    ((FakeRSRpcServices) badRS.getRSRpcServices()).setExceptionInjector(new LockSleepInjector());
    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
    conf.setLong(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 2000);
    conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 2000);

    try (ConnectionImplementation conn =
        (ConnectionImplementation) ConnectionFactory.createConnection(conf)) {
      ClientThread client1 = new ClientThread(conn);
      ClientThread client2 = new ClientThread(conn);
      client1.start();
      client2.start();
      client1.join();
      client2.join();
      // One thread will get the lock but will sleep in LockSleepInjector#throwOnScan and
      // eventually fail since the sleep time is more than hbase client scanner timeout period.
      // Other thread will wait to acquire userRegionLock.
      // Have no idea which thread will be scheduled first. So need to check both threads.

      // Both the threads will throw exception. One thread will throw exception since after
      // acquiring user region lock, it is sleeping for 5 seconds when the scanner time out period
      // is 2 seconds.
      // Other thread will throw exception since it was not able to get hold of user region lock
      // within meta operation timeout period.
      assertNotNull(client1.getException());
      assertNotNull(client2.getException());

      // XOR: exactly one of the two failures must be the lock timeout.
      assertTrue(client1.getException() instanceof LockTimeoutException
          ^ client2.getException() instanceof LockTimeoutException);
    }
  }

  /**
   * Thread that performs one region location lookup for the test table and records the
   * {@link IOException} it fails with, if any. It uses no state of the enclosing test instance,
   * hence it is a static nested class.
   */
  private static final class ClientThread extends Thread {
    // Written by this thread in run() and read by the test only after join().
    private Exception exception;
    private final ConnectionImplementation connection;

    private ClientThread(ConnectionImplementation connection) {
      this.connection = connection;
    }

    @Override
    public void run() {
      byte[] currentKey = HConstants.EMPTY_START_ROW;
      try {
        connection.getRegionLocation(TABLE_NAME, currentKey, true);
      } catch (IOException e) {
        LOG.error("Thread id: {} exception: ", getId(), e);
        this.exception = e;
      }
    }

    /** Returns the exception caught in {@link #run()}, or {@code null} if the lookup succeeded. */
    public Exception getException() {
      return exception;
    }
  }

  /**
   * Stalls every scan request for 5 seconds and never throws; gets and mutates pass untouched.
   */
  public static class LockSleepInjector extends ExceptionInjector {
    @Override
    public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request) {
      try {
        Thread.sleep(5000);
      } catch (InterruptedException e) {
        LOG.info("Interrupted exception", e);
        // Restore the interrupt flag so the calling thread can still observe the interruption.
        Thread.currentThread().interrupt();
      }
    }

    @Override
    public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request) { }

    @Override
    public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request) { }
  }
}