/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;

/**
 * An abstract class that tests ReplicationSourceManager. Classes that extend this class should
 * set up the proper config for this class and initialize the proper cluster using
 * HBaseTestingUtility.
 */
@Category({ReplicationTests.class, MediumTests.class})
public abstract class TestReplicationSourceManager {
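
  // A minimal sketch, for illustration only, of how a concrete subclass might wire things up.
  // The class name and exact setup below are hypothetical, not a prescribed implementation:
  //
  //   public class TestReplicationSourceManagerImpl extends TestReplicationSourceManager {
  //     @BeforeClass
  //     public static void setUpBeforeClass() throws Exception {
  //       conf = HBaseConfiguration.create();
  //       conf.set("replication.replicationsource.implementation",
  //         ReplicationSourceDummy.class.getName());
  //       utility = new HBaseTestingUtility(conf);
  //       utility.startMiniCluster();
  //       setupZkAndReplication();
  //     }
  //   }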

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestReplicationSourceManager.class);

  protected static final Logger LOG =
      LoggerFactory.getLogger(TestReplicationSourceManager.class);

  protected static Configuration conf;

  protected static HBaseTestingUtility utility;

  protected static Replication replication;

  protected static ReplicationSourceManager manager;

  protected static ReplicationSourceManager managerOfCluster;

  protected static ZKWatcher zkw;

  protected static HTableDescriptor htd;

  protected static HRegionInfo hri;

  protected static final byte[] r1 = Bytes.toBytes("r1");

  protected static final byte[] r2 = Bytes.toBytes("r2");

  protected static final byte[] f1 = Bytes.toBytes("f1");

  protected static final byte[] f2 = Bytes.toBytes("f2");

  protected static final TableName test =
      TableName.valueOf("test");

  protected static final String slaveId = "1";

  protected static FileSystem fs;

  protected static Path oldLogDir;

  protected static Path logDir;

  protected static CountDownLatch latch;

  protected static List<String> files = new ArrayList<>();
  protected static NavigableMap<byte[], Integer> scopes;

  protected static void setupZkAndReplication() throws Exception {
    // The implementing class should set up the conf
    assertNotNull(conf);
    zkw = new ZKWatcher(conf, "test", null);
    // Manually lay out the replication znodes for peer "1" and mark it enabled.
    ZKUtil.createWithParents(zkw, "/hbase/replication");
    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
    ZKUtil.setData(zkw, "/hbase/replication/peers/1",
        Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
            + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
    ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
      ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);
    ZKUtil.createWithParents(zkw, "/hbase/replication/state");
    ZKUtil.setData(zkw, "/hbase/replication/state", ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);

    ZKClusterId.setClusterId(zkw, new ClusterId());
    FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());
    fs = FileSystem.get(conf);
    oldLogDir = new Path(utility.getDataTestDir(),
        HConstants.HREGION_OLDLOGDIR_NAME);
    logDir = new Path(utility.getDataTestDir(),
        HConstants.HREGION_LOGDIR_NAME);
    replication = new Replication();
    replication.initialize(new DummyServer(), fs, logDir, oldLogDir, null);
    managerOfCluster = getManagerFromCluster();
    if (managerOfCluster != null) {
      // With procedure-based replication, peer changes are no longer propagated through zk
      // notifications, so we need to add the peer by hand.
      managerOfCluster.addPeer(slaveId);
    }

    manager = replication.getReplicationManager();
    manager.addSource(slaveId);
    if (managerOfCluster != null) {
      waitPeer(slaveId, managerOfCluster, true);
    }
    waitPeer(slaveId, manager, true);

    // Table with one globally-scoped family (f1, replicated) and one locally-scoped
    // family (f2, not replicated).
    htd = new HTableDescriptor(test);
    HColumnDescriptor col = new HColumnDescriptor(f1);
    col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
    htd.addFamily(col);
    col = new HColumnDescriptor(f2);
    col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
    htd.addFamily(col);

    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : htd.getFamiliesKeys()) {
      scopes.put(fam, 0);
    }
    hri = new HRegionInfo(htd.getTableName(), r1, r2);
  }

  private static ReplicationSourceManager getManagerFromCluster() {
    // TestReplicationSourceManagerZkImpl won't start the mini hbase cluster.
    if (utility.getMiniHBaseCluster() == null) {
      return null;
    }
    return utility.getMiniHBaseCluster().getRegionServerThreads()
        .stream().map(JVMClusterUtil.RegionServerThread::getRegionServer)
        .findAny()
        .map(HRegionServer::getReplicationSourceService)
        .map(r -> (Replication)r)
        .map(Replication::getReplicationManager)
        .get();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    if (manager != null) {
      manager.join();
    }
    utility.shutdownMiniCluster();
  }

  @Rule
  public TestName testName = new TestName();

  private void cleanLogDir() throws IOException {
    fs.delete(logDir, true);
    fs.delete(oldLogDir, true);
  }

  @Before
  public void setUp() throws Exception {
    LOG.info("Start " + testName.getMethodName());
    cleanLogDir();
  }

  @After
  public void tearDown() throws Exception {
    LOG.info("End " + testName.getMethodName());
    cleanLogDir();
    // Remove any peers other than the default slave that individual tests added.
    List<String> ids = manager.getSources().stream()
        .map(ReplicationSourceInterface::getPeerId).collect(Collectors.toList());
    for (String id : ids) {
      if (slaveId.equals(id)) {
        continue;
      }
      removePeerAndWait(id);
    }
  }

  @Test
  public void testLogRoll() throws Exception {
    long baseline = 1000;
    long time = baseline;
    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
    KeyValue kv = new KeyValue(r1, f1, r1);
    WALEdit edit = new WALEdit();
    edit.add(kv);

    WALFactory wals =
      new WALFactory(utility.getConfiguration(), URLEncoder.encode("regionserver:60020", "UTF8"));
    ReplicationSourceManager replicationManager = replication.getReplicationManager();
    wals.getWALProvider()
      .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));
    final WAL wal = wals.getWAL(hri);
    manager.init();
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("tablename"));
    htd.addFamily(new HColumnDescriptor(f1));
    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : htd.getFamiliesKeys()) {
      scopes.put(fam, 0);
    }
    // Test normal log rolling: roll the writer every 20 appends.
    for (long i = 1; i < 101; i++) {
      if (i > 1 && i % 20 == 0) {
        wal.rollWriter();
      }
      LOG.info(Long.toString(i));
      final long txid = wal.appendData(hri,
        new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
        edit);
      wal.sync(txid);
    }

    // Simulate a rapid insert that's followed
    // by a report that's still not totally complete (missing last one)
    LOG.info(baseline + " and " + time);
    baseline += 101;
    time = baseline;
    LOG.info(baseline + " and " + time);

    for (int i = 0; i < 3; i++) {
      wal.appendData(hri,
        new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
        edit);
    }
    wal.sync();

    int logNumber = 0;
    for (Map.Entry<String, NavigableSet<String>> entry : manager.getWALs().get(slaveId)
      .entrySet()) {
      logNumber += entry.getValue().size();
    }
    assertEquals(6, logNumber);

    wal.rollWriter();

    manager.logPositionAndCleanOldLogs("1", false,
      new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath()));

    wal.appendData(hri,
      new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
      edit);
    wal.sync();

    assertEquals(1, manager.getWALs().size());

    // TODO Need a case with only 2 WALs and we only want to delete the first one
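    // A possible shape for that missing case, as a sketch only (the exact expectations would
    // need to be verified against a running cluster before promoting this to a real test):
    //
    //   wal.rollWriter();                                 // leaves exactly two WALs queued
    //   manager.logPositionAndCleanOldLogs("1", false,
    //     new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath()));
    //   // expect: the older WAL is dropped from manager.getWALs(), the newer one retained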
  }

  @Test
  public void testClaimQueues() throws Exception {
    Server server = new DummyServer("hostname0.example.org");
    ReplicationQueueStorage rq = ReplicationStorageFactory
        .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
    // populate some znodes in the peer znode
    files.add("log1");
    files.add("log2");
    for (String file : files) {
      rq.addWAL(server.getServerName(), "1", file);
    }
    // create 3 DummyServers
    Server s1 = new DummyServer("dummyserver1.example.org");
    Server s2 = new DummyServer("dummyserver2.example.org");
    Server s3 = new DummyServer("dummyserver3.example.org");

    // create 3 DummyNodeFailoverWorkers
    DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker(server.getServerName(), s1);
    DummyNodeFailoverWorker w2 = new DummyNodeFailoverWorker(server.getServerName(), s2);
    DummyNodeFailoverWorker w3 = new DummyNodeFailoverWorker(server.getServerName(), s3);

    latch = new CountDownLatch(3);
    // start the threads
    w1.start();
    w2.start();
    w3.start();
    // wait until all the workers are done...
    latch.await();
    // ...then make sure exactly one of them successfully claimed the queue
    int populatedMap = 0;
    populatedMap += w1.isLogZnodesMapPopulated() + w2.isLogZnodesMapPopulated()
        + w3.isLogZnodesMapPopulated();
    assertEquals(1, populatedMap);
    server.abort("", null);
  }

  @Test
  public void testCleanupFailoverQueues() throws Exception {
    Server server = new DummyServer("hostname1.example.org");
    ReplicationQueueStorage rq = ReplicationStorageFactory
        .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
    // populate some znodes in the peer znode
    SortedSet<String> files = new TreeSet<>();
    String group = "testgroup";
    String file1 = group + ".log1";
    String file2 = group + ".log2";
    files.add(file1);
    files.add(file2);
    for (String file : files) {
      rq.addWAL(server.getServerName(), "1", file);
    }
    Server s1 = new DummyServer("dummyserver1.example.org");
    ReplicationPeers rp1 =
        ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration());
    rp1.init();
    NodeFailoverWorker w1 =
        manager.new NodeFailoverWorker(server.getServerName());
    w1.run();
    assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
    String id = "1-" + server.getServerName().getServerName();
    assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group));
    manager.cleanOldLogs(file2, false, id, true);
    // log1 should be deleted
    assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group));
  }

  @Test
  public void testCleanupUnknownPeerZNode() throws Exception {
    Server server = new DummyServer("hostname2.example.org");
    ReplicationQueueStorage rq = ReplicationStorageFactory
        .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
    // populate some znodes in the peer znode
    // add log to an unknown peer
    String group = "testgroup";
    rq.addWAL(server.getServerName(), "2", group + ".log1");
    rq.addWAL(server.getServerName(), "2", group + ".log2");

    NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName());
    w1.run();

    // The log of the unknown peer should be removed from zk
    for (String peer : manager.getAllQueues()) {
      assertTrue(peer.startsWith("1"));
    }
  }

  /**
   * Test for HBASE-9038: Replication.scopeWALEdits would throw a NullPointerException if it
   * did not filter out the compaction WALEdit.
   */
  @Test
  public void testCompactionWALEdits() throws Exception {
    TableName tableName = TableName.valueOf("testCompactionWALEdits");
    WALProtos.CompactionDescriptor compactionDescriptor =
      WALProtos.CompactionDescriptor.getDefaultInstance();
    RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).setStartKey(HConstants.EMPTY_START_ROW)
        .setEndKey(HConstants.EMPTY_END_ROW).build();
    WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor);
    // The test passes as long as this call does not throw.
    ReplicationSourceWALActionListener.scopeWALEdits(new WALKeyImpl(), edit, conf);
  }

  @Test
  public void testBulkLoadWALEditsWithoutBulkLoadReplicationEnabled() throws Exception {
    NavigableMap<byte[], Integer> scope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    // 1. Get the bulk load wal edit event
    WALEdit logEdit = getBulkLoadWALEdit(scope);
    // 2. Create wal key
    WALKeyImpl logKey = new WALKeyImpl(scope);

    // 3. Get the scopes for the key
    ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, conf);

    // 4. Assert that no bulk load entry scopes are added if bulk load hfile replication is disabled
    assertNull("No bulk load entries scope should be added if bulk load replication is disabled.",
      logKey.getReplicationScopes());
  }

  @Test
  public void testBulkLoadWALEdits() throws Exception {
    // 1. Get the bulk load wal edit event
    NavigableMap<byte[], Integer> scope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    WALEdit logEdit = getBulkLoadWALEdit(scope);
    // 2. Create wal key
    WALKeyImpl logKey = new WALKeyImpl(scope);
    // 3. Enable bulk load hfile replication
    Configuration bulkLoadConf = HBaseConfiguration.create(conf);
    bulkLoadConf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);

    // 4. Get the scopes for the key
    ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, bulkLoadConf);

    NavigableMap<byte[], Integer> scopes = logKey.getReplicationScopes();
    // Assert family with replication scope global is present in the key scopes
    assertTrue("This family scope is set to global, should be part of replication key scopes.",
      scopes.containsKey(f1));
    // Assert family with replication scope local is not present in the key scopes
    assertFalse("This family scope is set to local, should not be part of replication key scopes",
      scopes.containsKey(f2));
  }

  /**
   * Test that calling removePeer() on a ReplicationSourceManager that failed to initialize the
   * corresponding ReplicationSourceInterface correctly cleans up the associated replication
   * queue and ReplicationPeer. See HBASE-16096.
   */
  @Test
  public void testPeerRemovalCleanup() throws Exception {
    String replicationSourceImplName = conf.get("replication.replicationsource.implementation");
    final String peerId = "FakePeer";
    final ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
        .setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase");
    try {
      DummyServer server = new DummyServer();
      ReplicationQueueStorage rq = ReplicationStorageFactory
          .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
      // Purposely fail ReplicationSourceManager.addSource() by causing ReplicationSourceInterface
      // initialization to throw an exception.
      conf.set("replication.replicationsource.implementation",
          FailInitializeDummyReplicationSource.class.getName());
      // Set up the znode and ReplicationPeer for the fake peer
      // Don't wait for replication source to initialize, we know it won't.
      addPeerAndWait(peerId, peerConfig, false);

      // Sanity check
      assertNull(manager.getSource(peerId));

      // Create a replication queue for the fake peer
      rq.addWAL(server.getServerName(), peerId, "FakeFile");
      // Unregister peer, this should remove the peer and clear all queues associated with it
      // Need to wait for the ReplicationTracker to pick up the changes and notify listeners.
      removePeerAndWait(peerId);
      assertFalse(rq.getAllQueues(server.getServerName()).contains(peerId));
    } finally {
      conf.set("replication.replicationsource.implementation", replicationSourceImplName);
      removePeerAndWait(peerId);
    }
  }

  private static MetricsReplicationSourceSource getGlobalSource() throws Exception {
    ReplicationSourceInterface source = manager.getSource(slaveId);
    // Retrieve the global replication metrics source; MetricsSource does not expose it,
    // so reach in via reflection (test-only).
    Field f = MetricsSource.class.getDeclaredField("globalSourceSource");
    f.setAccessible(true);
    return (MetricsReplicationSourceSource) f.get(source.getSourceMetrics());
  }

  private static long getSizeOfLatestPath() {
    // When no mini cluster is running there are no region server replication managers
    // contributing to this metric, so there is nothing to sum.
    if (utility.getMiniHBaseCluster() == null) {
      return 0;
    }
    return utility.getMiniHBaseCluster().getRegionServerThreads()
        .stream().map(JVMClusterUtil.RegionServerThread::getRegionServer)
        .map(HRegionServer::getReplicationSourceService)
        .map(r -> (Replication)r)
        .map(Replication::getReplicationManager)
        .mapToLong(ReplicationSourceManager::getSizeOfLatestPath)
        .sum();
  }

  @Test
  public void testRemovePeerMetricsCleanup() throws Exception {
    final String peerId = "DummyPeer";
    final ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
        .setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase");
    try {
      MetricsReplicationSourceSource globalSource = getGlobalSource();
      final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
      final long sizeOfLatestPath = getSizeOfLatestPath();
      addPeerAndWait(peerId, peerConfig, true);
      assertEquals(sizeOfLatestPath + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
      ReplicationSourceInterface source = manager.getSource(peerId);
      // Sanity check
      assertNotNull(source);
      final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
      // Enqueue log and check if metrics updated
      source.enqueueLog(new Path("abc"));
      assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
        globalSource.getSizeOfLogQueue());

      // Removing the peer should reset the global metrics
      removePeerAndWait(peerId);
      assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());

      // Adding the same peer back again should reset the single source metrics
      addPeerAndWait(peerId, peerConfig, true);
      source = manager.getSource(peerId);
      assertNotNull(source);
      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
        globalSource.getSizeOfLogQueue());
    } finally {
      removePeerAndWait(peerId);
    }
  }

  /**
   * Add a peer and wait for it to initialize.
   * @param peerId the id of the peer to add
   * @param peerConfig the configuration of the peer to add
   * @param waitForSource whether to wait for the replication source to initialize
   */
  private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig,
      final boolean waitForSource) throws Exception {
    final ReplicationPeers rp = manager.getReplicationPeers();
    rp.getPeerStorage().addPeer(peerId, peerConfig, true);
    try {
      manager.addPeer(peerId);
    } catch (Exception e) {
      // Ignore the exception; both the success and failure cases are exercised by tests.
    }
    waitPeer(peerId, manager, waitForSource);
    if (managerOfCluster != null) {
      managerOfCluster.addPeer(peerId);
      waitPeer(peerId, managerOfCluster, waitForSource);
    }
  }

  private static void waitPeer(final String peerId,
      ReplicationSourceManager manager, final boolean waitForSource) {
    ReplicationPeers rp = manager.getReplicationPeers();
    Waiter.waitFor(conf, 20000, () -> {
      if (waitForSource) {
        ReplicationSourceInterface rs = manager.getSource(peerId);
        if (rs == null) {
          return false;
        }
        if (rs instanceof ReplicationSourceDummy) {
          return ((ReplicationSourceDummy)rs).isStartup();
        }
        return true;
      } else {
        return (rp.getPeer(peerId) != null);
      }
    });
  }

  /**
   * Remove a peer and wait for it to get cleaned up.
   * @param peerId the id of the peer to remove
   */
  private void removePeerAndWait(final String peerId) throws Exception {
    final ReplicationPeers rp = manager.getReplicationPeers();
    if (rp.getPeerStorage().listPeerIds().contains(peerId)) {
      rp.getPeerStorage().removePeer(peerId);
      try {
        manager.removePeer(peerId);
      } catch (Exception e) {
        // Ignore the exception and continue.
      }
    }
    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        Collection<String> peers = rp.getPeerStorage().listPeerIds();
        return (!manager.getAllQueues().contains(peerId)) && (rp.getPeer(peerId) == null)
            && (!peers.contains(peerId)) && manager.getSource(peerId) == null;
      }
    });
  }

  private WALEdit getBulkLoadWALEdit(NavigableMap<byte[], Integer> scope) {
    // 1. Create store files for the families
    Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
    Map<String, Long> storeFilesSize = new HashMap<>(1);
    List<Path> p = new ArrayList<>(1);
    Path hfilePath1 = new Path(Bytes.toString(f1));
    p.add(hfilePath1);
    try {
      storeFilesSize.put(hfilePath1.getName(), fs.getFileStatus(hfilePath1).getLen());
    } catch (IOException e) {
      LOG.debug("Failed to calculate the size of hfile " + hfilePath1);
      storeFilesSize.put(hfilePath1.getName(), 0L);
    }
    storeFiles.put(f1, p);
    scope.put(f1, 1);
    p = new ArrayList<>(1);
    Path hfilePath2 = new Path(Bytes.toString(f2));
    p.add(hfilePath2);
    try {
      storeFilesSize.put(hfilePath2.getName(), fs.getFileStatus(hfilePath2).getLen());
    } catch (IOException e) {
      LOG.debug("Failed to calculate the size of hfile " + hfilePath2);
      storeFilesSize.put(hfilePath2.getName(), 0L);
    }
    storeFiles.put(f2, p);
    // 2. Create bulk load descriptor
    BulkLoadDescriptor desc = ProtobufUtil.toBulkLoadDescriptor(hri.getTable(),
        UnsafeByteOperations.unsafeWrap(hri.getEncodedNameAsBytes()), storeFiles, storeFilesSize,
        1);

    // 3. create bulk load wal edit event
    WALEdit logEdit = WALEdit.createBulkLoadEvent(hri, desc);
    return logEdit;
  }

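  // Worker thread used by testClaimQueues(): it races the other workers to claim the dead
  // region server's replication queues via ReplicationQueueStorage.claimQueue(); exactly one
  // racer should end up holding the claimed WAL set.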
  static class DummyNodeFailoverWorker extends Thread {
    private Map<String, Set<String>> logZnodesMap;
    Server server;
    private ServerName deadRS;
    ReplicationQueueStorage rq;

    public DummyNodeFailoverWorker(ServerName deadRS, Server s) throws Exception {
      this.deadRS = deadRS;
      this.server = s;
      this.rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(),
        server.getConfiguration());
    }

    @Override
    public void run() {
      try {
        logZnodesMap = new HashMap<>();
        List<String> queues = rq.getAllQueues(deadRS);
        for (String queue : queues) {
          Pair<String, SortedSet<String>> pair =
              rq.claimQueue(deadRS, queue, server.getServerName());
          if (pair != null) {
            logZnodesMap.put(pair.getFirst(), pair.getSecond());
          }
        }
        server.abort("Done with testing", null);
      } catch (Exception e) {
        LOG.error("Got exception while running NodeFailoverWorker", e);
      } finally {
        latch.countDown();
      }
    }

    /**
     * @return 1 if the claimed queue contains all the expected WAL files, 0 otherwise.
     */
    private int isLogZnodesMapPopulated() {
      Collection<Set<String>> sets = logZnodesMap.values();
      if (sets.size() > 1) {
        throw new RuntimeException("unexpected size of logZnodesMap: " + sets.size());
      }
      if (sets.size() == 1) {
        Set<String> s = sets.iterator().next();
        for (String file : files) {
          // a file is missing from the claimed set
          if (!s.contains(file)) {
            return 0;
          }
        }
        return 1; // we found all the files
      }
      return 0;
    }
  }

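  // Used by testPeerRemovalCleanup(): the test points the
  // "replication.replicationsource.implementation" config at this class so that
  // ReplicationSourceManager.addSource() fails while initializing the source.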
  static class FailInitializeDummyReplicationSource extends ReplicationSourceDummy {

    @Override
    public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
        ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId,
        UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
        throws IOException {
      throw new IOException("Failing deliberately");
    }
  }

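  // Minimal Server stub: only getConfiguration(), getZooKeeper() and getServerName() matter
  // to these tests; every other method is a no-op or returns null.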
  static class DummyServer implements Server {
    String hostname;

    DummyServer() {
      hostname = "hostname.example.org";
    }

    DummyServer(String hostname) {
      this.hostname = hostname;
    }

    @Override
    public Configuration getConfiguration() {
      return conf;
    }

    @Override
    public ZKWatcher getZooKeeper() {
      return zkw;
    }

    @Override
    public CoordinatedStateManager getCoordinatedStateManager() {
      return null;
    }

    @Override
    public ClusterConnection getConnection() {
      return null;
    }

    @Override
    public ServerName getServerName() {
      return ServerName.valueOf(hostname, 1234, 1L);
    }

    @Override
    public void abort(String why, Throwable e) {
      // no-op for this stub
    }

    @Override
    public boolean isAborted() {
      return false;
    }

    @Override
    public void stop(String why) {
      // no-op for this stub
    }

    @Override
    public boolean isStopped() {
      return false;
    }

    @Override
    public ChoreService getChoreService() {
      return null;
    }

    @Override
    public ClusterConnection getClusterConnection() {
      return null;
    }

    @Override
    public FileSystem getFileSystem() {
      return null;
    }

    @Override
    public boolean isStopping() {
      return false;
    }

    @Override
    public Connection createConnection(Configuration conf) throws IOException {
      return null;
    }
  }
}