001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNotNull; 022import static org.junit.Assert.assertNull; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025import static org.mockito.Mockito.mock; 026import static org.mockito.Mockito.when; 027 028import java.io.IOException; 029import java.util.List; 030import java.util.UUID; 031import java.util.concurrent.Executors; 032import java.util.concurrent.atomic.AtomicLong; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.fs.FileSystem; 035import org.apache.hadoop.hbase.Cell; 036import org.apache.hadoop.hbase.Cell.Type; 037import org.apache.hadoop.hbase.CellBuilderFactory; 038import org.apache.hadoop.hbase.CellBuilderType; 039import org.apache.hadoop.hbase.HBaseClassTestRule; 040import org.apache.hadoop.hbase.HBaseTestingUtility; 041import org.apache.hadoop.hbase.HConstants; 042import org.apache.hadoop.hbase.HRegionLocation; 043import org.apache.hadoop.hbase.HTableDescriptor; 044import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; 045import org.apache.hadoop.hbase.TableName; 046import org.apache.hadoop.hbase.Waiter; 047import org.apache.hadoop.hbase.client.Admin; 048import org.apache.hadoop.hbase.client.ClusterConnection; 049import org.apache.hadoop.hbase.client.Connection; 050import org.apache.hadoop.hbase.client.ConnectionFactory; 051import org.apache.hadoop.hbase.client.RegionLocator; 052import org.apache.hadoop.hbase.client.Table; 053import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; 054import org.apache.hadoop.hbase.regionserver.HRegion; 055import org.apache.hadoop.hbase.regionserver.HRegionServer; 056import org.apache.hadoop.hbase.regionserver.Region; 057import org.apache.hadoop.hbase.replication.ReplicationEndpoint; 058import org.apache.hadoop.hbase.replication.ReplicationException; 059import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 060import org.apache.hadoop.hbase.testclassification.FlakeyTests; 061import org.apache.hadoop.hbase.testclassification.LargeTests; 062import org.apache.hadoop.hbase.util.Bytes; 063import org.apache.hadoop.hbase.util.FSTableDescriptors; 064import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 065import org.apache.hadoop.hbase.wal.WAL.Entry; 066import org.apache.hadoop.hbase.wal.WALEdit; 067import org.apache.hadoop.hbase.wal.WALKeyImpl; 068import org.apache.hadoop.hbase.zookeeper.ZKConfig; 069import org.junit.AfterClass; 070import org.junit.BeforeClass; 071import org.junit.ClassRule; 072import org.junit.Rule; 073import org.junit.Test; 074import org.junit.experimental.categories.Category; 075import org.junit.rules.TestName; 076import org.mockito.Mockito; 077import org.slf4j.Logger; 078import org.slf4j.LoggerFactory; 079 080import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 081 082/** 083 * Tests RegionReplicaReplicationEndpoint class by setting up region replicas and verifying 084 * async wal replication replays the edits to the secondary region in various scenarios. 085 */ 086@Category({FlakeyTests.class, LargeTests.class}) 087public class TestRegionReplicaReplicationEndpoint { 088 089 @ClassRule 090 public static final HBaseClassTestRule CLASS_RULE = 091 HBaseClassTestRule.forClass(TestRegionReplicaReplicationEndpoint.class); 092 093 private static final Logger LOG = 094 LoggerFactory.getLogger(TestRegionReplicaReplicationEndpoint.class); 095 096 private static final int NB_SERVERS = 2; 097 098 private static final HBaseTestingUtility HTU = new HBaseTestingUtility(); 099 100 @Rule 101 public TestName name = new TestName(); 102 103 @BeforeClass 104 public static void beforeClass() throws Exception { 105 Configuration conf = HTU.getConfiguration(); 106 conf.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f); 107 conf.setInt("replication.source.size.capacity", 10240); 108 conf.setLong("replication.source.sleepforretries", 100); 109 conf.setInt("hbase.regionserver.maxlogs", 10); 110 conf.setLong("hbase.master.logcleaner.ttl", 10); 111 conf.setInt("zookeeper.recovery.retry", 1); 112 conf.setInt("zookeeper.recovery.retry.intervalmill", 10); 113 conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true); 114 conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); 115 conf.setInt("replication.stats.thread.period.seconds", 5); 116 conf.setBoolean("hbase.tests.use.shortcircuit.reads", false); 117 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); // less number of retries is needed 118 conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1); 119 120 HTU.startMiniCluster(NB_SERVERS); 121 } 122 123 @AfterClass 124 public static void afterClass() throws Exception { 125 HTU.shutdownMiniCluster(); 126 } 127 128 @Test 129 public void testRegionReplicaReplicationPeerIsCreated() throws IOException, ReplicationException { 130 // create a table with region replicas. Check whether the replication peer is created 131 // and replication started. 132 try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration()); 133 Admin admin = connection.getAdmin()) { 134 String peerId = ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER; 135 136 ReplicationPeerConfig peerConfig = null; 137 try { 138 peerConfig = admin.getReplicationPeerConfig(peerId); 139 } catch (ReplicationPeerNotFoundException e) { 140 LOG.warn("Region replica replication peer id=" + peerId + " not exist", e); 141 } 142 143 try { 144 peerConfig = admin.getReplicationPeerConfig(peerId); 145 } catch (ReplicationPeerNotFoundException e) { 146 LOG.warn("Region replica replication peer id=" + peerId + " not exist", e); 147 } 148 149 if (peerConfig != null) { 150 admin.removeReplicationPeer(peerId); 151 peerConfig = null; 152 } 153 154 HTableDescriptor htd = HTU.createTableDescriptor( 155 "testReplicationPeerIsCreated_no_region_replicas"); 156 HTU.getAdmin().createTable(htd); 157 try { 158 peerConfig = admin.getReplicationPeerConfig(peerId); 159 fail("Should throw ReplicationException, because replication peer id=" + peerId 160 + " not exist"); 161 } catch (ReplicationPeerNotFoundException e) { 162 } 163 assertNull(peerConfig); 164 165 htd = HTU.createTableDescriptor("testReplicationPeerIsCreated"); 166 htd.setRegionReplication(2); 167 HTU.getAdmin().createTable(htd); 168 169 // assert peer configuration is correct 170 peerConfig = admin.getReplicationPeerConfig(peerId); 171 assertNotNull(peerConfig); 172 assertEquals(peerConfig.getClusterKey(), ZKConfig.getZooKeeperClusterKey( 173 HTU.getConfiguration())); 174 assertEquals(RegionReplicaReplicationEndpoint.class.getName(), 175 peerConfig.getReplicationEndpointImpl()); 176 } 177 } 178 179 @Test 180 public void testRegionReplicaReplicationPeerIsCreatedForModifyTable() throws Exception { 181 // modify a table by adding region replicas. Check whether the replication peer is created 182 // and replication started. 183 try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration()); 184 Admin admin = connection.getAdmin()) { 185 String peerId = ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER; 186 ReplicationPeerConfig peerConfig = null; 187 try { 188 peerConfig = admin.getReplicationPeerConfig(peerId); 189 } catch (ReplicationPeerNotFoundException e) { 190 LOG.warn("Region replica replication peer id=" + peerId + " not exist", e); 191 } 192 193 if (peerConfig != null) { 194 admin.removeReplicationPeer(peerId); 195 peerConfig = null; 196 } 197 198 HTableDescriptor htd = HTU.createTableDescriptor("testRegionReplicaReplicationPeerIsCreatedForModifyTable"); 199 HTU.getAdmin().createTable(htd); 200 201 // assert that replication peer is not created yet 202 try { 203 peerConfig = admin.getReplicationPeerConfig(peerId); 204 fail("Should throw ReplicationException, because replication peer id=" + peerId 205 + " not exist"); 206 } catch (ReplicationPeerNotFoundException e) { 207 } 208 assertNull(peerConfig); 209 210 HTU.getAdmin().disableTable(htd.getTableName()); 211 htd.setRegionReplication(2); 212 HTU.getAdmin().modifyTable(htd.getTableName(), htd); 213 HTU.getAdmin().enableTable(htd.getTableName()); 214 215 // assert peer configuration is correct 216 peerConfig = admin.getReplicationPeerConfig(peerId); 217 assertNotNull(peerConfig); 218 assertEquals(peerConfig.getClusterKey(), ZKConfig.getZooKeeperClusterKey(HTU.getConfiguration())); 219 assertEquals(RegionReplicaReplicationEndpoint.class.getName(), 220 peerConfig.getReplicationEndpointImpl()); 221 admin.close(); 222 } 223 } 224 225 public void testRegionReplicaReplication(int regionReplication) throws Exception { 226 // test region replica replication. Create a table with single region, write some data 227 // ensure that data is replicated to the secondary region 228 TableName tableName = TableName.valueOf("testRegionReplicaReplicationWithReplicas_" 229 + regionReplication); 230 HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString()); 231 htd.setRegionReplication(regionReplication); 232 HTU.getAdmin().createTable(htd); 233 TableName tableNameNoReplicas = 234 TableName.valueOf("testRegionReplicaReplicationWithReplicas_NO_REPLICAS"); 235 HTU.deleteTableIfAny(tableNameNoReplicas); 236 HTU.createTable(tableNameNoReplicas, HBaseTestingUtility.fam1); 237 238 Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration()); 239 Table table = connection.getTable(tableName); 240 Table tableNoReplicas = connection.getTable(tableNameNoReplicas); 241 242 try { 243 // load some data to the non-replicated table 244 HTU.loadNumericRows(tableNoReplicas, HBaseTestingUtility.fam1, 6000, 7000); 245 246 // load the data to the table 247 HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000); 248 249 verifyReplication(tableName, regionReplication, 0, 1000); 250 251 } finally { 252 table.close(); 253 tableNoReplicas.close(); 254 HTU.deleteTableIfAny(tableNameNoReplicas); 255 connection.close(); 256 } 257 } 258 259 private void verifyReplication(TableName tableName, int regionReplication, 260 final int startRow, final int endRow) throws Exception { 261 verifyReplication(tableName, regionReplication, startRow, endRow, true); 262 } 263 264 private void verifyReplication(TableName tableName, int regionReplication, 265 final int startRow, final int endRow, final boolean present) throws Exception { 266 // find the regions 267 final Region[] regions = new Region[regionReplication]; 268 269 for (int i=0; i < NB_SERVERS; i++) { 270 HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i); 271 List<HRegion> onlineRegions = rs.getRegions(tableName); 272 for (HRegion region : onlineRegions) { 273 regions[region.getRegionInfo().getReplicaId()] = region; 274 } 275 } 276 277 for (Region region : regions) { 278 assertNotNull(region); 279 } 280 281 for (int i = 1; i < regionReplication; i++) { 282 final Region region = regions[i]; 283 // wait until all the data is replicated to all secondary regions 284 Waiter.waitFor(HTU.getConfiguration(), 90000, 1000, new Waiter.Predicate<Exception>() { 285 @Override 286 public boolean evaluate() throws Exception { 287 LOG.info("verifying replication for region replica:" + region.getRegionInfo()); 288 try { 289 HTU.verifyNumericRows(region, HBaseTestingUtility.fam1, startRow, endRow, present); 290 } catch(Throwable ex) { 291 LOG.warn("Verification from secondary region is not complete yet", ex); 292 // still wait 293 return false; 294 } 295 return true; 296 } 297 }); 298 } 299 } 300 301 @Test 302 public void testRegionReplicaReplicationWith2Replicas() throws Exception { 303 testRegionReplicaReplication(2); 304 } 305 306 @Test 307 public void testRegionReplicaReplicationWith3Replicas() throws Exception { 308 testRegionReplicaReplication(3); 309 } 310 311 @Test 312 public void testRegionReplicaReplicationWith10Replicas() throws Exception { 313 testRegionReplicaReplication(10); 314 } 315 316 @Test 317 public void testRegionReplicaWithoutMemstoreReplication() throws Exception { 318 int regionReplication = 3; 319 final TableName tableName = TableName.valueOf(name.getMethodName()); 320 HTableDescriptor htd = HTU.createTableDescriptor(tableName); 321 htd.setRegionReplication(regionReplication); 322 htd.setRegionMemstoreReplication(false); 323 HTU.getAdmin().createTable(htd); 324 325 Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration()); 326 Table table = connection.getTable(tableName); 327 try { 328 // write data to the primary. The replicas should not receive the data 329 final int STEP = 100; 330 for (int i = 0; i < 3; ++i) { 331 final int startRow = i * STEP; 332 final int endRow = (i + 1) * STEP; 333 LOG.info("Writing data from " + startRow + " to " + endRow); 334 HTU.loadNumericRows(table, HBaseTestingUtility.fam1, startRow, endRow); 335 verifyReplication(tableName, regionReplication, startRow, endRow, false); 336 337 // Flush the table, now the data should show up in the replicas 338 LOG.info("flushing table"); 339 HTU.flush(tableName); 340 verifyReplication(tableName, regionReplication, 0, endRow, true); 341 } 342 } finally { 343 table.close(); 344 connection.close(); 345 } 346 } 347 348 @Test 349 public void testRegionReplicaReplicationForFlushAndCompaction() throws Exception { 350 // Tests a table with region replication 3. Writes some data, and causes flushes and 351 // compactions. Verifies that the data is readable from the replicas. Note that this 352 // does not test whether the replicas actually pick up flushed files and apply compaction 353 // to their stores 354 int regionReplication = 3; 355 final TableName tableName = TableName.valueOf(name.getMethodName()); 356 HTableDescriptor htd = HTU.createTableDescriptor(tableName); 357 htd.setRegionReplication(regionReplication); 358 HTU.getAdmin().createTable(htd); 359 360 361 Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration()); 362 Table table = connection.getTable(tableName); 363 try { 364 // load the data to the table 365 366 for (int i = 0; i < 6000; i += 1000) { 367 LOG.info("Writing data from " + i + " to " + (i+1000)); 368 HTU.loadNumericRows(table, HBaseTestingUtility.fam1, i, i+1000); 369 LOG.info("flushing table"); 370 HTU.flush(tableName); 371 LOG.info("compacting table"); 372 HTU.compact(tableName, false); 373 } 374 375 verifyReplication(tableName, regionReplication, 0, 1000); 376 } finally { 377 table.close(); 378 connection.close(); 379 } 380 } 381 382 @Test 383 public void testRegionReplicaReplicationIgnoresDisabledTables() throws Exception { 384 testRegionReplicaReplicationIgnores(false, false); 385 } 386 387 @Test 388 public void testRegionReplicaReplicationIgnoresDroppedTables() throws Exception { 389 testRegionReplicaReplicationIgnores(true, false); 390 } 391 392 @Test 393 public void testRegionReplicaReplicationIgnoresNonReplicatedTables() throws Exception { 394 testRegionReplicaReplicationIgnores(false, true); 395 } 396 397 public void testRegionReplicaReplicationIgnores(boolean dropTable, boolean disableReplication) 398 throws Exception { 399 400 // tests having edits from a disabled or dropped table is handled correctly by skipping those 401 // entries and further edits after the edits from dropped/disabled table can be replicated 402 // without problems. 403 final TableName tableName = TableName.valueOf( 404 name.getMethodName() + "_drop_" + dropTable + "_disabledReplication_" + disableReplication); 405 HTableDescriptor htd = HTU.createTableDescriptor(tableName); 406 int regionReplication = 3; 407 htd.setRegionReplication(regionReplication); 408 HTU.deleteTableIfAny(tableName); 409 410 HTU.getAdmin().createTable(htd); 411 TableName toBeDisabledTable = TableName.valueOf( 412 dropTable ? "droppedTable" : (disableReplication ? "disableReplication" : "disabledTable")); 413 HTU.deleteTableIfAny(toBeDisabledTable); 414 htd = HTU.createTableDescriptor(toBeDisabledTable.toString()); 415 htd.setRegionReplication(regionReplication); 416 HTU.getAdmin().createTable(htd); 417 418 // both tables are created, now pause replication 419 HTU.getAdmin().disableReplicationPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER); 420 421 // now that the replication is disabled, write to the table to be dropped, then drop the table. 422 423 Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration()); 424 Table table = connection.getTable(tableName); 425 Table tableToBeDisabled = connection.getTable(toBeDisabledTable); 426 427 HTU.loadNumericRows(tableToBeDisabled, HBaseTestingUtility.fam1, 6000, 7000); 428 429 AtomicLong skippedEdits = new AtomicLong(); 430 RegionReplicaReplicationEndpoint.RegionReplicaOutputSink sink = 431 mock(RegionReplicaReplicationEndpoint.RegionReplicaOutputSink.class); 432 when(sink.getSkippedEditsCounter()).thenReturn(skippedEdits); 433 FSTableDescriptors fstd = 434 new FSTableDescriptors(FileSystem.get(HTU.getConfiguration()), HTU.getDefaultRootDirPath()); 435 RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter sinkWriter = 436 new RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter(sink, 437 (ClusterConnection) connection, Executors.newSingleThreadExecutor(), Integer.MAX_VALUE, 438 fstd); 439 RegionLocator rl = connection.getRegionLocator(toBeDisabledTable); 440 HRegionLocation hrl = rl.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY); 441 byte[] encodedRegionName = hrl.getRegionInfo().getEncodedNameAsBytes(); 442 443 Cell cell = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(Bytes.toBytes("A")) 444 .setFamily(HTU.fam1).setValue(Bytes.toBytes("VAL")).setType(Type.Put).build(); 445 Entry entry = new Entry( 446 new WALKeyImpl(encodedRegionName, toBeDisabledTable, 1), 447 new WALEdit() 448 .add(cell)); 449 450 HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table 451 if (dropTable) { 452 HTU.getAdmin().deleteTable(toBeDisabledTable); 453 } else if (disableReplication) { 454 htd.setRegionReplication(regionReplication - 2); 455 HTU.getAdmin().modifyTable(toBeDisabledTable, htd); 456 HTU.getAdmin().enableTable(toBeDisabledTable); 457 } 458 sinkWriter.append(toBeDisabledTable, encodedRegionName, 459 HConstants.EMPTY_BYTE_ARRAY, Lists.newArrayList(entry, entry)); 460 461 assertEquals(2, skippedEdits.get()); 462 463 HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(0); 464 MetricsSource metrics = mock(MetricsSource.class); 465 ReplicationEndpoint.Context ctx = 466 new ReplicationEndpoint.Context(HTU.getConfiguration(), HTU.getConfiguration(), 467 HTU.getTestFileSystem(), ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER, 468 UUID.fromString(rs.getClusterId()), rs.getReplicationSourceService(). 469 getReplicationManager().getReplicationPeers() 470 .getPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER), 471 metrics, rs.getTableDescriptors(), rs); 472 RegionReplicaReplicationEndpoint rrpe = new RegionReplicaReplicationEndpoint(); 473 rrpe.init(ctx); 474 rrpe.start(); 475 ReplicationEndpoint.ReplicateContext repCtx = new ReplicationEndpoint.ReplicateContext(); 476 repCtx.setEntries(Lists.newArrayList(entry, entry)); 477 assertTrue(rrpe.replicate(repCtx)); 478 /* Come back here. There is a difference on how counting is done here and in master branch. 479 St.Ack 480 Mockito.verify(metrics, Mockito.times(1)). 481 incrLogEditsFiltered(Mockito.eq(2L)); 482 */ 483 rrpe.stop(); 484 if (disableReplication) { 485 // enable replication again so that we can verify replication 486 HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table 487 htd.setRegionReplication(regionReplication); 488 HTU.getAdmin().modifyTable(toBeDisabledTable, htd); 489 HTU.getAdmin().enableTable(toBeDisabledTable); 490 } 491 492 try { 493 // load some data to the to-be-dropped table 494 495 // load the data to the table 496 HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000); 497 498 // now enable the replication 499 HTU.getAdmin().enableReplicationPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER); 500 501 verifyReplication(tableName, regionReplication, 0, 1000); 502 503 } finally { 504 table.close(); 505 rl.close(); 506 tableToBeDisabled.close(); 507 HTU.deleteTableIfAny(toBeDisabledTable); 508 connection.close(); 509 } 510 } 511}