001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertThrows;
022
023import java.util.concurrent.ExecutionException;
024import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
025import org.apache.hadoop.hbase.client.AsyncConnection;
026import org.apache.hadoop.hbase.client.AsyncTable;
027import org.apache.hadoop.hbase.client.ConnectionFactory;
028import org.apache.hadoop.hbase.client.Get;
029import org.apache.hadoop.hbase.client.Put;
030import org.apache.hadoop.hbase.client.RegionInfoBuilder;
031import org.apache.hadoop.hbase.client.Table;
032import org.apache.hadoop.hbase.regionserver.HRegionServer;
033import org.apache.hadoop.hbase.testclassification.LargeTests;
034import org.apache.hadoop.hbase.testclassification.ReplicationTests;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.hadoop.hbase.wal.WALFactory;
037import org.apache.hadoop.hbase.wal.WALProvider;
038import org.junit.jupiter.api.BeforeAll;
039import org.junit.jupiter.api.Tag;
040import org.junit.jupiter.api.Test;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044@Tag(ReplicationTests.TAG)
045@Tag(LargeTests.TAG)
046public class TestSyncReplicationMoreLogsInLocalCopyToRemote
047  extends SyncReplicationTestBaseNoBeforeAll {
048
049  private static final Logger LOG =
050    LoggerFactory.getLogger(TestSyncReplicationMoreLogsInLocalCopyToRemote.class);
051
052  @BeforeAll
053  public static void setUp() throws Exception {
054    UTIL1.getConfiguration().setClass(WALFactory.WAL_PROVIDER, BrokenRemoteAsyncFSWALProvider.class,
055      WALProvider.class);
056    UTIL2.getConfiguration().setClass(WALFactory.WAL_PROVIDER, BrokenRemoteAsyncFSWALProvider.class,
057      WALProvider.class);
058    startClusters();
059  }
060
061  @Test
062  public void testSplitLog() throws Exception {
063    UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
064    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
065      SyncReplicationState.STANDBY);
066    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
067      SyncReplicationState.ACTIVE);
068    HRegionServer rs = UTIL1.getRSForFirstRegionInTable(TABLE_NAME);
069
070    BrokenRemoteAsyncFSWALProvider.BrokenRemoteAsyncFSWAL wal =
071      (BrokenRemoteAsyncFSWALProvider.BrokenRemoteAsyncFSWAL) rs.getWalFactory()
072        .getWAL(RegionInfoBuilder.newBuilder(TABLE_NAME).build());
073    wal.setRemoteBroken();
074
075    try (AsyncConnection conn =
076      ConnectionFactory.createAsyncConnection(UTIL1.getConfiguration()).get()) {
077      AsyncTable<?> table = conn.getTableBuilder(TABLE_NAME).setMaxAttempts(1).build();
078      ExecutionException error = assertThrows(ExecutionException.class,
079        () -> table.put(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, Bytes.toBytes(0))).get());
080      LOG.info("Expected error:", error);
081    }
082    UTIL1.waitFor(60000, new ExplainingPredicate<Exception>() {
083
084      @Override
085      public boolean evaluate() throws Exception {
086        try (Table table = UTIL1.getConnection().getTable(TABLE_NAME)) {
087          return table.exists(new Get(Bytes.toBytes(0)));
088        }
089      }
090
091      @Override
092      public String explainFailure() throws Exception {
093        return "The row is still not available";
094      }
095    });
096    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
097      SyncReplicationState.DOWNGRADE_ACTIVE);
098    // We should have copied the local log to remote, so we should be able to get the value
099    try (Table table = UTIL2.getConnection().getTable(TABLE_NAME)) {
100      assertEquals(0, Bytes.toInt(table.get(new Get(Bytes.toBytes(0))).getValue(CF, CQ)));
101    }
102  }
103}