001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025
026import java.io.IOException;
027import java.lang.reflect.Field;
028import java.net.URLEncoder;
029import java.util.ArrayList;
030import java.util.Collection;
031import java.util.HashMap;
032import java.util.List;
033import java.util.Map;
034import java.util.NavigableMap;
035import java.util.NavigableSet;
036import java.util.Set;
037import java.util.SortedSet;
038import java.util.TreeMap;
039import java.util.TreeSet;
040import java.util.UUID;
041import java.util.concurrent.CountDownLatch;
042import java.util.stream.Collectors;
043import org.apache.hadoop.conf.Configuration;
044import org.apache.hadoop.fs.FileSystem;
045import org.apache.hadoop.fs.Path;
046import org.apache.hadoop.hbase.ChoreService;
047import org.apache.hadoop.hbase.ClusterId;
048import org.apache.hadoop.hbase.CoordinatedStateManager;
049import org.apache.hadoop.hbase.HBaseClassTestRule;
050import org.apache.hadoop.hbase.HBaseConfiguration;
051import org.apache.hadoop.hbase.HBaseTestingUtility;
052import org.apache.hadoop.hbase.HColumnDescriptor;
053import org.apache.hadoop.hbase.HConstants;
054import org.apache.hadoop.hbase.HRegionInfo;
055import org.apache.hadoop.hbase.HTableDescriptor;
056import org.apache.hadoop.hbase.KeyValue;
057import org.apache.hadoop.hbase.Server;
058import org.apache.hadoop.hbase.ServerName;
059import org.apache.hadoop.hbase.TableName;
060import org.apache.hadoop.hbase.Waiter;
061import org.apache.hadoop.hbase.client.ClusterConnection;
062import org.apache.hadoop.hbase.client.Connection;
063import org.apache.hadoop.hbase.client.RegionInfo;
064import org.apache.hadoop.hbase.client.RegionInfoBuilder;
065import org.apache.hadoop.hbase.regionserver.HRegionServer;
066import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
067import org.apache.hadoop.hbase.replication.ReplicationFactory;
068import org.apache.hadoop.hbase.replication.ReplicationPeer;
069import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
070import org.apache.hadoop.hbase.replication.ReplicationPeers;
071import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
072import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
073import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
074import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage;
075import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
076import org.apache.hadoop.hbase.testclassification.MediumTests;
077import org.apache.hadoop.hbase.testclassification.ReplicationTests;
078import org.apache.hadoop.hbase.util.Bytes;
079import org.apache.hadoop.hbase.util.CommonFSUtils;
080import org.apache.hadoop.hbase.util.JVMClusterUtil;
081import org.apache.hadoop.hbase.util.Pair;
082import org.apache.hadoop.hbase.wal.WAL;
083import org.apache.hadoop.hbase.wal.WALEdit;
084import org.apache.hadoop.hbase.wal.WALFactory;
085import org.apache.hadoop.hbase.wal.WALKeyImpl;
086import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
087import org.apache.hadoop.hbase.zookeeper.ZKUtil;
088import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
089import org.junit.After;
090import org.junit.AfterClass;
091import org.junit.Before;
092import org.junit.ClassRule;
093import org.junit.Rule;
094import org.junit.Test;
095import org.junit.experimental.categories.Category;
096import org.junit.rules.TestName;
097import org.slf4j.Logger;
098import org.slf4j.LoggerFactory;
099
100import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
101import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
102
103import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
104import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
105import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
106
107/**
108 * An abstract class that tests ReplicationSourceManager. Classes that extend this class should
109 * set up the proper config for this class and initialize the proper cluster using
110 * HBaseTestingUtility.
111 */
112@Category({ReplicationTests.class, MediumTests.class})
113public abstract class TestReplicationSourceManager {
114
115  @ClassRule
116  public static final HBaseClassTestRule CLASS_RULE =
117      HBaseClassTestRule.forClass(TestReplicationSourceManager.class);
118
119  protected static final Logger LOG =
120      LoggerFactory.getLogger(TestReplicationSourceManager.class);
121
122  protected static Configuration conf;
123
124  protected static HBaseTestingUtility utility;
125
126  protected static Replication replication;
127
128  protected static ReplicationSourceManager manager;
129
130  protected static ReplicationSourceManager managerOfCluster;
131
132  protected static ZKWatcher zkw;
133
134  protected static HTableDescriptor htd;
135
136  protected static HRegionInfo hri;
137
138  protected static final byte[] r1 = Bytes.toBytes("r1");
139
140  protected static final byte[] r2 = Bytes.toBytes("r2");
141
142  protected static final byte[] f1 = Bytes.toBytes("f1");
143
144  protected static final byte[] f2 = Bytes.toBytes("f2");
145
146  protected static final TableName test =
147      TableName.valueOf("test");
148
149  protected static final String slaveId = "1";
150
151  protected static FileSystem fs;
152
153  protected static Path oldLogDir;
154
155  protected static Path logDir;
156
157  protected static CountDownLatch latch;
158
159  protected static List<String> files = new ArrayList<>();
160  protected static NavigableMap<byte[], Integer> scopes;
161
162  protected static void setupZkAndReplication() throws Exception {
163    // The implementing class should set up the conf
164    assertNotNull(conf);
165    zkw = new ZKWatcher(conf, "test", null);
166    ZKUtil.createWithParents(zkw, "/hbase/replication");
167    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
168    ZKUtil.setData(zkw, "/hbase/replication/peers/1",
169        Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
170            + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
171    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
172    ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
173      ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);
174    ZKUtil.createWithParents(zkw, "/hbase/replication/state");
175    ZKUtil.setData(zkw, "/hbase/replication/state", ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);
176
177    ZKClusterId.setClusterId(zkw, new ClusterId());
178    CommonFSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());
179    fs = FileSystem.get(conf);
180    oldLogDir = new Path(utility.getDataTestDir(),
181        HConstants.HREGION_OLDLOGDIR_NAME);
182    logDir = new Path(utility.getDataTestDir(),
183        HConstants.HREGION_LOGDIR_NAME);
184    replication = new Replication();
185    replication.initialize(new DummyServer(), fs, logDir, oldLogDir,
186      new WALFactory(conf, "test", null));
187    managerOfCluster = getManagerFromCluster();
188    if (managerOfCluster != null) {
189      // After replication procedure, we need to add peer by hand (other than by receiving
190      // notification from zk)
191      managerOfCluster.addPeer(slaveId);
192    }
193
194    manager = replication.getReplicationManager();
195    manager.addSource(slaveId);
196    if (managerOfCluster != null) {
197      waitPeer(slaveId, managerOfCluster, true);
198    }
199    waitPeer(slaveId, manager, true);
200
201    htd = new HTableDescriptor(test);
202    HColumnDescriptor col = new HColumnDescriptor(f1);
203    col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
204    htd.addFamily(col);
205    col = new HColumnDescriptor(f2);
206    col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
207    htd.addFamily(col);
208
209    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
210    for(byte[] fam : htd.getFamiliesKeys()) {
211      scopes.put(fam, 0);
212    }
213    hri = new HRegionInfo(htd.getTableName(), r1, r2);
214  }
215
216  private static ReplicationSourceManager getManagerFromCluster() {
217    // TestReplicationSourceManagerZkImpl won't start the mini hbase cluster.
218    if (utility.getMiniHBaseCluster() == null) {
219      return null;
220    }
221    return utility.getMiniHBaseCluster().getRegionServerThreads()
222        .stream().map(JVMClusterUtil.RegionServerThread::getRegionServer)
223        .findAny()
224        .map(HRegionServer::getReplicationSourceService)
225        .map(r -> (Replication)r)
226        .map(Replication::getReplicationManager)
227        .get();
228  }
229
230  @AfterClass
231  public static void tearDownAfterClass() throws Exception {
232    if (manager != null) {
233      manager.join();
234    }
235    utility.shutdownMiniCluster();
236  }
237
  /** Exposes the name of the running test method; setUp() and tearDown() log it. */
  @Rule
  public TestName testName = new TestName();
240
241  private void cleanLogDir() throws IOException {
242    fs.delete(logDir, true);
243    fs.delete(oldLogDir, true);
244  }
245
246  @Before
247  public void setUp() throws Exception {
248    LOG.info("Start " + testName.getMethodName());
249    cleanLogDir();
250  }
251
252  @After
253  public void tearDown() throws Exception {
254    LOG.info("End " + testName.getMethodName());
255    cleanLogDir();
256    List<String> ids = manager.getSources().stream()
257        .map(ReplicationSourceInterface::getPeerId).collect(Collectors.toList());
258    for (String id : ids) {
259      if (slaveId.equals(id)) {
260        continue;
261      }
262      removePeerAndWait(id);
263    }
264  }
265
266  @Test
267  public void testLogRoll() throws Exception {
268    long baseline = 1000;
269    long time = baseline;
270    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
271    KeyValue kv = new KeyValue(r1, f1, r1);
272    WALEdit edit = new WALEdit();
273    edit.add(kv);
274
275    WALFactory wals =
276      new WALFactory(utility.getConfiguration(), URLEncoder.encode("regionserver:60020", "UTF8"));
277    ReplicationSourceManager replicationManager = replication.getReplicationManager();
278    wals.getWALProvider()
279      .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));
280    final WAL wal = wals.getWAL(hri);
281    manager.init();
282    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("tableame"));
283    htd.addFamily(new HColumnDescriptor(f1));
284    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
285    for(byte[] fam : htd.getFamiliesKeys()) {
286      scopes.put(fam, 0);
287    }
288    // Testing normal log rolling every 20
289    for(long i = 1; i < 101; i++) {
290      if(i > 1 && i % 20 == 0) {
291        wal.rollWriter();
292      }
293      LOG.info(Long.toString(i));
294      final long txid = wal.appendData(hri,
295        new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
296        edit);
297      wal.sync(txid);
298    }
299
300    // Simulate a rapid insert that's followed
301    // by a report that's still not totally complete (missing last one)
302    LOG.info(baseline + " and " + time);
303    baseline += 101;
304    time = baseline;
305    LOG.info(baseline + " and " + time);
306
307    for (int i = 0; i < 3; i++) {
308      wal.appendData(hri,
309        new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
310        edit);
311    }
312    wal.sync();
313
314    int logNumber = 0;
315    for (Map.Entry<String, NavigableSet<String>> entry : manager.getWALs().get(slaveId)
316      .entrySet()) {
317      logNumber += entry.getValue().size();
318    }
319    assertEquals(6, logNumber);
320
321    wal.rollWriter();
322
323    manager.logPositionAndCleanOldLogs(manager.getSources().get(0),
324      new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath()));
325
326    wal.appendData(hri,
327      new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
328      edit);
329    wal.sync();
330
331    assertEquals(1, manager.getWALs().size());
332
333
334    // TODO Need a case with only 2 WALs and we only want to delete the first one
335  }
336
337  @Test
338  public void testClaimQueues() throws Exception {
339    Server server = new DummyServer("hostname0.example.org");
340    ReplicationQueueStorage rq = ReplicationStorageFactory
341        .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
342    // populate some znodes in the peer znode
343    files.add("log1");
344    files.add("log2");
345    for (String file : files) {
346      rq.addWAL(server.getServerName(), "1", file);
347    }
348    // create 3 DummyServers
349    Server s1 = new DummyServer("dummyserver1.example.org");
350    Server s2 = new DummyServer("dummyserver2.example.org");
351    Server s3 = new DummyServer("dummyserver3.example.org");
352
353    // create 3 DummyNodeFailoverWorkers
354    DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker(server.getServerName(), s1);
355    DummyNodeFailoverWorker w2 = new DummyNodeFailoverWorker(server.getServerName(), s2);
356    DummyNodeFailoverWorker w3 = new DummyNodeFailoverWorker(server.getServerName(), s3);
357
358    latch = new CountDownLatch(3);
359    // start the threads
360    w1.start();
361    w2.start();
362    w3.start();
363    // make sure only one is successful
364    int populatedMap = 0;
365    // wait for result now... till all the workers are done.
366    latch.await();
367    populatedMap += w1.isLogZnodesMapPopulated() + w2.isLogZnodesMapPopulated()
368        + w3.isLogZnodesMapPopulated();
369    assertEquals(1, populatedMap);
370    server.abort("", null);
371  }
372
373  @Test
374  public void testCleanupFailoverQueues() throws Exception {
375    Server server = new DummyServer("hostname1.example.org");
376    ReplicationQueueStorage rq = ReplicationStorageFactory
377        .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
378    // populate some znodes in the peer znode
379    SortedSet<String> files = new TreeSet<>();
380    String group = "testgroup";
381    String file1 = group + ".log1";
382    String file2 = group + ".log2";
383    files.add(file1);
384    files.add(file2);
385    for (String file : files) {
386      rq.addWAL(server.getServerName(), "1", file);
387    }
388    Server s1 = new DummyServer("dummyserver1.example.org");
389    ReplicationPeers rp1 =
390        ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration());
391    rp1.init();
392    NodeFailoverWorker w1 =
393        manager.new NodeFailoverWorker(server.getServerName());
394    w1.run();
395    assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
396    String id = "1-" + server.getServerName().getServerName();
397    assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group));
398    manager.cleanOldLogs(file2, false, id, true);
399    // log1 should be deleted
400    assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group));
401  }
402
403  @Test
404  public void testCleanupUnknownPeerZNode() throws Exception {
405    Server server = new DummyServer("hostname2.example.org");
406    ReplicationQueueStorage rq = ReplicationStorageFactory
407        .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
408    // populate some znodes in the peer znode
409    // add log to an unknown peer
410    String group = "testgroup";
411    rq.addWAL(server.getServerName(), "2", group + ".log1");
412    rq.addWAL(server.getServerName(), "2", group + ".log2");
413
414    NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName());
415    w1.run();
416
417    // The log of the unknown peer should be removed from zk
418    for (String peer : manager.getAllQueues()) {
419      assertTrue(peer.startsWith("1"));
420    }
421  }
422
423  /**
424   * Test for HBASE-9038, Replication.scopeWALEdits would NPE if it wasn't filtering out the
425   * compaction WALEdit.
426   */
427  @Test
428  public void testCompactionWALEdits() throws Exception {
429    TableName tableName = TableName.valueOf("testCompactionWALEdits");
430    WALProtos.CompactionDescriptor compactionDescriptor =
431      WALProtos.CompactionDescriptor.getDefaultInstance();
432    RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).setStartKey(HConstants.EMPTY_START_ROW)
433        .setEndKey(HConstants.EMPTY_END_ROW).build();
434    WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor);
435    ReplicationSourceWALActionListener.scopeWALEdits(new WALKeyImpl(), edit, conf);
436  }
437
438  @Test
439  public void testBulkLoadWALEditsWithoutBulkLoadReplicationEnabled() throws Exception {
440    NavigableMap<byte[], Integer> scope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
441    // 1. Get the bulk load wal edit event
442    WALEdit logEdit = getBulkLoadWALEdit(scope);
443    // 2. Create wal key
444    WALKeyImpl logKey = new WALKeyImpl(scope);
445
446    // 3. Get the scopes for the key
447    ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, conf);
448
449    // 4. Assert that no bulk load entry scopes are added if bulk load hfile replication is disabled
450    assertNull("No bulk load entries scope should be added if bulk load replication is disabled.",
451      logKey.getReplicationScopes());
452  }
453
454  @Test
455  public void testBulkLoadWALEdits() throws Exception {
456    // 1. Get the bulk load wal edit event
457    NavigableMap<byte[], Integer> scope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
458    WALEdit logEdit = getBulkLoadWALEdit(scope);
459    // 2. Create wal key
460    WALKeyImpl logKey = new WALKeyImpl(scope);
461    // 3. Enable bulk load hfile replication
462    Configuration bulkLoadConf = HBaseConfiguration.create(conf);
463    bulkLoadConf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
464
465    // 4. Get the scopes for the key
466    ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, bulkLoadConf);
467
468    NavigableMap<byte[], Integer> scopes = logKey.getReplicationScopes();
469    // Assert family with replication scope global is present in the key scopes
470    assertTrue("This family scope is set to global, should be part of replication key scopes.",
471      scopes.containsKey(f1));
472    // Assert family with replication scope local is not present in the key scopes
473    assertFalse("This family scope is set to local, should not be part of replication key scopes",
474      scopes.containsKey(f2));
475  }
476
477  /**
478   * Test whether calling removePeer() on a ReplicationSourceManager that failed on initializing the
479   * corresponding ReplicationSourceInterface correctly cleans up the corresponding
480   * replication queue and ReplicationPeer.
481   * See HBASE-16096.
482   * @throws Exception
483   */
484  @Test
485  public void testPeerRemovalCleanup() throws Exception{
486    String replicationSourceImplName = conf.get("replication.replicationsource.implementation");
487    final String peerId = "FakePeer";
488    final ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
489        .setClusterKey(utility.getZkCluster().getAddress().toString() + ":/hbase");
490    try {
491      DummyServer server = new DummyServer();
492      ReplicationQueueStorage rq = ReplicationStorageFactory
493          .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
494      // Purposely fail ReplicationSourceManager.addSource() by causing ReplicationSourceInterface
495      // initialization to throw an exception.
496      conf.set("replication.replicationsource.implementation",
497          FailInitializeDummyReplicationSource.class.getName());
498      final ReplicationPeers rp = manager.getReplicationPeers();
499      // Set up the znode and ReplicationPeer for the fake peer
500      // Don't wait for replication source to initialize, we know it won't.
501      addPeerAndWait(peerId, peerConfig, false);
502
503      // Sanity check
504      assertNull(manager.getSource(peerId));
505
506      // Create a replication queue for the fake peer
507      rq.addWAL(server.getServerName(), peerId, "FakeFile");
508      // Unregister peer, this should remove the peer and clear all queues associated with it
509      // Need to wait for the ReplicationTracker to pick up the changes and notify listeners.
510      removePeerAndWait(peerId);
511      assertFalse(rq.getAllQueues(server.getServerName()).contains(peerId));
512    } finally {
513      conf.set("replication.replicationsource.implementation", replicationSourceImplName);
514      removePeerAndWait(peerId);
515    }
516  }
517
518  private static MetricsReplicationSourceSource getGlobalSource() throws Exception {
519    ReplicationSourceInterface source = manager.getSource(slaveId);
520    // Retrieve the global replication metrics source
521    Field f = MetricsSource.class.getDeclaredField("globalSourceSource");
522    f.setAccessible(true);
523    return (MetricsReplicationSourceSource)f.get(source.getSourceMetrics());
524  }
525
526  private static long getSizeOfLatestPath() {
527    // If no mini cluster is running, there are extra replication manager influencing the metrics.
528    if (utility.getMiniHBaseCluster() == null) {
529      return 0;
530    }
531    return utility.getMiniHBaseCluster().getRegionServerThreads()
532        .stream().map(JVMClusterUtil.RegionServerThread::getRegionServer)
533        .map(HRegionServer::getReplicationSourceService)
534        .map(r -> (Replication)r)
535        .map(Replication::getReplicationManager)
536        .mapToLong(ReplicationSourceManager::getSizeOfLatestPath)
537        .sum();
538  }
539
540  @Test
541  public void testRemovePeerMetricsCleanup() throws Exception {
542    final String peerId = "DummyPeer";
543    final ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
544        .setClusterKey(utility.getZkCluster().getAddress().toString() + ":/hbase");
545    try {
546      MetricsReplicationSourceSource globalSource = getGlobalSource();
547      final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
548      final long sizeOfLatestPath = getSizeOfLatestPath();
549      addPeerAndWait(peerId, peerConfig, true);
550      assertEquals(sizeOfLatestPath + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
551      ReplicationSourceInterface source = manager.getSource(peerId);
552      // Sanity check
553      assertNotNull(source);
554      final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
555      // Enqueue log and check if metrics updated
556      source.enqueueLog(new Path("abc"));
557      assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
558      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
559        globalSource.getSizeOfLogQueue());
560
561      // Removing the peer should reset the global metrics
562      removePeerAndWait(peerId);
563      assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
564
565      // Adding the same peer back again should reset the single source metrics
566      addPeerAndWait(peerId, peerConfig, true);
567      source = manager.getSource(peerId);
568      assertNotNull(source);
569      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
570        globalSource.getSizeOfLogQueue());
571    } finally {
572      removePeerAndWait(peerId);
573    }
574  }
575
576  /**
577   * Add a peer and wait for it to initialize
578   * @param peerId
579   * @param peerConfig
580   * @param waitForSource Whether to wait for replication source to initialize
581   * @throws Exception
582   */
583  private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig,
584      final boolean waitForSource) throws Exception {
585    final ReplicationPeers rp = manager.getReplicationPeers();
586    rp.getPeerStorage().addPeer(peerId, peerConfig, true);
587    try {
588      manager.addPeer(peerId);
589    } catch (Exception e) {
590      // ignore the failed exception, because we'll test both success & failed case.
591    }
592    waitPeer(peerId, manager, waitForSource);
593    if (managerOfCluster != null) {
594      managerOfCluster.addPeer(peerId);
595      waitPeer(peerId, managerOfCluster, waitForSource);
596    }
597  }
598
599  private static void waitPeer(final String peerId,
600      ReplicationSourceManager manager, final boolean waitForSource) {
601    ReplicationPeers rp = manager.getReplicationPeers();
602    Waiter.waitFor(conf, 20000, () -> {
603      if (waitForSource) {
604        ReplicationSourceInterface rs = manager.getSource(peerId);
605        if (rs == null) {
606          return false;
607        }
608        if (rs instanceof ReplicationSourceDummy) {
609          return ((ReplicationSourceDummy)rs).isStartup();
610        }
611        return true;
612      } else {
613        return (rp.getPeer(peerId) != null);
614      }
615    });
616  }
617
618  /**
619   * Remove a peer and wait for it to get cleaned up
620   * @param peerId
621   * @throws Exception
622   */
623  private void removePeerAndWait(final String peerId) throws Exception {
624    final ReplicationPeers rp = manager.getReplicationPeers();
625    if (rp.getPeerStorage().listPeerIds().contains(peerId)) {
626      rp.getPeerStorage().removePeer(peerId);
627      try {
628        manager.removePeer(peerId);
629      } catch (Exception e) {
630        // ignore the failed exception and continue.
631      }
632    }
633    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
634      @Override
635      public boolean evaluate() throws Exception {
636        Collection<String> peers = rp.getPeerStorage().listPeerIds();
637        return (!manager.getAllQueues().contains(peerId)) && (rp.getPeer(peerId) == null)
638            && (!peers.contains(peerId)) && manager.getSource(peerId) == null;
639      }
640    });
641  }
642
643  private WALEdit getBulkLoadWALEdit(NavigableMap<byte[], Integer> scope) {
644    // 1. Create store files for the families
645    Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
646    Map<String, Long> storeFilesSize = new HashMap<>(1);
647    List<Path> p = new ArrayList<>(1);
648    Path hfilePath1 = new Path(Bytes.toString(f1));
649    p.add(hfilePath1);
650    try {
651      storeFilesSize.put(hfilePath1.getName(), fs.getFileStatus(hfilePath1).getLen());
652    } catch (IOException e) {
653      LOG.debug("Failed to calculate the size of hfile " + hfilePath1);
654      storeFilesSize.put(hfilePath1.getName(), 0L);
655    }
656    storeFiles.put(f1, p);
657    scope.put(f1, 1);
658    p = new ArrayList<>(1);
659    Path hfilePath2 = new Path(Bytes.toString(f2));
660    p.add(hfilePath2);
661    try {
662      storeFilesSize.put(hfilePath2.getName(), fs.getFileStatus(hfilePath2).getLen());
663    } catch (IOException e) {
664      LOG.debug("Failed to calculate the size of hfile " + hfilePath2);
665      storeFilesSize.put(hfilePath2.getName(), 0L);
666    }
667    storeFiles.put(f2, p);
668    // 2. Create bulk load descriptor
669    BulkLoadDescriptor desc =
670        ProtobufUtil.toBulkLoadDescriptor(hri.getTable(),
671      UnsafeByteOperations.unsafeWrap(hri.getEncodedNameAsBytes()), storeFiles, storeFilesSize, 1);
672
673    // 3. create bulk load wal edit event
674    WALEdit logEdit = WALEdit.createBulkLoadEvent(hri, desc);
675    return logEdit;
676  }
677
678  static class DummyNodeFailoverWorker extends Thread {
679    private Map<String, Set<String>> logZnodesMap;
680    Server server;
681    private ServerName deadRS;
682    ReplicationQueueStorage rq;
683
684    public DummyNodeFailoverWorker(ServerName deadRS, Server s) throws Exception {
685      this.deadRS = deadRS;
686      this.server = s;
687      this.rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(),
688        server.getConfiguration());
689    }
690
691    @Override
692    public void run() {
693      try {
694        logZnodesMap = new HashMap<>();
695        List<String> queues = rq.getAllQueues(deadRS);
696        for (String queue : queues) {
697          Pair<String, SortedSet<String>> pair =
698              rq.claimQueue(deadRS, queue, server.getServerName());
699          if (pair != null) {
700            logZnodesMap.put(pair.getFirst(), pair.getSecond());
701          }
702        }
703        server.abort("Done with testing", null);
704      } catch (Exception e) {
705        LOG.error("Got exception while running NodeFailoverWorker", e);
706      } finally {
707        latch.countDown();
708      }
709    }
710
711    /**
712     * @return 1 when the map is not empty.
713     */
714    private int isLogZnodesMapPopulated() {
715      Collection<Set<String>> sets = logZnodesMap.values();
716      if (sets.size() > 1) {
717        throw new RuntimeException("unexpected size of logZnodesMap: " + sets.size());
718      }
719      if (sets.size() == 1) {
720        Set<String> s = sets.iterator().next();
721        for (String file : files) {
722          // at least one file was missing
723          if (!s.contains(file)) {
724            return 0;
725          }
726        }
727        return 1; // we found all the files
728      }
729      return 0;
730    }
731  }
732
733  static class FailInitializeDummyReplicationSource extends ReplicationSourceDummy {
734
735    @Override
736    public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
737        ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId,
738        UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
739        throws IOException {
740      throw new IOException("Failing deliberately");
741    }
742  }
743
744  static class DummyServer implements Server {
745    String hostname;
746
747    DummyServer() {
748      hostname = "hostname.example.org";
749    }
750
751    DummyServer(String hostname) {
752      this.hostname = hostname;
753    }
754
755    @Override
756    public Configuration getConfiguration() {
757      return conf;
758    }
759
760    @Override
761    public ZKWatcher getZooKeeper() {
762      return zkw;
763    }
764
765    @Override
766    public CoordinatedStateManager getCoordinatedStateManager() {
767      return null;
768    }
769    @Override
770    public ClusterConnection getConnection() {
771      return null;
772    }
773
774    @Override
775    public ServerName getServerName() {
776      return ServerName.valueOf(hostname, 1234, 1L);
777    }
778
779    @Override
780    public void abort(String why, Throwable e) {
781      // To change body of implemented methods use File | Settings | File Templates.
782    }
783
784    @Override
785    public boolean isAborted() {
786      return false;
787    }
788
789    @Override
790    public void stop(String why) {
791      // To change body of implemented methods use File | Settings | File Templates.
792    }
793
794    @Override
795    public boolean isStopped() {
796      return false; // To change body of implemented methods use File | Settings | File Templates.
797    }
798
799    @Override
800    public ChoreService getChoreService() {
801      return null;
802    }
803
804    @Override
805    public ClusterConnection getClusterConnection() {
806      // TODO Auto-generated method stub
807      return null;
808    }
809
810    @Override
811    public FileSystem getFileSystem() {
812      return null;
813    }
814
815    @Override
816    public boolean isStopping() {
817      return false;
818    }
819
820    @Override
821    public Connection createConnection(Configuration conf) throws IOException {
822      return null;
823    }
824  }
825}