/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.closeRegion;
import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.openRegion;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TestMetaTableAccessor;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.util.StringUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

/**
 * Tests for region replicas. Sad that we cannot isolate these without bringing up a whole cluster.
 * See {@link TestRegionServerNoMaster}.
 * <p>
 * All tests share one static table and one mini cluster, so every test must delete every row it
 * loaded before returning; several tests assert an exact row count of the shared table.
 */
@Tag(RegionServerTests.TAG)
@Tag(LargeTests.TAG)
public class TestRegionReplicas {

  private static final Logger LOG = LoggerFactory.getLogger(TestRegionReplicas.class);

  private static final int NB_SERVERS = 1;
  private static Table table;
  private static final byte[] row = Bytes.toBytes("TestRegionReplicas");

  private static RegionInfo hriPrimary;
  private static RegionInfo hriSecondary;

  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
  private static final byte[] f = HConstants.CATALOG_FAMILY;

  /**
   * Starts the mini cluster, creates the shared test table, resolves the primary region that holds
   * {@link #row}, derives the region info for replica id 1 and finally stops the master.
   * <p>
   * Also invoked by {@link #restartRegionServer()} to bring the cluster back up after a
   * configuration change.
   */
  @BeforeAll
  public static void before() throws Exception {
    // Reduce the hdfs block size and prefetch to trigger the file-link reopen
    // when the file is moved to archive (e.g. compaction)
    HTU.getConfiguration().setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 8192);
    HTU.getConfiguration().setInt(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 1);
    HTU.getConfiguration().setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 128 * 1024 * 1024);

    HTU.startMiniCluster(NB_SERVERS);
    final TableName tableName = TableName.valueOf(TestRegionReplicas.class.getSimpleName());

    // Create table then get the single region for our new table.
    table = HTU.createTable(tableName, f);

    try (RegionLocator locator = HTU.getConnection().getRegionLocator(tableName)) {
      hriPrimary = locator.getRegionLocation(row, false).getRegion();
    }

    // mock a secondary region info to open
    hriSecondary = RegionReplicaUtil.getRegionInfoForReplica(hriPrimary, 1);

    // No master
    TestRegionServerNoMaster.stopMasterAndCacheMetaLocation(HTU);
  }

  /**
   * Resets {@link HRegionServer#TEST_SKIP_REPORTING_TRANSITION}, closes the shared table and shuts
   * the mini cluster down.
   * <p>
   * The table may be {@code null} when {@link #before()} failed before creating it; the cluster
   * shutdown is attempted regardless, and also when closing the table throws.
   */
  @AfterAll
  public static void afterClass() throws Exception {
    HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false;
    try {
      if (table != null) {
        table.close();
      }
    } finally {
      HTU.shutdownMiniCluster();
    }
  }

  /** Returns the first region server of the mini cluster. */
  private HRegionServer getRS() {
    return HTU.getMiniHBaseCluster().getRegionServer(0);
  }

  /**
   * Opens the secondary replica and checks that the primary can still be written to and read from
   * while the replica is open.
   */
  @Test
  public void testOpenRegionReplica() throws Exception {
    openRegion(HTU, getRS(), hriSecondary);
    try {
      // load some data to primary
      HTU.loadNumericRows(table, f, 0, 1000);

      // assert that we can read back from primary
      assertEquals(1000, HBaseTestingUtil.countRows(table));
    } finally {
      HTU.deleteNumericRows(table, f, 0, 1000);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

  /** Tests that the meta location is saved for secondary regions */
  @Test
  public void testRegionReplicaUpdatesMetaLocation() throws Exception {
    openRegion(HTU, getRS(), hriSecondary);
    // try-with-resources closes meta first; the finally block still closes the secondary region
    // even when closing meta fails.
    try (Table meta = HTU.getConnection().getTable(TableName.META_TABLE_NAME)) {
      TestMetaTableAccessor.assertMetaLocation(meta, hriPrimary.getRegionName(),
        getRS().getServerName(), -1, 1, false);
    } finally {
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

  /**
   * Loads and flushes data on the primary, then opens the secondary and reads a row from it both
   * directly through the region object and through the region server's RPC service.
   */
  @Test
  public void testRegionReplicaGets() throws Exception {
    try {
      // load some data to primary
      HTU.loadNumericRows(table, f, 0, 1000);
      // assert that we can read back from primary
      assertEquals(1000, HBaseTestingUtil.countRows(table));
      // flush so that region replica can read
      HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      openRegion(HTU, getRS(), hriSecondary);

      // first try directly against region
      region = getRS().getRegion(hriSecondary.getEncodedName());
      assertGet(region, 42, true);

      assertGetRpc(hriSecondary, 42, true);
    } finally {
      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

  /**
   * Loads and flushes data on the primary, opens the secondary and issues a client {@link Get}
   * with {@link Consistency#TIMELINE} that explicitly targets replica id 1.
   */
  @Test
  public void testGetOnTargetRegionReplica() throws Exception {
    try {
      // load some data to primary
      HTU.loadNumericRows(table, f, 0, 1000);
      // assert that we can read back from primary
      assertEquals(1000, HBaseTestingUtil.countRows(table));
      // flush so that region replica can read
      HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      openRegion(HTU, getRS(), hriSecondary);

      // try directly Get against region replica
      byte[] rowKey = Bytes.toBytes(String.valueOf(42));
      Get get = new Get(rowKey);
      get.setConsistency(Consistency.TIMELINE);
      get.setReplicaId(1);
      Result result = table.get(get);
      assertArrayEquals(rowKey, result.getValue(f, null));
    } finally {
      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

  /**
   * Reads the row keyed by the decimal string of {@code value} directly from {@code region}.
   * @param region the region to read from
   * @param value  numeric row key; the row key bytes are also the expected cell value
   * @param expect {@code true} to assert the row is present with the expected value, {@code false}
   *               to assert that the get returns an empty result
   */
  private void assertGet(Region region, int value, boolean expect) throws IOException {
    byte[] rowKey = Bytes.toBytes(String.valueOf(value));
    Result result = region.get(new Get(rowKey));
    if (expect) {
      assertArrayEquals(rowKey, result.getValue(f, null));
    } else {
      // The original code evaluated isEmpty() and discarded the result, asserting nothing.
      assertTrue(result.isEmpty(), "Expected no result for row " + value);
    }
  }

  /**
   * Same check as {@link #assertGet(Region, int, boolean)}, but the get is sent through the region
   * server's RPC service as a hand-built protobuf request addressed to the region named by
   * {@code info}.
   * @param info   region the request is addressed to
   * @param value  numeric row key; the row key bytes are also the expected cell value
   * @param expect {@code true} to assert the row is present with the expected value, {@code false}
   *               to assert that the get returns an empty result
   */
  private void assertGetRpc(RegionInfo info, int value, boolean expect)
    throws IOException, ServiceException {
    byte[] rowKey = Bytes.toBytes(String.valueOf(value));
    Get get = new Get(rowKey);
    ClientProtos.GetRequest getReq = RequestConverter.buildGetRequest(info.getRegionName(), get);
    ClientProtos.GetResponse getResp = getRS().getRSRpcServices().get(null, getReq);
    Result result = ProtobufUtil.toResult(getResp.getResult());
    if (expect) {
      assertArrayEquals(rowKey, result.getValue(f, null));
    } else {
      // The original code evaluated isEmpty() and discarded the result, asserting nothing.
      assertTrue(result.isEmpty(), "Expected no result for row " + value);
    }
  }

  /**
   * Tears the whole mini cluster down and brings it back up so that configuration changes made on
   * {@link #HTU} take effect.
   */
  private void restartRegionServer() throws Exception {
    afterClass();
    before();
  }

  /**
   * Enables the store file refresher with a 2 second period and verifies that the secondary picks
   * up files flushed by the primary, and keeps serving reads while the primary is compacted.
   */
  @Test
  public void testRefresStoreFiles() throws Exception {
    // enable store file refreshing
    final int refreshPeriod = 2000; // 2 sec
    HTU.getConfiguration().setInt("hbase.hstore.compactionThreshold", 100);
    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
      refreshPeriod);
    // restart the region server so that it starts the refresher chore
    restartRegionServer();

    try {
      LOG.info("Opening the secondary region " + hriSecondary.getEncodedName());
      openRegion(HTU, getRS(), hriSecondary);

      // load some data to primary
      LOG.info("Loading data to primary region");
      HTU.loadNumericRows(table, f, 0, 1000);
      // assert that we can read back from primary
      assertEquals(1000, HBaseTestingUtil.countRows(table));
      // flush so that region replica can read
      LOG.info("Flushing primary region");
      HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      // ensure that chore is run
      LOG.info("Sleeping for " + (4 * refreshPeriod));
      Threads.sleep(4 * refreshPeriod);

      LOG.info("Checking results from secondary region replica");
      Region secondaryRegion = getRS().getRegion(hriSecondary.getEncodedName());
      assertEquals(1, secondaryRegion.getStore(f).getStorefilesCount());

      assertGet(secondaryRegion, 42, true);
      assertGetRpc(hriSecondary, 42, true);
      // row 1042 has not been loaded yet, so the secondary must not return it
      assertGetRpc(hriSecondary, 1042, false);

      // load some data to primary
      HTU.loadNumericRows(table, f, 1000, 1100);
      region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      HTU.loadNumericRows(table, f, 2000, 2100);
      region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      // ensure that chore is run
      Threads.sleep(4 * refreshPeriod);

      assertGetRpc(hriSecondary, 42, true);
      assertGetRpc(hriSecondary, 1042, true);
      assertGetRpc(hriSecondary, 2042, true);

      // ensure that we see the 3 store files
      assertEquals(3, secondaryRegion.getStore(f).getStorefilesCount());

      // force compaction
      HTU.compact(table.getName(), true);

      // keep reading from the secondary for a few refresh periods while the compaction happens
      long wakeUpTime = EnvironmentEdgeManager.currentTime() + 4 * refreshPeriod;
      while (EnvironmentEdgeManager.currentTime() < wakeUpTime) {
        assertGetRpc(hriSecondary, 42, true);
        assertGetRpc(hriSecondary, 1042, true);
        assertGetRpc(hriSecondary, 2042, true);
        Threads.sleep(10);
      }

      // ensure that we see the compacted file only
      // This will be 4 until the cleaner chore runs
      assertEquals(4, secondaryRegion.getStore(f).getStorefilesCount());

    } finally {
      // Remove every range loaded above, not just the first one; leftover rows would break the
      // exact row-count assertions of other tests that share this table.
      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 1000, 1100);
      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 2000, 2100);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

  /**
   * Runs a writer, a flusher/compactor and a reader concurrently for 30 seconds. The reader reads
   * random keys from the secondary and occasionally closes and reopens it; the test fails if any
   * of the three workers recorded an exception.
   */
  @Test
  public void testFlushAndCompactionsInPrimary() throws Exception {

    long runtime = 30 * 1000;
    // enable store file refreshing
    final int refreshPeriod = 100; // 100ms refresh is a lot
    HTU.getConfiguration().setInt("hbase.hstore.compactionThreshold", 3);
    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
      refreshPeriod);
    // restart the region server so that it starts the refresher chore
    restartRegionServer();
    final int startKey = 0, endKey = 1000;

    try {
      openRegion(HTU, getRS(), hriSecondary);

      // load some data to primary so that reader won't fail
      HTU.loadNumericRows(table, f, startKey, endKey);
      TestRegionServerNoMaster.flushRegion(HTU, hriPrimary);
      // ensure that chore is run
      Threads.sleep(2 * refreshPeriod);

      final AtomicBoolean running = new AtomicBoolean(true);
      // one slot per worker: [0] writer, [1] flusher/compactor, [2] reader
      @SuppressWarnings("unchecked")
      final AtomicReference<Exception>[] exceptions = new AtomicReference[3];
      for (int i = 0; i < exceptions.length; i++) {
        exceptions[i] = new AtomicReference<>();
      }

      // Overwrites keys startKey..endKey-1 in a cycle until told to stop.
      Runnable writer = new Runnable() {
        int key = startKey;

        @Override
        public void run() {
          try {
            while (running.get()) {
              byte[] data = Bytes.toBytes(String.valueOf(key));
              Put put = new Put(data);
              put.addColumn(f, null, data);
              table.put(put);
              key++;
              if (key == endKey) {
                key = startKey;
              }
            }
          } catch (Exception ex) {
            LOG.warn(ex.toString(), ex);
            exceptions[0].compareAndSet(null, ex);
          }
        }
      };

      // Randomly flushes the primary or requests a (minor or major) compaction of the table.
      Runnable flusherCompactor = new Runnable() {
        @Override
        public void run() {
          try {
            // ThreadLocalRandom.current() must be obtained on the thread that uses it, so it is
            // fetched here rather than in a field initializer that runs on the test thread.
            Random random = ThreadLocalRandom.current();
            while (running.get()) {
              // flush or compact
              if (random.nextBoolean()) {
                TestRegionServerNoMaster.flushRegion(HTU, hriPrimary);
              } else {
                HTU.compact(table.getName(), random.nextBoolean());
              }
            }
          } catch (Exception ex) {
            LOG.warn(ex.toString(), ex);
            exceptions[1].compareAndSet(null, ex);
          }
        }
      };

      // Reads random keys from the secondary; roughly one iteration in ten it first closes and
      // reopens the secondary region.
      Runnable reader = new Runnable() {
        @Override
        public void run() {
          try {
            Random random = ThreadLocalRandom.current();
            while (running.get()) {
              // whether to do a close and open
              if (random.nextInt(10) == 0) {
                try {
                  closeRegion(HTU, getRS(), hriSecondary);
                } catch (Exception ex) {
                  LOG.warn("Failed closing the region " + hriSecondary + " "
                    + StringUtils.stringifyException(ex));
                  exceptions[2].compareAndSet(null, ex);
                }
                try {
                  openRegion(HTU, getRS(), hriSecondary);
                } catch (Exception ex) {
                  LOG.warn("Failed opening the region " + hriSecondary + " "
                    + StringUtils.stringifyException(ex));
                  exceptions[2].compareAndSet(null, ex);
                }
              }

              int key = random.nextInt(endKey - startKey) + startKey;
              assertGetRpc(hriSecondary, key, true);
            }
          } catch (Exception ex) {
            LOG.warn("Failed getting the value in the region " + hriSecondary + " "
              + StringUtils.stringifyException(ex));
            exceptions[2].compareAndSet(null, ex);
          }
        }
      };

      LOG.info("Starting writer and reader, secondary={}", hriSecondary.getEncodedName());
      ExecutorService executor = Executors.newFixedThreadPool(3);
      executor.submit(writer);
      executor.submit(flusherCompactor);
      executor.submit(reader);

      // wait for threads
      Threads.sleep(runtime);
      running.set(false);
      executor.shutdown();
      executor.awaitTermination(30, TimeUnit.SECONDS);

      for (AtomicReference<Exception> exRef : exceptions) {
        assertNull(exRef.get());
      }
    } finally {
      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, startKey, endKey);
      try {
        closeRegion(HTU, getRS(), hriSecondary);
      } catch (ServiceException e) {
        // the close is best-effort here: a failure is logged instead of failing the test
        LOG.info("Closing wrong region {}", hriSecondary, e);
      }
    }
  }

  /**
   * With the refresher chore disabled, refreshes the secondary's store files by hand, major
   * compacts the primary and runs the compacted-file discharger, then verifies that the secondary
   * can still scan every key through its now-moved store files.
   */
  @Test
  public void testVerifySecondaryAbilityToReadWithOnFiles() throws Exception {
    // disable the store file refresh chore (we do this by hand)
    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 0);
    restartRegionServer();

    try {
      LOG.info("Opening the secondary region " + hriSecondary.getEncodedName());
      openRegion(HTU, getRS(), hriSecondary);

      // load some data to primary
      LOG.info("Loading data to primary region");
      for (int i = 0; i < 3; ++i) {
        HTU.loadNumericRows(table, f, i * 1000, (i + 1) * 1000);
        HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
        region.flush(true);
      }

      HRegion primaryRegion = getRS().getRegion(hriPrimary.getEncodedName());
      assertEquals(3, primaryRegion.getStore(f).getStorefilesCount());

      // Refresh store files on the secondary
      Region secondaryRegion = getRS().getRegion(hriSecondary.getEncodedName());
      secondaryRegion.getStore(f).refreshStoreFiles();
      assertEquals(3, secondaryRegion.getStore(f).getStorefilesCount());

      // force compaction
      LOG.info("Force Major compaction on primary region " + hriPrimary);
      primaryRegion.compact(true);
      assertEquals(1, primaryRegion.getStore(f).getStorefilesCount());
      List<RegionServerThread> regionServerThreads =
        HTU.getMiniHBaseCluster().getRegionServerThreads();
      // find the region server that hosts the primary region
      HRegionServer hrs = null;
      for (RegionServerThread rs : regionServerThreads) {
        if (
          rs.getRegionServer().getOnlineRegion(primaryRegion.getRegionInfo().getRegionName())
              != null
        ) {
          hrs = rs.getRegionServer();
          break;
        }
      }
      CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, hrs, false);
      cleaner.chore();
      // scan all the hfiles on the secondary.
      // since there are no read on the secondary when we ask locations to
      // the NN a FileNotFound exception will be returned and the FileLink
      // should be able to deal with it giving us all the result we expect.
      int keys = 0;
      int sum = 0;
      for (HStoreFile sf : ((HStore) secondaryRegion.getStore(f)).getStorefiles()) {
        // Our file does not exist anymore. was moved by the compaction above.
        LOG.debug(Boolean.toString(getRS().getFileSystem().exists(sf.getPath())));
        assertFalse(getRS().getFileSystem().exists(sf.getPath()));
        sf.initReader();
        try (StoreFileScanner scanner = sf.getPreadScanner(false, Long.MAX_VALUE, 0, false)) {
          scanner.seek(KeyValue.LOWESTKEY);
          for (Cell cell = scanner.next(); cell != null; cell = scanner.next()) {
            keys++;
            sum += Integer.parseInt(
              Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
          }
        }
      }
      // rows 0..2999 were loaded: 3000 keys whose numeric values sum to 2999 * 3000 / 2
      assertEquals(3000, keys);
      assertEquals(4498500, sum);
    } finally {
      // Remove all three batches loaded above (rows 0..2999); leftover rows would break the exact
      // row-count assertions of other tests that share this table.
      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 3000);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }
}