/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseZKTestingUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.MigrationIterator;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkReplicationQueueData;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.zookeeper.KeeperException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

/**
 * Tests for {@link ZKReplicationQueueStorageForMigration}. Each test mocks a legacy replication
 * znode layout into a mini ZooKeeper cluster and then verifies the one-shot "list all" iterators
 * (replication queues, last pushed sequence ids, hfile refs). The assertions also check that the
 * iterators are destructive: znodes that have been fully consumed are deleted
 * ({@code ZKUtil.checkExists(...) == -1} after {@code next()}), which is the behavior the
 * migration relies on.
 */
@Tag(ReplicationTests.TAG)
@Tag(MediumTests.TAG)
public class TestZKReplicationQueueStorage {

  private static final HBaseZKTestingUtil UTIL = new HBaseZKTestingUtil();

  private ZKWatcher zk;

  private ZKReplicationQueueStorageForMigration storage;

  /** Starts a shared mini ZooKeeper cluster once for the whole class. */
  @BeforeAll
  public static void setUpBeforeClass() throws Exception {
    UTIL.startMiniZKCluster();
  }

  @AfterAll
  public static void tearDownAfterClass() throws IOException {
    UTIL.shutdownMiniZKCluster();
  }

  /**
   * Points the replication znode at the current test method's name so every test works in its own
   * isolated znode tree, then creates a fresh watcher and storage instance.
   */
  @BeforeEach
  public void setUp(TestInfo testInfo) throws IOException {
    String methodName = testInfo.getTestMethod().get().getName();
    Configuration conf = UTIL.getConfiguration();
    conf.set(ZKReplicationStorageBase.REPLICATION_ZNODE, methodName);
    zk = new ZKWatcher(conf, methodName, null);
    storage = new ZKReplicationQueueStorageForMigration(zk, conf);
  }

  /** Removes the per-test znode tree and closes the watcher (swallowing close errors). */
  @AfterEach
  public void tearDown() throws Exception {
    ZKUtil.deleteNodeRecursively(zk, storage.replicationZNode);
    Closeables.close(zk, true);
  }

  /**
   * Mocks the legacy queue layout for {@code nServers} region servers named
   * {@code test-hbase-<i>}. Server i gets:
   * <ul>
   * <li>a queue for {@code peerId} with i WALs, each storing position j;</li>
   * <li>a recovered queue claimed from {@code deadServer} (znode name
   * {@code <peerId>-<deadServer>}) with i WALs — the first WAL znode is created without position
   * data, the rest store position j;</li>
   * <li>for even i, an extra region_replica_replication peer znode.</li>
   * </ul>
   * Finally an empty queues znode is created for {@code deadServer} itself.
   */
  public static void mockQueuesData(ZKReplicationQueueStorageForMigration storage, int nServers,
    String peerId, ServerName deadServer) throws KeeperException {
    ZKWatcher zk = storage.zookeeper;
    for (int i = 0; i < nServers; i++) {
      ServerName sn =
        ServerName.valueOf("test-hbase-" + i, 12345, EnvironmentEdgeManager.currentTime());
      String rsZNode = ZNodePaths.joinZNode(storage.getQueuesZNode(), sn.toString());
      String peerZNode = ZNodePaths.joinZNode(rsZNode, peerId);
      ZKUtil.createWithParents(zk, peerZNode);
      for (int j = 0; j < i; j++) {
        String wal = ZNodePaths.joinZNode(peerZNode, sn.toString() + "." + j);
        ZKUtil.createSetData(zk, wal, ZKUtil.positionToByteArray(j));
      }
      String deadServerPeerZNode = ZNodePaths.joinZNode(rsZNode, peerId + "-" + deadServer);
      ZKUtil.createWithParents(zk, deadServerPeerZNode);
      for (int j = 0; j < i; j++) {
        String wal = ZNodePaths.joinZNode(deadServerPeerZNode, deadServer.toString() + "." + j);
        if (j > 0) {
          ZKUtil.createSetData(zk, wal, ZKUtil.positionToByteArray(j));
        } else {
          // first WAL of the recovered queue has no position data on purpose
          ZKUtil.createWithParents(zk, wal);
        }
      }
      if (i % 2 == 0) {
        // add a region_replica_replication znode
        ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(rsZNode,
          ZKReplicationQueueStorageForMigration.REGION_REPLICA_REPLICATION_PEER));
      }
    }
    // the dead server has an empty queues znode of its own
    ZKUtil.createWithParents(zk,
      ZNodePaths.joinZNode(storage.getQueuesZNode(), deadServer.toString()));
  }

  /**
   * Builds the three-level last-pushed-seq-id znode path: first two hex chars / next two hex
   * chars / remainder of the encoded region name suffixed with {@code -<peerId>}.
   */
  private static String getLastPushedSeqIdZNode(String regionsZNode, String encodedName,
    String peerId) {
    return ZNodePaths.joinZNode(regionsZNode, encodedName.substring(0, 2),
      encodedName.substring(2, 4), encodedName.substring(4) + "-" + peerId);
  }

  /**
   * Mocks last-pushed-seq-id znodes for {@code nRegions} random encoded region names under two
   * peers: {@code peerId1} stores position 1, {@code peerId2} stores position 2. Also pads the
   * regions tree with empty level-1 znodes (two-hex-char names not already used); the first
   * {@code emptyLevel2Count} of those get an empty level-2 child {@code "ab"}, and znode creation
   * stops once {@code emptyLevel1Count + emptyLevel2Count} empty znodes were added.
   * @return map from encoded region name to the set of peer ids expected for it
   */
  public static Map<String, Set<String>> mockLastPushedSeqIds(
    ZKReplicationQueueStorageForMigration storage, String peerId1, String peerId2, int nRegions,
    int emptyLevel1Count, int emptyLevel2Count) throws KeeperException {
    ZKWatcher zk = storage.zookeeper;
    Map<String, Set<String>> name2PeerIds = new HashMap<>();
    byte[] bytes = new byte[32];
    for (int i = 0; i < nRegions; i++) {
      ThreadLocalRandom.current().nextBytes(bytes);
      String encodeName = MD5Hash.getMD5AsHex(bytes);
      String znode1 = getLastPushedSeqIdZNode(storage.getRegionsZNode(), encodeName, peerId1);
      ZKUtil.createSetData(zk, znode1, ZKUtil.positionToByteArray(1));
      String znode2 = getLastPushedSeqIdZNode(storage.getRegionsZNode(), encodeName, peerId2);
      ZKUtil.createSetData(zk, znode2, ZKUtil.positionToByteArray(2));
      name2PeerIds.put(encodeName, Sets.newHashSet(peerId1, peerId2));
    }
    int addedEmptyZNodes = 0;
    // scan all 256 possible level-1 names and create empty ones where no region landed
    for (int i = 0; i < 256; i++) {
      String level1ZNode =
        ZNodePaths.joinZNode(storage.getRegionsZNode(), String.format("%02x", i));
      if (ZKUtil.checkExists(zk, level1ZNode) == -1) {
        ZKUtil.createWithParents(zk, level1ZNode);
        addedEmptyZNodes++;
        if (addedEmptyZNodes <= emptyLevel2Count) {
          // give the first emptyLevel2Count empty znodes an empty level-2 child
          ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(level1ZNode, "ab"));
        }
        if (addedEmptyZNodes >= emptyLevel1Count + emptyLevel2Count) {
          break;
        }
      }
    }
    return name2PeerIds;
  }

  /** Mocks hfile-ref znodes: peer {@code peer_<i>} gets i children named {@code hfile-<j>}. */
  public static void mockHFileRefs(ZKReplicationQueueStorageForMigration storage, int nPeers)
    throws KeeperException {
    ZKWatcher zk = storage.zookeeper;
    for (int i = 0; i < nPeers; i++) {
      String peerId = "peer_" + i;
      ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(storage.getHfileRefsZNode(), peerId));
      for (int j = 0; j < i; j++) {
        ZKUtil.createWithParents(zk,
          ZNodePaths.joinZNode(storage.getHfileRefsZNode(), peerId, "hfile-" + j));
      }
    }
  }

  /** hasData reflects whether the queues znode exists; deleteAllData removes it. */
  @Test
  public void testDeleteAllData() throws Exception {
    assertFalse(storage.hasData());
    ZKUtil.createWithParents(zk, storage.getQueuesZNode());
    assertTrue(storage.hasData());
    storage.deleteAllData();
    assertFalse(storage.hasData());
  }

  /**
   * On empty roots each iterator's first {@code next()} returns null and the corresponding root
   * znode is deleted.
   */
  @Test
  public void testEmptyIter() throws Exception {
    ZKUtil.createWithParents(zk, storage.getQueuesZNode());
    ZKUtil.createWithParents(zk, storage.getRegionsZNode());
    ZKUtil.createWithParents(zk, storage.getHfileRefsZNode());
    assertNull(storage.listAllQueues().next());
    assertEquals(-1, ZKUtil.checkExists(zk, storage.getQueuesZNode()));
    assertNull(storage.listAllLastPushedSeqIds().next());
    assertEquals(-1, ZKUtil.checkExists(zk, storage.getRegionsZNode()));
    assertNull(storage.listAllHFileRefs().next());
    assertEquals(-1, ZKUtil.checkExists(zk, storage.getHfileRefsZNode()));
  }

  /**
   * Iterates all mocked queues (nServers live servers plus the dead server's empty entry) and
   * checks, for each live server, the normal queue and the recovered queue claimed from the dead
   * server: peer id, owning server, WAL offsets. After advancing past a server, its znode must be
   * gone — unless it held a region_replica_replication child (even server index), which the
   * iterator skips and leaves in place; hence nServers / 2 znodes remain at the end.
   */
  @Test
  public void testListAllQueues() throws Exception {
    String peerId = "1";
    ServerName deadServer =
      ServerName.valueOf("test-hbase-dead", 12345, EnvironmentEdgeManager.currentTime());
    int nServers = 10;
    mockQueuesData(storage, nServers, peerId, deadServer);
    MigrationIterator<Pair<ServerName, List<ZkReplicationQueueData>>> iter =
      storage.listAllQueues();
    ServerName previousServerName = null;
    for (int i = 0; i < nServers + 1; i++) {
      Pair<ServerName, List<ZkReplicationQueueData>> pair = iter.next();
      assertNotNull(pair);
      if (previousServerName != null) {
        // the server index is the trailing number of the hostname; -1 marks the dead server
        int index = previousServerName.equals(deadServer)
          ? -1
          : Integer
            .parseInt(Iterables.getLast(Splitter.on('-').split(previousServerName.getHostname())));
        if (index % 2 == 0) {
          // even servers keep only their region_replica_replication znode
          List<String> children = ZKUtil.listChildrenNoWatch(zk,
            ZNodePaths.joinZNode(storage.getQueuesZNode(), previousServerName.toString()));
          assertEquals(1, children.size());
          assertEquals(ZKReplicationQueueStorageForMigration.REGION_REPLICA_REPLICATION_PEER,
            children.get(0));
        } else {
          // odd servers (and the dead server, index -1) are fully deleted once consumed
          assertEquals(-1, ZKUtil.checkExists(zk,
            ZNodePaths.joinZNode(storage.getQueuesZNode(), previousServerName.toString())));
        }
      }
      ServerName sn = pair.getFirst();
      previousServerName = sn;
      if (sn.equals(deadServer)) {
        assertThat(pair.getSecond(), empty());
      } else {
        assertEquals(2, pair.getSecond().size());
        // n = server index = number of WALs mocked per queue for this server
        int n = Integer.parseInt(Iterables.getLast(Splitter.on('-').split(sn.getHostname())));
        ZkReplicationQueueData data0 = pair.getSecond().get(0);
        assertEquals(peerId, data0.getQueueId().getPeerId());
        assertEquals(sn, data0.getQueueId().getServerName());
        assertEquals(n, data0.getWalOffsets().size());
        for (int j = 0; j < n; j++) {
          // recovered queues use the dead server's name in their WAL file names
          assertEquals(j,
            data0.getWalOffsets().get(
              (data0.getQueueId().isRecovered() ? deadServer.toString() : sn.toString()) + "." + j)
              .intValue());
        }
        ZkReplicationQueueData data1 = pair.getSecond().get(1);
        assertEquals(peerId, data1.getQueueId().getPeerId());
        assertEquals(sn, data1.getQueueId().getServerName());
        assertEquals(n, data1.getWalOffsets().size());
        for (int j = 0; j < n; j++) {
          assertEquals(j,
            data1.getWalOffsets().get(
              (data1.getQueueId().isRecovered() ? deadServer.toString() : sn.toString()) + "." + j)
              .intValue());
        }
        // the order of the returned result is undetermined
        if (data0.getQueueId().getSourceServerName().isPresent()) {
          assertEquals(deadServer, data0.getQueueId().getSourceServerName().get());
          assertFalse(data1.getQueueId().getSourceServerName().isPresent());
        } else {
          assertEquals(deadServer, data1.getQueueId().getSourceServerName().get());
        }
      }
    }
    assertNull(iter.next());
    assertEquals(nServers / 2, ZKUtil.listChildrenNoWatch(zk, storage.getQueuesZNode()).size());
  }

  /**
   * Drains the last-pushed-seq-id iterator, checking every mocked (region, peer) pair is seen
   * exactly once with the expected position (1 for peer 1, 2 for peer 2), that exactly the 10
   * mocked empty level-2 znodes produce empty lists, and that the regions root is deleted when
   * the iterator is exhausted.
   */
  @Test
  public void testListAllLastPushedSeqIds() throws Exception {
    String peerId1 = "1";
    String peerId2 = "2";
    Map<String, Set<String>> name2PeerIds =
      mockLastPushedSeqIds(storage, peerId1, peerId2, 100, 10, 10);
    MigrationIterator<List<ZkLastPushedSeqId>> iter = storage.listAllLastPushedSeqIds();
    int emptyListCount = 0;
    for (;;) {
      List<ZkLastPushedSeqId> list = iter.next();
      if (list == null) {
        break;
      }
      if (list.isEmpty()) {
        emptyListCount++;
        continue;
      }
      for (ZkLastPushedSeqId seqId : list) {
        // tick off each (region, peer) pair; anything left over at the end was not migrated
        name2PeerIds.get(seqId.getEncodedRegionName()).remove(seqId.getPeerId());
        if (seqId.getPeerId().equals(peerId1)) {
          assertEquals(1, seqId.getLastPushedSeqId());
        } else {
          assertEquals(2, seqId.getLastPushedSeqId());
        }
      }
    }
    assertEquals(10, emptyListCount);
    name2PeerIds.forEach((encodedRegionName, peerIds) -> {
      assertThat(encodedRegionName + " still has unmigrated peers", peerIds, empty());
    });
    assertEquals(-1, ZKUtil.checkExists(zk, storage.getRegionsZNode()));
  }

  /**
   * Iterates all mocked hfile-ref peers, checking peer_i reports i refs, that the previously
   * returned peer's znode was deleted, and that the hfile-refs root is deleted once the iterator
   * is exhausted.
   */
  @Test
  public void testListAllHFileRefs() throws Exception {
    int nPeers = 10;
    mockHFileRefs(storage, nPeers);
    MigrationIterator<Pair<String, List<String>>> iter = storage.listAllHFileRefs();
    String previousPeerId = null;
    for (int i = 0; i < nPeers; i++) {
      Pair<String, List<String>> pair = iter.next();
      if (previousPeerId != null) {
        assertEquals(-1, ZKUtil.checkExists(zk,
          ZNodePaths.joinZNode(storage.getHfileRefsZNode(), previousPeerId)));
      }
      String peerId = pair.getFirst();
      previousPeerId = peerId;
      // peer index is the suffix after "peer_" and equals the number of mocked refs
      int index = Integer.parseInt(Iterables.getLast(Splitter.on('_').split(peerId)));
      assertEquals(index, pair.getSecond().size());
    }
    assertNull(iter.next());
    assertEquals(-1, ZKUtil.checkExists(zk, storage.getHfileRefsZNode()));
  }
}