001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.hamcrest.MatcherAssert.assertThat; 021import static org.hamcrest.Matchers.empty; 022import static org.hamcrest.Matchers.hasItem; 023import static org.hamcrest.Matchers.hasSize; 024import static org.hamcrest.Matchers.not; 025import static org.junit.jupiter.api.Assertions.assertEquals; 026import static org.junit.jupiter.api.Assertions.assertFalse; 027import static org.junit.jupiter.api.Assertions.assertTrue; 028 029import java.io.IOException; 030import java.util.ArrayList; 031import java.util.Collections; 032import java.util.List; 033import java.util.Map; 034import java.util.Set; 035import java.util.concurrent.ConcurrentHashMap; 036import java.util.concurrent.ThreadLocalRandom; 037import java.util.stream.Collectors; 038import java.util.stream.IntStream; 039import org.apache.hadoop.fs.Path; 040import org.apache.hadoop.hbase.HBaseTestingUtil; 041import org.apache.hadoop.hbase.HConstants; 042import org.apache.hadoop.hbase.ServerName; 043import org.apache.hadoop.hbase.TableName; 044import org.apache.hadoop.hbase.client.TableDescriptor; 045import org.apache.hadoop.hbase.testclassification.MediumTests; 046import org.apache.hadoop.hbase.testclassification.ReplicationTests; 047import org.apache.hadoop.hbase.util.Bytes; 048import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 049import org.apache.hadoop.hbase.util.MD5Hash; 050import org.apache.hadoop.hbase.util.Pair; 051import org.apache.zookeeper.KeeperException; 052import org.hamcrest.Matchers; 053import org.hamcrest.collection.IsEmptyCollection; 054import org.junit.jupiter.api.AfterAll; 055import org.junit.jupiter.api.BeforeAll; 056import org.junit.jupiter.api.BeforeEach; 057import org.junit.jupiter.api.Tag; 058import org.junit.jupiter.api.Test; 059import org.junit.jupiter.api.TestInfo; 060import org.slf4j.Logger; 061import org.slf4j.LoggerFactory; 062 063import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 064import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 065 066@Tag(ReplicationTests.TAG) 067@Tag(MediumTests.TAG) 068public class TestTableReplicationQueueStorage { 069 070 private static final Logger LOG = LoggerFactory.getLogger(TestTableReplicationQueueStorage.class); 071 072 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 073 074 private TableReplicationQueueStorage storage; 075 076 @BeforeAll 077 public static void setUp() throws Exception { 078 UTIL.startMiniCluster(); 079 } 080 081 @AfterAll 082 public static void tearDown() throws IOException { 083 UTIL.shutdownMiniCluster(); 084 } 085 086 @BeforeEach 087 public void setUpBeforeTest(TestInfo testInfo) throws Exception { 088 TableName tableName = TableName.valueOf(testInfo.getTestMethod().get().getName()); 089 TableDescriptor td = ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName); 090 UTIL.getAdmin().createTable(td); 091 UTIL.waitTableAvailable(tableName); 092 storage = new TableReplicationQueueStorage(UTIL.getConnection(), tableName); 093 } 094 095 private ServerName getServerName(int i) { 096 return ServerName.valueOf("127.0.0.1", 8000 + i, 10000 + i); 097 } 098 099 private String getFileName(String base, int i) { 100 return String.format(base + "-%04d", i); 101 } 102 103 @Test 104 public void testReplicator() throws ReplicationException { 105 assertTrue(storage.listAllReplicators().isEmpty()); 106 String peerId = "1"; 107 for (int i = 0; i < 10; i++) { 108 ReplicationQueueId queueId = new ReplicationQueueId(getServerName(i), peerId); 109 storage.setOffset(queueId, "group-" + i, new ReplicationGroupOffset("file-" + i, i * 100), 110 Collections.emptyMap()); 111 } 112 List<ServerName> replicators = storage.listAllReplicators(); 113 assertEquals(10, replicators.size()); 114 for (int i = 0; i < 10; i++) { 115 assertThat(replicators, hasItem(getServerName(i))); 116 } 117 for (int i = 0; i < 5; i++) { 118 ReplicationQueueId queueId = new ReplicationQueueId(getServerName(i), peerId); 119 storage.removeQueue(queueId); 120 } 121 replicators = storage.listAllReplicators(); 122 assertEquals(5, replicators.size()); 123 for (int i = 0; i < 5; i++) { 124 assertThat(replicators, not(hasItem(getServerName(i)))); 125 } 126 for (int i = 5; i < 10; i++) { 127 assertThat(replicators, hasItem(getServerName(i))); 128 } 129 } 130 131 private void assertQueueId(String peerId, ServerName serverName, ReplicationQueueId queueId) { 132 assertEquals(peerId, queueId.getPeerId()); 133 assertEquals(serverName, queueId.getServerName()); 134 assertFalse(queueId.getSourceServerName().isPresent()); 135 } 136 137 @Test 138 public void testPersistLogPositionAndSeqIdAtomically() throws Exception { 139 ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000); 140 assertTrue(storage.listAllQueueIds(serverName1).isEmpty()); 141 String peerId1 = "1"; 142 String region0 = "6b2c8f8555335cc9af74455b94516cbe"; 143 String region1 = "6ecd2e9e010499f8ddef97ee8f70834f"; 144 145 for (int i = 0; i < 10; i++) { 146 ReplicationQueueId queueId = new ReplicationQueueId(serverName1, peerId1); 147 assertTrue(storage.getOffsets(queueId).isEmpty()); 148 } 149 assertEquals(HConstants.NO_SEQNUM, storage.getLastSequenceId(region0, peerId1)); 150 assertEquals(HConstants.NO_SEQNUM, storage.getLastSequenceId(region1, peerId1)); 151 152 for (int i = 0; i < 10; i++) { 153 ReplicationQueueId queueId = new ReplicationQueueId(serverName1, peerId1); 154 storage.setOffset(queueId, "group1-" + i, 155 new ReplicationGroupOffset(getFileName("file1", i), (i + 1) * 100), 156 ImmutableMap.of(region0, i * 100L, region1, (i + 1) * 100L)); 157 } 158 159 List<ReplicationQueueId> queueIds = storage.listAllQueueIds(serverName1); 160 assertEquals(1, queueIds.size()); 161 assertQueueId(peerId1, serverName1, queueIds.get(0)); 162 163 Map<String, ReplicationGroupOffset> offsets = 164 storage.getOffsets(new ReplicationQueueId(serverName1, peerId1)); 165 for (int i = 0; i < 10; i++) { 166 ReplicationGroupOffset offset = offsets.get("group1-" + i); 167 assertEquals(getFileName("file1", i), offset.getWal()); 168 assertEquals((i + 1) * 100, offset.getOffset()); 169 } 170 assertEquals(900L, storage.getLastSequenceId(region0, peerId1)); 171 assertEquals(1000L, storage.getLastSequenceId(region1, peerId1)); 172 173 // Try to decrease the last pushed id by setWALPosition method. 174 storage.setOffset(new ReplicationQueueId(serverName1, peerId1), "group1-0", 175 new ReplicationGroupOffset(getFileName("file1", 0), 11 * 100), 176 ImmutableMap.of(region0, 899L, region1, 1001L)); 177 assertEquals(900L, storage.getLastSequenceId(region0, peerId1)); 178 assertEquals(1001L, storage.getLastSequenceId(region1, peerId1)); 179 } 180 181 private void assertGroupOffset(String wal, long offset, ReplicationGroupOffset groupOffset) { 182 assertEquals(wal, groupOffset.getWal()); 183 assertEquals(offset, groupOffset.getOffset()); 184 } 185 186 @Test 187 public void testClaimQueue() throws Exception { 188 String peerId = "1"; 189 ServerName serverName1 = getServerName(1); 190 ReplicationQueueId queueId = new ReplicationQueueId(serverName1, peerId); 191 for (int i = 0; i < 10; i++) { 192 storage.setOffset(queueId, "group-" + i, new ReplicationGroupOffset("wal-" + i, i), 193 Collections.emptyMap()); 194 } 195 196 ServerName serverName2 = getServerName(2); 197 Map<String, ReplicationGroupOffset> offsets2 = storage.claimQueue(queueId, serverName2); 198 assertEquals(10, offsets2.size()); 199 for (int i = 0; i < 10; i++) { 200 assertGroupOffset("wal-" + i, i, offsets2.get("group-" + i)); 201 } 202 ReplicationQueueId claimedQueueId2 = new ReplicationQueueId(serverName2, peerId, serverName1); 203 assertThat(storage.listAllQueueIds(peerId, serverName1), IsEmptyCollection.empty()); 204 assertThat(storage.listAllQueueIds(peerId, serverName2), 205 Matchers.<List<ReplicationQueueId>> both(hasItem(claimedQueueId2)).and(hasSize(1))); 206 offsets2 = storage.getOffsets(claimedQueueId2); 207 assertEquals(10, offsets2.size()); 208 for (int i = 0; i < 10; i++) { 209 assertGroupOffset("wal-" + i, i, offsets2.get("group-" + i)); 210 } 211 212 ServerName serverName3 = getServerName(3); 213 Map<String, ReplicationGroupOffset> offsets3 = storage.claimQueue(claimedQueueId2, serverName3); 214 assertEquals(10, offsets3.size()); 215 for (int i = 0; i < 10; i++) { 216 assertGroupOffset("wal-" + i, i, offsets3.get("group-" + i)); 217 } 218 ReplicationQueueId claimedQueueId3 = new ReplicationQueueId(serverName3, peerId, serverName1); 219 assertThat(storage.listAllQueueIds(peerId, serverName1), IsEmptyCollection.empty()); 220 assertThat(storage.listAllQueueIds(peerId, serverName2), IsEmptyCollection.empty()); 221 assertThat(storage.listAllQueueIds(peerId, serverName3), 222 Matchers.<List<ReplicationQueueId>> both(hasItem(claimedQueueId3)).and(hasSize(1))); 223 offsets3 = storage.getOffsets(claimedQueueId3); 224 assertEquals(10, offsets3.size()); 225 for (int i = 0; i < 10; i++) { 226 assertGroupOffset("wal-" + i, i, offsets3.get("group-" + i)); 227 } 228 storage.removeQueue(claimedQueueId3); 229 assertThat(storage.listAllQueueIds(peerId), IsEmptyCollection.empty()); 230 } 231 232 @Test 233 public void testClaimQueueMultiThread() throws Exception { 234 String peerId = "3"; 235 String walGroup = "group"; 236 ReplicationGroupOffset groupOffset = new ReplicationGroupOffset("wal", 123); 237 ServerName sourceServerName = getServerName(100); 238 ReplicationQueueId queueId = new ReplicationQueueId(sourceServerName, peerId); 239 storage.setOffset(queueId, walGroup, groupOffset, Collections.emptyMap()); 240 List<ServerName> serverNames = 241 IntStream.range(0, 10).mapToObj(this::getServerName).collect(Collectors.toList()); 242 for (int i = 0; i < 10; i++) { 243 final ReplicationQueueId toClaim = queueId; 244 List<Thread> threads = new ArrayList<>(); 245 Map<ServerName, Map<String, ReplicationGroupOffset>> claimed = new ConcurrentHashMap<>(); 246 Set<ServerName> failed = ConcurrentHashMap.newKeySet(); 247 for (ServerName serverName : serverNames) { 248 if (serverName.equals(queueId.getServerName())) { 249 continue; 250 } 251 threads.add(new Thread("Claim-" + i + "-" + serverName) { 252 253 @Override 254 public void run() { 255 try { 256 Map<String, ReplicationGroupOffset> offsets = storage.claimQueue(toClaim, serverName); 257 if (!offsets.isEmpty()) { 258 claimed.put(serverName, offsets); 259 } 260 } catch (ReplicationException e) { 261 LOG.error("failed to claim queue", e); 262 failed.add(serverName); 263 } 264 } 265 }); 266 } 267 LOG.info("Claim round {}, there are {} threads to claim {}", i, threads.size(), toClaim); 268 for (Thread thread : threads) { 269 thread.start(); 270 } 271 for (Thread thread : threads) { 272 thread.join(30000); 273 assertFalse(thread.isAlive()); 274 } 275 LOG.info("Finish claim round {}, claimed={}, failed={}", i, claimed, failed); 276 assertThat(failed, IsEmptyCollection.empty()); 277 assertEquals(1, claimed.size()); 278 Map<String, ReplicationGroupOffset> offsets = Iterables.getOnlyElement(claimed.values()); 279 assertEquals(1, offsets.size()); 280 assertGroupOffset("wal", 123, offsets.get("group")); 281 queueId = new ReplicationQueueId(Iterables.getOnlyElement(claimed.keySet()), peerId, 282 sourceServerName); 283 assertThat(storage.listAllQueueIds(peerId), 284 Matchers.<List<ReplicationQueueId>> both(hasItem(queueId)).and(hasSize(1))); 285 } 286 } 287 288 @Test 289 public void testListRemovePeerAllQueues() throws Exception { 290 String peerId1 = "1"; 291 String peerId2 = "2"; 292 for (int i = 0; i < 100; i++) { 293 ServerName serverName = getServerName(i); 294 String group = "group"; 295 ReplicationGroupOffset offset = new ReplicationGroupOffset("wal", i); 296 ReplicationQueueId queueId1 = new ReplicationQueueId(serverName, peerId1); 297 ReplicationQueueId queueId2 = new ReplicationQueueId(serverName, peerId2); 298 storage.setOffset(queueId1, group, offset, Collections.emptyMap()); 299 storage.setOffset(queueId2, group, offset, Collections.emptyMap()); 300 } 301 List<ReplicationQueueData> queueDatas = storage.listAllQueues(); 302 assertThat(queueDatas, hasSize(200)); 303 for (int i = 0; i < 100; i++) { 304 ReplicationQueueData peerId1Data = queueDatas.get(i); 305 ReplicationQueueData peerId2Data = queueDatas.get(i + 100); 306 ServerName serverName = getServerName(i); 307 assertEquals(new ReplicationQueueId(serverName, peerId1), peerId1Data.getId()); 308 assertEquals(new ReplicationQueueId(serverName, peerId2), peerId2Data.getId()); 309 assertEquals(1, peerId1Data.getOffsets().size()); 310 assertEquals(1, peerId2Data.getOffsets().size()); 311 assertGroupOffset("wal", i, peerId1Data.getOffsets().get("group")); 312 assertGroupOffset("wal", i, peerId2Data.getOffsets().get("group")); 313 } 314 List<ReplicationQueueId> queueIds1 = storage.listAllQueueIds(peerId1); 315 assertThat(queueIds1, hasSize(100)); 316 for (int i = 0; i < 100; i++) { 317 ServerName serverName = getServerName(i); 318 assertEquals(new ReplicationQueueId(serverName, peerId1), queueIds1.get(i)); 319 } 320 List<ReplicationQueueId> queueIds2 = storage.listAllQueueIds(peerId2); 321 assertThat(queueIds2, hasSize(100)); 322 for (int i = 0; i < 100; i++) { 323 ServerName serverName = getServerName(i); 324 assertEquals(new ReplicationQueueId(serverName, peerId2), queueIds2.get(i)); 325 } 326 327 storage.removeAllQueues(peerId1); 328 assertThat(storage.listAllQueues(), hasSize(100)); 329 assertThat(storage.listAllQueueIds(peerId1), IsEmptyCollection.empty()); 330 assertThat(storage.listAllQueueIds(peerId2), hasSize(100)); 331 332 storage.removeAllQueues(peerId2); 333 assertThat(storage.listAllQueues(), IsEmptyCollection.empty()); 334 assertThat(storage.listAllQueueIds(peerId1), IsEmptyCollection.empty()); 335 assertThat(storage.listAllQueueIds(peerId2), IsEmptyCollection.empty()); 336 } 337 338 @Test 339 public void testRemoveAllLastPushedSeqIdsForPeer() throws Exception { 340 String peerId = "1"; 341 String peerIdToDelete = "2"; 342 for (int i = 0; i < 100; i++) { 343 String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i)); 344 storage.setLastSequenceIds(peerId, ImmutableMap.of(encodedRegionName, (long) i)); 345 storage.setLastSequenceIds(peerIdToDelete, ImmutableMap.of(encodedRegionName, (long) i)); 346 } 347 for (int i = 0; i < 100; i++) { 348 String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i)); 349 assertEquals(i, storage.getLastSequenceId(encodedRegionName, peerId)); 350 assertEquals(i, storage.getLastSequenceId(encodedRegionName, peerIdToDelete)); 351 } 352 storage.removeLastSequenceIds(peerIdToDelete); 353 for (int i = 0; i < 100; i++) { 354 String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i)); 355 assertEquals(i, storage.getLastSequenceId(encodedRegionName, peerId)); 356 assertEquals(HConstants.NO_SEQNUM, 357 storage.getLastSequenceId(encodedRegionName, peerIdToDelete)); 358 } 359 } 360 361 @Test 362 public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException { 363 String peerId1 = "1"; 364 365 List<Pair<Path, Path>> files1 = new ArrayList<>(3); 366 files1.add(new Pair<>(null, new Path("file_1"))); 367 files1.add(new Pair<>(null, new Path("file_2"))); 368 files1.add(new Pair<>(null, new Path("file_3"))); 369 assertTrue(storage.getReplicableHFiles(peerId1).isEmpty()); 370 assertEquals(0, storage.getAllPeersFromHFileRefsQueue().size()); 371 372 storage.addHFileRefs(peerId1, files1); 373 assertEquals(1, storage.getAllPeersFromHFileRefsQueue().size()); 374 assertEquals(3, storage.getReplicableHFiles(peerId1).size()); 375 List<String> hfiles2 = new ArrayList<>(files1.size()); 376 for (Pair<Path, Path> p : files1) { 377 hfiles2.add(p.getSecond().getName()); 378 } 379 String removedString = hfiles2.remove(0); 380 storage.removeHFileRefs(peerId1, hfiles2); 381 assertEquals(1, storage.getReplicableHFiles(peerId1).size()); 382 hfiles2 = new ArrayList<>(1); 383 hfiles2.add(removedString); 384 storage.removeHFileRefs(peerId1, hfiles2); 385 assertEquals(0, storage.getReplicableHFiles(peerId1).size()); 386 } 387 388 @Test 389 public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException { 390 String peerId1 = "1"; 391 String peerId2 = "2"; 392 393 List<Pair<Path, Path>> files1 = new ArrayList<>(3); 394 files1.add(new Pair<>(null, new Path("file_1"))); 395 files1.add(new Pair<>(null, new Path("file_2"))); 396 files1.add(new Pair<>(null, new Path("file_3"))); 397 storage.addHFileRefs(peerId1, files1); 398 storage.addHFileRefs(peerId2, files1); 399 assertEquals(2, storage.getAllPeersFromHFileRefsQueue().size()); 400 assertEquals(3, storage.getReplicableHFiles(peerId1).size()); 401 assertEquals(3, storage.getReplicableHFiles(peerId2).size()); 402 403 storage.removePeerFromHFileRefs(peerId1); 404 assertEquals(1, storage.getAllPeersFromHFileRefsQueue().size()); 405 assertTrue(storage.getReplicableHFiles(peerId1).isEmpty()); 406 assertEquals(3, storage.getReplicableHFiles(peerId2).size()); 407 408 storage.removePeerFromHFileRefs(peerId2); 409 assertEquals(0, storage.getAllPeersFromHFileRefsQueue().size()); 410 assertTrue(storage.getReplicableHFiles(peerId2).isEmpty()); 411 } 412 413 private void addLastSequenceIdsAndHFileRefs(String peerId1, String peerId2) 414 throws ReplicationException { 415 for (int i = 0; i < 100; i++) { 416 String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i)); 417 storage.setLastSequenceIds(peerId1, ImmutableMap.of(encodedRegionName, (long) i)); 418 } 419 420 List<Pair<Path, Path>> files1 = new ArrayList<>(3); 421 files1.add(new Pair<>(null, new Path("file_1"))); 422 files1.add(new Pair<>(null, new Path("file_2"))); 423 files1.add(new Pair<>(null, new Path("file_3"))); 424 storage.addHFileRefs(peerId2, files1); 425 } 426 427 @Test 428 public void testRemoveLastSequenceIdsAndHFileRefsBefore() 429 throws ReplicationException, InterruptedException { 430 String peerId1 = "1"; 431 String peerId2 = "2"; 432 addLastSequenceIdsAndHFileRefs(peerId1, peerId2); 433 // make sure we have write these out 434 for (int i = 0; i < 100; i++) { 435 String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i)); 436 assertEquals(i, storage.getLastSequenceId(encodedRegionName, peerId1)); 437 } 438 assertEquals(1, storage.getAllPeersFromHFileRefsQueue().size()); 439 assertEquals(3, storage.getReplicableHFiles(peerId2).size()); 440 441 // should have nothing after removal 442 long ts = EnvironmentEdgeManager.currentTime(); 443 storage.removeLastSequenceIdsAndHFileRefsBefore(ts); 444 for (int i = 0; i < 100; i++) { 445 String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i)); 446 assertEquals(HConstants.NO_SEQNUM, storage.getLastSequenceId(encodedRegionName, peerId1)); 447 } 448 assertEquals(0, storage.getAllPeersFromHFileRefsQueue().size()); 449 450 Thread.sleep(100); 451 // add again and remove with the old timestamp 452 addLastSequenceIdsAndHFileRefs(peerId1, peerId2); 453 storage.removeLastSequenceIdsAndHFileRefsBefore(ts); 454 // make sure we do not delete the data which are written after the give timestamp 455 for (int i = 0; i < 100; i++) { 456 String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i)); 457 assertEquals(i, storage.getLastSequenceId(encodedRegionName, peerId1)); 458 } 459 assertEquals(1, storage.getAllPeersFromHFileRefsQueue().size()); 460 assertEquals(3, storage.getReplicableHFiles(peerId2).size()); 461 } 462 463 @Test 464 public void testListAllPeerIds() throws ReplicationException { 465 assertThat(storage.listAllPeerIds(), empty()); 466 467 for (int i = 0; i < 20; i++) { 468 int numQueues = ThreadLocalRandom.current().nextInt(10, 100); 469 for (int j = 0; j < numQueues; j++) { 470 ReplicationQueueId queueId = new ReplicationQueueId(getServerName(j), "Peer_" + i); 471 storage.setOffset(queueId, "group-" + j, new ReplicationGroupOffset("file-" + j, j * 100), 472 Collections.emptyMap()); 473 } 474 } 475 List<String> peerIds = storage.listAllPeerIds(); 476 assertThat(peerIds, hasSize(20)); 477 for (int i = 0; i < 20; i++) { 478 assertThat(peerIds, hasItem("Peer_" + i)); 479 } 480 } 481}