001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID;
021import static org.junit.jupiter.api.Assertions.assertEquals;
022import static org.junit.jupiter.api.Assertions.assertFalse;
023import static org.junit.jupiter.api.Assertions.assertNotNull;
024import static org.junit.jupiter.api.Assertions.assertNull;
025import static org.junit.jupiter.api.Assertions.assertTrue;
026import static org.mockito.Mockito.mock;
027import static org.mockito.Mockito.when;
028
029import java.io.IOException;
030import java.util.ArrayList;
031import java.util.OptionalLong;
032import java.util.UUID;
033import java.util.concurrent.ExecutorService;
034import java.util.concurrent.Executors;
035import java.util.concurrent.Future;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.fs.FileSystem;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.hbase.Cell;
040import org.apache.hadoop.hbase.CellBuilderType;
041import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
042import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
043import org.apache.hadoop.hbase.HBaseConfiguration;
044import org.apache.hadoop.hbase.HBaseTestingUtil;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.KeyValue;
047import org.apache.hadoop.hbase.Server;
048import org.apache.hadoop.hbase.ServerName;
049import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
050import org.apache.hadoop.hbase.TableName;
051import org.apache.hadoop.hbase.Waiter;
052import org.apache.hadoop.hbase.client.Admin;
053import org.apache.hadoop.hbase.regionserver.HRegionServer;
054import org.apache.hadoop.hbase.regionserver.RegionServerServices;
055import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
056import org.apache.hadoop.hbase.replication.ReplicationPeer;
057import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
058import org.apache.hadoop.hbase.replication.ReplicationQueueData;
059import org.apache.hadoop.hbase.replication.ReplicationQueueId;
060import org.apache.hadoop.hbase.replication.WALEntryFilter;
061import org.apache.hadoop.hbase.testclassification.MediumTests;
062import org.apache.hadoop.hbase.testclassification.ReplicationTests;
063import org.apache.hadoop.hbase.util.Bytes;
064import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
065import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
066import org.apache.hadoop.hbase.wal.WAL;
067import org.apache.hadoop.hbase.wal.WALEdit;
068import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
069import org.apache.hadoop.hbase.wal.WALFactory;
070import org.apache.hadoop.hbase.wal.WALKeyImpl;
071import org.apache.hadoop.hbase.wal.WALProvider;
072import org.apache.hadoop.hbase.wal.WALStreamReader;
073import org.junit.jupiter.api.AfterAll;
074import org.junit.jupiter.api.BeforeAll;
075import org.junit.jupiter.api.Tag;
076import org.junit.jupiter.api.Test;
077import org.mockito.Mockito;
078import org.slf4j.Logger;
079import org.slf4j.LoggerFactory;
080
081import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
082
083@Tag(ReplicationTests.TAG)
084@Tag(MediumTests.TAG)
085public class TestReplicationSource {
086
087  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSource.class);
088  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
089  private final static HBaseTestingUtil TEST_UTIL_PEER = new HBaseTestingUtil();
090  private static FileSystem FS;
091  private static Path oldLogDir;
092  private static Path logDir;
093  private static Configuration conf = TEST_UTIL.getConfiguration();
094
095  @BeforeAll
096  public static void setUpBeforeClass() throws Exception {
097    TEST_UTIL.startMiniDFSCluster(1);
098    FS = TEST_UTIL.getDFSCluster().getFileSystem();
099    Path rootDir = TEST_UTIL.createRootDir();
100    oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
101    if (FS.exists(oldLogDir)) {
102      FS.delete(oldLogDir, true);
103    }
104    logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
105    if (FS.exists(logDir)) {
106      FS.delete(logDir, true);
107    }
108  }
109
110  @AfterAll
111  public static void tearDownAfterClass() throws Exception {
112    TEST_UTIL_PEER.shutdownMiniHBaseCluster();
113    TEST_UTIL.shutdownMiniHBaseCluster();
114    TEST_UTIL.shutdownMiniDFSCluster();
115  }
116
117  /**
118   * Test the default ReplicationSource skips queuing hbase:meta WAL files.
119   */
120  @Test
121  public void testDefaultSkipsMetaWAL() throws IOException {
122    ReplicationSource rs = new ReplicationSource();
123    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
124    conf.setInt("replication.source.maxretriesmultiplier", 1);
125    ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
126    Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
127    Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
128    ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
129    Mockito.when(peerConfig.getReplicationEndpointImpl())
130      .thenReturn(DoNothingReplicationEndpoint.class.getName());
131    Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
132    ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
133    Mockito.when(manager.getGlobalMetrics())
134      .thenReturn(mock(MetricsReplicationGlobalSourceSource.class));
135
136    RegionServerServices rss =
137      TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
138    ReplicationQueueId queueId = new ReplicationQueueId(rss.getServerName(), "qid");
139    rs.init(conf, null, manager, null, mockPeer, rss,
140      new ReplicationQueueData(queueId, ImmutableMap.of()), null, p -> OptionalLong.empty(),
141      new MetricsSource(queueId.toString()));
142    try {
143      rs.startup();
144      assertTrue(rs.isSourceActive());
145      assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
146      rs.enqueueLog(new Path("a.1" + META_WAL_PROVIDER_ID));
147      assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
148      rs.enqueueLog(new Path("a.1"));
149      assertEquals(1, rs.getSourceMetrics().getSizeOfLogQueue());
150    } finally {
151      rs.terminate("Done");
152      rss.stop("Done");
153    }
154  }
155
156  /**
157   * Test that we filter out meta edits, etc.
158   */
159  @Test
160  public void testWALEntryFilter() throws IOException {
161    // To get the fully constructed default WALEntryFilter, need to create a ReplicationSource
162    // instance and init it.
163    ReplicationSource rs = new ReplicationSource();
164    UUID uuid = UUID.randomUUID();
165    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
166    ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
167    Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
168    Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
169    ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
170    Mockito.when(peerConfig.getReplicationEndpointImpl())
171      .thenReturn(DoNothingReplicationEndpoint.class.getName());
172    Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
173    ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
174    RegionServerServices rss =
175      TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
176    ReplicationQueueId queueId = new ReplicationQueueId(rss.getServerName(), "qid");
177    rs.init(conf, null, manager, null, mockPeer, rss,
178      new ReplicationQueueData(queueId, ImmutableMap.of()), uuid, p -> OptionalLong.empty(),
179      new MetricsSource(queueId.toString()));
180    try {
181      rs.startup();
182      TEST_UTIL.waitFor(30000, () -> rs.getWalEntryFilter() != null);
183      WALEntryFilter wef = rs.getWalEntryFilter();
184      // Test non-system WAL edit.
185      WALEdit we = WALEditInternalHelper.addExtendedCell(new WALEdit(),
186        ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY)
187          .setRow(HConstants.EMPTY_START_ROW).setFamily(HConstants.CATALOG_FAMILY)
188          .setType(Cell.Type.Put).build());
189      WAL.Entry e = new WAL.Entry(
190        new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TableName.valueOf("test"), -1, -1, uuid), we);
191      assertTrue(wef.filter(e) == e);
192      // Test system WAL edit.
193      e = new WAL.Entry(
194        new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TableName.META_TABLE_NAME, -1, -1, uuid), we);
195      assertNull(wef.filter(e));
196    } finally {
197      rs.terminate("Done");
198      rss.stop("Done");
199    }
200  }
201
202  /**
203   * Sanity check that we can move logs around while we are reading from them. Should this test
204   * fail, ReplicationSource would have a hard time reading logs that are being archived.
205   */
206  // This tests doesn't belong in here... it is not about ReplicationSource.
207  @Test
208  public void testLogMoving() throws Exception {
209    Path logPath = new Path(logDir, "log");
210    if (!FS.exists(logDir)) {
211      FS.mkdirs(logDir);
212    }
213    if (!FS.exists(oldLogDir)) {
214      FS.mkdirs(oldLogDir);
215    }
216    WALProvider.Writer writer =
217      WALFactory.createWALWriter(FS, logPath, TEST_UTIL.getConfiguration());
218    for (int i = 0; i < 3; i++) {
219      byte[] b = Bytes.toBytes(Integer.toString(i));
220      KeyValue kv = new KeyValue(b, b, b);
221      WALEdit edit = new WALEdit();
222      WALEditInternalHelper.addExtendedCell(edit, kv);
223      WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0, HConstants.DEFAULT_CLUSTER_ID);
224      writer.append(new WAL.Entry(key, edit));
225      writer.sync(false);
226    }
227    writer.close();
228
229    WALStreamReader reader =
230      WALFactory.createStreamReader(FS, logPath, TEST_UTIL.getConfiguration());
231    WAL.Entry entry = reader.next();
232    assertNotNull(entry);
233
234    Path oldLogPath = new Path(oldLogDir, "log");
235    FS.rename(logPath, oldLogPath);
236
237    entry = reader.next();
238    assertNotNull(entry);
239
240    reader.next();
241    entry = reader.next();
242
243    assertNull(entry);
244    reader.close();
245  }
246
247  /**
248   * Tests that {@link ReplicationSource#terminate(String)} will timeout properly Moved here from
249   * TestReplicationSource because doesn't need cluster.
250   */
251  @Test
252  public void testTerminateTimeout() throws Exception {
253    ReplicationSource source = new ReplicationSource();
254    ReplicationEndpoint replicationEndpoint = new DoNothingReplicationEndpoint();
255    try {
256      replicationEndpoint.start();
257      ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
258      Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
259      Configuration testConf = HBaseConfiguration.create();
260      testConf.setInt("replication.source.maxretriesmultiplier", 1);
261      ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
262      ReplicationQueueId queueId =
263        new ReplicationQueueId(ServerName.valueOf("test,123,123"), "testPeer");
264      source.init(testConf, null, manager, null, mockPeer, null,
265        new ReplicationQueueData(queueId, ImmutableMap.of()), null, p -> OptionalLong.empty(),
266        null);
267      ExecutorService executor = Executors.newSingleThreadExecutor();
268      Future<?> future = executor.submit(() -> source.terminate("testing source termination"));
269      long sleepForRetries = testConf.getLong("replication.source.sleepforretries", 1000);
270      Waiter.waitFor(testConf, sleepForRetries * 2, (Waiter.Predicate<Exception>) future::isDone);
271    } finally {
272      replicationEndpoint.stop();
273    }
274  }
275
276  @Test
277  public void testTerminateClearsBuffer() throws Exception {
278    ReplicationSource source = new ReplicationSource();
279    ReplicationSourceManager mockManager = new ReplicationSourceManager(null, null, conf, null,
280      null, null, null, null, null, null, mock(MetricsReplicationGlobalSourceSource.class));
281    ReplicationPeer mockPeer = mock(ReplicationPeer.class);
282    Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
283    Configuration testConf = HBaseConfiguration.create();
284    ReplicationQueueId queueId =
285      new ReplicationQueueId(ServerName.valueOf("test,123,123"), "testPeer");
286    source.init(testConf, null, mockManager, null, mockPeer, Mockito.mock(Server.class),
287      new ReplicationQueueData(queueId, ImmutableMap.of()), null, p -> OptionalLong.empty(),
288      mock(MetricsSource.class));
289    ReplicationSourceWALReader reader =
290      new ReplicationSourceWALReader(null, conf, null, 0, null, source, null);
291    ReplicationSourceShipper shipper = new ReplicationSourceShipper(conf, null, source, reader);
292    source.workerThreads.put("testPeer", shipper);
293    WALEntryBatch batch = new WALEntryBatch(10, logDir);
294    WAL.Entry mockEntry = mock(WAL.Entry.class);
295    WALEdit mockEdit = mock(WALEdit.class);
296    WALKeyImpl mockKey = mock(WALKeyImpl.class);
297    when(mockEntry.getEdit()).thenReturn(mockEdit);
298    when(mockEdit.isEmpty()).thenReturn(false);
299    when(mockEntry.getKey()).thenReturn(mockKey);
300    when(mockKey.estimatedSerializedSizeOf()).thenReturn(1000L);
301    when(mockEdit.heapSize()).thenReturn(10000L);
302    when(mockEdit.size()).thenReturn(0);
303    ArrayList<Cell> cells = new ArrayList<>();
304    KeyValue kv = new KeyValue(Bytes.toBytes("0001"), Bytes.toBytes("f"), Bytes.toBytes("1"),
305      Bytes.toBytes("v1"));
306    cells.add(kv);
307    when(mockEdit.getCells()).thenReturn(cells);
308    reader.addEntryToBatch(batch, mockEntry);
309    reader.entryBatchQueue.put(batch);
310    source.terminate("test");
311    assertEquals(0, source.getSourceManager().getTotalBufferUsed());
312  }
313
314  /**
315   * Tests that recovered queues are preserved on a regionserver shutdown. See HBASE-18192
316   */
317  @Test
318  public void testServerShutdownRecoveredQueue() throws Exception {
319    try {
320      // Ensure single-threaded WAL
321      conf.set("hbase.wal.provider", "defaultProvider");
322      conf.setInt("replication.sleep.before.failover", 2000);
323      // Introduces a delay in regionserver shutdown to give the race condition a chance to kick in.
324      conf.set(HConstants.REGION_SERVER_IMPL, ShutdownDelayRegionServer.class.getName());
325      SingleProcessHBaseCluster cluster = TEST_UTIL.startMiniCluster(2);
326      TEST_UTIL_PEER.startMiniCluster(1);
327
328      HRegionServer serverA = cluster.getRegionServer(0);
329      final ReplicationSourceManager managerA =
330        serverA.getReplicationSourceService().getReplicationManager();
331      HRegionServer serverB = cluster.getRegionServer(1);
332      final ReplicationSourceManager managerB =
333        serverB.getReplicationSourceService().getReplicationManager();
334      final Admin admin = TEST_UTIL.getAdmin();
335
336      final String peerId = "TestPeer";
337      admin.addReplicationPeer(peerId, ReplicationPeerConfig.newBuilder()
338        .setClusterKey(TEST_UTIL_PEER.getRpcConnnectionURI()).build());
339      // Wait for replication sources to come up
340      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
341        @Override
342        public boolean evaluate() {
343          return !(managerA.getSources().isEmpty() || managerB.getSources().isEmpty());
344        }
345      });
346      // Disabling peer makes sure there is at least one log to claim when the server dies
347      // The recovered queue will also stay there until the peer is disabled even if the
348      // WALs it contains have no data.
349      admin.disableReplicationPeer(peerId);
350
351      // Stopping serverA
352      // It's queues should be claimed by the only other alive server i.e. serverB
353      cluster.stopRegionServer(serverA.getServerName());
354      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
355        @Override
356        public boolean evaluate() throws Exception {
357          return managerB.getOldSources().size() == 1;
358        }
359      });
360
361      final HRegionServer serverC = cluster.startRegionServer().getRegionServer();
362      serverC.waitForServerOnline();
363      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
364        @Override
365        public boolean evaluate() throws Exception {
366          return serverC.getReplicationSourceService() != null;
367        }
368      });
369      final ReplicationSourceManager managerC =
370        ((Replication) serverC.getReplicationSourceService()).getReplicationManager();
371      // Sanity check
372      assertEquals(0, managerC.getOldSources().size());
373
374      // Stopping serverB
375      // Now serverC should have two recovered queues:
376      // 1. The serverB's normal queue
377      // 2. serverA's recovered queue on serverB
378      cluster.stopRegionServer(serverB.getServerName());
379      Waiter.waitFor(conf, 20000,
380        (Waiter.Predicate<Exception>) () -> managerC.getOldSources().size() == 2);
381      admin.enableReplicationPeer(peerId);
382      Waiter.waitFor(conf, 20000,
383        (Waiter.Predicate<Exception>) () -> managerC.getOldSources().size() == 0);
384    } finally {
385      conf.set(HConstants.REGION_SERVER_IMPL, HRegionServer.class.getName());
386    }
387  }
388
389  /**
390   * Regionserver implementation that adds a delay on the graceful shutdown.
391   */
392  public static class ShutdownDelayRegionServer extends HRegionServer {
393    public ShutdownDelayRegionServer(Configuration conf) throws IOException {
394      super(conf);
395    }
396
397    @Override
398    protected void stopServiceThreads() {
399      // Add a delay before service threads are shutdown.
400      // This will keep the zookeeper connection alive for the duration of the delay.
401      LOG.info("Adding a delay to the regionserver shutdown");
402      try {
403        Thread.sleep(2000);
404      } catch (InterruptedException ex) {
405        LOG.error("Interrupted while sleeping");
406      }
407      super.stopServiceThreads();
408    }
409  }
410
411  /**
412   * Deadend Endpoint. Does nothing.
413   */
414  public static class DoNothingReplicationEndpoint extends HBaseInterClusterReplicationEndpoint {
415    private final UUID uuid = UUID.randomUUID();
416
417    @Override
418    public void init(Context context) throws IOException {
419      this.ctx = context;
420    }
421
422    @Override
423    public WALEntryFilter getWALEntryfilter() {
424      return null;
425    }
426
427    @Override
428    public synchronized UUID getPeerUUID() {
429      return this.uuid;
430    }
431
432    @Override
433    protected void doStart() {
434      notifyStarted();
435    }
436
437    @Override
438    protected void doStop() {
439      notifyStopped();
440    }
441
442    @Override
443    public boolean canReplicateToSameCluster() {
444      return true;
445    }
446  }
447
448  /**
449   * Deadend Endpoint. Does nothing.
450   */
451  public static class FlakyReplicationEndpoint extends DoNothingReplicationEndpoint {
452
453    static int count = 0;
454
455    @Override
456    public synchronized UUID getPeerUUID() {
457      if (count == 0) {
458        count++;
459        throw new RuntimeException();
460      } else {
461        return super.getPeerUUID();
462      }
463    }
464
465  }
466
467  /**
468   * Bad Endpoint with failing connection to peer on demand.
469   */
470  public static class BadReplicationEndpoint extends DoNothingReplicationEndpoint {
471    static boolean failing = true;
472
473    @Override
474    public synchronized UUID getPeerUUID() {
475      return failing ? null : super.getPeerUUID();
476    }
477  }
478
479  public static class FaultyReplicationEndpoint extends DoNothingReplicationEndpoint {
480
481    static int count = 0;
482
483    @Override
484    public synchronized UUID getPeerUUID() {
485      throw new RuntimeException();
486    }
487
488  }
489
490  private RegionServerServices setupForAbortTests(ReplicationSource rs, Configuration conf,
491    String endpointName) throws IOException {
492    conf.setInt("replication.source.maxretriesmultiplier", 1);
493    ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
494    Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
495    Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
496    ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
497    FaultyReplicationEndpoint.count = 0;
498    Mockito.when(peerConfig.getReplicationEndpointImpl()).thenReturn(endpointName);
499    Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
500    ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
501    Mockito.when(manager.getGlobalMetrics())
502      .thenReturn(mock(MetricsReplicationGlobalSourceSource.class));
503    RegionServerServices rss =
504      TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
505    ReplicationQueueId queueId = new ReplicationQueueId(rss.getServerName(), "qid");
506    rs.init(conf, null, manager, null, mockPeer, rss,
507      new ReplicationQueueData(queueId, ImmutableMap.of()), null, p -> OptionalLong.empty(),
508      new MetricsSource(queueId.toString()));
509    return rss;
510  }
511
512  /**
513   * Test ReplicationSource retries startup once an uncaught exception happens during initialization
514   * and <b>eplication.source.regionserver.abort</b> is set to false.
515   */
516  @Test
517  public void testAbortFalseOnError() throws IOException {
518    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
519    conf.setBoolean("replication.source.regionserver.abort", false);
520    ReplicationSource rs = new ReplicationSource();
521    RegionServerServices rss =
522      setupForAbortTests(rs, conf, FlakyReplicationEndpoint.class.getName());
523    try {
524      rs.startup();
525      assertTrue(rs.isSourceActive());
526      assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
527      rs.enqueueLog(new Path("a.1" + META_WAL_PROVIDER_ID));
528      assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
529      rs.enqueueLog(new Path("a.1"));
530      assertEquals(1, rs.getSourceMetrics().getSizeOfLogQueue());
531    } finally {
532      rs.terminate("Done");
533      rss.stop("Done");
534    }
535  }
536
537  @Test
538  public void testReplicationSourceInitializingMetric() throws IOException {
539    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
540    conf.setBoolean("replication.source.regionserver.abort", false);
541    ReplicationSource rs = new ReplicationSource();
542    RegionServerServices rss = setupForAbortTests(rs, conf, BadReplicationEndpoint.class.getName());
543    try {
544      rs.startup();
545      assertTrue(rs.isSourceActive());
546      Waiter.waitFor(conf, 10000, () -> rs.getSourceMetrics().getSourceInitializing() == 1);
547      BadReplicationEndpoint.failing = false;
548      Waiter.waitFor(conf, 10000, () -> rs.getSourceMetrics().getSourceInitializing() == 0);
549    } finally {
550      rs.terminate("Done");
551      rss.stop("Done");
552    }
553  }
554
555  /**
556   * Test ReplicationSource keeps retrying startup indefinitely without blocking the main thread,
557   * when <b>replication.source.regionserver.abort</b> is set to false.
558   */
559  @Test
560  public void testAbortFalseOnErrorDoesntBlockMainThread() throws IOException {
561    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
562    ReplicationSource rs = new ReplicationSource();
563    RegionServerServices rss =
564      setupForAbortTests(rs, conf, FaultyReplicationEndpoint.class.getName());
565    try {
566      rs.startup();
567      assertTrue(true);
568    } finally {
569      rs.terminate("Done");
570      rss.stop("Done");
571    }
572  }
573
574  /**
575   * Test ReplicationSource retries startup once an uncaught exception happens during initialization
576   * and <b>replication.source.regionserver.abort</b> is set to true.
577   */
578  @Test
579  public void testAbortTrueOnError() throws IOException {
580    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
581    ReplicationSource rs = new ReplicationSource();
582    RegionServerServices rss =
583      setupForAbortTests(rs, conf, FlakyReplicationEndpoint.class.getName());
584    try {
585      rs.startup();
586      assertTrue(rs.isSourceActive());
587      Waiter.waitFor(conf, 1000, () -> rss.isAborted());
588      assertTrue(rss.isAborted());
589      Waiter.waitFor(conf, 1000, () -> !rs.isSourceActive());
590      assertFalse(rs.isSourceActive());
591    } finally {
592      rs.terminate("Done");
593      rss.stop("Done");
594    }
595  }
596
597  /*
598   * Test age of oldest wal metric.
599   */
600  @Test
601  public void testAgeOfOldestWal() throws Exception {
602    try {
603      ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge();
604      EnvironmentEdgeManager.injectEdge(manualEdge);
605
606      String peerId = "1";
607      MetricsSource metrics = new MetricsSource(peerId);
608      Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
609      conf.setInt("replication.source.maxretriesmultiplier", 1);
610      ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
611      Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
612      Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
613      ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
614      Mockito.when(peerConfig.getReplicationEndpointImpl())
615        .thenReturn(DoNothingReplicationEndpoint.class.getName());
616      Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
617      ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
618      Mockito.when(manager.getGlobalMetrics())
619        .thenReturn(mock(MetricsReplicationGlobalSourceSource.class));
620      RegionServerServices rss =
621        TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
622      ReplicationQueueId queueId = new ReplicationQueueId(rss.getServerName(), peerId);
623      ReplicationSource source = new ReplicationSource();
624      source.init(conf, null, manager, null, mockPeer, rss,
625        new ReplicationQueueData(queueId, ImmutableMap.of()), null, p -> OptionalLong.empty(),
626        metrics);
627
628      final Path log1 = new Path(logDir, "log-walgroup-a.8");
629      manualEdge.setValue(10);
630      // Diff of current time (10) and log-walgroup-a.8 timestamp will be 2.
631      source.enqueueLog(log1);
632      MetricsReplicationSourceSource metricsSource1 = getSourceMetrics(peerId);
633      assertEquals(2, metricsSource1.getOldestWalAge());
634
635      final Path log2 = new Path(logDir, "log-walgroup-b.4");
636      // Diff of current time (10) and log-walgroup-b.4 will be 6 so oldestWalAge should be 6
637      source.enqueueLog(log2);
638      assertEquals(6, metricsSource1.getOldestWalAge());
639      // Clear all metrics.
640      metrics.clear();
641    } finally {
642      EnvironmentEdgeManager.reset();
643    }
644  }
645
646  private MetricsReplicationSourceSource getSourceMetrics(String sourceId) {
647    MetricsReplicationSourceFactoryImpl factory =
648      (MetricsReplicationSourceFactoryImpl) CompatibilitySingletonFactory
649        .getInstance(MetricsReplicationSourceFactory.class);
650    return factory.getSource(sourceId);
651  }
652}