001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021 022import java.util.Collections; 023import java.util.List; 024import java.util.stream.Stream; 025import org.apache.hadoop.hbase.HBaseTestingUtil; 026import org.apache.hadoop.hbase.ServerName; 027import org.apache.hadoop.hbase.TableName; 028import org.apache.hadoop.hbase.replication.ReplicationGroupOffset; 029import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 030import org.apache.hadoop.hbase.replication.ReplicationPeerStorage; 031import org.apache.hadoop.hbase.replication.ReplicationQueueId; 032import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 033import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 034import org.apache.hadoop.hbase.replication.SyncReplicationState; 035import org.apache.hadoop.hbase.testclassification.MediumTests; 036import org.apache.hadoop.hbase.testclassification.ReplicationTests; 037import org.apache.hadoop.hbase.util.HbckErrorReporter.ERROR_CODE; 038import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil; 039import org.junit.jupiter.api.AfterEach; 040import org.junit.jupiter.api.BeforeEach; 041import org.junit.jupiter.api.Tag; 042import org.junit.jupiter.api.Test; 043import org.junit.jupiter.api.TestInfo; 044 045@Tag(ReplicationTests.TAG) 046@Tag(MediumTests.TAG) 047public class TestHBaseFsckReplication { 048 049 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 050 051 @BeforeEach 052 public void setUp(TestInfo testInfo) throws Exception { 053 UTIL.getConfiguration().setBoolean("hbase.write.hbck1.lock.file", false); 054 UTIL.startMiniCluster(1); 055 TableName tableName = 056 TableName.valueOf("replication_" + testInfo.getTestMethod().get().getName()); 057 UTIL.getAdmin() 058 .createTable(ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName)); 059 UTIL.getConfiguration().set(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME, 060 tableName.getNameAsString()); 061 } 062 063 @AfterEach 064 public void tearDown() throws Exception { 065 UTIL.shutdownMiniCluster(); 066 } 067 068 @Test 069 public void test() throws Exception { 070 ReplicationPeerStorage peerStorage = ReplicationStorageFactory.getReplicationPeerStorage( 071 UTIL.getTestFileSystem(), UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()); 072 ReplicationQueueStorage queueStorage = ReplicationStorageFactory 073 .getReplicationQueueStorage(UTIL.getConnection(), UTIL.getConfiguration()); 074 075 String peerId1 = "1"; 076 String peerId2 = "2"; 077 peerStorage.addPeer(peerId1, ReplicationPeerConfig.newBuilder().setClusterKey("key").build(), 078 true, SyncReplicationState.NONE); 079 peerStorage.addPeer(peerId2, ReplicationPeerConfig.newBuilder().setClusterKey("key").build(), 080 true, SyncReplicationState.NONE); 081 ReplicationQueueId queueId = null; 082 for (int i = 0; i < 10; i++) { 083 queueId = new ReplicationQueueId(getServerName(i), peerId1); 084 queueStorage.setOffset(queueId, "group-" + i, 085 new ReplicationGroupOffset("file-" + i, i * 100), Collections.emptyMap()); 086 } 087 queueId = new ReplicationQueueId(getServerName(0), peerId2); 088 queueStorage.setOffset(queueId, "group-" + 0, new ReplicationGroupOffset("file-" + 0, 100), 089 Collections.emptyMap()); 090 HBaseFsck fsck = HbckTestingUtil.doFsck(UTIL.getConfiguration(), true); 091 HbckTestingUtil.assertNoErrors(fsck); 092 093 // should not remove anything since the replication peer is still alive 094 assertEquals(10, queueStorage.listAllReplicators().size()); 095 peerStorage.removePeer(peerId1); 096 // there should be orphan queues 097 assertEquals(10, queueStorage.listAllReplicators().size()); 098 fsck = HbckTestingUtil.doFsck(UTIL.getConfiguration(), false); 099 HbckTestingUtil.assertErrors(fsck, Stream.generate(() -> { 100 return ERROR_CODE.UNDELETED_REPLICATION_QUEUE; 101 }).limit(10).toArray(ERROR_CODE[]::new)); 102 103 // should not delete anything when fix is false 104 assertEquals(10, queueStorage.listAllReplicators().size()); 105 106 fsck = HbckTestingUtil.doFsck(UTIL.getConfiguration(), true); 107 HbckTestingUtil.assertErrors(fsck, Stream.generate(() -> { 108 return ERROR_CODE.UNDELETED_REPLICATION_QUEUE; 109 }).limit(10).toArray(HbckErrorReporter.ERROR_CODE[]::new)); 110 111 List<ServerName> replicators = queueStorage.listAllReplicators(); 112 // should not remove the server with queue for peerId2 113 assertEquals(1, replicators.size()); 114 assertEquals(ServerName.valueOf("localhost", 10000, 100000), replicators.get(0)); 115 for (ReplicationQueueId qId : queueStorage.listAllQueueIds(replicators.get(0))) { 116 assertEquals(peerId2, qId.getPeerId()); 117 } 118 } 119 120 private ServerName getServerName(int i) { 121 return ServerName.valueOf("localhost", 10000 + i, 100000 + i); 122 } 123}