001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.closeRegion; 021import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.openRegion; 022 023import java.io.IOException; 024import java.util.List; 025import java.util.Random; 026import java.util.concurrent.ExecutorService; 027import java.util.concurrent.Executors; 028import java.util.concurrent.ThreadLocalRandom; 029import java.util.concurrent.TimeUnit; 030import java.util.concurrent.atomic.AtomicBoolean; 031import java.util.concurrent.atomic.AtomicReference; 032import org.apache.hadoop.hbase.Cell; 033import org.apache.hadoop.hbase.HBaseClassTestRule; 034import org.apache.hadoop.hbase.HBaseTestingUtility; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.HRegionInfo; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.TestMetaTableAccessor; 039import org.apache.hadoop.hbase.client.Consistency; 040import org.apache.hadoop.hbase.client.Get; 041import org.apache.hadoop.hbase.client.Put; 042import org.apache.hadoop.hbase.client.RegionLocator; 043import org.apache.hadoop.hbase.client.Result; 044import org.apache.hadoop.hbase.client.Table; 045import org.apache.hadoop.hbase.io.hfile.HFileScanner; 046import org.apache.hadoop.hbase.testclassification.LargeTests; 047import org.apache.hadoop.hbase.testclassification.RegionServerTests; 048import org.apache.hadoop.hbase.util.Bytes; 049import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 050import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 051import org.apache.hadoop.hbase.util.Threads; 052import org.apache.hadoop.hdfs.DFSConfigKeys; 053import org.apache.hadoop.util.StringUtils; 054import org.junit.AfterClass; 055import org.junit.Assert; 056import org.junit.BeforeClass; 057import org.junit.ClassRule; 058import org.junit.Test; 059import org.junit.experimental.categories.Category; 060import org.slf4j.Logger; 061import org.slf4j.LoggerFactory; 062 063import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 064 065import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 066import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 067import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 068 069/** 070 * Tests for region replicas. Sad that we cannot isolate these without bringing up a whole cluster. 071 * See {@link TestRegionServerNoMaster}. 072 */ 073@Category({ RegionServerTests.class, LargeTests.class }) 074public class TestRegionReplicas { 075 076 @ClassRule 077 public static final HBaseClassTestRule CLASS_RULE = 078 HBaseClassTestRule.forClass(TestRegionReplicas.class); 079 080 private static final Logger LOG = LoggerFactory.getLogger(TestRegionReplicas.class); 081 082 private static final int NB_SERVERS = 1; 083 private static Table table; 084 private static final byte[] row = Bytes.toBytes("TestRegionReplicas"); 085 086 private static HRegionInfo hriPrimary; 087 private static HRegionInfo hriSecondary; 088 089 private static final HBaseTestingUtility HTU = new HBaseTestingUtility(); 090 private static final byte[] f = HConstants.CATALOG_FAMILY; 091 092 @BeforeClass 093 public static void before() throws Exception { 094 // Reduce the hdfs block size and prefetch to trigger the file-link reopen 095 // when the file is moved to archive (e.g. compaction) 096 HTU.getConfiguration().setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 8192); 097 HTU.getConfiguration().setInt(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 1); 098 HTU.getConfiguration().setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 128 * 1024 * 1024); 099 100 HTU.startMiniCluster(NB_SERVERS); 101 final TableName tableName = TableName.valueOf(TestRegionReplicas.class.getSimpleName()); 102 103 // Create table then get the single region for our new table. 104 table = HTU.createTable(tableName, f); 105 106 try (RegionLocator locator = HTU.getConnection().getRegionLocator(tableName)) { 107 hriPrimary = locator.getRegionLocation(row, false).getRegionInfo(); 108 } 109 110 // mock a secondary region info to open 111 hriSecondary = new HRegionInfo(hriPrimary.getTable(), hriPrimary.getStartKey(), 112 hriPrimary.getEndKey(), hriPrimary.isSplit(), hriPrimary.getRegionId(), 1); 113 114 // No master 115 TestRegionServerNoMaster.stopMasterAndCacheMetaLocation(HTU); 116 } 117 118 @AfterClass 119 public static void afterClass() throws Exception { 120 HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false; 121 table.close(); 122 HTU.shutdownMiniCluster(); 123 } 124 125 private HRegionServer getRS() { 126 return HTU.getMiniHBaseCluster().getRegionServer(0); 127 } 128 129 @Test 130 public void testOpenRegionReplica() throws Exception { 131 openRegion(HTU, getRS(), hriSecondary); 132 try { 133 // load some data to primary 134 HTU.loadNumericRows(table, f, 0, 1000); 135 136 // assert that we can read back from primary 137 Assert.assertEquals(1000, HTU.countRows(table)); 138 } finally { 139 HTU.deleteNumericRows(table, f, 0, 1000); 140 closeRegion(HTU, getRS(), hriSecondary); 141 } 142 } 143 144 /** Tests that the meta location is saved for secondary regions */ 145 @Test 146 public void testRegionReplicaUpdatesMetaLocation() throws Exception { 147 openRegion(HTU, getRS(), hriSecondary); 148 Table meta = null; 149 try { 150 meta = HTU.getConnection().getTable(TableName.META_TABLE_NAME); 151 TestMetaTableAccessor.assertMetaLocation(meta, hriPrimary.getRegionName(), 152 getRS().getServerName(), -1, 1, false); 153 } finally { 154 if (meta != null) { 155 meta.close(); 156 } 157 closeRegion(HTU, getRS(), hriSecondary); 158 } 159 } 160 161 @Test 162 public void testRegionReplicaGets() throws Exception { 163 try { 164 // load some data to primary 165 HTU.loadNumericRows(table, f, 0, 1000); 166 // assert that we can read back from primary 167 Assert.assertEquals(1000, HTU.countRows(table)); 168 // flush so that region replica can read 169 HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName()); 170 region.flush(true); 171 172 openRegion(HTU, getRS(), hriSecondary); 173 174 // first try directly against region 175 region = getRS().getRegion(hriSecondary.getEncodedName()); 176 assertGet(region, 42, true); 177 178 assertGetRpc(hriSecondary, 42, true); 179 } finally { 180 HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000); 181 closeRegion(HTU, getRS(), hriSecondary); 182 } 183 } 184 185 @Test 186 public void testGetOnTargetRegionReplica() throws Exception { 187 try { 188 // load some data to primary 189 HTU.loadNumericRows(table, f, 0, 1000); 190 // assert that we can read back from primary 191 Assert.assertEquals(1000, HTU.countRows(table)); 192 // flush so that region replica can read 193 HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName()); 194 region.flush(true); 195 196 openRegion(HTU, getRS(), hriSecondary); 197 198 // try directly Get against region replica 199 byte[] row = Bytes.toBytes(String.valueOf(42)); 200 Get get = new Get(row); 201 get.setConsistency(Consistency.TIMELINE); 202 get.setReplicaId(1); 203 Result result = table.get(get); 204 Assert.assertArrayEquals(row, result.getValue(f, null)); 205 } finally { 206 HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000); 207 closeRegion(HTU, getRS(), hriSecondary); 208 } 209 } 210 211 private void assertGet(Region region, int value, boolean expect) throws IOException { 212 byte[] row = Bytes.toBytes(String.valueOf(value)); 213 Get get = new Get(row); 214 Result result = region.get(get); 215 if (expect) { 216 Assert.assertArrayEquals(row, result.getValue(f, null)); 217 } else { 218 result.isEmpty(); 219 } 220 } 221 222 // build a mock rpc 223 private void assertGetRpc(HRegionInfo info, int value, boolean expect) 224 throws IOException, org.apache.hbase.thirdparty.com.google.protobuf.ServiceException { 225 byte[] row = Bytes.toBytes(String.valueOf(value)); 226 Get get = new Get(row); 227 ClientProtos.GetRequest getReq = RequestConverter.buildGetRequest(info.getRegionName(), get); 228 ClientProtos.GetResponse getResp = getRS().getRSRpcServices().get(null, getReq); 229 Result result = ProtobufUtil.toResult(getResp.getResult()); 230 if (expect) { 231 Assert.assertArrayEquals(row, result.getValue(f, null)); 232 } else { 233 result.isEmpty(); 234 } 235 } 236 237 private void restartRegionServer() throws Exception { 238 afterClass(); 239 before(); 240 } 241 242 @Test 243 public void testRefresStoreFiles() throws Exception { 244 // enable store file refreshing 245 final int refreshPeriod = 2000; // 2 sec 246 HTU.getConfiguration().setInt("hbase.hstore.compactionThreshold", 100); 247 HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 248 refreshPeriod); 249 // restart the region server so that it starts the refresher chore 250 restartRegionServer(); 251 252 try { 253 LOG.info("Opening the secondary region " + hriSecondary.getEncodedName()); 254 openRegion(HTU, getRS(), hriSecondary); 255 256 // load some data to primary 257 LOG.info("Loading data to primary region"); 258 HTU.loadNumericRows(table, f, 0, 1000); 259 // assert that we can read back from primary 260 Assert.assertEquals(1000, HTU.countRows(table)); 261 // flush so that region replica can read 262 LOG.info("Flushing primary region"); 263 HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName()); 264 region.flush(true); 265 HRegion primaryRegion = region; 266 267 // ensure that chore is run 268 LOG.info("Sleeping for " + (4 * refreshPeriod)); 269 Threads.sleep(4 * refreshPeriod); 270 271 LOG.info("Checking results from secondary region replica"); 272 Region secondaryRegion = getRS().getRegion(hriSecondary.getEncodedName()); 273 Assert.assertEquals(1, secondaryRegion.getStore(f).getStorefilesCount()); 274 275 assertGet(secondaryRegion, 42, true); 276 assertGetRpc(hriSecondary, 42, true); 277 assertGetRpc(hriSecondary, 1042, false); 278 279 // load some data to primary 280 HTU.loadNumericRows(table, f, 1000, 1100); 281 region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName()); 282 region.flush(true); 283 284 HTU.loadNumericRows(table, f, 2000, 2100); 285 region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName()); 286 region.flush(true); 287 288 // ensure that chore is run 289 Threads.sleep(4 * refreshPeriod); 290 291 assertGetRpc(hriSecondary, 42, true); 292 assertGetRpc(hriSecondary, 1042, true); 293 assertGetRpc(hriSecondary, 2042, true); 294 295 // ensure that we see the 3 store files 296 Assert.assertEquals(3, secondaryRegion.getStore(f).getStorefilesCount()); 297 298 // force compaction 299 HTU.compact(table.getName(), true); 300 301 long wakeUpTime = EnvironmentEdgeManager.currentTime() + 4 * refreshPeriod; 302 while (EnvironmentEdgeManager.currentTime() < wakeUpTime) { 303 assertGetRpc(hriSecondary, 42, true); 304 assertGetRpc(hriSecondary, 1042, true); 305 assertGetRpc(hriSecondary, 2042, true); 306 Threads.sleep(10); 307 } 308 309 // ensure that we see the compacted file only 310 // This will be 4 until the cleaner chore runs 311 Assert.assertEquals(4, secondaryRegion.getStore(f).getStorefilesCount()); 312 313 } finally { 314 HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000); 315 closeRegion(HTU, getRS(), hriSecondary); 316 } 317 } 318 319 @Test 320 public void testFlushAndCompactionsInPrimary() throws Exception { 321 322 long runtime = 30 * 1000; 323 // enable store file refreshing 324 final int refreshPeriod = 100; // 100ms refresh is a lot 325 HTU.getConfiguration().setInt("hbase.hstore.compactionThreshold", 3); 326 HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 327 refreshPeriod); 328 // restart the region server so that it starts the refresher chore 329 restartRegionServer(); 330 final int startKey = 0, endKey = 1000; 331 332 try { 333 openRegion(HTU, getRS(), hriSecondary); 334 335 // load some data to primary so that reader won't fail 336 HTU.loadNumericRows(table, f, startKey, endKey); 337 TestRegionServerNoMaster.flushRegion(HTU, hriPrimary); 338 // ensure that chore is run 339 Threads.sleep(2 * refreshPeriod); 340 341 final AtomicBoolean running = new AtomicBoolean(true); 342 @SuppressWarnings("unchecked") 343 final AtomicReference<Exception>[] exceptions = new AtomicReference[3]; 344 for (int i = 0; i < exceptions.length; i++) { 345 exceptions[i] = new AtomicReference<>(); 346 } 347 348 Runnable writer = new Runnable() { 349 int key = startKey; 350 351 @Override 352 public void run() { 353 try { 354 while (running.get()) { 355 byte[] data = Bytes.toBytes(String.valueOf(key)); 356 Put put = new Put(data); 357 put.addColumn(f, null, data); 358 table.put(put); 359 key++; 360 if (key == endKey) { 361 key = startKey; 362 } 363 } 364 } catch (Exception ex) { 365 LOG.warn(ex.toString(), ex); 366 exceptions[0].compareAndSet(null, ex); 367 } 368 } 369 }; 370 371 Runnable flusherCompactor = new Runnable() { 372 Random random = ThreadLocalRandom.current(); 373 374 public void run() { 375 try { 376 while (running.get()) { 377 // flush or compact 378 if (random.nextBoolean()) { 379 TestRegionServerNoMaster.flushRegion(HTU, hriPrimary); 380 } else { 381 HTU.compact(table.getName(), random.nextBoolean()); 382 } 383 } 384 } catch (Exception ex) { 385 LOG.warn(ex.toString(), ex); 386 exceptions[1].compareAndSet(null, ex); 387 } 388 } 389 }; 390 391 Runnable reader = new Runnable() { 392 @Override 393 public void run() { 394 try { 395 Random random = ThreadLocalRandom.current(); 396 while (running.get()) { 397 // whether to do a close and open 398 if (random.nextInt(10) == 0) { 399 try { 400 closeRegion(HTU, getRS(), hriSecondary); 401 } catch (Exception ex) { 402 LOG.warn("Failed closing the region " + hriSecondary + " " 403 + StringUtils.stringifyException(ex)); 404 exceptions[2].compareAndSet(null, ex); 405 } 406 try { 407 openRegion(HTU, getRS(), hriSecondary); 408 } catch (Exception ex) { 409 LOG.warn("Failed opening the region " + hriSecondary + " " 410 + StringUtils.stringifyException(ex)); 411 exceptions[2].compareAndSet(null, ex); 412 } 413 } 414 415 int key = random.nextInt(endKey - startKey) + startKey; 416 assertGetRpc(hriSecondary, key, true); 417 } 418 } catch (Exception ex) { 419 LOG.warn("Failed getting the value in the region " + hriSecondary + " " 420 + StringUtils.stringifyException(ex)); 421 exceptions[2].compareAndSet(null, ex); 422 } 423 } 424 }; 425 426 LOG.info("Starting writer and reader, secondary={}", hriSecondary.getEncodedName()); 427 ExecutorService executor = Executors.newFixedThreadPool(3); 428 executor.submit(writer); 429 executor.submit(flusherCompactor); 430 executor.submit(reader); 431 432 // wait for threads 433 Threads.sleep(runtime); 434 running.set(false); 435 executor.shutdown(); 436 executor.awaitTermination(30, TimeUnit.SECONDS); 437 438 for (AtomicReference<Exception> exRef : exceptions) { 439 Assert.assertNull(exRef.get()); 440 } 441 } finally { 442 HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, startKey, endKey); 443 try { 444 closeRegion(HTU, getRS(), hriSecondary); 445 } catch (ServiceException e) { 446 LOG.info("Closing wrong region {}", hriSecondary, e); 447 } 448 } 449 } 450 451 @Test 452 public void testVerifySecondaryAbilityToReadWithOnFiles() throws Exception { 453 // disable the store file refresh chore (we do this by hand) 454 HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 0); 455 restartRegionServer(); 456 457 try { 458 LOG.info("Opening the secondary region " + hriSecondary.getEncodedName()); 459 openRegion(HTU, getRS(), hriSecondary); 460 461 // load some data to primary 462 LOG.info("Loading data to primary region"); 463 for (int i = 0; i < 3; ++i) { 464 HTU.loadNumericRows(table, f, i * 1000, (i + 1) * 1000); 465 HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName()); 466 region.flush(true); 467 } 468 469 HRegion primaryRegion = getRS().getRegion(hriPrimary.getEncodedName()); 470 Assert.assertEquals(3, primaryRegion.getStore(f).getStorefilesCount()); 471 472 // Refresh store files on the secondary 473 Region secondaryRegion = getRS().getRegion(hriSecondary.getEncodedName()); 474 secondaryRegion.getStore(f).refreshStoreFiles(); 475 Assert.assertEquals(3, secondaryRegion.getStore(f).getStorefilesCount()); 476 477 // force compaction 478 LOG.info("Force Major compaction on primary region " + hriPrimary); 479 primaryRegion.compact(true); 480 Assert.assertEquals(1, primaryRegion.getStore(f).getStorefilesCount()); 481 List<RegionServerThread> regionServerThreads = 482 HTU.getMiniHBaseCluster().getRegionServerThreads(); 483 HRegionServer hrs = null; 484 for (RegionServerThread rs : regionServerThreads) { 485 if ( 486 rs.getRegionServer().getOnlineRegion(primaryRegion.getRegionInfo().getRegionName()) 487 != null 488 ) { 489 hrs = rs.getRegionServer(); 490 break; 491 } 492 } 493 CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, hrs, false); 494 cleaner.chore(); 495 // scan all the hfiles on the secondary. 496 // since there are no read on the secondary when we ask locations to 497 // the NN a FileNotFound exception will be returned and the FileLink 498 // should be able to deal with it giving us all the result we expect. 499 int keys = 0; 500 int sum = 0; 501 for (HStoreFile sf : ((HStore) secondaryRegion.getStore(f)).getStorefiles()) { 502 // Our file does not exist anymore. was moved by the compaction above. 503 LOG.debug(Boolean.toString(getRS().getFileSystem().exists(sf.getPath()))); 504 Assert.assertFalse(getRS().getFileSystem().exists(sf.getPath())); 505 506 HFileScanner scanner = sf.getReader().getScanner(false, false); 507 scanner.seekTo(); 508 do { 509 keys++; 510 511 Cell cell = scanner.getCell(); 512 sum += Integer 513 .parseInt(Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())); 514 } while (scanner.next()); 515 } 516 Assert.assertEquals(3000, keys); 517 Assert.assertEquals(4498500, sum); 518 } finally { 519 HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000); 520 closeRegion(HTU, getRS(), hriSecondary); 521 } 522 } 523}