001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025
026import java.io.IOException;
027import java.lang.reflect.Field;
028import java.net.URLEncoder;
029import java.util.ArrayList;
030import java.util.Collection;
031import java.util.HashMap;
032import java.util.List;
033import java.util.Map;
034import java.util.NavigableMap;
035import java.util.NavigableSet;
036import java.util.Set;
037import java.util.SortedSet;
038import java.util.TreeMap;
039import java.util.TreeSet;
040import java.util.UUID;
041import java.util.concurrent.CountDownLatch;
042import java.util.stream.Collectors;
043import org.apache.hadoop.conf.Configuration;
044import org.apache.hadoop.fs.FileSystem;
045import org.apache.hadoop.fs.Path;
046import org.apache.hadoop.hbase.ChoreService;
047import org.apache.hadoop.hbase.ClusterId;
048import org.apache.hadoop.hbase.CoordinatedStateManager;
049import org.apache.hadoop.hbase.HBaseClassTestRule;
050import org.apache.hadoop.hbase.HBaseConfiguration;
051import org.apache.hadoop.hbase.HBaseTestingUtility;
052import org.apache.hadoop.hbase.HColumnDescriptor;
053import org.apache.hadoop.hbase.HConstants;
054import org.apache.hadoop.hbase.HRegionInfo;
055import org.apache.hadoop.hbase.HTableDescriptor;
056import org.apache.hadoop.hbase.KeyValue;
057import org.apache.hadoop.hbase.Server;
058import org.apache.hadoop.hbase.ServerName;
059import org.apache.hadoop.hbase.TableName;
060import org.apache.hadoop.hbase.Waiter;
061import org.apache.hadoop.hbase.client.ClusterConnection;
062import org.apache.hadoop.hbase.client.Connection;
063import org.apache.hadoop.hbase.client.RegionInfo;
064import org.apache.hadoop.hbase.client.RegionInfoBuilder;
065import org.apache.hadoop.hbase.regionserver.HRegionServer;
066import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
067import org.apache.hadoop.hbase.replication.ReplicationFactory;
068import org.apache.hadoop.hbase.replication.ReplicationPeer;
069import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
070import org.apache.hadoop.hbase.replication.ReplicationPeers;
071import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
072import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
073import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
074import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage;
075import org.apache.hadoop.hbase.testclassification.MediumTests;
076import org.apache.hadoop.hbase.testclassification.ReplicationTests;
077import org.apache.hadoop.hbase.util.Bytes;
078import org.apache.hadoop.hbase.util.CommonFSUtils;
079import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
080import org.apache.hadoop.hbase.util.JVMClusterUtil;
081import org.apache.hadoop.hbase.util.Pair;
082import org.apache.hadoop.hbase.wal.WAL;
083import org.apache.hadoop.hbase.wal.WALEdit;
084import org.apache.hadoop.hbase.wal.WALFactory;
085import org.apache.hadoop.hbase.wal.WALKeyImpl;
086import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
087import org.apache.hadoop.hbase.zookeeper.ZKUtil;
088import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
089import org.junit.After;
090import org.junit.AfterClass;
091import org.junit.Before;
092import org.junit.ClassRule;
093import org.junit.Rule;
094import org.junit.Test;
095import org.junit.experimental.categories.Category;
096import org.junit.rules.TestName;
097import org.slf4j.Logger;
098import org.slf4j.LoggerFactory;
099
100import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
101import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
102
103import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
104import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
105import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
106
107/**
108 * An abstract class that tests ReplicationSourceManager. Classes that extend this class should set
109 * up the proper config for this class and initialize the proper cluster using HBaseTestingUtility.
110 */
111@Category({ ReplicationTests.class, MediumTests.class })
112public abstract class TestReplicationSourceManager {
113
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicationSourceManager.class);

  protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationSourceManager.class);

  // Shared configuration; setupZkAndReplication() asserts that the implementing class assigned it.
  protected static Configuration conf;

  // Testing utility supplying the data test dir, the ZK cluster and (optionally) a mini cluster.
  protected static HBaseTestingUtility utility;

  // Replication instance created and initialized by setupZkAndReplication().
  protected static Replication replication;

  // The manager under test, obtained from {@link #replication}.
  protected static ReplicationSourceManager manager;

  // Manager of a region server of the mini cluster; null when no mini cluster was started.
  protected static ReplicationSourceManager managerOfCluster;

  // ZooKeeper watcher shared by the tests and by every DummyServer.
  protected static ZKWatcher zkw;

  // Descriptor of table "test" with family f1 (global scope) and family f2 (local scope).
  protected static HTableDescriptor htd;

  // Region of the test table constructed with r1 and r2 as its keys.
  protected static HRegionInfo hri;

  protected static final byte[] r1 = Bytes.toBytes("r1");

  protected static final byte[] r2 = Bytes.toBytes("r2");

  protected static final byte[] f1 = Bytes.toBytes("f1");

  protected static final byte[] f2 = Bytes.toBytes("f2");

  protected static final TableName test = TableName.valueOf("test");

  // Id of the peer that setupZkAndReplication() registers and that tearDown() never removes.
  protected static final String slaveId = "1";

  protected static FileSystem fs;

  protected static Path oldLogDir;

  protected static Path logDir;

  // Counted down by each DummyNodeFailoverWorker when its run() finishes.
  protected static CountDownLatch latch;

  // WAL names added by testClaimQueues and checked by DummyNodeFailoverWorker.
  protected static List<String> files = new ArrayList<>();
  // Maps every family of {@link #htd} to 0; filled in by setupZkAndReplication().
  protected static NavigableMap<byte[], Integer> scopes;
158
159  protected static void setupZkAndReplication() throws Exception {
160    // The implementing class should set up the conf
161    assertNotNull(conf);
162    zkw = new ZKWatcher(conf, "test", null);
163    ZKUtil.createWithParents(zkw, "/hbase/replication");
164    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
165    ZKUtil.setData(zkw, "/hbase/replication/peers/1",
166      Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
167        + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
168    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
169    ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
170      ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);
171    ZKUtil.createWithParents(zkw, "/hbase/replication/state");
172    ZKUtil.setData(zkw, "/hbase/replication/state", ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);
173
174    ZKClusterId.setClusterId(zkw, new ClusterId());
175    CommonFSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());
176    fs = FileSystem.get(conf);
177    oldLogDir = new Path(utility.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME);
178    logDir = new Path(utility.getDataTestDir(), HConstants.HREGION_LOGDIR_NAME);
179    replication = new Replication();
180    replication.initialize(new DummyServer(), fs, logDir, oldLogDir,
181      new WALFactory(conf, "test", null));
182    managerOfCluster = getManagerFromCluster();
183    if (managerOfCluster != null) {
184      // After replication procedure, we need to add peer by hand (other than by receiving
185      // notification from zk)
186      managerOfCluster.addPeer(slaveId);
187    }
188
189    manager = replication.getReplicationManager();
190    manager.addSource(slaveId);
191    if (managerOfCluster != null) {
192      waitPeer(slaveId, managerOfCluster, true);
193    }
194    waitPeer(slaveId, manager, true);
195
196    htd = new HTableDescriptor(test);
197    HColumnDescriptor col = new HColumnDescriptor(f1);
198    col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
199    htd.addFamily(col);
200    col = new HColumnDescriptor(f2);
201    col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
202    htd.addFamily(col);
203
204    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
205    for (byte[] fam : htd.getFamiliesKeys()) {
206      scopes.put(fam, 0);
207    }
208    hri = new HRegionInfo(htd.getTableName(), r1, r2);
209  }
210
211  private static ReplicationSourceManager getManagerFromCluster() {
212    // TestReplicationSourceManagerZkImpl won't start the mini hbase cluster.
213    if (utility.getMiniHBaseCluster() == null) {
214      return null;
215    }
216    return utility.getMiniHBaseCluster().getRegionServerThreads().stream()
217      .map(JVMClusterUtil.RegionServerThread::getRegionServer).findAny()
218      .map(HRegionServer::getReplicationSourceService).map(r -> (Replication) r)
219      .map(Replication::getReplicationManager).get();
220  }
221
222  @AfterClass
223  public static void tearDownAfterClass() throws Exception {
224    if (manager != null) {
225      manager.join();
226    }
227    utility.shutdownMiniCluster();
228  }
229
  // Exposes the name of the running test method; setUp() and tearDown() log it.
  @Rule
  public TestName testName = new TestName();
232
233  private void cleanLogDir() throws IOException {
234    fs.delete(logDir, true);
235    fs.delete(oldLogDir, true);
236  }
237
238  @Before
239  public void setUp() throws Exception {
240    LOG.info("Start " + testName.getMethodName());
241    cleanLogDir();
242  }
243
244  @After
245  public void tearDown() throws Exception {
246    LOG.info("End " + testName.getMethodName());
247    cleanLogDir();
248    List<String> ids = manager.getSources().stream().map(ReplicationSourceInterface::getPeerId)
249      .collect(Collectors.toList());
250    for (String id : ids) {
251      if (slaveId.equals(id)) {
252        continue;
253      }
254      removePeerAndWait(id);
255    }
256  }
257
258  @Test
259  public void testLogRoll() throws Exception {
260    long baseline = 1000;
261    long time = baseline;
262    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
263    KeyValue kv = new KeyValue(r1, f1, r1);
264    WALEdit edit = new WALEdit();
265    edit.add(kv);
266
267    WALFactory wals =
268      new WALFactory(utility.getConfiguration(), URLEncoder.encode("regionserver:60020", "UTF8"));
269    ReplicationSourceManager replicationManager = replication.getReplicationManager();
270    wals.getWALProvider()
271      .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));
272    final WAL wal = wals.getWAL(hri);
273    manager.init();
274    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("tableame"));
275    htd.addFamily(new HColumnDescriptor(f1));
276    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
277    for (byte[] fam : htd.getFamiliesKeys()) {
278      scopes.put(fam, 0);
279    }
280    // Testing normal log rolling every 20
281    for (long i = 1; i < 101; i++) {
282      if (i > 1 && i % 20 == 0) {
283        wal.rollWriter();
284      }
285      LOG.info(Long.toString(i));
286      final long txid = wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), test,
287        EnvironmentEdgeManager.currentTime(), mvcc, scopes), edit);
288      wal.sync(txid);
289    }
290
291    // Simulate a rapid insert that's followed
292    // by a report that's still not totally complete (missing last one)
293    LOG.info(baseline + " and " + time);
294    baseline += 101;
295    time = baseline;
296    LOG.info(baseline + " and " + time);
297
298    for (int i = 0; i < 3; i++) {
299      wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), test,
300        EnvironmentEdgeManager.currentTime(), mvcc, scopes), edit);
301    }
302    wal.sync();
303
304    int logNumber = 0;
305    for (Map.Entry<String, NavigableSet<String>> entry : manager.getWALs().get(slaveId)
306      .entrySet()) {
307      logNumber += entry.getValue().size();
308    }
309    assertEquals(6, logNumber);
310
311    wal.rollWriter();
312
313    manager.logPositionAndCleanOldLogs(manager.getSources().get(0),
314      new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath()));
315
316    wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), test,
317      EnvironmentEdgeManager.currentTime(), mvcc, scopes), edit);
318    wal.sync();
319
320    assertEquals(1, manager.getWALs().size());
321
322    // TODO Need a case with only 2 WALs and we only want to delete the first one
323  }
324
325  @Test
326  public void testClaimQueues() throws Exception {
327    Server server = new DummyServer("hostname0.example.org");
328    ReplicationQueueStorage rq = ReplicationStorageFactory
329      .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
330    // populate some znodes in the peer znode
331    files.add("log1");
332    files.add("log2");
333    for (String file : files) {
334      rq.addWAL(server.getServerName(), "1", file);
335    }
336    // create 3 DummyServers
337    Server s1 = new DummyServer("dummyserver1.example.org");
338    Server s2 = new DummyServer("dummyserver2.example.org");
339    Server s3 = new DummyServer("dummyserver3.example.org");
340
341    // create 3 DummyNodeFailoverWorkers
342    DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker(server.getServerName(), s1);
343    DummyNodeFailoverWorker w2 = new DummyNodeFailoverWorker(server.getServerName(), s2);
344    DummyNodeFailoverWorker w3 = new DummyNodeFailoverWorker(server.getServerName(), s3);
345
346    latch = new CountDownLatch(3);
347    // start the threads
348    w1.start();
349    w2.start();
350    w3.start();
351    // make sure only one is successful
352    int populatedMap = 0;
353    // wait for result now... till all the workers are done.
354    latch.await();
355    populatedMap +=
356      w1.isLogZnodesMapPopulated() + w2.isLogZnodesMapPopulated() + w3.isLogZnodesMapPopulated();
357    assertEquals(1, populatedMap);
358    server.abort("", null);
359  }
360
361  @Test
362  public void testCleanupFailoverQueues() throws Exception {
363    Server server = new DummyServer("hostname1.example.org");
364    ReplicationQueueStorage rq = ReplicationStorageFactory
365      .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
366    // populate some znodes in the peer znode
367    SortedSet<String> files = new TreeSet<>();
368    String group = "testgroup";
369    String file1 = group + ".log1";
370    String file2 = group + ".log2";
371    files.add(file1);
372    files.add(file2);
373    for (String file : files) {
374      rq.addWAL(server.getServerName(), "1", file);
375    }
376    Server s1 = new DummyServer("dummyserver1.example.org");
377    ReplicationPeers rp1 =
378      ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration());
379    rp1.init();
380    manager.claimQueue(server.getServerName(), "1");
381    assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
382    String id = "1-" + server.getServerName().getServerName();
383    assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group));
384    manager.cleanOldLogs(file2, false, id, true);
385    // log1 should be deleted
386    assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group));
387  }
388
389  @Test
390  public void testCleanupUnknownPeerZNode() throws Exception {
391    Server server = new DummyServer("hostname2.example.org");
392    ReplicationQueueStorage rq = ReplicationStorageFactory
393      .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
394    // populate some znodes in the peer znode
395    // add log to an unknown peer
396    String group = "testgroup";
397    rq.addWAL(server.getServerName(), "2", group + ".log1");
398    rq.addWAL(server.getServerName(), "2", group + ".log2");
399
400    manager.claimQueue(server.getServerName(), "2");
401
402    // The log of the unknown peer should be removed from zk
403    for (String peer : manager.getAllQueues()) {
404      assertTrue(peer.startsWith("1"));
405    }
406  }
407
408  /**
409   * Test for HBASE-9038, Replication.scopeWALEdits would NPE if it wasn't filtering out the
410   * compaction WALEdit.
411   */
412  @Test
413  public void testCompactionWALEdits() throws Exception {
414    TableName tableName = TableName.valueOf("testCompactionWALEdits");
415    WALProtos.CompactionDescriptor compactionDescriptor =
416      WALProtos.CompactionDescriptor.getDefaultInstance();
417    RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).setStartKey(HConstants.EMPTY_START_ROW)
418      .setEndKey(HConstants.EMPTY_END_ROW).build();
419    WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor);
420    ReplicationSourceWALActionListener.scopeWALEdits(new WALKeyImpl(), edit, conf);
421  }
422
423  @Test
424  public void testBulkLoadWALEditsWithoutBulkLoadReplicationEnabled() throws Exception {
425    NavigableMap<byte[], Integer> scope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
426    // 1. Get the bulk load wal edit event
427    WALEdit logEdit = getBulkLoadWALEdit(scope);
428    // 2. Create wal key
429    WALKeyImpl logKey = new WALKeyImpl(scope);
430
431    // 3. Get the scopes for the key
432    ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, conf);
433
434    // 4. Assert that no bulk load entry scopes are added if bulk load hfile replication is disabled
435    assertNull("No bulk load entries scope should be added if bulk load replication is disabled.",
436      logKey.getReplicationScopes());
437  }
438
439  @Test
440  public void testBulkLoadWALEdits() throws Exception {
441    // 1. Get the bulk load wal edit event
442    NavigableMap<byte[], Integer> scope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
443    WALEdit logEdit = getBulkLoadWALEdit(scope);
444    // 2. Create wal key
445    WALKeyImpl logKey = new WALKeyImpl(scope);
446    // 3. Enable bulk load hfile replication
447    Configuration bulkLoadConf = HBaseConfiguration.create(conf);
448    bulkLoadConf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
449
450    // 4. Get the scopes for the key
451    ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, bulkLoadConf);
452
453    NavigableMap<byte[], Integer> scopes = logKey.getReplicationScopes();
454    // Assert family with replication scope global is present in the key scopes
455    assertTrue("This family scope is set to global, should be part of replication key scopes.",
456      scopes.containsKey(f1));
457    // Assert family with replication scope local is not present in the key scopes
458    assertFalse("This family scope is set to local, should not be part of replication key scopes",
459      scopes.containsKey(f2));
460  }
461
462  /**
463   * Test whether calling removePeer() on a ReplicationSourceManager that failed on initializing the
464   * corresponding ReplicationSourceInterface correctly cleans up the corresponding replication
465   * queue and ReplicationPeer. See HBASE-16096. n
466   */
467  @Test
468  public void testPeerRemovalCleanup() throws Exception {
469    String replicationSourceImplName = conf.get("replication.replicationsource.implementation");
470    final String peerId = "FakePeer";
471    final ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
472      .setClusterKey(utility.getZkCluster().getAddress().toString() + ":/hbase");
473    try {
474      DummyServer server = new DummyServer();
475      ReplicationQueueStorage rq = ReplicationStorageFactory
476        .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
477      // Purposely fail ReplicationSourceManager.addSource() by causing ReplicationSourceInterface
478      // initialization to throw an exception.
479      conf.set("replication.replicationsource.implementation",
480        FailInitializeDummyReplicationSource.class.getName());
481      final ReplicationPeers rp = manager.getReplicationPeers();
482      // Set up the znode and ReplicationPeer for the fake peer
483      // Don't wait for replication source to initialize, we know it won't.
484      addPeerAndWait(peerId, peerConfig, false);
485
486      // Sanity check
487      assertNull(manager.getSource(peerId));
488
489      // Create a replication queue for the fake peer
490      rq.addWAL(server.getServerName(), peerId, "FakeFile");
491      // Unregister peer, this should remove the peer and clear all queues associated with it
492      // Need to wait for the ReplicationTracker to pick up the changes and notify listeners.
493      removePeerAndWait(peerId);
494      assertFalse(rq.getAllQueues(server.getServerName()).contains(peerId));
495    } finally {
496      conf.set("replication.replicationsource.implementation", replicationSourceImplName);
497      removePeerAndWait(peerId);
498    }
499  }
500
501  private static MetricsReplicationSourceSource getGlobalSource() throws Exception {
502    ReplicationSourceInterface source = manager.getSource(slaveId);
503    // Retrieve the global replication metrics source
504    Field f = MetricsSource.class.getDeclaredField("globalSourceSource");
505    f.setAccessible(true);
506    return (MetricsReplicationSourceSource) f.get(source.getSourceMetrics());
507  }
508
509  private static long getSizeOfLatestPath() {
510    // If no mini cluster is running, there are extra replication manager influencing the metrics.
511    if (utility.getMiniHBaseCluster() == null) {
512      return 0;
513    }
514    return utility.getMiniHBaseCluster().getRegionServerThreads().stream()
515      .map(JVMClusterUtil.RegionServerThread::getRegionServer)
516      .map(HRegionServer::getReplicationSourceService).map(r -> (Replication) r)
517      .map(Replication::getReplicationManager)
518      .mapToLong(ReplicationSourceManager::getSizeOfLatestPath).sum();
519  }
520
521  @Test
522  public void testRemovePeerMetricsCleanup() throws Exception {
523    final String peerId = "DummyPeer";
524    final ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
525      .setClusterKey(utility.getZkCluster().getAddress().toString() + ":/hbase");
526    try {
527      MetricsReplicationSourceSource globalSource = getGlobalSource();
528      final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
529      final long sizeOfLatestPath = getSizeOfLatestPath();
530      addPeerAndWait(peerId, peerConfig, true);
531      assertEquals(sizeOfLatestPath + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
532      ReplicationSourceInterface source = manager.getSource(peerId);
533      // Sanity check
534      assertNotNull(source);
535      final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
536      // Enqueue log and check if metrics updated
537      source.enqueueLog(new Path("abc"));
538      assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
539      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
540        globalSource.getSizeOfLogQueue());
541
542      // Removing the peer should reset the global metrics
543      removePeerAndWait(peerId);
544      assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
545
546      // Adding the same peer back again should reset the single source metrics
547      addPeerAndWait(peerId, peerConfig, true);
548      source = manager.getSource(peerId);
549      assertNotNull(source);
550      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
551        globalSource.getSizeOfLogQueue());
552    } finally {
553      removePeerAndWait(peerId);
554    }
555  }
556
557  /**
558   * Add a peer and wait for it to initialize nn * @param waitForSource Whether to wait for
559   * replication source to initialize n
560   */
561  private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig,
562    final boolean waitForSource) throws Exception {
563    final ReplicationPeers rp = manager.getReplicationPeers();
564    rp.getPeerStorage().addPeer(peerId, peerConfig, true);
565    try {
566      manager.addPeer(peerId);
567    } catch (Exception e) {
568      // ignore the failed exception, because we'll test both success & failed case.
569    }
570    waitPeer(peerId, manager, waitForSource);
571    if (managerOfCluster != null) {
572      managerOfCluster.addPeer(peerId);
573      waitPeer(peerId, managerOfCluster, waitForSource);
574    }
575  }
576
577  private static void waitPeer(final String peerId, ReplicationSourceManager manager,
578    final boolean waitForSource) {
579    ReplicationPeers rp = manager.getReplicationPeers();
580    Waiter.waitFor(conf, 20000, () -> {
581      if (waitForSource) {
582        ReplicationSourceInterface rs = manager.getSource(peerId);
583        if (rs == null) {
584          return false;
585        }
586        if (rs instanceof ReplicationSourceDummy) {
587          return ((ReplicationSourceDummy) rs).isStartup();
588        }
589        return true;
590      } else {
591        return (rp.getPeer(peerId) != null);
592      }
593    });
594  }
595
596  /**
597   * Remove a peer and wait for it to get cleaned up nn
598   */
599  private void removePeerAndWait(final String peerId) throws Exception {
600    final ReplicationPeers rp = manager.getReplicationPeers();
601    if (rp.getPeerStorage().listPeerIds().contains(peerId)) {
602      rp.getPeerStorage().removePeer(peerId);
603      try {
604        manager.removePeer(peerId);
605      } catch (Exception e) {
606        // ignore the failed exception and continue.
607      }
608    }
609    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
610      @Override
611      public boolean evaluate() throws Exception {
612        Collection<String> peers = rp.getPeerStorage().listPeerIds();
613        return (!manager.getAllQueues().contains(peerId)) && (rp.getPeer(peerId) == null)
614          && (!peers.contains(peerId)) && manager.getSource(peerId) == null;
615      }
616    });
617  }
618
619  @Test
620  public void testSameWALPrefix() throws IOException {
621    Set<String> latestWalsBefore =
622      manager.getLastestPath().stream().map(Path::getName).collect(Collectors.toSet());
623    String walName1 = "localhost,8080,12345-45678-Peer.34567";
624    String walName2 = "localhost,8080,12345.56789";
625    manager.preLogRoll(new Path(walName1));
626    manager.preLogRoll(new Path(walName2));
627
628    Set<String> latestWals = manager.getLastestPath().stream().map(Path::getName)
629      .filter(n -> !latestWalsBefore.contains(n)).collect(Collectors.toSet());
630    assertEquals(2, latestWals.size());
631    assertTrue(latestWals.contains(walName1));
632    assertTrue(latestWals.contains(walName2));
633  }
634
635  private WALEdit getBulkLoadWALEdit(NavigableMap<byte[], Integer> scope) {
636    // 1. Create store files for the families
637    Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
638    Map<String, Long> storeFilesSize = new HashMap<>(1);
639    List<Path> p = new ArrayList<>(1);
640    Path hfilePath1 = new Path(Bytes.toString(f1));
641    p.add(hfilePath1);
642    try {
643      storeFilesSize.put(hfilePath1.getName(), fs.getFileStatus(hfilePath1).getLen());
644    } catch (IOException e) {
645      LOG.debug("Failed to calculate the size of hfile " + hfilePath1);
646      storeFilesSize.put(hfilePath1.getName(), 0L);
647    }
648    storeFiles.put(f1, p);
649    scope.put(f1, 1);
650    p = new ArrayList<>(1);
651    Path hfilePath2 = new Path(Bytes.toString(f2));
652    p.add(hfilePath2);
653    try {
654      storeFilesSize.put(hfilePath2.getName(), fs.getFileStatus(hfilePath2).getLen());
655    } catch (IOException e) {
656      LOG.debug("Failed to calculate the size of hfile " + hfilePath2);
657      storeFilesSize.put(hfilePath2.getName(), 0L);
658    }
659    storeFiles.put(f2, p);
660    // 2. Create bulk load descriptor
661    BulkLoadDescriptor desc = ProtobufUtil.toBulkLoadDescriptor(hri.getTable(),
662      UnsafeByteOperations.unsafeWrap(hri.getEncodedNameAsBytes()), storeFiles, storeFilesSize, 1);
663
664    // 3. create bulk load wal edit event
665    WALEdit logEdit = WALEdit.createBulkLoadEvent(hri, desc);
666    return logEdit;
667  }
668
669  static class DummyNodeFailoverWorker extends Thread {
670    private Map<String, Set<String>> logZnodesMap;
671    Server server;
672    private ServerName deadRS;
673    ReplicationQueueStorage rq;
674
675    public DummyNodeFailoverWorker(ServerName deadRS, Server s) throws Exception {
676      this.deadRS = deadRS;
677      this.server = s;
678      this.rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(),
679        server.getConfiguration());
680    }
681
682    @Override
683    public void run() {
684      try {
685        logZnodesMap = new HashMap<>();
686        List<String> queues = rq.getAllQueues(deadRS);
687        for (String queue : queues) {
688          Pair<String, SortedSet<String>> pair =
689            rq.claimQueue(deadRS, queue, server.getServerName());
690          if (pair != null) {
691            logZnodesMap.put(pair.getFirst(), pair.getSecond());
692          }
693        }
694        server.abort("Done with testing", null);
695      } catch (Exception e) {
696        LOG.error("Got exception while running NodeFailoverWorker", e);
697      } finally {
698        latch.countDown();
699      }
700    }
701
702    /** Returns 1 when the map is not empty. */
703    private int isLogZnodesMapPopulated() {
704      Collection<Set<String>> sets = logZnodesMap.values();
705      if (sets.size() > 1) {
706        throw new RuntimeException("unexpected size of logZnodesMap: " + sets.size());
707      }
708      if (sets.size() == 1) {
709        Set<String> s = sets.iterator().next();
710        for (String file : files) {
711          // at least one file was missing
712          if (!s.contains(file)) {
713            return 0;
714          }
715        }
716        return 1; // we found all the files
717      }
718      return 0;
719    }
720  }
721
722  static class FailInitializeDummyReplicationSource extends ReplicationSourceDummy {
723
724    @Override
725    public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
726      ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId,
727      UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
728      throws IOException {
729      throw new IOException("Failing deliberately");
730    }
731  }
732
733  static class DummyServer implements Server {
734    String hostname;
735
736    DummyServer() {
737      hostname = "hostname.example.org";
738    }
739
740    DummyServer(String hostname) {
741      this.hostname = hostname;
742    }
743
744    @Override
745    public Configuration getConfiguration() {
746      return conf;
747    }
748
749    @Override
750    public ZKWatcher getZooKeeper() {
751      return zkw;
752    }
753
754    @Override
755    public CoordinatedStateManager getCoordinatedStateManager() {
756      return null;
757    }
758
759    @Override
760    public ClusterConnection getConnection() {
761      return null;
762    }
763
764    @Override
765    public ServerName getServerName() {
766      return ServerName.valueOf(hostname, 1234, 1L);
767    }
768
769    @Override
770    public void abort(String why, Throwable e) {
771      // To change body of implemented methods use File | Settings | File Templates.
772    }
773
774    @Override
775    public boolean isAborted() {
776      return false;
777    }
778
779    @Override
780    public void stop(String why) {
781      // To change body of implemented methods use File | Settings | File Templates.
782    }
783
784    @Override
785    public boolean isStopped() {
786      return false; // To change body of implemented methods use File | Settings | File Templates.
787    }
788
789    @Override
790    public ChoreService getChoreService() {
791      return null;
792    }
793
794    @Override
795    public ClusterConnection getClusterConnection() {
796      // TODO Auto-generated method stub
797      return null;
798    }
799
800    @Override
801    public FileSystem getFileSystem() {
802      return null;
803    }
804
805    @Override
806    public boolean isStopping() {
807      return false;
808    }
809
810    @Override
811    public Connection createConnection(Configuration conf) throws IOException {
812      return null;
813    }
814  }
815}