/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.closeRegion;
import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.openRegion;

import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TestMetaTableAccessor;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.util.StringUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

/**
 * Tests for region replicas. Sad that we cannot isolate these without bringing up a whole
 * cluster. See {@link TestRegionServerNoMaster}.
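 * <p>
 * The master is stopped after setup, and a secondary replica (replica id 1) of the test table's
 * single region is opened and closed by hand on the lone region server, so each test exercises
 * replica opens, replica reads, store file refresh, and compaction handling directly.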
 */
@Category({RegionServerTests.class, LargeTests.class})
public class TestRegionReplicas {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestRegionReplicas.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestRegionReplicas.class);

  private static final int NB_SERVERS = 1;
  private static Table table;
  private static final byte[] row = Bytes.toBytes("TestRegionReplicas");

  private static HRegionInfo hriPrimary;
  private static HRegionInfo hriSecondary;

  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
  private static final byte[] f = HConstants.CATALOG_FAMILY;

  @BeforeClass
  public static void before() throws Exception {
    // Reduce the hdfs block size and prefetch to trigger the file-link reopen
    // when the file is moved to archive (e.g. by compaction)
    HTU.getConfiguration().setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 8192);
    HTU.getConfiguration().setInt(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 1);
    HTU.getConfiguration().setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 128 * 1024 * 1024);

    HTU.startMiniCluster(NB_SERVERS);
    final TableName tableName = TableName.valueOf(TestRegionReplicas.class.getSimpleName());

    // Create table then get the single region for our new table.
    table = HTU.createTable(tableName, f);

    try (RegionLocator locator = HTU.getConnection().getRegionLocator(tableName)) {
      hriPrimary = locator.getRegionLocation(row, false).getRegionInfo();
    }

    // mock a secondary region info to open
    hriSecondary = new HRegionInfo(hriPrimary.getTable(), hriPrimary.getStartKey(),
        hriPrimary.getEndKey(), hriPrimary.isSplit(), hriPrimary.getRegionId(), 1);

    // No master
    TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU);
  }

  @AfterClass
  public static void afterClass() throws Exception {
    HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false;
    table.close();
    HTU.shutdownMiniCluster();
  }

  private HRegionServer getRS() {
    return HTU.getMiniHBaseCluster().getRegionServer(0);
  }
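  /**
   * Sanity check: the primary must keep serving reads and writes while a secondary replica of
   * the same region is open on the region server.
   */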
  @Test
  public void testOpenRegionReplica() throws Exception {
    openRegion(HTU, getRS(), hriSecondary);
    try {
      // load some data to primary
      HTU.loadNumericRows(table, f, 0, 1000);

      // assert that we can read back from primary
      Assert.assertEquals(1000, HTU.countRows(table));
    } finally {
      HTU.deleteNumericRows(table, f, 0, 1000);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

  /** Tests that the meta location is saved for secondary regions */
  @Test
  public void testRegionReplicaUpdatesMetaLocation() throws Exception {
    openRegion(HTU, getRS(), hriSecondary);
    Table meta = null;
    try {
      meta = HTU.getConnection().getTable(TableName.META_TABLE_NAME);
      TestMetaTableAccessor.assertMetaLocation(meta, hriPrimary.getRegionName(),
          getRS().getServerName(), -1, 1, false);
    } finally {
      if (meta != null) {
        meta.close();
      }
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

  @Test
  public void testRegionReplicaGets() throws Exception {
    try {
      // load some data to primary
      HTU.loadNumericRows(table, f, 0, 1000);
      // assert that we can read back from primary
      Assert.assertEquals(1000, HTU.countRows(table));
      // flush so that region replica can read
      HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      openRegion(HTU, getRS(), hriSecondary);

      // first try directly against region
      region = getRS().getRegion(hriSecondary.getEncodedName());
      assertGet(region, 42, true);

      // then through the RPC interface
      assertGetRpc(hriSecondary, 42, true);
    } finally {
      HTU.deleteNumericRows(table, f, 0, 1000);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

  @Test
  public void testGetOnTargetRegionReplica() throws Exception {
    try {
      // load some data to primary
      HTU.loadNumericRows(table, f, 0, 1000);
      // assert that we can read back from primary
      Assert.assertEquals(1000, HTU.countRows(table));
      // flush so that region replica can read
      HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      openRegion(HTU, getRS(), hriSecondary);

      // try a Get aimed directly at the region replica
      byte[] row = Bytes.toBytes(String.valueOf(42));
      Get get = new Get(row);
      get.setConsistency(Consistency.TIMELINE);
      get.setReplicaId(1);
      Result result = table.get(get);
      Assert.assertArrayEquals(row, result.getValue(f, null));
    } finally {
      HTU.deleteNumericRows(table, f, 0, 1000);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }

  private void assertGet(Region region, int value, boolean expect) throws IOException {
    byte[] row = Bytes.toBytes(String.valueOf(value));
    Get get = new Get(row);
    Result result = region.get(get);
    if (expect) {
      Assert.assertArrayEquals(row, result.getValue(f, null));
    } else {
      Assert.assertTrue(result.isEmpty());
    }
  }

  // build a mock rpc
  private void assertGetRpc(HRegionInfo info, int value, boolean expect)
      throws IOException, ServiceException {
    byte[] row = Bytes.toBytes(String.valueOf(value));
    Get get = new Get(row);
    ClientProtos.GetRequest getReq = RequestConverter.buildGetRequest(info.getRegionName(), get);
    ClientProtos.GetResponse getResp = getRS().getRSRpcServices().get(null, getReq);
    Result result = ProtobufUtil.toResult(getResp.getResult());
    if (expect) {
      Assert.assertArrayEquals(row, result.getValue(f, null));
    } else {
      Assert.assertTrue(result.isEmpty());
    }
  }

  private void restartRegionServer() throws Exception {
    afterClass();
    before();
  }
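  /**
   * Verifies that the {@link StorefileRefresherChore} makes files flushed or compacted on the
   * primary visible to the secondary replica without an explicit refresh.
   */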
  @Test
  public void testRefreshStoreFiles() throws Exception {
    // enable store file refreshing
    final int refreshPeriod = 2000; // 2 sec
    HTU.getConfiguration().setInt("hbase.hstore.compactionThreshold", 100);
    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
        refreshPeriod);
    // restart the region server so that it starts the refresher chore
    restartRegionServer();

    try {
      LOG.info("Opening the secondary region " + hriSecondary.getEncodedName());
      openRegion(HTU, getRS(), hriSecondary);

      // load some data to primary
      LOG.info("Loading data to primary region");
      HTU.loadNumericRows(table, f, 0, 1000);
      // assert that we can read back from primary
      Assert.assertEquals(1000, HTU.countRows(table));
      // flush so that region replica can read
      LOG.info("Flushing primary region");
      HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      // ensure that chore is run
      LOG.info("Sleeping for " + (4 * refreshPeriod));
      Threads.sleep(4 * refreshPeriod);

      LOG.info("Checking results from secondary region replica");
      Region secondaryRegion = getRS().getRegion(hriSecondary.getEncodedName());
      Assert.assertEquals(1, secondaryRegion.getStore(f).getStorefilesCount());

      assertGet(secondaryRegion, 42, true);
      assertGetRpc(hriSecondary, 42, true);
      assertGetRpc(hriSecondary, 1042, false);

      // load some more data to primary, flushing after each batch to create two more store files
      HTU.loadNumericRows(table, f, 1000, 1100);
      region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      HTU.loadNumericRows(table, f, 2000, 2100);
      region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
      region.flush(true);

      // ensure that chore is run
      Threads.sleep(4 * refreshPeriod);

      assertGetRpc(hriSecondary, 42, true);
      assertGetRpc(hriSecondary, 1042, true);
      assertGetRpc(hriSecondary, 2042, true);

      // ensure that we see the 3 store files
      Assert.assertEquals(3, secondaryRegion.getStore(f).getStorefilesCount());

      // force compaction
      HTU.compact(table.getName(), true);

      // the secondary must keep serving reads while the refresher picks up the compaction
      long wakeUpTime = System.currentTimeMillis() + 4 * refreshPeriod;
      while (System.currentTimeMillis() < wakeUpTime) {
        assertGetRpc(hriSecondary, 42, true);
        assertGetRpc(hriSecondary, 1042, true);
        assertGetRpc(hriSecondary, 2042, true);
        Threads.sleep(10);
      }

      // the secondary now sees the compacted file plus the 3 old ones;
      // the count stays at 4 until the cleaner chore archives the old files
      Assert.assertEquals(4, secondaryRegion.getStore(f).getStorefilesCount());
    } finally {
      HTU.deleteNumericRows(table, f, 0, 2100);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }
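  /**
   * Stress test: for 30 seconds one thread writes to the primary, one flushes or compacts it at
   * random, and one reads from the secondary while occasionally closing and reopening it. Any
   * exception caught in the three threads fails the test.
   */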
  @Test
  public void testFlushAndCompactionsInPrimary() throws Exception {
    long runtime = 30 * 1000;
    // enable store file refreshing
    final int refreshPeriod = 100; // 100ms is an aggressively short refresh period
    HTU.getConfiguration().setInt("hbase.hstore.compactionThreshold", 3);
    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
        refreshPeriod);
    // restart the region server so that it starts the refresher chore
    restartRegionServer();
    final int startKey = 0, endKey = 1000;

    try {
      openRegion(HTU, getRS(), hriSecondary);

      // load some data to primary so that the reader won't fail
      HTU.loadNumericRows(table, f, startKey, endKey);
      TestRegionServerNoMaster.flushRegion(HTU, hriPrimary);
      // ensure that chore is run
      Threads.sleep(2 * refreshPeriod);

      final AtomicBoolean running = new AtomicBoolean(true);
      @SuppressWarnings("unchecked")
      final AtomicReference<Exception>[] exceptions = new AtomicReference[3];
      for (int i = 0; i < exceptions.length; i++) {
        exceptions[i] = new AtomicReference<>();
      }

      Runnable writer = new Runnable() {
        int key = startKey;

        @Override
        public void run() {
          try {
            while (running.get()) {
              byte[] data = Bytes.toBytes(String.valueOf(key));
              Put put = new Put(data);
              put.addColumn(f, null, data);
              table.put(put);
              key++;
              if (key == endKey) {
                key = startKey;
              }
            }
          } catch (Exception ex) {
            LOG.warn(ex.toString(), ex);
            exceptions[0].compareAndSet(null, ex);
          }
        }
      };

      Runnable flusherCompactor = new Runnable() {
        Random random = new Random();

        @Override
        public void run() {
          try {
            while (running.get()) {
              // flush or compact
              if (random.nextBoolean()) {
                TestRegionServerNoMaster.flushRegion(HTU, hriPrimary);
              } else {
                HTU.compact(table.getName(), random.nextBoolean());
              }
            }
          } catch (Exception ex) {
            LOG.warn(ex.toString(), ex);
            exceptions[1].compareAndSet(null, ex);
          }
        }
      };

      Runnable reader = new Runnable() {
        Random random = new Random();

        @Override
        public void run() {
          try {
            while (running.get()) {
              // whether to do a close and open
              if (random.nextInt(10) == 0) {
                try {
                  closeRegion(HTU, getRS(), hriSecondary);
                } catch (Exception ex) {
                  LOG.warn("Failed closing the region " + hriSecondary + " "
                      + StringUtils.stringifyException(ex));
                  exceptions[2].compareAndSet(null, ex);
                }
                try {
                  openRegion(HTU, getRS(), hriSecondary);
                } catch (Exception ex) {
                  LOG.warn("Failed opening the region " + hriSecondary + " "
                      + StringUtils.stringifyException(ex));
                  exceptions[2].compareAndSet(null, ex);
                }
              }

              int key = random.nextInt(endKey - startKey) + startKey;
              assertGetRpc(hriSecondary, key, true);
            }
          } catch (Exception ex) {
            LOG.warn("Failed getting the value in the region " + hriSecondary + " "
                + StringUtils.stringifyException(ex));
            exceptions[2].compareAndSet(null, ex);
          }
        }
      };

      LOG.info("Starting writer and reader, secondary={}", hriSecondary.getEncodedName());
      ExecutorService executor = Executors.newFixedThreadPool(3);
      executor.submit(writer);
      executor.submit(flusherCompactor);
      executor.submit(reader);

      // let the threads run, then stop them and wait for them to finish
      Threads.sleep(runtime);
      running.set(false);
      executor.shutdown();
      executor.awaitTermination(30, TimeUnit.SECONDS);

      for (AtomicReference<Exception> exRef : exceptions) {
        Assert.assertNull(exRef.get());
      }
    } finally {
      HTU.deleteNumericRows(table, f, startKey, endKey);
      try {
        closeRegion(HTU, getRS(), hriSecondary);
      } catch (ServiceException e) {
        // the reader thread may have already closed the region
        LOG.info("Region {} was already closed", hriSecondary, e);
      }
    }
  }
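  /**
   * Verifies that the secondary can still read through its {@code FileLink}s after the primary's
   * compacted-away store files are archived: the refresh chore is disabled, the secondary's store
   * files are refreshed by hand, and reads must survive the underlying files moving to the
   * archive directory.
   */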
  @Test
  public void testVerifySecondaryAbilityToReadWithOnFiles() throws Exception {
    // disable the store file refresh chore (we do this by hand)
    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 0);
    restartRegionServer();

    try {
      LOG.info("Opening the secondary region " + hriSecondary.getEncodedName());
      openRegion(HTU, getRS(), hriSecondary);

      // load some data to primary, flushing after each batch to get three store files
      LOG.info("Loading data to primary region");
      for (int i = 0; i < 3; ++i) {
        HTU.loadNumericRows(table, f, i * 1000, (i + 1) * 1000);
        HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
        region.flush(true);
      }

      HRegion primaryRegion = getRS().getRegion(hriPrimary.getEncodedName());
      Assert.assertEquals(3, primaryRegion.getStore(f).getStorefilesCount());

      // refresh store files on the secondary
      Region secondaryRegion = getRS().getRegion(hriSecondary.getEncodedName());
      secondaryRegion.getStore(f).refreshStoreFiles();
      Assert.assertEquals(3, secondaryRegion.getStore(f).getStorefilesCount());

      // force major compaction
      LOG.info("Force major compaction on primary region " + hriPrimary);
      primaryRegion.compact(true);
      Assert.assertEquals(1, primaryRegion.getStore(f).getStorefilesCount());
      List<RegionServerThread> regionServerThreads = HTU.getMiniHBaseCluster()
          .getRegionServerThreads();
      HRegionServer hrs = null;
      for (RegionServerThread rs : regionServerThreads) {
        if (rs.getRegionServer()
            .getOnlineRegion(primaryRegion.getRegionInfo().getRegionName()) != null) {
          hrs = rs.getRegionServer();
          break;
        }
      }
      CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, hrs, false);
      cleaner.chore();
      // Scan all the hfiles on the secondary.
      // Since there have been no reads on the secondary, its store still points at the original
      // file locations; asking the NN for them yields a FileNotFoundException, and the FileLink
      // must handle it and still give us all the results we expect.
      int keys = 0;
      int sum = 0;
      for (HStoreFile sf : ((HStore) secondaryRegion.getStore(f)).getStorefiles()) {
        // our file does not exist anymore: it was moved by the compaction above
        LOG.debug(Boolean.toString(getRS().getFileSystem().exists(sf.getPath())));
        Assert.assertFalse(getRS().getFileSystem().exists(sf.getPath()));

        HFileScanner scanner = sf.getReader().getScanner(false, false);
        scanner.seekTo();
        do {
          keys++;

          Cell cell = scanner.getCell();
          sum += Integer.parseInt(
              Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
        } while (scanner.next());
      }
      // rows 0..2999 were loaded: 3000 keys summing to 2999 * 3000 / 2 = 4498500
      Assert.assertEquals(3000, keys);
      Assert.assertEquals(4498500, sum);
    } finally {
      HTU.deleteNumericRows(table, f, 0, 3000);
      closeRegion(HTU, getRS(), hriSecondary);
    }
  }
}