/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

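/**
 * Tests the active side of synchronous replication: transitions between the ACTIVE,
 * DOWNGRADE_ACTIVE and STANDBY peer states, rejection of replication requests in each state, and
 * the contents of the remote WALs shipped to the standby cluster.
 */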
@Category({ ReplicationTests.class, LargeTests.class })
public class TestSyncReplicationActive extends SyncReplicationTestBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestSyncReplicationActive.class);

  @Test
  public void testActive() throws Exception {
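    // Start with UTIL2 as the standby cluster and UTIL1 as the active cluster.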
    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
      SyncReplicationState.STANDBY);
    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
      SyncReplicationState.ACTIVE);

    // confirm that a peer in state ACTIVE rejects replication requests, while one in STANDBY
    // does not
    verifyReplicationRequestRejection(UTIL1, true);
    verifyReplicationRequestRejection(UTIL2, false);

    UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
    write(UTIL1, 0, 100);
    Thread.sleep(2000);
    // the peer is disabled, so no data has been replicated
    verifyNotReplicatedThroughRegion(UTIL2, 0, 100);

    // Ensure that there are no cluster ids in the remote log entries.
    verifyNoClusterIdInRemoteLog(UTIL2, REMOTE_WAL_DIR2, PEER_ID);

    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
      SyncReplicationState.DOWNGRADE_ACTIVE);
    // confirm that a peer in state DOWNGRADE_ACTIVE rejects replication requests
    verifyReplicationRequestRejection(UTIL2, true);
    // confirm that the data is there after we convert the peer to DOWNGRADE_ACTIVE
    verify(UTIL2, 0, 100);

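    // With UTIL1 still in ACTIVE state, a put should hang on the WAL roll and then fail once the
    // peer transitions to STANDBY, because a standby cluster's WAL only accepts marker edits.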
    try (AsyncConnection conn =
      ConnectionFactory.createAsyncConnection(UTIL1.getConfiguration()).get()) {
      AsyncTable<?> table = conn.getTableBuilder(TABLE_NAME).setMaxAttempts(1).build();
      CompletableFuture<Void> future =
        table.put(new Put(Bytes.toBytes(1000)).addColumn(CF, CQ, Bytes.toBytes(1000)));
      Thread.sleep(2000);
      // should hang on rolling
      assertFalse(future.isDone());
      UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
        SyncReplicationState.STANDBY);
      try {
        future.get();
        fail("should fail because the wal is closing");
      } catch (ExecutionException e) {
        // expected
        assertThat(e.getCause().getMessage(), containsString("only marker edit is allowed"));
      }
    }
    // confirm that the data has not been persisted
    HRegion region = UTIL1.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
    assertTrue(region.get(new Get(Bytes.toBytes(1000))).isEmpty());
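    // Swap roles: promote UTIL2 to ACTIVE now that UTIL1 is in STANDBY.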
    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
      SyncReplicationState.ACTIVE);

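    // data written to the new active cluster should be replicated back to UTIL1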
    writeAndVerifyReplication(UTIL2, UTIL1, 100, 200);

    // shut down the standby cluster completely
    UTIL1.shutdownMiniCluster();
    // confirm that we can convert to DOWNGRADE_ACTIVE even if the remote standby cluster is down
    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
      SyncReplicationState.DOWNGRADE_ACTIVE);
    // confirm that a peer in state DOWNGRADE_ACTIVE rejects replication requests
    verifyReplicationRequestRejection(UTIL2, true);
    write(UTIL2, 200, 300);
  }

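  /**
   * Assert that none of the entries in the remote WALs written for the given peer carry a
   * cluster id.
   */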
  private void verifyNoClusterIdInRemoteLog(HBaseTestingUtil utility, Path remoteDir, String peerId)
    throws Exception {
    FileSystem fs = utility.getTestFileSystem();
    FileStatus[] files = fs.listStatus(new Path(remoteDir, peerId));
    assertTrue(files.length > 0);
    for (FileStatus file : files) {
      try (
        Reader reader = WALFactory.createReader(fs, file.getPath(), utility.getConfiguration())) {
        Entry entry = reader.next();
        assertNotNull(entry);
        while (entry != null) {
          assertEquals(0, entry.getKey().getClusterIds().size());
          entry = reader.next();
        }
      }
    }
  }
}