001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025import static org.mockito.Mockito.mock;
026import static org.mockito.Mockito.when;
027
028import java.io.IOException;
029import java.lang.reflect.Field;
030import java.net.URLEncoder;
031import java.util.ArrayList;
032import java.util.Collection;
033import java.util.HashMap;
034import java.util.List;
035import java.util.Map;
036import java.util.NavigableMap;
037import java.util.NavigableSet;
038import java.util.Set;
039import java.util.SortedSet;
040import java.util.TreeMap;
041import java.util.TreeSet;
042import java.util.UUID;
043import java.util.concurrent.CountDownLatch;
044import java.util.stream.Collectors;
045import org.apache.hadoop.conf.Configuration;
046import org.apache.hadoop.fs.FileSystem;
047import org.apache.hadoop.fs.Path;
048import org.apache.hadoop.hbase.ChoreService;
049import org.apache.hadoop.hbase.ClusterId;
050import org.apache.hadoop.hbase.HBaseClassTestRule;
051import org.apache.hadoop.hbase.HBaseConfiguration;
052import org.apache.hadoop.hbase.HBaseTestingUtil;
053import org.apache.hadoop.hbase.HConstants;
054import org.apache.hadoop.hbase.KeyValue;
055import org.apache.hadoop.hbase.Server;
056import org.apache.hadoop.hbase.ServerName;
057import org.apache.hadoop.hbase.TableName;
058import org.apache.hadoop.hbase.Waiter;
059import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
060import org.apache.hadoop.hbase.client.Connection;
061import org.apache.hadoop.hbase.client.RegionInfo;
062import org.apache.hadoop.hbase.client.RegionInfoBuilder;
063import org.apache.hadoop.hbase.client.TableDescriptor;
064import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
065import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
066import org.apache.hadoop.hbase.regionserver.RegionServerServices;
067import org.apache.hadoop.hbase.replication.ReplicationFactory;
068import org.apache.hadoop.hbase.replication.ReplicationPeer;
069import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
070import org.apache.hadoop.hbase.replication.ReplicationPeers;
071import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
072import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
073import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
074import org.apache.hadoop.hbase.replication.ReplicationUtils;
075import org.apache.hadoop.hbase.replication.SyncReplicationState;
076import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage;
077import org.apache.hadoop.hbase.testclassification.MediumTests;
078import org.apache.hadoop.hbase.testclassification.ReplicationTests;
079import org.apache.hadoop.hbase.util.Bytes;
080import org.apache.hadoop.hbase.util.CommonFSUtils;
081import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
082import org.apache.hadoop.hbase.util.JVMClusterUtil;
083import org.apache.hadoop.hbase.util.MockServer;
084import org.apache.hadoop.hbase.util.Pair;
085import org.apache.hadoop.hbase.wal.WAL;
086import org.apache.hadoop.hbase.wal.WALEdit;
087import org.apache.hadoop.hbase.wal.WALFactory;
088import org.apache.hadoop.hbase.wal.WALKeyImpl;
089import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
090import org.apache.hadoop.hbase.zookeeper.ZKUtil;
091import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
092import org.junit.After;
093import org.junit.AfterClass;
094import org.junit.Before;
095import org.junit.ClassRule;
096import org.junit.Rule;
097import org.junit.Test;
098import org.junit.experimental.categories.Category;
099import org.junit.rules.TestName;
100import org.slf4j.Logger;
101import org.slf4j.LoggerFactory;
102
103import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
104import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
105
106import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
107import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
108import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
109
/**
 * An abstract class that tests ReplicationSourceManager. Classes that extend this class should set
 * up the proper config for this class and initialize the proper cluster using HBaseTestingUtil.
 */
114@Category({ ReplicationTests.class, MediumTests.class })
115public abstract class TestReplicationSourceManager {
116
  /** Class-level JUnit rule built by {@link HBaseClassTestRule#forClass(Class)} for this class. */
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicationSourceManager.class);

  protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationSourceManager.class);

  /** Shared configuration; the implementing class must set it before the shared setup runs. */
  protected static Configuration conf;

  /** Testing utility used for the test data directories and (optionally) the mini cluster. */
  protected static HBaseTestingUtil utility;

  /** Replication instance created and initialized by {@code setupZkAndReplication()}. */
  protected static Replication replication;

  /** The manager under test, taken from {@link #replication}. */
  protected static ReplicationSourceManager manager;

  /**
   * Manager of a region server of the mini cluster, or {@code null} when no mini cluster is
   * running.
   */
  protected static ReplicationSourceManager managerOfCluster;

  /** ZooKeeper watcher used to create the replication znodes during setup. */
  protected static ZKWatcher zkw;

  /** Descriptor of table {@link #test}: family f1 has global replication scope, f2 the default. */
  protected static TableDescriptor htd;

  /** Region of {@link #htd} with start key {@link #r1} and end key {@link #r2}. */
  protected static RegionInfo hri;

  protected static final byte[] r1 = Bytes.toBytes("r1");

  protected static final byte[] r2 = Bytes.toBytes("r2");

  protected static final byte[] f1 = Bytes.toBytes("f1");

  protected static final byte[] f2 = Bytes.toBytes("f2");

  protected static final TableName test = TableName.valueOf("test");

  /** Id of the replication peer that is registered once during setup. */
  protected static final String slaveId = "1";

  /** File system obtained from {@link #conf}. */
  protected static FileSystem fs;

  /** Test data directory named after {@code HConstants.HREGION_OLDLOGDIR_NAME}. */
  protected static Path oldLogDir;

  /** Test data directory named after {@code HConstants.HREGION_LOGDIR_NAME}. */
  protected static Path logDir;

  /** Test data directory named after {@code ReplicationUtils.REMOTE_WAL_DIR_NAME}. */
  protected static Path remoteLogDir;

  /**
   * Latch awaited by testClaimQueues; presumably counted down by the failover workers
   * (NOTE(review): the worker class is not visible here, confirm).
   */
  protected static CountDownLatch latch;

  /** WAL names shared across tests; testClaimQueues appends to this list. */
  protected static List<String> files = new ArrayList<>();
  /** Maps every column family of {@link #htd} to 0; filled during setup. */
  protected static NavigableMap<byte[], Integer> scopes;
163
164  protected static void setupZkAndReplication() throws Exception {
165    // The implementing class should set up the conf
166    assertNotNull(conf);
167    zkw = new ZKWatcher(conf, "test", null);
168    ZKUtil.createWithParents(zkw, "/hbase/replication");
169    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
170    ZKUtil.setData(zkw, "/hbase/replication/peers/1",
171      Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
172        + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
173    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
174    ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
175      ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);
176    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/sync-rep-state");
177    ZKUtil.setData(zkw, "/hbase/replication/peers/1/sync-rep-state",
178      ZKReplicationPeerStorage.NONE_STATE_ZNODE_BYTES);
179    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/new-sync-rep-state");
180    ZKUtil.setData(zkw, "/hbase/replication/peers/1/new-sync-rep-state",
181      ZKReplicationPeerStorage.NONE_STATE_ZNODE_BYTES);
182    ZKUtil.createWithParents(zkw, "/hbase/replication/state");
183    ZKUtil.setData(zkw, "/hbase/replication/state", ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);
184
185    ZKClusterId.setClusterId(zkw, new ClusterId());
186    CommonFSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());
187    fs = FileSystem.get(conf);
188    oldLogDir = utility.getDataTestDir(HConstants.HREGION_OLDLOGDIR_NAME);
189    logDir = utility.getDataTestDir(HConstants.HREGION_LOGDIR_NAME);
190    remoteLogDir = utility.getDataTestDir(ReplicationUtils.REMOTE_WAL_DIR_NAME);
191    replication = new Replication();
192    replication.initialize(new DummyServer(), fs, logDir, oldLogDir,
193      new WALFactory(conf, "test", null, false));
194    managerOfCluster = getManagerFromCluster();
195    if (managerOfCluster != null) {
196      // After replication procedure, we need to add peer by hand (other than by receiving
197      // notification from zk)
198      managerOfCluster.addPeer(slaveId);
199    }
200
201    manager = replication.getReplicationManager();
202    manager.addSource(slaveId);
203    if (managerOfCluster != null) {
204      waitPeer(slaveId, managerOfCluster, true);
205    }
206    waitPeer(slaveId, manager, true);
207
208    htd = TableDescriptorBuilder.newBuilder(test)
209      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f1)
210        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
211      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(f2)).build();
212
213    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
214    for (byte[] fam : htd.getColumnFamilyNames()) {
215      scopes.put(fam, 0);
216    }
217    hri = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(r1).setEndKey(r2).build();
218  }
219
220  private static ReplicationSourceManager getManagerFromCluster() {
221    // TestReplicationSourceManagerZkImpl won't start the mini hbase cluster.
222    if (utility.getMiniHBaseCluster() == null) {
223      return null;
224    }
225    return utility.getMiniHBaseCluster().getRegionServerThreads().stream()
226      .map(JVMClusterUtil.RegionServerThread::getRegionServer).findAny()
227      .map(RegionServerServices::getReplicationSourceService).map(r -> (Replication) r)
228      .map(Replication::getReplicationManager).get();
229  }
230
231  @AfterClass
232  public static void tearDownAfterClass() throws Exception {
233    if (manager != null) {
234      manager.join();
235    }
236    utility.shutdownMiniCluster();
237  }
238
  /** Exposes the name of the running test method; used for the start/end log lines. */
  @Rule
  public TestName testName = new TestName();
241
242  private void cleanLogDir() throws IOException {
243    fs.delete(logDir, true);
244    fs.delete(oldLogDir, true);
245    fs.delete(remoteLogDir, true);
246  }
247
248  @Before
249  public void setUp() throws Exception {
250    LOG.info("Start " + testName.getMethodName());
251    cleanLogDir();
252  }
253
254  @After
255  public void tearDown() throws Exception {
256    LOG.info("End " + testName.getMethodName());
257    cleanLogDir();
258    List<String> ids = manager.getSources().stream().map(ReplicationSourceInterface::getPeerId)
259      .collect(Collectors.toList());
260    for (String id : ids) {
261      if (slaveId.equals(id)) {
262        continue;
263      }
264      removePeerAndWait(id);
265    }
266  }
267
268  @Test
269  public void testLogRoll() throws Exception {
270    long baseline = 1000;
271    long time = baseline;
272    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
273    KeyValue kv = new KeyValue(r1, f1, r1);
274    WALEdit edit = new WALEdit();
275    edit.add(kv);
276
277    WALFactory wals =
278      new WALFactory(utility.getConfiguration(), URLEncoder.encode("regionserver:60020", "UTF8"));
279    ReplicationSourceManager replicationManager = replication.getReplicationManager();
280    wals.getWALProvider()
281      .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));
282    final WAL wal = wals.getWAL(hri);
283    manager.init();
284    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("tableame"))
285      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(f1)).build();
286    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
287    for (byte[] fam : htd.getColumnFamilyNames()) {
288      scopes.put(fam, 0);
289    }
290    // Testing normal log rolling every 20
291    for (long i = 1; i < 101; i++) {
292      if (i > 1 && i % 20 == 0) {
293        wal.rollWriter();
294      }
295      LOG.info(Long.toString(i));
296      final long txid = wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), test,
297        EnvironmentEdgeManager.currentTime(), mvcc, scopes), edit);
298      wal.sync(txid);
299    }
300
301    // Simulate a rapid insert that's followed
302    // by a report that's still not totally complete (missing last one)
303    LOG.info(baseline + " and " + time);
304    baseline += 101;
305    time = baseline;
306    LOG.info(baseline + " and " + time);
307
308    for (int i = 0; i < 3; i++) {
309      wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), test,
310        EnvironmentEdgeManager.currentTime(), mvcc, scopes), edit);
311    }
312    wal.sync();
313
314    int logNumber = 0;
315    for (Map.Entry<String, NavigableSet<String>> entry : manager.getWALs().get(slaveId)
316      .entrySet()) {
317      logNumber += entry.getValue().size();
318    }
319    assertEquals(6, logNumber);
320
321    wal.rollWriter();
322
323    ReplicationSourceInterface source = mock(ReplicationSourceInterface.class);
324    when(source.getQueueId()).thenReturn("1");
325    when(source.isRecovered()).thenReturn(false);
326    when(source.isSyncReplication()).thenReturn(false);
327    manager.logPositionAndCleanOldLogs(source,
328      new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath()));
329
330    wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), test,
331      EnvironmentEdgeManager.currentTime(), mvcc, scopes), edit);
332    wal.sync();
333
334    assertEquals(1, manager.getWALs().size());
335
336    // TODO Need a case with only 2 WALs and we only want to delete the first one
337  }
338
339  @Test
340  public void testClaimQueues() throws Exception {
341    Server server = new DummyServer("hostname0.example.org");
342    ReplicationQueueStorage rq = ReplicationStorageFactory
343      .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
344    // populate some znodes in the peer znode
345    files.add("log1");
346    files.add("log2");
347    for (String file : files) {
348      rq.addWAL(server.getServerName(), "1", file);
349    }
350    // create 3 DummyServers
351    Server s1 = new DummyServer("dummyserver1.example.org");
352    Server s2 = new DummyServer("dummyserver2.example.org");
353    Server s3 = new DummyServer("dummyserver3.example.org");
354
355    // create 3 DummyNodeFailoverWorkers
356    DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker(server.getServerName(), s1);
357    DummyNodeFailoverWorker w2 = new DummyNodeFailoverWorker(server.getServerName(), s2);
358    DummyNodeFailoverWorker w3 = new DummyNodeFailoverWorker(server.getServerName(), s3);
359
360    latch = new CountDownLatch(3);
361    // start the threads
362    w1.start();
363    w2.start();
364    w3.start();
365    // make sure only one is successful
366    int populatedMap = 0;
367    // wait for result now... till all the workers are done.
368    latch.await();
369    populatedMap +=
370      w1.isLogZnodesMapPopulated() + w2.isLogZnodesMapPopulated() + w3.isLogZnodesMapPopulated();
371    assertEquals(1, populatedMap);
372    server.abort("", null);
373  }
374
375  @Test
376  public void testCleanupFailoverQueues() throws Exception {
377    Server server = new DummyServer("hostname1.example.org");
378    ReplicationQueueStorage rq = ReplicationStorageFactory
379      .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
380    // populate some znodes in the peer znode
381    SortedSet<String> files = new TreeSet<>();
382    String group = "testgroup";
383    String file1 = group + "." + EnvironmentEdgeManager.currentTime() + ".log1";
384    String file2 = group + "." + EnvironmentEdgeManager.currentTime() + ".log2";
385    files.add(file1);
386    files.add(file2);
387    for (String file : files) {
388      rq.addWAL(server.getServerName(), "1", file);
389    }
390    Server s1 = new DummyServer("dummyserver1.example.org");
391    ReplicationPeers rp1 =
392      ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration());
393    rp1.init();
394    manager.claimQueue(server.getServerName(), "1");
395    assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
396    String id = "1-" + server.getServerName().getServerName();
397    assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group));
398    ReplicationSourceInterface source = mock(ReplicationSourceInterface.class);
399    when(source.getQueueId()).thenReturn(id);
400    when(source.isRecovered()).thenReturn(true);
401    when(source.isSyncReplication()).thenReturn(false);
402    manager.cleanOldLogs(file2, false, source);
403    // log1 should be deleted
404    assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group));
405  }
406
407  @Test
408  public void testCleanupUnknownPeerZNode() throws Exception {
409    Server server = new DummyServer("hostname2.example.org");
410    ReplicationQueueStorage rq = ReplicationStorageFactory
411      .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
412    // populate some znodes in the peer znode
413    // add log to an unknown peer
414    String group = "testgroup";
415    rq.addWAL(server.getServerName(), "2", group + ".log1");
416    rq.addWAL(server.getServerName(), "2", group + ".log2");
417
418    manager.claimQueue(server.getServerName(), "2");
419
420    // The log of the unknown peer should be removed from zk
421    for (String peer : manager.getAllQueues()) {
422      assertTrue(peer.startsWith("1"));
423    }
424  }
425
426  /**
427   * Test for HBASE-9038, Replication.scopeWALEdits would NPE if it wasn't filtering out the
428   * compaction WALEdit.
429   */
430  @Test
431  public void testCompactionWALEdits() throws Exception {
432    TableName tableName = TableName.valueOf("testCompactionWALEdits");
433    WALProtos.CompactionDescriptor compactionDescriptor =
434      WALProtos.CompactionDescriptor.getDefaultInstance();
435    RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).setStartKey(HConstants.EMPTY_START_ROW)
436      .setEndKey(HConstants.EMPTY_END_ROW).build();
437    WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor);
438    ReplicationSourceWALActionListener.scopeWALEdits(new WALKeyImpl(), edit, conf);
439  }
440
441  @Test
442  public void testBulkLoadWALEditsWithoutBulkLoadReplicationEnabled() throws Exception {
443    NavigableMap<byte[], Integer> scope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
444    // 1. Get the bulk load wal edit event
445    WALEdit logEdit = getBulkLoadWALEdit(scope);
446    // 2. Create wal key
447    WALKeyImpl logKey = new WALKeyImpl(scope);
448
449    // 3. Get the scopes for the key
450    ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, conf);
451
452    // 4. Assert that no bulk load entry scopes are added if bulk load hfile replication is disabled
453    assertNull("No bulk load entries scope should be added if bulk load replication is disabled.",
454      logKey.getReplicationScopes());
455  }
456
457  @Test
458  public void testBulkLoadWALEdits() throws Exception {
459    // 1. Get the bulk load wal edit event
460    NavigableMap<byte[], Integer> scope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
461    WALEdit logEdit = getBulkLoadWALEdit(scope);
462    // 2. Create wal key
463    WALKeyImpl logKey = new WALKeyImpl(scope);
464    // 3. Enable bulk load hfile replication
465    Configuration bulkLoadConf = HBaseConfiguration.create(conf);
466    bulkLoadConf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
467
468    // 4. Get the scopes for the key
469    ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, bulkLoadConf);
470
471    NavigableMap<byte[], Integer> scopes = logKey.getReplicationScopes();
472    // Assert family with replication scope global is present in the key scopes
473    assertTrue("This family scope is set to global, should be part of replication key scopes.",
474      scopes.containsKey(f1));
475    // Assert family with replication scope local is not present in the key scopes
476    assertFalse("This family scope is set to local, should not be part of replication key scopes",
477      scopes.containsKey(f2));
478  }
479
480  /**
481   * Test whether calling removePeer() on a ReplicationSourceManager that failed on initializing the
482   * corresponding ReplicationSourceInterface correctly cleans up the corresponding replication
483   * queue and ReplicationPeer. See HBASE-16096.
484   */
485  @Test
486  public void testPeerRemovalCleanup() throws Exception {
487    String replicationSourceImplName = conf.get("replication.replicationsource.implementation");
488    final String peerId = "FakePeer";
489    final ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
490      .setClusterKey(utility.getZkCluster().getAddress().toString() + ":/hbase").build();
491    try {
492      DummyServer server = new DummyServer();
493      ReplicationQueueStorage rq = ReplicationStorageFactory
494        .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
495      // Purposely fail ReplicationSourceManager.addSource() by causing ReplicationSourceInterface
496      // initialization to throw an exception.
497      conf.set("replication.replicationsource.implementation",
498        FailInitializeDummyReplicationSource.class.getName());
499      manager.getReplicationPeers();
500      // Set up the znode and ReplicationPeer for the fake peer
501      // Don't wait for replication source to initialize, we know it won't.
502      addPeerAndWait(peerId, peerConfig, false);
503
504      // Sanity check
505      assertNull(manager.getSource(peerId));
506
507      // Create a replication queue for the fake peer
508      rq.addWAL(server.getServerName(), peerId, "FakeFile");
509      // Unregister peer, this should remove the peer and clear all queues associated with it
510      // Need to wait for the ReplicationTracker to pick up the changes and notify listeners.
511      removePeerAndWait(peerId);
512      assertFalse(rq.getAllQueues(server.getServerName()).contains(peerId));
513    } finally {
514      conf.set("replication.replicationsource.implementation", replicationSourceImplName);
515      removePeerAndWait(peerId);
516    }
517  }
518
519  private static MetricsReplicationSourceSource getGlobalSource() throws Exception {
520    ReplicationSourceInterface source = manager.getSource(slaveId);
521    // Retrieve the global replication metrics source
522    Field f = MetricsSource.class.getDeclaredField("globalSourceSource");
523    f.setAccessible(true);
524    return (MetricsReplicationSourceSource) f.get(source.getSourceMetrics());
525  }
526
527  private static long getSizeOfLatestPath() {
528    // If no mini cluster is running, there are extra replication manager influencing the metrics.
529    if (utility.getMiniHBaseCluster() == null) {
530      return 0;
531    }
532    return utility.getMiniHBaseCluster().getRegionServerThreads().stream()
533      .map(JVMClusterUtil.RegionServerThread::getRegionServer)
534      .map(RegionServerServices::getReplicationSourceService).map(r -> (Replication) r)
535      .map(Replication::getReplicationManager)
536      .mapToLong(ReplicationSourceManager::getSizeOfLatestPath).sum();
537  }
538
539  @Test
540  public void testRemovePeerMetricsCleanup() throws Exception {
541    final String peerId = "DummyPeer";
542    final ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
543      .setClusterKey(utility.getZkCluster().getAddress().toString() + ":/hbase").build();
544    try {
545      MetricsReplicationSourceSource globalSource = getGlobalSource();
546      final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
547      final long sizeOfLatestPath = getSizeOfLatestPath();
548      addPeerAndWait(peerId, peerConfig, true);
549      assertEquals(sizeOfLatestPath + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
550      ReplicationSourceInterface source = manager.getSource(peerId);
551      // Sanity check
552      assertNotNull(source);
553      final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
554      // Enqueue log and check if metrics updated
555      source.enqueueLog(new Path("abc"));
556      assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
557      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
558        globalSource.getSizeOfLogQueue());
559
560      // Removing the peer should reset the global metrics
561      removePeerAndWait(peerId);
562      assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
563
564      // Adding the same peer back again should reset the single source metrics
565      addPeerAndWait(peerId, peerConfig, true);
566      source = manager.getSource(peerId);
567      assertNotNull(source);
568      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
569        globalSource.getSizeOfLogQueue());
570    } finally {
571      removePeerAndWait(peerId);
572    }
573  }
574
575  @Test
576  public void testDisablePeerMetricsCleanup() throws Exception {
577    final String peerId = "DummyPeer";
578    final ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
579      .setClusterKey(utility.getZkCluster().getAddress().toString() + ":/hbase").build();
580    try {
581      MetricsReplicationSourceSource globalSource = getGlobalSource();
582      final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
583      final long sizeOfLatestPath = getSizeOfLatestPath();
584      addPeerAndWait(peerId, peerConfig, true);
585      assertEquals(sizeOfLatestPath + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
586      ReplicationSourceInterface source = manager.getSource(peerId);
587      // Sanity check
588      assertNotNull(source);
589      final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
590      // Enqueue log and check if metrics updated
591      source.enqueueLog(new Path("abc"));
592      assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
593      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
594        globalSource.getSizeOfLogQueue());
595
596      // Refreshing the peer should decrement the global and single source metrics
597      manager.refreshSources(peerId);
598      assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
599
600      source = manager.getSource(peerId);
601      assertNotNull(source);
602      assertEquals(sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
603      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
604        globalSource.getSizeOfLogQueue());
605    } finally {
606      removePeerAndWait(peerId);
607    }
608  }
609
610  private ReplicationSourceInterface mockReplicationSource(String peerId) {
611    ReplicationSourceInterface source = mock(ReplicationSourceInterface.class);
612    when(source.getPeerId()).thenReturn(peerId);
613    when(source.getQueueId()).thenReturn(peerId);
614    when(source.isRecovered()).thenReturn(false);
615    when(source.isSyncReplication()).thenReturn(true);
616    ReplicationPeerConfig config = mock(ReplicationPeerConfig.class);
617    when(config.getRemoteWALDir())
618      .thenReturn(remoteLogDir.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString());
619    ReplicationPeer peer = mock(ReplicationPeer.class);
620    when(peer.getPeerConfig()).thenReturn(config);
621    when(source.getPeer()).thenReturn(peer);
622    return source;
623  }
624
625  @Test
626  public void testRemoveRemoteWALs() throws Exception {
627    String peerId2 = slaveId + "_2";
628    addPeerAndWait(peerId2,
629      ReplicationPeerConfig.newBuilder()
630        .setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase").build(),
631      true);
632    try {
633      // make sure that we can deal with files which does not exist
634      String walNameNotExists =
635        "remoteWAL-12345-" + slaveId + ".12345" + ReplicationUtils.SYNC_WAL_SUFFIX;
636      Path wal = new Path(logDir, walNameNotExists);
637      manager.preLogRoll(wal);
638      manager.postLogRoll(wal);
639
640      Path remoteLogDirForPeer = new Path(remoteLogDir, slaveId);
641      fs.mkdirs(remoteLogDirForPeer);
642      String walName = "remoteWAL-12345-" + slaveId + ".23456" + ReplicationUtils.SYNC_WAL_SUFFIX;
643      Path remoteWAL =
644        new Path(remoteLogDirForPeer, walName).makeQualified(fs.getUri(), fs.getWorkingDirectory());
645      fs.create(remoteWAL).close();
646      wal = new Path(logDir, walName);
647      manager.preLogRoll(wal);
648      manager.postLogRoll(wal);
649
650      ReplicationSourceInterface source = mockReplicationSource(peerId2);
651      manager.cleanOldLogs(walName, true, source);
652      // still there if peer id does not match
653      assertTrue(fs.exists(remoteWAL));
654
655      source = mockReplicationSource(slaveId);
656      manager.cleanOldLogs(walName, true, source);
657      assertFalse(fs.exists(remoteWAL));
658    } finally {
659      removePeerAndWait(peerId2);
660    }
661  }
662
663  @Test
664  public void testSameWALPrefix() throws IOException {
665    Set<String> latestWalsBefore =
666      manager.getLastestPath().stream().map(Path::getName).collect(Collectors.toSet());
667    String walName1 = "localhost,8080,12345-45678-Peer.34567";
668    String walName2 = "localhost,8080,12345.56789";
669    manager.preLogRoll(new Path(walName1));
670    manager.preLogRoll(new Path(walName2));
671
672    Set<String> latestWals = manager.getLastestPath().stream().map(Path::getName)
673      .filter(n -> !latestWalsBefore.contains(n)).collect(Collectors.toSet());
674    assertEquals(2, latestWals.size());
675    assertTrue(latestWals.contains(walName1));
676    assertTrue(latestWals.contains(walName2));
677  }
678
679  /**
680   * Add a peer and wait for it to initialize
681   * @param waitForSource Whether to wait for replication source to initialize
682   */
683  private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig,
684    final boolean waitForSource) throws Exception {
685    final ReplicationPeers rp = manager.getReplicationPeers();
686    rp.getPeerStorage().addPeer(peerId, peerConfig, true, SyncReplicationState.NONE);
687    try {
688      manager.addPeer(peerId);
689    } catch (Exception e) {
690      // ignore the failed exception, because we'll test both success & failed case.
691    }
692    waitPeer(peerId, manager, waitForSource);
693    if (managerOfCluster != null) {
694      managerOfCluster.addPeer(peerId);
695      waitPeer(peerId, managerOfCluster, waitForSource);
696    }
697  }
698
699  private static void waitPeer(final String peerId, ReplicationSourceManager manager,
700    final boolean waitForSource) {
701    ReplicationPeers rp = manager.getReplicationPeers();
702    Waiter.waitFor(conf, 20000, () -> {
703      if (waitForSource) {
704        ReplicationSourceInterface rs = manager.getSource(peerId);
705        if (rs == null) {
706          return false;
707        }
708        if (rs instanceof ReplicationSourceDummy) {
709          return ((ReplicationSourceDummy) rs).isStartup();
710        }
711        return true;
712      } else {
713        return (rp.getPeer(peerId) != null);
714      }
715    });
716  }
717
718  /**
719   * Remove a peer and wait for it to get cleaned up
720   */
721  private void removePeerAndWait(final String peerId) throws Exception {
722    final ReplicationPeers rp = manager.getReplicationPeers();
723    if (rp.getPeerStorage().listPeerIds().contains(peerId)) {
724      rp.getPeerStorage().removePeer(peerId);
725      try {
726        manager.removePeer(peerId);
727      } catch (Exception e) {
728        // ignore the failed exception and continue.
729      }
730    }
731    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
732      @Override
733      public boolean evaluate() throws Exception {
734        Collection<String> peers = rp.getPeerStorage().listPeerIds();
735        return (!manager.getAllQueues().contains(peerId)) && (rp.getPeer(peerId) == null)
736          && (!peers.contains(peerId)) && manager.getSource(peerId) == null;
737      }
738    });
739  }
740
741  private WALEdit getBulkLoadWALEdit(NavigableMap<byte[], Integer> scope) {
742    // 1. Create store files for the families
743    Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
744    Map<String, Long> storeFilesSize = new HashMap<>(1);
745    List<Path> p = new ArrayList<>(1);
746    Path hfilePath1 = new Path(Bytes.toString(f1));
747    p.add(hfilePath1);
748    try {
749      storeFilesSize.put(hfilePath1.getName(), fs.getFileStatus(hfilePath1).getLen());
750    } catch (IOException e) {
751      LOG.debug("Failed to calculate the size of hfile " + hfilePath1);
752      storeFilesSize.put(hfilePath1.getName(), 0L);
753    }
754    storeFiles.put(f1, p);
755    scope.put(f1, 1);
756    p = new ArrayList<>(1);
757    Path hfilePath2 = new Path(Bytes.toString(f2));
758    p.add(hfilePath2);
759    try {
760      storeFilesSize.put(hfilePath2.getName(), fs.getFileStatus(hfilePath2).getLen());
761    } catch (IOException e) {
762      LOG.debug("Failed to calculate the size of hfile " + hfilePath2);
763      storeFilesSize.put(hfilePath2.getName(), 0L);
764    }
765    storeFiles.put(f2, p);
766    // 2. Create bulk load descriptor
767    BulkLoadDescriptor desc = ProtobufUtil.toBulkLoadDescriptor(hri.getTable(),
768      UnsafeByteOperations.unsafeWrap(hri.getEncodedNameAsBytes()), storeFiles, storeFilesSize, 1);
769
770    // 3. create bulk load wal edit event
771    WALEdit logEdit = WALEdit.createBulkLoadEvent(hri, desc);
772    return logEdit;
773  }
774
775  static class DummyNodeFailoverWorker extends Thread {
776    private Map<String, Set<String>> logZnodesMap;
777    Server server;
778    private ServerName deadRS;
779    ReplicationQueueStorage rq;
780
781    public DummyNodeFailoverWorker(ServerName deadRS, Server s) throws Exception {
782      this.deadRS = deadRS;
783      this.server = s;
784      this.rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(),
785        server.getConfiguration());
786    }
787
788    @Override
789    public void run() {
790      try {
791        logZnodesMap = new HashMap<>();
792        List<String> queues = rq.getAllQueues(deadRS);
793        for (String queue : queues) {
794          Pair<String, SortedSet<String>> pair =
795            rq.claimQueue(deadRS, queue, server.getServerName());
796          if (pair != null) {
797            logZnodesMap.put(pair.getFirst(), pair.getSecond());
798          }
799        }
800        server.abort("Done with testing", null);
801      } catch (Exception e) {
802        LOG.error("Got exception while running NodeFailoverWorker", e);
803      } finally {
804        latch.countDown();
805      }
806    }
807
808    /** Returns 1 when the map is not empty. */
809    private int isLogZnodesMapPopulated() {
810      Collection<Set<String>> sets = logZnodesMap.values();
811      if (sets.size() > 1) {
812        throw new RuntimeException("unexpected size of logZnodesMap: " + sets.size());
813      }
814      if (sets.size() == 1) {
815        Set<String> s = sets.iterator().next();
816        for (String file : files) {
817          // at least one file was missing
818          if (!s.contains(file)) {
819            return 0;
820          }
821        }
822        return 1; // we found all the files
823      }
824      return 0;
825    }
826  }
827
828  static class FailInitializeDummyReplicationSource extends ReplicationSourceDummy {
829
830    @Override
831    public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
832      ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId,
833      UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
834      throws IOException {
835      throw new IOException("Failing deliberately");
836    }
837  }
838
839  static class DummyServer extends MockServer {
840    String hostname;
841
842    DummyServer() {
843      hostname = "hostname.example.org";
844    }
845
846    DummyServer(String hostname) {
847      this.hostname = hostname;
848    }
849
850    @Override
851    public Configuration getConfiguration() {
852      return conf;
853    }
854
855    @Override
856    public ZKWatcher getZooKeeper() {
857      return zkw;
858    }
859
860    @Override
861    public Connection getConnection() {
862      return null;
863    }
864
865    @Override
866    public ChoreService getChoreService() {
867      return null;
868    }
869
870    @Override
871    public ServerName getServerName() {
872      return ServerName.valueOf(hostname, 1234, 1L);
873    }
874  }
875}