/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.replication;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNameTestExtension;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager.ReplicationQueueStorageInitializer;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.TableReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.TestZKReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Tests for {@link ReplicationPeerManager#migrateQueuesFromZk}, verifying that legacy replication
 * queue data stored in ZooKeeper (queue offsets, last pushed sequence ids and hfile refs) is
 * migrated into the table based {@link TableReplicationQueueStorage}, and that nothing is migrated
 * when no peers are registered.
 */
@Tag(MasterTests.TAG)
@Tag(MediumTests.TAG)
public class TestReplicationPeerManagerMigrateQueuesFromZk {

  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

  /** Executor passed to the migration; shut down in {@link #tearDownAfterClass()}. */
  private static ExecutorService EXECUTOR;

  // Shared with the ReplicationPeerManager under test; tests populate this map to control which
  // peers the migration considers registered.
  ConcurrentMap<String, ReplicationPeerDescription> peers;

  private ReplicationPeerStorage peerStorage;

  private ReplicationQueueStorage queueStorage;

  private ReplicationQueueStorageInitializer queueStorageInitializer;

  private ReplicationPeerManager manager;

  // Sizes of the mocked legacy data; the assertions in testMigrate are expressed in terms of
  // these fields, so prepareData must use them too (not bare literals).
  private final int nServers = 10;

  private final int nPeers = 10;

  private final int nRegions = 100;

  /** A server name that is not part of the live cluster, used when mocking queue data. */
  private ServerName deadServerName;

  @RegisterExtension
  public final TableNameTestExtension tableNameExtension = new TableNameTestExtension();

  @BeforeAll
  public static void setUpBeforeClass() throws Exception {
    UTIL.startMiniCluster(1);
    EXECUTOR = Executors.newFixedThreadPool(3,
      new ThreadFactoryBuilder().setDaemon(true)
        .setNameFormat(TestReplicationPeerManagerMigrateQueuesFromZk.class.getSimpleName() + "-%d")
        .build());
  }

  @AfterAll
  public static void tearDownAfterClass() throws Exception {
    EXECUTOR.shutdownNow();
    UTIL.shutdownMiniCluster();
  }

  /**
   * Creates a fresh replication queue table and a {@link ReplicationPeerManager} wired to mocked
   * peer storage, an empty peer map and a mocked queue storage initializer.
   */
  @BeforeEach
  public void setUp() throws IOException {
    Configuration conf = UTIL.getConfiguration();
    peerStorage = mock(ReplicationPeerStorage.class);
    TableName tableName = tableNameExtension.getTableName();
    UTIL.getAdmin()
      .createTable(ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName));
    queueStorage = new TableReplicationQueueStorage(UTIL.getConnection(), tableName);
    queueStorageInitializer = mock(ReplicationQueueStorageInitializer.class);
    peers = new ConcurrentHashMap<>();
    deadServerName =
      ServerName.valueOf("test-hbase-dead", 12345, EnvironmentEdgeManager.currentTime());
    manager = new ReplicationPeerManager(UTIL.getTestFileSystem(), UTIL.getZooKeeperWatcher(),
      peerStorage, queueStorage, peers, conf, "cluster", queueStorageInitializer);
  }

  /**
   * Populates the legacy ZooKeeper queue storage with mocked queue data for {@code nServers}
   * servers, last pushed sequence ids for {@code nRegions} regions and hfile refs for
   * {@code nPeers} peers.
   * @return mapping from encoded region name to the set of peer ids which have a last pushed
   *         sequence id recorded for that region; tests drain it while verifying migration.
   */
  private Map<String, Set<String>> prepareData() throws Exception {
    ZKReplicationQueueStorageForMigration storage = new ZKReplicationQueueStorageForMigration(
      UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
    TestZKReplicationQueueStorage.mockQueuesData(storage, nServers, "peer_0", deadServerName);
    // NOTE(review): the trailing (10, 10) arguments mirror the original call; their meaning is
    // defined by mockLastPushedSeqIds and is not visible here — confirm before parameterizing.
    Map<String, Set<String>> encodedName2PeerIds = TestZKReplicationQueueStorage
      .mockLastPushedSeqIds(storage, "peer_1", "peer_2", nRegions, 10, 10);
    TestZKReplicationQueueStorage.mockHFileRefs(storage, nPeers);
    return encodedName2PeerIds;
  }

  /**
   * With no peers registered, the migration must still initialize the queue storage but must not
   * copy any data into the replication queue table.
   */
  @Test
  public void testNoPeers() throws Exception {
    prepareData();
    manager.migrateQueuesFromZk(UTIL.getZooKeeperWatcher(), EXECUTOR).get(1, TimeUnit.MINUTES);
    // should have called initializer
    verify(queueStorageInitializer).initialize();
    // should have not migrated any data since there is no peer
    try (Table table = UTIL.getConnection().getTable(tableNameExtension.getTableName())) {
      assertEquals(0, HBaseTestingUtil.countRows(table));
    }
  }

  /**
   * With all peers registered, the migration must copy queue offsets, last pushed sequence ids
   * and hfile refs from ZooKeeper into the table based storage.
   */
  @Test
  public void testMigrate() throws Exception {
    Map<String, Set<String>> encodedName2PeerIds = prepareData();
    // add all peers so we will migrate them all
    for (int i = 0; i < nPeers; i++) {
      // value is not used in this test, so just add a mock
      peers.put("peer_" + i, mock(ReplicationPeerDescription.class));
    }
    manager.migrateQueuesFromZk(UTIL.getZooKeeperWatcher(), EXECUTOR).get(1, TimeUnit.MINUTES);
    // should have called initializer
    verify(queueStorageInitializer).initialize();
    List<ReplicationQueueData> queueDatas = queueStorage.listAllQueues();
    // there should be two empty queues so minus 2
    assertEquals(2 * nServers - 2, queueDatas.size());
    for (ReplicationQueueData queueData : queueDatas) {
      assertEquals("peer_0", queueData.getId().getPeerId());
      assertEquals(1, queueData.getOffsets().size());
      String walGroup = queueData.getId().getServerWALsBelongTo().toString();
      ReplicationGroupOffset offset = queueData.getOffsets().get(walGroup);
      assertEquals(0, offset.getOffset());
      assertEquals(walGroup + ".0", offset.getWal());
    }
    // there is no method in ReplicationQueueStorage can list all the last pushed sequence ids
    try (Table table = UTIL.getConnection().getTable(tableNameExtension.getTableName());
      ResultScanner scanner =
        table.getScanner(TableReplicationQueueStorage.LAST_SEQUENCE_ID_FAMILY)) {
      // exactly two peers (peer_1, peer_2) had last pushed sequence ids mocked
      for (int i = 0; i < 2; i++) {
        Result result = scanner.next();
        String peerId = Bytes.toString(result.getRow());
        assertEquals(nRegions, result.size());
        for (Cell cell : result.rawCells()) {
          String encodedRegionName = Bytes.toString(cell.getQualifierArray(),
            cell.getQualifierOffset(), cell.getQualifierLength());
          // drain the expectation map; anything left afterwards was not migrated
          encodedName2PeerIds.get(encodedRegionName).remove(peerId);
          long seqId =
            Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
          assertEquals(i + 1, seqId);
        }
      }
      encodedName2PeerIds.forEach((encodedRegionName, peerIds) -> {
        assertThat(encodedRegionName + " still has unmigrated peers", peerIds, empty());
      });
      assertNull(scanner.next());
    }
    // peer_i was mocked with exactly i hfile refs named hfile-0 .. hfile-(i-1)
    for (int i = 0; i < nPeers; i++) {
      List<String> refs = queueStorage.getReplicableHFiles("peer_" + i);
      assertEquals(i, refs.size());
      Set<String> refsSet = new HashSet<>(refs);
      for (int j = 0; j < i; j++) {
        assertTrue(refsSet.remove("hfile-" + j));
      }
      assertThat(refsSet, empty());
    }
  }
}