001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.hamcrest.MatcherAssert.assertThat;
021import static org.hamcrest.Matchers.empty;
022import static org.hamcrest.Matchers.hasItem;
023import static org.hamcrest.Matchers.hasSize;
024import static org.hamcrest.Matchers.not;
025import static org.junit.jupiter.api.Assertions.assertEquals;
026import static org.junit.jupiter.api.Assertions.assertFalse;
027import static org.junit.jupiter.api.Assertions.assertTrue;
028
029import java.io.IOException;
030import java.util.ArrayList;
031import java.util.Collections;
032import java.util.List;
033import java.util.Map;
034import java.util.Set;
035import java.util.concurrent.ConcurrentHashMap;
036import java.util.concurrent.ThreadLocalRandom;
037import java.util.stream.Collectors;
038import java.util.stream.IntStream;
039import org.apache.hadoop.fs.Path;
040import org.apache.hadoop.hbase.HBaseTestingUtil;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.ServerName;
043import org.apache.hadoop.hbase.TableName;
044import org.apache.hadoop.hbase.client.TableDescriptor;
045import org.apache.hadoop.hbase.testclassification.MediumTests;
046import org.apache.hadoop.hbase.testclassification.ReplicationTests;
047import org.apache.hadoop.hbase.util.Bytes;
048import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
049import org.apache.hadoop.hbase.util.MD5Hash;
050import org.apache.hadoop.hbase.util.Pair;
051import org.apache.zookeeper.KeeperException;
052import org.hamcrest.Matchers;
053import org.hamcrest.collection.IsEmptyCollection;
054import org.junit.jupiter.api.AfterAll;
055import org.junit.jupiter.api.BeforeAll;
056import org.junit.jupiter.api.BeforeEach;
057import org.junit.jupiter.api.Tag;
058import org.junit.jupiter.api.Test;
059import org.junit.jupiter.api.TestInfo;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062
063import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
064import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
065
066@Tag(ReplicationTests.TAG)
067@Tag(MediumTests.TAG)
068public class TestTableReplicationQueueStorage {
069
070  private static final Logger LOG = LoggerFactory.getLogger(TestTableReplicationQueueStorage.class);
071
072  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
073
074  private TableReplicationQueueStorage storage;
075
076  @BeforeAll
077  public static void setUp() throws Exception {
078    UTIL.startMiniCluster();
079  }
080
081  @AfterAll
082  public static void tearDown() throws IOException {
083    UTIL.shutdownMiniCluster();
084  }
085
086  @BeforeEach
087  public void setUpBeforeTest(TestInfo testInfo) throws Exception {
088    TableName tableName = TableName.valueOf(testInfo.getTestMethod().get().getName());
089    TableDescriptor td = ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName);
090    UTIL.getAdmin().createTable(td);
091    UTIL.waitTableAvailable(tableName);
092    storage = new TableReplicationQueueStorage(UTIL.getConnection(), tableName);
093  }
094
095  private ServerName getServerName(int i) {
096    return ServerName.valueOf("127.0.0.1", 8000 + i, 10000 + i);
097  }
098
099  private String getFileName(String base, int i) {
100    return String.format(base + "-%04d", i);
101  }
102
103  @Test
104  public void testReplicator() throws ReplicationException {
105    assertTrue(storage.listAllReplicators().isEmpty());
106    String peerId = "1";
107    for (int i = 0; i < 10; i++) {
108      ReplicationQueueId queueId = new ReplicationQueueId(getServerName(i), peerId);
109      storage.setOffset(queueId, "group-" + i, new ReplicationGroupOffset("file-" + i, i * 100),
110        Collections.emptyMap());
111    }
112    List<ServerName> replicators = storage.listAllReplicators();
113    assertEquals(10, replicators.size());
114    for (int i = 0; i < 10; i++) {
115      assertThat(replicators, hasItem(getServerName(i)));
116    }
117    for (int i = 0; i < 5; i++) {
118      ReplicationQueueId queueId = new ReplicationQueueId(getServerName(i), peerId);
119      storage.removeQueue(queueId);
120    }
121    replicators = storage.listAllReplicators();
122    assertEquals(5, replicators.size());
123    for (int i = 0; i < 5; i++) {
124      assertThat(replicators, not(hasItem(getServerName(i))));
125    }
126    for (int i = 5; i < 10; i++) {
127      assertThat(replicators, hasItem(getServerName(i)));
128    }
129  }
130
131  private void assertQueueId(String peerId, ServerName serverName, ReplicationQueueId queueId) {
132    assertEquals(peerId, queueId.getPeerId());
133    assertEquals(serverName, queueId.getServerName());
134    assertFalse(queueId.getSourceServerName().isPresent());
135  }
136
137  @Test
138  public void testPersistLogPositionAndSeqIdAtomically() throws Exception {
139    ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000);
140    assertTrue(storage.listAllQueueIds(serverName1).isEmpty());
141    String peerId1 = "1";
142    String region0 = "6b2c8f8555335cc9af74455b94516cbe";
143    String region1 = "6ecd2e9e010499f8ddef97ee8f70834f";
144
145    for (int i = 0; i < 10; i++) {
146      ReplicationQueueId queueId = new ReplicationQueueId(serverName1, peerId1);
147      assertTrue(storage.getOffsets(queueId).isEmpty());
148    }
149    assertEquals(HConstants.NO_SEQNUM, storage.getLastSequenceId(region0, peerId1));
150    assertEquals(HConstants.NO_SEQNUM, storage.getLastSequenceId(region1, peerId1));
151
152    for (int i = 0; i < 10; i++) {
153      ReplicationQueueId queueId = new ReplicationQueueId(serverName1, peerId1);
154      storage.setOffset(queueId, "group1-" + i,
155        new ReplicationGroupOffset(getFileName("file1", i), (i + 1) * 100),
156        ImmutableMap.of(region0, i * 100L, region1, (i + 1) * 100L));
157    }
158
159    List<ReplicationQueueId> queueIds = storage.listAllQueueIds(serverName1);
160    assertEquals(1, queueIds.size());
161    assertQueueId(peerId1, serverName1, queueIds.get(0));
162
163    Map<String, ReplicationGroupOffset> offsets =
164      storage.getOffsets(new ReplicationQueueId(serverName1, peerId1));
165    for (int i = 0; i < 10; i++) {
166      ReplicationGroupOffset offset = offsets.get("group1-" + i);
167      assertEquals(getFileName("file1", i), offset.getWal());
168      assertEquals((i + 1) * 100, offset.getOffset());
169    }
170    assertEquals(900L, storage.getLastSequenceId(region0, peerId1));
171    assertEquals(1000L, storage.getLastSequenceId(region1, peerId1));
172
173    // Try to decrease the last pushed id by setWALPosition method.
174    storage.setOffset(new ReplicationQueueId(serverName1, peerId1), "group1-0",
175      new ReplicationGroupOffset(getFileName("file1", 0), 11 * 100),
176      ImmutableMap.of(region0, 899L, region1, 1001L));
177    assertEquals(900L, storage.getLastSequenceId(region0, peerId1));
178    assertEquals(1001L, storage.getLastSequenceId(region1, peerId1));
179  }
180
181  private void assertGroupOffset(String wal, long offset, ReplicationGroupOffset groupOffset) {
182    assertEquals(wal, groupOffset.getWal());
183    assertEquals(offset, groupOffset.getOffset());
184  }
185
186  @Test
187  public void testClaimQueue() throws Exception {
188    String peerId = "1";
189    ServerName serverName1 = getServerName(1);
190    ReplicationQueueId queueId = new ReplicationQueueId(serverName1, peerId);
191    for (int i = 0; i < 10; i++) {
192      storage.setOffset(queueId, "group-" + i, new ReplicationGroupOffset("wal-" + i, i),
193        Collections.emptyMap());
194    }
195
196    ServerName serverName2 = getServerName(2);
197    Map<String, ReplicationGroupOffset> offsets2 = storage.claimQueue(queueId, serverName2);
198    assertEquals(10, offsets2.size());
199    for (int i = 0; i < 10; i++) {
200      assertGroupOffset("wal-" + i, i, offsets2.get("group-" + i));
201    }
202    ReplicationQueueId claimedQueueId2 = new ReplicationQueueId(serverName2, peerId, serverName1);
203    assertThat(storage.listAllQueueIds(peerId, serverName1), IsEmptyCollection.empty());
204    assertThat(storage.listAllQueueIds(peerId, serverName2),
205      Matchers.<List<ReplicationQueueId>> both(hasItem(claimedQueueId2)).and(hasSize(1)));
206    offsets2 = storage.getOffsets(claimedQueueId2);
207    assertEquals(10, offsets2.size());
208    for (int i = 0; i < 10; i++) {
209      assertGroupOffset("wal-" + i, i, offsets2.get("group-" + i));
210    }
211
212    ServerName serverName3 = getServerName(3);
213    Map<String, ReplicationGroupOffset> offsets3 = storage.claimQueue(claimedQueueId2, serverName3);
214    assertEquals(10, offsets3.size());
215    for (int i = 0; i < 10; i++) {
216      assertGroupOffset("wal-" + i, i, offsets3.get("group-" + i));
217    }
218    ReplicationQueueId claimedQueueId3 = new ReplicationQueueId(serverName3, peerId, serverName1);
219    assertThat(storage.listAllQueueIds(peerId, serverName1), IsEmptyCollection.empty());
220    assertThat(storage.listAllQueueIds(peerId, serverName2), IsEmptyCollection.empty());
221    assertThat(storage.listAllQueueIds(peerId, serverName3),
222      Matchers.<List<ReplicationQueueId>> both(hasItem(claimedQueueId3)).and(hasSize(1)));
223    offsets3 = storage.getOffsets(claimedQueueId3);
224    assertEquals(10, offsets3.size());
225    for (int i = 0; i < 10; i++) {
226      assertGroupOffset("wal-" + i, i, offsets3.get("group-" + i));
227    }
228    storage.removeQueue(claimedQueueId3);
229    assertThat(storage.listAllQueueIds(peerId), IsEmptyCollection.empty());
230  }
231
232  @Test
233  public void testClaimQueueMultiThread() throws Exception {
234    String peerId = "3";
235    String walGroup = "group";
236    ReplicationGroupOffset groupOffset = new ReplicationGroupOffset("wal", 123);
237    ServerName sourceServerName = getServerName(100);
238    ReplicationQueueId queueId = new ReplicationQueueId(sourceServerName, peerId);
239    storage.setOffset(queueId, walGroup, groupOffset, Collections.emptyMap());
240    List<ServerName> serverNames =
241      IntStream.range(0, 10).mapToObj(this::getServerName).collect(Collectors.toList());
242    for (int i = 0; i < 10; i++) {
243      final ReplicationQueueId toClaim = queueId;
244      List<Thread> threads = new ArrayList<>();
245      Map<ServerName, Map<String, ReplicationGroupOffset>> claimed = new ConcurrentHashMap<>();
246      Set<ServerName> failed = ConcurrentHashMap.newKeySet();
247      for (ServerName serverName : serverNames) {
248        if (serverName.equals(queueId.getServerName())) {
249          continue;
250        }
251        threads.add(new Thread("Claim-" + i + "-" + serverName) {
252
253          @Override
254          public void run() {
255            try {
256              Map<String, ReplicationGroupOffset> offsets = storage.claimQueue(toClaim, serverName);
257              if (!offsets.isEmpty()) {
258                claimed.put(serverName, offsets);
259              }
260            } catch (ReplicationException e) {
261              LOG.error("failed to claim queue", e);
262              failed.add(serverName);
263            }
264          }
265        });
266      }
267      LOG.info("Claim round {}, there are {} threads to claim {}", i, threads.size(), toClaim);
268      for (Thread thread : threads) {
269        thread.start();
270      }
271      for (Thread thread : threads) {
272        thread.join(30000);
273        assertFalse(thread.isAlive());
274      }
275      LOG.info("Finish claim round {}, claimed={}, failed={}", i, claimed, failed);
276      assertThat(failed, IsEmptyCollection.empty());
277      assertEquals(1, claimed.size());
278      Map<String, ReplicationGroupOffset> offsets = Iterables.getOnlyElement(claimed.values());
279      assertEquals(1, offsets.size());
280      assertGroupOffset("wal", 123, offsets.get("group"));
281      queueId = new ReplicationQueueId(Iterables.getOnlyElement(claimed.keySet()), peerId,
282        sourceServerName);
283      assertThat(storage.listAllQueueIds(peerId),
284        Matchers.<List<ReplicationQueueId>> both(hasItem(queueId)).and(hasSize(1)));
285    }
286  }
287
288  @Test
289  public void testListRemovePeerAllQueues() throws Exception {
290    String peerId1 = "1";
291    String peerId2 = "2";
292    for (int i = 0; i < 100; i++) {
293      ServerName serverName = getServerName(i);
294      String group = "group";
295      ReplicationGroupOffset offset = new ReplicationGroupOffset("wal", i);
296      ReplicationQueueId queueId1 = new ReplicationQueueId(serverName, peerId1);
297      ReplicationQueueId queueId2 = new ReplicationQueueId(serverName, peerId2);
298      storage.setOffset(queueId1, group, offset, Collections.emptyMap());
299      storage.setOffset(queueId2, group, offset, Collections.emptyMap());
300    }
301    List<ReplicationQueueData> queueDatas = storage.listAllQueues();
302    assertThat(queueDatas, hasSize(200));
303    for (int i = 0; i < 100; i++) {
304      ReplicationQueueData peerId1Data = queueDatas.get(i);
305      ReplicationQueueData peerId2Data = queueDatas.get(i + 100);
306      ServerName serverName = getServerName(i);
307      assertEquals(new ReplicationQueueId(serverName, peerId1), peerId1Data.getId());
308      assertEquals(new ReplicationQueueId(serverName, peerId2), peerId2Data.getId());
309      assertEquals(1, peerId1Data.getOffsets().size());
310      assertEquals(1, peerId2Data.getOffsets().size());
311      assertGroupOffset("wal", i, peerId1Data.getOffsets().get("group"));
312      assertGroupOffset("wal", i, peerId2Data.getOffsets().get("group"));
313    }
314    List<ReplicationQueueId> queueIds1 = storage.listAllQueueIds(peerId1);
315    assertThat(queueIds1, hasSize(100));
316    for (int i = 0; i < 100; i++) {
317      ServerName serverName = getServerName(i);
318      assertEquals(new ReplicationQueueId(serverName, peerId1), queueIds1.get(i));
319    }
320    List<ReplicationQueueId> queueIds2 = storage.listAllQueueIds(peerId2);
321    assertThat(queueIds2, hasSize(100));
322    for (int i = 0; i < 100; i++) {
323      ServerName serverName = getServerName(i);
324      assertEquals(new ReplicationQueueId(serverName, peerId2), queueIds2.get(i));
325    }
326
327    storage.removeAllQueues(peerId1);
328    assertThat(storage.listAllQueues(), hasSize(100));
329    assertThat(storage.listAllQueueIds(peerId1), IsEmptyCollection.empty());
330    assertThat(storage.listAllQueueIds(peerId2), hasSize(100));
331
332    storage.removeAllQueues(peerId2);
333    assertThat(storage.listAllQueues(), IsEmptyCollection.empty());
334    assertThat(storage.listAllQueueIds(peerId1), IsEmptyCollection.empty());
335    assertThat(storage.listAllQueueIds(peerId2), IsEmptyCollection.empty());
336  }
337
338  @Test
339  public void testRemoveAllLastPushedSeqIdsForPeer() throws Exception {
340    String peerId = "1";
341    String peerIdToDelete = "2";
342    for (int i = 0; i < 100; i++) {
343      String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
344      storage.setLastSequenceIds(peerId, ImmutableMap.of(encodedRegionName, (long) i));
345      storage.setLastSequenceIds(peerIdToDelete, ImmutableMap.of(encodedRegionName, (long) i));
346    }
347    for (int i = 0; i < 100; i++) {
348      String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
349      assertEquals(i, storage.getLastSequenceId(encodedRegionName, peerId));
350      assertEquals(i, storage.getLastSequenceId(encodedRegionName, peerIdToDelete));
351    }
352    storage.removeLastSequenceIds(peerIdToDelete);
353    for (int i = 0; i < 100; i++) {
354      String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
355      assertEquals(i, storage.getLastSequenceId(encodedRegionName, peerId));
356      assertEquals(HConstants.NO_SEQNUM,
357        storage.getLastSequenceId(encodedRegionName, peerIdToDelete));
358    }
359  }
360
361  @Test
362  public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException {
363    String peerId1 = "1";
364
365    List<Pair<Path, Path>> files1 = new ArrayList<>(3);
366    files1.add(new Pair<>(null, new Path("file_1")));
367    files1.add(new Pair<>(null, new Path("file_2")));
368    files1.add(new Pair<>(null, new Path("file_3")));
369    assertTrue(storage.getReplicableHFiles(peerId1).isEmpty());
370    assertEquals(0, storage.getAllPeersFromHFileRefsQueue().size());
371
372    storage.addHFileRefs(peerId1, files1);
373    assertEquals(1, storage.getAllPeersFromHFileRefsQueue().size());
374    assertEquals(3, storage.getReplicableHFiles(peerId1).size());
375    List<String> hfiles2 = new ArrayList<>(files1.size());
376    for (Pair<Path, Path> p : files1) {
377      hfiles2.add(p.getSecond().getName());
378    }
379    String removedString = hfiles2.remove(0);
380    storage.removeHFileRefs(peerId1, hfiles2);
381    assertEquals(1, storage.getReplicableHFiles(peerId1).size());
382    hfiles2 = new ArrayList<>(1);
383    hfiles2.add(removedString);
384    storage.removeHFileRefs(peerId1, hfiles2);
385    assertEquals(0, storage.getReplicableHFiles(peerId1).size());
386  }
387
388  @Test
389  public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException {
390    String peerId1 = "1";
391    String peerId2 = "2";
392
393    List<Pair<Path, Path>> files1 = new ArrayList<>(3);
394    files1.add(new Pair<>(null, new Path("file_1")));
395    files1.add(new Pair<>(null, new Path("file_2")));
396    files1.add(new Pair<>(null, new Path("file_3")));
397    storage.addHFileRefs(peerId1, files1);
398    storage.addHFileRefs(peerId2, files1);
399    assertEquals(2, storage.getAllPeersFromHFileRefsQueue().size());
400    assertEquals(3, storage.getReplicableHFiles(peerId1).size());
401    assertEquals(3, storage.getReplicableHFiles(peerId2).size());
402
403    storage.removePeerFromHFileRefs(peerId1);
404    assertEquals(1, storage.getAllPeersFromHFileRefsQueue().size());
405    assertTrue(storage.getReplicableHFiles(peerId1).isEmpty());
406    assertEquals(3, storage.getReplicableHFiles(peerId2).size());
407
408    storage.removePeerFromHFileRefs(peerId2);
409    assertEquals(0, storage.getAllPeersFromHFileRefsQueue().size());
410    assertTrue(storage.getReplicableHFiles(peerId2).isEmpty());
411  }
412
413  private void addLastSequenceIdsAndHFileRefs(String peerId1, String peerId2)
414    throws ReplicationException {
415    for (int i = 0; i < 100; i++) {
416      String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
417      storage.setLastSequenceIds(peerId1, ImmutableMap.of(encodedRegionName, (long) i));
418    }
419
420    List<Pair<Path, Path>> files1 = new ArrayList<>(3);
421    files1.add(new Pair<>(null, new Path("file_1")));
422    files1.add(new Pair<>(null, new Path("file_2")));
423    files1.add(new Pair<>(null, new Path("file_3")));
424    storage.addHFileRefs(peerId2, files1);
425  }
426
427  @Test
428  public void testRemoveLastSequenceIdsAndHFileRefsBefore()
429    throws ReplicationException, InterruptedException {
430    String peerId1 = "1";
431    String peerId2 = "2";
432    addLastSequenceIdsAndHFileRefs(peerId1, peerId2);
433    // make sure we have write these out
434    for (int i = 0; i < 100; i++) {
435      String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
436      assertEquals(i, storage.getLastSequenceId(encodedRegionName, peerId1));
437    }
438    assertEquals(1, storage.getAllPeersFromHFileRefsQueue().size());
439    assertEquals(3, storage.getReplicableHFiles(peerId2).size());
440
441    // should have nothing after removal
442    long ts = EnvironmentEdgeManager.currentTime();
443    storage.removeLastSequenceIdsAndHFileRefsBefore(ts);
444    for (int i = 0; i < 100; i++) {
445      String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
446      assertEquals(HConstants.NO_SEQNUM, storage.getLastSequenceId(encodedRegionName, peerId1));
447    }
448    assertEquals(0, storage.getAllPeersFromHFileRefsQueue().size());
449
450    Thread.sleep(100);
451    // add again and remove with the old timestamp
452    addLastSequenceIdsAndHFileRefs(peerId1, peerId2);
453    storage.removeLastSequenceIdsAndHFileRefsBefore(ts);
454    // make sure we do not delete the data which are written after the give timestamp
455    for (int i = 0; i < 100; i++) {
456      String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
457      assertEquals(i, storage.getLastSequenceId(encodedRegionName, peerId1));
458    }
459    assertEquals(1, storage.getAllPeersFromHFileRefsQueue().size());
460    assertEquals(3, storage.getReplicableHFiles(peerId2).size());
461  }
462
463  @Test
464  public void testListAllPeerIds() throws ReplicationException {
465    assertThat(storage.listAllPeerIds(), empty());
466
467    for (int i = 0; i < 20; i++) {
468      int numQueues = ThreadLocalRandom.current().nextInt(10, 100);
469      for (int j = 0; j < numQueues; j++) {
470        ReplicationQueueId queueId = new ReplicationQueueId(getServerName(j), "Peer_" + i);
471        storage.setOffset(queueId, "group-" + j, new ReplicationGroupOffset("file-" + j, j * 100),
472          Collections.emptyMap());
473      }
474    }
475    List<String> peerIds = storage.listAllPeerIds();
476    assertThat(peerIds, hasSize(20));
477    for (int i = 0; i < 20; i++) {
478      assertThat(peerIds, hasItem("Peer_" + i));
479    }
480  }
481}