001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.hamcrest.CoreMatchers.hasItems; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertThat; 023import static org.junit.Assert.assertTrue; 024 025import java.io.IOException; 026import java.util.Arrays; 027import java.util.Collections; 028import java.util.List; 029import java.util.Set; 030import java.util.SortedSet; 031 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.HBaseClassTestRule; 034import org.apache.hadoop.hbase.HBaseZKTestingUtility; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.ServerName; 037import org.apache.hadoop.hbase.testclassification.MediumTests; 038import org.apache.hadoop.hbase.testclassification.ReplicationTests; 039import org.apache.hadoop.hbase.util.Bytes; 040import org.apache.hadoop.hbase.util.MD5Hash; 041import org.apache.hadoop.hbase.util.Pair; 042import org.apache.hadoop.hbase.zookeeper.ZKUtil; 043import org.apache.zookeeper.KeeperException; 044import org.junit.After; 045import org.junit.AfterClass; 046import org.junit.BeforeClass; 047import org.junit.ClassRule; 048import org.junit.Test; 049import org.junit.experimental.categories.Category; 050 051import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 052 053@Category({ ReplicationTests.class, MediumTests.class }) 054public class TestZKReplicationQueueStorage { 055 056 @ClassRule 057 public static final HBaseClassTestRule CLASS_RULE = 058 HBaseClassTestRule.forClass(TestZKReplicationQueueStorage.class); 059 060 private static final HBaseZKTestingUtility UTIL = new HBaseZKTestingUtility(); 061 062 private static ZKReplicationQueueStorage STORAGE; 063 064 @BeforeClass 065 public static void setUp() throws Exception { 066 UTIL.startMiniZKCluster(); 067 STORAGE = new ZKReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()); 068 } 069 070 @AfterClass 071 public static void tearDown() throws IOException { 072 UTIL.shutdownMiniZKCluster(); 073 } 074 075 @After 076 public void tearDownAfterTest() throws ReplicationException, KeeperException, IOException { 077 for (ServerName serverName : STORAGE.getListOfReplicators()) { 078 for (String queue : STORAGE.getAllQueues(serverName)) { 079 STORAGE.removeQueue(serverName, queue); 080 } 081 STORAGE.removeReplicatorIfQueueIsEmpty(serverName); 082 } 083 for (String peerId : STORAGE.getAllPeersFromHFileRefsQueue()) { 084 STORAGE.removePeerFromHFileRefs(peerId); 085 } 086 } 087 088 private ServerName getServerName(int i) { 089 return ServerName.valueOf("127.0.0.1", 8000 + i, 10000 + i); 090 } 091 092 @Test 093 public void testReplicator() throws ReplicationException { 094 assertTrue(STORAGE.getListOfReplicators().isEmpty()); 095 String queueId = "1"; 096 for (int i = 0; i < 10; i++) { 097 STORAGE.addWAL(getServerName(i), queueId, "file" + i); 098 } 099 List<ServerName> replicators = STORAGE.getListOfReplicators(); 100 assertEquals(10, replicators.size()); 101 for (int i = 0; i < 10; i++) { 102 assertThat(replicators, hasItems(getServerName(i))); 103 } 104 for (int i = 0; i < 5; i++) { 105 STORAGE.removeQueue(getServerName(i), queueId); 106 } 107 for (int i = 0; i < 10; i++) { 108 STORAGE.removeReplicatorIfQueueIsEmpty(getServerName(i)); 109 } 110 replicators = STORAGE.getListOfReplicators(); 111 assertEquals(5, replicators.size()); 112 for (int i = 5; i < 10; i++) { 113 assertThat(replicators, hasItems(getServerName(i))); 114 } 115 } 116 117 private String getFileName(String base, int i) { 118 return String.format(base + "-%04d", i); 119 } 120 121 @Test 122 public void testAddRemoveLog() throws ReplicationException { 123 ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000); 124 assertTrue(STORAGE.getAllQueues(serverName1).isEmpty()); 125 String queue1 = "1"; 126 String queue2 = "2"; 127 for (int i = 0; i < 10; i++) { 128 STORAGE.addWAL(serverName1, queue1, getFileName("file1", i)); 129 STORAGE.addWAL(serverName1, queue2, getFileName("file2", i)); 130 } 131 List<String> queueIds = STORAGE.getAllQueues(serverName1); 132 assertEquals(2, queueIds.size()); 133 assertThat(queueIds, hasItems("1", "2")); 134 135 List<String> wals1 = STORAGE.getWALsInQueue(serverName1, queue1); 136 List<String> wals2 = STORAGE.getWALsInQueue(serverName1, queue2); 137 assertEquals(10, wals1.size()); 138 assertEquals(10, wals2.size()); 139 for (int i = 0; i < 10; i++) { 140 assertThat(wals1, hasItems(getFileName("file1", i))); 141 assertThat(wals2, hasItems(getFileName("file2", i))); 142 } 143 144 for (int i = 0; i < 10; i++) { 145 assertEquals(0, STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i))); 146 assertEquals(0, STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i))); 147 STORAGE.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100, 148 Collections.emptyMap()); 149 STORAGE.setWALPosition(serverName1, queue2, getFileName("file2", i), (i + 1) * 100 + 10, 150 Collections.emptyMap()); 151 } 152 153 for (int i = 0; i < 10; i++) { 154 assertEquals((i + 1) * 100, 155 STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i))); 156 assertEquals((i + 1) * 100 + 10, 157 STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i))); 158 } 159 160 for (int i = 0; i < 10; i++) { 161 if (i % 2 == 0) { 162 STORAGE.removeWAL(serverName1, queue1, getFileName("file1", i)); 163 } else { 164 STORAGE.removeWAL(serverName1, queue2, getFileName("file2", i)); 165 } 166 } 167 168 queueIds = STORAGE.getAllQueues(serverName1); 169 assertEquals(2, queueIds.size()); 170 assertThat(queueIds, hasItems("1", "2")); 171 172 ServerName serverName2 = ServerName.valueOf("127.0.0.1", 8001, 10001); 173 Pair<String, SortedSet<String>> peer1 = STORAGE.claimQueue(serverName1, "1", serverName2); 174 175 assertEquals("1-" + serverName1.getServerName(), peer1.getFirst()); 176 assertEquals(5, peer1.getSecond().size()); 177 int i = 1; 178 for (String wal : peer1.getSecond()) { 179 assertEquals(getFileName("file1", i), wal); 180 assertEquals((i + 1) * 100, 181 STORAGE.getWALPosition(serverName2, peer1.getFirst(), getFileName("file1", i))); 182 i += 2; 183 } 184 185 queueIds = STORAGE.getAllQueues(serverName1); 186 assertEquals(1, queueIds.size()); 187 assertThat(queueIds, hasItems("2")); 188 wals2 = STORAGE.getWALsInQueue(serverName1, queue2); 189 assertEquals(5, wals2.size()); 190 for (i = 0; i < 10; i += 2) { 191 assertThat(wals2, hasItems(getFileName("file2", i))); 192 } 193 194 queueIds = STORAGE.getAllQueues(serverName2); 195 assertEquals(1, queueIds.size()); 196 assertThat(queueIds, hasItems(peer1.getFirst())); 197 wals1 = STORAGE.getWALsInQueue(serverName2, peer1.getFirst()); 198 assertEquals(5, wals1.size()); 199 for (i = 1; i < 10; i += 2) { 200 assertThat(wals1, hasItems(getFileName("file1", i))); 201 } 202 203 Set<String> allWals = STORAGE.getAllWALs(); 204 assertEquals(10, allWals.size()); 205 for (i = 0; i < 10; i++) { 206 assertThat(allWals, hasItems(i % 2 == 0 ? getFileName("file2", i) : getFileName("file1", i))); 207 } 208 } 209 210 // For HBASE-12865 211 @Test 212 public void testClaimQueueChangeCversion() throws ReplicationException, KeeperException { 213 ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000); 214 STORAGE.addWAL(serverName1, "1", "file"); 215 216 int v0 = STORAGE.getQueuesZNodeCversion(); 217 ServerName serverName2 = ServerName.valueOf("127.0.0.1", 8001, 10001); 218 STORAGE.claimQueue(serverName1, "1", serverName2); 219 int v1 = STORAGE.getQueuesZNodeCversion(); 220 // cversion should increase by 1 since a child node is deleted 221 assertEquals(1, v1 - v0); 222 } 223 224 private ZKReplicationQueueStorage createWithUnstableVersion() throws IOException { 225 return new ZKReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()) { 226 227 private int called = 0; 228 private int getLastSeqIdOpIndex = 0; 229 230 @Override 231 protected int getQueuesZNodeCversion() throws KeeperException { 232 if (called < 4) { 233 called++; 234 } 235 return called; 236 } 237 238 @Override 239 protected Pair<Long, Integer> getLastSequenceIdWithVersion(String encodedRegionName, 240 String peerId) throws KeeperException { 241 Pair<Long, Integer> oldPair = super.getLastSequenceIdWithVersion(encodedRegionName, peerId); 242 if (getLastSeqIdOpIndex < 100) { 243 // Let the ZNode version increase. 244 String path = getSerialReplicationRegionPeerNode(encodedRegionName, peerId); 245 ZKUtil.createWithParents(zookeeper, path); 246 ZKUtil.setData(zookeeper, path, ZKUtil.positionToByteArray(100L)); 247 } 248 getLastSeqIdOpIndex++; 249 return oldPair; 250 } 251 }; 252 } 253 254 @Test 255 public void testGetAllWALsCversionChange() throws IOException, ReplicationException { 256 ZKReplicationQueueStorage storage = createWithUnstableVersion(); 257 storage.addWAL(getServerName(0), "1", "file"); 258 // This should return eventually when cversion stabilizes 259 Set<String> allWals = storage.getAllWALs(); 260 assertEquals(1, allWals.size()); 261 assertThat(allWals, hasItems("file")); 262 } 263 264 // For HBASE-14621 265 @Test 266 public void testGetAllHFileRefsCversionChange() throws IOException, ReplicationException { 267 ZKReplicationQueueStorage storage = createWithUnstableVersion(); 268 storage.addPeerToHFileRefs("1"); 269 Path p = new Path("/test"); 270 storage.addHFileRefs("1", Arrays.asList(Pair.newPair(p, p))); 271 // This should return eventually when cversion stabilizes 272 Set<String> allHFileRefs = storage.getAllHFileRefs(); 273 assertEquals(1, allHFileRefs.size()); 274 assertThat(allHFileRefs, hasItems("test")); 275 } 276 277 // For HBASE-20138 278 @Test 279 public void testSetWALPositionBadVersion() throws IOException, ReplicationException { 280 ZKReplicationQueueStorage storage = createWithUnstableVersion(); 281 ServerName serverName1 = ServerName.valueOf("128.0.0.1", 8000, 10000); 282 assertTrue(storage.getAllQueues(serverName1).isEmpty()); 283 String queue1 = "1"; 284 String fileName = getFileName("file1", 0); 285 String encodedRegionName = "31d9792f4435b99d9fb1016f6fbc8dc6"; 286 storage.addWAL(serverName1, queue1, fileName); 287 288 List<String> wals1 = storage.getWALsInQueue(serverName1, queue1); 289 assertEquals(1, wals1.size()); 290 291 assertEquals(0, storage.getWALPosition(serverName1, queue1, fileName)); 292 // This should return eventually when data version stabilizes 293 storage.setWALPosition(serverName1, queue1, fileName, 100, 294 ImmutableMap.of(encodedRegionName, 120L)); 295 296 assertEquals(100, storage.getWALPosition(serverName1, queue1, fileName)); 297 assertEquals(120L, storage.getLastSequenceId(encodedRegionName, queue1)); 298 } 299 300 @Test 301 public void testRegionsZNodeLayout() throws Exception { 302 String peerId = "1"; 303 String encodedRegionName = "31d9792f4435b99d9fb1016f6fbc8dc7"; 304 String expectedPath = "/hbase/replication/regions/31/d9/792f4435b99d9fb1016f6fbc8dc7-" + peerId; 305 String path = STORAGE.getSerialReplicationRegionPeerNode(encodedRegionName, peerId); 306 assertEquals(expectedPath, path); 307 } 308 309 @Test 310 public void testRemoveAllLastPushedSeqIdsForPeer() throws Exception { 311 String peerId = "1"; 312 String peerIdToDelete = "2"; 313 for (int i = 0; i < 100; i++) { 314 String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i)); 315 STORAGE.setLastSequenceIds(peerId, ImmutableMap.of(encodedRegionName, (long) i)); 316 STORAGE.setLastSequenceIds(peerIdToDelete, ImmutableMap.of(encodedRegionName, (long) i)); 317 } 318 for (int i = 0; i < 100; i++) { 319 String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i)); 320 assertEquals(i, STORAGE.getLastSequenceId(encodedRegionName, peerId)); 321 assertEquals(i, STORAGE.getLastSequenceId(encodedRegionName, peerIdToDelete)); 322 } 323 STORAGE.removeLastSequenceIds(peerIdToDelete); 324 for (int i = 0; i < 100; i++) { 325 String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i)); 326 assertEquals(i, STORAGE.getLastSequenceId(encodedRegionName, peerId)); 327 assertEquals(HConstants.NO_SEQNUM, 328 STORAGE.getLastSequenceId(encodedRegionName, peerIdToDelete)); 329 } 330 } 331}