001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.replication;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.util.List;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.HBaseClassTestRule;
028import org.apache.hadoop.hbase.HBaseTestingUtil;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
031import org.apache.hadoop.hbase.procedure2.Procedure;
032import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
033import org.apache.hadoop.hbase.replication.ReplicationQueueData;
034import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
035import org.apache.hadoop.hbase.replication.TestReplicationBase;
036import org.apache.hadoop.hbase.replication.ZKReplicationStorageBase;
037import org.apache.hadoop.hbase.testclassification.LargeTests;
038import org.apache.hadoop.hbase.testclassification.MasterTests;
039import org.apache.hadoop.hbase.zookeeper.ZKUtil;
040import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
041import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
042import org.junit.ClassRule;
043import org.junit.Test;
044import org.junit.experimental.categories.Category;
045
046import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
047
048@Category({ MasterTests.class, LargeTests.class })
049public class TestMigrateReplicationQueue extends TestReplicationBase {
050
051  @ClassRule
052  public static final HBaseClassTestRule CLASS_RULE =
053    HBaseClassTestRule.forClass(TestMigrateReplicationQueue.class);
054
055  private int disableAndInsert() throws Exception {
056    UTIL1.getAdmin().disableReplicationPeer(PEER_ID2);
057    return UTIL1.loadTable(htable1, famName);
058  }
059
060  private String getQueuesZNode() throws IOException {
061    Configuration conf = UTIL1.getConfiguration();
062    ZKWatcher zk = UTIL1.getZooKeeperWatcher();
063    String replicationZNode = ZNodePaths.joinZNode(zk.getZNodePaths().baseZNode,
064      conf.get(ZKReplicationStorageBase.REPLICATION_ZNODE,
065        ZKReplicationStorageBase.REPLICATION_ZNODE_DEFAULT));
066    return ZNodePaths.joinZNode(replicationZNode, conf.get("zookeeper.znode.replication.rs", "rs"));
067  }
068
069  private void mockData() throws Exception {
070    // delete the replication queue table to simulate upgrading from an older version of hbase
071    TableName replicationQueueTableName = TableName
072      .valueOf(UTIL1.getConfiguration().get(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME,
073        ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString()));
074    List<ReplicationQueueData> queueDatas = UTIL1.getMiniHBaseCluster().getMaster()
075      .getReplicationPeerManager().getQueueStorage().listAllQueues();
076    assertEquals(UTIL1.getMiniHBaseCluster().getRegionServerThreads().size(), queueDatas.size());
077    UTIL1.getAdmin().disableTable(replicationQueueTableName);
078    UTIL1.getAdmin().deleteTable(replicationQueueTableName);
079    // shutdown the hbase cluster
080    UTIL1.shutdownMiniHBaseCluster();
081    ZKWatcher zk = UTIL1.getZooKeeperWatcher();
082    String queuesZNode = getQueuesZNode();
083    for (ReplicationQueueData queueData : queueDatas) {
084      String replicatorZNode =
085        ZNodePaths.joinZNode(queuesZNode, queueData.getId().getServerName().toString());
086      String queueZNode = ZNodePaths.joinZNode(replicatorZNode, queueData.getId().getPeerId());
087      assertEquals(1, queueData.getOffsets().size());
088      ReplicationGroupOffset offset = Iterables.getOnlyElement(queueData.getOffsets().values());
089      String walZNode = ZNodePaths.joinZNode(queueZNode, offset.getWal());
090      ZKUtil.createSetData(zk, walZNode, ZKUtil.positionToByteArray(offset.getOffset()));
091    }
092  }
093
094  @Test
095  public void testMigrate() throws Exception {
096    int count = disableAndInsert();
097    mockData();
098    restartSourceCluster(1);
099    UTIL1.waitFor(60000,
100      () -> UTIL1.getMiniHBaseCluster().getMaster().getProcedures().stream()
101        .filter(p -> p instanceof MigrateReplicationQueueFromZkToTableProcedure).findAny()
102        .map(Procedure::isSuccess).orElse(false));
103    TableName replicationQueueTableName = TableName
104      .valueOf(UTIL1.getConfiguration().get(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME,
105        ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString()));
106    assertTrue(UTIL1.getAdmin().tableExists(replicationQueueTableName));
107    ZKWatcher zk = UTIL1.getZooKeeperWatcher();
108    assertEquals(-1, ZKUtil.checkExists(zk, getQueuesZNode()));
109    // wait until SCP finishes, which means we can finish the claim queue operation
110    UTIL1.waitFor(60000, () -> UTIL1.getMiniHBaseCluster().getMaster().getProcedures().stream()
111      .filter(p -> p instanceof ServerCrashProcedure).allMatch(Procedure::isSuccess));
112    List<ReplicationQueueData> queueDatas = UTIL1.getMiniHBaseCluster().getMaster()
113      .getReplicationPeerManager().getQueueStorage().listAllQueues();
114    assertEquals(1, queueDatas.size());
115    // should have 1 recovered queue, as we haven't replicated anything out so there is no queue
116    // data for the new alive region server
117    assertTrue(queueDatas.get(0).getId().isRecovered());
118    assertEquals(1, queueDatas.get(0).getOffsets().size());
119    // the peer is still disabled, so no data has been replicated
120    assertFalse(UTIL1.getAdmin().isReplicationPeerEnabled(PEER_ID2));
121    assertEquals(0, HBaseTestingUtil.countRows(htable2));
122    // enable peer, and make sure the replication can continue correctly
123    UTIL1.getAdmin().enableReplicationPeer(PEER_ID2);
124    waitForReplication(count, 100);
125  }
126}