/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import static org.junit.Assert.assertEquals;

import java.util.List;
import java.util.stream.Stream;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.HbckErrorReporter.ERROR_CODE;
import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Exercises the replication-queue check of {@link HBaseFsck}: queues that belong to a peer which
 * has been removed must be reported as {@link ERROR_CODE#UNDELETED_REPLICATION_QUEUE}, and must
 * only be deleted when fsck is run with fixing enabled.
 */
@Category({ ReplicationTests.class, MediumTests.class })
public class TestHBaseFsckReplication {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestHBaseFsckReplication.class);

  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

  /** How many distinct servers receive a WAL entry for the peer that is removed later on. */
  private static final int QUEUE_SERVER_COUNT = 10;

  /** Port used for the first fake server; server {@code i} uses {@code BASE_PORT + i}. */
  private static final int BASE_PORT = 10000;

  /** Start code used for the first fake server; server {@code i} uses this value plus {@code i}. */
  private static final int BASE_START_CODE = 100000;

  /**
   * Turns off the {@code hbase.write.hbck1.lock.file} setting and brings up the mini cluster
   * through {@code startMiniCluster(1)}.
   */
  @BeforeClass
  public static void setUp() throws Exception {
    UTIL.getConfiguration().setBoolean("hbase.write.hbck1.lock.file", false);
    UTIL.startMiniCluster(1);
  }

  /** Shuts the mini cluster down once the test in this class has finished. */
  @AfterClass
  public static void tearDown() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  /**
   * Registers two peers, queues WALs for both, drops the first peer and then verifies that fsck
   * reports the leftover queues, keeps them when not fixing, and deletes only those queues (not the
   * one owned by the surviving peer) when fixing.
   */
  @Test
  public void test() throws Exception {
    ReplicationPeerStorage peers = ReplicationStorageFactory
      .getReplicationPeerStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
    ReplicationQueueStorage queues = ReplicationStorageFactory
      .getReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());

    String removedPeerId = "1";
    String survivingPeerId = "2";
    peers.addPeer(removedPeerId, ReplicationPeerConfig.newBuilder().setClusterKey("key").build(),
      true);
    peers.addPeer(survivingPeerId,
      ReplicationPeerConfig.newBuilder().setClusterKey("key").build(), true);

    // One queue per fake server for the peer that will be removed ...
    for (int offset = 0; offset < QUEUE_SERVER_COUNT; offset++) {
      queues.addWAL(fakeServer(offset), removedPeerId, "file-" + offset);
    }
    // ... plus a single queue for the other peer, placed on the first of those servers.
    queues.addWAL(fakeServer(0), survivingPeerId, "file");

    // Both peers still exist, so a fixing run has nothing to complain about or clean up.
    HBaseFsck fsck = HbckTestingUtil.doFsck(UTIL.getConfiguration(), true);
    HbckTestingUtil.assertNoErrors(fsck);
    assertEquals(QUEUE_SERVER_COUNT, queues.getListOfReplicators().size());

    // Dropping the peer alone leaves its queues behind as orphans.
    peers.removePeer(removedPeerId);
    assertEquals(QUEUE_SERVER_COUNT, queues.getListOfReplicators().size());

    // A report-only run flags every orphan but must not touch the stored queues.
    fsck = HbckTestingUtil.doFsck(UTIL.getConfiguration(), false);
    HbckTestingUtil.assertErrors(fsck, undeletedQueueErrors(QUEUE_SERVER_COUNT));
    assertEquals(QUEUE_SERVER_COUNT, queues.getListOfReplicators().size());

    // A fixing run flags the same orphans and this time removes them.
    fsck = HbckTestingUtil.doFsck(UTIL.getConfiguration(), true);
    HbckTestingUtil.assertErrors(fsck, undeletedQueueErrors(QUEUE_SERVER_COUNT));

    // Only the server that also holds a queue for the surviving peer may remain.
    List<ServerName> remaining = queues.getListOfReplicators();
    assertEquals(1, remaining.size());
    assertEquals(fakeServer(0), remaining.get(0));
    queues.getAllQueues(remaining.get(0))
      .forEach(queueId -> assertEquals(survivingPeerId, queueId));
  }

  /**
   * Builds the name of the fake region server with the given offset from the base port and base
   * start code.
   */
  private static ServerName fakeServer(int offset) {
    return ServerName.valueOf("localhost", BASE_PORT + offset, BASE_START_CODE + offset);
  }

  /**
   * Produces an array holding {@code count} copies of
   * {@link ERROR_CODE#UNDELETED_REPLICATION_QUEUE}, i.e. the errors expected from one fsck run.
   */
  private static ERROR_CODE[] undeletedQueueErrors(int count) {
    return Stream.generate(() -> ERROR_CODE.UNDELETED_REPLICATION_QUEUE).limit(count)
      .toArray(ERROR_CODE[]::new);
  }
}