001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.ArrayList;
import java.util.OptionalLong;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.wal.WALStreamReader;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
083
084@Category({ ReplicationTests.class, MediumTests.class })
085public class TestReplicationSource {
086
087  @ClassRule
088  public static final HBaseClassTestRule CLASS_RULE =
089    HBaseClassTestRule.forClass(TestReplicationSource.class);
090
091  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSource.class);
092  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
093  private final static HBaseTestingUtil TEST_UTIL_PEER = new HBaseTestingUtil();
094  private static FileSystem FS;
095  private static Path oldLogDir;
096  private static Path logDir;
097  private static Configuration conf = TEST_UTIL.getConfiguration();
098
099  @BeforeClass
100  public static void setUpBeforeClass() throws Exception {
101    TEST_UTIL.startMiniDFSCluster(1);
102    FS = TEST_UTIL.getDFSCluster().getFileSystem();
103    Path rootDir = TEST_UTIL.createRootDir();
104    oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
105    if (FS.exists(oldLogDir)) {
106      FS.delete(oldLogDir, true);
107    }
108    logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
109    if (FS.exists(logDir)) {
110      FS.delete(logDir, true);
111    }
112  }
113
114  @AfterClass
115  public static void tearDownAfterClass() throws Exception {
116    TEST_UTIL_PEER.shutdownMiniHBaseCluster();
117    TEST_UTIL.shutdownMiniHBaseCluster();
118    TEST_UTIL.shutdownMiniDFSCluster();
119  }
120
121  /**
122   * Test the default ReplicationSource skips queuing hbase:meta WAL files.
123   */
124  @Test
125  public void testDefaultSkipsMetaWAL() throws IOException {
126    ReplicationSource rs = new ReplicationSource();
127    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
128    conf.setInt("replication.source.maxretriesmultiplier", 1);
129    ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
130    Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
131    Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
132    ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
133    Mockito.when(peerConfig.getReplicationEndpointImpl())
134      .thenReturn(DoNothingReplicationEndpoint.class.getName());
135    Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
136    ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
137    Mockito.when(manager.getGlobalMetrics())
138      .thenReturn(mock(MetricsReplicationGlobalSourceSource.class));
139
140    RegionServerServices rss =
141      TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
142    ReplicationQueueId queueId = new ReplicationQueueId(rss.getServerName(), "qid");
143    rs.init(conf, null, manager, null, mockPeer, rss,
144      new ReplicationQueueData(queueId, ImmutableMap.of()), null, p -> OptionalLong.empty(),
145      new MetricsSource(queueId.toString()));
146    try {
147      rs.startup();
148      assertTrue(rs.isSourceActive());
149      assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
150      rs.enqueueLog(new Path("a.1" + META_WAL_PROVIDER_ID));
151      assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
152      rs.enqueueLog(new Path("a.1"));
153      assertEquals(1, rs.getSourceMetrics().getSizeOfLogQueue());
154    } finally {
155      rs.terminate("Done");
156      rss.stop("Done");
157    }
158  }
159
160  /**
161   * Test that we filter out meta edits, etc.
162   */
163  @Test
164  public void testWALEntryFilter() throws IOException {
165    // To get the fully constructed default WALEntryFilter, need to create a ReplicationSource
166    // instance and init it.
167    ReplicationSource rs = new ReplicationSource();
168    UUID uuid = UUID.randomUUID();
169    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
170    ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
171    Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
172    Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
173    ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
174    Mockito.when(peerConfig.getReplicationEndpointImpl())
175      .thenReturn(DoNothingReplicationEndpoint.class.getName());
176    Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
177    ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
178    RegionServerServices rss =
179      TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
180    ReplicationQueueId queueId = new ReplicationQueueId(rss.getServerName(), "qid");
181    rs.init(conf, null, manager, null, mockPeer, rss,
182      new ReplicationQueueData(queueId, ImmutableMap.of()), uuid, p -> OptionalLong.empty(),
183      new MetricsSource(queueId.toString()));
184    try {
185      rs.startup();
186      TEST_UTIL.waitFor(30000, () -> rs.getWalEntryFilter() != null);
187      WALEntryFilter wef = rs.getWalEntryFilter();
188      // Test non-system WAL edit.
189      WALEdit we = new WALEdit()
190        .add(CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(HConstants.EMPTY_START_ROW)
191          .setFamily(HConstants.CATALOG_FAMILY).setType(Cell.Type.Put).build());
192      WAL.Entry e = new WAL.Entry(
193        new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TableName.valueOf("test"), -1, -1, uuid), we);
194      assertTrue(wef.filter(e) == e);
195      // Test system WAL edit.
196      e = new WAL.Entry(
197        new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TableName.META_TABLE_NAME, -1, -1, uuid), we);
198      assertNull(wef.filter(e));
199    } finally {
200      rs.terminate("Done");
201      rss.stop("Done");
202    }
203  }
204
205  /**
206   * Sanity check that we can move logs around while we are reading from them. Should this test
207   * fail, ReplicationSource would have a hard time reading logs that are being archived.
208   */
209  // This tests doesn't belong in here... it is not about ReplicationSource.
210  @Test
211  public void testLogMoving() throws Exception {
212    Path logPath = new Path(logDir, "log");
213    if (!FS.exists(logDir)) {
214      FS.mkdirs(logDir);
215    }
216    if (!FS.exists(oldLogDir)) {
217      FS.mkdirs(oldLogDir);
218    }
219    WALProvider.Writer writer =
220      WALFactory.createWALWriter(FS, logPath, TEST_UTIL.getConfiguration());
221    for (int i = 0; i < 3; i++) {
222      byte[] b = Bytes.toBytes(Integer.toString(i));
223      KeyValue kv = new KeyValue(b, b, b);
224      WALEdit edit = new WALEdit();
225      edit.add(kv);
226      WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0, HConstants.DEFAULT_CLUSTER_ID);
227      writer.append(new WAL.Entry(key, edit));
228      writer.sync(false);
229    }
230    writer.close();
231
232    WALStreamReader reader =
233      WALFactory.createStreamReader(FS, logPath, TEST_UTIL.getConfiguration());
234    WAL.Entry entry = reader.next();
235    assertNotNull(entry);
236
237    Path oldLogPath = new Path(oldLogDir, "log");
238    FS.rename(logPath, oldLogPath);
239
240    entry = reader.next();
241    assertNotNull(entry);
242
243    reader.next();
244    entry = reader.next();
245
246    assertNull(entry);
247    reader.close();
248  }
249
250  /**
251   * Tests that {@link ReplicationSource#terminate(String)} will timeout properly Moved here from
252   * TestReplicationSource because doesn't need cluster.
253   */
254  @Test
255  public void testTerminateTimeout() throws Exception {
256    ReplicationSource source = new ReplicationSource();
257    ReplicationEndpoint replicationEndpoint = new DoNothingReplicationEndpoint();
258    try {
259      replicationEndpoint.start();
260      ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
261      Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
262      Configuration testConf = HBaseConfiguration.create();
263      testConf.setInt("replication.source.maxretriesmultiplier", 1);
264      ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
265      ReplicationQueueId queueId =
266        new ReplicationQueueId(ServerName.valueOf("test,123,123"), "testPeer");
267      source.init(testConf, null, manager, null, mockPeer, null,
268        new ReplicationQueueData(queueId, ImmutableMap.of()), null, p -> OptionalLong.empty(),
269        null);
270      ExecutorService executor = Executors.newSingleThreadExecutor();
271      Future<?> future = executor.submit(() -> source.terminate("testing source termination"));
272      long sleepForRetries = testConf.getLong("replication.source.sleepforretries", 1000);
273      Waiter.waitFor(testConf, sleepForRetries * 2, (Waiter.Predicate<Exception>) future::isDone);
274    } finally {
275      replicationEndpoint.stop();
276    }
277  }
278
279  @Test
280  public void testTerminateClearsBuffer() throws Exception {
281    ReplicationSource source = new ReplicationSource();
282    ReplicationSourceManager mockManager = new ReplicationSourceManager(null, null, conf, null,
283      null, null, null, null, null, null, mock(MetricsReplicationGlobalSourceSource.class));
284    ReplicationPeer mockPeer = mock(ReplicationPeer.class);
285    Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
286    Configuration testConf = HBaseConfiguration.create();
287    ReplicationQueueId queueId =
288      new ReplicationQueueId(ServerName.valueOf("test,123,123"), "testPeer");
289    source.init(testConf, null, mockManager, null, mockPeer, Mockito.mock(Server.class),
290      new ReplicationQueueData(queueId, ImmutableMap.of()), null, p -> OptionalLong.empty(),
291      mock(MetricsSource.class));
292    ReplicationSourceWALReader reader =
293      new ReplicationSourceWALReader(null, conf, null, 0, null, source, null);
294    ReplicationSourceShipper shipper = new ReplicationSourceShipper(conf, null, source, reader);
295    source.workerThreads.put("testPeer", shipper);
296    WALEntryBatch batch = new WALEntryBatch(10, logDir);
297    WAL.Entry mockEntry = mock(WAL.Entry.class);
298    WALEdit mockEdit = mock(WALEdit.class);
299    WALKeyImpl mockKey = mock(WALKeyImpl.class);
300    when(mockEntry.getEdit()).thenReturn(mockEdit);
301    when(mockEdit.isEmpty()).thenReturn(false);
302    when(mockEntry.getKey()).thenReturn(mockKey);
303    when(mockKey.estimatedSerializedSizeOf()).thenReturn(1000L);
304    when(mockEdit.heapSize()).thenReturn(10000L);
305    when(mockEdit.size()).thenReturn(0);
306    ArrayList<Cell> cells = new ArrayList<>();
307    KeyValue kv = new KeyValue(Bytes.toBytes("0001"), Bytes.toBytes("f"), Bytes.toBytes("1"),
308      Bytes.toBytes("v1"));
309    cells.add(kv);
310    when(mockEdit.getCells()).thenReturn(cells);
311    reader.addEntryToBatch(batch, mockEntry);
312    reader.entryBatchQueue.put(batch);
313    source.terminate("test");
314    assertEquals(0, source.getSourceManager().getTotalBufferUsed());
315  }
316
317  /**
318   * Tests that recovered queues are preserved on a regionserver shutdown. See HBASE-18192
319   */
320  @Test
321  public void testServerShutdownRecoveredQueue() throws Exception {
322    try {
323      // Ensure single-threaded WAL
324      conf.set("hbase.wal.provider", "defaultProvider");
325      conf.setInt("replication.sleep.before.failover", 2000);
326      // Introduces a delay in regionserver shutdown to give the race condition a chance to kick in.
327      conf.set(HConstants.REGION_SERVER_IMPL, ShutdownDelayRegionServer.class.getName());
328      SingleProcessHBaseCluster cluster = TEST_UTIL.startMiniCluster(2);
329      TEST_UTIL_PEER.startMiniCluster(1);
330
331      HRegionServer serverA = cluster.getRegionServer(0);
332      final ReplicationSourceManager managerA =
333        serverA.getReplicationSourceService().getReplicationManager();
334      HRegionServer serverB = cluster.getRegionServer(1);
335      final ReplicationSourceManager managerB =
336        serverB.getReplicationSourceService().getReplicationManager();
337      final Admin admin = TEST_UTIL.getAdmin();
338
339      final String peerId = "TestPeer";
340      admin.addReplicationPeer(peerId,
341        ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL_PEER.getClusterKey()).build());
342      // Wait for replication sources to come up
343      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
344        @Override
345        public boolean evaluate() {
346          return !(managerA.getSources().isEmpty() || managerB.getSources().isEmpty());
347        }
348      });
349      // Disabling peer makes sure there is at least one log to claim when the server dies
350      // The recovered queue will also stay there until the peer is disabled even if the
351      // WALs it contains have no data.
352      admin.disableReplicationPeer(peerId);
353
354      // Stopping serverA
355      // It's queues should be claimed by the only other alive server i.e. serverB
356      cluster.stopRegionServer(serverA.getServerName());
357      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
358        @Override
359        public boolean evaluate() throws Exception {
360          return managerB.getOldSources().size() == 1;
361        }
362      });
363
364      final HRegionServer serverC = cluster.startRegionServer().getRegionServer();
365      serverC.waitForServerOnline();
366      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
367        @Override
368        public boolean evaluate() throws Exception {
369          return serverC.getReplicationSourceService() != null;
370        }
371      });
372      final ReplicationSourceManager managerC =
373        ((Replication) serverC.getReplicationSourceService()).getReplicationManager();
374      // Sanity check
375      assertEquals(0, managerC.getOldSources().size());
376
377      // Stopping serverB
378      // Now serverC should have two recovered queues:
379      // 1. The serverB's normal queue
380      // 2. serverA's recovered queue on serverB
381      cluster.stopRegionServer(serverB.getServerName());
382      Waiter.waitFor(conf, 20000,
383        (Waiter.Predicate<Exception>) () -> managerC.getOldSources().size() == 2);
384      admin.enableReplicationPeer(peerId);
385      Waiter.waitFor(conf, 20000,
386        (Waiter.Predicate<Exception>) () -> managerC.getOldSources().size() == 0);
387    } finally {
388      conf.set(HConstants.REGION_SERVER_IMPL, HRegionServer.class.getName());
389    }
390  }
391
392  /**
393   * Regionserver implementation that adds a delay on the graceful shutdown.
394   */
395  public static class ShutdownDelayRegionServer extends HRegionServer {
396    public ShutdownDelayRegionServer(Configuration conf) throws IOException {
397      super(conf);
398    }
399
400    @Override
401    protected void stopServiceThreads() {
402      // Add a delay before service threads are shutdown.
403      // This will keep the zookeeper connection alive for the duration of the delay.
404      LOG.info("Adding a delay to the regionserver shutdown");
405      try {
406        Thread.sleep(2000);
407      } catch (InterruptedException ex) {
408        LOG.error("Interrupted while sleeping");
409      }
410      super.stopServiceThreads();
411    }
412  }
413
414  /**
415   * Deadend Endpoint. Does nothing.
416   */
417  public static class DoNothingReplicationEndpoint extends HBaseInterClusterReplicationEndpoint {
418    private final UUID uuid = UUID.randomUUID();
419
420    @Override
421    public void init(Context context) throws IOException {
422      this.ctx = context;
423    }
424
425    @Override
426    public WALEntryFilter getWALEntryfilter() {
427      return null;
428    }
429
430    @Override
431    public synchronized UUID getPeerUUID() {
432      return this.uuid;
433    }
434
435    @Override
436    protected void doStart() {
437      notifyStarted();
438    }
439
440    @Override
441    protected void doStop() {
442      notifyStopped();
443    }
444
445    @Override
446    public boolean canReplicateToSameCluster() {
447      return true;
448    }
449  }
450
451  /**
452   * Deadend Endpoint. Does nothing.
453   */
454  public static class FlakyReplicationEndpoint extends DoNothingReplicationEndpoint {
455
456    static int count = 0;
457
458    @Override
459    public synchronized UUID getPeerUUID() {
460      if (count == 0) {
461        count++;
462        throw new RuntimeException();
463      } else {
464        return super.getPeerUUID();
465      }
466    }
467
468  }
469
470  /**
471   * Bad Endpoint with failing connection to peer on demand.
472   */
473  public static class BadReplicationEndpoint extends DoNothingReplicationEndpoint {
474    static boolean failing = true;
475
476    @Override
477    public synchronized UUID getPeerUUID() {
478      return failing ? null : super.getPeerUUID();
479    }
480  }
481
482  public static class FaultyReplicationEndpoint extends DoNothingReplicationEndpoint {
483
484    static int count = 0;
485
486    @Override
487    public synchronized UUID getPeerUUID() {
488      throw new RuntimeException();
489    }
490
491  }
492
493  private RegionServerServices setupForAbortTests(ReplicationSource rs, Configuration conf,
494    String endpointName) throws IOException {
495    conf.setInt("replication.source.maxretriesmultiplier", 1);
496    ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
497    Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
498    Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
499    ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
500    FaultyReplicationEndpoint.count = 0;
501    Mockito.when(peerConfig.getReplicationEndpointImpl()).thenReturn(endpointName);
502    Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
503    ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
504    Mockito.when(manager.getGlobalMetrics())
505      .thenReturn(mock(MetricsReplicationGlobalSourceSource.class));
506    RegionServerServices rss =
507      TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
508    ReplicationQueueId queueId = new ReplicationQueueId(rss.getServerName(), "qid");
509    rs.init(conf, null, manager, null, mockPeer, rss,
510      new ReplicationQueueData(queueId, ImmutableMap.of()), null, p -> OptionalLong.empty(),
511      new MetricsSource(queueId.toString()));
512    return rss;
513  }
514
515  /**
516   * Test ReplicationSource retries startup once an uncaught exception happens during initialization
517   * and <b>eplication.source.regionserver.abort</b> is set to false.
518   */
519  @Test
520  public void testAbortFalseOnError() throws IOException {
521    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
522    conf.setBoolean("replication.source.regionserver.abort", false);
523    ReplicationSource rs = new ReplicationSource();
524    RegionServerServices rss =
525      setupForAbortTests(rs, conf, FlakyReplicationEndpoint.class.getName());
526    try {
527      rs.startup();
528      assertTrue(rs.isSourceActive());
529      assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
530      rs.enqueueLog(new Path("a.1" + META_WAL_PROVIDER_ID));
531      assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
532      rs.enqueueLog(new Path("a.1"));
533      assertEquals(1, rs.getSourceMetrics().getSizeOfLogQueue());
534    } finally {
535      rs.terminate("Done");
536      rss.stop("Done");
537    }
538  }
539
540  @Test
541  public void testReplicationSourceInitializingMetric() throws IOException {
542    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
543    conf.setBoolean("replication.source.regionserver.abort", false);
544    ReplicationSource rs = new ReplicationSource();
545    RegionServerServices rss = setupForAbortTests(rs, conf, BadReplicationEndpoint.class.getName());
546    try {
547      rs.startup();
548      assertTrue(rs.isSourceActive());
549      Waiter.waitFor(conf, 10000, () -> rs.getSourceMetrics().getSourceInitializing() == 1);
550      BadReplicationEndpoint.failing = false;
551      Waiter.waitFor(conf, 10000, () -> rs.getSourceMetrics().getSourceInitializing() == 0);
552    } finally {
553      rs.terminate("Done");
554      rss.stop("Done");
555    }
556  }
557
558  /**
559   * Test ReplicationSource keeps retrying startup indefinitely without blocking the main thread,
560   * when <b>replication.source.regionserver.abort</b> is set to false.
561   */
562  @Test
563  public void testAbortFalseOnErrorDoesntBlockMainThread() throws IOException {
564    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
565    ReplicationSource rs = new ReplicationSource();
566    RegionServerServices rss =
567      setupForAbortTests(rs, conf, FaultyReplicationEndpoint.class.getName());
568    try {
569      rs.startup();
570      assertTrue(true);
571    } finally {
572      rs.terminate("Done");
573      rss.stop("Done");
574    }
575  }
576
577  /**
578   * Test ReplicationSource retries startup once an uncaught exception happens during initialization
579   * and <b>replication.source.regionserver.abort</b> is set to true.
580   */
581  @Test
582  public void testAbortTrueOnError() throws IOException {
583    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
584    ReplicationSource rs = new ReplicationSource();
585    RegionServerServices rss =
586      setupForAbortTests(rs, conf, FlakyReplicationEndpoint.class.getName());
587    try {
588      rs.startup();
589      assertTrue(rs.isSourceActive());
590      Waiter.waitFor(conf, 1000, () -> rss.isAborted());
591      assertTrue(rss.isAborted());
592      Waiter.waitFor(conf, 1000, () -> !rs.isSourceActive());
593      assertFalse(rs.isSourceActive());
594    } finally {
595      rs.terminate("Done");
596      rss.stop("Done");
597    }
598  }
599
600  /*
601   * Test age of oldest wal metric.
602   */
603  @Test
604  public void testAgeOfOldestWal() throws Exception {
605    try {
606      ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge();
607      EnvironmentEdgeManager.injectEdge(manualEdge);
608
609      String peerId = "1";
610      MetricsSource metrics = new MetricsSource(peerId);
611      Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
612      conf.setInt("replication.source.maxretriesmultiplier", 1);
613      ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
614      Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
615      Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
616      ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
617      Mockito.when(peerConfig.getReplicationEndpointImpl())
618        .thenReturn(DoNothingReplicationEndpoint.class.getName());
619      Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
620      ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
621      Mockito.when(manager.getGlobalMetrics())
622        .thenReturn(mock(MetricsReplicationGlobalSourceSource.class));
623      RegionServerServices rss =
624        TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
625      ReplicationQueueId queueId = new ReplicationQueueId(rss.getServerName(), peerId);
626      ReplicationSource source = new ReplicationSource();
627      source.init(conf, null, manager, null, mockPeer, rss,
628        new ReplicationQueueData(queueId, ImmutableMap.of()), null, p -> OptionalLong.empty(),
629        metrics);
630
631      final Path log1 = new Path(logDir, "log-walgroup-a.8");
632      manualEdge.setValue(10);
633      // Diff of current time (10) and log-walgroup-a.8 timestamp will be 2.
634      source.enqueueLog(log1);
635      MetricsReplicationSourceSource metricsSource1 = getSourceMetrics(peerId);
636      assertEquals(2, metricsSource1.getOldestWalAge());
637
638      final Path log2 = new Path(logDir, "log-walgroup-b.4");
639      // Diff of current time (10) and log-walgroup-b.4 will be 6 so oldestWalAge should be 6
640      source.enqueueLog(log2);
641      assertEquals(6, metricsSource1.getOldestWalAge());
642      // Clear all metrics.
643      metrics.clear();
644    } finally {
645      EnvironmentEdgeManager.reset();
646    }
647  }
648
649  private MetricsReplicationSourceSource getSourceMetrics(String sourceId) {
650    MetricsReplicationSourceFactoryImpl factory =
651      (MetricsReplicationSourceFactoryImpl) CompatibilitySingletonFactory
652        .getInstance(MetricsReplicationSourceFactory.class);
653    return factory.getSource(sourceId);
654  }
655}