001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNotNull; 022import static org.junit.Assert.assertNull; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025import static org.mockito.Mockito.mock; 026import static org.mockito.Mockito.when; 027 028import java.io.IOException; 029import java.util.List; 030import java.util.UUID; 031import java.util.concurrent.Executors; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.atomic.AtomicLong; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.fs.FileSystem; 036import org.apache.hadoop.hbase.Cell; 037import org.apache.hadoop.hbase.Cell.Type; 038import org.apache.hadoop.hbase.CellBuilderFactory; 039import org.apache.hadoop.hbase.CellBuilderType; 040import org.apache.hadoop.hbase.HBaseClassTestRule; 041import org.apache.hadoop.hbase.HBaseTestingUtility; 042import org.apache.hadoop.hbase.HConstants; 043import org.apache.hadoop.hbase.HRegionLocation; 044import org.apache.hadoop.hbase.HTableDescriptor; 045import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; 046import org.apache.hadoop.hbase.TableName; 047import org.apache.hadoop.hbase.Waiter; 048import org.apache.hadoop.hbase.client.Admin; 049import org.apache.hadoop.hbase.client.ClusterConnection; 050import org.apache.hadoop.hbase.client.Connection; 051import org.apache.hadoop.hbase.client.ConnectionFactory; 052import org.apache.hadoop.hbase.client.RegionLocator; 053import org.apache.hadoop.hbase.client.Table; 054import org.apache.hadoop.hbase.client.TableDescriptor; 055import org.apache.hadoop.hbase.regionserver.HRegion; 056import org.apache.hadoop.hbase.regionserver.HRegionServer; 057import org.apache.hadoop.hbase.regionserver.Region; 058import org.apache.hadoop.hbase.replication.ReplicationEndpoint; 059import org.apache.hadoop.hbase.replication.ReplicationException; 060import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 061import org.apache.hadoop.hbase.testclassification.FlakeyTests; 062import org.apache.hadoop.hbase.testclassification.LargeTests; 063import org.apache.hadoop.hbase.util.Bytes; 064import org.apache.hadoop.hbase.util.FSTableDescriptors; 065import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 066import org.apache.hadoop.hbase.wal.WAL.Entry; 067import org.apache.hadoop.hbase.wal.WALEdit; 068import org.apache.hadoop.hbase.wal.WALKeyImpl; 069import org.apache.hadoop.hbase.zookeeper.ZKConfig; 070import org.junit.AfterClass; 071import org.junit.BeforeClass; 072import org.junit.ClassRule; 073import org.junit.Rule; 074import org.junit.Test; 075import org.junit.experimental.categories.Category; 076import org.junit.rules.TestName; 077import org.slf4j.Logger; 078import org.slf4j.LoggerFactory; 079 080import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 081import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles; 082 083/** 084 * Tests RegionReplicaReplicationEndpoint class by setting up region replicas and verifying async 085 * wal replication replays the edits to the secondary region in various scenarios. 086 */ 087@Category({ FlakeyTests.class, LargeTests.class }) 088public class TestRegionReplicaReplicationEndpoint { 089 090 @ClassRule 091 public static final HBaseClassTestRule CLASS_RULE = 092 HBaseClassTestRule.forClass(TestRegionReplicaReplicationEndpoint.class); 093 094 private static final Logger LOG = 095 LoggerFactory.getLogger(TestRegionReplicaReplicationEndpoint.class); 096 097 private static final int NB_SERVERS = 2; 098 099 private static final HBaseTestingUtility HTU = new HBaseTestingUtility(); 100 101 @Rule 102 public TestName name = new TestName(); 103 104 @BeforeClass 105 public static void beforeClass() throws Exception { 106 Configuration conf = HTU.getConfiguration(); 107 conf.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f); 108 conf.setInt("replication.source.size.capacity", 10240); 109 conf.setLong("replication.source.sleepforretries", 100); 110 conf.setInt("hbase.regionserver.maxlogs", 10); 111 conf.setLong("hbase.master.logcleaner.ttl", 10); 112 conf.setInt("zookeeper.recovery.retry", 1); 113 conf.setInt("zookeeper.recovery.retry.intervalmill", 10); 114 conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true); 115 conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); 116 conf.setInt("replication.stats.thread.period.seconds", 5); 117 conf.setBoolean("hbase.tests.use.shortcircuit.reads", false); 118 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); // less number of retries is needed 119 conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1); 120 121 HTU.startMiniCluster(NB_SERVERS); 122 } 123 124 @AfterClass 125 public static void afterClass() throws Exception { 126 HTU.shutdownMiniCluster(); 127 } 128 129 @Test 130 public void testRegionReplicaReplicationPeerIsCreated() throws IOException, ReplicationException { 131 // create a table with region replicas. Check whether the replication peer is created 132 // and replication started. 133 try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration()); 134 Admin admin = connection.getAdmin()) { 135 String peerId = ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER; 136 137 ReplicationPeerConfig peerConfig = null; 138 try { 139 peerConfig = admin.getReplicationPeerConfig(peerId); 140 } catch (ReplicationPeerNotFoundException e) { 141 LOG.warn("Region replica replication peer id=" + peerId + " not exist", e); 142 } 143 144 try { 145 peerConfig = admin.getReplicationPeerConfig(peerId); 146 } catch (ReplicationPeerNotFoundException e) { 147 LOG.warn("Region replica replication peer id=" + peerId + " not exist", e); 148 } 149 150 if (peerConfig != null) { 151 admin.removeReplicationPeer(peerId); 152 peerConfig = null; 153 } 154 155 HTableDescriptor htd = 156 HTU.createTableDescriptor("testReplicationPeerIsCreated_no_region_replicas"); 157 createOrEnableTableWithRetries(htd, true); 158 try { 159 peerConfig = admin.getReplicationPeerConfig(peerId); 160 fail("Should throw ReplicationException, because replication peer id=" + peerId 161 + " not exist"); 162 } catch (ReplicationPeerNotFoundException e) { 163 } 164 assertNull(peerConfig); 165 166 htd = HTU.createTableDescriptor("testReplicationPeerIsCreated"); 167 htd.setRegionReplication(2); 168 createOrEnableTableWithRetries(htd, true); 169 170 // assert peer configuration is correct 171 peerConfig = admin.getReplicationPeerConfig(peerId); 172 assertNotNull(peerConfig); 173 assertEquals(peerConfig.getClusterKey(), 174 ZKConfig.getZooKeeperClusterKey(HTU.getConfiguration())); 175 assertEquals(RegionReplicaReplicationEndpoint.class.getName(), 176 peerConfig.getReplicationEndpointImpl()); 177 } 178 } 179 180 @Test 181 public void testRegionReplicaReplicationPeerIsCreatedForModifyTable() throws Exception { 182 // modify a table by adding region replicas. Check whether the replication peer is created 183 // and replication started. 184 try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration()); 185 Admin admin = connection.getAdmin()) { 186 String peerId = ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER; 187 ReplicationPeerConfig peerConfig = null; 188 try { 189 peerConfig = admin.getReplicationPeerConfig(peerId); 190 } catch (ReplicationPeerNotFoundException e) { 191 LOG.warn("Region replica replication peer id=" + peerId + " not exist", e); 192 } 193 194 if (peerConfig != null) { 195 admin.removeReplicationPeer(peerId); 196 peerConfig = null; 197 } 198 199 HTableDescriptor htd = 200 HTU.createTableDescriptor("testRegionReplicaReplicationPeerIsCreatedForModifyTable"); 201 createOrEnableTableWithRetries(htd, true); 202 203 // assert that replication peer is not created yet 204 try { 205 peerConfig = admin.getReplicationPeerConfig(peerId); 206 fail("Should throw ReplicationException, because replication peer id=" + peerId 207 + " not exist"); 208 } catch (ReplicationPeerNotFoundException e) { 209 } 210 assertNull(peerConfig); 211 212 HTU.getAdmin().disableTable(htd.getTableName()); 213 htd.setRegionReplication(2); 214 HTU.getAdmin().modifyTable(htd.getTableName(), htd); 215 createOrEnableTableWithRetries(htd, false); 216 217 // assert peer configuration is correct 218 peerConfig = admin.getReplicationPeerConfig(peerId); 219 assertNotNull(peerConfig); 220 assertEquals(peerConfig.getClusterKey(), 221 ZKConfig.getZooKeeperClusterKey(HTU.getConfiguration())); 222 assertEquals(RegionReplicaReplicationEndpoint.class.getName(), 223 peerConfig.getReplicationEndpointImpl()); 224 admin.close(); 225 } 226 } 227 228 public void testRegionReplicaReplication(int regionReplication) throws Exception { 229 // test region replica replication. Create a table with single region, write some data 230 // ensure that data is replicated to the secondary region 231 TableName tableName = 232 TableName.valueOf("testRegionReplicaReplicationWithReplicas_" + regionReplication); 233 HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString()); 234 htd.setRegionReplication(regionReplication); 235 createOrEnableTableWithRetries(htd, true); 236 TableName tableNameNoReplicas = 237 TableName.valueOf("testRegionReplicaReplicationWithReplicas_NO_REPLICAS"); 238 HTU.deleteTableIfAny(tableNameNoReplicas); 239 HTU.createTable(tableNameNoReplicas, HBaseTestingUtility.fam1); 240 241 Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration()); 242 Table table = connection.getTable(tableName); 243 Table tableNoReplicas = connection.getTable(tableNameNoReplicas); 244 245 try { 246 // load some data to the non-replicated table 247 HTU.loadNumericRows(tableNoReplicas, HBaseTestingUtility.fam1, 6000, 7000); 248 249 // load the data to the table 250 HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000); 251 252 verifyReplication(tableName, regionReplication, 0, 1000); 253 254 } finally { 255 table.close(); 256 tableNoReplicas.close(); 257 HTU.deleteTableIfAny(tableNameNoReplicas); 258 connection.close(); 259 } 260 } 261 262 private void verifyReplication(TableName tableName, int regionReplication, final int startRow, 263 final int endRow) throws Exception { 264 verifyReplication(tableName, regionReplication, startRow, endRow, true); 265 } 266 267 private void verifyReplication(TableName tableName, int regionReplication, final int startRow, 268 final int endRow, final boolean present) throws Exception { 269 // find the regions 270 final Region[] regions = new Region[regionReplication]; 271 272 for (int i = 0; i < NB_SERVERS; i++) { 273 HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i); 274 List<HRegion> onlineRegions = rs.getRegions(tableName); 275 for (HRegion region : onlineRegions) { 276 regions[region.getRegionInfo().getReplicaId()] = region; 277 } 278 } 279 280 for (Region region : regions) { 281 assertNotNull(region); 282 } 283 284 for (int i = 1; i < regionReplication; i++) { 285 final Region region = regions[i]; 286 // wait until all the data is replicated to all secondary regions 287 Waiter.waitFor(HTU.getConfiguration(), 90000, 1000, new Waiter.Predicate<Exception>() { 288 @Override 289 public boolean evaluate() throws Exception { 290 LOG.info("verifying replication for region replica:" + region.getRegionInfo()); 291 try { 292 HTU.verifyNumericRows(region, HBaseTestingUtility.fam1, startRow, endRow, present); 293 } catch (Throwable ex) { 294 LOG.warn("Verification from secondary region is not complete yet", ex); 295 // still wait 296 return false; 297 } 298 return true; 299 } 300 }); 301 } 302 } 303 304 @Test 305 public void testRegionReplicaReplicationWith2Replicas() throws Exception { 306 testRegionReplicaReplication(2); 307 } 308 309 @Test 310 public void testRegionReplicaReplicationWith3Replicas() throws Exception { 311 testRegionReplicaReplication(3); 312 } 313 314 @Test 315 public void testRegionReplicaReplicationWith10Replicas() throws Exception { 316 testRegionReplicaReplication(10); 317 } 318 319 @Test 320 public void testRegionReplicaWithoutMemstoreReplication() throws Exception { 321 int regionReplication = 3; 322 final TableName tableName = TableName.valueOf(name.getMethodName()); 323 HTableDescriptor htd = HTU.createTableDescriptor(tableName); 324 htd.setRegionReplication(regionReplication); 325 htd.setRegionMemstoreReplication(false); 326 createOrEnableTableWithRetries(htd, true); 327 328 Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration()); 329 Table table = connection.getTable(tableName); 330 try { 331 // write data to the primary. The replicas should not receive the data 332 final int STEP = 100; 333 for (int i = 0; i < 3; ++i) { 334 final int startRow = i * STEP; 335 final int endRow = (i + 1) * STEP; 336 LOG.info("Writing data from " + startRow + " to " + endRow); 337 HTU.loadNumericRows(table, HBaseTestingUtility.fam1, startRow, endRow); 338 verifyReplication(tableName, regionReplication, startRow, endRow, false); 339 340 // Flush the table, now the data should show up in the replicas 341 LOG.info("flushing table"); 342 HTU.flush(tableName); 343 verifyReplication(tableName, regionReplication, 0, endRow, true); 344 } 345 } finally { 346 table.close(); 347 connection.close(); 348 } 349 } 350 351 @Test 352 public void testRegionReplicaReplicationForFlushAndCompaction() throws Exception { 353 // Tests a table with region replication 3. Writes some data, and causes flushes and 354 // compactions. Verifies that the data is readable from the replicas. Note that this 355 // does not test whether the replicas actually pick up flushed files and apply compaction 356 // to their stores 357 int regionReplication = 3; 358 final TableName tableName = TableName.valueOf(name.getMethodName()); 359 HTableDescriptor htd = HTU.createTableDescriptor(tableName); 360 htd.setRegionReplication(regionReplication); 361 createOrEnableTableWithRetries(htd, true); 362 363 Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration()); 364 Table table = connection.getTable(tableName); 365 try { 366 // load the data to the table 367 368 for (int i = 0; i < 6000; i += 1000) { 369 LOG.info("Writing data from " + i + " to " + (i + 1000)); 370 HTU.loadNumericRows(table, HBaseTestingUtility.fam1, i, i + 1000); 371 LOG.info("flushing table"); 372 HTU.flush(tableName); 373 LOG.info("compacting table"); 374 HTU.compact(tableName, false); 375 } 376 377 verifyReplication(tableName, regionReplication, 0, 1000); 378 } finally { 379 table.close(); 380 connection.close(); 381 } 382 } 383 384 @Test 385 public void testRegionReplicaReplicationIgnoresDisabledTables() throws Exception { 386 testRegionReplicaReplicationIgnores(false, false); 387 } 388 389 @Test 390 public void testRegionReplicaReplicationIgnoresDroppedTables() throws Exception { 391 testRegionReplicaReplicationIgnores(true, false); 392 } 393 394 @Test 395 public void testRegionReplicaReplicationIgnoresNonReplicatedTables() throws Exception { 396 testRegionReplicaReplicationIgnores(false, true); 397 } 398 399 public void testRegionReplicaReplicationIgnores(boolean dropTable, boolean disableReplication) 400 throws Exception { 401 402 // tests having edits from a disabled or dropped table is handled correctly by skipping those 403 // entries and further edits after the edits from dropped/disabled table can be replicated 404 // without problems. 405 final TableName tableName = TableName.valueOf( 406 name.getMethodName() + "_drop_" + dropTable + "_disabledReplication_" + disableReplication); 407 HTableDescriptor htd = HTU.createTableDescriptor(tableName); 408 int regionReplication = 3; 409 htd.setRegionReplication(regionReplication); 410 HTU.deleteTableIfAny(tableName); 411 412 createOrEnableTableWithRetries(htd, true); 413 TableName toBeDisabledTable = TableName.valueOf( 414 dropTable ? "droppedTable" : (disableReplication ? "disableReplication" : "disabledTable")); 415 HTU.deleteTableIfAny(toBeDisabledTable); 416 htd = HTU.createTableDescriptor(toBeDisabledTable.toString()); 417 htd.setRegionReplication(regionReplication); 418 createOrEnableTableWithRetries(htd, true); 419 420 // both tables are created, now pause replication 421 HTU.getAdmin().disableReplicationPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER); 422 423 // now that the replication is disabled, write to the table to be dropped, then drop the table. 424 425 Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration()); 426 Table table = connection.getTable(tableName); 427 Table tableToBeDisabled = connection.getTable(toBeDisabledTable); 428 429 HTU.loadNumericRows(tableToBeDisabled, HBaseTestingUtility.fam1, 6000, 7000); 430 431 AtomicLong skippedEdits = new AtomicLong(); 432 RegionReplicaReplicationEndpoint.RegionReplicaOutputSink sink = 433 mock(RegionReplicaReplicationEndpoint.RegionReplicaOutputSink.class); 434 when(sink.getSkippedEditsCounter()).thenReturn(skippedEdits); 435 FSTableDescriptors fstd = 436 new FSTableDescriptors(FileSystem.get(HTU.getConfiguration()), HTU.getDefaultRootDirPath()); 437 RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter sinkWriter = 438 new RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter(sink, 439 (ClusterConnection) connection, Executors.newSingleThreadExecutor(), Integer.MAX_VALUE, 440 fstd); 441 RegionLocator rl = connection.getRegionLocator(toBeDisabledTable); 442 HRegionLocation hrl = rl.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY); 443 byte[] encodedRegionName = hrl.getRegionInfo().getEncodedNameAsBytes(); 444 445 Cell cell = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(Bytes.toBytes("A")) 446 .setFamily(HTU.fam1).setValue(Bytes.toBytes("VAL")).setType(Type.Put).build(); 447 Entry entry = 448 new Entry(new WALKeyImpl(encodedRegionName, toBeDisabledTable, 1), new WALEdit().add(cell)); 449 450 HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table 451 if (dropTable) { 452 HTU.getAdmin().deleteTable(toBeDisabledTable); 453 } else if (disableReplication) { 454 htd.setRegionReplication(regionReplication - 2); 455 HTU.getAdmin().modifyTable(toBeDisabledTable, htd); 456 createOrEnableTableWithRetries(htd, false); 457 } 458 sinkWriter.append(toBeDisabledTable, encodedRegionName, HConstants.EMPTY_BYTE_ARRAY, 459 Lists.newArrayList(entry, entry)); 460 461 assertEquals(2, skippedEdits.get()); 462 463 HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(0); 464 MetricsSource metrics = mock(MetricsSource.class); 465 ReplicationEndpoint.Context ctx = new ReplicationEndpoint.Context(HTU.getConfiguration(), 466 HTU.getConfiguration(), HTU.getTestFileSystem(), 467 ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER, UUID.fromString(rs.getClusterId()), 468 rs.getReplicationSourceService().getReplicationManager().getReplicationPeers() 469 .getPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER), 470 metrics, rs.getTableDescriptors(), rs); 471 RegionReplicaReplicationEndpoint rrpe = new RegionReplicaReplicationEndpoint(); 472 rrpe.init(ctx); 473 rrpe.start(); 474 ReplicationEndpoint.ReplicateContext repCtx = new ReplicationEndpoint.ReplicateContext(); 475 repCtx.setEntries(Lists.newArrayList(entry, entry)); 476 assertTrue(rrpe.replicate(repCtx)); 477 /* 478 * Come back here. There is a difference on how counting is done here and in master branch. 479 * St.Ack Mockito.verify(metrics, Mockito.times(1)). incrLogEditsFiltered(Mockito.eq(2L)); 480 */ 481 rrpe.stop(); 482 if (disableReplication) { 483 // enable replication again so that we can verify replication 484 HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table 485 htd.setRegionReplication(regionReplication); 486 HTU.getAdmin().modifyTable(toBeDisabledTable, htd); 487 createOrEnableTableWithRetries(htd, false); 488 } 489 490 try { 491 // load some data to the to-be-dropped table 492 493 // load the data to the table 494 HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000); 495 496 // now enable the replication 497 HTU.getAdmin().enableReplicationPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER); 498 499 verifyReplication(tableName, regionReplication, 0, 1000); 500 501 } finally { 502 table.close(); 503 rl.close(); 504 tableToBeDisabled.close(); 505 HTU.deleteTableIfAny(toBeDisabledTable); 506 connection.close(); 507 } 508 } 509 510 private void createOrEnableTableWithRetries(TableDescriptor htd, boolean createTableOperation) { 511 // Helper function to run create/enable table operations with a retry feature 512 boolean continueToRetry = true; 513 int tries = 0; 514 while (continueToRetry && tries < 50) { 515 try { 516 continueToRetry = false; 517 if (createTableOperation) { 518 HTU.getAdmin().createTable(htd); 519 } else { 520 HTU.getAdmin().enableTable(htd.getTableName()); 521 } 522 } catch (IOException e) { 523 if (e.getCause() instanceof ReplicationException) { 524 continueToRetry = true; 525 tries++; 526 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); 527 } 528 } 529 } 530 } 531}