001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertNotNull; 022import static org.junit.jupiter.api.Assertions.assertNull; 023import static org.junit.jupiter.api.Assertions.assertTrue; 024import static org.junit.jupiter.api.Assertions.fail; 025 026import java.io.IOException; 027import java.util.Arrays; 028import java.util.List; 029import java.util.concurrent.TimeUnit; 030import java.util.stream.Collectors; 031import java.util.stream.IntStream; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.hbase.CallQueueTooBigException; 034import org.apache.hadoop.hbase.HBaseTestingUtil; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.MultiActionResultTooLarge; 037import org.apache.hadoop.hbase.NotServingRegionException; 038import org.apache.hadoop.hbase.RegionTooBusyException; 039import org.apache.hadoop.hbase.RetryImmediatelyException; 040import org.apache.hadoop.hbase.TableName; 041import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; 042import org.apache.hadoop.hbase.exceptions.RegionOpeningException; 043import org.apache.hadoop.hbase.quotas.RpcThrottlingException; 044import org.apache.hadoop.hbase.regionserver.HRegionServer; 045import org.apache.hadoop.hbase.regionserver.RSRpcServices; 046import org.apache.hadoop.hbase.testclassification.ClientTests; 047import org.apache.hadoop.hbase.testclassification.MediumTests; 048import org.apache.hadoop.hbase.util.Bytes; 049import org.junit.jupiter.api.AfterAll; 050import org.junit.jupiter.api.AfterEach; 051import org.junit.jupiter.api.BeforeAll; 052import org.junit.jupiter.api.Tag; 053import org.junit.jupiter.api.Test; 054import org.junit.jupiter.api.function.Executable; 055 056import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 057import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 058import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 059 060import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 061import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 062import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 063 064@Tag(MediumTests.TAG) 065@Tag(ClientTests.TAG) 066public class TestMetaCache { 067 068 private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 069 private static final TableName TABLE_NAME = TableName.valueOf("test_table"); 070 private static final byte[] FAMILY = Bytes.toBytes("fam1"); 071 private static final byte[] QUALIFIER = Bytes.toBytes("qual"); 072 073 private static HRegionServer badRS; 074 075 private Connection conn; 076 private MetricsConnection metrics; 077 private AsyncRegionLocator locator; 078 079 @BeforeAll 080 public static void setUpBeforeClass() throws Exception { 081 Configuration conf = TEST_UTIL.getConfiguration(); 082 conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithFakeRpcServices.class.getName()); 083 TEST_UTIL.startMiniCluster(1); 084 TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster(); 085 TEST_UTIL.waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME); 086 badRS = TEST_UTIL.getHBaseCluster().getRegionServer(0); 087 assertTrue(badRS.getRSRpcServices() instanceof FakeRSRpcServices); 088 TableDescriptor desc = TableDescriptorBuilder.newBuilder(TABLE_NAME) 089 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(2).build()) 090 .build(); 091 TEST_UTIL.createTable(desc, null); 092 } 093 094 @AfterAll 095 public static void tearDownAfterClass() throws Exception { 096 TEST_UTIL.shutdownMiniCluster(); 097 } 098 099 @AfterEach 100 public void tearDown() throws IOException { 101 Closeables.close(conn, true); 102 } 103 104 private void setupConnection(int retry) throws IOException { 105 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 106 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retry); 107 conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true); 108 conn = ConnectionFactory.createConnection(conf); 109 AsyncConnectionImpl asyncConn = (AsyncConnectionImpl) conn.toAsyncConnection(); 110 locator = asyncConn.getLocator(); 111 metrics = asyncConn.getConnectionMetrics().get(); 112 } 113 114 /** 115 * Test that our cleanOverlappingRegions doesn't incorrectly remove regions from cache. Originally 116 * encountered when using floorEntry rather than lowerEntry. 117 */ 118 @Test 119 public void testAddToCacheReverse() throws IOException, InterruptedException { 120 setupConnection(1); 121 TableName tableName = TableName.valueOf("testAddToCache"); 122 byte[] family = Bytes.toBytes("CF"); 123 TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName) 124 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build(); 125 int maxSplits = 10; 126 List<byte[]> splits = 127 IntStream.range(1, maxSplits).mapToObj(Bytes::toBytes).collect(Collectors.toList()); 128 129 TEST_UTIL.getAdmin().createTable(td, splits.toArray(new byte[0][])); 130 TEST_UTIL.waitTableAvailable(tableName); 131 TEST_UTIL.waitUntilNoRegionsInTransition(); 132 133 assertEquals(splits.size() + 1, TEST_UTIL.getAdmin().getRegions(tableName).size()); 134 135 RegionLocator locatorForTable = conn.getRegionLocator(tableName); 136 for (int i = maxSplits; i >= 0; i--) { 137 locatorForTable.getRegionLocation(Bytes.toBytes(i)); 138 } 139 140 for (int i = 0; i < maxSplits; i++) { 141 assertNotNull(locator.getRegionLocationInCache(tableName, Bytes.toBytes(i))); 142 } 143 } 144 145 @Test 146 public void testMergeEmptyWithMetaCache() throws Throwable { 147 TableName tableName = TableName.valueOf("testMergeEmptyWithMetaCache"); 148 byte[] family = Bytes.toBytes("CF"); 149 TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName) 150 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build(); 151 TEST_UTIL.getAdmin().createTable(td, new byte[][] { Bytes.toBytes(2), Bytes.toBytes(5) }); 152 TEST_UTIL.waitTableAvailable(tableName); 153 TEST_UTIL.waitUntilNoRegionsInTransition(); 154 RegionInfo regionA = null; 155 RegionInfo regionB = null; 156 RegionInfo regionC = null; 157 for (RegionInfo region : TEST_UTIL.getAdmin().getRegions(tableName)) { 158 if (region.getStartKey().length == 0) { 159 regionA = region; 160 } else if (Bytes.equals(region.getStartKey(), Bytes.toBytes(2))) { 161 regionB = region; 162 } else if (Bytes.equals(region.getStartKey(), Bytes.toBytes(5))) { 163 regionC = region; 164 } 165 } 166 167 assertNotNull(regionA); 168 assertNotNull(regionB); 169 assertNotNull(regionC); 170 171 TEST_UTIL.getConfiguration().setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, 172 true); 173 try (AsyncConnection asyncConn = 174 ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) { 175 AsyncConnectionImpl asyncConnImpl = (AsyncConnectionImpl) asyncConn; 176 177 MetricsConnection asyncMetrics = asyncConnImpl.getConnectionMetrics().get(); 178 179 // warm meta cache 180 asyncConn.getRegionLocator(tableName).getAllRegionLocations().get(); 181 182 assertEquals(3, TEST_UTIL.getAdmin().getRegions(tableName).size()); 183 184 // Merge the 3 regions into one 185 TEST_UTIL.getAdmin().mergeRegionsAsync( 186 new byte[][] { regionA.getRegionName(), regionB.getRegionName(), regionC.getRegionName() }, 187 false).get(30, TimeUnit.SECONDS); 188 189 assertEquals(1, TEST_UTIL.getAdmin().getRegions(tableName).size()); 190 191 AsyncTable<?> asyncTable = asyncConn.getTable(tableName); 192 193 // This request should cause us to cache the newly merged region. 194 // As part of caching that region, it should clear out any cached merge parent regions which 195 // are overlapped by the new region. That way, subsequent calls below won't fall into the 196 // bug in HBASE-27650. Otherwise, a request for row 6 would always get stuck on cached 197 // regionB and we'd continue to see cache misses below. 198 assertTrue( 199 executeAndGetNewMisses(() -> asyncTable.get(new Get(Bytes.toBytes(6))).get(), asyncMetrics) 200 > 0); 201 202 // We verify no new cache misses here due to above, which proves we've fixed up the cache 203 assertEquals(0, executeAndGetNewMisses(() -> asyncTable.get(new Get(Bytes.toBytes(6))).get(), 204 asyncMetrics)); 205 } 206 } 207 208 private long executeAndGetNewMisses(Executable runnable, MetricsConnection metrics) 209 throws Throwable { 210 long lastVal = metrics.getMetaCacheMisses(); 211 runnable.execute(); 212 long curVal = metrics.getMetaCacheMisses(); 213 return curVal - lastVal; 214 } 215 216 @Test 217 public void testPreserveMetaCacheOnException() throws Exception { 218 ((FakeRSRpcServices) badRS.getRSRpcServices()) 219 .setExceptionInjector(new RoundRobinExceptionInjector()); 220 setupConnection(1); 221 try (Table table = conn.getTable(TABLE_NAME)) { 222 byte[] row = Bytes.toBytes("row1"); 223 224 Put put = new Put(row); 225 put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10)); 226 Get get = new Get(row); 227 Append append = new Append(row); 228 append.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(11)); 229 Increment increment = new Increment(row); 230 increment.addColumn(FAMILY, QUALIFIER, 10); 231 Delete delete = new Delete(row); 232 delete.addColumn(FAMILY, QUALIFIER); 233 RowMutations mutations = new RowMutations(row); 234 mutations.add(put); 235 mutations.add(delete); 236 237 Exception exp; 238 boolean success; 239 for (int i = 0; i < 50; i++) { 240 exp = null; 241 success = false; 242 try { 243 table.put(put); 244 // If at least one operation succeeded, we should have cached the region location. 245 success = true; 246 table.get(get); 247 table.append(append); 248 table.increment(increment); 249 table.delete(delete); 250 table.mutateRow(mutations); 251 } catch (IOException ex) { 252 // Only keep track of the last exception that updated the meta cache 253 if (ClientExceptionsUtil.isMetaClearingException(ex) || success) { 254 exp = ex; 255 } 256 } 257 // Do not test if we did not touch the meta cache in this iteration. 258 if (exp != null && ClientExceptionsUtil.isMetaClearingException(exp)) { 259 assertNull(locator.getRegionLocationInCache(TABLE_NAME, row)); 260 } else if (success) { 261 assertNotNull(locator.getRegionLocationInCache(TABLE_NAME, row)); 262 } 263 } 264 } 265 } 266 267 @Test 268 public void testCacheClearingOnCallQueueTooBig() throws Exception { 269 ((FakeRSRpcServices) badRS.getRSRpcServices()) 270 .setExceptionInjector(new CallQueueTooBigExceptionInjector()); 271 setupConnection(2); 272 Table table = conn.getTable(TABLE_NAME); 273 byte[] row = Bytes.toBytes("row1"); 274 275 Put put = new Put(row); 276 put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10)); 277 table.put(put); 278 279 // obtain the client metrics 280 long preGetRegionClears = metrics.getMetaCacheNumClearRegion().getCount(); 281 long preGetServerClears = metrics.getMetaCacheNumClearServer().getCount(); 282 283 // attempt a get on the test table 284 Get get = new Get(row); 285 try { 286 table.get(get); 287 fail("Expected CallQueueTooBigException"); 288 } catch (RetriesExhaustedException ree) { 289 // expected 290 } 291 292 // verify that no cache clearing took place 293 long postGetRegionClears = metrics.getMetaCacheNumClearRegion().getCount(); 294 long postGetServerClears = metrics.getMetaCacheNumClearServer().getCount(); 295 assertEquals(preGetRegionClears, postGetRegionClears); 296 assertEquals(preGetServerClears, postGetServerClears); 297 } 298 299 public static List<Throwable> metaCachePreservingExceptions() { 300 return Arrays.asList(new RegionOpeningException(" "), 301 new RegionTooBusyException("Some old message"), new RpcThrottlingException(" "), 302 new MultiActionResultTooLarge(" "), new RetryImmediatelyException(" "), 303 new CallQueueTooBigException()); 304 } 305 306 public static class RegionServerWithFakeRpcServices extends HRegionServer { 307 private FakeRSRpcServices rsRpcServices; 308 309 public RegionServerWithFakeRpcServices(Configuration conf) 310 throws IOException, InterruptedException { 311 super(conf); 312 } 313 314 @Override 315 protected RSRpcServices createRpcServices() throws IOException { 316 this.rsRpcServices = new FakeRSRpcServices(this); 317 return rsRpcServices; 318 } 319 320 public void setExceptionInjector(ExceptionInjector injector) { 321 rsRpcServices.setExceptionInjector(injector); 322 } 323 } 324 325 public static class FakeRSRpcServices extends RSRpcServices { 326 327 private ExceptionInjector exceptions; 328 329 public FakeRSRpcServices(HRegionServer rs) throws IOException { 330 super(rs); 331 exceptions = new RoundRobinExceptionInjector(); 332 } 333 334 public void setExceptionInjector(ExceptionInjector injector) { 335 this.exceptions = injector; 336 } 337 338 @Override 339 public GetResponse get(final RpcController controller, final ClientProtos.GetRequest request) 340 throws ServiceException { 341 exceptions.throwOnGet(this, request); 342 return super.get(controller, request); 343 } 344 345 @Override 346 public ClientProtos.MutateResponse mutate(final RpcController controller, 347 final ClientProtos.MutateRequest request) throws ServiceException { 348 exceptions.throwOnMutate(this, request); 349 return super.mutate(controller, request); 350 } 351 352 @Override 353 public ClientProtos.ScanResponse scan(final RpcController controller, 354 final ClientProtos.ScanRequest request) throws ServiceException { 355 exceptions.throwOnScan(this, request); 356 return super.scan(controller, request); 357 } 358 } 359 360 public static abstract class ExceptionInjector { 361 protected boolean isTestTable(FakeRSRpcServices rpcServices, 362 HBaseProtos.RegionSpecifier regionSpec) throws ServiceException { 363 try { 364 return TABLE_NAME 365 .equals(rpcServices.getRegion(regionSpec).getTableDescriptor().getTableName()); 366 } catch (IOException ioe) { 367 throw new ServiceException(ioe); 368 } 369 } 370 371 public abstract void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request) 372 throws ServiceException; 373 374 public abstract void throwOnMutate(FakeRSRpcServices rpcServices, 375 ClientProtos.MutateRequest request) throws ServiceException; 376 377 public abstract void throwOnScan(FakeRSRpcServices rpcServices, 378 ClientProtos.ScanRequest request) throws ServiceException; 379 } 380 381 /** 382 * Rotates through the possible cache clearing and non-cache clearing exceptions for requests. 383 */ 384 public static class RoundRobinExceptionInjector extends ExceptionInjector { 385 private int numReqs = -1; 386 private int expCount = -1; 387 private List<Throwable> metaCachePreservingExceptions = metaCachePreservingExceptions(); 388 389 @Override 390 public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request) 391 throws ServiceException { 392 throwSomeExceptions(rpcServices, request.getRegion()); 393 } 394 395 @Override 396 public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request) 397 throws ServiceException { 398 throwSomeExceptions(rpcServices, request.getRegion()); 399 } 400 401 @Override 402 public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request) 403 throws ServiceException { 404 if (!request.hasScannerId()) { 405 // only handle initial scan requests 406 throwSomeExceptions(rpcServices, request.getRegion()); 407 } 408 } 409 410 /** 411 * Throw some exceptions. Mostly throw exceptions which do not clear meta cache. Periodically 412 * throw NotSevingRegionException which clears the meta cache. 413 */ 414 private void throwSomeExceptions(FakeRSRpcServices rpcServices, 415 HBaseProtos.RegionSpecifier regionSpec) throws ServiceException { 416 if (!isTestTable(rpcServices, regionSpec)) { 417 return; 418 } 419 420 numReqs++; 421 // Succeed every 5 request, throw cache clearing exceptions twice every 5 requests and throw 422 // meta cache preserving exceptions otherwise. 423 if (numReqs % 5 == 0) { 424 return; 425 } else if (numReqs % 5 == 1 || numReqs % 5 == 2) { 426 throw new ServiceException(new NotServingRegionException()); 427 } 428 // Round robin between different special exceptions. 429 // This is not ideal since exception types are not tied to the operation performed here, 430 // But, we don't really care here if we throw MultiActionTooLargeException while doing 431 // single Gets. 432 expCount++; 433 Throwable t = 434 metaCachePreservingExceptions.get(expCount % metaCachePreservingExceptions.size()); 435 throw new ServiceException(t); 436 } 437 } 438 439 /** 440 * Throws CallQueueTooBigException for all gets. 441 */ 442 public static class CallQueueTooBigExceptionInjector extends ExceptionInjector { 443 @Override 444 public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request) 445 throws ServiceException { 446 if (isTestTable(rpcServices, request.getRegion())) { 447 throw new ServiceException(new CallQueueTooBigException()); 448 } 449 } 450 451 @Override 452 public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request) 453 throws ServiceException { 454 } 455 456 @Override 457 public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request) 458 throws ServiceException { 459 } 460 } 461}