001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.junit.jupiter.api.Assertions.assertTrue; 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.Collections; 025import java.util.HashSet; 026import java.util.List; 027import java.util.Set; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.fs.FileSystem; 030import org.apache.hadoop.fs.Path; 031import org.apache.hadoop.hbase.HBaseTestingUtil; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.ServerName; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.replication.ReplicationGroupOffset; 036import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 037import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; 038import org.apache.hadoop.hbase.replication.ReplicationQueueId; 039import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 040import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 041import org.apache.hadoop.hbase.testclassification.ReplicationTests; 042import org.apache.hadoop.hbase.testclassification.SmallTests; 043import org.apache.hadoop.hbase.util.CommonFSUtils; 044import org.apache.hadoop.hbase.util.Pair; 045import org.junit.jupiter.api.AfterEach; 046import org.junit.jupiter.api.BeforeEach; 047import org.junit.jupiter.api.Tag; 048import org.junit.jupiter.api.Test; 049import org.junit.jupiter.api.TestInfo; 050 051/** 052 * Tests for DumpReplicationQueues tool 053 */ 054@Tag(ReplicationTests.TAG) 055@Tag(SmallTests.TAG) 056public class TestDumpReplicationQueues { 057 058 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 059 private static Configuration CONF; 060 private static FileSystem FS = null; 061 private Path root; 062 private Path logDir; 063 064 private String testName; 065 066 @BeforeEach 067 public void setup(TestInfo testInfo) throws Exception { 068 testName = testInfo.getTestMethod().get().getName(); 069 UTIL.startMiniCluster(3); 070 CONF = UTIL.getConfiguration(); 071 TableName tableName = TableName.valueOf("replication_" + testName); 072 UTIL.getAdmin() 073 .createTable(ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName)); 074 CONF.set(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME, tableName.getNameAsString()); 075 FS = FileSystem.get(CONF); 076 root = UTIL.getDataTestDirOnTestFS("hbase"); 077 logDir = new Path(root, HConstants.HREGION_LOGDIR_NAME); 078 FS.mkdirs(logDir); 079 CommonFSUtils.setRootDir(CONF, root); 080 CommonFSUtils.setWALRootDir(CONF, root); 081 } 082 083 @Test 084 public void testDumpReplication() throws Exception { 085 String peerId = "1"; 086 String serverNameStr = "rs1,12345,123"; 087 addPeer(peerId, "hbase"); 088 ServerName serverName = ServerName.valueOf(serverNameStr); 089 String walName = "rs1%2C12345%2C123.10"; 090 Path walPath = new Path(logDir, serverNameStr + "/" + walName); 091 FS.createNewFile(walPath); 092 093 ReplicationQueueId queueId = new ReplicationQueueId(serverName, peerId); 094 ReplicationQueueStorage queueStorage = 095 ReplicationStorageFactory.getReplicationQueueStorage(UTIL.getConnection(), CONF); 096 queueStorage.setOffset(queueId, "wal-group", 097 new ReplicationGroupOffset(FS.listStatus(walPath)[0].getPath().toString(), 123), 098 Collections.emptyMap()); 099 100 DumpReplicationQueues dumpQueues = new DumpReplicationQueues(); 101 Set<String> peerIds = new HashSet<>(); 102 peerIds.add(peerId); 103 List<String> wals = new ArrayList<>(); 104 wals.add("rs1%2C12345%2C123.12"); 105 wals.add("rs1%2C12345%2C123.15"); 106 wals.add("rs1%2C12345%2C123.11"); 107 for (String wal : wals) { 108 Path wPath = new Path(logDir, serverNameStr + "/" + wal); 109 FS.createNewFile(wPath); 110 } 111 112 String dump = dumpQueues.dumpQueues(UTIL.getConnection(), peerIds, false, CONF); 113 assertTrue(dump.indexOf("Queue id: 1-rs1,12345,123") > 0); 114 assertTrue(dump.indexOf("Number of WALs in replication queue: 4") > 0); 115 // test for 'Returns wal sorted' 116 String[] parsedDump = dump.split("Replication position for"); 117 assertTrue(parsedDump[1].indexOf("rs1%2C12345%2C123.10: 123") >= 0, 118 "First wal should be rs1%2C12345%2C123.10: 123, but got: " + parsedDump[1]); 119 assertTrue( 120 parsedDump[2].indexOf("rs1%2C12345%2C123.11: 0 (not started or nothing to replicate)") >= 0, 121 "Second wal should be rs1%2C12345%2C123.11: 0, but got: " + parsedDump[2]); 122 assertTrue( 123 parsedDump[3].indexOf("rs1%2C12345%2C123.12: 0 (not started or nothing to replicate)") >= 0, 124 "Third wal should be rs1%2C12345%2C123.12: 0, but got: " + parsedDump[3]); 125 assertTrue( 126 parsedDump[4].indexOf("rs1%2C12345%2C123.15: 0 (not started or nothing to replicate)") >= 0, 127 "Fourth wal should be rs1%2C12345%2C123.15: 0, but got: " + parsedDump[4]); 128 129 Path file1 = new Path("testHFile1"); 130 Path file2 = new Path("testHFile2"); 131 List<Pair<Path, Path>> files = new ArrayList<>(1); 132 files.add(new Pair<>(null, file1)); 133 files.add(new Pair<>(null, file2)); 134 queueStorage.addHFileRefs(peerId, files); 135 // test for 'Dump Replication via replication table' 136 String dump2 = dumpQueues.dumpReplicationViaTable(UTIL.getConnection(), CONF); 137 assertTrue(dump2.indexOf("peers/1/peer-state: ENABLED") > 0); 138 assertTrue(dump2.indexOf("rs1,12345,123/rs1%2C12345%2C123.10: 123") >= 0); 139 assertTrue(dump2.indexOf("hfile-refs/1/testHFile1,testHFile2") >= 0); 140 } 141 142 /** 143 * Add a peer 144 */ 145 private void addPeer(String peerId, String clusterKey) throws IOException { 146 ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder() 147 .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/" + clusterKey) 148 .setReplicationEndpointImpl( 149 TestReplicationSourceManager.ReplicationEndpointForTest.class.getName()); 150 UTIL.getAdmin().addReplicationPeer(peerId, builder.build(), true); 151 } 152 153 @AfterEach 154 public void tearDown() throws Exception { 155 UTIL.shutdownMiniCluster(); 156 } 157}