001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.hamcrest.MatcherAssert.assertThat;
021import static org.hamcrest.Matchers.hasItems;
022import static org.hamcrest.Matchers.hasSize;
023import static org.junit.jupiter.api.Assertions.assertEquals;
024import static org.junit.jupiter.api.Assertions.assertFalse;
025import static org.junit.jupiter.api.Assertions.assertNotNull;
026import static org.mockito.Mockito.mock;
027import static org.mockito.Mockito.when;
028
029import java.io.IOException;
030import java.util.Collections;
031import java.util.Map;
032import java.util.NavigableMap;
033import java.util.Set;
034import java.util.TreeMap;
035import java.util.stream.Collectors;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.fs.FileSystem;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.hbase.Cell;
040import org.apache.hadoop.hbase.CellBuilderType;
041import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
042import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
043import org.apache.hadoop.hbase.HBaseTestingUtil;
044import org.apache.hadoop.hbase.HConstants;
045import org.apache.hadoop.hbase.Server;
046import org.apache.hadoop.hbase.ServerName;
047import org.apache.hadoop.hbase.TableName;
048import org.apache.hadoop.hbase.client.RegionInfo;
049import org.apache.hadoop.hbase.client.RegionInfoBuilder;
050import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
051import org.apache.hadoop.hbase.replication.DummyReplicationEndpoint;
052import org.apache.hadoop.hbase.replication.ReplicationException;
053import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
054import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
055import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
056import org.apache.hadoop.hbase.replication.ReplicationPeers;
057import org.apache.hadoop.hbase.replication.ReplicationQueueId;
058import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
059import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
060import org.apache.hadoop.hbase.replication.ReplicationUtils;
061import org.apache.hadoop.hbase.replication.SyncReplicationState;
062import org.apache.hadoop.hbase.testclassification.MediumTests;
063import org.apache.hadoop.hbase.testclassification.ReplicationTests;
064import org.apache.hadoop.hbase.util.Bytes;
065import org.apache.hadoop.hbase.util.CommonFSUtils;
066import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
067import org.apache.hadoop.hbase.wal.WAL;
068import org.apache.hadoop.hbase.wal.WALEdit;
069import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
070import org.apache.hadoop.hbase.wal.WALFactory;
071import org.apache.hadoop.hbase.wal.WALKeyImpl;
072import org.hamcrest.Matchers;
073import org.junit.jupiter.api.AfterAll;
074import org.junit.jupiter.api.AfterEach;
075import org.junit.jupiter.api.BeforeAll;
076import org.junit.jupiter.api.BeforeEach;
077import org.junit.jupiter.api.Tag;
078import org.junit.jupiter.api.Test;
079import org.junit.jupiter.api.TestInfo;
080
081import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
082
083@Tag(ReplicationTests.TAG)
084@Tag(MediumTests.TAG)
085public class TestReplicationSourceManager {
086
087  public static final class ReplicationEndpointForTest extends DummyReplicationEndpoint {
088
089    private String clusterKey;
090
091    @Override
092    public boolean replicate(ReplicateContext replicateContext) {
093      // if you want to block the replication, for example, do not want the recovered source to be
094      // removed
095      if (clusterKey.endsWith("error")) {
096        throw new RuntimeException("Inject error");
097      }
098      return true;
099    }
100
101    @Override
102    public void init(Context context) throws IOException {
103      super.init(context);
104      this.clusterKey = context.getReplicationPeer().getPeerConfig().getClusterKey();
105    }
106
107  }
108
109  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
110
111  private static Configuration CONF;
112
113  private static FileSystem FS;
114
115  private static final byte[] F1 = Bytes.toBytes("f1");
116
117  private static final byte[] F2 = Bytes.toBytes("f2");
118
119  private static final TableName TABLE_NAME = TableName.valueOf("test");
120
121  private static RegionInfo RI;
122
123  private static NavigableMap<byte[], Integer> SCOPES;
124
125  private String testName;
126
127  private Path oldLogDir;
128
129  private Path logDir;
130
131  private Path remoteLogDir;
132
133  private Server server;
134
135  private Replication replication;
136
137  private ReplicationSourceManager manager;
138
139  @BeforeAll
140  public static void setUpBeforeClass() throws Exception {
141    UTIL.startMiniCluster(1);
142    FS = UTIL.getTestFileSystem();
143    CONF = new Configuration(UTIL.getConfiguration());
144    CONF.setLong("replication.sleep.before.failover", 0);
145
146    RI = RegionInfoBuilder.newBuilder(TABLE_NAME).build();
147    SCOPES = new TreeMap<>(Bytes.BYTES_COMPARATOR);
148    SCOPES.put(F1, 1);
149    SCOPES.put(F2, 0);
150  }
151
152  @AfterAll
153  public static void tearDownAfterClass() throws IOException {
154    UTIL.shutdownMiniCluster();
155  }
156
157  @BeforeEach
158  public void setUp(TestInfo testInfo) throws Exception {
159    testName = testInfo.getTestMethod().get().getName();
160    Path rootDir = UTIL.getDataTestDirOnTestFS(testName);
161    CommonFSUtils.setRootDir(CONF, rootDir);
162    server = mock(Server.class);
163    when(server.getConfiguration()).thenReturn(CONF);
164    when(server.getZooKeeper()).thenReturn(UTIL.getZooKeeperWatcher());
165    when(server.getConnection()).thenReturn(UTIL.getConnection());
166    ServerName sn = ServerName.valueOf("hostname.example.org", 1234, 1);
167    when(server.getServerName()).thenReturn(sn);
168    oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
169    FS.mkdirs(oldLogDir);
170    logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
171    FS.mkdirs(logDir);
172    remoteLogDir = new Path(rootDir, ReplicationUtils.REMOTE_WAL_DIR_NAME);
173    FS.mkdirs(remoteLogDir);
174    TableName tableName = TableName.valueOf("replication_" + testName);
175    UTIL.getAdmin()
176      .createTable(ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName));
177    CONF.set(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME, tableName.getNameAsString());
178
179    replication = new Replication();
180    replication.initialize(server, FS, new Path(logDir, sn.toString()), oldLogDir,
181      new WALFactory(CONF, server.getServerName(), null));
182    manager = replication.getReplicationManager();
183  }
184
185  @AfterEach
186  public void tearDown() {
187    replication.stopReplicationService();
188  }
189
190  /**
191   * Add a peer and wait for it to initialize
192   */
193  private void addPeerAndWait(String peerId, String clusterKey, boolean syncRep)
194    throws ReplicationException, IOException {
195    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
196      .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/" + clusterKey)
197      .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName());
198    if (syncRep) {
199      builder.setTableCFsMap(ImmutableMap.of(TABLE_NAME, Collections.emptyList()))
200        .setRemoteWALDir(FS.makeQualified(remoteLogDir).toString());
201    }
202
203    manager.getReplicationPeers().getPeerStorage().addPeer(peerId, builder.build(), true,
204      syncRep ? SyncReplicationState.DOWNGRADE_ACTIVE : SyncReplicationState.NONE);
205    manager.addPeer(peerId);
206    UTIL.waitFor(20000, () -> {
207      ReplicationSourceInterface rs = manager.getSource(peerId);
208      return rs != null && rs.isSourceActive();
209    });
210  }
211
212  /**
213   * Remove a peer and wait for it to get cleaned up
214   */
215  private void removePeerAndWait(String peerId) throws Exception {
216    ReplicationPeers rp = manager.getReplicationPeers();
217    rp.getPeerStorage().removePeer(peerId);
218    manager.removePeer(peerId);
219    UTIL.waitFor(20000, () -> {
220      if (rp.getPeer(peerId) != null) {
221        return false;
222      }
223      if (manager.getSource(peerId) != null) {
224        return false;
225      }
226      return manager.getOldSources().stream().noneMatch(rs -> rs.getPeerId().equals(peerId));
227    });
228  }
229
230  private void createWALFile(Path file) throws Exception {
231    ProtobufLogWriter writer = new ProtobufLogWriter();
232    try {
233      writer.init(FS, file, CONF, false, FS.getDefaultBlockSize(file), null);
234      WALKeyImpl key = new WALKeyImpl(RI.getEncodedNameAsBytes(), TABLE_NAME,
235        EnvironmentEdgeManager.currentTime(), SCOPES);
236      WALEdit edit = new WALEdit();
237      WALEditInternalHelper.addExtendedCell(edit,
238        ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(F1).setFamily(F1)
239          .setQualifier(F1).setType(Cell.Type.Put).setValue(F1).build());
240      WALEditInternalHelper.addExtendedCell(edit,
241        ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(F2).setFamily(F2)
242          .setQualifier(F2).setType(Cell.Type.Put).setValue(F2).build());
243      writer.append(new WAL.Entry(key, edit));
244      writer.sync(false);
245    } finally {
246      writer.close();
247    }
248  }
249
250  @Test
251  public void testClaimQueue() throws Exception {
252    String peerId = "1";
253    addPeerAndWait(peerId, "error", false);
254    ServerName serverName = ServerName.valueOf("hostname0.example.org", 12345, 123);
255    String walName1 = serverName.toString() + ".1";
256    createWALFile(new Path(oldLogDir, walName1));
257    ReplicationQueueId queueId = new ReplicationQueueId(serverName, peerId);
258    ReplicationQueueStorage queueStorage = manager.getQueueStorage();
259    queueStorage.setOffset(queueId, "", new ReplicationGroupOffset(peerId, 0),
260      Collections.emptyMap());
261    manager.claimQueue(queueId);
262    assertThat(manager.getOldSources(), hasSize(1));
263  }
264
265  @Test
266  public void testSameWALPrefix() throws IOException {
267    String walName1 = "localhost,8080,12345-45678-Peer.34567";
268    String walName2 = "localhost,8080,12345.56789";
269    manager.postLogRoll(new Path(walName1));
270    manager.postLogRoll(new Path(walName2));
271
272    Set<String> latestWals =
273      manager.getLastestPath().stream().map(Path::getName).collect(Collectors.toSet());
274    assertThat(latestWals,
275      Matchers.<Set<String>> both(hasSize(2)).and(hasItems(walName1, walName2)));
276  }
277
278  private MetricsReplicationSourceSource getGlobalSource() {
279    return CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
280      .getGlobalSource();
281  }
282
283  @Test
284  public void testRemovePeerMetricsCleanup() throws Exception {
285    MetricsReplicationSourceSource globalSource = getGlobalSource();
286    int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
287    String peerId = "DummyPeer";
288    addPeerAndWait(peerId, "hbase", false);
289    // there is no latestPaths so the size of log queue should not change
290    assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
291
292    ReplicationSourceInterface source = manager.getSource(peerId);
293    // Sanity check
294    assertNotNull(source);
295    int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
296    // Enqueue log and check if metrics updated
297    Path serverLogDir = new Path(logDir, server.getServerName().toString());
298    source.enqueueLog(new Path(serverLogDir, server.getServerName() + ".1"));
299    assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
300    assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
301      globalSource.getSizeOfLogQueue());
302
303    // Removing the peer should reset the global metrics
304    removePeerAndWait(peerId);
305    assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
306
307    // Adding the same peer back again should reset the single source metrics
308    addPeerAndWait(peerId, "hbase", false);
309    source = manager.getSource(peerId);
310    assertNotNull(source);
311    assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
312      globalSource.getSizeOfLogQueue());
313  }
314
315  @Test
316  public void testDisablePeerMetricsCleanup() throws Exception {
317    final String peerId = "DummyPeer";
318    try {
319      MetricsReplicationSourceSource globalSource = getGlobalSource();
320      final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
321      addPeerAndWait(peerId, "hbase", false);
322      assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
323      ReplicationSourceInterface source = manager.getSource(peerId);
324      // Sanity check
325      assertNotNull(source);
326      final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
327      // Enqueue log and check if metrics updated
328      Path serverLogDir = new Path(logDir, server.getServerName().toString());
329      source.enqueueLog(new Path(serverLogDir, server.getServerName() + ".1"));
330      assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
331      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
332        globalSource.getSizeOfLogQueue());
333
334      // Refreshing the peer should decrement the global and single source metrics
335      manager.refreshSources(peerId);
336      assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
337
338      source = manager.getSource(peerId);
339      assertNotNull(source);
340      assertEquals(sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
341      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
342        globalSource.getSizeOfLogQueue());
343    } finally {
344      removePeerAndWait(peerId);
345    }
346  }
347
348  @Test
349  public void testRemoveRemoteWALs() throws Exception {
350    String peerId = "2";
351    addPeerAndWait(peerId, "hbase", true);
352    // make sure that we can deal with files which does not exist
353    String walNameNotExists =
354      "remoteWAL-12345-" + peerId + ".12345" + ReplicationUtils.SYNC_WAL_SUFFIX;
355    Path wal = new Path(logDir, walNameNotExists);
356    manager.postLogRoll(wal);
357
358    Path remoteLogDirForPeer = new Path(remoteLogDir, peerId);
359    FS.mkdirs(remoteLogDirForPeer);
360    String walName = "remoteWAL-12345-" + peerId + ".23456" + ReplicationUtils.SYNC_WAL_SUFFIX;
361    Path remoteWAL =
362      new Path(remoteLogDirForPeer, walName).makeQualified(FS.getUri(), FS.getWorkingDirectory());
363    FS.create(remoteWAL).close();
364    wal = new Path(logDir, walName);
365    manager.postLogRoll(wal);
366
367    ReplicationSourceInterface source = manager.getSource(peerId);
368    manager.cleanOldLogs(walName, true, source);
369    assertFalse(FS.exists(remoteWAL));
370  }
371
372  @Test
373  public void testPeerConfigurationOverridesPropagate() throws Exception {
374    Configuration globalConf = UTIL.getConfiguration();
375    long globalSleepValue = 1000L;
376    globalConf.setLong("replication.source.sleepforretries", globalSleepValue);
377
378    long peerSleepOverride = 5000L;
379    String peerId = "testConfigOverridePeer";
380    String clusterKey = "testPeerConfigOverride";
381
382    ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
383      .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/" + clusterKey)
384      .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName())
385      .putConfiguration("replication.source.sleepforretries", String.valueOf(peerSleepOverride))
386      .build();
387
388    manager.getReplicationPeers().getPeerStorage().addPeer(peerId, peerConfig, true,
389      SyncReplicationState.NONE);
390    manager.addPeer(peerId);
391    UTIL.waitFor(20000, () -> {
392      ReplicationSourceInterface rs = manager.getSource(peerId);
393      return rs != null && rs.isSourceActive();
394    });
395
396    ReplicationSource source = (ReplicationSource) manager.getSources().stream()
397      .filter(s -> s.getPeerId().equals(peerId)).findFirst().orElse(null);
398    assertNotNull(source, "Source should be created for peer");
399
400    assertEquals(peerSleepOverride, source.getSleepForRetries(),
401      "ReplicationSource should use peer config override for sleepForRetries");
402
403    Map<String, ReplicationSourceShipper> workers = source.workerThreads;
404    if (!workers.isEmpty()) {
405      ReplicationSourceShipper shipper = workers.values().iterator().next();
406      assertEquals(peerSleepOverride, shipper.getSleepForRetries(),
407        "ReplicationSourceShipper should use peer config override for sleepForRetries");
408
409      ReplicationSourceWALReader reader = shipper.entryReader;
410      if (reader != null) {
411        assertEquals(peerSleepOverride, reader.getSleepForRetries(),
412          "ReplicationSourceWALReader should use peer config override for sleepForRetries");
413      }
414    }
415
416    removePeerAndWait(peerId);
417  }
418
419  @Test
420  public void testPeerConfigurationIsolation() throws Exception {
421    Configuration globalConf = UTIL.getConfiguration();
422    long globalSleepValue = 1000L;
423    globalConf.setLong("replication.source.sleepforretries", globalSleepValue);
424
425    // Create first peer WITH config override
426    long peerSleepOverride = 5000L;
427    String peerIdWithOverride = "peerWithOverride";
428    String clusterKeyWithOverride = "testPeerWithOverride";
429
430    ReplicationPeerConfig configWithOverride = ReplicationPeerConfig.newBuilder()
431      .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/" + clusterKeyWithOverride)
432      .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName())
433      .putConfiguration("replication.source.sleepforretries", String.valueOf(peerSleepOverride))
434      .build();
435
436    manager.getReplicationPeers().getPeerStorage().addPeer(peerIdWithOverride, configWithOverride,
437      true, SyncReplicationState.NONE);
438    manager.addPeer(peerIdWithOverride);
439
440    // Create second peer WITHOUT config override
441    String peerIdWithoutOverride = "peerWithoutOverride";
442    String clusterKeyWithoutOverride = "testPeerWithoutOverride";
443    addPeerAndWait(peerIdWithoutOverride, clusterKeyWithoutOverride, false);
444
445    // Wait for both peers to be active
446    UTIL.waitFor(20000, () -> {
447      ReplicationSourceInterface rs1 = manager.getSource(peerIdWithOverride);
448      ReplicationSourceInterface rs2 = manager.getSource(peerIdWithoutOverride);
449      return rs1 != null && rs1.isSourceActive() && rs2 != null && rs2.isSourceActive();
450    });
451
452    // Verify peer with override uses the override value
453    ReplicationSource sourceWithOverride = (ReplicationSource) manager.getSources().stream()
454      .filter(s -> s.getPeerId().equals(peerIdWithOverride)).findFirst().orElse(null);
455    assertNotNull(sourceWithOverride, "Source with override should be created");
456    assertEquals(peerSleepOverride, sourceWithOverride.getSleepForRetries(),
457      "Peer with override should use override value");
458
459    // Verify peer without override uses global config
460    ReplicationSource sourceWithoutOverride = (ReplicationSource) manager.getSources().stream()
461      .filter(s -> s.getPeerId().equals(peerIdWithoutOverride)).findFirst().orElse(null);
462    assertNotNull(sourceWithoutOverride, "Source without override should be created");
463    assertEquals(globalSleepValue, sourceWithoutOverride.getSleepForRetries(),
464      "Peer without override should use global config");
465
466    removePeerAndWait(peerIdWithOverride);
467    removePeerAndWait(peerIdWithoutOverride);
468  }
469}