001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025import static org.mockito.Mockito.mock;
026import static org.mockito.Mockito.when;
027
028import java.io.IOException;
029import java.lang.reflect.Field;
030import java.net.URLEncoder;
031import java.util.ArrayList;
032import java.util.Collection;
033import java.util.HashMap;
034import java.util.List;
035import java.util.Map;
036import java.util.NavigableMap;
037import java.util.NavigableSet;
038import java.util.Set;
039import java.util.SortedSet;
040import java.util.TreeMap;
041import java.util.TreeSet;
042import java.util.UUID;
043import java.util.concurrent.CountDownLatch;
044import java.util.stream.Collectors;
045
046import org.apache.hadoop.conf.Configuration;
047import org.apache.hadoop.fs.FileSystem;
048import org.apache.hadoop.fs.Path;
049import org.apache.hadoop.hbase.ChoreService;
050import org.apache.hadoop.hbase.ClusterId;
051import org.apache.hadoop.hbase.HBaseClassTestRule;
052import org.apache.hadoop.hbase.HBaseConfiguration;
053import org.apache.hadoop.hbase.HBaseTestingUtil;
054import org.apache.hadoop.hbase.HConstants;
055import org.apache.hadoop.hbase.KeyValue;
056import org.apache.hadoop.hbase.Server;
057import org.apache.hadoop.hbase.ServerName;
058import org.apache.hadoop.hbase.TableName;
059import org.apache.hadoop.hbase.Waiter;
060import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
061import org.apache.hadoop.hbase.client.Connection;
062import org.apache.hadoop.hbase.client.RegionInfo;
063import org.apache.hadoop.hbase.client.RegionInfoBuilder;
064import org.apache.hadoop.hbase.client.TableDescriptor;
065import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
066import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
067import org.apache.hadoop.hbase.regionserver.RegionServerServices;
068import org.apache.hadoop.hbase.replication.ReplicationFactory;
069import org.apache.hadoop.hbase.replication.ReplicationPeer;
070import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
071import org.apache.hadoop.hbase.replication.ReplicationPeers;
072import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
073import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
074import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
075import org.apache.hadoop.hbase.replication.ReplicationUtils;
076import org.apache.hadoop.hbase.replication.SyncReplicationState;
077import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage;
078import org.apache.hadoop.hbase.testclassification.MediumTests;
079import org.apache.hadoop.hbase.testclassification.ReplicationTests;
080import org.apache.hadoop.hbase.util.Bytes;
081import org.apache.hadoop.hbase.util.CommonFSUtils;
082import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
083import org.apache.hadoop.hbase.util.JVMClusterUtil;
084import org.apache.hadoop.hbase.util.MockServer;
085import org.apache.hadoop.hbase.util.Pair;
086import org.apache.hadoop.hbase.wal.WAL;
087import org.apache.hadoop.hbase.wal.WALEdit;
088import org.apache.hadoop.hbase.wal.WALFactory;
089import org.apache.hadoop.hbase.wal.WALKeyImpl;
090import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
091import org.apache.hadoop.hbase.zookeeper.ZKUtil;
092import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
093import org.junit.After;
094import org.junit.AfterClass;
095import org.junit.Before;
096import org.junit.ClassRule;
097import org.junit.Rule;
098import org.junit.Test;
099import org.junit.experimental.categories.Category;
100import org.junit.rules.TestName;
101import org.slf4j.Logger;
102import org.slf4j.LoggerFactory;
103
104import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
105import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
106
107import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
108import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
109import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
110
/**
 * An abstract class that tests ReplicationSourceManager. Classes that extend this class should
 * set up the proper config for this class and initialize the proper cluster using
 * HBaseTestingUtil.
 */
116@Category({ReplicationTests.class, MediumTests.class})
117public abstract class TestReplicationSourceManager {
118
119  @ClassRule
120  public static final HBaseClassTestRule CLASS_RULE =
121      HBaseClassTestRule.forClass(TestReplicationSourceManager.class);
122
123  protected static final Logger LOG =
124      LoggerFactory.getLogger(TestReplicationSourceManager.class);
125
126  protected static Configuration conf;
127
128  protected static HBaseTestingUtil utility;
129
130  protected static Replication replication;
131
132  protected static ReplicationSourceManager manager;
133
134  protected static ReplicationSourceManager managerOfCluster;
135
136  protected static ZKWatcher zkw;
137
138  protected static TableDescriptor htd;
139
140  protected static RegionInfo hri;
141
142  protected static final byte[] r1 = Bytes.toBytes("r1");
143
144  protected static final byte[] r2 = Bytes.toBytes("r2");
145
146  protected static final byte[] f1 = Bytes.toBytes("f1");
147
148  protected static final byte[] f2 = Bytes.toBytes("f2");
149
150  protected static final TableName test =
151      TableName.valueOf("test");
152
153  protected static final String slaveId = "1";
154
155  protected static FileSystem fs;
156
157  protected static Path oldLogDir;
158
159  protected static Path logDir;
160
161  protected static Path remoteLogDir;
162
163  protected static CountDownLatch latch;
164
165  protected static List<String> files = new ArrayList<>();
166  protected static NavigableMap<byte[], Integer> scopes;
167
  /**
   * Creates the znode layout for replication peer "1" directly in ZooKeeper (peer address,
   * peer-state, sync-rep-state), enables replication cluster-wide, wires up the shared
   * {@code Replication} / {@code ReplicationSourceManager} instances, and builds the
   * table/region fixtures used by the tests.
   * <p>
   * The implementing subclass must have initialized {@code conf} and {@code utility} before
   * calling this.
   */
  protected static void setupZkAndReplication() throws Exception {
    // The implementing class should set up the conf
    assertNotNull(conf);
    zkw = new ZKWatcher(conf, "test", null);
    ZKUtil.createWithParents(zkw, "/hbase/replication");
    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
    // Peer "1" points back at this test's own ZK quorum.
    ZKUtil.setData(zkw, "/hbase/replication/peers/1",
        Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
            + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
    ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
      ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);
    // No sync replication configured for the peer (state NONE).
    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/sync-rep-state");
    ZKUtil.setData(zkw, "/hbase/replication/peers/1/sync-rep-state",
      ZKReplicationPeerStorage.NONE_STATE_ZNODE_BYTES);
    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/new-sync-rep-state");
    ZKUtil.setData(zkw, "/hbase/replication/peers/1/new-sync-rep-state",
      ZKReplicationPeerStorage.NONE_STATE_ZNODE_BYTES);
    // Cluster-wide replication switch.
    ZKUtil.createWithParents(zkw, "/hbase/replication/state");
    ZKUtil.setData(zkw, "/hbase/replication/state", ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);

    ZKClusterId.setClusterId(zkw, new ClusterId());
    CommonFSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());
    fs = FileSystem.get(conf);
    oldLogDir = utility.getDataTestDir(HConstants.HREGION_OLDLOGDIR_NAME);
    logDir = utility.getDataTestDir(HConstants.HREGION_LOGDIR_NAME);
    remoteLogDir = utility.getDataTestDir(ReplicationUtils.REMOTE_WAL_DIR_NAME);
    replication = new Replication();
    replication.initialize(new DummyServer(), fs, logDir, oldLogDir,
      new WALFactory(conf, "test", null, false));
    managerOfCluster = getManagerFromCluster();
    if (managerOfCluster != null) {
      // After replication procedure, we need to add peer by hand (other than by receiving
      // notification from zk)
      managerOfCluster.addPeer(slaveId);
    }

    manager = replication.getReplicationManager();
    manager.addSource(slaveId);
    // Block until the sources report startup on both managers (when a cluster is running).
    if (managerOfCluster != null) {
      waitPeer(slaveId, managerOfCluster, true);
    }
    waitPeer(slaveId, manager, true);

    // Table fixture: f1 carries global replication scope, f2 uses the default (local) scope.
    htd = TableDescriptorBuilder.newBuilder(test)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f1)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(f2)).build();

    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for(byte[] fam : htd.getColumnFamilyNames()) {
      scopes.put(fam, 0);
    }
    hri = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(r1).setEndKey(r2).build();
  }
223
224  private static ReplicationSourceManager getManagerFromCluster() {
225    // TestReplicationSourceManagerZkImpl won't start the mini hbase cluster.
226    if (utility.getMiniHBaseCluster() == null) {
227      return null;
228    }
229    return utility.getMiniHBaseCluster().getRegionServerThreads()
230        .stream().map(JVMClusterUtil.RegionServerThread::getRegionServer)
231        .findAny()
232        .map(RegionServerServices::getReplicationSourceService)
233        .map(r -> (Replication)r)
234        .map(Replication::getReplicationManager)
235        .get();
236  }
237
238  @AfterClass
239  public static void tearDownAfterClass() throws Exception {
240    if (manager != null) {
241      manager.join();
242    }
243    utility.shutdownMiniCluster();
244  }
245
246  @Rule
247  public TestName testName = new TestName();
248
249  private void cleanLogDir() throws IOException {
250    fs.delete(logDir, true);
251    fs.delete(oldLogDir, true);
252    fs.delete(remoteLogDir, true);
253  }
254
255  @Before
256  public void setUp() throws Exception {
257    LOG.info("Start " + testName.getMethodName());
258    cleanLogDir();
259  }
260
261  @After
262  public void tearDown() throws Exception {
263    LOG.info("End " + testName.getMethodName());
264    cleanLogDir();
265    List<String> ids = manager.getSources().stream()
266        .map(ReplicationSourceInterface::getPeerId).collect(Collectors.toList());
267    for (String id : ids) {
268      if (slaveId.equals(id)) {
269        continue;
270      }
271      removePeerAndWait(id);
272    }
273  }
274
  /**
   * Appends replicated edits while rolling the WAL every 20 entries, verifies the manager
   * tracks the expected number of WAL files for the default peer, and checks that acknowledging
   * a position via {@code logPositionAndCleanOldLogs} lets older logs be cleaned up.
   */
  @Test
  public void testLogRoll() throws Exception {
    long baseline = 1000;
    long time = baseline;
    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
    KeyValue kv = new KeyValue(r1, f1, r1);
    WALEdit edit = new WALEdit();
    edit.add(kv);

    WALFactory wals =
      new WALFactory(utility.getConfiguration(), URLEncoder.encode("regionserver:60020", "UTF8"));
    ReplicationSourceManager replicationManager = replication.getReplicationManager();
    // Hook the WAL provider up to the replication manager so rolls and appends are tracked.
    wals.getWALProvider()
      .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));
    final WAL wal = wals.getWAL(hri);
    manager.init();
    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("tableame"))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(f1)).build();
    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for(byte[] fam : htd.getColumnFamilyNames()) {
      scopes.put(fam, 0);
    }
    // Testing normal log rolling every 20
    for(long i = 1; i < 101; i++) {
      if(i > 1 && i % 20 == 0) {
        wal.rollWriter();
      }
      LOG.info(Long.toString(i));
      final long txid = wal.appendData(hri,
        new WALKeyImpl(hri.getEncodedNameAsBytes(), test, EnvironmentEdgeManager.currentTime(),
          mvcc, scopes),
        edit);
      wal.sync(txid);
    }

    // Simulate a rapid insert that's followed
    // by a report that's still not totally complete (missing last one)
    LOG.info(baseline + " and " + time);
    baseline += 101;
    time = baseline;
    LOG.info(baseline + " and " + time);

    for (int i = 0; i < 3; i++) {
      wal.appendData(hri,
        new WALKeyImpl(hri.getEncodedNameAsBytes(), test, EnvironmentEdgeManager.currentTime(),
          mvcc, scopes),
        edit);
    }
    wal.sync();

    // Count every WAL the manager still tracks for the default peer across all wal groups.
    int logNumber = 0;
    for (Map.Entry<String, NavigableSet<String>> entry : manager.getWALs().get(slaveId)
      .entrySet()) {
      logNumber += entry.getValue().size();
    }
    assertEquals(6, logNumber);

    wal.rollWriter();

    // Acknowledge the current position with a mocked, non-recovered source so the manager may
    // clean logs older than the acked position.
    ReplicationSourceInterface source = mock(ReplicationSourceInterface.class);
    when(source.getQueueId()).thenReturn("1");
    when(source.isRecovered()).thenReturn(false);
    when(source.isSyncReplication()).thenReturn(false);
    manager.logPositionAndCleanOldLogs(source,
      new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath()));

    wal.appendData(hri,
      new WALKeyImpl(hri.getEncodedNameAsBytes(), test, EnvironmentEdgeManager.currentTime(),
        mvcc, scopes),
      edit);
    wal.sync();

    assertEquals(1, manager.getWALs().size());


    // TODO Need a case with only 2 WALs and we only want to delete the first one
  }
352
  /**
   * Three {@code DummyNodeFailoverWorker}s concurrently try to claim the replication queue of a
   * dead server; exactly one of them must end up with the claimed WAL set.
   */
  @Test
  public void testClaimQueues() throws Exception {
    Server server = new DummyServer("hostname0.example.org");
    ReplicationQueueStorage rq = ReplicationStorageFactory
        .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
    // populate some znodes in the peer znode
    files.add("log1");
    files.add("log2");
    for (String file : files) {
      rq.addWAL(server.getServerName(), "1", file);
    }
    // create 3 DummyServers
    Server s1 = new DummyServer("dummyserver1.example.org");
    Server s2 = new DummyServer("dummyserver2.example.org");
    Server s3 = new DummyServer("dummyserver3.example.org");

    // create 3 DummyNodeFailoverWorkers, all claiming the same dead server's queues
    DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker(server.getServerName(), s1);
    DummyNodeFailoverWorker w2 = new DummyNodeFailoverWorker(server.getServerName(), s2);
    DummyNodeFailoverWorker w3 = new DummyNodeFailoverWorker(server.getServerName(), s3);

    // Each worker counts down once it is done; see DummyNodeFailoverWorker.run().
    latch = new CountDownLatch(3);
    // start the threads
    w1.start();
    w2.start();
    w3.start();
    // make sure only one is successful
    int populatedMap = 0;
    // wait for result now... till all the workers are done.
    latch.await();
    populatedMap += w1.isLogZnodesMapPopulated() + w2.isLogZnodesMapPopulated()
        + w3.isLogZnodesMapPopulated();
    assertEquals(1, populatedMap);
    server.abort("", null);
  }
388
  /**
   * Claims a dead server's queue into this manager's recovered queues, then verifies that
   * acknowledging the newer WAL ({@code file2}) removes the older one from the recovered queue.
   */
  @Test
  public void testCleanupFailoverQueues() throws Exception {
    Server server = new DummyServer("hostname1.example.org");
    ReplicationQueueStorage rq = ReplicationStorageFactory
        .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
    // populate some znodes in the peer znode
    SortedSet<String> files = new TreeSet<>();
    String group = "testgroup";
    String file1 = group + "." + EnvironmentEdgeManager.currentTime() + ".log1";
    String file2 = group + "." + EnvironmentEdgeManager.currentTime() + ".log2";
    files.add(file1);
    files.add(file2);
    for (String file : files) {
      rq.addWAL(server.getServerName(), "1", file);
    }
    Server s1 = new DummyServer("dummyserver1.example.org");
    ReplicationPeers rp1 =
        ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration());
    rp1.init();
    // Claim the dead server's queue for peer "1"; it shows up as a single recovered queue.
    manager.claimQueue(server.getServerName(), "1");
    assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
    // Recovered queue ids are "<peerId>-<deadServerName>".
    String id = "1-" + server.getServerName().getServerName();
    assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group));
    ReplicationSourceInterface source = mock(ReplicationSourceInterface.class);
    when(source.getQueueId()).thenReturn(id);
    when(source.isRecovered()).thenReturn(true);
    when(source.isSyncReplication()).thenReturn(false);
    manager.cleanOldLogs(file2, false, source);
    // log1 should be deleted
    assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group));
  }
420
421  @Test
422  public void testCleanupUnknownPeerZNode() throws Exception {
423    Server server = new DummyServer("hostname2.example.org");
424    ReplicationQueueStorage rq = ReplicationStorageFactory
425        .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
426    // populate some znodes in the peer znode
427    // add log to an unknown peer
428    String group = "testgroup";
429    rq.addWAL(server.getServerName(), "2", group + ".log1");
430    rq.addWAL(server.getServerName(), "2", group + ".log2");
431
432    manager.claimQueue(server.getServerName(), "2");
433
434    // The log of the unknown peer should be removed from zk
435    for (String peer : manager.getAllQueues()) {
436      assertTrue(peer.startsWith("1"));
437    }
438  }
439
440  /**
441   * Test for HBASE-9038, Replication.scopeWALEdits would NPE if it wasn't filtering out the
442   * compaction WALEdit.
443   */
444  @Test
445  public void testCompactionWALEdits() throws Exception {
446    TableName tableName = TableName.valueOf("testCompactionWALEdits");
447    WALProtos.CompactionDescriptor compactionDescriptor =
448      WALProtos.CompactionDescriptor.getDefaultInstance();
449    RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).setStartKey(HConstants.EMPTY_START_ROW)
450        .setEndKey(HConstants.EMPTY_END_ROW).build();
451    WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor);
452    ReplicationSourceWALActionListener.scopeWALEdits(new WALKeyImpl(), edit, conf);
453  }
454
455  @Test
456  public void testBulkLoadWALEditsWithoutBulkLoadReplicationEnabled() throws Exception {
457    NavigableMap<byte[], Integer> scope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
458    // 1. Get the bulk load wal edit event
459    WALEdit logEdit = getBulkLoadWALEdit(scope);
460    // 2. Create wal key
461    WALKeyImpl logKey = new WALKeyImpl(scope);
462
463    // 3. Get the scopes for the key
464    ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, conf);
465
466    // 4. Assert that no bulk load entry scopes are added if bulk load hfile replication is disabled
467    assertNull("No bulk load entries scope should be added if bulk load replication is disabled.",
468      logKey.getReplicationScopes());
469  }
470
471  @Test
472  public void testBulkLoadWALEdits() throws Exception {
473    // 1. Get the bulk load wal edit event
474    NavigableMap<byte[], Integer> scope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
475    WALEdit logEdit = getBulkLoadWALEdit(scope);
476    // 2. Create wal key
477    WALKeyImpl logKey = new WALKeyImpl(scope);
478    // 3. Enable bulk load hfile replication
479    Configuration bulkLoadConf = HBaseConfiguration.create(conf);
480    bulkLoadConf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
481
482    // 4. Get the scopes for the key
483    ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, bulkLoadConf);
484
485    NavigableMap<byte[], Integer> scopes = logKey.getReplicationScopes();
486    // Assert family with replication scope global is present in the key scopes
487    assertTrue("This family scope is set to global, should be part of replication key scopes.",
488      scopes.containsKey(f1));
489    // Assert family with replication scope local is not present in the key scopes
490    assertFalse("This family scope is set to local, should not be part of replication key scopes",
491      scopes.containsKey(f2));
492  }
493
  /**
   * Test whether calling removePeer() on a ReplicationSourceManager that failed on initializing
   * the corresponding ReplicationSourceInterface correctly cleans up the corresponding
   * replication queue and ReplicationPeer.
   * See HBASE-16096.
   */
  @Test
  public void testPeerRemovalCleanup() throws Exception{
    // Remember the original source implementation so we can restore it in finally.
    String replicationSourceImplName = conf.get("replication.replicationsource.implementation");
    final String peerId = "FakePeer";
    final ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
      .setClusterKey(utility.getZkCluster().getAddress().toString() + ":/hbase").build();
    try {
      DummyServer server = new DummyServer();
      ReplicationQueueStorage rq = ReplicationStorageFactory
          .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
      // Purposely fail ReplicationSourceManager.addSource() by causing ReplicationSourceInterface
      // initialization to throw an exception.
      conf.set("replication.replicationsource.implementation",
          FailInitializeDummyReplicationSource.class.getName());
      manager.getReplicationPeers();
      // Set up the znode and ReplicationPeer for the fake peer
      // Don't wait for replication source to initialize, we know it won't.
      addPeerAndWait(peerId, peerConfig, false);

      // Sanity check
      assertNull(manager.getSource(peerId));

      // Create a replication queue for the fake peer
      rq.addWAL(server.getServerName(), peerId, "FakeFile");
      // Unregister peer, this should remove the peer and clear all queues associated with it
      // Need to wait for the ReplicationTracker to pick up the changes and notify listeners.
      removePeerAndWait(peerId);
      assertFalse(rq.getAllQueues(server.getServerName()).contains(peerId));
    } finally {
      // Restore the original source implementation and make sure the fake peer is gone.
      conf.set("replication.replicationsource.implementation", replicationSourceImplName);
      removePeerAndWait(peerId);
    }
  }
533
534  private static MetricsReplicationSourceSource getGlobalSource() throws Exception {
535    ReplicationSourceInterface source = manager.getSource(slaveId);
536    // Retrieve the global replication metrics source
537    Field f = MetricsSource.class.getDeclaredField("globalSourceSource");
538    f.setAccessible(true);
539    return (MetricsReplicationSourceSource)f.get(source.getSourceMetrics());
540  }
541
542  private static long getSizeOfLatestPath() {
543    // If no mini cluster is running, there are extra replication manager influencing the metrics.
544    if (utility.getMiniHBaseCluster() == null) {
545      return 0;
546    }
547    return utility.getMiniHBaseCluster().getRegionServerThreads()
548        .stream().map(JVMClusterUtil.RegionServerThread::getRegionServer)
549        .map(RegionServerServices::getReplicationSourceService)
550        .map(r -> (Replication)r)
551        .map(Replication::getReplicationManager)
552        .mapToLong(ReplicationSourceManager::getSizeOfLatestPath)
553        .sum();
554  }
555
  /**
   * Verifies that removing a peer resets the global log-queue metric back to its baseline, and
   * that re-adding the same peer resets the per-source metric.
   */
  @Test
  public void testRemovePeerMetricsCleanup() throws Exception {
    final String peerId = "DummyPeer";
    final ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
      .setClusterKey(utility.getZkCluster().getAddress().toString() + ":/hbase").build();
    try {
      // Capture baselines before the peer exists; assertions below are relative to these.
      MetricsReplicationSourceSource globalSource = getGlobalSource();
      final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
      final long sizeOfLatestPath = getSizeOfLatestPath();
      addPeerAndWait(peerId, peerConfig, true);
      assertEquals(sizeOfLatestPath + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
      ReplicationSourceInterface source = manager.getSource(peerId);
      // Sanity check
      assertNotNull(source);
      final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
      // Enqueue log and check if metrics updated
      source.enqueueLog(new Path("abc"));
      assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
        globalSource.getSizeOfLogQueue());

      // Removing the peer should reset the global metrics
      removePeerAndWait(peerId);
      assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());

      // Adding the same peer back again should reset the single source metrics
      addPeerAndWait(peerId, peerConfig, true);
      source = manager.getSource(peerId);
      assertNotNull(source);
      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
        globalSource.getSizeOfLogQueue());
    } finally {
      removePeerAndWait(peerId);
    }
  }
591
592  private ReplicationSourceInterface mockReplicationSource(String peerId) {
593    ReplicationSourceInterface source = mock(ReplicationSourceInterface.class);
594    when(source.getPeerId()).thenReturn(peerId);
595    when(source.getQueueId()).thenReturn(peerId);
596    when(source.isRecovered()).thenReturn(false);
597    when(source.isSyncReplication()).thenReturn(true);
598    ReplicationPeerConfig config = mock(ReplicationPeerConfig.class);
599    when(config.getRemoteWALDir())
600      .thenReturn(remoteLogDir.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString());
601    ReplicationPeer peer = mock(ReplicationPeer.class);
602    when(peer.getPeerConfig()).thenReturn(config);
603    when(source.getPeer()).thenReturn(peer);
604    return source;
605  }
606
  /**
   * Verifies remote WAL cleanup for sync replication: cleaning logs with a source whose peer id
   * does not own the remote WAL leaves the file alone, while the owning peer's source deletes
   * it; also checks that rolling a WAL that does not exist remotely is harmless.
   */
  @Test
  public void testRemoveRemoteWALs() throws Exception {
    String peerId2 = slaveId + "_2";
    addPeerAndWait(peerId2,
      ReplicationPeerConfig.newBuilder()
        .setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase").build(),
      true);
    try {
      // make sure that we can deal with files which does not exist
      String walNameNotExists =
        "remoteWAL-12345-" + slaveId + ".12345" + ReplicationUtils.SYNC_WAL_SUFFIX;
      Path wal = new Path(logDir, walNameNotExists);
      manager.preLogRoll(wal);
      manager.postLogRoll(wal);

      // Create a real remote WAL file for slaveId and roll the matching local name.
      Path remoteLogDirForPeer = new Path(remoteLogDir, slaveId);
      fs.mkdirs(remoteLogDirForPeer);
      String walName =
        "remoteWAL-12345-" + slaveId + ".23456" + ReplicationUtils.SYNC_WAL_SUFFIX;
      Path remoteWAL =
        new Path(remoteLogDirForPeer, walName).makeQualified(fs.getUri(), fs.getWorkingDirectory());
      fs.create(remoteWAL).close();
      wal = new Path(logDir, walName);
      manager.preLogRoll(wal);
      manager.postLogRoll(wal);

      // A source for a different peer must not delete slaveId's remote WAL.
      ReplicationSourceInterface source = mockReplicationSource(peerId2);
      manager.cleanOldLogs(walName, true, source);
      // still there if peer id does not match
      assertTrue(fs.exists(remoteWAL));

      // The owning peer's source removes the remote WAL.
      source = mockReplicationSource(slaveId);
      manager.cleanOldLogs(walName, true, source);
      assertFalse(fs.exists(remoteWAL));
    } finally {
      removePeerAndWait(peerId2);
    }
  }
645
646  @Test
647  public void testSameWALPrefix() throws IOException {
648    Set<String> latestWalsBefore =
649      manager.getLastestPath().stream().map(Path::getName).collect(Collectors.toSet());
650    String walName1 = "localhost,8080,12345-45678-Peer.34567";
651    String walName2 = "localhost,8080,12345.56789";
652    manager.preLogRoll(new Path(walName1));
653    manager.preLogRoll(new Path(walName2));
654
655    Set<String> latestWals = manager.getLastestPath().stream().map(Path::getName)
656      .filter(n -> !latestWalsBefore.contains(n)).collect(Collectors.toSet());
657    assertEquals(2, latestWals.size());
658    assertTrue(latestWals.contains(walName1));
659    assertTrue(latestWals.contains(walName2));
660  }
661
  /**
   * Add a peer and wait for it to initialize.
   * <p>
   * Registers the peer in peer storage first, then adds it to {@code manager} (failures are
   * deliberately swallowed because some tests exercise the failure path), and finally waits for
   * the peer — and optionally its source — to show up on both managers.
   * @param waitForSource Whether to wait for replication source to initialize
   */
  private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig,
      final boolean waitForSource) throws Exception {
    final ReplicationPeers rp = manager.getReplicationPeers();
    rp.getPeerStorage().addPeer(peerId, peerConfig, true, SyncReplicationState.NONE);
    try {
      manager.addPeer(peerId);
    } catch (Exception e) {
      // ignore the failed exception, because we'll test both success & failed case.
    }
    waitPeer(peerId, manager, waitForSource);
    // Mirror the peer on the cluster's own manager when a mini cluster is running.
    if (managerOfCluster != null) {
      managerOfCluster.addPeer(peerId);
      waitPeer(peerId, managerOfCluster, waitForSource);
    }
  }
681
682  private static void waitPeer(final String peerId,
683      ReplicationSourceManager manager, final boolean waitForSource) {
684    ReplicationPeers rp = manager.getReplicationPeers();
685    Waiter.waitFor(conf, 20000, () -> {
686      if (waitForSource) {
687        ReplicationSourceInterface rs = manager.getSource(peerId);
688        if (rs == null) {
689          return false;
690        }
691        if (rs instanceof ReplicationSourceDummy) {
692          return ((ReplicationSourceDummy)rs).isStartup();
693        }
694        return true;
695      } else {
696        return (rp.getPeer(peerId) != null);
697      }
698    });
699  }
700
  /**
   * Remove a peer and wait for it to get cleaned up.
   * <p>
   * Deletes the peer from peer storage and the manager (ignoring removal failures, since some
   * tests create peers in a half-initialized state), then waits until all traces of the peer —
   * queues, peer registration, storage entry, and source — are gone.
   */
  private void removePeerAndWait(final String peerId) throws Exception {
    final ReplicationPeers rp = manager.getReplicationPeers();
    if (rp.getPeerStorage().listPeerIds().contains(peerId)) {
      rp.getPeerStorage().removePeer(peerId);
      try {
        manager.removePeer(peerId);
      } catch (Exception e) {
        // ignore the failed exception and continue.
      }
    }
    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        Collection<String> peers = rp.getPeerStorage().listPeerIds();
        return (!manager.getAllQueues().contains(peerId)) && (rp.getPeer(peerId) == null)
            && (!peers.contains(peerId)) && manager.getSource(peerId) == null;
      }
    });
  }
723
724  private WALEdit getBulkLoadWALEdit(NavigableMap<byte[], Integer> scope) {
725    // 1. Create store files for the families
726    Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
727    Map<String, Long> storeFilesSize = new HashMap<>(1);
728    List<Path> p = new ArrayList<>(1);
729    Path hfilePath1 = new Path(Bytes.toString(f1));
730    p.add(hfilePath1);
731    try {
732      storeFilesSize.put(hfilePath1.getName(), fs.getFileStatus(hfilePath1).getLen());
733    } catch (IOException e) {
734      LOG.debug("Failed to calculate the size of hfile " + hfilePath1);
735      storeFilesSize.put(hfilePath1.getName(), 0L);
736    }
737    storeFiles.put(f1, p);
738    scope.put(f1, 1);
739    p = new ArrayList<>(1);
740    Path hfilePath2 = new Path(Bytes.toString(f2));
741    p.add(hfilePath2);
742    try {
743      storeFilesSize.put(hfilePath2.getName(), fs.getFileStatus(hfilePath2).getLen());
744    } catch (IOException e) {
745      LOG.debug("Failed to calculate the size of hfile " + hfilePath2);
746      storeFilesSize.put(hfilePath2.getName(), 0L);
747    }
748    storeFiles.put(f2, p);
749    // 2. Create bulk load descriptor
750    BulkLoadDescriptor desc =
751        ProtobufUtil.toBulkLoadDescriptor(hri.getTable(),
752      UnsafeByteOperations.unsafeWrap(hri.getEncodedNameAsBytes()), storeFiles, storeFilesSize, 1);
753
754    // 3. create bulk load wal edit event
755    WALEdit logEdit = WALEdit.createBulkLoadEvent(hri, desc);
756    return logEdit;
757  }
758
759  static class DummyNodeFailoverWorker extends Thread {
760    private Map<String, Set<String>> logZnodesMap;
761    Server server;
762    private ServerName deadRS;
763    ReplicationQueueStorage rq;
764
765    public DummyNodeFailoverWorker(ServerName deadRS, Server s) throws Exception {
766      this.deadRS = deadRS;
767      this.server = s;
768      this.rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(),
769        server.getConfiguration());
770    }
771
772    @Override
773    public void run() {
774      try {
775        logZnodesMap = new HashMap<>();
776        List<String> queues = rq.getAllQueues(deadRS);
777        for (String queue : queues) {
778          Pair<String, SortedSet<String>> pair =
779              rq.claimQueue(deadRS, queue, server.getServerName());
780          if (pair != null) {
781            logZnodesMap.put(pair.getFirst(), pair.getSecond());
782          }
783        }
784        server.abort("Done with testing", null);
785      } catch (Exception e) {
786        LOG.error("Got exception while running NodeFailoverWorker", e);
787      } finally {
788        latch.countDown();
789      }
790    }
791
792    /**
793     * @return 1 when the map is not empty.
794     */
795    private int isLogZnodesMapPopulated() {
796      Collection<Set<String>> sets = logZnodesMap.values();
797      if (sets.size() > 1) {
798        throw new RuntimeException("unexpected size of logZnodesMap: " + sets.size());
799      }
800      if (sets.size() == 1) {
801        Set<String> s = sets.iterator().next();
802        for (String file : files) {
803          // at least one file was missing
804          if (!s.contains(file)) {
805            return 0;
806          }
807        }
808        return 1; // we found all the files
809      }
810      return 0;
811    }
812  }
813
  /**
   * A {@link ReplicationSourceDummy} whose {@code init} always throws, used to exercise the
   * manager's handling of sources that fail to initialize.
   */
  static class FailInitializeDummyReplicationSource extends ReplicationSourceDummy {

    @Override
    public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
        ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId,
        UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
        throws IOException {
      // Fail unconditionally so callers can verify cleanup after a failed source init.
      throw new IOException("Failing deliberately");
    }
  }
824
825  static class DummyServer extends MockServer {
826    String hostname;
827
828    DummyServer() {
829      hostname = "hostname.example.org";
830    }
831
832    DummyServer(String hostname) {
833      this.hostname = hostname;
834    }
835
836    @Override
837    public Configuration getConfiguration() {
838      return conf;
839    }
840
841    @Override
842    public ZKWatcher getZooKeeper() {
843      return zkw;
844    }
845
846    @Override
847    public Connection getConnection() {
848      return null;
849    }
850
851    @Override
852    public ChoreService getChoreService() {
853      return null;
854    }
855
856    @Override
857    public ServerName getServerName() {
858      return ServerName.valueOf(hostname, 1234, 1L);
859    }
860  }
861}