001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID;
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025import static org.mockito.Mockito.doNothing;
026import static org.mockito.Mockito.mock;
027import static org.mockito.Mockito.when;
028import java.io.IOException;
029import java.util.ArrayList;
030import java.util.OptionalLong;
031import java.util.UUID;
032import java.util.concurrent.ExecutorService;
033import java.util.concurrent.Executors;
034import java.util.concurrent.Future;
035import java.util.concurrent.atomic.AtomicLong;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.fs.FileSystem;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.hbase.Cell;
040import org.apache.hadoop.hbase.CellBuilderFactory;
041import org.apache.hadoop.hbase.CellBuilderType;
042import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
043import org.apache.hadoop.hbase.HBaseClassTestRule;
044import org.apache.hadoop.hbase.HBaseConfiguration;
045import org.apache.hadoop.hbase.HBaseTestingUtil;
046import org.apache.hadoop.hbase.HConstants;
047import org.apache.hadoop.hbase.KeyValue;
048import org.apache.hadoop.hbase.Server;
049import org.apache.hadoop.hbase.ServerName;
050import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
051import org.apache.hadoop.hbase.TableName;
052import org.apache.hadoop.hbase.Waiter;
053import org.apache.hadoop.hbase.client.Admin;
054import org.apache.hadoop.hbase.regionserver.HRegionServer;
055import org.apache.hadoop.hbase.regionserver.RegionServerServices;
056import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
057import org.apache.hadoop.hbase.replication.ReplicationPeer;
058import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
059import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
060import org.apache.hadoop.hbase.replication.WALEntryFilter;
061import org.apache.hadoop.hbase.testclassification.MediumTests;
062import org.apache.hadoop.hbase.testclassification.ReplicationTests;
063import org.apache.hadoop.hbase.util.Bytes;
064import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
065import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
066import org.apache.hadoop.hbase.wal.WAL;
067import org.apache.hadoop.hbase.wal.WALEdit;
068import org.apache.hadoop.hbase.wal.WALFactory;
069import org.apache.hadoop.hbase.wal.WALKeyImpl;
070import org.apache.hadoop.hbase.wal.WALProvider;
071import org.junit.AfterClass;
072import org.junit.BeforeClass;
073import org.junit.ClassRule;
074import org.junit.Test;
075import org.junit.experimental.categories.Category;
076import org.mockito.Mockito;
077import org.slf4j.Logger;
078import org.slf4j.LoggerFactory;
079
080@Category({ReplicationTests.class, MediumTests.class})
081public class TestReplicationSource {
082
083  @ClassRule
084  public static final HBaseClassTestRule CLASS_RULE =
085      HBaseClassTestRule.forClass(TestReplicationSource.class);
086
087  private static final Logger LOG =
088      LoggerFactory.getLogger(TestReplicationSource.class);
089  private final static HBaseTestingUtil TEST_UTIL =
090      new HBaseTestingUtil();
091  private final static HBaseTestingUtil TEST_UTIL_PEER =
092      new HBaseTestingUtil();
093  private static FileSystem FS;
094  private static Path oldLogDir;
095  private static Path logDir;
096  private static Configuration conf = TEST_UTIL.getConfiguration();
097
098  @BeforeClass
099  public static void setUpBeforeClass() throws Exception {
100    TEST_UTIL.startMiniDFSCluster(1);
101    FS = TEST_UTIL.getDFSCluster().getFileSystem();
102    Path rootDir = TEST_UTIL.createRootDir();
103    oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
104    if (FS.exists(oldLogDir)) {
105      FS.delete(oldLogDir, true);
106    }
107    logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
108    if (FS.exists(logDir)) {
109      FS.delete(logDir, true);
110    }
111  }
112
113  @AfterClass
114  public static void tearDownAfterClass() throws Exception {
115    TEST_UTIL_PEER.shutdownMiniHBaseCluster();
116    TEST_UTIL.shutdownMiniHBaseCluster();
117    TEST_UTIL.shutdownMiniDFSCluster();
118  }
119
120  /**
121   * Test the default ReplicationSource skips queuing hbase:meta WAL files.
122   */
123  @Test
124  public void testDefaultSkipsMetaWAL() throws IOException {
125    ReplicationSource rs = new ReplicationSource();
126    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
127    conf.setInt("replication.source.maxretriesmultiplier", 1);
128    ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
129    Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
130    Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
131    ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
132    Mockito.when(peerConfig.getReplicationEndpointImpl()).
133      thenReturn(DoNothingReplicationEndpoint.class.getName());
134    Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
135    ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
136    Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
137    Mockito.when(manager.getGlobalMetrics()).
138      thenReturn(mock(MetricsReplicationGlobalSourceSource.class));
139    String queueId = "qid";
140    RegionServerServices rss =
141      TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
142    rs.init(conf, null, manager, null, mockPeer, rss, queueId, null,
143      p -> OptionalLong.empty(), new MetricsSource(queueId));
144    try {
145      rs.startup();
146      assertTrue(rs.isSourceActive());
147      assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
148      rs.enqueueLog(new Path("a.1" + META_WAL_PROVIDER_ID));
149      assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
150      rs.enqueueLog(new Path("a.1"));
151      assertEquals(1, rs.getSourceMetrics().getSizeOfLogQueue());
152    } finally {
153      rs.terminate("Done");
154      rss.stop("Done");
155    }
156  }
157
158  /**
159   * Test that we filter out meta edits, etc.
160   */
161  @Test
162  public void testWALEntryFilter() throws IOException {
163    // To get the fully constructed default WALEntryFilter, need to create a ReplicationSource
164    // instance and init it.
165    ReplicationSource rs = new ReplicationSource();
166    UUID uuid = UUID.randomUUID();
167    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
168    ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
169    Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
170    Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
171    ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
172    Mockito.when(peerConfig.getReplicationEndpointImpl()).
173      thenReturn(DoNothingReplicationEndpoint.class.getName());
174    Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
175    ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
176    Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
177    String queueId = "qid";
178    RegionServerServices rss =
179      TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
180    rs.init(conf, null, manager, null, mockPeer, rss, queueId,
181      uuid, p -> OptionalLong.empty(), new MetricsSource(queueId));
182    try {
183      rs.startup();
184      TEST_UTIL.waitFor(30000, () -> rs.getWalEntryFilter() != null);
185      WALEntryFilter wef = rs.getWalEntryFilter();
186      // Test non-system WAL edit.
187      WALEdit we = new WALEdit().add(CellBuilderFactory.create(CellBuilderType.DEEP_COPY).
188        setRow(HConstants.EMPTY_START_ROW).
189        setFamily(HConstants.CATALOG_FAMILY).
190        setType(Cell.Type.Put).build());
191      WAL.Entry e = new WAL.Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY,
192        TableName.valueOf("test"), -1, -1, uuid), we);
193      assertTrue(wef.filter(e) == e);
194      // Test system WAL edit.
195      e = new WAL.Entry(
196        new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TableName.META_TABLE_NAME, -1, -1, uuid),
197          we);
198      assertNull(wef.filter(e));
199    } finally {
200      rs.terminate("Done");
201      rss.stop("Done");
202    }
203  }
204
205  /**
206   * Sanity check that we can move logs around while we are reading
207   * from them. Should this test fail, ReplicationSource would have a hard
208   * time reading logs that are being archived.
209   */
210  // This tests doesn't belong in here... it is not about ReplicationSource.
211  @Test
212  public void testLogMoving() throws Exception{
213    Path logPath = new Path(logDir, "log");
214    if (!FS.exists(logDir)) {
215      FS.mkdirs(logDir);
216    }
217    if (!FS.exists(oldLogDir)) {
218      FS.mkdirs(oldLogDir);
219    }
220    WALProvider.Writer writer = WALFactory.createWALWriter(FS, logPath,
221        TEST_UTIL.getConfiguration());
222    for(int i = 0; i < 3; i++) {
223      byte[] b = Bytes.toBytes(Integer.toString(i));
224      KeyValue kv = new KeyValue(b,b,b);
225      WALEdit edit = new WALEdit();
226      edit.add(kv);
227      WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0,
228          HConstants.DEFAULT_CLUSTER_ID);
229      writer.append(new WAL.Entry(key, edit));
230      writer.sync(false);
231    }
232    writer.close();
233
234    WAL.Reader reader = WALFactory.createReader(FS, logPath, TEST_UTIL.getConfiguration());
235    WAL.Entry entry = reader.next();
236    assertNotNull(entry);
237
238    Path oldLogPath = new Path(oldLogDir, "log");
239    FS.rename(logPath, oldLogPath);
240
241    entry = reader.next();
242    assertNotNull(entry);
243
244    reader.next();
245    entry = reader.next();
246
247    assertNull(entry);
248    reader.close();
249  }
250
251  /**
252   * Tests that {@link ReplicationSource#terminate(String)} will timeout properly
253   * Moved here from TestReplicationSource because doesn't need cluster.
254   */
255  @Test
256  public void testTerminateTimeout() throws Exception {
257    ReplicationSource source = new ReplicationSource();
258    ReplicationEndpoint
259      replicationEndpoint = new DoNothingReplicationEndpoint();
260    try {
261      replicationEndpoint.start();
262      ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
263      Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
264      Configuration testConf = HBaseConfiguration.create();
265      testConf.setInt("replication.source.maxretriesmultiplier", 1);
266      ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
267      Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
268      source.init(testConf, null, manager, null, mockPeer, null, "testPeer",
269        null, p -> OptionalLong.empty(), null);
270      ExecutorService executor = Executors.newSingleThreadExecutor();
271      Future<?> future = executor.submit(
272        () -> source.terminate("testing source termination"));
273      long sleepForRetries = testConf.getLong("replication.source.sleepforretries", 1000);
274      Waiter.waitFor(testConf, sleepForRetries * 2, (Waiter.Predicate<Exception>) future::isDone);
275    } finally {
276      replicationEndpoint.stop();
277    }
278  }
279
280  @Test
281  public void testTerminateClearsBuffer() throws Exception {
282    ReplicationSource source = new ReplicationSource();
283    ReplicationSourceManager mockManager = mock(ReplicationSourceManager.class);
284    MetricsReplicationGlobalSourceSource mockMetrics =
285      mock(MetricsReplicationGlobalSourceSource.class);
286    AtomicLong buffer = new AtomicLong();
287    Mockito.when(mockManager.getTotalBufferUsed()).thenReturn(buffer);
288    Mockito.when(mockManager.getGlobalMetrics()).thenReturn(mockMetrics);
289    ReplicationPeer mockPeer = mock(ReplicationPeer.class);
290    Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
291    Configuration testConf = HBaseConfiguration.create();
292    source.init(testConf, null, mockManager, null, mockPeer, null,
293      "testPeer", null, p -> OptionalLong.empty(), mock(MetricsSource.class));
294    ReplicationSourceWALReader reader = new ReplicationSourceWALReader(null,
295      conf, null, 0, null, source, null);
296    ReplicationSourceShipper shipper =
297      new ReplicationSourceShipper(conf, null, null, source);
298    shipper.entryReader = reader;
299    source.workerThreads.put("testPeer", shipper);
300    WALEntryBatch batch = new WALEntryBatch(10, logDir);
301    WAL.Entry mockEntry = mock(WAL.Entry.class);
302    WALEdit mockEdit = mock(WALEdit.class);
303    WALKeyImpl mockKey = mock(WALKeyImpl.class);
304    when(mockEntry.getEdit()).thenReturn(mockEdit);
305    when(mockEdit.isEmpty()).thenReturn(false);
306    when(mockEntry.getKey()).thenReturn(mockKey);
307    when(mockKey.estimatedSerializedSizeOf()).thenReturn(1000L);
308    when(mockEdit.heapSize()).thenReturn(10000L);
309    when(mockEdit.size()).thenReturn(0);
310    ArrayList<Cell> cells = new ArrayList<>();
311    KeyValue kv = new KeyValue(Bytes.toBytes("0001"), Bytes.toBytes("f"),
312      Bytes.toBytes("1"), Bytes.toBytes("v1"));
313    cells.add(kv);
314    when(mockEdit.getCells()).thenReturn(cells);
315    reader.addEntryToBatch(batch, mockEntry);
316    reader.entryBatchQueue.put(batch);
317    source.terminate("test");
318    assertEquals(0, source.getSourceManager().getTotalBufferUsed().get());
319  }
320
321  /**
322   * Tests that recovered queues are preserved on a regionserver shutdown.
323   * See HBASE-18192
324   */
325  @Test
326  public void testServerShutdownRecoveredQueue() throws Exception {
327    try {
328      // Ensure single-threaded WAL
329      conf.set("hbase.wal.provider", "defaultProvider");
330      conf.setInt("replication.sleep.before.failover", 2000);
331      // Introduces a delay in regionserver shutdown to give the race condition a chance to kick in.
332      conf.set(HConstants.REGION_SERVER_IMPL, ShutdownDelayRegionServer.class.getName());
333      SingleProcessHBaseCluster cluster = TEST_UTIL.startMiniCluster(2);
334      TEST_UTIL_PEER.startMiniCluster(1);
335
336      HRegionServer serverA = cluster.getRegionServer(0);
337      final ReplicationSourceManager managerA =
338          serverA.getReplicationSourceService().getReplicationManager();
339      HRegionServer serverB = cluster.getRegionServer(1);
340      final ReplicationSourceManager managerB =
341          serverB.getReplicationSourceService().getReplicationManager();
342      final Admin admin = TEST_UTIL.getAdmin();
343
344      final String peerId = "TestPeer";
345      admin.addReplicationPeer(peerId,
346        ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL_PEER.getClusterKey()).build());
347      // Wait for replication sources to come up
348      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
349        @Override public boolean evaluate() {
350          return !(managerA.getSources().isEmpty() || managerB.getSources().isEmpty());
351        }
352      });
353      // Disabling peer makes sure there is at least one log to claim when the server dies
354      // The recovered queue will also stay there until the peer is disabled even if the
355      // WALs it contains have no data.
356      admin.disableReplicationPeer(peerId);
357
358      // Stopping serverA
359      // It's queues should be claimed by the only other alive server i.e. serverB
360      cluster.stopRegionServer(serverA.getServerName());
361      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
362        @Override public boolean evaluate() throws Exception {
363          return managerB.getOldSources().size() == 1;
364        }
365      });
366
367      final HRegionServer serverC = cluster.startRegionServer().getRegionServer();
368      serverC.waitForServerOnline();
369      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
370        @Override public boolean evaluate() throws Exception {
371          return serverC.getReplicationSourceService() != null;
372        }
373      });
374      final ReplicationSourceManager managerC =
375          ((Replication) serverC.getReplicationSourceService()).getReplicationManager();
376      // Sanity check
377      assertEquals(0, managerC.getOldSources().size());
378
379      // Stopping serverB
380      // Now serverC should have two recovered queues:
381      // 1. The serverB's normal queue
382      // 2. serverA's recovered queue on serverB
383      cluster.stopRegionServer(serverB.getServerName());
384      Waiter.waitFor(conf, 20000,
385        (Waiter.Predicate<Exception>) () -> managerC.getOldSources().size() == 2);
386      admin.enableReplicationPeer(peerId);
387      Waiter.waitFor(conf, 20000,
388        (Waiter.Predicate<Exception>) () -> managerC.getOldSources().size() == 0);
389    } finally {
390      conf.set(HConstants.REGION_SERVER_IMPL, HRegionServer.class.getName());
391    }
392  }
393
394  /**
395   * Regionserver implementation that adds a delay on the graceful shutdown.
396   */
397  public static class ShutdownDelayRegionServer extends HRegionServer {
398    public ShutdownDelayRegionServer(Configuration conf) throws IOException {
399      super(conf);
400    }
401
402    @Override
403    protected void stopServiceThreads() {
404      // Add a delay before service threads are shutdown.
405      // This will keep the zookeeper connection alive for the duration of the delay.
406      LOG.info("Adding a delay to the regionserver shutdown");
407      try {
408        Thread.sleep(2000);
409      } catch (InterruptedException ex) {
410        LOG.error("Interrupted while sleeping");
411      }
412      super.stopServiceThreads();
413    }
414  }
415
416  /**
417   * Deadend Endpoint. Does nothing.
418   */
419  public static class DoNothingReplicationEndpoint extends HBaseInterClusterReplicationEndpoint {
420    private final UUID uuid = UUID.randomUUID();
421
422    @Override public void init(Context context) throws IOException {
423      this.ctx = context;
424    }
425
426    @Override public WALEntryFilter getWALEntryfilter() {
427      return null;
428    }
429
430    @Override public synchronized UUID getPeerUUID() {
431      return this.uuid;
432    }
433
434    @Override
435    protected void doStart() {
436      notifyStarted();
437    }
438
439    @Override
440    protected void doStop() {
441      notifyStopped();
442    }
443
444    @Override public boolean canReplicateToSameCluster() {
445      return true;
446    }
447  }
448
449  /**
450   * Deadend Endpoint. Does nothing.
451   */
452  public static class FlakyReplicationEndpoint extends DoNothingReplicationEndpoint {
453
454    static int count = 0;
455
456    @Override
457    public synchronized UUID getPeerUUID() {
458      if (count==0) {
459        count++;
460        throw new RuntimeException();
461      } else {
462        return super.getPeerUUID();
463      }
464    }
465
466  }
467
468  /**
469   * Bad Endpoint with failing connection to peer on demand.
470   */
471  public static class BadReplicationEndpoint extends DoNothingReplicationEndpoint {
472    static boolean failing = true;
473
474    @Override
475    public synchronized UUID getPeerUUID() {
476      return failing ? null : super.getPeerUUID();
477    }
478  }
479
480  public static class FaultyReplicationEndpoint extends DoNothingReplicationEndpoint {
481
482    static int count = 0;
483
484    @Override
485    public synchronized UUID getPeerUUID() {
486      throw new RuntimeException();
487    }
488
489  }
490
491  /**
492   * Test HBASE-20497
493   * Moved here from TestReplicationSource because doesn't need cluster.
494   */
495  @Test
496  public void testRecoveredReplicationSourceShipperGetPosition() throws Exception {
497    String walGroupId = "fake-wal-group-id";
498    ServerName serverName = ServerName.valueOf("www.example.com", 12006, 1524679704418L);
499    ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L);
500    RecoveredReplicationSource source = mock(RecoveredReplicationSource.class);
501    Server server = mock(Server.class);
502    Mockito.when(server.getServerName()).thenReturn(serverName);
503    Mockito.when(source.getServer()).thenReturn(server);
504    Mockito.when(source.getServerWALsBelongTo()).thenReturn(deadServer);
505    ReplicationQueueStorage storage = mock(ReplicationQueueStorage.class);
506    Mockito.when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any()))
507      .thenReturn(1001L);
508    Mockito.when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any()))
509      .thenReturn(-1L);
510    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
511    conf.setInt("replication.source.maxretriesmultiplier", -1);
512    MetricsSource metricsSource = mock(MetricsSource.class);
513    doNothing().when(metricsSource).incrSizeOfLogQueue();
514    ReplicationSourceLogQueue logQueue = new ReplicationSourceLogQueue(conf, metricsSource, source);
515    logQueue.enqueueLog(new Path("/www/html/test"), walGroupId);
516    RecoveredReplicationSourceShipper shipper =
517      new RecoveredReplicationSourceShipper(conf, walGroupId, logQueue, source, storage);
518    assertEquals(1001L, shipper.getStartPosition());
519  }
520
521  private RegionServerServices setupForAbortTests(ReplicationSource rs, Configuration conf,
522      String endpointName) throws IOException {
523    conf.setInt("replication.source.maxretriesmultiplier", 1);
524    ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
525    Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
526    Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
527    ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
528    FaultyReplicationEndpoint.count = 0;
529    Mockito.when(peerConfig.getReplicationEndpointImpl()).
530      thenReturn(endpointName);
531    Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
532    ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
533    Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
534    Mockito.when(manager.getGlobalMetrics()).
535      thenReturn(mock(MetricsReplicationGlobalSourceSource.class));
536    String queueId = "qid";
537    RegionServerServices rss =
538      TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
539    rs.init(conf, null, manager, null, mockPeer, rss, queueId, null,
540      p -> OptionalLong.empty(), new MetricsSource(queueId));
541    return rss;
542  }
543
544  /**
545   * Test ReplicationSource retries startup once an uncaught exception happens
546   * during initialization and <b>eplication.source.regionserver.abort</b> is set to false.
547   */
548  @Test
549  public void testAbortFalseOnError() throws IOException {
550    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
551    conf.setBoolean("replication.source.regionserver.abort", false);
552    ReplicationSource rs = new ReplicationSource();
553    RegionServerServices rss = setupForAbortTests(rs, conf,
554      FlakyReplicationEndpoint.class.getName());
555    try {
556      rs.startup();
557      assertTrue(rs.isSourceActive());
558      assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
559      rs.enqueueLog(new Path("a.1" + META_WAL_PROVIDER_ID));
560      assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
561      rs.enqueueLog(new Path("a.1"));
562      assertEquals(1, rs.getSourceMetrics().getSizeOfLogQueue());
563    } finally {
564      rs.terminate("Done");
565      rss.stop("Done");
566    }
567  }
568
569  @Test
570  public void testReplicationSourceInitializingMetric() throws IOException {
571    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
572    conf.setBoolean("replication.source.regionserver.abort", false);
573    ReplicationSource rs = new ReplicationSource();
574    RegionServerServices rss = setupForAbortTests(rs, conf,
575      BadReplicationEndpoint.class.getName());
576    try {
577      rs.startup();
578      assertTrue(rs.isSourceActive());
579      Waiter.waitFor(conf, 1000, () -> rs.getSourceMetrics().getSourceInitializing() == 1);
580      BadReplicationEndpoint.failing = false;
581      Waiter.waitFor(conf, 1000, () -> rs.getSourceMetrics().getSourceInitializing() == 0);
582    } finally {
583      rs.terminate("Done");
584      rss.stop("Done");
585    }
586  }
587
588  /**
589   * Test ReplicationSource keeps retrying startup indefinitely without blocking the main thread,
590   * when <b>eplication.source.regionserver.abort</b> is set to false.
591   */
592  @Test
593  public void testAbortFalseOnErrorDoesntBlockMainThread() throws IOException {
594    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
595    ReplicationSource rs = new ReplicationSource();
596    RegionServerServices rss = setupForAbortTests(rs, conf,
597      FaultyReplicationEndpoint.class.getName());
598    try {
599      rs.startup();
600      assertTrue(true);
601    } finally {
602      rs.terminate("Done");
603      rss.stop("Done");
604    }
605  }
606
607  /**
608   * Test ReplicationSource retries startup once an uncaught exception happens
609   * during initialization and <b>replication.source.regionserver.abort</b> is set to true.
610   */
611  @Test
612  public void testAbortTrueOnError() throws IOException {
613    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
614    ReplicationSource rs = new ReplicationSource();
615    RegionServerServices rss = setupForAbortTests(rs, conf,
616      FlakyReplicationEndpoint.class.getName());
617    try {
618      rs.startup();
619      assertTrue(rs.isSourceActive());
620      Waiter.waitFor(conf, 1000, () -> rss.isAborted());
621      assertTrue(rss.isAborted());
622      Waiter.waitFor(conf, 1000, () -> !rs.isSourceActive());
623      assertFalse(rs.isSourceActive());
624    } finally {
625      rs.terminate("Done");
626      rss.stop("Done");
627    }
628  }
629
630  /*
631    Test age of oldest wal metric.
632  */
633  @Test
634  public void testAgeOfOldestWal() throws Exception {
635    try {
636      ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge();
637      EnvironmentEdgeManager.injectEdge(manualEdge);
638
639      String id = "1";
640      MetricsSource metrics = new MetricsSource(id);
641      Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
642      conf.setInt("replication.source.maxretriesmultiplier", 1);
643      ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
644      Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
645      Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
646      ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
647      Mockito.when(peerConfig.getReplicationEndpointImpl()).
648        thenReturn(DoNothingReplicationEndpoint.class.getName());
649      Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
650      ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
651      Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
652      Mockito.when(manager.getGlobalMetrics()).
653        thenReturn(mock(MetricsReplicationGlobalSourceSource.class));
654      RegionServerServices rss =
655        TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
656
657      ReplicationSource source = new ReplicationSource();
658      source.init(conf, null, manager, null, mockPeer, rss, id, null,
659        p -> OptionalLong.empty(), metrics);
660
661      final Path log1 = new Path(logDir, "log-walgroup-a.8");
662      manualEdge.setValue(10);
663      // Diff of current time (10) and  log-walgroup-a.8 timestamp will be 2.
664      source.enqueueLog(log1);
665      MetricsReplicationSourceSource metricsSource1 = getSourceMetrics(id);
666      assertEquals(2, metricsSource1.getOldestWalAge());
667
668      final Path log2 = new Path(logDir, "log-walgroup-b.4");
669      // Diff of current time (10) and log-walgroup-b.4 will be 6 so oldestWalAge should be 6
670      source.enqueueLog(log2);
671      assertEquals(6, metricsSource1.getOldestWalAge());
672      // Clear all metrics.
673      metrics.clear();
674    } finally {
675      EnvironmentEdgeManager.reset();
676    }
677  }
678
679  private MetricsReplicationSourceSource getSourceMetrics(String sourceId) {
680    MetricsReplicationSourceFactoryImpl factory =
681      (MetricsReplicationSourceFactoryImpl) CompatibilitySingletonFactory.getInstance(
682        MetricsReplicationSourceFactory.class);
683    return factory.getSource(sourceId);
684  }
685}