001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021
022import java.io.IOException;
023import java.util.Collections;
024import org.apache.hadoop.hbase.HConstants;
025import org.apache.hadoop.hbase.TableName;
026import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
027import org.apache.hadoop.hbase.client.Put;
028import org.apache.hadoop.hbase.client.RegionInfo;
029import org.apache.hadoop.hbase.client.Table;
030import org.apache.hadoop.hbase.testclassification.MediumTests;
031import org.apache.hadoop.hbase.testclassification.ReplicationTests;
032import org.apache.hadoop.hbase.util.Bytes;
033import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
034import org.junit.jupiter.api.BeforeEach;
035import org.junit.jupiter.api.Tag;
036import org.junit.jupiter.api.Test;
037
038import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
039
040/**
041 * Testcase for HBASE-20296.
042 */
043@Tag(ReplicationTests.TAG)
044@Tag(MediumTests.TAG)
045public class TestRemoveFromSerialReplicationPeer extends SerialReplicationTestBase {
046
047  @BeforeEach
048  public void setUp() throws IOException, StreamLacksCapabilityException {
049    setupWALWriter();
050  }
051
052  private void waitUntilHasLastPushedSequenceId(RegionInfo region) throws Exception {
053    ReplicationQueueStorage queueStorage =
054      UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
055    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
056
057      @Override
058      public boolean evaluate() throws Exception {
059        return queueStorage.getLastSequenceId(region.getEncodedName(), PEER_ID) > 0;
060      }
061
062      @Override
063      public String explainFailure() throws Exception {
064        return "Still no last pushed sequence id for " + region;
065      }
066    });
067  }
068
069  @Test
070  public void testRemoveTable() throws Exception {
071    TableName tableName = createTable();
072    ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
073      .setClusterKey("127.0.0.1:2181:/hbase")
074      .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName())
075      .setReplicateAllUserTables(false)
076      .setTableCFsMap(ImmutableMap.of(tableName, Collections.emptyList())).setSerial(true).build();
077    UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true);
078    try (Table table = UTIL.getConnection().getTable(tableName)) {
079      for (int i = 0; i < 100; i++) {
080        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
081      }
082    }
083    RegionInfo region = UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo();
084    waitUntilHasLastPushedSequenceId(region);
085
086    UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID,
087      ReplicationPeerConfig.newBuilder(peerConfig).setTableCFsMap(Collections.emptyMap()).build());
088
089    ReplicationQueueStorage queueStorage =
090      UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
091    assertEquals(HConstants.NO_SEQNUM,
092      queueStorage.getLastSequenceId(region.getEncodedName(), PEER_ID));
093  }
094
095  @Test
096  public void testRemoveSerialFlag() throws Exception {
097    TableName tableName = createTable();
098    addPeer(true);
099    try (Table table = UTIL.getConnection().getTable(tableName)) {
100      for (int i = 0; i < 100; i++) {
101        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
102      }
103    }
104    RegionInfo region = UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo();
105    waitUntilHasLastPushedSequenceId(region);
106    UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID, ReplicationPeerConfig
107      .newBuilder(UTIL.getAdmin().getReplicationPeerConfig(PEER_ID)).setSerial(false).build());
108    waitUntilReplicationDone(100);
109
110    ReplicationQueueStorage queueStorage =
111      UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
112    assertEquals(HConstants.NO_SEQNUM,
113      queueStorage.getLastSequenceId(region.getEncodedName(), PEER_ID));
114  }
115}