/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
import org.apache.hadoop.hbase.replication.DummyReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;

/**
 * Tests for {@link ReplicationSourceManager}, run against a single-node mini cluster.
 * <p/>
 * Every test method gets a mocked {@link Server}, fresh WAL directories below a per-test root
 * directory, its own replication queue table and a newly initialized {@link Replication} instance.
 */
@Tag(ReplicationTests.TAG)
@Tag(MediumTests.TAG)
public class TestReplicationSourceManager {

  /**
   * Replication endpoint used by the tests. It reports success for every batch unless the peer's
   * cluster key ends with {@code "error"}, in which case {@link #replicate} throws.
   */
  public static final class ReplicationEndpointForTest extends DummyReplicationEndpoint {

    /** Cluster key of the peer this endpoint belongs to, captured in {@link #init}. */
    private String clusterKey;

    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      // A cluster key ending with "error" lets a test block replication, for example when it does
      // not want the recovered source to be removed.
      if (clusterKey.endsWith("error")) {
        throw new RuntimeException("Inject error");
      }
      return true;
    }

    @Override
    public void init(Context context) throws IOException {
      super.init(context);
      this.clusterKey = context.getReplicationPeer().getPeerConfig().getClusterKey();
    }

  }

  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

  /** Copy of the mini cluster configuration, taken once in {@link #setUpBeforeClass()}. */
  private static Configuration CONF;

  private static FileSystem FS;

  private static final byte[] F1 = Bytes.toBytes("f1");

  private static final byte[] F2 = Bytes.toBytes("f2");

  private static final TableName TABLE_NAME = TableName.valueOf("test");

  private static RegionInfo RI;

  /** Replication scopes written into the WAL keys: F1 maps to 1, F2 maps to 0. */
  private static NavigableMap<byte[], Integer> SCOPES;

  private String testName;

  private Path oldLogDir;

  private Path logDir;

  private Path remoteLogDir;

  private Server server;

  private Replication replication;

  private ReplicationSourceManager manager;

  /**
   * Starts a one-node mini cluster and prepares the state shared by all tests.
   */
  @BeforeAll
  public static void setUpBeforeClass() throws Exception {
    UTIL.startMiniCluster(1);
    FS = UTIL.getTestFileSystem();
    CONF = new Configuration(UTIL.getConfiguration());
    CONF.setLong("replication.sleep.before.failover", 0);

    RI = RegionInfoBuilder.newBuilder(TABLE_NAME).build();
    SCOPES = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    SCOPES.put(F1, 1);
    SCOPES.put(F2, 0);
  }

  @AfterAll
  public static void tearDownAfterClass() throws IOException {
    UTIL.shutdownMiniCluster();
  }

  /**
   * Builds the per-test fixture: root dir and WAL directories named after the test method, a
   * mocked {@link Server} backed by {@link #CONF}, a dedicated replication queue table, and an
   * initialized {@link Replication} whose manager is stored in {@link #manager}.
   */
  @BeforeEach
  public void setUp(TestInfo testInfo) throws Exception {
    // Fail with a descriptive message rather than a bare NoSuchElementException.
    testName = testInfo.getTestMethod()
      .orElseThrow(
        () -> new IllegalStateException("No test method for " + testInfo.getDisplayName()))
      .getName();
    Path rootDir = UTIL.getDataTestDirOnTestFS(testName);
    CommonFSUtils.setRootDir(CONF, rootDir);
    server = mock(Server.class);
    when(server.getConfiguration()).thenReturn(CONF);
    when(server.getZooKeeper()).thenReturn(UTIL.getZooKeeperWatcher());
    when(server.getConnection()).thenReturn(UTIL.getConnection());
    ServerName sn = ServerName.valueOf("hostname.example.org", 1234, 1);
    when(server.getServerName()).thenReturn(sn);
    oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
    FS.mkdirs(oldLogDir);
    logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
    FS.mkdirs(logDir);
    remoteLogDir = new Path(rootDir, ReplicationUtils.REMOTE_WAL_DIR_NAME);
    FS.mkdirs(remoteLogDir);
    TableName tableName = TableName.valueOf("replication_" + testName);
    UTIL.getAdmin()
      .createTable(ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName));
    CONF.set(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME, tableName.getNameAsString());

    replication = new Replication();
    replication.initialize(server, FS, new Path(logDir, sn.toString()), oldLogDir,
      new WALFactory(CONF, server.getServerName(), null));
    manager = replication.getReplicationManager();
  }

  @AfterEach
  public void tearDown() {
    replication.stopReplicationService();
  }

  /**
   * Block for up to 20 seconds until the manager has a source for the given peer and that source
   * reports itself active.
   */
  private void waitForActiveSource(String peerId) {
    UTIL.waitFor(20000, () -> {
      ReplicationSourceInterface rs = manager.getSource(peerId);
      return rs != null && rs.isSourceActive();
    });
  }

  /**
   * Look up the source for the given peer among {@code manager.getSources()}.
   * @return the first source whose peer id equals {@code peerId}, or {@code null} if there is none
   */
  private ReplicationSource findSource(String peerId) {
    return (ReplicationSource) manager.getSources().stream()
      .filter(s -> s.getPeerId().equals(peerId)).findFirst().orElse(null);
  }

  /**
   * Add a peer and wait for it to initialize
   * @param peerId     id of the peer to create
   * @param clusterKey suffix appended to the mini ZK cluster address to form the cluster key
   * @param syncRep    whether to configure the peer for synchronous replication of
   *                   {@link #TABLE_NAME}, starting in DOWNGRADE_ACTIVE state
   */
  private void addPeerAndWait(String peerId, String clusterKey, boolean syncRep)
    throws ReplicationException, IOException {
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
      .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/" + clusterKey)
      .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName());
    if (syncRep) {
      builder.setTableCFsMap(ImmutableMap.of(TABLE_NAME, Collections.emptyList()))
        .setRemoteWALDir(FS.makeQualified(remoteLogDir).toString());
    }

    manager.getReplicationPeers().getPeerStorage().addPeer(peerId, builder.build(), true,
      syncRep ? SyncReplicationState.DOWNGRADE_ACTIVE : SyncReplicationState.NONE);
    manager.addPeer(peerId);
    waitForActiveSource(peerId);
  }

  /**
   * Remove a peer and wait for it to get cleaned up, i.e. until the peer, its normal source and
   * any old source for it are all gone.
   */
  private void removePeerAndWait(String peerId) throws Exception {
    ReplicationPeers rp = manager.getReplicationPeers();
    rp.getPeerStorage().removePeer(peerId);
    manager.removePeer(peerId);
    UTIL.waitFor(20000, () -> {
      if (rp.getPeer(peerId) != null) {
        return false;
      }
      if (manager.getSource(peerId) != null) {
        return false;
      }
      return manager.getOldSources().stream().noneMatch(rs -> rs.getPeerId().equals(peerId));
    });
  }

  /**
   * Write a WAL file at the given location containing a single entry with one Put cell for
   * {@link #F1} and one for {@link #F2}.
   */
  private void createWALFile(Path file) throws Exception {
    ProtobufLogWriter writer = new ProtobufLogWriter();
    try {
      writer.init(FS, file, CONF, false, FS.getDefaultBlockSize(file), null);
      WALKeyImpl key = new WALKeyImpl(RI.getEncodedNameAsBytes(), TABLE_NAME,
        EnvironmentEdgeManager.currentTime(), SCOPES);
      WALEdit edit = new WALEdit();
      WALEditInternalHelper.addExtendedCell(edit,
        ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(F1).setFamily(F1)
          .setQualifier(F1).setType(Cell.Type.Put).setValue(F1).build());
      WALEditInternalHelper.addExtendedCell(edit,
        ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(F2).setFamily(F2)
          .setQualifier(F2).setType(Cell.Type.Put).setValue(F2).build());
      writer.append(new WAL.Entry(key, edit));
      writer.sync(false);
    } finally {
      writer.close();
    }
  }

  /**
   * Claims a queue registered for another server name and expects exactly one old source
   * afterwards. The peer's cluster key ends with "error", so the test endpoint throws on
   * replicate.
   */
  @Test
  public void testClaimQueue() throws Exception {
    String peerId = "1";
    addPeerAndWait(peerId, "error", false);
    ServerName serverName = ServerName.valueOf("hostname0.example.org", 12345, 123);
    String walName1 = serverName.toString() + ".1";
    createWALFile(new Path(oldLogDir, walName1));
    ReplicationQueueId queueId = new ReplicationQueueId(serverName, peerId);
    ReplicationQueueStorage queueStorage = manager.getQueueStorage();
    queueStorage.setOffset(queueId, "", new ReplicationGroupOffset(peerId, 0),
      Collections.emptyMap());
    manager.claimQueue(queueId);
    assertThat(manager.getOldSources(), hasSize(1));
  }

  /**
   * Rolls two WALs where one name starts with the other's prefix and expects both to be tracked
   * as latest paths.
   */
  @Test
  public void testSameWALPrefix() throws IOException {
    String walName1 = "localhost,8080,12345-45678-Peer.34567";
    String walName2 = "localhost,8080,12345.56789";
    manager.postLogRoll(new Path(walName1));
    manager.postLogRoll(new Path(walName2));

    Set<String> latestWals =
      manager.getLastestPath().stream().map(Path::getName).collect(Collectors.toSet());
    assertThat(latestWals,
      Matchers.<Set<String>> both(hasSize(2)).and(hasItems(walName1, walName2)));
  }

  /** Fetch the global replication source metrics singleton. */
  private MetricsReplicationSourceSource getGlobalSource() {
    return CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
      .getGlobalSource();
  }

  /**
   * Checks that the global log queue size metric returns to its initial value once a peer with
   * an enqueued log is removed, and stays consistent after the same peer is added again.
   */
  @Test
  public void testRemovePeerMetricsCleanup() throws Exception {
    MetricsReplicationSourceSource globalSource = getGlobalSource();
    int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
    String peerId = "DummyPeer";
    addPeerAndWait(peerId, "hbase", false);
    // there is no latestPaths so the size of log queue should not change
    assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());

    ReplicationSourceInterface source = manager.getSource(peerId);
    // Sanity check
    assertNotNull(source);
    int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
    // Enqueue log and check if metrics updated
    Path serverLogDir = new Path(logDir, server.getServerName().toString());
    source.enqueueLog(new Path(serverLogDir, server.getServerName() + ".1"));
    assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
    assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
      globalSource.getSizeOfLogQueue());

    // Removing the peer should reset the global metrics
    removePeerAndWait(peerId);
    assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());

    // Adding the same peer back again should reset the single source metrics
    addPeerAndWait(peerId, "hbase", false);
    source = manager.getSource(peerId);
    assertNotNull(source);
    assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
      globalSource.getSizeOfLogQueue());
  }

  /**
   * Checks that refreshing a peer's sources brings both the global and the per-source log queue
   * size metrics back to their values from before a log was enqueued.
   */
  @Test
  public void testDisablePeerMetricsCleanup() throws Exception {
    final String peerId = "DummyPeer";
    try {
      MetricsReplicationSourceSource globalSource = getGlobalSource();
      final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
      addPeerAndWait(peerId, "hbase", false);
      assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
      ReplicationSourceInterface source = manager.getSource(peerId);
      // Sanity check
      assertNotNull(source);
      final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
      // Enqueue log and check if metrics updated
      Path serverLogDir = new Path(logDir, server.getServerName().toString());
      source.enqueueLog(new Path(serverLogDir, server.getServerName() + ".1"));
      assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
        globalSource.getSizeOfLogQueue());

      // Refreshing the peer should decrement the global and single source metrics
      manager.refreshSources(peerId);
      assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());

      source = manager.getSource(peerId);
      assertNotNull(source);
      assertEquals(sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
        globalSource.getSizeOfLogQueue());
    } finally {
      removePeerAndWait(peerId);
    }
  }

  /**
   * For a sync replication peer, checks that cleaning old logs deletes the matching file in the
   * peer's remote WAL directory, and that rolling a WAL with no remote file is accepted.
   */
  @Test
  public void testRemoveRemoteWALs() throws Exception {
    String peerId = "2";
    addPeerAndWait(peerId, "hbase", true);
    // make sure that we can deal with files which do not exist
    String walNameNotExists =
      "remoteWAL-12345-" + peerId + ".12345" + ReplicationUtils.SYNC_WAL_SUFFIX;
    Path wal = new Path(logDir, walNameNotExists);
    manager.postLogRoll(wal);

    Path remoteLogDirForPeer = new Path(remoteLogDir, peerId);
    FS.mkdirs(remoteLogDirForPeer);
    String walName = "remoteWAL-12345-" + peerId + ".23456" + ReplicationUtils.SYNC_WAL_SUFFIX;
    Path remoteWAL =
      new Path(remoteLogDirForPeer, walName).makeQualified(FS.getUri(), FS.getWorkingDirectory());
    FS.create(remoteWAL).close();
    wal = new Path(logDir, walName);
    manager.postLogRoll(wal);

    ReplicationSourceInterface source = manager.getSource(peerId);
    manager.cleanOldLogs(walName, true, source);
    assertFalse(FS.exists(remoteWAL));
  }

  /**
   * Checks that a {@code replication.source.sleepforretries} value set in the peer config is the
   * value reported by the source and, when present, by its shipper and WAL reader.
   */
  @Test
  public void testPeerConfigurationOverridesPropagate() throws Exception {
    // NOTE(review): the mocked server returns CONF, a copy taken in setUpBeforeClass, so this
    // write to UTIL's configuration may not be visible to the manager under test - confirm.
    Configuration globalConf = UTIL.getConfiguration();
    long globalSleepValue = 1000L;
    globalConf.setLong("replication.source.sleepforretries", globalSleepValue);

    long peerSleepOverride = 5000L;
    String peerId = "testConfigOverridePeer";
    String clusterKey = "testPeerConfigOverride";

    ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
      .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/" + clusterKey)
      .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName())
      .putConfiguration("replication.source.sleepforretries", String.valueOf(peerSleepOverride))
      .build();

    manager.getReplicationPeers().getPeerStorage().addPeer(peerId, peerConfig, true,
      SyncReplicationState.NONE);
    manager.addPeer(peerId);
    // From here on the peer exists, so always remove it, even when an assertion fails.
    try {
      waitForActiveSource(peerId);

      ReplicationSource source = findSource(peerId);
      assertNotNull(source, "Source should be created for peer");

      assertEquals(peerSleepOverride, source.getSleepForRetries(),
        "ReplicationSource should use peer config override for sleepForRetries");

      // Shipper and reader are only checked when the source has at least one worker.
      Map<String, ReplicationSourceShipper> workers = source.workerThreads;
      if (!workers.isEmpty()) {
        ReplicationSourceShipper shipper = workers.values().iterator().next();
        assertEquals(peerSleepOverride, shipper.getSleepForRetries(),
          "ReplicationSourceShipper should use peer config override for sleepForRetries");

        ReplicationSourceWALReader reader = shipper.entryReader;
        if (reader != null) {
          assertEquals(peerSleepOverride, reader.getSleepForRetries(),
            "ReplicationSourceWALReader should use peer config override for sleepForRetries");
        }
      }
    } finally {
      removePeerAndWait(peerId);
    }
  }

  /**
   * Checks that a peer-level {@code replication.source.sleepforretries} override applies only to
   * that peer: a second peer without the override must report the expected global value.
   */
  @Test
  public void testPeerConfigurationIsolation() throws Exception {
    // NOTE(review): the mocked server returns CONF, a copy taken in setUpBeforeClass, so this
    // write to UTIL's configuration may not be visible to the manager under test - confirm.
    Configuration globalConf = UTIL.getConfiguration();
    long globalSleepValue = 1000L;
    globalConf.setLong("replication.source.sleepforretries", globalSleepValue);

    // Create first peer WITH config override
    long peerSleepOverride = 5000L;
    String peerIdWithOverride = "peerWithOverride";
    String clusterKeyWithOverride = "testPeerWithOverride";

    ReplicationPeerConfig configWithOverride = ReplicationPeerConfig.newBuilder()
      .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/" + clusterKeyWithOverride)
      .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName())
      .putConfiguration("replication.source.sleepforretries", String.valueOf(peerSleepOverride))
      .build();

    manager.getReplicationPeers().getPeerStorage().addPeer(peerIdWithOverride, configWithOverride,
      true, SyncReplicationState.NONE);
    manager.addPeer(peerIdWithOverride);
    // Each peer is removed in a finally block so a failed assertion does not leave it behind.
    try {
      // Create second peer WITHOUT config override
      String peerIdWithoutOverride = "peerWithoutOverride";
      String clusterKeyWithoutOverride = "testPeerWithoutOverride";
      addPeerAndWait(peerIdWithoutOverride, clusterKeyWithoutOverride, false);
      try {
        // Wait for both peers to be active
        UTIL.waitFor(20000, () -> {
          ReplicationSourceInterface rs1 = manager.getSource(peerIdWithOverride);
          ReplicationSourceInterface rs2 = manager.getSource(peerIdWithoutOverride);
          return rs1 != null && rs1.isSourceActive() && rs2 != null && rs2.isSourceActive();
        });

        // Verify peer with override uses the override value
        ReplicationSource sourceWithOverride = findSource(peerIdWithOverride);
        assertNotNull(sourceWithOverride, "Source with override should be created");
        assertEquals(peerSleepOverride, sourceWithOverride.getSleepForRetries(),
          "Peer with override should use override value");

        // Verify peer without override uses global config
        ReplicationSource sourceWithoutOverride = findSource(peerIdWithoutOverride);
        assertNotNull(sourceWithoutOverride, "Source without override should be created");
        assertEquals(globalSleepValue, sourceWithoutOverride.getSleepForRetries(),
          "Peer without override should use global config");
      } finally {
        removePeerAndWait(peerIdWithoutOverride);
      }
    } finally {
      removePeerAndWait(peerIdWithOverride);
    }
  }
}