/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.hadoop.hbase.master.replication;
019
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.replication.ZKReplicationStorageBase;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
055
056@Tag(MasterTests.TAG)
057@Tag(LargeTests.TAG)
058public class TestMigrateReplicationQueue extends TestReplicationBase {
059
060  private int disableAndInsert() throws Exception {
061    UTIL1.getAdmin().disableReplicationPeer(PEER_ID2);
062    return UTIL1.loadTable(htable1, famName);
063  }
064
065  private String getQueuesZNode() throws IOException {
066    Configuration conf = UTIL1.getConfiguration();
067    ZKWatcher zk = UTIL1.getZooKeeperWatcher();
068    String replicationZNode = ZNodePaths.joinZNode(zk.getZNodePaths().baseZNode,
069      conf.get(ZKReplicationStorageBase.REPLICATION_ZNODE,
070        ZKReplicationStorageBase.REPLICATION_ZNODE_DEFAULT));
071    return ZNodePaths.joinZNode(replicationZNode, conf.get("zookeeper.znode.replication.rs", "rs"));
072  }
073
074  private void mockData() throws Exception {
075    // fake a region_replica_replication peer and its queue data
076    ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
077      .setClusterKey("127.0.0.1:2181:/hbase")
078      .setReplicationEndpointImpl(ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME).build();
079    HMaster master = UTIL1.getMiniHBaseCluster().getMaster();
080    master.getReplicationPeerManager()
081      .addPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER, peerConfig, true);
082    ServerName rsName = UTIL1.getMiniHBaseCluster().getRegionServer(0).getServerName();
083    master.getReplicationPeerManager().getQueueStorage().setOffset(
084      new ReplicationQueueId(rsName, ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER), "",
085      new ReplicationGroupOffset("test-wal-file", 0), Collections.emptyMap());
086
087    // delete the replication queue table to simulate upgrading from an older version of hbase
088    TableName replicationQueueTableName = TableName
089      .valueOf(UTIL1.getConfiguration().get(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME,
090        ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString()));
091    List<ReplicationQueueData> queueDatas =
092      master.getReplicationPeerManager().getQueueStorage().listAllQueues();
093    // have an extra mocked queue data for region_replica_replication peer
094    assertEquals(UTIL1.getMiniHBaseCluster().getRegionServerThreads().size() + 1,
095      queueDatas.size());
096    UTIL1.getAdmin().disableTable(replicationQueueTableName);
097    UTIL1.getAdmin().deleteTable(replicationQueueTableName);
098    // shutdown the hbase cluster
099    UTIL1.shutdownMiniHBaseCluster();
100    ZKWatcher zk = UTIL1.getZooKeeperWatcher();
101    String queuesZNode = getQueuesZNode();
102    for (ReplicationQueueData queueData : queueDatas) {
103      String replicatorZNode =
104        ZNodePaths.joinZNode(queuesZNode, queueData.getId().getServerName().toString());
105      String queueZNode = ZNodePaths.joinZNode(replicatorZNode, queueData.getId().getPeerId());
106      assertEquals(1, queueData.getOffsets().size());
107      ReplicationGroupOffset offset = Iterables.getOnlyElement(queueData.getOffsets().values());
108      String walZNode = ZNodePaths.joinZNode(queueZNode, offset.getWal());
109      ZKUtil.createSetData(zk, walZNode, ZKUtil.positionToByteArray(offset.getOffset()));
110    }
111  }
112
113  @Test
114  public void testMigrate() throws Exception {
115    int count = disableAndInsert();
116    mockData();
117    restartSourceCluster(1);
118    UTIL1.waitFor(60000,
119      () -> UTIL1.getMiniHBaseCluster().getMaster().getProcedures().stream()
120        .filter(p -> p instanceof MigrateReplicationQueueFromZkToTableProcedure).findAny()
121        .map(Procedure::isSuccess).orElse(false));
122    TableName replicationQueueTableName = TableName
123      .valueOf(UTIL1.getConfiguration().get(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME,
124        ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString()));
125    assertTrue(UTIL1.getAdmin().tableExists(replicationQueueTableName));
126    ZKWatcher zk = UTIL1.getZooKeeperWatcher();
127    assertEquals(-1, ZKUtil.checkExists(zk, getQueuesZNode()));
128    // wait until MigrateReplicationQueueFromZkToTableProcedure finishes
129    UTIL1.waitFor(15000, () -> UTIL1.getMiniHBaseCluster().getMaster().getProcedures().stream()
130      .anyMatch(p -> p instanceof MigrateReplicationQueueFromZkToTableProcedure));
131    UTIL1.waitFor(60000,
132      () -> UTIL1.getMiniHBaseCluster().getMaster().getProcedures().stream()
133        .filter(p -> p instanceof MigrateReplicationQueueFromZkToTableProcedure)
134        .allMatch(Procedure::isSuccess));
135    // make sure the region_replica_replication peer is gone, and there is no data on zk
136    assertThat(UTIL1.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getPeerStorage()
137      .listPeerIds(), not(contains(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER)));
138    assertEquals(-1, ZKUtil.checkExists(zk, getQueuesZNode()));
139
140    // wait until SCP finishes, which means we can finish the claim queue operation
141    UTIL1.waitFor(60000, () -> UTIL1.getMiniHBaseCluster().getMaster().getProcedures().stream()
142      .filter(p -> p instanceof ServerCrashProcedure).allMatch(Procedure::isSuccess));
143
144    List<ReplicationQueueData> queueDatas = UTIL1.getMiniHBaseCluster().getMaster()
145      .getReplicationPeerManager().getQueueStorage().listAllQueues();
146    assertEquals(1, queueDatas.size());
147    // should have 1 recovered queue, as we haven't replicated anything out so there is no queue
148    // data for the new alive region server
149    assertTrue(queueDatas.get(0).getId().isRecovered());
150    assertEquals(1, queueDatas.get(0).getOffsets().size());
151    // the peer is still disabled, so no data has been replicated
152    assertFalse(UTIL1.getAdmin().isReplicationPeerEnabled(PEER_ID2));
153    assertEquals(0, HBaseTestingUtil.countRows(htable2));
154    // enable peer, and make sure the replication can continue correctly
155    UTIL1.getAdmin().enableReplicationPeer(PEER_ID2);
156    waitForReplication(count, 100);
157  }
158}