001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.closeRegion; 021import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.openRegion; 022 023import java.io.IOException; 024import java.util.List; 025import java.util.Random; 026import java.util.concurrent.ExecutorService; 027import java.util.concurrent.Executors; 028import java.util.concurrent.TimeUnit; 029import java.util.concurrent.atomic.AtomicBoolean; 030import java.util.concurrent.atomic.AtomicReference; 031import org.apache.hadoop.hbase.Cell; 032import org.apache.hadoop.hbase.HBaseClassTestRule; 033import org.apache.hadoop.hbase.HBaseTestingUtility; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.HRegionInfo; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.TestMetaTableAccessor; 038import org.apache.hadoop.hbase.client.Consistency; 039import org.apache.hadoop.hbase.client.Get; 040import org.apache.hadoop.hbase.client.Put; 041import org.apache.hadoop.hbase.client.RegionLocator; 042import org.apache.hadoop.hbase.client.Result; 043import org.apache.hadoop.hbase.client.Table; 044import org.apache.hadoop.hbase.io.hfile.HFileScanner; 045import org.apache.hadoop.hbase.testclassification.MediumTests; 046import org.apache.hadoop.hbase.testclassification.RegionServerTests; 047import org.apache.hadoop.hbase.util.Bytes; 048import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 049import org.apache.hadoop.hbase.util.Threads; 050import org.apache.hadoop.hdfs.DFSConfigKeys; 051import org.apache.hadoop.util.StringUtils; 052import org.junit.AfterClass; 053import org.junit.Assert; 054import org.junit.BeforeClass; 055import org.junit.ClassRule; 056import org.junit.Test; 057import org.junit.experimental.categories.Category; 058import org.slf4j.Logger; 059import org.slf4j.LoggerFactory; 060 061import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 062import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 063import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 064 065/** 066 * Tests for region replicas. Sad that we cannot isolate these without bringing up a whole 067 * cluster. See {@link TestRegionServerNoMaster}. 068 */ 069@Category({RegionServerTests.class, MediumTests.class}) 070public class TestRegionReplicas { 071 072 @ClassRule 073 public static final HBaseClassTestRule CLASS_RULE = 074 HBaseClassTestRule.forClass(TestRegionReplicas.class); 075 076 private static final Logger LOG = LoggerFactory.getLogger(TestRegionReplicas.class); 077 078 private static final int NB_SERVERS = 1; 079 private static Table table; 080 private static final byte[] row = Bytes.toBytes("TestRegionReplicas"); 081 082 private static HRegionInfo hriPrimary; 083 private static HRegionInfo hriSecondary; 084 085 private static final HBaseTestingUtility HTU = new HBaseTestingUtility(); 086 private static final byte[] f = HConstants.CATALOG_FAMILY; 087 088 @BeforeClass 089 public static void before() throws Exception { 090 // Reduce the hdfs block size and prefetch to trigger the file-link reopen 091 // when the file is moved to archive (e.g. compaction) 092 HTU.getConfiguration().setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 8192); 093 HTU.getConfiguration().setInt(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 1); 094 HTU.getConfiguration().setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 128 * 1024 * 1024); 095 096 HTU.startMiniCluster(NB_SERVERS); 097 final TableName tableName = TableName.valueOf(TestRegionReplicas.class.getSimpleName()); 098 099 // Create table then get the single region for our new table. 100 table = HTU.createTable(tableName, f); 101 102 try (RegionLocator locator = HTU.getConnection().getRegionLocator(tableName)) { 103 hriPrimary = locator.getRegionLocation(row, false).getRegionInfo(); 104 } 105 106 // mock a secondary region info to open 107 hriSecondary = new HRegionInfo(hriPrimary.getTable(), hriPrimary.getStartKey(), 108 hriPrimary.getEndKey(), hriPrimary.isSplit(), hriPrimary.getRegionId(), 1); 109 110 // No master 111 TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU); 112 } 113 114 @AfterClass 115 public static void afterClass() throws Exception { 116 HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false; 117 table.close(); 118 HTU.shutdownMiniCluster(); 119 } 120 121 private HRegionServer getRS() { 122 return HTU.getMiniHBaseCluster().getRegionServer(0); 123 } 124 125 @Test 126 public void testOpenRegionReplica() throws Exception { 127 openRegion(HTU, getRS(), hriSecondary); 128 try { 129 //load some data to primary 130 HTU.loadNumericRows(table, f, 0, 1000); 131 132 // assert that we can read back from primary 133 Assert.assertEquals(1000, HTU.countRows(table)); 134 } finally { 135 HTU.deleteNumericRows(table, f, 0, 1000); 136 closeRegion(HTU, getRS(), hriSecondary); 137 } 138 } 139 140 /** Tests that the meta location is saved for secondary regions */ 141 @Test 142 public void testRegionReplicaUpdatesMetaLocation() throws Exception { 143 openRegion(HTU, getRS(), hriSecondary); 144 Table meta = null; 145 try { 146 meta = HTU.getConnection().getTable(TableName.META_TABLE_NAME); 147 TestMetaTableAccessor.assertMetaLocation(meta, hriPrimary.getRegionName() 148 , getRS().getServerName(), -1, 1, false); 149 } finally { 150 if (meta != null ) meta.close(); 151 closeRegion(HTU, getRS(), hriSecondary); 152 } 153 } 154 155 @Test 156 public void testRegionReplicaGets() throws Exception { 157 try { 158 //load some data to primary 159 HTU.loadNumericRows(table, f, 0, 1000); 160 // assert that we can read back from primary 161 Assert.assertEquals(1000, HTU.countRows(table)); 162 // flush so that region replica can read 163 HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName()); 164 region.flush(true); 165 166 openRegion(HTU, getRS(), hriSecondary); 167 168 // first try directly against region 169 region = getRS().getRegion(hriSecondary.getEncodedName()); 170 assertGet(region, 42, true); 171 172 assertGetRpc(hriSecondary, 42, true); 173 } finally { 174 HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000); 175 closeRegion(HTU, getRS(), hriSecondary); 176 } 177 } 178 179 @Test 180 public void testGetOnTargetRegionReplica() throws Exception { 181 try { 182 //load some data to primary 183 HTU.loadNumericRows(table, f, 0, 1000); 184 // assert that we can read back from primary 185 Assert.assertEquals(1000, HTU.countRows(table)); 186 // flush so that region replica can read 187 HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName()); 188 region.flush(true); 189 190 openRegion(HTU, getRS(), hriSecondary); 191 192 // try directly Get against region replica 193 byte[] row = Bytes.toBytes(String.valueOf(42)); 194 Get get = new Get(row); 195 get.setConsistency(Consistency.TIMELINE); 196 get.setReplicaId(1); 197 Result result = table.get(get); 198 Assert.assertArrayEquals(row, result.getValue(f, null)); 199 } finally { 200 HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000); 201 closeRegion(HTU, getRS(), hriSecondary); 202 } 203 } 204 205 private void assertGet(Region region, int value, boolean expect) throws IOException { 206 byte[] row = Bytes.toBytes(String.valueOf(value)); 207 Get get = new Get(row); 208 Result result = region.get(get); 209 if (expect) { 210 Assert.assertArrayEquals(row, result.getValue(f, null)); 211 } else { 212 result.isEmpty(); 213 } 214 } 215 216 // build a mock rpc 217 private void assertGetRpc(HRegionInfo info, int value, boolean expect) 218 throws IOException, org.apache.hbase.thirdparty.com.google.protobuf.ServiceException { 219 byte[] row = Bytes.toBytes(String.valueOf(value)); 220 Get get = new Get(row); 221 ClientProtos.GetRequest getReq = RequestConverter.buildGetRequest(info.getRegionName(), get); 222 ClientProtos.GetResponse getResp = getRS().getRSRpcServices().get(null, getReq); 223 Result result = ProtobufUtil.toResult(getResp.getResult()); 224 if (expect) { 225 Assert.assertArrayEquals(row, result.getValue(f, null)); 226 } else { 227 result.isEmpty(); 228 } 229 } 230 231 private void restartRegionServer() throws Exception { 232 afterClass(); 233 before(); 234 } 235 236 @Test 237 public void testRefresStoreFiles() throws Exception { 238 // enable store file refreshing 239 final int refreshPeriod = 2000; // 2 sec 240 HTU.getConfiguration().setInt("hbase.hstore.compactionThreshold", 100); 241 HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 242 refreshPeriod); 243 // restart the region server so that it starts the refresher chore 244 restartRegionServer(); 245 246 try { 247 LOG.info("Opening the secondary region " + hriSecondary.getEncodedName()); 248 openRegion(HTU, getRS(), hriSecondary); 249 250 //load some data to primary 251 LOG.info("Loading data to primary region"); 252 HTU.loadNumericRows(table, f, 0, 1000); 253 // assert that we can read back from primary 254 Assert.assertEquals(1000, HTU.countRows(table)); 255 // flush so that region replica can read 256 LOG.info("Flushing primary region"); 257 HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName()); 258 region.flush(true); 259 HRegion primaryRegion = region; 260 261 // ensure that chore is run 262 LOG.info("Sleeping for " + (4 * refreshPeriod)); 263 Threads.sleep(4 * refreshPeriod); 264 265 LOG.info("Checking results from secondary region replica"); 266 Region secondaryRegion = getRS().getRegion(hriSecondary.getEncodedName()); 267 Assert.assertEquals(1, secondaryRegion.getStore(f).getStorefilesCount()); 268 269 assertGet(secondaryRegion, 42, true); 270 assertGetRpc(hriSecondary, 42, true); 271 assertGetRpc(hriSecondary, 1042, false); 272 273 //load some data to primary 274 HTU.loadNumericRows(table, f, 1000, 1100); 275 region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName()); 276 region.flush(true); 277 278 HTU.loadNumericRows(table, f, 2000, 2100); 279 region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName()); 280 region.flush(true); 281 282 // ensure that chore is run 283 Threads.sleep(4 * refreshPeriod); 284 285 assertGetRpc(hriSecondary, 42, true); 286 assertGetRpc(hriSecondary, 1042, true); 287 assertGetRpc(hriSecondary, 2042, true); 288 289 // ensure that we see the 3 store files 290 Assert.assertEquals(3, secondaryRegion.getStore(f).getStorefilesCount()); 291 292 // force compaction 293 HTU.compact(table.getName(), true); 294 295 long wakeUpTime = System.currentTimeMillis() + 4 * refreshPeriod; 296 while (System.currentTimeMillis() < wakeUpTime) { 297 assertGetRpc(hriSecondary, 42, true); 298 assertGetRpc(hriSecondary, 1042, true); 299 assertGetRpc(hriSecondary, 2042, true); 300 Threads.sleep(10); 301 } 302 303 // ensure that we see the compacted file only 304 // This will be 4 until the cleaner chore runs 305 Assert.assertEquals(4, secondaryRegion.getStore(f).getStorefilesCount()); 306 307 } finally { 308 HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000); 309 closeRegion(HTU, getRS(), hriSecondary); 310 } 311 } 312 313 @Test 314 public void testFlushAndCompactionsInPrimary() throws Exception { 315 316 long runtime = 30 * 1000; 317 // enable store file refreshing 318 final int refreshPeriod = 100; // 100ms refresh is a lot 319 HTU.getConfiguration().setInt("hbase.hstore.compactionThreshold", 3); 320 HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, refreshPeriod); 321 // restart the region server so that it starts the refresher chore 322 restartRegionServer(); 323 final int startKey = 0, endKey = 1000; 324 325 try { 326 openRegion(HTU, getRS(), hriSecondary); 327 328 //load some data to primary so that reader won't fail 329 HTU.loadNumericRows(table, f, startKey, endKey); 330 TestRegionServerNoMaster.flushRegion(HTU, hriPrimary); 331 // ensure that chore is run 332 Threads.sleep(2 * refreshPeriod); 333 334 final AtomicBoolean running = new AtomicBoolean(true); 335 @SuppressWarnings("unchecked") 336 final AtomicReference<Exception>[] exceptions = new AtomicReference[3]; 337 for (int i=0; i < exceptions.length; i++) { 338 exceptions[i] = new AtomicReference<>(); 339 } 340 341 Runnable writer = new Runnable() { 342 int key = startKey; 343 @Override 344 public void run() { 345 try { 346 while (running.get()) { 347 byte[] data = Bytes.toBytes(String.valueOf(key)); 348 Put put = new Put(data); 349 put.addColumn(f, null, data); 350 table.put(put); 351 key++; 352 if (key == endKey) key = startKey; 353 } 354 } catch (Exception ex) { 355 LOG.warn(ex.toString(), ex); 356 exceptions[0].compareAndSet(null, ex); 357 } 358 } 359 }; 360 361 Runnable flusherCompactor = new Runnable() { 362 Random random = new Random(); 363 @Override 364 public void run() { 365 try { 366 while (running.get()) { 367 // flush or compact 368 if (random.nextBoolean()) { 369 TestRegionServerNoMaster.flushRegion(HTU, hriPrimary); 370 } else { 371 HTU.compact(table.getName(), random.nextBoolean()); 372 } 373 } 374 } catch (Exception ex) { 375 LOG.warn(ex.toString(), ex); 376 exceptions[1].compareAndSet(null, ex); 377 } 378 } 379 }; 380 381 Runnable reader = new Runnable() { 382 Random random = new Random(); 383 @Override 384 public void run() { 385 try { 386 while (running.get()) { 387 // whether to do a close and open 388 if (random.nextInt(10) == 0) { 389 try { 390 closeRegion(HTU, getRS(), hriSecondary); 391 } catch (Exception ex) { 392 LOG.warn("Failed closing the region " + hriSecondary + " " + StringUtils.stringifyException(ex)); 393 exceptions[2].compareAndSet(null, ex); 394 } 395 try { 396 openRegion(HTU, getRS(), hriSecondary); 397 } catch (Exception ex) { 398 LOG.warn("Failed opening the region " + hriSecondary + " " + StringUtils.stringifyException(ex)); 399 exceptions[2].compareAndSet(null, ex); 400 } 401 } 402 403 int key = random.nextInt(endKey - startKey) + startKey; 404 assertGetRpc(hriSecondary, key, true); 405 } 406 } catch (Exception ex) { 407 LOG.warn("Failed getting the value in the region " + hriSecondary + " " + StringUtils.stringifyException(ex)); 408 exceptions[2].compareAndSet(null, ex); 409 } 410 } 411 }; 412 413 LOG.info("Starting writer and reader"); 414 ExecutorService executor = Executors.newFixedThreadPool(3); 415 executor.submit(writer); 416 executor.submit(flusherCompactor); 417 executor.submit(reader); 418 419 // wait for threads 420 Threads.sleep(runtime); 421 running.set(false); 422 executor.shutdown(); 423 executor.awaitTermination(30, TimeUnit.SECONDS); 424 425 for (AtomicReference<Exception> exRef : exceptions) { 426 Assert.assertNull(exRef.get()); 427 } 428 } finally { 429 HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, startKey, endKey); 430 closeRegion(HTU, getRS(), hriSecondary); 431 } 432 } 433 434 @Test 435 public void testVerifySecondaryAbilityToReadWithOnFiles() throws Exception { 436 // disable the store file refresh chore (we do this by hand) 437 HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 0); 438 restartRegionServer(); 439 440 try { 441 LOG.info("Opening the secondary region " + hriSecondary.getEncodedName()); 442 openRegion(HTU, getRS(), hriSecondary); 443 444 // load some data to primary 445 LOG.info("Loading data to primary region"); 446 for (int i = 0; i < 3; ++i) { 447 HTU.loadNumericRows(table, f, i * 1000, (i + 1) * 1000); 448 HRegion region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName()); 449 region.flush(true); 450 } 451 452 HRegion primaryRegion = getRS().getRegion(hriPrimary.getEncodedName()); 453 Assert.assertEquals(3, primaryRegion.getStore(f).getStorefilesCount()); 454 455 // Refresh store files on the secondary 456 Region secondaryRegion = getRS().getRegion(hriSecondary.getEncodedName()); 457 secondaryRegion.getStore(f).refreshStoreFiles(); 458 Assert.assertEquals(3, secondaryRegion.getStore(f).getStorefilesCount()); 459 460 // force compaction 461 LOG.info("Force Major compaction on primary region " + hriPrimary); 462 primaryRegion.compact(true); 463 Assert.assertEquals(1, primaryRegion.getStore(f).getStorefilesCount()); 464 List<RegionServerThread> regionServerThreads = HTU.getMiniHBaseCluster() 465 .getRegionServerThreads(); 466 HRegionServer hrs = null; 467 for (RegionServerThread rs : regionServerThreads) { 468 if (rs.getRegionServer() 469 .getOnlineRegion(primaryRegion.getRegionInfo().getRegionName()) != null) { 470 hrs = rs.getRegionServer(); 471 break; 472 } 473 } 474 CompactedHFilesDischarger cleaner = 475 new CompactedHFilesDischarger(100, null, hrs, false); 476 cleaner.chore(); 477 // scan all the hfiles on the secondary. 478 // since there are no read on the secondary when we ask locations to 479 // the NN a FileNotFound exception will be returned and the FileLink 480 // should be able to deal with it giving us all the result we expect. 481 int keys = 0; 482 int sum = 0; 483 for (HStoreFile sf : ((HStore) secondaryRegion.getStore(f)).getStorefiles()) { 484 // Our file does not exist anymore. was moved by the compaction above. 485 LOG.debug(Boolean.toString(getRS().getFileSystem().exists(sf.getPath()))); 486 Assert.assertFalse(getRS().getFileSystem().exists(sf.getPath())); 487 488 HFileScanner scanner = sf.getReader().getScanner(false, false); 489 scanner.seekTo(); 490 do { 491 keys++; 492 493 Cell cell = scanner.getCell(); 494 sum += Integer.parseInt(Bytes.toString(cell.getRowArray(), 495 cell.getRowOffset(), cell.getRowLength())); 496 } while (scanner.next()); 497 } 498 Assert.assertEquals(3000, keys); 499 Assert.assertEquals(4498500, sum); 500 } finally { 501 HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000); 502 closeRegion(HTU, getRS(), hriSecondary); 503 } 504 } 505}