/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNameTestRule;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.zookeeper.KeeperException;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsEmptyCollection;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;

@Category({ ReplicationTests.class, MediumTests.class })
public class TestTableReplicationQueueStorage {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestTableReplicationQueueStorage.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestTableReplicationQueueStorage.class);

  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

  @Rule
  public TableNameTestRule tableNameRule = new TableNameTestRule();

  private TableReplicationQueueStorage storage;

  @BeforeClass
  public static void setUp() throws Exception {
    UTIL.startMiniCluster();
  }

  @AfterClass
  public static void tearDown() throws IOException {
    UTIL.shutdownMiniCluster();
  }

  @Before
  public void setUpBeforeTest() throws Exception {
    TableName tableName = tableNameRule.getTableName();
    TableDescriptor td = ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName);
    UTIL.getAdmin().createTable(td);
    UTIL.waitTableAvailable(tableName);
    storage = new TableReplicationQueueStorage(UTIL.getConnection(), tableName);
  }

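  // Builds a distinct ServerName for the given index, so each index simulates a different
  // region server.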
  private ServerName getServerName(int i) {
    return ServerName.valueOf("127.0.0.1", 8000 + i, 10000 + i);
  }

  private String getFileName(String base, int i) {
    return String.format(base + "-%04d", i);
  }

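  // Replicators become visible once they have queue data and disappear again once all of their
  // queues are removed.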
  @Test
  public void testReplicator() throws ReplicationException {
    assertTrue(storage.listAllReplicators().isEmpty());
    String peerId = "1";
    for (int i = 0; i < 10; i++) {
      ReplicationQueueId queueId = new ReplicationQueueId(getServerName(i), peerId);
      storage.setOffset(queueId, "group-" + i, new ReplicationGroupOffset("file-" + i, i * 100),
        Collections.emptyMap());
    }
    List<ServerName> replicators = storage.listAllReplicators();
    assertEquals(10, replicators.size());
    for (int i = 0; i < 10; i++) {
      assertThat(replicators, hasItem(getServerName(i)));
    }
    for (int i = 0; i < 5; i++) {
      ReplicationQueueId queueId = new ReplicationQueueId(getServerName(i), peerId);
      storage.removeQueue(queueId);
    }
    replicators = storage.listAllReplicators();
    assertEquals(5, replicators.size());
    for (int i = 0; i < 5; i++) {
      assertThat(replicators, not(hasItem(getServerName(i))));
    }
    for (int i = 5; i < 10; i++) {
      assertThat(replicators, hasItem(getServerName(i)));
    }
  }

  @Test
  public void testGetSetOffset() throws ReplicationException {
    // A minimal set/get round trip, following the pattern of the other tests in this class.
    ReplicationQueueId queueId = new ReplicationQueueId(getServerName(0), "1");
    storage.setOffset(queueId, "group-0", new ReplicationGroupOffset("wal-0", 100),
      Collections.emptyMap());
    assertGroupOffset("wal-0", 100, storage.getOffsets(queueId).get("group-0"));
  }

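  // Asserts a queue id that has no source server, i.e. one still owned by its original server.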
  private void assertQueueId(String peerId, ServerName serverName, ReplicationQueueId queueId) {
    assertEquals(peerId, queueId.getPeerId());
    assertEquals(serverName, queueId.getServerName());
    assertFalse(queueId.getSourceServerName().isPresent());
  }

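  // setOffset should persist the WAL offset and the last pushed sequence ids together, and a
  // last pushed sequence id should never move backwards.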
  @Test
  public void testPersistLogPositionAndSeqIdAtomically() throws Exception {
    ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000);
    assertTrue(storage.listAllQueueIds(serverName1).isEmpty());
    String peerId1 = "1";
    String region0 = "6b2c8f8555335cc9af74455b94516cbe";
    String region1 = "6ecd2e9e010499f8ddef97ee8f70834f";

    for (int i = 0; i < 10; i++) {
      ReplicationQueueId queueId = new ReplicationQueueId(serverName1, peerId1);
      assertTrue(storage.getOffsets(queueId).isEmpty());
    }
    assertEquals(HConstants.NO_SEQNUM, storage.getLastSequenceId(region0, peerId1));
    assertEquals(HConstants.NO_SEQNUM, storage.getLastSequenceId(region1, peerId1));

    for (int i = 0; i < 10; i++) {
      ReplicationQueueId queueId = new ReplicationQueueId(serverName1, peerId1);
      storage.setOffset(queueId, "group1-" + i,
        new ReplicationGroupOffset(getFileName("file1", i), (i + 1) * 100),
        ImmutableMap.of(region0, i * 100L, region1, (i + 1) * 100L));
    }

    List<ReplicationQueueId> queueIds = storage.listAllQueueIds(serverName1);
    assertEquals(1, queueIds.size());
    assertQueueId(peerId1, serverName1, queueIds.get(0));

    Map<String, ReplicationGroupOffset> offsets =
      storage.getOffsets(new ReplicationQueueId(serverName1, peerId1));
    for (int i = 0; i < 10; i++) {
      ReplicationGroupOffset offset = offsets.get("group1-" + i);
      assertEquals(getFileName("file1", i), offset.getWal());
      assertEquals((i + 1) * 100, offset.getOffset());
    }
    assertEquals(900L, storage.getLastSequenceId(region0, peerId1));
    assertEquals(1000L, storage.getLastSequenceId(region1, peerId1));

    // Try to decrease the last pushed sequence id via setOffset; a smaller value should not
    // overwrite the stored one.
    storage.setOffset(new ReplicationQueueId(serverName1, peerId1), "group1-0",
      new ReplicationGroupOffset(getFileName("file1", 0), 11 * 100),
      ImmutableMap.of(region0, 899L, region1, 1001L));
    assertEquals(900L, storage.getLastSequenceId(region0, peerId1));
    assertEquals(1001L, storage.getLastSequenceId(region1, peerId1));
  }

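  // Asserts the WAL name and offset stored in a ReplicationGroupOffset.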
  private void assertGroupOffset(String wal, long offset, ReplicationGroupOffset groupOffset) {
    assertEquals(wal, groupOffset.getWal());
    assertEquals(offset, groupOffset.getOffset());
  }

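  // Claiming a queue should move all of its group offsets to the claiming server, including
  // claims of a queue that has already been claimed once.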
  @Test
  public void testClaimQueue() throws Exception {
    String peerId = "1";
    ServerName serverName1 = getServerName(1);
    ReplicationQueueId queueId = new ReplicationQueueId(serverName1, peerId);
    for (int i = 0; i < 10; i++) {
      storage.setOffset(queueId, "group-" + i, new ReplicationGroupOffset("wal-" + i, i),
        Collections.emptyMap());
    }

    ServerName serverName2 = getServerName(2);
    Map<String, ReplicationGroupOffset> offsets2 = storage.claimQueue(queueId, serverName2);
    assertEquals(10, offsets2.size());
    for (int i = 0; i < 10; i++) {
      assertGroupOffset("wal-" + i, i, offsets2.get("group-" + i));
    }
    ReplicationQueueId claimedQueueId2 = new ReplicationQueueId(serverName2, peerId, serverName1);
    assertThat(storage.listAllQueueIds(peerId, serverName1), IsEmptyCollection.empty());
    assertThat(storage.listAllQueueIds(peerId, serverName2),
      Matchers.<List<ReplicationQueueId>> both(hasItem(claimedQueueId2)).and(hasSize(1)));
    offsets2 = storage.getOffsets(claimedQueueId2);
    assertEquals(10, offsets2.size());
    for (int i = 0; i < 10; i++) {
      assertGroupOffset("wal-" + i, i, offsets2.get("group-" + i));
    }

    ServerName serverName3 = getServerName(3);
    Map<String, ReplicationGroupOffset> offsets3 = storage.claimQueue(claimedQueueId2, serverName3);
    assertEquals(10, offsets3.size());
    for (int i = 0; i < 10; i++) {
      assertGroupOffset("wal-" + i, i, offsets3.get("group-" + i));
    }
    ReplicationQueueId claimedQueueId3 = new ReplicationQueueId(serverName3, peerId, serverName1);
    assertThat(storage.listAllQueueIds(peerId, serverName1), IsEmptyCollection.empty());
    assertThat(storage.listAllQueueIds(peerId, serverName2), IsEmptyCollection.empty());
    assertThat(storage.listAllQueueIds(peerId, serverName3),
      Matchers.<List<ReplicationQueueId>> both(hasItem(claimedQueueId3)).and(hasSize(1)));
    offsets3 = storage.getOffsets(claimedQueueId3);
    assertEquals(10, offsets3.size());
    for (int i = 0; i < 10; i++) {
      assertGroupOffset("wal-" + i, i, offsets3.get("group-" + i));
    }
    storage.removeQueue(claimedQueueId3);
    assertThat(storage.listAllQueueIds(peerId), IsEmptyCollection.empty());
  }

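  // When multiple servers race to claim the same queue, exactly one of them should end up with
  // the queue in each round.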
  @Test
  public void testClaimQueueMultiThread() throws Exception {
    String peerId = "3";
    String walGroup = "group";
    ReplicationGroupOffset groupOffset = new ReplicationGroupOffset("wal", 123);
    ServerName sourceServerName = getServerName(100);
    ReplicationQueueId queueId = new ReplicationQueueId(sourceServerName, peerId);
    storage.setOffset(queueId, walGroup, groupOffset, Collections.emptyMap());
    List<ServerName> serverNames =
      IntStream.range(0, 10).mapToObj(this::getServerName).collect(Collectors.toList());
    for (int i = 0; i < 10; i++) {
      final ReplicationQueueId toClaim = queueId;
      List<Thread> threads = new ArrayList<>();
      Map<ServerName, Map<String, ReplicationGroupOffset>> claimed = new ConcurrentHashMap<>();
      Set<ServerName> failed = ConcurrentHashMap.newKeySet();
      for (ServerName serverName : serverNames) {
        if (serverName.equals(queueId.getServerName())) {
          continue;
        }
        threads.add(new Thread("Claim-" + i + "-" + serverName) {

          @Override
          public void run() {
            try {
              Map<String, ReplicationGroupOffset> offsets = storage.claimQueue(toClaim, serverName);
              if (!offsets.isEmpty()) {
                claimed.put(serverName, offsets);
              }
            } catch (ReplicationException e) {
              LOG.error("failed to claim queue", e);
              failed.add(serverName);
            }
          }
        });
      }
      LOG.info("Claim round {}, there are {} threads to claim {}", i, threads.size(), toClaim);
      for (Thread thread : threads) {
        thread.start();
      }
      for (Thread thread : threads) {
        thread.join(30000);
        assertFalse(thread.isAlive());
      }
      LOG.info("Finish claim round {}, claimed={}, failed={}", i, claimed, failed);
      assertThat(failed, IsEmptyCollection.empty());
      assertEquals(1, claimed.size());
      Map<String, ReplicationGroupOffset> offsets = Iterables.getOnlyElement(claimed.values());
      assertEquals(1, offsets.size());
      assertGroupOffset("wal", 123, offsets.get("group"));
      queueId = new ReplicationQueueId(Iterables.getOnlyElement(claimed.keySet()), peerId,
        sourceServerName);
      assertThat(storage.listAllQueueIds(peerId),
        Matchers.<List<ReplicationQueueId>> both(hasItem(queueId)).and(hasSize(1)));
    }
  }

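  // Listing should cover the queues of all peers, while removeAllQueues should only drop the
  // queues of the given peer.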
  @Test
  public void testListRemovePeerAllQueues() throws Exception {
    String peerId1 = "1";
    String peerId2 = "2";
    for (int i = 0; i < 100; i++) {
      ServerName serverName = getServerName(i);
      String group = "group";
      ReplicationGroupOffset offset = new ReplicationGroupOffset("wal", i);
      ReplicationQueueId queueId1 = new ReplicationQueueId(serverName, peerId1);
      ReplicationQueueId queueId2 = new ReplicationQueueId(serverName, peerId2);
      storage.setOffset(queueId1, group, offset, Collections.emptyMap());
      storage.setOffset(queueId2, group, offset, Collections.emptyMap());
    }
    List<ReplicationQueueData> queueDatas = storage.listAllQueues();
    assertThat(queueDatas, hasSize(200));
    for (int i = 0; i < 100; i++) {
      ReplicationQueueData peerId1Data = queueDatas.get(i);
      ReplicationQueueData peerId2Data = queueDatas.get(i + 100);
      ServerName serverName = getServerName(i);
      assertEquals(new ReplicationQueueId(serverName, peerId1), peerId1Data.getId());
      assertEquals(new ReplicationQueueId(serverName, peerId2), peerId2Data.getId());
      assertEquals(1, peerId1Data.getOffsets().size());
      assertEquals(1, peerId2Data.getOffsets().size());
      assertGroupOffset("wal", i, peerId1Data.getOffsets().get("group"));
      assertGroupOffset("wal", i, peerId2Data.getOffsets().get("group"));
    }
    List<ReplicationQueueId> queueIds1 = storage.listAllQueueIds(peerId1);
    assertThat(queueIds1, hasSize(100));
    for (int i = 0; i < 100; i++) {
      ServerName serverName = getServerName(i);
      assertEquals(new ReplicationQueueId(serverName, peerId1), queueIds1.get(i));
    }
    List<ReplicationQueueId> queueIds2 = storage.listAllQueueIds(peerId2);
    assertThat(queueIds2, hasSize(100));
    for (int i = 0; i < 100; i++) {
      ServerName serverName = getServerName(i);
      assertEquals(new ReplicationQueueId(serverName, peerId2), queueIds2.get(i));
    }

    storage.removeAllQueues(peerId1);
    assertThat(storage.listAllQueues(), hasSize(100));
    assertThat(storage.listAllQueueIds(peerId1), IsEmptyCollection.empty());
    assertThat(storage.listAllQueueIds(peerId2), hasSize(100));

    storage.removeAllQueues(peerId2);
    assertThat(storage.listAllQueues(), IsEmptyCollection.empty());
    assertThat(storage.listAllQueueIds(peerId1), IsEmptyCollection.empty());
    assertThat(storage.listAllQueueIds(peerId2), IsEmptyCollection.empty());
  }

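  // Removing the last pushed sequence ids of one peer should leave the other peer untouched.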
  @Test
  public void testRemoveAllLastPushedSeqIdsForPeer() throws Exception {
    String peerId = "1";
    String peerIdToDelete = "2";
    for (int i = 0; i < 100; i++) {
      String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
      storage.setLastSequenceIds(peerId, ImmutableMap.of(encodedRegionName, (long) i));
      storage.setLastSequenceIds(peerIdToDelete, ImmutableMap.of(encodedRegionName, (long) i));
    }
    for (int i = 0; i < 100; i++) {
      String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
      assertEquals(i, storage.getLastSequenceId(encodedRegionName, peerId));
      assertEquals(i, storage.getLastSequenceId(encodedRegionName, peerIdToDelete));
    }
    storage.removeLastSequenceIds(peerIdToDelete);
    for (int i = 0; i < 100; i++) {
      String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
      assertEquals(i, storage.getLastSequenceId(encodedRegionName, peerId));
      assertEquals(HConstants.NO_SEQNUM,
        storage.getLastSequenceId(encodedRegionName, peerIdToDelete));
    }
  }

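  // Basic add/remove round trip for the HFile refs of a single peer.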
  @Test
  public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException {
    String peerId1 = "1";

    List<Pair<Path, Path>> files1 = new ArrayList<>(3);
    files1.add(new Pair<>(null, new Path("file_1")));
    files1.add(new Pair<>(null, new Path("file_2")));
    files1.add(new Pair<>(null, new Path("file_3")));
    assertTrue(storage.getReplicableHFiles(peerId1).isEmpty());
    assertEquals(0, storage.getAllPeersFromHFileRefsQueue().size());

    storage.addHFileRefs(peerId1, files1);
    assertEquals(1, storage.getAllPeersFromHFileRefsQueue().size());
    assertEquals(3, storage.getReplicableHFiles(peerId1).size());
    List<String> hfiles2 = new ArrayList<>(files1.size());
    for (Pair<Path, Path> p : files1) {
      hfiles2.add(p.getSecond().getName());
    }
    String removedString = hfiles2.remove(0);
    storage.removeHFileRefs(peerId1, hfiles2);
    assertEquals(1, storage.getReplicableHFiles(peerId1).size());
    hfiles2 = new ArrayList<>(1);
    hfiles2.add(removedString);
    storage.removeHFileRefs(peerId1, hfiles2);
    assertEquals(0, storage.getReplicableHFiles(peerId1).size());
  }

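  // Removing a peer from the HFile refs queue should drop its refs but keep the refs of the
  // other peer.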
  @Test
  public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException {
    String peerId1 = "1";
    String peerId2 = "2";

    List<Pair<Path, Path>> files1 = new ArrayList<>(3);
    files1.add(new Pair<>(null, new Path("file_1")));
    files1.add(new Pair<>(null, new Path("file_2")));
    files1.add(new Pair<>(null, new Path("file_3")));
    storage.addHFileRefs(peerId1, files1);
    storage.addHFileRefs(peerId2, files1);
    assertEquals(2, storage.getAllPeersFromHFileRefsQueue().size());
    assertEquals(3, storage.getReplicableHFiles(peerId1).size());
    assertEquals(3, storage.getReplicableHFiles(peerId2).size());

    storage.removePeerFromHFileRefs(peerId1);
    assertEquals(1, storage.getAllPeersFromHFileRefsQueue().size());
    assertTrue(storage.getReplicableHFiles(peerId1).isEmpty());
    assertEquals(3, storage.getReplicableHFiles(peerId2).size());

    storage.removePeerFromHFileRefs(peerId2);
    assertEquals(0, storage.getAllPeersFromHFileRefsQueue().size());
    assertTrue(storage.getReplicableHFiles(peerId2).isEmpty());
  }

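  // Writes last pushed sequence ids for peerId1 and HFile refs for peerId2, as input for the
  // timestamp based cleanup test below.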
  private void addLastSequenceIdsAndHFileRefs(String peerId1, String peerId2)
    throws ReplicationException {
    for (int i = 0; i < 100; i++) {
      String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
      storage.setLastSequenceIds(peerId1, ImmutableMap.of(encodedRegionName, (long) i));
    }

    List<Pair<Path, Path>> files1 = new ArrayList<>(3);
    files1.add(new Pair<>(null, new Path("file_1")));
    files1.add(new Pair<>(null, new Path("file_2")));
    files1.add(new Pair<>(null, new Path("file_3")));
    storage.addHFileRefs(peerId2, files1);
  }

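  // removeLastSequenceIdsAndHFileRefsBefore should only delete data written before the given
  // timestamp.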
  @Test
  public void testRemoveLastSequenceIdsAndHFileRefsBefore()
    throws ReplicationException, InterruptedException {
    String peerId1 = "1";
    String peerId2 = "2";
    addLastSequenceIdsAndHFileRefs(peerId1, peerId2);
    // make sure we have written these out
    for (int i = 0; i < 100; i++) {
      String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
      assertEquals(i, storage.getLastSequenceId(encodedRegionName, peerId1));
    }
    assertEquals(1, storage.getAllPeersFromHFileRefsQueue().size());
    assertEquals(3, storage.getReplicableHFiles(peerId2).size());

    // should have nothing after removal
    long ts = EnvironmentEdgeManager.currentTime();
    storage.removeLastSequenceIdsAndHFileRefsBefore(ts);
    for (int i = 0; i < 100; i++) {
      String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
      assertEquals(HConstants.NO_SEQNUM, storage.getLastSequenceId(encodedRegionName, peerId1));
    }
    assertEquals(0, storage.getAllPeersFromHFileRefsQueue().size());

    Thread.sleep(100);
    // add again and remove with the old timestamp
    addLastSequenceIdsAndHFileRefs(peerId1, peerId2);
    storage.removeLastSequenceIdsAndHFileRefsBefore(ts);
    // make sure we do not delete the data written after the given timestamp
    for (int i = 0; i < 100; i++) {
      String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
      assertEquals(i, storage.getLastSequenceId(encodedRegionName, peerId1));
    }
    assertEquals(1, storage.getAllPeersFromHFileRefsQueue().size());
    assertEquals(3, storage.getReplicableHFiles(peerId2).size());
  }
}