/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseZKTestingUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.MigrationIterator;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkReplicationQueueData;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

@Category({ ReplicationTests.class, MediumTests.class })
public class TestZKReplicationQueueStorage {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestZKReplicationQueueStorage.class);

  private static final HBaseZKTestingUtil UTIL = new HBaseZKTestingUtil();

  private ZKWatcher zk;

  private ZKReplicationQueueStorageForMigration storage;

  @Rule
  public final TestName name = new TestName();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    UTIL.startMiniZKCluster();
  }

  @AfterClass
  public static void tearDownAfterClass() throws IOException {
    UTIL.shutdownMiniZKCluster();
  }

  @Before
  public void setUp() throws IOException {
    Configuration conf = UTIL.getConfiguration();
    conf.set(ZKReplicationStorageBase.REPLICATION_ZNODE, name.getMethodName());
    zk = new ZKWatcher(conf, name.getMethodName(), null);
    storage = new ZKReplicationQueueStorageForMigration(zk, conf);
  }

  @After
  public void tearDown() throws Exception {
    ZKUtil.deleteNodeRecursively(zk, storage.replicationZNode);
    Closeables.close(zk, true);
  }

  public static void mockQueuesData(ZKReplicationQueueStorageForMigration storage, int nServers,
    String peerId, ServerName deadServer) throws KeeperException {
    ZKWatcher zk = storage.zookeeper;
    for (int i = 0; i < nServers; i++) {
      ServerName sn =
        ServerName.valueOf("test-hbase-" + i, 12345, EnvironmentEdgeManager.currentTime());
      String rsZNode = ZNodePaths.joinZNode(storage.getQueuesZNode(), sn.toString());
      String peerZNode = ZNodePaths.joinZNode(rsZNode, peerId);
      ZKUtil.createWithParents(zk, peerZNode);
      for (int j = 0; j < i; j++) {
        String wal = ZNodePaths.joinZNode(peerZNode, sn.toString() + "." + j);
        ZKUtil.createSetData(zk, wal, ZKUtil.positionToByteArray(j));
      }
      String deadServerPeerZNode = ZNodePaths.joinZNode(rsZNode, peerId + "-" + deadServer);
      ZKUtil.createWithParents(zk, deadServerPeerZNode);
      for (int j = 0; j < i; j++) {
        String wal = ZNodePaths.joinZNode(deadServerPeerZNode, deadServer.toString() + "." + j);
        if (j > 0) {
          ZKUtil.createSetData(zk, wal, ZKUtil.positionToByteArray(j));
        } else {
          ZKUtil.createWithParents(zk, wal);
        }
      }
      if (i % 2 == 0) {
        // add a region_replica_replication znode
        ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(rsZNode,
          ZKReplicationQueueStorageForMigration.REGION_REPLICA_REPLICATION_PEER));
      }
    }
    ZKUtil.createWithParents(zk,
      ZNodePaths.joinZNode(storage.getQueuesZNode(), deadServer.toString()));
  }

  private static String getLastPushedSeqIdZNode(String regionsZNode, String encodedName,
    String peerId) {
    return ZNodePaths.joinZNode(regionsZNode, encodedName.substring(0, 2),
      encodedName.substring(2, 4), encodedName.substring(4) + "-" + peerId);
  }

  public static Map<String, Set<String>> mockLastPushedSeqIds(
    ZKReplicationQueueStorageForMigration storage, String peerId1, String peerId2, int nRegions,
    int emptyLevel1Count, int emptyLevel2Count) throws KeeperException {
    ZKWatcher zk = storage.zookeeper;
    Map<String, Set<String>> name2PeerIds = new HashMap<>();
    byte[] bytes = new byte[32];
    for (int i = 0; i < nRegions; i++) {
      ThreadLocalRandom.current().nextBytes(bytes);
      String encodeName = MD5Hash.getMD5AsHex(bytes);
      String znode1 = getLastPushedSeqIdZNode(storage.getRegionsZNode(), encodeName, peerId1);
      ZKUtil.createSetData(zk, znode1, ZKUtil.positionToByteArray(1));
      String znode2 = getLastPushedSeqIdZNode(storage.getRegionsZNode(), encodeName, peerId2);
      ZKUtil.createSetData(zk, znode2, ZKUtil.positionToByteArray(2));
      name2PeerIds.put(encodeName, Sets.newHashSet(peerId1, peerId2));
    }
    int addedEmptyZNodes = 0;
    for (int i = 0; i < 256; i++) {
      String level1ZNode =
        ZNodePaths.joinZNode(storage.getRegionsZNode(), String.format("%02x", i));
      if (ZKUtil.checkExists(zk, level1ZNode) == -1) {
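        // this prefix was not used by any of the mocked regions above, so create it as an
        // empty level-1 znode (and, for the first emptyLevel2Count of them, an empty level-2
        // child) to simulate leftover empty znodes the migration iterator has to skip over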
        ZKUtil.createWithParents(zk, level1ZNode);
        addedEmptyZNodes++;
        if (addedEmptyZNodes <= emptyLevel2Count) {
          ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(level1ZNode, "ab"));
        }
        if (addedEmptyZNodes >= emptyLevel1Count + emptyLevel2Count) {
          break;
        }
      }
    }
    return name2PeerIds;
  }

  public static void mockHFileRefs(ZKReplicationQueueStorageForMigration storage, int nPeers)
    throws KeeperException {
    ZKWatcher zk = storage.zookeeper;
    for (int i = 0; i < nPeers; i++) {
      String peerId = "peer_" + i;
      ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(storage.getHfileRefsZNode(), peerId));
      for (int j = 0; j < i; j++) {
        ZKUtil.createWithParents(zk,
          ZNodePaths.joinZNode(storage.getHfileRefsZNode(), peerId, "hfile-" + j));
      }
    }
  }

  @Test
  public void testDeleteAllData() throws Exception {
    assertFalse(storage.hasData());
    ZKUtil.createWithParents(zk, storage.getQueuesZNode());
    assertTrue(storage.hasData());
    storage.deleteAllData();
    assertFalse(storage.hasData());
  }

  @Test
  public void testEmptyIter() throws Exception {
    ZKUtil.createWithParents(zk, storage.getQueuesZNode());
    ZKUtil.createWithParents(zk, storage.getRegionsZNode());
    ZKUtil.createWithParents(zk, storage.getHfileRefsZNode());
    assertNull(storage.listAllQueues().next());
    assertEquals(-1, ZKUtil.checkExists(zk, storage.getQueuesZNode()));
    assertNull(storage.listAllLastPushedSeqIds().next());
    assertEquals(-1, ZKUtil.checkExists(zk, storage.getRegionsZNode()));
    assertNull(storage.listAllHFileRefs().next());
    assertEquals(-1, ZKUtil.checkExists(zk, storage.getHfileRefsZNode()));
  }

  @Test
  public void testListAllQueues() throws Exception {
    String peerId = "1";
    ServerName deadServer =
      ServerName.valueOf("test-hbase-dead", 12345, EnvironmentEdgeManager.currentTime());
    int nServers = 10;
    mockQueuesData(storage, nServers, peerId, deadServer);
    MigrationIterator<Pair<ServerName, List<ZkReplicationQueueData>>> iter =
      storage.listAllQueues();
    ServerName previousServerName = null;
    for (int i = 0; i < nServers + 1; i++) {
      Pair<ServerName, List<ZkReplicationQueueData>> pair = iter.next();
      assertNotNull(pair);
      if (previousServerName != null) {
        int index = previousServerName.equals(deadServer)
          ? -1
          : Integer
            .parseInt(Iterables.getLast(Splitter.on('-').split(previousServerName.getHostname())));
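        // once the iterator has moved past a server, its znode should be cleaned up; servers
        // with an even index were mocked with an extra region_replica_replication child, which
        // should be the only thing left behind, while the others should be gone entirely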
        if (index % 2 == 0) {
          List<String> children = ZKUtil.listChildrenNoWatch(zk,
            ZNodePaths.joinZNode(storage.getQueuesZNode(), previousServerName.toString()));
          assertEquals(1, children.size());
          assertEquals(ZKReplicationQueueStorageForMigration.REGION_REPLICA_REPLICATION_PEER,
            children.get(0));
        } else {
          assertEquals(-1, ZKUtil.checkExists(zk,
            ZNodePaths.joinZNode(storage.getQueuesZNode(), previousServerName.toString())));
        }
      }
      ServerName sn = pair.getFirst();
      previousServerName = sn;
      if (sn.equals(deadServer)) {
        assertThat(pair.getSecond(), empty());
      } else {
        assertEquals(2, pair.getSecond().size());
        int n = Integer.parseInt(Iterables.getLast(Splitter.on('-').split(sn.getHostname())));
        ZkReplicationQueueData data0 = pair.getSecond().get(0);
        assertEquals(peerId, data0.getQueueId().getPeerId());
        assertEquals(sn, data0.getQueueId().getServerName());
        assertEquals(n, data0.getWalOffsets().size());
        for (int j = 0; j < n; j++) {
          assertEquals(j,
            data0.getWalOffsets().get(
              (data0.getQueueId().isRecovered() ? deadServer.toString() : sn.toString()) + "." + j)
              .intValue());
        }
        ZkReplicationQueueData data1 = pair.getSecond().get(1);
        assertEquals(peerId, data1.getQueueId().getPeerId());
        assertEquals(sn, data1.getQueueId().getServerName());
        assertEquals(n, data1.getWalOffsets().size());
        for (int j = 0; j < n; j++) {
          assertEquals(j,
            data1.getWalOffsets().get(
              (data1.getQueueId().isRecovered() ? deadServer.toString() : sn.toString()) + "." + j)
              .intValue());
        }
        // the order of the returned result is undetermined
        if (data0.getQueueId().getSourceServerName().isPresent()) {
          assertEquals(deadServer, data0.getQueueId().getSourceServerName().get());
          assertFalse(data1.getQueueId().getSourceServerName().isPresent());
        } else {
          assertEquals(deadServer, data1.getQueueId().getSourceServerName().get());
        }
      }
    }
    assertNull(iter.next());
    assertEquals(nServers / 2, ZKUtil.listChildrenNoWatch(zk, storage.getQueuesZNode()).size());
  }

  @Test
  public void testListAllLastPushedSeqIds() throws Exception {
    String peerId1 = "1";
    String peerId2 = "2";
    Map<String, Set<String>> name2PeerIds =
      mockLastPushedSeqIds(storage, peerId1, peerId2, 100, 10, 10);
    MigrationIterator<List<ZkLastPushedSeqId>> iter = storage.listAllLastPushedSeqIds();
    int emptyListCount = 0;
    for (;;) {
      List<ZkLastPushedSeqId> list = iter.next();
      if (list == null) {
        break;
      }
      if (list.isEmpty()) {
        emptyListCount++;
        continue;
      }
      for (ZkLastPushedSeqId seqId : list) {
        name2PeerIds.get(seqId.getEncodedRegionName()).remove(seqId.getPeerId());
        if (seqId.getPeerId().equals(peerId1)) {
          assertEquals(1, seqId.getLastPushedSeqId());
        } else {
          assertEquals(2, seqId.getLastPushedSeqId());
        }
      }
    }
    assertEquals(10, emptyListCount);
    name2PeerIds.forEach((encodedRegionName, peerIds) -> {
      assertThat(encodedRegionName + " still has unmigrated peers", peerIds, empty());
    });
    assertEquals(-1, ZKUtil.checkExists(zk, storage.getRegionsZNode()));
  }

  @Test
  public void testListAllHFileRefs() throws Exception {
    int nPeers = 10;
    mockHFileRefs(storage, nPeers);
    MigrationIterator<Pair<String, List<String>>> iter = storage.listAllHFileRefs();
    String previousPeerId = null;
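    // mockHFileRefs created i hfile refs under peer_<i>; verify the sizes match and that each
    // peer znode has been removed once the iterator has moved on to the next peer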
    for (int i = 0; i < nPeers; i++) {
      Pair<String, List<String>> pair = iter.next();
      if (previousPeerId != null) {
        assertEquals(-1, ZKUtil.checkExists(zk,
          ZNodePaths.joinZNode(storage.getHfileRefsZNode(), previousPeerId)));
      }
      String peerId = pair.getFirst();
      previousPeerId = peerId;
      int index = Integer.parseInt(Iterables.getLast(Splitter.on('_').split(peerId)));
      assertEquals(index, pair.getSecond().size());
    }
    assertNull(iter.next());
    assertEquals(-1, ZKUtil.checkExists(zk, storage.getHfileRefsZNode()));
  }
}