001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.hamcrest.MatcherAssert.assertThat; 021import static org.hamcrest.Matchers.hasItems; 022import static org.hamcrest.Matchers.hasSize; 023import static org.junit.Assert.assertEquals; 024import static org.junit.Assert.assertFalse; 025import static org.junit.Assert.assertNotNull; 026import static org.mockito.Mockito.mock; 027import static org.mockito.Mockito.when; 028 029import java.io.IOException; 030import java.util.Collections; 031import java.util.NavigableMap; 032import java.util.Set; 033import java.util.TreeMap; 034import java.util.stream.Collectors; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.fs.FileSystem; 037import org.apache.hadoop.fs.Path; 038import org.apache.hadoop.hbase.Cell; 039import org.apache.hadoop.hbase.CellBuilderFactory; 040import org.apache.hadoop.hbase.CellBuilderType; 041import org.apache.hadoop.hbase.CompatibilitySingletonFactory; 042import org.apache.hadoop.hbase.HBaseClassTestRule; 043import org.apache.hadoop.hbase.HBaseTestingUtil; 044import org.apache.hadoop.hbase.HConstants; 045import 
org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
import org.apache.hadoop.hbase.replication.DummyReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;

/**
 * Tests for {@link ReplicationSourceManager}. Covers:
 * <ul>
 * <li>claiming the replication queue of another region server;</li>
 * <li>tracking of the latest WAL paths after log rolls;</li>
 * <li>log-queue metrics cleanup when a peer is removed or its sources are refreshed;</li>
 * <li>removal of remote WALs for a synchronous replication peer.</li>
 * </ul>
 * A single mini cluster is shared by all test methods (started in {@link #setUpBeforeClass()}).
 * Each method gets its own root dir, its own replication queue table and a fresh
 * {@link Replication} instance built around a mocked {@link Server}.
 */
@Category({ ReplicationTests.class, MediumTests.class })
public class TestReplicationSourceManager {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicationSourceManager.class);

  /**
   * Replication endpoint used by the peers in this test. Every batch is reported as replicated,
   * except when the peer's cluster key ends with {@code "error"}. In that case every call throws,
   * which blocks replication; this is useful e.g. to keep a recovered source from being removed.
   */
  public static final class ReplicationEndpointForTest extends DummyReplicationEndpoint {

    /** Cluster key of the peer this endpoint was initialized for; set in {@link #init}. */
    private String clusterKey;

    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      // Inject a failure for peers created with an "error" cluster key so the source never
      // makes progress.
      if (clusterKey.endsWith("error")) {
        throw new RuntimeException("Inject error");
      }
      return true;
    }

    @Override
    public void init(Context context) throws IOException {
      super.init(context);
      this.clusterKey = context.getReplicationPeer().getPeerConfig().getClusterKey();
    }

  }

  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

  private static Configuration CONF;

  private static FileSystem FS;

  private static final byte[] F1 = Bytes.toBytes("f1");

  private static final byte[] F2 = Bytes.toBytes("f2");

  private static final TableName TABLE_NAME = TableName.valueOf("test");

  private static RegionInfo RI;

  /** Per-family replication scopes written into WAL keys: f1 -> 1, f2 -> 0. */
  private static NavigableMap<byte[], Integer> SCOPES;

  @Rule
  public final TestName name = new TestName();

  private Path oldLogDir;

  private Path logDir;

  private Path remoteLogDir;

  private Server server;

  private Replication replication;

  private ReplicationSourceManager manager;

  /**
   * Starts a one-node mini cluster shared by all test methods. Also prepares a private copy of
   * its configuration, the region info and the replication scopes used when writing WAL files.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    UTIL.startMiniCluster(1);
    FS = UTIL.getTestFileSystem();
    CONF = new Configuration(UTIL.getConfiguration());
    CONF.setLong("replication.sleep.before.failover", 0);

    RI = RegionInfoBuilder.newBuilder(TABLE_NAME).build();
    SCOPES = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    SCOPES.put(F1, 1);
    SCOPES.put(F2, 0);
  }

  @AfterClass
  public static void tearDownAfterClass() throws IOException {
    UTIL.shutdownMiniCluster();
  }

  /**
   * Per-test setup:
   * <ul>
   * <li>creates per-method WAL, old-WAL and remote-WAL directories under a per-method root
   * dir;</li>
   * <li>creates a per-method replication queue table and points the configuration at it;</li>
   * <li>initializes a {@link Replication} instance around a mocked {@link Server}.</li>
   * </ul>
   */
  @Before
  public void setUp() throws Exception {
    Path rootDir = UTIL.getDataTestDirOnTestFS(name.getMethodName());
    CommonFSUtils.setRootDir(CONF, rootDir);
    server = mock(Server.class);
    when(server.getConfiguration()).thenReturn(CONF);
    when(server.getZooKeeper()).thenReturn(UTIL.getZooKeeperWatcher());
    when(server.getConnection()).thenReturn(UTIL.getConnection());
    ServerName sn = ServerName.valueOf("hostname.example.org", 1234, 1);
    when(server.getServerName()).thenReturn(sn);
    oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
    FS.mkdirs(oldLogDir);
    logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
    FS.mkdirs(logDir);
    remoteLogDir = new Path(rootDir, ReplicationUtils.REMOTE_WAL_DIR_NAME);
    FS.mkdirs(remoteLogDir);
    TableName tableName = TableName.valueOf("replication_" + name.getMethodName());
    UTIL.getAdmin()
      .createTable(ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName));
    CONF.set(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME, tableName.getNameAsString());

    replication = new Replication();
    replication.initialize(server, FS, new Path(logDir, sn.toString()), oldLogDir,
      new WALFactory(CONF, server.getServerName(), null));
    manager = replication.getReplicationManager();
  }

  /**
   * Stops the replication service started in {@link #setUp()}. JUnit runs {@code @After} methods
   * even when {@code @Before} throws. {@code replication} is only assigned near the end of setUp,
   * so it is null-checked here to avoid masking the original setup failure with an NPE.
   */
  @After
  public void tearDown() {
    if (replication != null) {
      replication.stopReplicationService();
    }
  }

  /**
   * Add a peer and wait for its replication source to become active.
   * @param peerId     id of the peer to add
   * @param clusterKey suffix appended to the mini cluster's ZK address to form the cluster key; a
   *                   value ending in {@code "error"} makes {@link ReplicationEndpointForTest}
   *                   fail every replicate call
   * @param syncRep    whether to add the peer as a synchronous replication peer (in
   *                   {@link SyncReplicationState#DOWNGRADE_ACTIVE} state) for
   *                   {@link #TABLE_NAME}, with {@link #remoteLogDir} as remote WAL dir
   */
  private void addPeerAndWait(String peerId, String clusterKey, boolean syncRep)
    throws ReplicationException, IOException {
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
      .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/" + clusterKey)
      .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName());
    if (syncRep) {
      builder.setTableCFsMap(ImmutableMap.of(TABLE_NAME, Collections.emptyList()))
        .setRemoteWALDir(FS.makeQualified(remoteLogDir).toString());
    }

    manager.getReplicationPeers().getPeerStorage().addPeer(peerId, builder.build(), true,
      syncRep ? SyncReplicationState.DOWNGRADE_ACTIVE : SyncReplicationState.NONE);
    manager.addPeer(peerId);
    UTIL.waitFor(20000, () -> {
      ReplicationSourceInterface rs = manager.getSource(peerId);
      return rs != null && rs.isSourceActive();
    });
  }

  /**
   * Remove a peer and wait until it is gone from the peer cache and has no normal source. Also
   * waits until no old (recovered) source for it remains.
   */
  private void removePeerAndWait(String peerId) throws Exception {
    ReplicationPeers rp = manager.getReplicationPeers();
    rp.getPeerStorage().removePeer(peerId);
    manager.removePeer(peerId);
    UTIL.waitFor(20000, () -> {
      if (rp.getPeer(peerId) != null) {
        return false;
      }
      if (manager.getSource(peerId) != null) {
        return false;
      }
      return manager.getOldSources().stream().noneMatch(rs -> rs.getPeerId().equals(peerId));
    });
  }

  /**
   * Write a WAL file containing a single entry with two cells, one in {@code f1} and one in
   * {@code f2}. The writer is always closed, even if writing fails.
   */
  private void createWALFile(Path file) throws Exception {
    ProtobufLogWriter writer = new ProtobufLogWriter();
    try {
      writer.init(FS, file, CONF, false, FS.getDefaultBlockSize(file), null);
      WALKeyImpl key = new WALKeyImpl(RI.getEncodedNameAsBytes(), TABLE_NAME,
        EnvironmentEdgeManager.currentTime(), SCOPES);
      WALEdit edit = new WALEdit();
      edit.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(F1).setFamily(F1)
        .setQualifier(F1).setType(Cell.Type.Put).setValue(F1).build());
      edit.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(F2).setFamily(F2)
        .setQualifier(F2).setType(Cell.Type.Put).setValue(F2).build());
      writer.append(new WAL.Entry(key, edit));
      writer.sync(false);
    } finally {
      writer.close();
    }
  }

  /**
   * Claiming another server's queue should create exactly one old (recovered) source. The peer's
   * cluster key ends with "error", so its endpoint fails every replicate call. This is what keeps
   * the recovered source from being removed before the assertion.
   */
  @Test
  public void testClaimQueue() throws Exception {
    String peerId = "1";
    addPeerAndWait(peerId, "error", false);
    ServerName serverName = ServerName.valueOf("hostname0.example.org", 12345, 123);
    String walName1 = serverName.toString() + ".1";
    createWALFile(new Path(oldLogDir, walName1));
    ReplicationQueueId queueId = new ReplicationQueueId(serverName, peerId);
    ReplicationQueueStorage queueStorage = manager.getQueueStorage();
    // NOTE(review): the recorded offset names the peer id as its WAL rather than walName1;
    // presumably any recorded offset is enough to make the queue claimable - confirm whether
    // walName1 was intended here.
    queueStorage.setOffset(queueId, "", new ReplicationGroupOffset(peerId, 0),
      Collections.emptyMap());
    manager.claimQueue(queueId);
    assertThat(manager.getOldSources(), hasSize(1));
  }

  /**
   * Two WALs whose names share a leading prefix must both be tracked as latest paths after being
   * rolled.
   */
  @Test
  public void testSameWALPrefix() throws IOException {
    String walName1 = "localhost,8080,12345-45678-Peer.34567";
    String walName2 = "localhost,8080,12345.56789";
    manager.postLogRoll(new Path(walName1));
    manager.postLogRoll(new Path(walName2));

    Set<String> latestWals =
      manager.getLastestPath().stream().map(Path::getName).collect(Collectors.toSet());
    assertThat(latestWals,
      Matchers.<Set<String>> both(hasSize(2)).and(hasItems(walName1, walName2)));
  }

  /** Returns the process-wide (global) replication source metrics. */
  private MetricsReplicationSourceSource getGlobalSource() {
    return CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
      .getGlobalSource();
  }

  /**
   * Removing a peer must subtract its queued logs from the global log-queue metric. Re-adding the
   * peer must start from a clean single-source metric.
   * <p>
   * The peer is removed in a {@code finally} block, mirroring
   * {@code testDisablePeerMetricsCleanup}, which uses the same peer id. Otherwise the re-added
   * peer would outlive this method. All methods share one mini cluster, and presumably its peer
   * storage, so a leftover "DummyPeer" could collide with that test's add.
   */
  @Test
  public void testRemovePeerMetricsCleanup() throws Exception {
    final String peerId = "DummyPeer";
    try {
      MetricsReplicationSourceSource globalSource = getGlobalSource();
      final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
      addPeerAndWait(peerId, "hbase", false);
      // there is no latestPaths so the size of log queue should not change
      assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());

      ReplicationSourceInterface source = manager.getSource(peerId);
      // Sanity check
      assertNotNull(source);
      final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
      // Enqueue log and check if metrics updated
      Path serverLogDir = new Path(logDir, server.getServerName().toString());
      source.enqueueLog(new Path(serverLogDir, server.getServerName() + ".1"));
      assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
        globalSource.getSizeOfLogQueue());

      // Removing the peer should reset the global metrics
      removePeerAndWait(peerId);
      assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());

      // Adding the same peer back again should reset the single source metrics
      addPeerAndWait(peerId, "hbase", false);
      source = manager.getSource(peerId);
      assertNotNull(source);
      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
        globalSource.getSizeOfLogQueue());
    } finally {
      removePeerAndWait(peerId);
    }
  }

  /**
   * Refreshing a peer's sources must subtract its queued logs from the global log-queue metric.
   * The new source's single-source metric must equal its initial value.
   */
  @Test
  public void testDisablePeerMetricsCleanup() throws Exception {
    final String peerId = "DummyPeer";
    try {
      MetricsReplicationSourceSource globalSource = getGlobalSource();
      final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
      addPeerAndWait(peerId, "hbase", false);
      assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
      ReplicationSourceInterface source = manager.getSource(peerId);
      // Sanity check
      assertNotNull(source);
      final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
      // Enqueue log and check if metrics updated
      Path serverLogDir = new Path(logDir, server.getServerName().toString());
      source.enqueueLog(new Path(serverLogDir, server.getServerName() + ".1"));
      assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
        globalSource.getSizeOfLogQueue());

      // Refreshing the peer should decrement the global and single source metrics
      manager.refreshSources(peerId);
      assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());

      source = manager.getSource(peerId);
      assertNotNull(source);
      assertEquals(sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
        globalSource.getSizeOfLogQueue());
    } finally {
      removePeerAndWait(peerId);
    }
  }

  /**
   * For a synchronous replication peer, cleaning old logs must delete the corresponding remote
   * WAL. A rolled WAL whose remote counterpart does not exist must be tolerated.
   */
  @Test
  public void testRemoveRemoteWALs() throws Exception {
    String peerId = "2";
    addPeerAndWait(peerId, "hbase", true);
    // make sure that we can deal with files which do not exist
    String walNameNotExists =
      "remoteWAL-12345-" + peerId + ".12345" + ReplicationUtils.SYNC_WAL_SUFFIX;
    Path wal = new Path(logDir, walNameNotExists);
    manager.postLogRoll(wal);

    Path remoteLogDirForPeer = new Path(remoteLogDir, peerId);
    FS.mkdirs(remoteLogDirForPeer);
    String walName = "remoteWAL-12345-" + peerId + ".23456" + ReplicationUtils.SYNC_WAL_SUFFIX;
    Path remoteWAL =
      new Path(remoteLogDirForPeer, walName).makeQualified(FS.getUri(), FS.getWorkingDirectory());
    FS.create(remoteWAL).close();
    wal = new Path(logDir, walName);
    manager.postLogRoll(wal);

    ReplicationSourceInterface source = manager.getSource(peerId);
    // Sanity check, consistent with the other tests: fail clearly rather than deep inside
    // cleanOldLogs if the source is unexpectedly missing.
    assertNotNull(source);
    manager.cleanOldLogs(walName, true, source);
    assertFalse(FS.exists(remoteWAL));
  }
}