001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static junit.framework.Assert.assertEquals; 021import static org.junit.Assert.assertNotNull; 022import static org.junit.Assert.assertNull; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.List; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.CallQueueTooBigException; 031import org.apache.hadoop.hbase.HBaseClassTestRule; 032import org.apache.hadoop.hbase.HBaseTestingUtility; 033import org.apache.hadoop.hbase.HColumnDescriptor; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.HTableDescriptor; 036import org.apache.hadoop.hbase.MultiActionResultTooLarge; 037import org.apache.hadoop.hbase.NotServingRegionException; 038import org.apache.hadoop.hbase.RegionTooBusyException; 039import org.apache.hadoop.hbase.RetryImmediatelyException; 040import org.apache.hadoop.hbase.TableName; 041import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; 042import org.apache.hadoop.hbase.exceptions.RegionOpeningException; 043import org.apache.hadoop.hbase.quotas.RpcThrottlingException; 044import org.apache.hadoop.hbase.regionserver.HRegionServer; 045import org.apache.hadoop.hbase.regionserver.RSRpcServices; 046import org.apache.hadoop.hbase.testclassification.ClientTests; 047import org.apache.hadoop.hbase.testclassification.MediumTests; 048import org.apache.hadoop.hbase.util.Bytes; 049import org.junit.AfterClass; 050import org.junit.BeforeClass; 051import org.junit.ClassRule; 052import org.junit.Test; 053import org.junit.experimental.categories.Category; 054import org.slf4j.Logger; 055import org.slf4j.LoggerFactory; 056 057import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 058import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 059 060import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 061import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 062import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 063 064@Category({ MediumTests.class, ClientTests.class }) 065public class TestMetaCache { 066 067 @ClassRule 068 public static final HBaseClassTestRule CLASS_RULE = 069 HBaseClassTestRule.forClass(TestMetaCache.class); 070 071 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 072 private static final TableName TABLE_NAME = TableName.valueOf("test_table"); 073 private static final byte[] FAMILY = Bytes.toBytes("fam1"); 074 private static final byte[] QUALIFIER = Bytes.toBytes("qual"); 075 private static HRegionServer badRS; 076 private static final Logger LOG = LoggerFactory.getLogger(TestMetaCache.class); 077 078 /** 079 * @throws java.lang.Exception 080 */ 081 @BeforeClass 082 public static void setUpBeforeClass() throws Exception { 083 Configuration conf = TEST_UTIL.getConfiguration(); 084 conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithFakeRpcServices.class.getName()); 085 TEST_UTIL.startMiniCluster(1); 086 TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster(); 087 TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME.META_TABLE_NAME); 088 badRS = TEST_UTIL.getHBaseCluster().getRegionServer(0); 089 assertTrue(badRS.getRSRpcServices() instanceof FakeRSRpcServices); 090 HTableDescriptor table = new HTableDescriptor(TABLE_NAME); 091 HColumnDescriptor fam = new HColumnDescriptor(FAMILY); 092 fam.setMaxVersions(2); 093 table.addFamily(fam); 094 TEST_UTIL.createTable(table, null); 095 } 096 097 /** 098 * @throws java.lang.Exception 099 */ 100 @AfterClass 101 public static void tearDownAfterClass() throws Exception { 102 TEST_UTIL.shutdownMiniCluster(); 103 } 104 105 @Test 106 public void testPreserveMetaCacheOnException() throws Exception { 107 ((FakeRSRpcServices) badRS.getRSRpcServices()) 108 .setExceptionInjector(new RoundRobinExceptionInjector()); 109 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 110 conf.set("hbase.client.retries.number", "1"); 111 ConnectionImplementation conn = 112 (ConnectionImplementation) ConnectionFactory.createConnection(conf); 113 try { 114 Table table = conn.getTable(TABLE_NAME); 115 byte[] row = Bytes.toBytes("row1"); 116 117 Put put = new Put(row); 118 put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10)); 119 Get get = new Get(row); 120 Append append = new Append(row); 121 append.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(11)); 122 Increment increment = new Increment(row); 123 increment.addColumn(FAMILY, QUALIFIER, 10); 124 Delete delete = new Delete(row); 125 delete.addColumn(FAMILY, QUALIFIER); 126 RowMutations mutations = new RowMutations(row); 127 mutations.add(put); 128 mutations.add(delete); 129 130 Exception exp; 131 boolean success; 132 for (int i = 0; i < 50; i++) { 133 exp = null; 134 success = false; 135 try { 136 table.put(put); 137 // If at least one operation succeeded, we should have cached the region location. 138 success = true; 139 table.get(get); 140 table.append(append); 141 table.increment(increment); 142 table.delete(delete); 143 table.mutateRow(mutations); 144 } catch (IOException ex) { 145 // Only keep track of the last exception that updated the meta cache 146 if (ClientExceptionsUtil.isMetaClearingException(ex) || success) { 147 exp = ex; 148 } 149 } 150 // Do not test if we did not touch the meta cache in this iteration. 151 if (exp != null && ClientExceptionsUtil.isMetaClearingException(exp)) { 152 assertNull(conn.getCachedLocation(TABLE_NAME, row)); 153 } else if (success) { 154 assertNotNull(conn.getCachedLocation(TABLE_NAME, row)); 155 } 156 } 157 } finally { 158 conn.close(); 159 } 160 } 161 162 @Test 163 public void testClearsCacheOnScanException() throws Exception { 164 ((FakeRSRpcServices) badRS.getRSRpcServices()) 165 .setExceptionInjector(new RoundRobinExceptionInjector()); 166 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 167 conf.set("hbase.client.retries.number", "1"); 168 169 try ( 170 ConnectionImplementation conn = 171 (ConnectionImplementation) ConnectionFactory.createConnection(conf); 172 Table table = conn.getTable(TABLE_NAME)) { 173 174 byte[] row = Bytes.toBytes("row2"); 175 176 Put put = new Put(row); 177 put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10)); 178 179 Scan scan = new Scan(); 180 scan.withStartRow(row); 181 scan.setLimit(1); 182 scan.setCaching(1); 183 184 populateCache(table, row); 185 assertNotNull(conn.getCachedLocation(TABLE_NAME, row)); 186 assertTrue(executeUntilCacheClearingException(table, scan)); 187 assertNull(conn.getCachedLocation(TABLE_NAME, row)); 188 189 // repopulate cache so we can test with reverse scan too 190 populateCache(table, row); 191 assertNotNull(conn.getCachedLocation(TABLE_NAME, row)); 192 193 // run with reverse scan 194 scan.setReversed(true); 195 assertTrue(executeUntilCacheClearingException(table, scan)); 196 assertNull(conn.getCachedLocation(TABLE_NAME, row)); 197 } 198 } 199 200 private void populateCache(Table table, byte[] row) { 201 for (int i = 0; i < 50; i++) { 202 try { 203 table.get(new Get(row)); 204 return; 205 } catch (Exception e) { 206 // pass, we just want this to succeed so that region location will be cached 207 } 208 } 209 } 210 211 private boolean executeUntilCacheClearingException(Table table, Scan scan) { 212 for (int i = 0; i < 50; i++) { 213 try { 214 try (ResultScanner scanner = table.getScanner(scan)) { 215 scanner.next(); 216 } 217 } catch (Exception ex) { 218 // Only keep track of the last exception that updated the meta cache 219 if (ClientExceptionsUtil.isMetaClearingException(ex)) { 220 return true; 221 } 222 } 223 } 224 return false; 225 } 226 227 @Test 228 public void testCacheClearingOnCallQueueTooBig() throws Exception { 229 ((FakeRSRpcServices) badRS.getRSRpcServices()) 230 .setExceptionInjector(new CallQueueTooBigExceptionInjector()); 231 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 232 conf.set("hbase.client.retries.number", "2"); 233 conf.set(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, "true"); 234 ConnectionImplementation conn = 235 (ConnectionImplementation) ConnectionFactory.createConnection(conf); 236 try { 237 Table table = conn.getTable(TABLE_NAME); 238 byte[] row = Bytes.toBytes("row1"); 239 240 Put put = new Put(row); 241 put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10)); 242 table.put(put); 243 244 // obtain the client metrics 245 MetricsConnection metrics = conn.getConnectionMetrics(); 246 long preGetRegionClears = metrics.metaCacheNumClearRegion.getCount(); 247 long preGetServerClears = metrics.metaCacheNumClearServer.getCount(); 248 249 // attempt a get on the test table 250 Get get = new Get(row); 251 try { 252 table.get(get); 253 fail("Expected CallQueueTooBigException"); 254 } catch (RetriesExhaustedException ree) { 255 // expected 256 } 257 258 // verify that no cache clearing took place 259 long postGetRegionClears = metrics.metaCacheNumClearRegion.getCount(); 260 long postGetServerClears = metrics.metaCacheNumClearServer.getCount(); 261 assertEquals(preGetRegionClears, postGetRegionClears); 262 assertEquals(preGetServerClears, postGetServerClears); 263 } finally { 264 conn.close(); 265 } 266 } 267 268 public static List<Throwable> metaCachePreservingExceptions() { 269 return new ArrayList<Throwable>() { 270 { 271 add(new RegionOpeningException(" ")); 272 add(new RegionTooBusyException("Some old message")); 273 add(new RpcThrottlingException(" ")); 274 add(new MultiActionResultTooLarge(" ")); 275 add(new RetryImmediatelyException(" ")); 276 add(new CallQueueTooBigException()); 277 } 278 }; 279 } 280 281 public static class RegionServerWithFakeRpcServices extends HRegionServer { 282 private FakeRSRpcServices rsRpcServices; 283 284 public RegionServerWithFakeRpcServices(Configuration conf) 285 throws IOException, InterruptedException { 286 super(conf); 287 } 288 289 @Override 290 protected RSRpcServices createRpcServices() throws IOException { 291 this.rsRpcServices = new FakeRSRpcServices(this); 292 return rsRpcServices; 293 } 294 295 public void setExceptionInjector(ExceptionInjector injector) { 296 rsRpcServices.setExceptionInjector(injector); 297 } 298 } 299 300 public static class FakeRSRpcServices extends RSRpcServices { 301 302 private ExceptionInjector exceptions; 303 304 public FakeRSRpcServices(HRegionServer rs) throws IOException { 305 super(rs); 306 exceptions = new RoundRobinExceptionInjector(); 307 } 308 309 public void setExceptionInjector(ExceptionInjector injector) { 310 this.exceptions = injector; 311 } 312 313 @Override 314 public GetResponse get(final RpcController controller, final ClientProtos.GetRequest request) 315 throws ServiceException { 316 exceptions.throwOnGet(this, request); 317 return super.get(controller, request); 318 } 319 320 @Override 321 public ClientProtos.MutateResponse mutate(final RpcController controller, 322 final ClientProtos.MutateRequest request) throws ServiceException { 323 exceptions.throwOnMutate(this, request); 324 return super.mutate(controller, request); 325 } 326 327 @Override 328 public ClientProtos.ScanResponse scan(final RpcController controller, 329 final ClientProtos.ScanRequest request) throws ServiceException { 330 exceptions.throwOnScan(this, request); 331 return super.scan(controller, request); 332 } 333 } 334 335 public static abstract class ExceptionInjector { 336 protected boolean isTestTable(FakeRSRpcServices rpcServices, 337 HBaseProtos.RegionSpecifier regionSpec) throws ServiceException { 338 try { 339 return TABLE_NAME 340 .equals(rpcServices.getRegion(regionSpec).getTableDescriptor().getTableName()); 341 } catch (IOException ioe) { 342 throw new ServiceException(ioe); 343 } 344 } 345 346 public abstract void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request) 347 throws ServiceException; 348 349 public abstract void throwOnMutate(FakeRSRpcServices rpcServices, 350 ClientProtos.MutateRequest request) throws ServiceException; 351 352 public abstract void throwOnScan(FakeRSRpcServices rpcServices, 353 ClientProtos.ScanRequest request) throws ServiceException; 354 } 355 356 /** 357 * Rotates through the possible cache clearing and non-cache clearing exceptions for requests. 358 */ 359 public static class RoundRobinExceptionInjector extends ExceptionInjector { 360 private int numReqs = -1; 361 private int expCount = -1; 362 private List<Throwable> metaCachePreservingExceptions = metaCachePreservingExceptions(); 363 364 @Override 365 public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request) 366 throws ServiceException { 367 throwSomeExceptions(rpcServices, request.getRegion()); 368 } 369 370 @Override 371 public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request) 372 throws ServiceException { 373 throwSomeExceptions(rpcServices, request.getRegion()); 374 } 375 376 @Override 377 public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request) 378 throws ServiceException { 379 if (!request.hasScannerId()) { 380 // only handle initial scan requests 381 throwSomeExceptions(rpcServices, request.getRegion()); 382 } 383 } 384 385 /** 386 * Throw some exceptions. Mostly throw exceptions which do not clear meta cache. Periodically 387 * throw NotSevingRegionException which clears the meta cache. n 388 */ 389 private void throwSomeExceptions(FakeRSRpcServices rpcServices, 390 HBaseProtos.RegionSpecifier regionSpec) throws ServiceException { 391 if (!isTestTable(rpcServices, regionSpec)) { 392 return; 393 } 394 395 numReqs++; 396 // Succeed every 5 request, throw cache clearing exceptions twice every 5 requests and throw 397 // meta cache preserving exceptions otherwise. 398 if (numReqs % 5 == 0) { 399 return; 400 } else if (numReqs % 5 == 1 || numReqs % 5 == 2) { 401 throw new ServiceException(new NotServingRegionException()); 402 } 403 // Round robin between different special exceptions. 404 // This is not ideal since exception types are not tied to the operation performed here, 405 // But, we don't really care here if we throw MultiActionTooLargeException while doing 406 // single Gets. 407 expCount++; 408 Throwable t = 409 metaCachePreservingExceptions.get(expCount % metaCachePreservingExceptions.size()); 410 throw new ServiceException(t); 411 } 412 } 413 414 /** 415 * Throws CallQueueTooBigException for all gets. 416 */ 417 public static class CallQueueTooBigExceptionInjector extends ExceptionInjector { 418 @Override 419 public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request) 420 throws ServiceException { 421 if (isTestTable(rpcServices, request.getRegion())) { 422 throw new ServiceException(new CallQueueTooBigException()); 423 } 424 } 425 426 @Override 427 public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request) 428 throws ServiceException { 429 } 430 431 @Override 432 public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request) 433 throws ServiceException { 434 } 435 } 436 437 @Test 438 public void testUserRegionLockThrowsException() throws IOException, InterruptedException { 439 ((FakeRSRpcServices) badRS.getRSRpcServices()).setExceptionInjector(new LockSleepInjector()); 440 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 441 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); 442 conf.setLong(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 2000); 443 conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 2000); 444 445 try (ConnectionImplementation conn = 446 (ConnectionImplementation) ConnectionFactory.createConnection(conf)) { 447 ClientThread client1 = new ClientThread(conn); 448 ClientThread client2 = new ClientThread(conn); 449 client1.start(); 450 client2.start(); 451 client1.join(); 452 client2.join(); 453 // One thread will get the lock but will sleep in LockExceptionInjector#throwOnScan and 454 // eventually fail since the sleep time is more than hbase client scanner timeout period. 455 // Other thread will wait to acquire userRegionLock. 456 // Have no idea which thread will be scheduled first. So need to check both threads. 457 458 // Both the threads will throw exception. One thread will throw exception since after 459 // acquiring user region lock, it is sleeping for 5 seconds when the scanner time out period 460 // is 2 seconds. 461 // Other thread will throw exception since it was not able to get hold of user region lock 462 // within meta operation timeout period. 463 assertNotNull(client1.getException()); 464 assertNotNull(client2.getException()); 465 466 assertTrue(client1.getException() instanceof LockTimeoutException 467 ^ client2.getException() instanceof LockTimeoutException); 468 } 469 } 470 471 private final class ClientThread extends Thread { 472 private Exception exception; 473 private ConnectionImplementation connection; 474 475 private ClientThread(ConnectionImplementation connection) { 476 this.connection = connection; 477 } 478 479 @Override 480 public void run() { 481 byte[] currentKey = HConstants.EMPTY_START_ROW; 482 try { 483 connection.getRegionLocation(TABLE_NAME, currentKey, true); 484 } catch (IOException e) { 485 LOG.error("Thread id: " + this.getId() + " exception: ", e); 486 this.exception = e; 487 } 488 } 489 490 public Exception getException() { 491 return exception; 492 } 493 } 494 495 public static class LockSleepInjector extends ExceptionInjector { 496 @Override 497 public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request) { 498 try { 499 Thread.sleep(5000); 500 } catch (InterruptedException e) { 501 LOG.info("Interrupted exception", e); 502 } 503 } 504 505 @Override 506 public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request) { 507 } 508 509 @Override 510 public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request) { 511 } 512 } 513}