/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ClientMetaTableAccessor;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests region replication for hbase:meta by setting up region replicas and verifying async wal
 * replication replays the edits to the secondary region in various scenarios.
 * @see TestRegionReplicaReplication
 */
@Tag(ReplicationTests.TAG)
@Tag(LargeTests.TAG)
public class TestMetaRegionReplicaReplication {

  private static final Logger LOG = LoggerFactory.getLogger(TestMetaRegionReplicaReplication.class);
  private static final int NB_SERVERS = 4;
  /** Number of load rounds used by the numeric-row tests. */
  private static final int NUM_BATCHES = 5;
  /** Number of numeric rows written per load round. */
  private static final int ROWS_PER_BATCH = 1000;
  private static final byte[] VALUE = Bytes.toBytes("value");

  private final HBaseTestingUtil HTU = new HBaseTestingUtil();
  /** Replica count requested for hbase:meta; one fewer than the number of region servers. */
  private final int numOfMetaReplica = NB_SERVERS - 1;

  private String testName;

  /** Check applied to the complete cell content scanned from one secondary replica. */
  @FunctionalInterface
  private interface CellsCheck {
    boolean test(List<Cell> cells) throws IOException;
  }

  /**
   * Starts a mini cluster with {@link #NB_SERVERS} region servers and catalog replication enabled,
   * then sets the hbase:meta replica count and waits until that many meta regions are online.
   */
  @BeforeEach
  public void before(TestInfo testInfo) throws Exception {
    testName = testInfo.getTestMethod().get().getName();
    Configuration conf = HTU.getConfiguration();
    conf.setInt("zookeeper.recovery.retry", 1);
    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
    conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
    // Enable hbase:meta replication.
    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY, true);
    HTU.startMiniCluster(NB_SERVERS);
    // Set the hbase:meta replica count through the admin API once the cluster is running.
    HBaseTestingUtil.setReplicas(HTU.getAdmin(), TableName.META_TABLE_NAME, numOfMetaReplica);

    HTU.waitFor(30000,
      () -> HTU.getMiniHBaseCluster().getRegions(TableName.META_TABLE_NAME).size()
          >= numOfMetaReplica);
  }

  @AfterEach
  public void after() throws Exception {
    HTU.shutdownMiniCluster();
  }

  /**
   * Test meta region replica replication. Create some tables and see if replicas pick up the
   * additions.
   */
  @Test
  public void testHBaseMetaReplicates() throws Exception {
    try (
      Table table = HTU.createTable(TableName.valueOf(testName + "_0"), HConstants.CATALOG_FAMILY,
        Arrays.copyOfRange(HBaseTestingUtil.KEYS, 1, HBaseTestingUtil.KEYS.length))) {
      verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName()));
    }
    try (
      Table table = HTU.createTable(TableName.valueOf(testName + "_1"), HConstants.CATALOG_FAMILY,
        Arrays.copyOfRange(HBaseTestingUtil.KEYS, 1, HBaseTestingUtil.KEYS.length))) {
      verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName()));
      // Try delete.
      HTU.deleteTableIfAny(table.getName());
      verifyDeletedReplication(TableName.META_TABLE_NAME, numOfMetaReplica, table.getName());
    }
  }

  /**
   * Loads numeric rows into hbase:meta in batches, flushing after every batch and requesting a
   * compaction after every batch except the last, then verifies that all rows are visible in the
   * secondary replicas.
   */
  @Test
  public void testCatalogReplicaReplicationWithFlushAndCompaction() throws Exception {
    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
      Table table = connection.getTable(TableName.META_TABLE_NAME)) {
      // load the data to the table
      for (int batch = 0; batch < NUM_BATCHES; batch++) {
        int startRow = batch * ROWS_PER_BATCH;
        int endRow = startRow + ROWS_PER_BATCH;
        LOG.info("Writing data from {} to {}", startRow, endRow);
        HTU.loadNumericRows(table, HConstants.CATALOG_FAMILY, startRow, endRow);
        LOG.info("flushing table");
        HTU.flush(TableName.META_TABLE_NAME);
        // The last batch is flushed but not compacted; only log when a compaction is requested.
        if (batch < NUM_BATCHES - 1) {
          LOG.info("compacting table");
          HTU.compact(TableName.META_TABLE_NAME, false);
        }
      }

      verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, 0,
        NUM_BATCHES * ROWS_PER_BATCH, HConstants.CATALOG_FAMILY);
    }
  }

  /**
   * Loads numeric rows into hbase:meta and, after the first batch, moves one meta region hosted
   * away from the primary's server to a region server that hosts no meta region. Verifies that all
   * rows are still visible in the secondary replicas afterwards.
   */
  @Test
  public void testCatalogReplicaReplicationWithReplicaMoved() throws Exception {
    SingleProcessHBaseCluster cluster = HTU.getMiniHBaseCluster();
    HRegionServer primaryMetaServer = cluster.getRegionServer(cluster.getServerHoldingMeta());

    HRegionServer hrsNoMetaReplica = null;
    Region metaReplica = null;

    for (int i = 0; i < cluster.getNumLiveRegionServers(); i++) {
      HRegionServer server = cluster.getRegionServer(i);
      if (server == primaryMetaServer) {
        continue;
      }
      boolean hostingMeta = false;
      for (Region region : server.getOnlineRegionsLocalContext()) {
        if (region.getRegionInfo().isMetaRegion()) {
          // Remember the first meta region found outside the primary's server; it gets moved below.
          if (metaReplica == null) {
            metaReplica = region;
          }
          hostingMeta = true;
          break;
        }
      }
      if (!hostingMeta) {
        hrsNoMetaReplica = server;
      }
    }
    // Fail with a clear message instead of a NullPointerException further down.
    assertNotNull(metaReplica, "No meta region found outside the server holding primary meta");
    assertNotNull(hrsNoMetaReplica, "No region server without a meta region found");

    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
      Table table = connection.getTable(TableName.META_TABLE_NAME)) {
      // load the data to the table
      for (int batch = 0; batch < NUM_BATCHES; batch++) {
        int startRow = batch * ROWS_PER_BATCH;
        int endRow = startRow + ROWS_PER_BATCH;
        LOG.info("Writing data from {} to {}", startRow, endRow);
        HTU.loadNumericRows(table, HConstants.CATALOG_FAMILY, startRow, endRow);
        if (batch == 0) {
          HTU.moveRegionAndWait(metaReplica.getRegionInfo(), hrsNoMetaReplica.getServerName());
        }
      }

      verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, 0,
        NUM_BATCHES * ROWS_PER_BATCH, HConstants.CATALOG_FAMILY);
    }
  }

  /**
   * Waits until the numeric rows <code>[startRow, endRow)</code> of <code>family</code> are present
   * in every secondary replica of <code>tableName</code>.
   */
  protected void verifyReplication(TableName tableName, int regionReplication, final int startRow,
    final int endRow, final byte[] family) throws Exception {
    verifyReplication(tableName, regionReplication, startRow, endRow, family, true);
  }

  /**
   * Waits (up to 90 seconds per replica, polling every second) until
   * {@link HBaseTestingUtil#verifyNumericRows} succeeds against every secondary replica of
   * <code>tableName</code> for the given row range, family and <code>present</code> flag.
   */
  private void verifyReplication(TableName tableName, int regionReplication, final int startRow,
    final int endRow, final byte[] family, final boolean present) throws Exception {
    // find the regions
    final Region[] regions = getAllRegions(tableName, regionReplication);

    // Start count at '1' so we skip default, primary replica and only look at secondaries.
    for (int i = 1; i < regionReplication; i++) {
      final Region region = regions[i];
      // wait until all the data is replicated to all secondary regions
      Waiter.waitFor(HTU.getConfiguration(), 90000, 1000, () -> {
        LOG.info("verifying replication for region replica:" + region.getRegionInfo());
        try {
          HTU.verifyNumericRows(region, family, startRow, endRow, present);
        } catch (Throwable ex) {
          LOG.warn("Verification from secondary region is not complete yet", ex);
          // still wait
          return false;
        }
        return true;
      });
    }
  }

  /**
   * Scan hbase:meta for <code>tableName</code> content.
   */
  private List<Result> getMetaCells(TableName tableName) throws IOException {
    final List<Result> results = new ArrayList<>();
    // Collect every visited row; returning true from the visitor keeps the scan going.
    ClientMetaTableAccessor.Visitor visitor = result -> {
      results.add(result);
      return true;
    };
    MetaTableAccessor.scanMetaForTableRegions(HTU.getConnection(), visitor, tableName);
    return results;
  }

  /**
   * Returns All Regions for tableName including Replicas, indexed by replica id. Asserts that a
   * region was found for every replica id in <code>[0, replication)</code>.
   */
  private Region[] getAllRegions(TableName tableName, int replication) {
    final Region[] regions = new Region[replication];
    for (int i = 0; i < NB_SERVERS; i++) {
      HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
      List<HRegion> onlineRegions = rs.getRegions(tableName);
      for (HRegion region : onlineRegions) {
        regions[region.getRegionInfo().getReplicaId()] = region;
      }
    }
    for (Region region : regions) {
      assertNotNull(region);
    }
    return regions;
  }

  /**
   * Runs a default {@link Scan} over <code>region</code> and returns every cell collected by the
   * scanner into a single list.
   */
  private static List<Cell> scanAllCells(Region region) throws IOException {
    List<Cell> cells = new ArrayList<>();
    try (RegionScanner rs = region.getScanner(new Scan())) {
      // The same list is passed to every call so that it holds the whole scan when the loop ends.
      while (rs.next(cells)) {
        continue;
      }
    }
    return cells;
  }

  /**
   * For every secondary replica in <code>regions</code>, waits (up to 30 seconds, polling every
   * second) until <code>check</code> accepts the replica's full cell content. Any failure while
   * scanning or checking is logged and treated as "not replicated yet".
   */
  private void waitOnSecondaries(Region[] regions, CellsCheck check) {
    // Start count at '1' so we skip default, primary replica and only look at secondaries.
    for (int i = 1; i < regions.length; i++) {
      final Region region = regions[i];
      // wait until all the data is replicated to all secondary regions
      Waiter.waitFor(HTU.getConfiguration(), 30000, 1000, () -> {
        LOG.info("Verifying replication for region replica {}", region.getRegionInfo());
        try {
          return check.test(scanAllCells(region));
        } catch (Throwable ex) {
          LOG.warn("Verification from secondary region is not complete yet", ex);
          // still wait
          return false;
        }
      });
    }
  }

  /**
   * Verify when a Table is deleted from primary, then there are no references in replicas (because
   * they get the delete of the table rows too).
   */
  private void verifyDeletedReplication(TableName tableName, int regionReplication,
    final TableName deletedTableName) {
    waitOnSecondaries(getAllRegions(tableName, regionReplication),
      cells -> doesNotContain(cells, deletedTableName));
  }

  /**
   * Cells are from hbase:meta replica so will start w/ 'tableName,'; i.e. the tablename followed by
   * HConstants.DELIMITER. Make sure the deleted table is no longer present in passed
   * <code>cells</code>.
   */
  private boolean doesNotContain(List<Cell> cells, TableName tableName) {
    // Hoisted out of the loop: the prefix does not depend on the cell being inspected.
    final String rowPrefix = tableName.toString() + HConstants.DELIMITER;
    for (Cell cell : cells) {
      String row = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
      if (row.startsWith(rowPrefix)) {
        return false;
      }
    }
    return true;
  }

  /**
   * Verify Replicas have results (exactly).
   */
  private void verifyReplication(TableName tableName, int regionReplication,
    List<Result> contains) {
    waitOnSecondaries(getAllRegions(tableName, regionReplication),
      cells -> contains(contains, cells));
  }

  /**
   * Presumes sorted Cells. Verify that <code>cells</code> has <code>contains</code> at least.
   * <p>
   * Both inputs are walked once, front to back: for each expected cell the <code>cells</code>
   * scanner is advanced until an equal cell is found. The result is true only when the expected
   * scanner is exhausted, at least one match was found, and the number of cells visited equals
   * <code>cells.size()</code>.
   */
  static boolean contains(List<Result> contains, List<Cell> cells) throws IOException {
    CellScanner containsScanner = CellUtil.createCellScanner(contains);
    CellScanner cellsScanner = CellUtil.createCellScanner(cells);
    int matches = 0;
    int count = 0;
    while (containsScanner.advance()) {
      while (cellsScanner.advance()) {
        count++;
        LOG.info("{} {}", containsScanner.current(), cellsScanner.current());
        if (containsScanner.current().equals(cellsScanner.current())) {
          matches++;
          break;
        }
      }
    }
    return !containsScanner.advance() && matches >= 1 && count >= matches && count == cells.size();
  }

  /**
   * Issues one Get per key and asserts that the value stored under family and qualifier
   * {@link HConstants#CATALOG_FAMILY} equals {@link #VALUE}.
   */
  private void doNGets(final Table table, final byte[][] keys) throws Exception {
    for (byte[] key : keys) {
      Result r = table.get(new Get(key));
      assertArrayEquals(VALUE, r.getValue(HConstants.CATALOG_FAMILY, HConstants.CATALOG_FAMILY));
    }
  }

  /**
   * Asserts that the read request count grew for the primary meta replica and stayed the same for
   * every other replica.
   */
  private void primaryIncreaseReplicaNoChange(final long[] before, final long[] after) {
    // There are read requests increase for primary meta replica.
    assertTrue(after[RegionInfo.DEFAULT_REPLICA_ID] > before[RegionInfo.DEFAULT_REPLICA_ID],
      "Read requests for primary meta replica did not increase");

    // No change for replica regions
    for (int i = 1; i < after.length; i++) {
      assertEquals(before[i], after[i], "Read requests changed for meta replica " + i);
    }
  }

  /** Asserts that the read request count grew for every meta replica, primary included. */
  private void primaryIncreaseReplicaIncrease(final long[] before, final long[] after) {
    // There are read requests increase for all meta replica regions,
    for (int i = 0; i < after.length; i++) {
      assertTrue(after[i] > before[i], "Read requests did not increase for meta replica " + i);
    }
  }

  /**
   * Stores the current read request count of each region in <code>metaRegions</code> into the
   * slot of <code>counters</code> with the same index.
   */
  private void getMetaReplicaReadRequests(final Region[] metaRegions, final long[] counters) {
    for (int i = 0; i < metaRegions.length; i++) {
      Region r = metaRegions[i];
      // Read the counter once so the logged value and the recorded value are the same.
      long readRequests = r.getReadRequestsCount();
      LOG.info("read request for region {} is {}", r, readRequests);
      counters[i] = readRequests;
    }
  }

  /**
   * Uses a client configured with meta replicas in "LoadBalance" locator mode and checks how read
   * requests against the hbase:meta replicas change across gets, region location lookups and
   * region moves of a user table.
   */
  @Test
  public void testHBaseMetaReplicaGets() throws Exception {
    TableName tn = TableName.valueOf(testName);
    final Region[] metaRegions = getAllRegions(TableName.META_TABLE_NAME, numOfMetaReplica);
    long[] readReqsForMetaReplicas = new long[numOfMetaReplica];
    long[] readReqsForMetaReplicasAfterGet = new long[numOfMetaReplica];
    long[] readReqsForMetaReplicasAfterGetAllLocations = new long[numOfMetaReplica];
    long[] readReqsForMetaReplicasAfterMove = new long[numOfMetaReplica];
    long[] readReqsForMetaReplicasAfterSecondMove = new long[numOfMetaReplica];
    long[] readReqsForMetaReplicasAfterThirdGet = new long[numOfMetaReplica];
    Region userRegion = null;
    HRegionServer srcRs = null;
    HRegionServer destRs = null;

    try (Table table = HTU.createTable(tn, HConstants.CATALOG_FAMILY,
      Arrays.copyOfRange(HBaseTestingUtil.KEYS, 1, HBaseTestingUtil.KEYS.length))) {
      verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName()));
      // load different values
      HTU.loadTable(table, new byte[][] { HConstants.CATALOG_FAMILY }, VALUE);
      // Pick a user region to move later, together with its current server and a different one.
      for (int i = 0; i < NB_SERVERS; i++) {
        HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
        List<HRegion> onlineRegions = rs.getRegions(tn);
        if (onlineRegions.size() > 0) {
          userRegion = onlineRegions.get(0);
          srcRs = rs;
          if (i > 0) {
            destRs = HTU.getMiniHBaseCluster().getRegionServer(0);
          } else {
            destRs = HTU.getMiniHBaseCluster().getRegionServer(1);
          }
        }
      }
      // Fail with a clear message instead of a NullPointerException further down.
      assertNotNull(userRegion, "No online region found for table " + tn);
      assertNotNull(srcRs, "No region server hosting a region of table " + tn);
      assertNotNull(destRs, "No destination region server selected for table " + tn);

      getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicas);

      Configuration c = new Configuration(HTU.getConfiguration());
      c.setBoolean(HConstants.USE_META_REPLICAS, true);
      c.set(LOCATOR_META_REPLICAS_MODE, "LoadBalance");
      // Close the dedicated connection and table when done instead of leaking them.
      try (Connection connection = ConnectionFactory.createConnection(c);
        Table tableForGet = connection.getTable(tn)) {
        // Same rows as KEYS, except that the first entry is replaced with "aaa".
        byte[][] getRows = HBaseTestingUtil.KEYS.clone();
        getRows[0] = Bytes.toBytes("aaa");
        doNGets(tableForGet, getRows);

        getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterGet);

        // There are more reads against all meta replica regions, including the primary region.
        primaryIncreaseReplicaIncrease(readReqsForMetaReplicas, readReqsForMetaReplicasAfterGet);

        try (RegionLocator locator = tableForGet.getRegionLocator()) {
          for (int j = 0; j < numOfMetaReplica * 3; j++) {
            locator.getAllRegionLocations();
          }
        }

        getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterGetAllLocations);
        primaryIncreaseReplicaIncrease(readReqsForMetaReplicasAfterGet,
          readReqsForMetaReplicasAfterGetAllLocations);

        // move one of regions so it meta cache may be invalid.
        HTU.moveRegionAndWait(userRegion.getRegionInfo(), destRs.getServerName());

        doNGets(tableForGet, getRows);

        getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterMove);

        // There are read requests increase for primary meta replica.
        // For rest of meta replicas, there is no change as regionMove will tell the new location
        primaryIncreaseReplicaNoChange(readReqsForMetaReplicasAfterGetAllLocations,
          readReqsForMetaReplicasAfterMove);
        // Move region again.
        HTU.moveRegionAndWait(userRegion.getRegionInfo(), srcRs.getServerName());

        // Wait until moveRegion cache timeout.
        while (destRs.getMovedRegion(userRegion.getRegionInfo().getEncodedName()) != null) {
          Thread.sleep(1000);
        }

        getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterSecondMove);

        // There are read requests increase for primary meta replica.
        // For rest of meta replicas, there is no change.
        primaryIncreaseReplicaNoChange(readReqsForMetaReplicasAfterMove,
          readReqsForMetaReplicasAfterSecondMove);

        doNGets(tableForGet, getRows);

        getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterThirdGet);

        // Since it gets RegionNotServedException, it will go to primary for the next lookup.
        // There are read requests increase for primary meta replica.
        // For rest of meta replicas, there is no change.
        primaryIncreaseReplicaNoChange(readReqsForMetaReplicasAfterSecondMove,
          readReqsForMetaReplicasAfterThirdGet);
      }
    }
  }
}