001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.junit.Assert.assertTrue;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Collections;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Set;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.FileSystem;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.HBaseClassTestRule;
032import org.apache.hadoop.hbase.HBaseTestingUtil;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.ServerName;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
037import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
038import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
039import org.apache.hadoop.hbase.replication.ReplicationQueueId;
040import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
041import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
042import org.apache.hadoop.hbase.testclassification.ReplicationTests;
043import org.apache.hadoop.hbase.testclassification.SmallTests;
044import org.apache.hadoop.hbase.util.CommonFSUtils;
045import org.apache.hadoop.hbase.util.Pair;
046import org.junit.After;
047import org.junit.Before;
048import org.junit.ClassRule;
049import org.junit.Rule;
050import org.junit.Test;
051import org.junit.experimental.categories.Category;
052import org.junit.rules.TestName;
053
054/**
055 * Tests for DumpReplicationQueues tool
056 */
057@Category({ ReplicationTests.class, SmallTests.class })
058public class TestDumpReplicationQueues {
059
060  @ClassRule
061  public static final HBaseClassTestRule CLASS_RULE =
062    HBaseClassTestRule.forClass(TestDumpReplicationQueues.class);
063
064  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
065  private static Configuration CONF;
066  private static FileSystem FS = null;
067  private Path root;
068  private Path logDir;
069  @Rule
070  public final TestName name = new TestName();
071
072  @Before
073  public void setup() throws Exception {
074    UTIL.startMiniCluster(3);
075    CONF = UTIL.getConfiguration();
076    TableName tableName = TableName.valueOf("replication_" + name.getMethodName());
077    UTIL.getAdmin()
078      .createTable(ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName));
079    CONF.set(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME, tableName.getNameAsString());
080    FS = FileSystem.get(CONF);
081    root = UTIL.getDataTestDirOnTestFS("hbase");
082    logDir = new Path(root, HConstants.HREGION_LOGDIR_NAME);
083    FS.mkdirs(logDir);
084    CommonFSUtils.setRootDir(CONF, root);
085    CommonFSUtils.setWALRootDir(CONF, root);
086  }
087
088  @Test
089  public void testDumpReplication() throws Exception {
090    String peerId = "1";
091    String serverNameStr = "rs1,12345,123";
092    addPeer(peerId, "hbase");
093    ServerName serverName = ServerName.valueOf(serverNameStr);
094    String walName = "rs1%2C12345%2C123.10";
095    Path walPath = new Path(logDir, serverNameStr + "/" + walName);
096    FS.createNewFile(walPath);
097
098    ReplicationQueueId queueId = new ReplicationQueueId(serverName, peerId);
099    ReplicationQueueStorage queueStorage =
100      ReplicationStorageFactory.getReplicationQueueStorage(UTIL.getConnection(), CONF);
101    queueStorage.setOffset(queueId, "wal-group",
102      new ReplicationGroupOffset(FS.listStatus(walPath)[0].getPath().toString(), 123),
103      Collections.emptyMap());
104
105    DumpReplicationQueues dumpQueues = new DumpReplicationQueues();
106    Set<String> peerIds = new HashSet<>();
107    peerIds.add(peerId);
108    List<String> wals = new ArrayList<>();
109    wals.add("rs1%2C12345%2C123.12");
110    wals.add("rs1%2C12345%2C123.15");
111    wals.add("rs1%2C12345%2C123.11");
112    for (String wal : wals) {
113      Path wPath = new Path(logDir, serverNameStr + "/" + wal);
114      FS.createNewFile(wPath);
115    }
116
117    String dump = dumpQueues.dumpQueues(UTIL.getConnection(), peerIds, false, CONF);
118    assertTrue(dump.indexOf("Queue id: 1-rs1,12345,123") > 0);
119    assertTrue(dump.indexOf("Number of WALs in replication queue: 4") > 0);
120    // test for 'Returns wal sorted'
121    String[] parsedDump = dump.split("Replication position for");
122    assertTrue("First wal should be rs1%2C12345%2C123.10: 123, but got: " + parsedDump[1],
123      parsedDump[1].indexOf("rs1%2C12345%2C123.10: 123") >= 0);
124    assertTrue("Second wal should be rs1%2C12345%2C123.11: 0, but got: " + parsedDump[2],
125      parsedDump[2].indexOf("rs1%2C12345%2C123.11: 0 (not started or nothing to replicate)") >= 0);
126    assertTrue("Third wal should be rs1%2C12345%2C123.12: 0, but got: " + parsedDump[3],
127      parsedDump[3].indexOf("rs1%2C12345%2C123.12: 0 (not started or nothing to replicate)") >= 0);
128    assertTrue("Fourth wal should be rs1%2C12345%2C123.15: 0, but got: " + parsedDump[4],
129      parsedDump[4].indexOf("rs1%2C12345%2C123.15: 0 (not started or nothing to replicate)") >= 0);
130
131    Path file1 = new Path("testHFile1");
132    Path file2 = new Path("testHFile2");
133    List<Pair<Path, Path>> files = new ArrayList<>(1);
134    files.add(new Pair<>(null, file1));
135    files.add(new Pair<>(null, file2));
136    queueStorage.addHFileRefs(peerId, files);
137    // test for 'Dump Replication via replication table'
138    String dump2 = dumpQueues.dumpReplicationViaTable(UTIL.getConnection(), CONF);
139    assertTrue(dump2.indexOf("peers/1/peer-state: ENABLED") > 0);
140    assertTrue(dump2.indexOf("rs1,12345,123/rs1%2C12345%2C123.10: 123") >= 0);
141    assertTrue(dump2.indexOf("hfile-refs/1/testHFile1,testHFile2") >= 0);
142  }
143
144  /**
145   * Add a peer
146   */
147  private void addPeer(String peerId, String clusterKey) throws IOException {
148    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
149      .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/" + clusterKey)
150      .setReplicationEndpointImpl(
151        TestReplicationSourceManager.ReplicationEndpointForTest.class.getName());
152    UTIL.getAdmin().addReplicationPeer(peerId, builder.build(), true);
153  }
154
155  @After
156  public void tearDown() throws Exception {
157    UTIL.shutdownMiniCluster();
158  }
159}