001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.hamcrest.MatcherAssert.assertThat; 021import static org.hamcrest.Matchers.hasItems; 022import static org.hamcrest.Matchers.hasSize; 023import static org.junit.Assert.assertEquals; 024import static org.junit.Assert.assertFalse; 025import static org.junit.Assert.assertNotNull; 026import static org.mockito.Mockito.mock; 027import static org.mockito.Mockito.when; 028 029import java.io.IOException; 030import java.util.Collections; 031import java.util.Map; 032import java.util.NavigableMap; 033import java.util.Set; 034import java.util.TreeMap; 035import java.util.stream.Collectors; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.fs.FileSystem; 038import org.apache.hadoop.fs.Path; 039import org.apache.hadoop.hbase.Cell; 040import org.apache.hadoop.hbase.CellBuilderType; 041import org.apache.hadoop.hbase.CompatibilitySingletonFactory; 042import org.apache.hadoop.hbase.ExtendedCellBuilderFactory; 043import org.apache.hadoop.hbase.HBaseClassTestRule; 044import org.apache.hadoop.hbase.HBaseTestingUtil; 045import org.apache.hadoop.hbase.HConstants; 046import org.apache.hadoop.hbase.Server; 047import org.apache.hadoop.hbase.ServerName; 048import org.apache.hadoop.hbase.TableName; 049import org.apache.hadoop.hbase.client.RegionInfo; 050import org.apache.hadoop.hbase.client.RegionInfoBuilder; 051import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter; 052import org.apache.hadoop.hbase.replication.DummyReplicationEndpoint; 053import org.apache.hadoop.hbase.replication.ReplicationException; 054import org.apache.hadoop.hbase.replication.ReplicationGroupOffset; 055import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 056import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; 057import org.apache.hadoop.hbase.replication.ReplicationPeers; 058import org.apache.hadoop.hbase.replication.ReplicationQueueId; 059import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 060import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 061import org.apache.hadoop.hbase.replication.ReplicationUtils; 062import org.apache.hadoop.hbase.replication.SyncReplicationState; 063import org.apache.hadoop.hbase.testclassification.MediumTests; 064import org.apache.hadoop.hbase.testclassification.ReplicationTests; 065import org.apache.hadoop.hbase.util.Bytes; 066import org.apache.hadoop.hbase.util.CommonFSUtils; 067import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 068import org.apache.hadoop.hbase.wal.WAL; 069import org.apache.hadoop.hbase.wal.WALEdit; 070import org.apache.hadoop.hbase.wal.WALEditInternalHelper; 071import org.apache.hadoop.hbase.wal.WALFactory; 072import org.apache.hadoop.hbase.wal.WALKeyImpl; 073import org.hamcrest.Matchers; 074import org.junit.After; 075import org.junit.AfterClass; 076import org.junit.Before; 077import org.junit.BeforeClass; 078import org.junit.ClassRule; 079import org.junit.Rule; 080import org.junit.Test; 081import org.junit.experimental.categories.Category; 082import org.junit.rules.TestName; 083 084import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 085 086@Category({ ReplicationTests.class, MediumTests.class }) 087public class TestReplicationSourceManager { 088 089 @ClassRule 090 public static final HBaseClassTestRule CLASS_RULE = 091 HBaseClassTestRule.forClass(TestReplicationSourceManager.class); 092 093 public static final class ReplicationEndpointForTest extends DummyReplicationEndpoint { 094 095 private String clusterKey; 096 097 @Override 098 public boolean replicate(ReplicateContext replicateContext) { 099 // if you want to block the replication, for example, do not want the recovered source to be 100 // removed 101 if (clusterKey.endsWith("error")) { 102 throw new RuntimeException("Inject error"); 103 } 104 return true; 105 } 106 107 @Override 108 public void init(Context context) throws IOException { 109 super.init(context); 110 this.clusterKey = context.getReplicationPeer().getPeerConfig().getClusterKey(); 111 } 112 113 } 114 115 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 116 117 private static Configuration CONF; 118 119 private static FileSystem FS; 120 121 private static final byte[] F1 = Bytes.toBytes("f1"); 122 123 private static final byte[] F2 = Bytes.toBytes("f2"); 124 125 private static final TableName TABLE_NAME = TableName.valueOf("test"); 126 127 private static RegionInfo RI; 128 129 private static NavigableMap<byte[], Integer> SCOPES; 130 131 @Rule 132 public final TestName name = new TestName(); 133 134 private Path oldLogDir; 135 136 private Path logDir; 137 138 private Path remoteLogDir; 139 140 private Server server; 141 142 private Replication replication; 143 144 private ReplicationSourceManager manager; 145 146 @BeforeClass 147 public static void setUpBeforeClass() throws Exception { 148 UTIL.startMiniCluster(1); 149 FS = UTIL.getTestFileSystem(); 150 CONF = new Configuration(UTIL.getConfiguration()); 151 CONF.setLong("replication.sleep.before.failover", 0); 152 153 RI = RegionInfoBuilder.newBuilder(TABLE_NAME).build(); 154 SCOPES = new TreeMap<>(Bytes.BYTES_COMPARATOR); 155 SCOPES.put(F1, 1); 156 SCOPES.put(F2, 0); 157 } 158 159 @AfterClass 160 public static void tearDownAfterClass() throws IOException { 161 UTIL.shutdownMiniCluster(); 162 } 163 164 @Before 165 public void setUp() throws Exception { 166 Path rootDir = UTIL.getDataTestDirOnTestFS(name.getMethodName()); 167 CommonFSUtils.setRootDir(CONF, rootDir); 168 server = mock(Server.class); 169 when(server.getConfiguration()).thenReturn(CONF); 170 when(server.getZooKeeper()).thenReturn(UTIL.getZooKeeperWatcher()); 171 when(server.getConnection()).thenReturn(UTIL.getConnection()); 172 ServerName sn = ServerName.valueOf("hostname.example.org", 1234, 1); 173 when(server.getServerName()).thenReturn(sn); 174 oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); 175 FS.mkdirs(oldLogDir); 176 logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME); 177 FS.mkdirs(logDir); 178 remoteLogDir = new Path(rootDir, ReplicationUtils.REMOTE_WAL_DIR_NAME); 179 FS.mkdirs(remoteLogDir); 180 TableName tableName = TableName.valueOf("replication_" + name.getMethodName()); 181 UTIL.getAdmin() 182 .createTable(ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName)); 183 CONF.set(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME, tableName.getNameAsString()); 184 185 replication = new Replication(); 186 replication.initialize(server, FS, new Path(logDir, sn.toString()), oldLogDir, 187 new WALFactory(CONF, server.getServerName(), null)); 188 manager = replication.getReplicationManager(); 189 } 190 191 @After 192 public void tearDown() { 193 replication.stopReplicationService(); 194 } 195 196 /** 197 * Add a peer and wait for it to initialize 198 */ 199 private void addPeerAndWait(String peerId, String clusterKey, boolean syncRep) 200 throws ReplicationException, IOException { 201 ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder() 202 .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/" + clusterKey) 203 .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()); 204 if (syncRep) { 205 builder.setTableCFsMap(ImmutableMap.of(TABLE_NAME, Collections.emptyList())) 206 .setRemoteWALDir(FS.makeQualified(remoteLogDir).toString()); 207 } 208 209 manager.getReplicationPeers().getPeerStorage().addPeer(peerId, builder.build(), true, 210 syncRep ? SyncReplicationState.DOWNGRADE_ACTIVE : SyncReplicationState.NONE); 211 manager.addPeer(peerId); 212 UTIL.waitFor(20000, () -> { 213 ReplicationSourceInterface rs = manager.getSource(peerId); 214 return rs != null && rs.isSourceActive(); 215 }); 216 } 217 218 /** 219 * Remove a peer and wait for it to get cleaned up 220 */ 221 private void removePeerAndWait(String peerId) throws Exception { 222 ReplicationPeers rp = manager.getReplicationPeers(); 223 rp.getPeerStorage().removePeer(peerId); 224 manager.removePeer(peerId); 225 UTIL.waitFor(20000, () -> { 226 if (rp.getPeer(peerId) != null) { 227 return false; 228 } 229 if (manager.getSource(peerId) != null) { 230 return false; 231 } 232 return manager.getOldSources().stream().noneMatch(rs -> rs.getPeerId().equals(peerId)); 233 }); 234 } 235 236 private void createWALFile(Path file) throws Exception { 237 ProtobufLogWriter writer = new ProtobufLogWriter(); 238 try { 239 writer.init(FS, file, CONF, false, FS.getDefaultBlockSize(file), null); 240 WALKeyImpl key = new WALKeyImpl(RI.getEncodedNameAsBytes(), TABLE_NAME, 241 EnvironmentEdgeManager.currentTime(), SCOPES); 242 WALEdit edit = new WALEdit(); 243 WALEditInternalHelper.addExtendedCell(edit, 244 ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(F1).setFamily(F1) 245 .setQualifier(F1).setType(Cell.Type.Put).setValue(F1).build()); 246 WALEditInternalHelper.addExtendedCell(edit, 247 ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(F2).setFamily(F2) 248 .setQualifier(F2).setType(Cell.Type.Put).setValue(F2).build()); 249 writer.append(new WAL.Entry(key, edit)); 250 writer.sync(false); 251 } finally { 252 writer.close(); 253 } 254 } 255 256 @Test 257 public void testClaimQueue() throws Exception { 258 String peerId = "1"; 259 addPeerAndWait(peerId, "error", false); 260 ServerName serverName = ServerName.valueOf("hostname0.example.org", 12345, 123); 261 String walName1 = serverName.toString() + ".1"; 262 createWALFile(new Path(oldLogDir, walName1)); 263 ReplicationQueueId queueId = new ReplicationQueueId(serverName, peerId); 264 ReplicationQueueStorage queueStorage = manager.getQueueStorage(); 265 queueStorage.setOffset(queueId, "", new ReplicationGroupOffset(peerId, 0), 266 Collections.emptyMap()); 267 manager.claimQueue(queueId); 268 assertThat(manager.getOldSources(), hasSize(1)); 269 } 270 271 @Test 272 public void testSameWALPrefix() throws IOException { 273 String walName1 = "localhost,8080,12345-45678-Peer.34567"; 274 String walName2 = "localhost,8080,12345.56789"; 275 manager.postLogRoll(new Path(walName1)); 276 manager.postLogRoll(new Path(walName2)); 277 278 Set<String> latestWals = 279 manager.getLastestPath().stream().map(Path::getName).collect(Collectors.toSet()); 280 assertThat(latestWals, 281 Matchers.<Set<String>> both(hasSize(2)).and(hasItems(walName1, walName2))); 282 } 283 284 private MetricsReplicationSourceSource getGlobalSource() { 285 return CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class) 286 .getGlobalSource(); 287 } 288 289 @Test 290 public void testRemovePeerMetricsCleanup() throws Exception { 291 MetricsReplicationSourceSource globalSource = getGlobalSource(); 292 int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue(); 293 String peerId = "DummyPeer"; 294 addPeerAndWait(peerId, "hbase", false); 295 // there is no latestPaths so the size of log queue should not change 296 assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue()); 297 298 ReplicationSourceInterface source = manager.getSource(peerId); 299 // Sanity check 300 assertNotNull(source); 301 int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue(); 302 // Enqueue log and check if metrics updated 303 Path serverLogDir = new Path(logDir, server.getServerName().toString()); 304 source.enqueueLog(new Path(serverLogDir, server.getServerName() + ".1")); 305 assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue()); 306 assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial, 307 globalSource.getSizeOfLogQueue()); 308 309 // Removing the peer should reset the global metrics 310 removePeerAndWait(peerId); 311 assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue()); 312 313 // Adding the same peer back again should reset the single source metrics 314 addPeerAndWait(peerId, "hbase", false); 315 source = manager.getSource(peerId); 316 assertNotNull(source); 317 assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial, 318 globalSource.getSizeOfLogQueue()); 319 } 320 321 @Test 322 public void testDisablePeerMetricsCleanup() throws Exception { 323 final String peerId = "DummyPeer"; 324 try { 325 MetricsReplicationSourceSource globalSource = getGlobalSource(); 326 final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue(); 327 addPeerAndWait(peerId, "hbase", false); 328 assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue()); 329 ReplicationSourceInterface source = manager.getSource(peerId); 330 // Sanity check 331 assertNotNull(source); 332 final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue(); 333 // Enqueue log and check if metrics updated 334 Path serverLogDir = new Path(logDir, server.getServerName().toString()); 335 source.enqueueLog(new Path(serverLogDir, server.getServerName() + ".1")); 336 assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue()); 337 assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial, 338 globalSource.getSizeOfLogQueue()); 339 340 // Refreshing the peer should decrement the global and single source metrics 341 manager.refreshSources(peerId); 342 assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue()); 343 344 source = manager.getSource(peerId); 345 assertNotNull(source); 346 assertEquals(sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue()); 347 assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial, 348 globalSource.getSizeOfLogQueue()); 349 } finally { 350 removePeerAndWait(peerId); 351 } 352 } 353 354 @Test 355 public void testRemoveRemoteWALs() throws Exception { 356 String peerId = "2"; 357 addPeerAndWait(peerId, "hbase", true); 358 // make sure that we can deal with files which does not exist 359 String walNameNotExists = 360 "remoteWAL-12345-" + peerId + ".12345" + ReplicationUtils.SYNC_WAL_SUFFIX; 361 Path wal = new Path(logDir, walNameNotExists); 362 manager.postLogRoll(wal); 363 364 Path remoteLogDirForPeer = new Path(remoteLogDir, peerId); 365 FS.mkdirs(remoteLogDirForPeer); 366 String walName = "remoteWAL-12345-" + peerId + ".23456" + ReplicationUtils.SYNC_WAL_SUFFIX; 367 Path remoteWAL = 368 new Path(remoteLogDirForPeer, walName).makeQualified(FS.getUri(), FS.getWorkingDirectory()); 369 FS.create(remoteWAL).close(); 370 wal = new Path(logDir, walName); 371 manager.postLogRoll(wal); 372 373 ReplicationSourceInterface source = manager.getSource(peerId); 374 manager.cleanOldLogs(walName, true, source); 375 assertFalse(FS.exists(remoteWAL)); 376 } 377 378 @Test 379 public void testPeerConfigurationOverridesPropagate() throws Exception { 380 Configuration globalConf = UTIL.getConfiguration(); 381 long globalSleepValue = 1000L; 382 globalConf.setLong("replication.source.sleepforretries", globalSleepValue); 383 384 long peerSleepOverride = 5000L; 385 String peerId = "testConfigOverridePeer"; 386 String clusterKey = "testPeerConfigOverride"; 387 388 ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder() 389 .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/" + clusterKey) 390 .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()) 391 .putConfiguration("replication.source.sleepforretries", String.valueOf(peerSleepOverride)) 392 .build(); 393 394 manager.getReplicationPeers().getPeerStorage().addPeer(peerId, peerConfig, true, 395 SyncReplicationState.NONE); 396 manager.addPeer(peerId); 397 UTIL.waitFor(20000, () -> { 398 ReplicationSourceInterface rs = manager.getSource(peerId); 399 return rs != null && rs.isSourceActive(); 400 }); 401 402 ReplicationSource source = (ReplicationSource) manager.getSources().stream() 403 .filter(s -> s.getPeerId().equals(peerId)).findFirst().orElse(null); 404 assertNotNull("Source should be created for peer", source); 405 406 assertEquals("ReplicationSource should use peer config override for sleepForRetries", 407 peerSleepOverride, source.getSleepForRetries()); 408 409 Map<String, ReplicationSourceShipper> workers = source.workerThreads; 410 if (!workers.isEmpty()) { 411 ReplicationSourceShipper shipper = workers.values().iterator().next(); 412 assertEquals("ReplicationSourceShipper should use peer config override for sleepForRetries", 413 peerSleepOverride, shipper.getSleepForRetries()); 414 415 ReplicationSourceWALReader reader = shipper.entryReader; 416 if (reader != null) { 417 assertEquals( 418 "ReplicationSourceWALReader should use peer config override for sleepForRetries", 419 peerSleepOverride, reader.getSleepForRetries()); 420 } 421 } 422 423 removePeerAndWait(peerId); 424 } 425 426 @Test 427 public void testPeerConfigurationIsolation() throws Exception { 428 Configuration globalConf = UTIL.getConfiguration(); 429 long globalSleepValue = 1000L; 430 globalConf.setLong("replication.source.sleepforretries", globalSleepValue); 431 432 // Create first peer WITH config override 433 long peerSleepOverride = 5000L; 434 String peerIdWithOverride = "peerWithOverride"; 435 String clusterKeyWithOverride = "testPeerWithOverride"; 436 437 ReplicationPeerConfig configWithOverride = ReplicationPeerConfig.newBuilder() 438 .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/" + clusterKeyWithOverride) 439 .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()) 440 .putConfiguration("replication.source.sleepforretries", String.valueOf(peerSleepOverride)) 441 .build(); 442 443 manager.getReplicationPeers().getPeerStorage().addPeer(peerIdWithOverride, configWithOverride, 444 true, SyncReplicationState.NONE); 445 manager.addPeer(peerIdWithOverride); 446 447 // Create second peer WITHOUT config override 448 String peerIdWithoutOverride = "peerWithoutOverride"; 449 String clusterKeyWithoutOverride = "testPeerWithoutOverride"; 450 addPeerAndWait(peerIdWithoutOverride, clusterKeyWithoutOverride, false); 451 452 // Wait for both peers to be active 453 UTIL.waitFor(20000, () -> { 454 ReplicationSourceInterface rs1 = manager.getSource(peerIdWithOverride); 455 ReplicationSourceInterface rs2 = manager.getSource(peerIdWithoutOverride); 456 return rs1 != null && rs1.isSourceActive() && rs2 != null && rs2.isSourceActive(); 457 }); 458 459 // Verify peer with override uses the override value 460 ReplicationSource sourceWithOverride = (ReplicationSource) manager.getSources().stream() 461 .filter(s -> s.getPeerId().equals(peerIdWithOverride)).findFirst().orElse(null); 462 assertNotNull("Source with override should be created", sourceWithOverride); 463 assertEquals("Peer with override should use override value", peerSleepOverride, 464 sourceWithOverride.getSleepForRetries()); 465 466 // Verify peer without override uses global config 467 ReplicationSource sourceWithoutOverride = (ReplicationSource) manager.getSources().stream() 468 .filter(s -> s.getPeerId().equals(peerIdWithoutOverride)).findFirst().orElse(null); 469 assertNotNull("Source without override should be created", sourceWithoutOverride); 470 assertEquals("Peer without override should use global config", globalSleepValue, 471 sourceWithoutOverride.getSleepForRetries()); 472 473 removePeerAndWait(peerIdWithOverride); 474 removePeerAndWait(peerIdWithoutOverride); 475 } 476}