001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.replication; 019 020import static org.hamcrest.MatcherAssert.assertThat; 021import static org.hamcrest.Matchers.hasItem; 022import static org.hamcrest.Matchers.hasSize; 023 024import java.io.IOException; 025import java.util.List; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.HBaseTestingUtil; 028import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; 029import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 030import org.apache.hadoop.hbase.master.procedure.MasterProcedureTestingUtility; 031import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 032import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; 033import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 034import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 035import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration; 036import org.apache.hadoop.hbase.replication.ZKReplicationStorageBase; 037import org.apache.hadoop.hbase.testclassification.MasterTests; 038import org.apache.hadoop.hbase.testclassification.MediumTests; 039import org.apache.hadoop.hbase.zookeeper.ZKUtil; 040import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 041import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 042import org.hamcrest.Matchers; 043import org.junit.jupiter.api.AfterAll; 044import org.junit.jupiter.api.AfterEach; 045import org.junit.jupiter.api.BeforeAll; 046import org.junit.jupiter.api.BeforeEach; 047import org.junit.jupiter.api.Tag; 048import org.junit.jupiter.api.Test; 049 050@Tag(MasterTests.TAG) 051@Tag(MediumTests.TAG) 052public class TestMigrateReplicationQueueFromZkToTableProcedureRecovery { 053 054 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 055 056 @BeforeAll 057 public static void setupCluster() throws Exception { 058 UTIL.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1); 059 UTIL.startMiniCluster(1); 060 } 061 062 @AfterAll 063 public static void cleanupTest() throws Exception { 064 UTIL.shutdownMiniCluster(); 065 } 066 067 private ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() { 068 return UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor(); 069 } 070 071 @BeforeEach 072 public void setup() throws Exception { 073 ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(getMasterProcedureExecutor(), false); 074 } 075 076 @AfterEach 077 public void tearDown() throws Exception { 078 ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(getMasterProcedureExecutor(), false); 079 } 080 081 private String getHFileRefsZNode() throws IOException { 082 Configuration conf = UTIL.getConfiguration(); 083 ZKWatcher zk = UTIL.getZooKeeperWatcher(); 084 String replicationZNode = ZNodePaths.joinZNode(zk.getZNodePaths().baseZNode, 085 conf.get(ZKReplicationStorageBase.REPLICATION_ZNODE, 086 ZKReplicationStorageBase.REPLICATION_ZNODE_DEFAULT)); 087 return ZNodePaths.joinZNode(replicationZNode, 088 conf.get(ZKReplicationQueueStorageForMigration.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY, 089 ZKReplicationQueueStorageForMigration.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT)); 090 } 091 092 @Test 093 public void testRecoveryAndDoubleExecution() throws Exception { 094 String peerId = "2"; 095 ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder() 096 .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/testhbase") 097 .setReplicateAllUserTables(true).build(); 098 UTIL.getAdmin().addReplicationPeer(peerId, rpc); 099 100 // here we only test a simple migration, more complicated migration will be tested in other UTs, 101 // such as TestMigrateReplicationQueue and TestReplicationPeerManagerMigrateFromZk 102 String hfileRefsZNode = getHFileRefsZNode(); 103 String hfile = "hfile"; 104 String hfileZNode = ZNodePaths.joinZNode(hfileRefsZNode, peerId, hfile); 105 ZKUtil.createWithParents(UTIL.getZooKeeperWatcher(), hfileZNode); 106 107 ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor(); 108 109 ProcedureTestingUtility.waitNoProcedureRunning(procExec); 110 ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true); 111 112 // Start the migration procedure && kill the executor 113 long procId = procExec.submitProcedure(new MigrateReplicationQueueFromZkToTableProcedure()); 114 // Restart the executor and execute the step twice 115 MasterProcedureTestingUtility.testRecoveryAndDoubleExecution(procExec, procId); 116 // Validate the migration result 117 ProcedureTestingUtility.assertProcNotFailed(procExec, procId); 118 ReplicationQueueStorage queueStorage = 119 UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage(); 120 List<String> hfiles = queueStorage.getReplicableHFiles(peerId); 121 assertThat(hfiles, Matchers.<List<String>> both(hasItem(hfile)).and(hasSize(1))); 122 } 123}