001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertTrue;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Collections;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Set;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.FileSystem;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.HBaseTestingUtil;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.ServerName;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
036import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
037import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
038import org.apache.hadoop.hbase.replication.ReplicationQueueId;
039import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
040import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
041import org.apache.hadoop.hbase.testclassification.ReplicationTests;
042import org.apache.hadoop.hbase.testclassification.SmallTests;
043import org.apache.hadoop.hbase.util.CommonFSUtils;
044import org.apache.hadoop.hbase.util.Pair;
045import org.junit.jupiter.api.AfterEach;
046import org.junit.jupiter.api.BeforeEach;
047import org.junit.jupiter.api.Tag;
048import org.junit.jupiter.api.Test;
049import org.junit.jupiter.api.TestInfo;
050
051/**
052 * Tests for DumpReplicationQueues tool
053 */
054@Tag(ReplicationTests.TAG)
055@Tag(SmallTests.TAG)
056public class TestDumpReplicationQueues {
057
058  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
059  private static Configuration CONF;
060  private static FileSystem FS = null;
061  private Path root;
062  private Path logDir;
063
064  private String testName;
065
066  @BeforeEach
067  public void setup(TestInfo testInfo) throws Exception {
068    testName = testInfo.getTestMethod().get().getName();
069    UTIL.startMiniCluster(3);
070    CONF = UTIL.getConfiguration();
071    TableName tableName = TableName.valueOf("replication_" + testName);
072    UTIL.getAdmin()
073      .createTable(ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName));
074    CONF.set(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME, tableName.getNameAsString());
075    FS = FileSystem.get(CONF);
076    root = UTIL.getDataTestDirOnTestFS("hbase");
077    logDir = new Path(root, HConstants.HREGION_LOGDIR_NAME);
078    FS.mkdirs(logDir);
079    CommonFSUtils.setRootDir(CONF, root);
080    CommonFSUtils.setWALRootDir(CONF, root);
081  }
082
083  @Test
084  public void testDumpReplication() throws Exception {
085    String peerId = "1";
086    String serverNameStr = "rs1,12345,123";
087    addPeer(peerId, "hbase");
088    ServerName serverName = ServerName.valueOf(serverNameStr);
089    String walName = "rs1%2C12345%2C123.10";
090    Path walPath = new Path(logDir, serverNameStr + "/" + walName);
091    FS.createNewFile(walPath);
092
093    ReplicationQueueId queueId = new ReplicationQueueId(serverName, peerId);
094    ReplicationQueueStorage queueStorage =
095      ReplicationStorageFactory.getReplicationQueueStorage(UTIL.getConnection(), CONF);
096    queueStorage.setOffset(queueId, "wal-group",
097      new ReplicationGroupOffset(FS.listStatus(walPath)[0].getPath().toString(), 123),
098      Collections.emptyMap());
099
100    DumpReplicationQueues dumpQueues = new DumpReplicationQueues();
101    Set<String> peerIds = new HashSet<>();
102    peerIds.add(peerId);
103    List<String> wals = new ArrayList<>();
104    wals.add("rs1%2C12345%2C123.12");
105    wals.add("rs1%2C12345%2C123.15");
106    wals.add("rs1%2C12345%2C123.11");
107    for (String wal : wals) {
108      Path wPath = new Path(logDir, serverNameStr + "/" + wal);
109      FS.createNewFile(wPath);
110    }
111
112    String dump = dumpQueues.dumpQueues(UTIL.getConnection(), peerIds, false, CONF);
113    assertTrue(dump.indexOf("Queue id: 1-rs1,12345,123") > 0);
114    assertTrue(dump.indexOf("Number of WALs in replication queue: 4") > 0);
115    // test for 'Returns wal sorted'
116    String[] parsedDump = dump.split("Replication position for");
117    assertTrue(parsedDump[1].indexOf("rs1%2C12345%2C123.10: 123") >= 0,
118      "First wal should be rs1%2C12345%2C123.10: 123, but got: " + parsedDump[1]);
119    assertTrue(
120      parsedDump[2].indexOf("rs1%2C12345%2C123.11: 0 (not started or nothing to replicate)") >= 0,
121      "Second wal should be rs1%2C12345%2C123.11: 0, but got: " + parsedDump[2]);
122    assertTrue(
123      parsedDump[3].indexOf("rs1%2C12345%2C123.12: 0 (not started or nothing to replicate)") >= 0,
124      "Third wal should be rs1%2C12345%2C123.12: 0, but got: " + parsedDump[3]);
125    assertTrue(
126      parsedDump[4].indexOf("rs1%2C12345%2C123.15: 0 (not started or nothing to replicate)") >= 0,
127      "Fourth wal should be rs1%2C12345%2C123.15: 0, but got: " + parsedDump[4]);
128
129    Path file1 = new Path("testHFile1");
130    Path file2 = new Path("testHFile2");
131    List<Pair<Path, Path>> files = new ArrayList<>(1);
132    files.add(new Pair<>(null, file1));
133    files.add(new Pair<>(null, file2));
134    queueStorage.addHFileRefs(peerId, files);
135    // test for 'Dump Replication via replication table'
136    String dump2 = dumpQueues.dumpReplicationViaTable(UTIL.getConnection(), CONF);
137    assertTrue(dump2.indexOf("peers/1/peer-state: ENABLED") > 0);
138    assertTrue(dump2.indexOf("rs1,12345,123/rs1%2C12345%2C123.10: 123") >= 0);
139    assertTrue(dump2.indexOf("hfile-refs/1/testHFile1,testHFile2") >= 0);
140  }
141
142  /**
143   * Add a peer
144   */
145  private void addPeer(String peerId, String clusterKey) throws IOException {
146    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
147      .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/" + clusterKey)
148      .setReplicationEndpointImpl(
149        TestReplicationSourceManager.ReplicationEndpointForTest.class.getName());
150    UTIL.getAdmin().addReplicationPeer(peerId, builder.build(), true);
151  }
152
153  @AfterEach
154  public void tearDown() throws Exception {
155    UTIL.shutdownMiniCluster();
156  }
157}