001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertNull;
025import static org.junit.Assert.assertTrue;
026import static org.mockito.Mockito.doNothing;
027import static org.mockito.Mockito.mock;
028import static org.mockito.Mockito.when;
029
030import java.io.IOException;
031import java.util.ArrayList;
032import java.util.OptionalLong;
033import java.util.UUID;
034import java.util.concurrent.ExecutorService;
035import java.util.concurrent.Executors;
036import java.util.concurrent.Future;
037import java.util.concurrent.atomic.AtomicLong;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.fs.FileSystem;
040import org.apache.hadoop.fs.Path;
041import org.apache.hadoop.hbase.Cell;
042import org.apache.hadoop.hbase.CellBuilderFactory;
043import org.apache.hadoop.hbase.CellBuilderType;
044import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
045import org.apache.hadoop.hbase.HBaseClassTestRule;
046import org.apache.hadoop.hbase.HBaseConfiguration;
047import org.apache.hadoop.hbase.HBaseTestingUtility;
048import org.apache.hadoop.hbase.HConstants;
049import org.apache.hadoop.hbase.KeyValue;
050import org.apache.hadoop.hbase.MiniHBaseCluster;
051import org.apache.hadoop.hbase.Server;
052import org.apache.hadoop.hbase.ServerName;
053import org.apache.hadoop.hbase.TableName;
054import org.apache.hadoop.hbase.Waiter;
055import org.apache.hadoop.hbase.client.Admin;
056import org.apache.hadoop.hbase.regionserver.HRegionServer;
057import org.apache.hadoop.hbase.regionserver.RegionServerServices;
058import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
059import org.apache.hadoop.hbase.replication.ReplicationPeer;
060import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
061import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
062import org.apache.hadoop.hbase.replication.WALEntryFilter;
063import org.apache.hadoop.hbase.testclassification.MediumTests;
064import org.apache.hadoop.hbase.testclassification.ReplicationTests;
065import org.apache.hadoop.hbase.util.Bytes;
066import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
067import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
068import org.apache.hadoop.hbase.wal.WAL;
069import org.apache.hadoop.hbase.wal.WALEdit;
070import org.apache.hadoop.hbase.wal.WALFactory;
071import org.apache.hadoop.hbase.wal.WALKeyImpl;
072import org.apache.hadoop.hbase.wal.WALProvider;
073import org.junit.AfterClass;
074import org.junit.BeforeClass;
075import org.junit.ClassRule;
076import org.junit.Test;
077import org.junit.experimental.categories.Category;
078import org.mockito.Mockito;
079import org.slf4j.Logger;
080import org.slf4j.LoggerFactory;
081
082@Category({ ReplicationTests.class, MediumTests.class })
083public class TestReplicationSource {
084
085  @ClassRule
086  public static final HBaseClassTestRule CLASS_RULE =
087    HBaseClassTestRule.forClass(TestReplicationSource.class);
088
089  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSource.class);
090  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
091  private final static HBaseTestingUtility TEST_UTIL_PEER = new HBaseTestingUtility();
092  private static FileSystem FS;
093  private static Path oldLogDir;
094  private static Path logDir;
095  private static Configuration conf = TEST_UTIL.getConfiguration();
096
097  @BeforeClass
098  public static void setUpBeforeClass() throws Exception {
099    TEST_UTIL.startMiniDFSCluster(1);
100    FS = TEST_UTIL.getDFSCluster().getFileSystem();
101    Path rootDir = TEST_UTIL.createRootDir();
102    oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
103    if (FS.exists(oldLogDir)) {
104      FS.delete(oldLogDir, true);
105    }
106    logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
107    if (FS.exists(logDir)) {
108      FS.delete(logDir, true);
109    }
110  }
111
112  @AfterClass
113  public static void tearDownAfterClass() throws Exception {
114    TEST_UTIL_PEER.shutdownMiniHBaseCluster();
115    TEST_UTIL.shutdownMiniHBaseCluster();
116    TEST_UTIL.shutdownMiniDFSCluster();
117  }
118
119  /**
120   * Test the default ReplicationSource skips queuing hbase:meta WAL files.
121   */
122  @Test
123  public void testDefaultSkipsMetaWAL() throws IOException {
124    ReplicationSource rs = new ReplicationSource();
125    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
126    conf.setInt("replication.source.maxretriesmultiplier", 1);
127    ReplicationPeer mockPeer = mock(ReplicationPeer.class);
128    when(mockPeer.getConfiguration()).thenReturn(conf);
129    when(mockPeer.getPeerBandwidth()).thenReturn(0L);
130    ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);
131    when(peerConfig.getReplicationEndpointImpl())
132      .thenReturn(DoNothingReplicationEndpoint.class.getName());
133    when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
134    ReplicationSourceManager manager = mock(ReplicationSourceManager.class);
135    when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
136    Mockito.when(manager.getGlobalMetrics())
137      .thenReturn(mock(MetricsReplicationGlobalSourceSource.class));
138    String queueId = "qid";
139    RegionServerServices rss =
140      TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
141    rs.init(conf, null, manager, null, mockPeer, rss, queueId, null, p -> OptionalLong.empty(),
142      new MetricsSource(queueId));
143    try {
144      rs.startup();
145      assertTrue(rs.isSourceActive());
146      assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
147      rs.enqueueLog(new Path("a.1" + META_WAL_PROVIDER_ID));
148      assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
149      rs.enqueueLog(new Path("a.1"));
150      assertEquals(1, rs.getSourceMetrics().getSizeOfLogQueue());
151    } finally {
152      rs.terminate("Done");
153      rss.stop("Done");
154    }
155  }
156
157  /**
158   * Test that we filter out meta edits, etc.
159   */
160  @Test
161  public void testWALEntryFilter() throws IOException {
162    // To get the fully constructed default WALEntryFilter, need to create a ReplicationSource
163    // instance and init it.
164    ReplicationSource rs = new ReplicationSource();
165    UUID uuid = UUID.randomUUID();
166    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
167    ReplicationPeer mockPeer = mock(ReplicationPeer.class);
168    when(mockPeer.getConfiguration()).thenReturn(conf);
169    when(mockPeer.getPeerBandwidth()).thenReturn(0L);
170    ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);
171    when(peerConfig.getReplicationEndpointImpl())
172      .thenReturn(DoNothingReplicationEndpoint.class.getName());
173    when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
174    ReplicationSourceManager manager = mock(ReplicationSourceManager.class);
175    when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
176    String queueId = "qid";
177    RegionServerServices rss =
178      TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
179    rs.init(conf, null, manager, null, mockPeer, rss, queueId, uuid, p -> OptionalLong.empty(),
180      new MetricsSource(queueId));
181    try {
182      rs.startup();
183      TEST_UTIL.waitFor(30000, () -> rs.getWalEntryFilter() != null);
184      WALEntryFilter wef = rs.getWalEntryFilter();
185      // Test non-system WAL edit.
186      WALEdit we = new WALEdit()
187        .add(CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(HConstants.EMPTY_START_ROW)
188          .setFamily(HConstants.CATALOG_FAMILY).setType(Cell.Type.Put).build());
189      WAL.Entry e = new WAL.Entry(
190        new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TableName.valueOf("test"), -1, -1, uuid), we);
191      assertTrue(wef.filter(e) == e);
192      // Test system WAL edit.
193      e = new WAL.Entry(
194        new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TableName.META_TABLE_NAME, -1, -1, uuid), we);
195      assertNull(wef.filter(e));
196    } finally {
197      rs.terminate("Done");
198      rss.stop("Done");
199    }
200  }
201
202  /**
203   * Sanity check that we can move logs around while we are reading from them. Should this test
204   * fail, ReplicationSource would have a hard time reading logs that are being archived.
205   */
206  // This tests doesn't belong in here... it is not about ReplicationSource.
207  @Test
208  public void testLogMoving() throws Exception {
209    Path logPath = new Path(logDir, "log");
210    if (!FS.exists(logDir)) {
211      FS.mkdirs(logDir);
212    }
213    if (!FS.exists(oldLogDir)) {
214      FS.mkdirs(oldLogDir);
215    }
216    WALProvider.Writer writer =
217      WALFactory.createWALWriter(FS, logPath, TEST_UTIL.getConfiguration());
218    for (int i = 0; i < 3; i++) {
219      byte[] b = Bytes.toBytes(Integer.toString(i));
220      KeyValue kv = new KeyValue(b, b, b);
221      WALEdit edit = new WALEdit();
222      edit.add(kv);
223      WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0, HConstants.DEFAULT_CLUSTER_ID);
224      writer.append(new WAL.Entry(key, edit));
225      writer.sync(false);
226    }
227    writer.close();
228
229    WAL.Reader reader = WALFactory.createReader(FS, logPath, TEST_UTIL.getConfiguration());
230    WAL.Entry entry = reader.next();
231    assertNotNull(entry);
232
233    Path oldLogPath = new Path(oldLogDir, "log");
234    FS.rename(logPath, oldLogPath);
235
236    entry = reader.next();
237    assertNotNull(entry);
238
239    reader.next();
240    entry = reader.next();
241
242    assertNull(entry);
243    reader.close();
244  }
245
246  /**
247   * Tests that {@link ReplicationSource#terminate(String)} will timeout properly Moved here from
248   * TestReplicationSource because doesn't need cluster.
249   */
250  @Test
251  public void testTerminateTimeout() throws Exception {
252    ReplicationSource source = new ReplicationSource();
253    ReplicationEndpoint replicationEndpoint = new DoNothingReplicationEndpoint();
254    try {
255      replicationEndpoint.start();
256      ReplicationPeer mockPeer = mock(ReplicationPeer.class);
257      when(mockPeer.getPeerBandwidth()).thenReturn(0L);
258      Configuration testConf = HBaseConfiguration.create();
259      testConf.setInt("replication.source.maxretriesmultiplier", 1);
260      ReplicationSourceManager manager = mock(ReplicationSourceManager.class);
261      when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
262      source.init(testConf, null, manager, null, mockPeer, null, "testPeer", null,
263        p -> OptionalLong.empty(), null);
264      ExecutorService executor = Executors.newSingleThreadExecutor();
265      Future<?> future = executor.submit(() -> source.terminate("testing source termination"));
266      long sleepForRetries = testConf.getLong("replication.source.sleepforretries", 1000);
267      Waiter.waitFor(testConf, sleepForRetries * 2, (Waiter.Predicate<Exception>) future::isDone);
268    } finally {
269      replicationEndpoint.stop();
270    }
271  }
272
273  @Test
274  public void testTerminateClearsBuffer() throws Exception {
275    ReplicationSource source = new ReplicationSource();
276    ReplicationSourceManager mockManager = mock(ReplicationSourceManager.class);
277    MetricsReplicationGlobalSourceSource mockMetrics =
278      mock(MetricsReplicationGlobalSourceSource.class);
279    AtomicLong buffer = new AtomicLong();
280    Mockito.when(mockManager.getTotalBufferUsed()).thenReturn(buffer);
281    Mockito.when(mockManager.getGlobalMetrics()).thenReturn(mockMetrics);
282    ReplicationPeer mockPeer = mock(ReplicationPeer.class);
283    Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
284    Configuration testConf = HBaseConfiguration.create();
285    source.init(testConf, null, mockManager, null, mockPeer, Mockito.mock(Server.class), "testPeer",
286      null, p -> OptionalLong.empty(), mock(MetricsSource.class));
287    ReplicationSourceWALReader reader =
288      new ReplicationSourceWALReader(null, conf, null, 0, null, source, null);
289    ReplicationSourceShipper shipper = new ReplicationSourceShipper(conf, null, null, source);
290    shipper.entryReader = reader;
291    source.workerThreads.put("testPeer", shipper);
292    WALEntryBatch batch = new WALEntryBatch(10, logDir);
293    WAL.Entry mockEntry = mock(WAL.Entry.class);
294    WALEdit mockEdit = mock(WALEdit.class);
295    WALKeyImpl mockKey = mock(WALKeyImpl.class);
296    when(mockEntry.getEdit()).thenReturn(mockEdit);
297    when(mockEdit.isEmpty()).thenReturn(false);
298    when(mockEntry.getKey()).thenReturn(mockKey);
299    when(mockKey.estimatedSerializedSizeOf()).thenReturn(1000L);
300    when(mockEdit.heapSize()).thenReturn(10000L);
301    when(mockEdit.size()).thenReturn(0);
302    ArrayList<Cell> cells = new ArrayList<>();
303    KeyValue kv = new KeyValue(Bytes.toBytes("0001"), Bytes.toBytes("f"), Bytes.toBytes("1"),
304      Bytes.toBytes("v1"));
305    cells.add(kv);
306    when(mockEdit.getCells()).thenReturn(cells);
307    reader.addEntryToBatch(batch, mockEntry);
308    reader.entryBatchQueue.put(batch);
309    source.terminate("test");
310    assertEquals(0, source.getSourceManager().getTotalBufferUsed().get());
311  }
312
313  /**
314   * Tests that recovered queues are preserved on a regionserver shutdown. See HBASE-18192
315   */
316  @Test
317  public void testServerShutdownRecoveredQueue() throws Exception {
318    try {
319      // Ensure single-threaded WAL
320      conf.set("hbase.wal.provider", "defaultProvider");
321      conf.setInt("replication.sleep.before.failover", 2000);
322      // Introduces a delay in regionserver shutdown to give the race condition a chance to kick in.
323      conf.set(HConstants.REGION_SERVER_IMPL, ShutdownDelayRegionServer.class.getName());
324      MiniHBaseCluster cluster = TEST_UTIL.startMiniCluster(2);
325      TEST_UTIL_PEER.startMiniCluster(1);
326
327      HRegionServer serverA = cluster.getRegionServer(0);
328      final ReplicationSourceManager managerA =
329        serverA.getReplicationSourceService().getReplicationManager();
330      HRegionServer serverB = cluster.getRegionServer(1);
331      final ReplicationSourceManager managerB =
332        serverB.getReplicationSourceService().getReplicationManager();
333      final Admin admin = TEST_UTIL.getAdmin();
334
335      final String peerId = "TestPeer";
336      admin.addReplicationPeer(peerId,
337        ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL_PEER.getClusterKey()).build());
338      // Wait for replication sources to come up
339      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
340        @Override
341        public boolean evaluate() {
342          return !(managerA.getSources().isEmpty() || managerB.getSources().isEmpty());
343        }
344      });
345      // Disabling peer makes sure there is at least one log to claim when the server dies
346      // The recovered queue will also stay there until the peer is disabled even if the
347      // WALs it contains have no data.
348      admin.disableReplicationPeer(peerId);
349
350      // Stopping serverA
351      // It's queues should be claimed by the only other alive server i.e. serverB
352      cluster.stopRegionServer(serverA.getServerName());
353      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
354        @Override
355        public boolean evaluate() throws Exception {
356          return managerB.getOldSources().size() == 1;
357        }
358      });
359
360      final HRegionServer serverC = cluster.startRegionServer().getRegionServer();
361      serverC.waitForServerOnline();
362      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
363        @Override
364        public boolean evaluate() throws Exception {
365          return serverC.getReplicationSourceService() != null;
366        }
367      });
368      final ReplicationSourceManager managerC =
369        ((Replication) serverC.getReplicationSourceService()).getReplicationManager();
370      // Sanity check
371      assertEquals(0, managerC.getOldSources().size());
372
373      // Stopping serverB
374      // Now serverC should have two recovered queues:
375      // 1. The serverB's normal queue
376      // 2. serverA's recovered queue on serverB
377      cluster.stopRegionServer(serverB.getServerName());
378      Waiter.waitFor(conf, 20000,
379        (Waiter.Predicate<Exception>) () -> managerC.getOldSources().size() == 2);
380      admin.enableReplicationPeer(peerId);
381      Waiter.waitFor(conf, 20000,
382        (Waiter.Predicate<Exception>) () -> managerC.getOldSources().size() == 0);
383    } finally {
384      conf.set(HConstants.REGION_SERVER_IMPL, HRegionServer.class.getName());
385    }
386  }
387
388  /**
389   * Regionserver implementation that adds a delay on the graceful shutdown.
390   */
391  public static class ShutdownDelayRegionServer extends HRegionServer {
392    public ShutdownDelayRegionServer(Configuration conf) throws IOException {
393      super(conf);
394    }
395
396    @Override
397    protected void stopServiceThreads() {
398      // Add a delay before service threads are shutdown.
399      // This will keep the zookeeper connection alive for the duration of the delay.
400      LOG.info("Adding a delay to the regionserver shutdown");
401      try {
402        Thread.sleep(2000);
403      } catch (InterruptedException ex) {
404        LOG.error("Interrupted while sleeping");
405      }
406      super.stopServiceThreads();
407    }
408  }
409
410  /**
411   * Deadend Endpoint. Does nothing.
412   */
413  public static class DoNothingReplicationEndpoint extends HBaseInterClusterReplicationEndpoint {
414    private final UUID uuid = UUID.randomUUID();
415
416    @Override
417    public void init(Context context) throws IOException {
418      this.ctx = context;
419    }
420
421    @Override
422    public WALEntryFilter getWALEntryfilter() {
423      return null;
424    }
425
426    @Override
427    public synchronized UUID getPeerUUID() {
428      return this.uuid;
429    }
430
431    @Override
432    protected void doStart() {
433      notifyStarted();
434    }
435
436    @Override
437    protected void doStop() {
438      notifyStopped();
439    }
440
441    @Override
442    public boolean canReplicateToSameCluster() {
443      return true;
444    }
445  }
446
447  /**
448   * Deadend Endpoint. Does nothing.
449   */
450  public static class FlakyReplicationEndpoint extends DoNothingReplicationEndpoint {
451
452    static int count = 0;
453
454    @Override
455    public synchronized UUID getPeerUUID() {
456      if (count == 0) {
457        count++;
458        throw new RuntimeException();
459      } else {
460        return super.getPeerUUID();
461      }
462    }
463
464  }
465
466  /**
467   * Bad Endpoint with failing connection to peer on demand.
468   */
469  public static class BadReplicationEndpoint extends DoNothingReplicationEndpoint {
470    static boolean failing = true;
471
472    @Override
473    public synchronized UUID getPeerUUID() {
474      return failing ? null : super.getPeerUUID();
475    }
476  }
477
478  public static class FaultyReplicationEndpoint extends DoNothingReplicationEndpoint {
479
480    static int count = 0;
481
482    @Override
483    public synchronized UUID getPeerUUID() {
484      throw new RuntimeException();
485    }
486
487  }
488
489  /**
490   * Test HBASE-20497 Moved here from TestReplicationSource because doesn't need cluster.
491   */
492  @Test
493  public void testRecoveredReplicationSourceShipperGetPosition() throws Exception {
494    String walGroupId = "fake-wal-group-id";
495    ServerName serverName = ServerName.valueOf("www.example.com", 12006, 1524679704418L);
496    ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L);
497    RecoveredReplicationSource source = mock(RecoveredReplicationSource.class);
498    Server server = mock(Server.class);
499    when(server.getServerName()).thenReturn(serverName);
500    when(source.getServer()).thenReturn(server);
501    when(source.getServerWALsBelongTo()).thenReturn(deadServer);
502    ReplicationQueueStorage storage = mock(ReplicationQueueStorage.class);
503    when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any()))
504      .thenReturn(1001L);
505    when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any()))
506      .thenReturn(-1L);
507    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
508    conf.setInt("replication.source.maxretriesmultiplier", -1);
509    MetricsSource metricsSource = mock(MetricsSource.class);
510    doNothing().when(metricsSource).incrSizeOfLogQueue();
511    ReplicationSourceLogQueue logQueue = new ReplicationSourceLogQueue(conf, metricsSource, source);
512    logQueue.enqueueLog(new Path("/www/html/test"), walGroupId);
513    RecoveredReplicationSourceShipper shipper =
514      new RecoveredReplicationSourceShipper(conf, walGroupId, logQueue, source, storage);
515    assertEquals(1001L, shipper.getStartPosition());
516  }
517
518  private RegionServerServices setupForAbortTests(ReplicationSource rs, Configuration conf,
519    String endpointName) throws IOException {
520    conf.setInt("replication.source.maxretriesmultiplier", 1);
521    ReplicationPeer mockPeer = mock(ReplicationPeer.class);
522    when(mockPeer.getConfiguration()).thenReturn(conf);
523    when(mockPeer.getPeerBandwidth()).thenReturn(0L);
524    ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);
525    FaultyReplicationEndpoint.count = 0;
526    when(peerConfig.getReplicationEndpointImpl()).thenReturn(endpointName);
527    when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
528    ReplicationSourceManager manager = mock(ReplicationSourceManager.class);
529    when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
530    Mockito.when(manager.getGlobalMetrics())
531      .thenReturn(mock(MetricsReplicationGlobalSourceSource.class));
532    String queueId = "qid";
533    RegionServerServices rss =
534      TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
535    rs.init(conf, null, manager, null, mockPeer, rss, queueId, null, p -> OptionalLong.empty(),
536      new MetricsSource(queueId));
537    return rss;
538  }
539
540  /**
541   * Test ReplicationSource retries startup once an uncaught exception happens during initialization
542   * and <b>eplication.source.regionserver.abort</b> is set to false.
543   */
544  @Test
545  public void testAbortFalseOnError() throws IOException {
546    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
547    conf.setBoolean("replication.source.regionserver.abort", false);
548    ReplicationSource rs = new ReplicationSource();
549    RegionServerServices rss =
550      setupForAbortTests(rs, conf, FlakyReplicationEndpoint.class.getName());
551    try {
552      rs.startup();
553      assertTrue(rs.isSourceActive());
554      assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
555      rs.enqueueLog(new Path("a.1" + META_WAL_PROVIDER_ID));
556      assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
557      rs.enqueueLog(new Path("a.1"));
558      assertEquals(1, rs.getSourceMetrics().getSizeOfLogQueue());
559    } finally {
560      rs.terminate("Done");
561      rss.stop("Done");
562    }
563  }
564
565  @Test
566  public void testReplicationSourceInitializingMetric() throws IOException {
567    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
568    conf.setBoolean("replication.source.regionserver.abort", false);
569    ReplicationSource rs = new ReplicationSource();
570    RegionServerServices rss = setupForAbortTests(rs, conf, BadReplicationEndpoint.class.getName());
571    try {
572      rs.startup();
573      assertTrue(rs.isSourceActive());
574      Waiter.waitFor(conf, 10000, () -> rs.getSourceMetrics().getSourceInitializing() == 1);
575      BadReplicationEndpoint.failing = false;
576      Waiter.waitFor(conf, 10000, () -> rs.getSourceMetrics().getSourceInitializing() == 0);
577    } finally {
578      rs.terminate("Done");
579      rss.stop("Done");
580    }
581  }
582
583  /**
584   * Test ReplicationSource keeps retrying startup indefinitely without blocking the main thread,
585   * when <b>replication.source.regionserver.abort</b> is set to false.
586   */
587  @Test
588  public void testAbortFalseOnErrorDoesntBlockMainThread() throws IOException {
589    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
590    ReplicationSource rs = new ReplicationSource();
591    RegionServerServices rss =
592      setupForAbortTests(rs, conf, FaultyReplicationEndpoint.class.getName());
593    try {
594      rs.startup();
595      assertTrue(true);
596    } finally {
597      rs.terminate("Done");
598      rss.stop("Done");
599    }
600  }
601
602  /**
603   * Test ReplicationSource retries startup once an uncaught exception happens during initialization
604   * and <b>replication.source.regionserver.abort</b> is set to true.
605   */
606  @Test
607  public void testAbortTrueOnError() throws IOException {
608    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
609    ReplicationSource rs = new ReplicationSource();
610    RegionServerServices rss =
611      setupForAbortTests(rs, conf, FlakyReplicationEndpoint.class.getName());
612    try {
613      rs.startup();
614      assertTrue(rs.isSourceActive());
615      Waiter.waitFor(conf, 1000, () -> rss.isAborted());
616      assertTrue(rss.isAborted());
617      Waiter.waitFor(conf, 1000, () -> !rs.isSourceActive());
618      assertFalse(rs.isSourceActive());
619    } finally {
620      rs.terminate("Done");
621      rss.stop("Done");
622    }
623  }
624
625  /*
626   * Test age of oldest wal metric.
627   */
628  @Test
629  public void testAgeOfOldestWal() throws Exception {
630    try {
631      ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge();
632      EnvironmentEdgeManager.injectEdge(manualEdge);
633
634      String id = "1";
635      MetricsSource metrics = new MetricsSource(id);
636      Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
637      conf.setInt("replication.source.maxretriesmultiplier", 1);
638      ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
639      Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
640      Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
641      ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
642      Mockito.when(peerConfig.getReplicationEndpointImpl())
643        .thenReturn(DoNothingReplicationEndpoint.class.getName());
644      Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
645      ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
646      Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
647      Mockito.when(manager.getGlobalMetrics())
648        .thenReturn(mock(MetricsReplicationGlobalSourceSource.class));
649      RegionServerServices rss =
650        TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
651
652      ReplicationSource source = new ReplicationSource();
653      source.init(conf, null, manager, null, mockPeer, rss, id, null, p -> OptionalLong.empty(),
654        metrics);
655
656      final Path log1 = new Path(logDir, "log-walgroup-a.8");
657      manualEdge.setValue(10);
658      // Diff of current time (10) and log-walgroup-a.8 timestamp will be 2.
659      source.enqueueLog(log1);
660      MetricsReplicationSourceSource metricsSource1 = getSourceMetrics(id);
661      assertEquals(2, metricsSource1.getOldestWalAge());
662
663      final Path log2 = new Path(logDir, "log-walgroup-b.4");
664      // Diff of current time (10) and log-walgroup-b.4 will be 6 so oldestWalAge should be 6
665      source.enqueueLog(log2);
666      assertEquals(6, metricsSource1.getOldestWalAge());
667      // Clear all metrics.
668      metrics.clear();
669    } finally {
670      EnvironmentEdgeManager.reset();
671    }
672  }
673
674  private MetricsReplicationSourceSource getSourceMetrics(String sourceId) {
675    MetricsReplicationSourceFactory factory =
676      CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class);
677    return factory.getSource(sourceId);
678  }
679}