001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertEquals;
021
022import java.io.IOException;
023import java.util.Collections;
024import org.apache.hadoop.hbase.HBaseClassTestRule;
025import org.apache.hadoop.hbase.HConstants;
026import org.apache.hadoop.hbase.TableName;
027import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
028import org.apache.hadoop.hbase.client.Put;
029import org.apache.hadoop.hbase.client.RegionInfo;
030import org.apache.hadoop.hbase.client.Table;
031import org.apache.hadoop.hbase.testclassification.MediumTests;
032import org.apache.hadoop.hbase.testclassification.ReplicationTests;
033import org.apache.hadoop.hbase.util.Bytes;
034import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
035import org.junit.Before;
036import org.junit.ClassRule;
037import org.junit.Test;
038import org.junit.experimental.categories.Category;
039
040import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
041
042/**
043 * Testcase for HBASE-20296.
044 */
045@Category({ ReplicationTests.class, MediumTests.class })
046public class TestRemoveFromSerialReplicationPeer extends SerialReplicationTestBase {
047
048  @ClassRule
049  public static final HBaseClassTestRule CLASS_RULE =
050    HBaseClassTestRule.forClass(TestRemoveFromSerialReplicationPeer.class);
051
052  @Before
053  public void setUp() throws IOException, StreamLacksCapabilityException {
054    setupWALWriter();
055  }
056
057  private void waitUntilHasLastPushedSequenceId(RegionInfo region) throws Exception {
058    ReplicationQueueStorage queueStorage =
059      UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
060    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
061
062      @Override
063      public boolean evaluate() throws Exception {
064        return queueStorage.getLastSequenceId(region.getEncodedName(), PEER_ID) > 0;
065      }
066
067      @Override
068      public String explainFailure() throws Exception {
069        return "Still no last pushed sequence id for " + region;
070      }
071    });
072  }
073
074  @Test
075  public void testRemoveTable() throws Exception {
076    TableName tableName = createTable();
077    ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
078      .setClusterKey("127.0.0.1:2181:/hbase")
079      .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName())
080      .setReplicateAllUserTables(false)
081      .setTableCFsMap(ImmutableMap.of(tableName, Collections.emptyList())).setSerial(true).build();
082    UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true);
083    try (Table table = UTIL.getConnection().getTable(tableName)) {
084      for (int i = 0; i < 100; i++) {
085        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
086      }
087    }
088    RegionInfo region = UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo();
089    waitUntilHasLastPushedSequenceId(region);
090
091    UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID,
092      ReplicationPeerConfig.newBuilder(peerConfig).setTableCFsMap(Collections.emptyMap()).build());
093
094    ReplicationQueueStorage queueStorage =
095      UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
096    assertEquals(HConstants.NO_SEQNUM,
097      queueStorage.getLastSequenceId(region.getEncodedName(), PEER_ID));
098  }
099
100  @Test
101  public void testRemoveSerialFlag() throws Exception {
102    TableName tableName = createTable();
103    addPeer(true);
104    try (Table table = UTIL.getConnection().getTable(tableName)) {
105      for (int i = 0; i < 100; i++) {
106        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
107      }
108    }
109    RegionInfo region = UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo();
110    waitUntilHasLastPushedSequenceId(region);
111    UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID, ReplicationPeerConfig
112      .newBuilder(UTIL.getAdmin().getReplicationPeerConfig(PEER_ID)).setSerial(false).build());
113    waitUntilReplicationDone(100);
114
115    ReplicationQueueStorage queueStorage =
116      UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
117    assertEquals(HConstants.NO_SEQNUM,
118      queueStorage.getLastSequenceId(region.getEncodedName(), PEER_ID));
119  }
120}