001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.hamcrest.CoreMatchers.containsString;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.junit.jupiter.api.Assertions.assertEquals;
023import static org.junit.jupiter.api.Assertions.assertFalse;
024import static org.junit.jupiter.api.Assertions.assertTrue;
025import static org.junit.jupiter.api.Assertions.fail;
026
027import java.util.concurrent.CompletableFuture;
028import java.util.concurrent.ExecutionException;
029import org.apache.hadoop.fs.FileStatus;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.HBaseTestingUtil;
033import org.apache.hadoop.hbase.client.AsyncConnection;
034import org.apache.hadoop.hbase.client.AsyncTable;
035import org.apache.hadoop.hbase.client.ConnectionFactory;
036import org.apache.hadoop.hbase.client.Get;
037import org.apache.hadoop.hbase.client.Put;
038import org.apache.hadoop.hbase.regionserver.HRegion;
039import org.apache.hadoop.hbase.testclassification.LargeTests;
040import org.apache.hadoop.hbase.testclassification.ReplicationTests;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.apache.hadoop.hbase.wal.NoEOFWALStreamReader;
043import org.apache.hadoop.hbase.wal.WAL.Entry;
044import org.apache.hadoop.hbase.wal.WALStreamReader;
045import org.junit.jupiter.api.Tag;
046import org.junit.jupiter.api.Test;
047
048@Tag(ReplicationTests.TAG)
049@Tag(LargeTests.TAG)
050public class SyncReplicationActiveTestBase extends SyncReplicationTestBaseNoBeforeAll {
051
052  @Test
053  public void testActive() throws Exception {
054    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
055      SyncReplicationState.STANDBY);
056    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
057      SyncReplicationState.ACTIVE);
058
059    // confirm that peer with state A will reject replication request.
060    verifyReplicationRequestRejection(UTIL1, true);
061    verifyReplicationRequestRejection(UTIL2, false);
062
063    UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
064    write(UTIL1, 0, 100);
065    Thread.sleep(2000);
066    // peer is disabled so no data have been replicated
067    verifyNotReplicatedThroughRegion(UTIL2, 0, 100);
068
069    // Ensure that there's no cluster id in remote log entries.
070    verifyNoClusterIdInRemoteLog(UTIL2, REMOTE_WAL_DIR2, PEER_ID);
071
072    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
073      SyncReplicationState.DOWNGRADE_ACTIVE);
074    // confirm that peer with state DA will reject replication request.
075    verifyReplicationRequestRejection(UTIL2, true);
076    // confirm that the data is there after we convert the peer to DA
077    verify(UTIL2, 0, 100);
078
079    try (AsyncConnection conn =
080      ConnectionFactory.createAsyncConnection(UTIL1.getConfiguration()).get()) {
081      AsyncTable<?> table = conn.getTableBuilder(TABLE_NAME).setMaxAttempts(1).build();
082      CompletableFuture<Void> future =
083        table.put(new Put(Bytes.toBytes(1000)).addColumn(CF, CQ, Bytes.toBytes(1000)));
084      Thread.sleep(2000);
085      // should hang on rolling
086      assertFalse(future.isDone());
087      UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
088        SyncReplicationState.STANDBY);
089      try {
090        future.get();
091        fail("should fail because of the wal is closing");
092      } catch (ExecutionException e) {
093        // expected
094        assertThat(e.getCause().getMessage(), containsString("only marker edit is allowed"));
095      }
096    }
097    // confirm that the data has not been persisted
098    HRegion region = UTIL1.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
099    assertTrue(region.get(new Get(Bytes.toBytes(1000))).isEmpty());
100    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
101      SyncReplicationState.ACTIVE);
102
103    writeAndVerifyReplication(UTIL2, UTIL1, 100, 200);
104
105    // shutdown the cluster completely
106    UTIL1.shutdownMiniCluster();
107    // confirm that we can convert to DA even if the remote slave cluster is down
108    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
109      SyncReplicationState.DOWNGRADE_ACTIVE);
110    // confirm that peer with state DA will reject replication request.
111    verifyReplicationRequestRejection(UTIL2, true);
112    write(UTIL2, 200, 300);
113  }
114
115  private void verifyNoClusterIdInRemoteLog(HBaseTestingUtil utility, Path remoteDir, String peerId)
116    throws Exception {
117    FileSystem fs2 = utility.getTestFileSystem();
118    FileStatus[] files = fs2.listStatus(new Path(remoteDir, peerId));
119    assertTrue(files.length > 0);
120    for (FileStatus file : files) {
121      try (WALStreamReader reader =
122        NoEOFWALStreamReader.create(fs2, file.getPath(), utility.getConfiguration())) {
123        Entry entry = reader.next();
124        assertTrue(entry != null);
125        while (entry != null) {
126          assertEquals(entry.getKey().getClusterIds().size(), 0);
127          entry = reader.next();
128        }
129      }
130    }
131  }
132}