001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.ArrayList;
import java.util.OptionalLong;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.wal.WALStreamReader;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
082
083@Category({ ReplicationTests.class, MediumTests.class })
084public class TestReplicationSource {
085
086  @ClassRule
087  public static final HBaseClassTestRule CLASS_RULE =
088    HBaseClassTestRule.forClass(TestReplicationSource.class);
089
090  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSource.class);
091  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
092  private final static HBaseTestingUtil TEST_UTIL_PEER = new HBaseTestingUtil();
093  private static FileSystem FS;
094  private static Path oldLogDir;
095  private static Path logDir;
096  private static Configuration conf = TEST_UTIL.getConfiguration();
097
098  @BeforeClass
099  public static void setUpBeforeClass() throws Exception {
100    TEST_UTIL.startMiniDFSCluster(1);
101    FS = TEST_UTIL.getDFSCluster().getFileSystem();
102    Path rootDir = TEST_UTIL.createRootDir();
103    oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
104    if (FS.exists(oldLogDir)) {
105      FS.delete(oldLogDir, true);
106    }
107    logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
108    if (FS.exists(logDir)) {
109      FS.delete(logDir, true);
110    }
111  }
112
113  @AfterClass
114  public static void tearDownAfterClass() throws Exception {
115    TEST_UTIL_PEER.shutdownMiniHBaseCluster();
116    TEST_UTIL.shutdownMiniHBaseCluster();
117    TEST_UTIL.shutdownMiniDFSCluster();
118  }
119
120  /**
121   * Test the default ReplicationSource skips queuing hbase:meta WAL files.
122   */
123  @Test
124  public void testDefaultSkipsMetaWAL() throws IOException {
125    ReplicationSource rs = new ReplicationSource();
126    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
127    conf.setInt("replication.source.maxretriesmultiplier", 1);
128    ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
129    Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
130    Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
131    ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
132    Mockito.when(peerConfig.getReplicationEndpointImpl())
133      .thenReturn(DoNothingReplicationEndpoint.class.getName());
134    Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
135    ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
136    Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
137    Mockito.when(manager.getGlobalMetrics())
138      .thenReturn(mock(MetricsReplicationGlobalSourceSource.class));
139    String queueId = "qid";
140    RegionServerServices rss =
141      TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
142    rs.init(conf, null, manager, null, mockPeer, rss, queueId, null, p -> OptionalLong.empty(),
143      new MetricsSource(queueId));
144    try {
145      rs.startup();
146      assertTrue(rs.isSourceActive());
147      assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
148      rs.enqueueLog(new Path("a.1" + META_WAL_PROVIDER_ID));
149      assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
150      rs.enqueueLog(new Path("a.1"));
151      assertEquals(1, rs.getSourceMetrics().getSizeOfLogQueue());
152    } finally {
153      rs.terminate("Done");
154      rss.stop("Done");
155    }
156  }
157
158  /**
159   * Test that we filter out meta edits, etc.
160   */
161  @Test
162  public void testWALEntryFilter() throws IOException {
163    // To get the fully constructed default WALEntryFilter, need to create a ReplicationSource
164    // instance and init it.
165    ReplicationSource rs = new ReplicationSource();
166    UUID uuid = UUID.randomUUID();
167    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
168    ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
169    Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
170    Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
171    ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
172    Mockito.when(peerConfig.getReplicationEndpointImpl())
173      .thenReturn(DoNothingReplicationEndpoint.class.getName());
174    Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
175    ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
176    Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
177    String queueId = "qid";
178    RegionServerServices rss =
179      TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
180    rs.init(conf, null, manager, null, mockPeer, rss, queueId, uuid, p -> OptionalLong.empty(),
181      new MetricsSource(queueId));
182    try {
183      rs.startup();
184      TEST_UTIL.waitFor(30000, () -> rs.getWalEntryFilter() != null);
185      WALEntryFilter wef = rs.getWalEntryFilter();
186      // Test non-system WAL edit.
187      WALEdit we = new WALEdit()
188        .add(CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(HConstants.EMPTY_START_ROW)
189          .setFamily(HConstants.CATALOG_FAMILY).setType(Cell.Type.Put).build());
190      WAL.Entry e = new WAL.Entry(
191        new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TableName.valueOf("test"), -1, -1, uuid), we);
192      assertTrue(wef.filter(e) == e);
193      // Test system WAL edit.
194      e = new WAL.Entry(
195        new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TableName.META_TABLE_NAME, -1, -1, uuid), we);
196      assertNull(wef.filter(e));
197    } finally {
198      rs.terminate("Done");
199      rss.stop("Done");
200    }
201  }
202
203  /**
204   * Sanity check that we can move logs around while we are reading from them. Should this test
205   * fail, ReplicationSource would have a hard time reading logs that are being archived.
206   */
207  // This tests doesn't belong in here... it is not about ReplicationSource.
208  @Test
209  public void testLogMoving() throws Exception {
210    Path logPath = new Path(logDir, "log");
211    if (!FS.exists(logDir)) {
212      FS.mkdirs(logDir);
213    }
214    if (!FS.exists(oldLogDir)) {
215      FS.mkdirs(oldLogDir);
216    }
217    WALProvider.Writer writer =
218      WALFactory.createWALWriter(FS, logPath, TEST_UTIL.getConfiguration());
219    for (int i = 0; i < 3; i++) {
220      byte[] b = Bytes.toBytes(Integer.toString(i));
221      KeyValue kv = new KeyValue(b, b, b);
222      WALEdit edit = new WALEdit();
223      edit.add(kv);
224      WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0, HConstants.DEFAULT_CLUSTER_ID);
225      writer.append(new WAL.Entry(key, edit));
226      writer.sync(false);
227    }
228    writer.close();
229
230    WALStreamReader reader =
231      WALFactory.createStreamReader(FS, logPath, TEST_UTIL.getConfiguration());
232    WAL.Entry entry = reader.next();
233    assertNotNull(entry);
234
235    Path oldLogPath = new Path(oldLogDir, "log");
236    FS.rename(logPath, oldLogPath);
237
238    entry = reader.next();
239    assertNotNull(entry);
240
241    reader.next();
242    entry = reader.next();
243
244    assertNull(entry);
245    reader.close();
246  }
247
248  /**
249   * Tests that {@link ReplicationSource#terminate(String)} will timeout properly Moved here from
250   * TestReplicationSource because doesn't need cluster.
251   */
252  @Test
253  public void testTerminateTimeout() throws Exception {
254    ReplicationSource source = new ReplicationSource();
255    ReplicationEndpoint replicationEndpoint = new DoNothingReplicationEndpoint();
256    try {
257      replicationEndpoint.start();
258      ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
259      Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
260      Configuration testConf = HBaseConfiguration.create();
261      testConf.setInt("replication.source.maxretriesmultiplier", 1);
262      ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
263      Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
264      source.init(testConf, null, manager, null, mockPeer, null, "testPeer", null,
265        p -> OptionalLong.empty(), null);
266      ExecutorService executor = Executors.newSingleThreadExecutor();
267      Future<?> future = executor.submit(() -> source.terminate("testing source termination"));
268      long sleepForRetries = testConf.getLong("replication.source.sleepforretries", 1000);
269      Waiter.waitFor(testConf, sleepForRetries * 2, (Waiter.Predicate<Exception>) future::isDone);
270    } finally {
271      replicationEndpoint.stop();
272    }
273  }
274
275  @Test
276  public void testTerminateClearsBuffer() throws Exception {
277    ReplicationSource source = new ReplicationSource();
278    ReplicationSourceManager mockManager = mock(ReplicationSourceManager.class);
279    MetricsReplicationGlobalSourceSource mockMetrics =
280      mock(MetricsReplicationGlobalSourceSource.class);
281    AtomicLong buffer = new AtomicLong();
282    Mockito.when(mockManager.getTotalBufferUsed()).thenReturn(buffer);
283    Mockito.when(mockManager.getGlobalMetrics()).thenReturn(mockMetrics);
284    ReplicationPeer mockPeer = mock(ReplicationPeer.class);
285    Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
286    Configuration testConf = HBaseConfiguration.create();
287    source.init(testConf, null, mockManager, null, mockPeer, Mockito.mock(Server.class), "testPeer",
288      null, p -> OptionalLong.empty(), mock(MetricsSource.class));
289    ReplicationSourceWALReader reader =
290      new ReplicationSourceWALReader(null, conf, null, 0, null, source, null);
291    ReplicationSourceShipper shipper = new ReplicationSourceShipper(conf, null, null, source);
292    shipper.entryReader = reader;
293    source.workerThreads.put("testPeer", shipper);
294    WALEntryBatch batch = new WALEntryBatch(10, logDir);
295    WAL.Entry mockEntry = mock(WAL.Entry.class);
296    WALEdit mockEdit = mock(WALEdit.class);
297    WALKeyImpl mockKey = mock(WALKeyImpl.class);
298    when(mockEntry.getEdit()).thenReturn(mockEdit);
299    when(mockEdit.isEmpty()).thenReturn(false);
300    when(mockEntry.getKey()).thenReturn(mockKey);
301    when(mockKey.estimatedSerializedSizeOf()).thenReturn(1000L);
302    when(mockEdit.heapSize()).thenReturn(10000L);
303    when(mockEdit.size()).thenReturn(0);
304    ArrayList<Cell> cells = new ArrayList<>();
305    KeyValue kv = new KeyValue(Bytes.toBytes("0001"), Bytes.toBytes("f"), Bytes.toBytes("1"),
306      Bytes.toBytes("v1"));
307    cells.add(kv);
308    when(mockEdit.getCells()).thenReturn(cells);
309    reader.addEntryToBatch(batch, mockEntry);
310    reader.entryBatchQueue.put(batch);
311    source.terminate("test");
312    assertEquals(0, source.getSourceManager().getTotalBufferUsed().get());
313  }
314
315  /**
316   * Tests that recovered queues are preserved on a regionserver shutdown. See HBASE-18192
317   */
318  @Test
319  public void testServerShutdownRecoveredQueue() throws Exception {
320    try {
321      // Ensure single-threaded WAL
322      conf.set("hbase.wal.provider", "defaultProvider");
323      conf.setInt("replication.sleep.before.failover", 2000);
324      // Introduces a delay in regionserver shutdown to give the race condition a chance to kick in.
325      conf.set(HConstants.REGION_SERVER_IMPL, ShutdownDelayRegionServer.class.getName());
326      SingleProcessHBaseCluster cluster = TEST_UTIL.startMiniCluster(2);
327      TEST_UTIL_PEER.startMiniCluster(1);
328
329      HRegionServer serverA = cluster.getRegionServer(0);
330      final ReplicationSourceManager managerA =
331        serverA.getReplicationSourceService().getReplicationManager();
332      HRegionServer serverB = cluster.getRegionServer(1);
333      final ReplicationSourceManager managerB =
334        serverB.getReplicationSourceService().getReplicationManager();
335      final Admin admin = TEST_UTIL.getAdmin();
336
337      final String peerId = "TestPeer";
338      admin.addReplicationPeer(peerId,
339        ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL_PEER.getClusterKey()).build());
340      // Wait for replication sources to come up
341      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
342        @Override
343        public boolean evaluate() {
344          return !(managerA.getSources().isEmpty() || managerB.getSources().isEmpty());
345        }
346      });
347      // Disabling peer makes sure there is at least one log to claim when the server dies
348      // The recovered queue will also stay there until the peer is disabled even if the
349      // WALs it contains have no data.
350      admin.disableReplicationPeer(peerId);
351
352      // Stopping serverA
353      // It's queues should be claimed by the only other alive server i.e. serverB
354      cluster.stopRegionServer(serverA.getServerName());
355      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
356        @Override
357        public boolean evaluate() throws Exception {
358          return managerB.getOldSources().size() == 1;
359        }
360      });
361
362      final HRegionServer serverC = cluster.startRegionServer().getRegionServer();
363      serverC.waitForServerOnline();
364      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
365        @Override
366        public boolean evaluate() throws Exception {
367          return serverC.getReplicationSourceService() != null;
368        }
369      });
370      final ReplicationSourceManager managerC =
371        ((Replication) serverC.getReplicationSourceService()).getReplicationManager();
372      // Sanity check
373      assertEquals(0, managerC.getOldSources().size());
374
375      // Stopping serverB
376      // Now serverC should have two recovered queues:
377      // 1. The serverB's normal queue
378      // 2. serverA's recovered queue on serverB
379      cluster.stopRegionServer(serverB.getServerName());
380      Waiter.waitFor(conf, 20000,
381        (Waiter.Predicate<Exception>) () -> managerC.getOldSources().size() == 2);
382      admin.enableReplicationPeer(peerId);
383      Waiter.waitFor(conf, 20000,
384        (Waiter.Predicate<Exception>) () -> managerC.getOldSources().size() == 0);
385    } finally {
386      conf.set(HConstants.REGION_SERVER_IMPL, HRegionServer.class.getName());
387    }
388  }
389
390  /**
391   * Regionserver implementation that adds a delay on the graceful shutdown.
392   */
393  public static class ShutdownDelayRegionServer extends HRegionServer {
394    public ShutdownDelayRegionServer(Configuration conf) throws IOException {
395      super(conf);
396    }
397
398    @Override
399    protected void stopServiceThreads() {
400      // Add a delay before service threads are shutdown.
401      // This will keep the zookeeper connection alive for the duration of the delay.
402      LOG.info("Adding a delay to the regionserver shutdown");
403      try {
404        Thread.sleep(2000);
405      } catch (InterruptedException ex) {
406        LOG.error("Interrupted while sleeping");
407      }
408      super.stopServiceThreads();
409    }
410  }
411
412  /**
413   * Deadend Endpoint. Does nothing.
414   */
415  public static class DoNothingReplicationEndpoint extends HBaseInterClusterReplicationEndpoint {
416    private final UUID uuid = UUID.randomUUID();
417
418    @Override
419    public void init(Context context) throws IOException {
420      this.ctx = context;
421    }
422
423    @Override
424    public WALEntryFilter getWALEntryfilter() {
425      return null;
426    }
427
428    @Override
429    public synchronized UUID getPeerUUID() {
430      return this.uuid;
431    }
432
433    @Override
434    protected void doStart() {
435      notifyStarted();
436    }
437
438    @Override
439    protected void doStop() {
440      notifyStopped();
441    }
442
443    @Override
444    public boolean canReplicateToSameCluster() {
445      return true;
446    }
447  }
448
449  /**
450   * Deadend Endpoint. Does nothing.
451   */
452  public static class FlakyReplicationEndpoint extends DoNothingReplicationEndpoint {
453
454    static int count = 0;
455
456    @Override
457    public synchronized UUID getPeerUUID() {
458      if (count == 0) {
459        count++;
460        throw new RuntimeException();
461      } else {
462        return super.getPeerUUID();
463      }
464    }
465
466  }
467
468  /**
469   * Bad Endpoint with failing connection to peer on demand.
470   */
471  public static class BadReplicationEndpoint extends DoNothingReplicationEndpoint {
472    static boolean failing = true;
473
474    @Override
475    public synchronized UUID getPeerUUID() {
476      return failing ? null : super.getPeerUUID();
477    }
478  }
479
480  public static class FaultyReplicationEndpoint extends DoNothingReplicationEndpoint {
481
482    static int count = 0;
483
484    @Override
485    public synchronized UUID getPeerUUID() {
486      throw new RuntimeException();
487    }
488
489  }
490
491  /**
492   * Test HBASE-20497 Moved here from TestReplicationSource because doesn't need cluster.
493   */
494  @Test
495  public void testRecoveredReplicationSourceShipperGetPosition() throws Exception {
496    String walGroupId = "fake-wal-group-id";
497    ServerName serverName = ServerName.valueOf("www.example.com", 12006, 1524679704418L);
498    ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L);
499    RecoveredReplicationSource source = mock(RecoveredReplicationSource.class);
500    Server server = mock(Server.class);
501    Mockito.when(server.getServerName()).thenReturn(serverName);
502    Mockito.when(source.getServer()).thenReturn(server);
503    Mockito.when(source.getServerWALsBelongTo()).thenReturn(deadServer);
504    ReplicationQueueStorage storage = mock(ReplicationQueueStorage.class);
505    Mockito.when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any()))
506      .thenReturn(1001L);
507    Mockito.when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any()))
508      .thenReturn(-1L);
509    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
510    conf.setInt("replication.source.maxretriesmultiplier", -1);
511    MetricsSource metricsSource = mock(MetricsSource.class);
512    doNothing().when(metricsSource).incrSizeOfLogQueue();
513    ReplicationSourceLogQueue logQueue = new ReplicationSourceLogQueue(conf, metricsSource, source);
514    logQueue.enqueueLog(new Path("/www/html/test"), walGroupId);
515    RecoveredReplicationSourceShipper shipper =
516      new RecoveredReplicationSourceShipper(conf, walGroupId, logQueue, source, storage);
517    assertEquals(1001L, shipper.getStartPosition());
518  }
519
520  private RegionServerServices setupForAbortTests(ReplicationSource rs, Configuration conf,
521    String endpointName) throws IOException {
522    conf.setInt("replication.source.maxretriesmultiplier", 1);
523    ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
524    Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
525    Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
526    ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
527    FaultyReplicationEndpoint.count = 0;
528    Mockito.when(peerConfig.getReplicationEndpointImpl()).thenReturn(endpointName);
529    Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
530    ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
531    Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
532    Mockito.when(manager.getGlobalMetrics())
533      .thenReturn(mock(MetricsReplicationGlobalSourceSource.class));
534    String queueId = "qid";
535    RegionServerServices rss =
536      TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
537    rs.init(conf, null, manager, null, mockPeer, rss, queueId, null, p -> OptionalLong.empty(),
538      new MetricsSource(queueId));
539    return rss;
540  }
541
542  /**
543   * Test ReplicationSource retries startup once an uncaught exception happens during initialization
544   * and <b>eplication.source.regionserver.abort</b> is set to false.
545   */
546  @Test
547  public void testAbortFalseOnError() throws IOException {
548    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
549    conf.setBoolean("replication.source.regionserver.abort", false);
550    ReplicationSource rs = new ReplicationSource();
551    RegionServerServices rss =
552      setupForAbortTests(rs, conf, FlakyReplicationEndpoint.class.getName());
553    try {
554      rs.startup();
555      assertTrue(rs.isSourceActive());
556      assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
557      rs.enqueueLog(new Path("a.1" + META_WAL_PROVIDER_ID));
558      assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
559      rs.enqueueLog(new Path("a.1"));
560      assertEquals(1, rs.getSourceMetrics().getSizeOfLogQueue());
561    } finally {
562      rs.terminate("Done");
563      rss.stop("Done");
564    }
565  }
566
567  @Test
568  public void testReplicationSourceInitializingMetric() throws IOException {
569    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
570    conf.setBoolean("replication.source.regionserver.abort", false);
571    ReplicationSource rs = new ReplicationSource();
572    RegionServerServices rss = setupForAbortTests(rs, conf, BadReplicationEndpoint.class.getName());
573    try {
574      rs.startup();
575      assertTrue(rs.isSourceActive());
576      Waiter.waitFor(conf, 10000, () -> rs.getSourceMetrics().getSourceInitializing() == 1);
577      BadReplicationEndpoint.failing = false;
578      Waiter.waitFor(conf, 10000, () -> rs.getSourceMetrics().getSourceInitializing() == 0);
579    } finally {
580      rs.terminate("Done");
581      rss.stop("Done");
582    }
583  }
584
585  /**
586   * Test ReplicationSource keeps retrying startup indefinitely without blocking the main thread,
587   * when <b>replication.source.regionserver.abort</b> is set to false.
588   */
589  @Test
590  public void testAbortFalseOnErrorDoesntBlockMainThread() throws IOException {
591    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
592    ReplicationSource rs = new ReplicationSource();
593    RegionServerServices rss =
594      setupForAbortTests(rs, conf, FaultyReplicationEndpoint.class.getName());
595    try {
596      rs.startup();
597      assertTrue(true);
598    } finally {
599      rs.terminate("Done");
600      rss.stop("Done");
601    }
602  }
603
604  /**
605   * Test ReplicationSource retries startup once an uncaught exception happens during initialization
606   * and <b>replication.source.regionserver.abort</b> is set to true.
607   */
608  @Test
609  public void testAbortTrueOnError() throws IOException {
610    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
611    ReplicationSource rs = new ReplicationSource();
612    RegionServerServices rss =
613      setupForAbortTests(rs, conf, FlakyReplicationEndpoint.class.getName());
614    try {
615      rs.startup();
616      assertTrue(rs.isSourceActive());
617      Waiter.waitFor(conf, 1000, () -> rss.isAborted());
618      assertTrue(rss.isAborted());
619      Waiter.waitFor(conf, 1000, () -> !rs.isSourceActive());
620      assertFalse(rs.isSourceActive());
621    } finally {
622      rs.terminate("Done");
623      rss.stop("Done");
624    }
625  }
626
627  /*
628   * Test age of oldest wal metric.
629   */
630  @Test
631  public void testAgeOfOldestWal() throws Exception {
632    try {
633      ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge();
634      EnvironmentEdgeManager.injectEdge(manualEdge);
635
636      String id = "1";
637      MetricsSource metrics = new MetricsSource(id);
638      Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
639      conf.setInt("replication.source.maxretriesmultiplier", 1);
640      ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
641      Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
642      Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
643      ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
644      Mockito.when(peerConfig.getReplicationEndpointImpl())
645        .thenReturn(DoNothingReplicationEndpoint.class.getName());
646      Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
647      ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
648      Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
649      Mockito.when(manager.getGlobalMetrics())
650        .thenReturn(mock(MetricsReplicationGlobalSourceSource.class));
651      RegionServerServices rss =
652        TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
653
654      ReplicationSource source = new ReplicationSource();
655      source.init(conf, null, manager, null, mockPeer, rss, id, null, p -> OptionalLong.empty(),
656        metrics);
657
658      final Path log1 = new Path(logDir, "log-walgroup-a.8");
659      manualEdge.setValue(10);
660      // Diff of current time (10) and log-walgroup-a.8 timestamp will be 2.
661      source.enqueueLog(log1);
662      MetricsReplicationSourceSource metricsSource1 = getSourceMetrics(id);
663      assertEquals(2, metricsSource1.getOldestWalAge());
664
665      final Path log2 = new Path(logDir, "log-walgroup-b.4");
666      // Diff of current time (10) and log-walgroup-b.4 will be 6 so oldestWalAge should be 6
667      source.enqueueLog(log2);
668      assertEquals(6, metricsSource1.getOldestWalAge());
669      // Clear all metrics.
670      metrics.clear();
671    } finally {
672      EnvironmentEdgeManager.reset();
673    }
674  }
675
676  private MetricsReplicationSourceSource getSourceMetrics(String sourceId) {
677    MetricsReplicationSourceFactoryImpl factory =
678      (MetricsReplicationSourceFactoryImpl) CompatibilitySingletonFactory
679        .getInstance(MetricsReplicationSourceFactory.class);
680    return factory.getSource(sourceId);
681  }
682}