001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.hamcrest.CoreMatchers.hasItems; 021import static org.hamcrest.MatcherAssert.assertThat; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertTrue; 024 025import java.io.IOException; 026import java.util.Arrays; 027import java.util.Collections; 028import java.util.List; 029import java.util.Set; 030import java.util.SortedSet; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.hbase.HBaseClassTestRule; 033import org.apache.hadoop.hbase.HBaseZKTestingUtility; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.ServerName; 036import org.apache.hadoop.hbase.testclassification.MediumTests; 037import org.apache.hadoop.hbase.testclassification.ReplicationTests; 038import org.apache.hadoop.hbase.util.Bytes; 039import org.apache.hadoop.hbase.util.MD5Hash; 040import org.apache.hadoop.hbase.util.Pair; 041import org.apache.hadoop.hbase.zookeeper.ZKUtil; 042import org.apache.zookeeper.KeeperException; 043import org.junit.After; 044import org.junit.AfterClass; 045import org.junit.BeforeClass; 046import org.junit.ClassRule; 047import org.junit.Test; 048import org.junit.experimental.categories.Category; 049 050import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 051 052@Category({ ReplicationTests.class, MediumTests.class }) 053public class TestZKReplicationQueueStorage { 054 055 @ClassRule 056 public static final HBaseClassTestRule CLASS_RULE = 057 HBaseClassTestRule.forClass(TestZKReplicationQueueStorage.class); 058 059 private static final HBaseZKTestingUtility UTIL = new HBaseZKTestingUtility(); 060 061 private static ZKReplicationQueueStorage STORAGE; 062 063 @BeforeClass 064 public static void setUp() throws Exception { 065 UTIL.startMiniZKCluster(); 066 STORAGE = new ZKReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()); 067 } 068 069 @AfterClass 070 public static void tearDown() throws IOException { 071 UTIL.shutdownMiniZKCluster(); 072 } 073 074 @After 075 public void tearDownAfterTest() throws ReplicationException, KeeperException, IOException { 076 for (ServerName serverName : STORAGE.getListOfReplicators()) { 077 for (String queue : STORAGE.getAllQueues(serverName)) { 078 STORAGE.removeQueue(serverName, queue); 079 } 080 STORAGE.removeReplicatorIfQueueIsEmpty(serverName); 081 } 082 for (String peerId : STORAGE.getAllPeersFromHFileRefsQueue()) { 083 STORAGE.removePeerFromHFileRefs(peerId); 084 } 085 } 086 087 private ServerName getServerName(int i) { 088 return ServerName.valueOf("127.0.0.1", 8000 + i, 10000 + i); 089 } 090 091 @Test 092 public void testReplicator() throws ReplicationException { 093 assertTrue(STORAGE.getListOfReplicators().isEmpty()); 094 String queueId = "1"; 095 for (int i = 0; i < 10; i++) { 096 STORAGE.addWAL(getServerName(i), queueId, "file" + i); 097 } 098 List<ServerName> replicators = STORAGE.getListOfReplicators(); 099 assertEquals(10, replicators.size()); 100 for (int i = 0; i < 10; i++) { 101 assertThat(replicators, hasItems(getServerName(i))); 102 } 103 for (int i = 0; i < 5; i++) { 104 STORAGE.removeQueue(getServerName(i), queueId); 105 } 106 for (int i = 0; i < 10; i++) { 107 STORAGE.removeReplicatorIfQueueIsEmpty(getServerName(i)); 108 } 109 replicators = STORAGE.getListOfReplicators(); 110 assertEquals(5, replicators.size()); 111 for (int i = 5; i < 10; i++) { 112 assertThat(replicators, hasItems(getServerName(i))); 113 } 114 } 115 116 private String getFileName(String base, int i) { 117 return String.format(base + "-%04d", i); 118 } 119 120 @Test 121 public void testAddRemoveLog() throws ReplicationException { 122 ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000); 123 assertTrue(STORAGE.getAllQueues(serverName1).isEmpty()); 124 String queue1 = "1"; 125 String queue2 = "2"; 126 for (int i = 0; i < 10; i++) { 127 STORAGE.addWAL(serverName1, queue1, getFileName("file1", i)); 128 STORAGE.addWAL(serverName1, queue2, getFileName("file2", i)); 129 } 130 List<String> queueIds = STORAGE.getAllQueues(serverName1); 131 assertEquals(2, queueIds.size()); 132 assertThat(queueIds, hasItems("1", "2")); 133 134 List<String> wals1 = STORAGE.getWALsInQueue(serverName1, queue1); 135 List<String> wals2 = STORAGE.getWALsInQueue(serverName1, queue2); 136 assertEquals(10, wals1.size()); 137 assertEquals(10, wals2.size()); 138 for (int i = 0; i < 10; i++) { 139 assertThat(wals1, hasItems(getFileName("file1", i))); 140 assertThat(wals2, hasItems(getFileName("file2", i))); 141 } 142 143 for (int i = 0; i < 10; i++) { 144 assertEquals(0, STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i))); 145 assertEquals(0, STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i))); 146 STORAGE.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100, 147 Collections.emptyMap()); 148 STORAGE.setWALPosition(serverName1, queue2, getFileName("file2", i), (i + 1) * 100 + 10, 149 Collections.emptyMap()); 150 } 151 152 for (int i = 0; i < 10; i++) { 153 assertEquals((i + 1) * 100, 154 STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i))); 155 assertEquals((i + 1) * 100 + 10, 156 STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i))); 157 } 158 159 for (int i = 0; i < 10; i++) { 160 if (i % 2 == 0) { 161 STORAGE.removeWAL(serverName1, queue1, getFileName("file1", i)); 162 } else { 163 STORAGE.removeWAL(serverName1, queue2, getFileName("file2", i)); 164 } 165 } 166 167 queueIds = STORAGE.getAllQueues(serverName1); 168 assertEquals(2, queueIds.size()); 169 assertThat(queueIds, hasItems("1", "2")); 170 171 ServerName serverName2 = ServerName.valueOf("127.0.0.1", 8001, 10001); 172 Pair<String, SortedSet<String>> peer1 = STORAGE.claimQueue(serverName1, "1", serverName2); 173 174 assertEquals("1-" + serverName1.getServerName(), peer1.getFirst()); 175 assertEquals(5, peer1.getSecond().size()); 176 int i = 1; 177 for (String wal : peer1.getSecond()) { 178 assertEquals(getFileName("file1", i), wal); 179 assertEquals((i + 1) * 100, 180 STORAGE.getWALPosition(serverName2, peer1.getFirst(), getFileName("file1", i))); 181 i += 2; 182 } 183 184 queueIds = STORAGE.getAllQueues(serverName1); 185 assertEquals(1, queueIds.size()); 186 assertThat(queueIds, hasItems("2")); 187 wals2 = STORAGE.getWALsInQueue(serverName1, queue2); 188 assertEquals(5, wals2.size()); 189 for (i = 0; i < 10; i += 2) { 190 assertThat(wals2, hasItems(getFileName("file2", i))); 191 } 192 193 queueIds = STORAGE.getAllQueues(serverName2); 194 assertEquals(1, queueIds.size()); 195 assertThat(queueIds, hasItems(peer1.getFirst())); 196 wals1 = STORAGE.getWALsInQueue(serverName2, peer1.getFirst()); 197 assertEquals(5, wals1.size()); 198 for (i = 1; i < 10; i += 2) { 199 assertThat(wals1, hasItems(getFileName("file1", i))); 200 } 201 202 Set<String> allWals = STORAGE.getAllWALs(); 203 assertEquals(10, allWals.size()); 204 for (i = 0; i < 10; i++) { 205 assertThat(allWals, hasItems(i % 2 == 0 ? getFileName("file2", i) : getFileName("file1", i))); 206 } 207 } 208 209 // For HBASE-12865, HBASE-26482 210 @Test 211 public void testClaimQueueChangeCversion() throws ReplicationException, KeeperException { 212 ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000); 213 STORAGE.addWAL(serverName1, "1", "file"); 214 STORAGE.addWAL(serverName1, "2", "file"); 215 216 ServerName serverName2 = ServerName.valueOf("127.0.0.1", 8001, 10001); 217 // Avoid claimQueue update cversion for prepare server2 rsNode. 218 STORAGE.addWAL(serverName2, "1", "file"); 219 STORAGE.addWAL(serverName2, "2", "file"); 220 221 int v0 = STORAGE.getQueuesZNodeCversion(); 222 223 STORAGE.claimQueue(serverName1, "1", serverName2); 224 int v1 = STORAGE.getQueuesZNodeCversion(); 225 // cversion should be increased by claimQueue method. 226 assertTrue(v1 > v0); 227 228 STORAGE.claimQueue(serverName1, "2", serverName2); 229 int v2 = STORAGE.getQueuesZNodeCversion(); 230 // cversion should be increased by claimQueue method. 231 assertTrue(v2 > v1); 232 } 233 234 private ZKReplicationQueueStorage createWithUnstableVersion() throws IOException { 235 return new ZKReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()) { 236 237 private int called = 0; 238 private int getLastSeqIdOpIndex = 0; 239 240 @Override 241 protected int getQueuesZNodeCversion() throws KeeperException { 242 if (called < 4) { 243 called++; 244 } 245 return called; 246 } 247 248 @Override 249 protected Pair<Long, Integer> getLastSequenceIdWithVersion(String encodedRegionName, 250 String peerId) throws KeeperException { 251 Pair<Long, Integer> oldPair = super.getLastSequenceIdWithVersion(encodedRegionName, peerId); 252 if (getLastSeqIdOpIndex < 100) { 253 // Let the ZNode version increase. 254 String path = getSerialReplicationRegionPeerNode(encodedRegionName, peerId); 255 ZKUtil.createWithParents(zookeeper, path); 256 ZKUtil.setData(zookeeper, path, ZKUtil.positionToByteArray(100L)); 257 } 258 getLastSeqIdOpIndex++; 259 return oldPair; 260 } 261 }; 262 } 263 264 @Test 265 public void testGetAllWALsCversionChange() throws IOException, ReplicationException { 266 ZKReplicationQueueStorage storage = createWithUnstableVersion(); 267 storage.addWAL(getServerName(0), "1", "file"); 268 // This should return eventually when cversion stabilizes 269 Set<String> allWals = storage.getAllWALs(); 270 assertEquals(1, allWals.size()); 271 assertThat(allWals, hasItems("file")); 272 } 273 274 // For HBASE-14621 275 @Test 276 public void testGetAllHFileRefsCversionChange() throws IOException, ReplicationException { 277 ZKReplicationQueueStorage storage = createWithUnstableVersion(); 278 storage.addPeerToHFileRefs("1"); 279 Path p = new Path("/test"); 280 storage.addHFileRefs("1", Arrays.asList(Pair.newPair(p, p))); 281 // This should return eventually when cversion stabilizes 282 Set<String> allHFileRefs = storage.getAllHFileRefs(); 283 assertEquals(1, allHFileRefs.size()); 284 assertThat(allHFileRefs, hasItems("test")); 285 } 286 287 // For HBASE-20138 288 @Test 289 public void testSetWALPositionBadVersion() throws IOException, ReplicationException { 290 ZKReplicationQueueStorage storage = createWithUnstableVersion(); 291 ServerName serverName1 = ServerName.valueOf("128.0.0.1", 8000, 10000); 292 assertTrue(storage.getAllQueues(serverName1).isEmpty()); 293 String queue1 = "1"; 294 String fileName = getFileName("file1", 0); 295 String encodedRegionName = "31d9792f4435b99d9fb1016f6fbc8dc6"; 296 storage.addWAL(serverName1, queue1, fileName); 297 298 List<String> wals1 = storage.getWALsInQueue(serverName1, queue1); 299 assertEquals(1, wals1.size()); 300 301 assertEquals(0, storage.getWALPosition(serverName1, queue1, fileName)); 302 // This should return eventually when data version stabilizes 303 storage.setWALPosition(serverName1, queue1, fileName, 100, 304 ImmutableMap.of(encodedRegionName, 120L)); 305 306 assertEquals(100, storage.getWALPosition(serverName1, queue1, fileName)); 307 assertEquals(120L, storage.getLastSequenceId(encodedRegionName, queue1)); 308 } 309 310 @Test 311 public void testRegionsZNodeLayout() throws Exception { 312 String peerId = "1"; 313 String encodedRegionName = "31d9792f4435b99d9fb1016f6fbc8dc7"; 314 String expectedPath = "/hbase/replication/regions/31/d9/792f4435b99d9fb1016f6fbc8dc7-" + peerId; 315 String path = STORAGE.getSerialReplicationRegionPeerNode(encodedRegionName, peerId); 316 assertEquals(expectedPath, path); 317 } 318 319 @Test 320 public void testRemoveAllLastPushedSeqIdsForPeer() throws Exception { 321 String peerId = "1"; 322 String peerIdToDelete = "2"; 323 for (int i = 0; i < 100; i++) { 324 String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i)); 325 STORAGE.setLastSequenceIds(peerId, ImmutableMap.of(encodedRegionName, (long) i)); 326 STORAGE.setLastSequenceIds(peerIdToDelete, ImmutableMap.of(encodedRegionName, (long) i)); 327 } 328 for (int i = 0; i < 100; i++) { 329 String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i)); 330 assertEquals(i, STORAGE.getLastSequenceId(encodedRegionName, peerId)); 331 assertEquals(i, STORAGE.getLastSequenceId(encodedRegionName, peerIdToDelete)); 332 } 333 STORAGE.removeLastSequenceIds(peerIdToDelete); 334 for (int i = 0; i < 100; i++) { 335 String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i)); 336 assertEquals(i, STORAGE.getLastSequenceId(encodedRegionName, peerId)); 337 assertEquals(HConstants.NO_SEQNUM, 338 STORAGE.getLastSequenceId(encodedRegionName, peerIdToDelete)); 339 } 340 } 341}