001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.replication.regionserver;
020
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertNull;
025import static org.junit.Assert.assertTrue;
026
027import java.io.IOException;
028import java.lang.reflect.Field;
029import java.net.URLEncoder;
030import java.util.ArrayList;
031import java.util.Collection;
032import java.util.HashMap;
033import java.util.List;
034import java.util.Map;
035import java.util.NavigableMap;
036import java.util.Set;
037import java.util.SortedSet;
038import java.util.TreeMap;
039import java.util.TreeSet;
040import java.util.UUID;
041import java.util.concurrent.CountDownLatch;
042import java.util.stream.Collectors;
043import org.apache.hadoop.conf.Configuration;
044import org.apache.hadoop.fs.FileSystem;
045import org.apache.hadoop.fs.Path;
046import org.apache.hadoop.hbase.ChoreService;
047import org.apache.hadoop.hbase.ClusterId;
048import org.apache.hadoop.hbase.CoordinatedStateManager;
049import org.apache.hadoop.hbase.HBaseClassTestRule;
050import org.apache.hadoop.hbase.HBaseConfiguration;
051import org.apache.hadoop.hbase.HBaseTestingUtility;
052import org.apache.hadoop.hbase.HColumnDescriptor;
053import org.apache.hadoop.hbase.HConstants;
054import org.apache.hadoop.hbase.HRegionInfo;
055import org.apache.hadoop.hbase.HTableDescriptor;
056import org.apache.hadoop.hbase.KeyValue;
057import org.apache.hadoop.hbase.Server;
058import org.apache.hadoop.hbase.ServerName;
059import org.apache.hadoop.hbase.TableName;
060import org.apache.hadoop.hbase.Waiter;
061import org.apache.hadoop.hbase.client.ClusterConnection;
062import org.apache.hadoop.hbase.client.Connection;
063import org.apache.hadoop.hbase.client.RegionInfo;
064import org.apache.hadoop.hbase.client.RegionInfoBuilder;
065import org.apache.hadoop.hbase.regionserver.HRegionServer;
066import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
067import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
068import org.apache.hadoop.hbase.replication.ReplicationFactory;
069import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
070import org.apache.hadoop.hbase.replication.ReplicationPeers;
071import org.apache.hadoop.hbase.replication.ReplicationQueues;
072import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
073import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
074import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
075import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
076import org.apache.hadoop.hbase.testclassification.MediumTests;
077import org.apache.hadoop.hbase.testclassification.ReplicationTests;
078import org.apache.hadoop.hbase.util.Bytes;
079import org.apache.hadoop.hbase.util.FSUtils;
080import org.apache.hadoop.hbase.util.JVMClusterUtil;
081import org.apache.hadoop.hbase.util.Pair;
082import org.apache.hadoop.hbase.wal.WAL;
083import org.apache.hadoop.hbase.wal.WALEdit;
084import org.apache.hadoop.hbase.wal.WALFactory;
085import org.apache.hadoop.hbase.wal.WALKeyImpl;
086import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
087import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
088import org.apache.hadoop.hbase.zookeeper.ZKUtil;
089import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
090import org.junit.After;
091import org.junit.AfterClass;
092import org.junit.Before;
093import org.junit.ClassRule;
094import org.junit.Rule;
095import org.junit.Test;
096import org.junit.experimental.categories.Category;
097import org.junit.rules.TestName;
098import org.slf4j.Logger;
099import org.slf4j.LoggerFactory;
100
101import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
102import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
103
104import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
105import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
106import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
107
108/**
109 * An abstract class that tests ReplicationSourceManager. Classes that extend this class should
110 * set up the proper config for this class and initialize the proper cluster using
111 * HBaseTestingUtility.
112 */
113@Category({ReplicationTests.class, MediumTests.class})
114public abstract class TestReplicationSourceManager {
115
116  @ClassRule
117  public static final HBaseClassTestRule CLASS_RULE =
118      HBaseClassTestRule.forClass(TestReplicationSourceManager.class);
119
120  protected static final Logger LOG =
121      LoggerFactory.getLogger(TestReplicationSourceManager.class);
122
123  protected static Configuration conf;
124
125  protected static HBaseTestingUtility utility;
126
127  protected static Replication replication;
128
129  protected static ReplicationSourceManager manager;
130
131  protected static ReplicationSourceManager managerOfCluster;
132
133  protected static ZKWatcher zkw;
134
135  protected static HTableDescriptor htd;
136
137  protected static HRegionInfo hri;
138
139  protected static final byte[] r1 = Bytes.toBytes("r1");
140
141  protected static final byte[] r2 = Bytes.toBytes("r2");
142
143  protected static final byte[] f1 = Bytes.toBytes("f1");
144
145  protected static final byte[] f2 = Bytes.toBytes("f2");
146
147  protected static final TableName test =
148      TableName.valueOf("test");
149
150  protected static final String slaveId = "1";
151
152  protected static FileSystem fs;
153
154  protected static Path oldLogDir;
155
156  protected static Path logDir;
157
158  protected static CountDownLatch latch;
159
160  protected static List<String> files = new ArrayList<>();
161  protected static NavigableMap<byte[], Integer> scopes;
162
163  protected static void setupZkAndReplication() throws Exception {
164    // The implementing class should set up the conf
165    assertNotNull(conf);
166    zkw = new ZKWatcher(conf, "test", null);
167    ZKUtil.createWithParents(zkw, "/hbase/replication");
168    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
169    ZKUtil.setData(zkw, "/hbase/replication/peers/1",
170        Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
171            + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
172    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
173    ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
174      ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
175    ZKUtil.createWithParents(zkw, "/hbase/replication/state");
176    ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
177
178    ZKClusterId.setClusterId(zkw, new ClusterId());
179    FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());
180    fs = FileSystem.get(conf);
181    oldLogDir = new Path(utility.getDataTestDir(),
182        HConstants.HREGION_OLDLOGDIR_NAME);
183    logDir = new Path(utility.getDataTestDir(),
184        HConstants.HREGION_LOGDIR_NAME);
185    replication = new Replication();
186    replication.initialize(new DummyServer(), fs, logDir, oldLogDir, null);
187    managerOfCluster = getManagerFromCluster();
188    manager = replication.getReplicationManager();
189    manager.addSource(slaveId);
190    if (managerOfCluster != null) {
191      waitPeer(slaveId, managerOfCluster, true);
192    }
193    waitPeer(slaveId, manager, true);
194
195    htd = new HTableDescriptor(test);
196    HColumnDescriptor col = new HColumnDescriptor(f1);
197    col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
198    htd.addFamily(col);
199    col = new HColumnDescriptor(f2);
200    col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
201    htd.addFamily(col);
202
203    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
204    for(byte[] fam : htd.getFamiliesKeys()) {
205      scopes.put(fam, 0);
206    }
207    hri = new HRegionInfo(htd.getTableName(), r1, r2);
208  }
209
210  private static ReplicationSourceManager getManagerFromCluster() {
211    // TestReplicationSourceManagerZkImpl won't start the mini hbase cluster.
212    if (utility.getMiniHBaseCluster() == null) {
213      return null;
214    }
215    return utility.getMiniHBaseCluster().getRegionServerThreads()
216        .stream().map(JVMClusterUtil.RegionServerThread::getRegionServer)
217        .findAny()
218        .map(HRegionServer::getReplicationSourceService)
219        .map(r -> (Replication)r)
220        .map(Replication::getReplicationManager)
221        .get();
222  }
223
224  @AfterClass
225  public static void tearDownAfterClass() throws Exception {
226    if (manager != null) {
227      manager.join();
228    }
229    utility.shutdownMiniCluster();
230  }
231
232  @Rule
233  public TestName testName = new TestName();
234
235  private void cleanLogDir() throws IOException {
236    fs.delete(logDir, true);
237    fs.delete(oldLogDir, true);
238  }
239
240  @Before
241  public void setUp() throws Exception {
242    LOG.info("Start " + testName.getMethodName());
243    cleanLogDir();
244  }
245
246  @After
247  public void tearDown() throws Exception {
248    LOG.info("End " + testName.getMethodName());
249    cleanLogDir();
250    List<String> ids = manager.getSources().stream()
251        .map(ReplicationSourceInterface::getPeerId).collect(Collectors.toList());
252    for (String id : ids) {
253      if (slaveId.equals(id)) {
254        continue;
255      }
256      removePeerAndWait(id);
257    }
258  }
259
260  @Test
261  public void testLogRoll() throws Exception {
262    long baseline = 1000;
263    long time = baseline;
264    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
265    KeyValue kv = new KeyValue(r1, f1, r1);
266    WALEdit edit = new WALEdit();
267    edit.add(kv);
268
269    WALFactory wals =
270      new WALFactory(utility.getConfiguration(), URLEncoder.encode("regionserver:60020", "UTF8"));
271    ReplicationSourceManager replicationManager = replication.getReplicationManager();
272    wals.getWALProvider()
273      .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));
274    final WAL wal = wals.getWAL(hri);
275    manager.init();
276    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("tableame"));
277    htd.addFamily(new HColumnDescriptor(f1));
278    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
279    for(byte[] fam : htd.getFamiliesKeys()) {
280      scopes.put(fam, 0);
281    }
282    // Testing normal log rolling every 20
283    for(long i = 1; i < 101; i++) {
284      if(i > 1 && i % 20 == 0) {
285        wal.rollWriter();
286      }
287      LOG.info(Long.toString(i));
288      final long txid = wal.append(
289          hri,
290          new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
291          edit,
292          true);
293      wal.sync(txid);
294    }
295
296    // Simulate a rapid insert that's followed
297    // by a report that's still not totally complete (missing last one)
298    LOG.info(baseline + " and " + time);
299    baseline += 101;
300    time = baseline;
301    LOG.info(baseline + " and " + time);
302
303    for (int i = 0; i < 3; i++) {
304      wal.append(hri,
305        new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
306          edit, true);
307    }
308    wal.sync();
309
310    int logNumber = 0;
311    for (Map.Entry<String, SortedSet<String>> entry : manager.getWALs().get(slaveId).entrySet()) {
312      logNumber += entry.getValue().size();
313    }
314    assertEquals(6, logNumber);
315
316    wal.rollWriter();
317
318    manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
319        "1", 0, false, false);
320
321    wal.append(hri,
322        new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
323        edit,
324        true);
325    wal.sync();
326
327    assertEquals(1, manager.getWALs().size());
328
329
330    // TODO Need a case with only 2 WALs and we only want to delete the first one
331  }
332
333  @Test
334  public void testClaimQueues() throws Exception {
335    final Server server = new DummyServer("hostname0.example.org");
336
337
338    ReplicationQueues rq =
339        ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server,
340          server.getZooKeeper()));
341    rq.init(server.getServerName().toString());
342    // populate some znodes in the peer znode
343    files.add("log1");
344    files.add("log2");
345    for (String file : files) {
346      rq.addLog("1", file);
347    }
348    // create 3 DummyServers
349    Server s1 = new DummyServer("dummyserver1.example.org");
350    Server s2 = new DummyServer("dummyserver2.example.org");
351    Server s3 = new DummyServer("dummyserver3.example.org");
352
353    // create 3 DummyNodeFailoverWorkers
354    DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker(
355        server.getServerName().getServerName(), s1);
356    DummyNodeFailoverWorker w2 = new DummyNodeFailoverWorker(
357        server.getServerName().getServerName(), s2);
358    DummyNodeFailoverWorker w3 = new DummyNodeFailoverWorker(
359        server.getServerName().getServerName(), s3);
360
361    latch = new CountDownLatch(3);
362    // start the threads
363    w1.start();
364    w2.start();
365    w3.start();
366    // make sure only one is successful
367    int populatedMap = 0;
368    // wait for result now... till all the workers are done.
369    latch.await();
370    populatedMap += w1.isLogZnodesMapPopulated() + w2.isLogZnodesMapPopulated()
371        + w3.isLogZnodesMapPopulated();
372    assertEquals(1, populatedMap);
373    server.abort("", null);
374  }
375
376  @Test
377  public void testCleanupFailoverQueues() throws Exception {
378    final Server server = new DummyServer("hostname1.example.org");
379    ReplicationQueues rq =
380        ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server,
381          server.getZooKeeper()));
382    rq.init(server.getServerName().toString());
383    // populate some znodes in the peer znode
384    SortedSet<String> files = new TreeSet<>();
385    String group = "testgroup";
386    String file1 = group + ".log1";
387    String file2 = group + ".log2";
388    files.add(file1);
389    files.add(file2);
390    for (String file : files) {
391      rq.addLog("1", file);
392    }
393    Server s1 = new DummyServer("dummyserver1.example.org");
394    ReplicationQueues rq1 =
395        ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1,
396            s1.getZooKeeper()));
397    rq1.init(s1.getServerName().toString());
398    ReplicationPeers rp1 =
399        ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1);
400    rp1.init();
401    NodeFailoverWorker w1 =
402        manager.new NodeFailoverWorker(server.getServerName().getServerName(), rq1, rp1, new UUID(
403            new Long(1), new Long(2)));
404    w1.run();
405    assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
406    String id = "1-" + server.getServerName().getServerName();
407    assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group));
408    manager.cleanOldLogs(file2, id, true);
409    // log1 should be deleted
410    assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group));
411  }
412
413  @Test
414  public void testCleanupUnknownPeerZNode() throws Exception {
415    final Server server = new DummyServer("hostname2.example.org");
416    ReplicationQueues rq = ReplicationFactory.getReplicationQueues(
417      new ReplicationQueuesArguments(server.getConfiguration(), server, server.getZooKeeper()));
418    rq.init(server.getServerName().toString());
419    // populate some znodes in the peer znode
420    // add log to an unknown peer
421    String group = "testgroup";
422    rq.addLog("2", group + ".log1");
423    rq.addLog("2", group + ".log2");
424
425    NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName().getServerName());
426    w1.run();
427
428    // The log of the unknown peer should be removed from zk
429    for (String peer : manager.getAllQueues()) {
430      assertTrue(peer.startsWith("1"));
431    }
432  }
433
434  /**
435   * Test for HBASE-9038, Replication.scopeWALEdits would NPE if it wasn't filtering out the
436   * compaction WALEdit.
437   */
438  @Test
439  public void testCompactionWALEdits() throws Exception {
440    TableName tableName = TableName.valueOf("testCompactionWALEdits");
441    WALProtos.CompactionDescriptor compactionDescriptor =
442      WALProtos.CompactionDescriptor.getDefaultInstance();
443    RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).setStartKey(HConstants.EMPTY_START_ROW)
444        .setEndKey(HConstants.EMPTY_END_ROW).build();
445    WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor);
446    ReplicationSourceWALActionListener.scopeWALEdits(new WALKeyImpl(), edit, conf);
447  }
448
449  @Test
450  public void testBulkLoadWALEditsWithoutBulkLoadReplicationEnabled() throws Exception {
451    NavigableMap<byte[], Integer> scope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
452    // 1. Get the bulk load wal edit event
453    WALEdit logEdit = getBulkLoadWALEdit(scope);
454    // 2. Create wal key
455    WALKeyImpl logKey = new WALKeyImpl(scope);
456
457    // 3. Get the scopes for the key
458    ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, conf);
459
460    // 4. Assert that no bulk load entry scopes are added if bulk load hfile replication is disabled
461    assertNull("No bulk load entries scope should be added if bulk load replication is disabled.",
462      logKey.getReplicationScopes());
463  }
464
465  @Test
466  public void testBulkLoadWALEdits() throws Exception {
467    // 1. Get the bulk load wal edit event
468    NavigableMap<byte[], Integer> scope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
469    WALEdit logEdit = getBulkLoadWALEdit(scope);
470    // 2. Create wal key
471    WALKeyImpl logKey = new WALKeyImpl(scope);
472    // 3. Enable bulk load hfile replication
473    Configuration bulkLoadConf = HBaseConfiguration.create(conf);
474    bulkLoadConf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
475
476    // 4. Get the scopes for the key
477    ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, bulkLoadConf);
478
479    NavigableMap<byte[], Integer> scopes = logKey.getReplicationScopes();
480    // Assert family with replication scope global is present in the key scopes
481    assertTrue("This family scope is set to global, should be part of replication key scopes.",
482      scopes.containsKey(f1));
483    // Assert family with replication scope local is not present in the key scopes
484    assertFalse("This family scope is set to local, should not be part of replication key scopes",
485      scopes.containsKey(f2));
486  }
487
488  /**
489   * Test whether calling removePeer() on a ReplicationSourceManager that failed on initializing the
490   * corresponding ReplicationSourceInterface correctly cleans up the corresponding
491   * replication queue and ReplicationPeer.
492   * See HBASE-16096.
493   * @throws Exception
494   */
495  @Test
496  public void testPeerRemovalCleanup() throws Exception{
497    String replicationSourceImplName = conf.get("replication.replicationsource.implementation");
498    final String peerId = "FakePeer";
499    final ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
500        .setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase");
501    try {
502      DummyServer server = new DummyServer();
503      final ReplicationQueues rq =
504          ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(
505              server.getConfiguration(), server, server.getZooKeeper()));
506      rq.init(server.getServerName().toString());
507      // Purposely fail ReplicationSourceManager.addSource() by causing ReplicationSourceInterface
508      // initialization to throw an exception.
509      conf.set("replication.replicationsource.implementation",
510          FailInitializeDummyReplicationSource.class.getName());
511      final ReplicationPeers rp = manager.getReplicationPeers();
512      // Set up the znode and ReplicationPeer for the fake peer
513      // Don't wait for replication source to initialize, we know it won't.
514      addPeerAndWait(peerId, peerConfig, false);
515
516      // Sanity check
517      assertNull(manager.getSource(peerId));
518
519      // Create a replication queue for the fake peer
520      rq.addLog(peerId, "FakeFile");
521      // Unregister peer, this should remove the peer and clear all queues associated with it
522      // Need to wait for the ReplicationTracker to pick up the changes and notify listeners.
523      removePeerAndWait(peerId);
524      assertFalse(rq.getAllQueues().contains(peerId));
525    } finally {
526      conf.set("replication.replicationsource.implementation", replicationSourceImplName);
527      removePeerAndWait(peerId);
528    }
529  }
530
531  private static MetricsReplicationSourceSource getGlobalSource() throws Exception {
532    ReplicationSourceInterface source = manager.getSource(slaveId);
533    // Retrieve the global replication metrics source
534    Field f = MetricsSource.class.getDeclaredField("globalSourceSource");
535    f.setAccessible(true);
536    return (MetricsReplicationSourceSource)f.get(source.getSourceMetrics());
537  }
538
539  private static long getSizeOfLatestPath() {
540    // If no mini cluster is running, there are extra replication manager influencing the metrics.
541    if (utility.getMiniHBaseCluster() == null) {
542      return 0;
543    }
544    return utility.getMiniHBaseCluster().getRegionServerThreads()
545        .stream().map(JVMClusterUtil.RegionServerThread::getRegionServer)
546        .map(HRegionServer::getReplicationSourceService)
547        .map(r -> (Replication)r)
548        .map(Replication::getReplicationManager)
549        .mapToLong(ReplicationSourceManager::getSizeOfLatestPath)
550        .sum();
551  }
552
553  @Test
554  public void testRemovePeerMetricsCleanup() throws Exception {
555    final String peerId = "DummyPeer";
556    final ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
557        .setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase");
558    try {
559      MetricsReplicationSourceSource globalSource = getGlobalSource();
560      final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
561      final long sizeOfLatestPath = getSizeOfLatestPath();
562      addPeerAndWait(peerId, peerConfig, true);
563      assertEquals(sizeOfLatestPath + globalLogQueueSizeInitial,
564          globalSource.getSizeOfLogQueue());
565      ReplicationSourceInterface source = manager.getSource(peerId);
566      // Sanity check
567      assertNotNull(source);
568      final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
569      // Enqueue log and check if metrics updated
570      source.enqueueLog(new Path("abc"));
571      assertEquals(1 + sizeOfSingleLogQueue,
572          source.getSourceMetrics().getSizeOfLogQueue());
573      assertEquals(source.getSourceMetrics().getSizeOfLogQueue()
574              + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
575
576      // Removing the peer should reset the global metrics
577      removePeerAndWait(peerId);
578      assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
579
580      // Adding the same peer back again should reset the single source metrics
581      addPeerAndWait(peerId, peerConfig, true);
582      source = manager.getSource(peerId);
583      assertNotNull(source);
584      assertEquals(sizeOfLatestPath, source.getSourceMetrics().getSizeOfLogQueue());
585      assertEquals(source.getSourceMetrics().getSizeOfLogQueue()
586          + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
587    } finally {
588      removePeerAndWait(peerId);
589    }
590  }
591
592  /**
593   * Add a peer and wait for it to initialize
594   * @param peerId
595   * @param peerConfig
596   * @param waitForSource Whether to wait for replication source to initialize
597   * @throws Exception
598   */
599  private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig,
600      final boolean waitForSource) throws Exception {
601    final ReplicationPeers rp = manager.getReplicationPeers();
602    rp.registerPeer(peerId, peerConfig);
603    waitPeer(peerId, manager, waitForSource);
604    if (managerOfCluster != null) {
605      waitPeer(peerId, managerOfCluster, waitForSource);
606    }
607  }
608
609  private static void waitPeer(final String peerId,
610      ReplicationSourceManager manager, final boolean waitForSource) {
611    ReplicationPeers rp = manager.getReplicationPeers();
612    Waiter.waitFor(conf, 20000, () -> {
613      if (waitForSource) {
614        ReplicationSourceInterface rs = manager.getSource(peerId);
615        if (rs == null) {
616          return false;
617        }
618        if (rs instanceof ReplicationSourceDummy) {
619          return ((ReplicationSourceDummy)rs).isStartup();
620        }
621        return true;
622      } else {
623        return (rp.getConnectedPeer(peerId) != null);
624      }
625    });
626  }
627
628  /**
629   * Remove a peer and wait for it to get cleaned up
630   * @param peerId
631   * @throws Exception
632   */
633  private void removePeerAndWait(final String peerId) throws Exception {
634    final ReplicationPeers rp = manager.getReplicationPeers();
635    if (rp.getAllPeerIds().contains(peerId)) {
636      rp.unregisterPeer(peerId);
637    }
638    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
639      @Override public boolean evaluate() throws Exception {
640        List<String> peers = rp.getAllPeerIds();
641        return (!manager.getAllQueues().contains(peerId)) && (rp.getConnectedPeer(peerId) == null)
642            && (!peers.contains(peerId))
643            && manager.getSource(peerId) == null;
644      }
645    });
646  }
647
648  private WALEdit getBulkLoadWALEdit(NavigableMap<byte[], Integer> scope) {
649    // 1. Create store files for the families
650    Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
651    Map<String, Long> storeFilesSize = new HashMap<>(1);
652    List<Path> p = new ArrayList<>(1);
653    Path hfilePath1 = new Path(Bytes.toString(f1));
654    p.add(hfilePath1);
655    try {
656      storeFilesSize.put(hfilePath1.getName(), fs.getFileStatus(hfilePath1).getLen());
657    } catch (IOException e) {
658      LOG.debug("Failed to calculate the size of hfile " + hfilePath1);
659      storeFilesSize.put(hfilePath1.getName(), 0L);
660    }
661    storeFiles.put(f1, p);
662    scope.put(f1, 1);
663    p = new ArrayList<>(1);
664    Path hfilePath2 = new Path(Bytes.toString(f2));
665    p.add(hfilePath2);
666    try {
667      storeFilesSize.put(hfilePath2.getName(), fs.getFileStatus(hfilePath2).getLen());
668    } catch (IOException e) {
669      LOG.debug("Failed to calculate the size of hfile " + hfilePath2);
670      storeFilesSize.put(hfilePath2.getName(), 0L);
671    }
672    storeFiles.put(f2, p);
673    // 2. Create bulk load descriptor
674    BulkLoadDescriptor desc =
675        ProtobufUtil.toBulkLoadDescriptor(hri.getTable(),
676      UnsafeByteOperations.unsafeWrap(hri.getEncodedNameAsBytes()), storeFiles, storeFilesSize, 1);
677
678    // 3. create bulk load wal edit event
679    WALEdit logEdit = WALEdit.createBulkLoadEvent(hri, desc);
680    return logEdit;
681  }
682
683  static class DummyNodeFailoverWorker extends Thread {
684    private Map<String, Set<String>> logZnodesMap;
685    Server server;
686    private String deadRsZnode;
687    ReplicationQueues rq;
688
689    public DummyNodeFailoverWorker(String znode, Server s) throws Exception {
690      this.deadRsZnode = znode;
691      this.server = s;
692      this.rq =
693          ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server,
694            server.getZooKeeper()));
695      this.rq.init(this.server.getServerName().toString());
696    }
697
698    @Override
699    public void run() {
700      try {
701        logZnodesMap = new HashMap<>();
702        List<String> queues = rq.getUnClaimedQueueIds(deadRsZnode);
703        for(String queue:queues){
704          Pair<String, SortedSet<String>> pair = rq.claimQueue(deadRsZnode, queue);
705          if (pair != null) {
706            logZnodesMap.put(pair.getFirst(), pair.getSecond());
707          }
708        }
709        server.abort("Done with testing", null);
710      } catch (Exception e) {
711        LOG.error("Got exception while running NodeFailoverWorker", e);
712      } finally {
713        latch.countDown();
714      }
715    }
716
717    /**
718     * @return 1 when the map is not empty.
719     */
720    private int isLogZnodesMapPopulated() {
721      Collection<Set<String>> sets = logZnodesMap.values();
722      if (sets.size() > 1) {
723        throw new RuntimeException("unexpected size of logZnodesMap: " + sets.size());
724      }
725      if (sets.size() == 1) {
726        Set<String> s = sets.iterator().next();
727        for (String file : files) {
728          // at least one file was missing
729          if (!s.contains(file)) {
730            return 0;
731          }
732        }
733        return 1; // we found all the files
734      }
735      return 0;
736    }
737  }
738
739  static class FailInitializeDummyReplicationSource extends ReplicationSourceDummy {
740
741    @Override
742    public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
743        ReplicationQueues rq, ReplicationPeers rp, Server server, String peerClusterId,
744        UUID clusterId, ReplicationEndpoint replicationEndpoint,
745        WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
746      throw new IOException("Failing deliberately");
747    }
748  }
749
750  static class DummyServer implements Server {
751    String hostname;
752
753    DummyServer() {
754      hostname = "hostname.example.org";
755    }
756
757    DummyServer(String hostname) {
758      this.hostname = hostname;
759    }
760
761    @Override
762    public Configuration getConfiguration() {
763      return conf;
764    }
765
766    @Override
767    public ZKWatcher getZooKeeper() {
768      return zkw;
769    }
770
771    @Override
772    public CoordinatedStateManager getCoordinatedStateManager() {
773      return null;
774    }
775    @Override
776    public ClusterConnection getConnection() {
777      return null;
778    }
779
780    @Override
781    public MetaTableLocator getMetaTableLocator() {
782      return null;
783    }
784
785    @Override
786    public ServerName getServerName() {
787      return ServerName.valueOf(hostname, 1234, 1L);
788    }
789
790    @Override
791    public void abort(String why, Throwable e) {
792      // To change body of implemented methods use File | Settings | File Templates.
793    }
794
795    @Override
796    public boolean isAborted() {
797      return false;
798    }
799
800    @Override
801    public void stop(String why) {
802      // To change body of implemented methods use File | Settings | File Templates.
803    }
804
805    @Override
806    public boolean isStopped() {
807      return false; // To change body of implemented methods use File | Settings | File Templates.
808    }
809
810    @Override
811    public ChoreService getChoreService() {
812      return null;
813    }
814
815    @Override
816    public ClusterConnection getClusterConnection() {
817      // TODO Auto-generated method stub
818      return null;
819    }
820
821    @Override
822    public FileSystem getFileSystem() {
823      return null;
824    }
825
826    @Override
827    public boolean isStopping() {
828      return false;
829    }
830
831    @Override
832    public Connection createConnection(Configuration conf) throws IOException {
833      return null;
834    }
835  }
836}