001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertTrue;
021
022import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
023import org.apache.hadoop.hbase.client.Delete;
024import org.apache.hadoop.hbase.client.Get;
025import org.apache.hadoop.hbase.client.RegionInfoBuilder;
026import org.apache.hadoop.hbase.client.Table;
027import org.apache.hadoop.hbase.regionserver.HRegion;
028import org.apache.hadoop.hbase.regionserver.HRegionServer;
029import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
030import org.apache.hadoop.hbase.replication.SyncReplicationState;
031import org.apache.hadoop.hbase.replication.SyncReplicationTestBase;
032import org.apache.hadoop.hbase.testclassification.MediumTests;
033import org.apache.hadoop.hbase.testclassification.ReplicationTests;
034import org.apache.hadoop.hbase.util.Bytes;
035import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
036import org.junit.jupiter.api.Tag;
037import org.junit.jupiter.api.Test;
038
039@Tag(ReplicationTests.TAG)
040@Tag(MediumTests.TAG)
041public class TestDrainReplicationQueuesForStandBy extends SyncReplicationTestBase {
042
043  @Test
044  public void test() throws Exception {
045    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
046      SyncReplicationState.STANDBY);
047    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
048      SyncReplicationState.ACTIVE);
049    UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
050    write(UTIL1, 0, 100);
051
052    HRegionServer rs = UTIL1.getRSForFirstRegionInTable(TABLE_NAME);
053    String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(
054      ((AbstractFSWAL<?>) rs.getWAL(RegionInfoBuilder.newBuilder(TABLE_NAME).build()))
055        .getCurrentFileName().getName());
056    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
057      SyncReplicationState.DOWNGRADE_ACTIVE);
058    // transit cluster2 to DA and cluster 1 to S
059    verify(UTIL2, 0, 100);
060
061    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
062      SyncReplicationState.STANDBY);
063    // delete the original value, and then major compact
064    try (Table table = UTIL2.getConnection().getTable(TABLE_NAME)) {
065      for (int i = 0; i < 100; i++) {
066        table.delete(new Delete(Bytes.toBytes(i)));
067      }
068    }
069    UTIL2.flush(TABLE_NAME);
070    UTIL2.compact(TABLE_NAME, true);
071    // wait until the new values are replicated back to cluster1
072    HRegion region = rs.getRegions(TABLE_NAME).get(0);
073    UTIL1.waitFor(30000, new ExplainingPredicate<Exception>() {
074
075      @Override
076      public boolean evaluate() throws Exception {
077        return region.get(new Get(Bytes.toBytes(99))).isEmpty();
078      }
079
080      @Override
081      public String explainFailure() throws Exception {
082        return "Replication has not been catched up yet";
083      }
084    });
085    // transit cluster1 to DA and cluster2 to S, then we will start replicating from cluster1 to
086    // cluster2
087    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
088      SyncReplicationState.DOWNGRADE_ACTIVE);
089    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
090      SyncReplicationState.STANDBY);
091    UTIL1.getAdmin().enableReplicationPeer(PEER_ID);
092
093    // confirm that we will not replicate the old data which causes inconsistency
094    ReplicationSource source = (ReplicationSource) ((Replication) rs.getReplicationSourceService())
095      .getReplicationManager().getSource(PEER_ID);
096    UTIL1.waitFor(30000, new ExplainingPredicate<Exception>() {
097
098      @Override
099      public boolean evaluate() throws Exception {
100        return !source.workerThreads.containsKey(walGroupId);
101      }
102
103      @Override
104      public String explainFailure() throws Exception {
105        return "Replication has not been catched up yet";
106      }
107    });
108    HRegion region2 = UTIL2.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
109    for (int i = 0; i < 100; i++) {
110      assertTrue(region2.get(new Get(Bytes.toBytes(i))).isEmpty());
111    }
112  }
113}