001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL;
021import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT;
022import static org.junit.jupiter.api.Assertions.fail;
023
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.HBaseConfiguration;
026import org.apache.hadoop.hbase.HBaseTestingUtil;
027import org.apache.hadoop.hbase.TableName;
028import org.apache.hadoop.hbase.client.Admin;
029import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
030import org.apache.hadoop.hbase.client.Get;
031import org.apache.hadoop.hbase.client.Put;
032import org.apache.hadoop.hbase.client.Result;
033import org.apache.hadoop.hbase.client.Table;
034import org.apache.hadoop.hbase.client.TableDescriptor;
035import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
036import org.apache.hadoop.hbase.testclassification.LargeTests;
037import org.apache.hadoop.hbase.testclassification.ReplicationTests;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
040import org.junit.jupiter.api.AfterAll;
041import org.junit.jupiter.api.BeforeAll;
042import org.junit.jupiter.api.Tag;
043import org.junit.jupiter.api.Test;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047/**
048 * Replication with dropped table will stuck as the default REPLICATION_DROP_ON_DELETED_TABLE_KEY is
049 * false.
050 */
051@Tag(ReplicationTests.TAG)
052@Tag(LargeTests.TAG)
053public class TestReplicationStuckWithDroppedTable {
054
055  private static final Logger LOG =
056    LoggerFactory.getLogger(TestReplicationStuckWithDroppedTable.class);
057
058  private static Configuration conf1 = HBaseConfiguration.create();
059  private static Configuration conf2 = HBaseConfiguration.create();
060
061  protected static HBaseTestingUtil utility1;
062  protected static HBaseTestingUtil utility2;
063
064  private static Admin admin1;
065  private static Admin admin2;
066
067  private static final TableName NORMAL_TABLE = TableName.valueOf("normal-table");
068  private static final TableName DROPPED_TABLE = TableName.valueOf("dropped-table");
069  private static final byte[] ROW = Bytes.toBytes("row");
070  private static final byte[] FAMILY = Bytes.toBytes("f");
071  private static final byte[] QUALIFIER = Bytes.toBytes("q");
072  private static final byte[] VALUE = Bytes.toBytes("value");
073
074  private static final String PEER_ID = "1";
075  private static final long SLEEP_TIME = 1000;
076  private static final int NB_RETRIES = 10;
077
078  @BeforeAll
079  public static void setUpBeforeClass() throws Exception {
080    conf1.set(ZOOKEEPER_ZNODE_PARENT, "/1");
081    conf1.setInt("replication.source.nb.capacity", 1);
082    utility1 = new HBaseTestingUtil(conf1);
083    utility1.startMiniZKCluster();
084    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
085    conf1 = utility1.getConfiguration();
086
087    conf2 = HBaseConfiguration.create(conf1);
088    conf2.set(ZOOKEEPER_ZNODE_PARENT, "/2");
089    utility2 = new HBaseTestingUtil(conf2);
090    utility2.setZkCluster(miniZK);
091
092    utility1.startMiniCluster(1);
093    utility2.startMiniCluster(1);
094
095    admin1 = utility1.getAdmin();
096    admin2 = utility2.getAdmin();
097  }
098
099  @AfterAll
100  public static void tearDownAfterClass() throws Exception {
101    utility2.shutdownMiniCluster();
102    utility1.shutdownMiniCluster();
103  }
104
105  private void createTable(TableName tableName) throws Exception {
106    TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName)
107      .setColumnFamily(
108        ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setScope(REPLICATION_SCOPE_GLOBAL).build())
109      .build();
110    admin1.createTable(desc);
111    admin2.createTable(desc);
112    utility1.waitUntilAllRegionsAssigned(tableName);
113    utility2.waitUntilAllRegionsAssigned(tableName);
114  }
115
116  @Test
117  public void testEditsStuckBehindDroppedTable() throws Exception {
118    // add peer
119    ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
120      .setClusterKey(utility2.getRpcConnnectionURI()).setReplicateAllUserTables(true).build();
121    admin1.addReplicationPeer(PEER_ID, rpc);
122
123    // create table
124    createTable(NORMAL_TABLE);
125    createTable(DROPPED_TABLE);
126
127    admin1.disableReplicationPeer(PEER_ID);
128
129    try (Table droppedTable = utility1.getConnection().getTable(DROPPED_TABLE)) {
130      Put put = new Put(ROW);
131      put.addColumn(FAMILY, QUALIFIER, VALUE);
132      droppedTable.put(put);
133    }
134
135    admin1.disableTable(DROPPED_TABLE);
136    admin1.deleteTable(DROPPED_TABLE);
137    admin2.disableTable(DROPPED_TABLE);
138    admin2.deleteTable(DROPPED_TABLE);
139
140    admin1.enableReplicationPeer(PEER_ID);
141
142    verifyReplicationStuck();
143
144    // Remove peer
145    admin1.removeReplicationPeer(PEER_ID);
146    // Drop table
147    admin1.disableTable(NORMAL_TABLE);
148    admin1.deleteTable(NORMAL_TABLE);
149    admin2.disableTable(NORMAL_TABLE);
150    admin2.deleteTable(NORMAL_TABLE);
151  }
152
153  private void verifyReplicationStuck() throws Exception {
154    try (Table normalTable = utility1.getConnection().getTable(NORMAL_TABLE)) {
155      Put put = new Put(ROW);
156      put.addColumn(FAMILY, QUALIFIER, VALUE);
157      normalTable.put(put);
158    }
159    try (Table normalTable = utility2.getConnection().getTable(NORMAL_TABLE)) {
160      for (int i = 0; i < NB_RETRIES; i++) {
161        Result result = normalTable.get(new Get(ROW).addColumn(FAMILY, QUALIFIER));
162        if (result != null && !result.isEmpty()) {
163          fail("Edit should have been stuck behind dropped tables, but value is "
164            + Bytes.toString(result.getValue(FAMILY, QUALIFIER)));
165        } else {
166          LOG.info("Row not replicated, let's wait a bit more...");
167          Thread.sleep(SLEEP_TIME);
168        }
169      }
170    }
171  }
172}