001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL;
021import static org.junit.jupiter.api.Assertions.fail;
022
023import java.util.Arrays;
024import java.util.stream.Collectors;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.HBaseConfiguration;
027import org.apache.hadoop.hbase.HBaseTestingUtil;
028import org.apache.hadoop.hbase.HConstants;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.client.Admin;
031import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
032import org.apache.hadoop.hbase.client.Get;
033import org.apache.hadoop.hbase.client.Put;
034import org.apache.hadoop.hbase.client.Result;
035import org.apache.hadoop.hbase.client.Table;
036import org.apache.hadoop.hbase.client.TableDescriptor;
037import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
038import org.apache.hadoop.hbase.testclassification.LargeTests;
039import org.apache.hadoop.hbase.testclassification.ReplicationTests;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
042import org.junit.jupiter.api.AfterAll;
043import org.junit.jupiter.api.BeforeAll;
044import org.junit.jupiter.api.Tag;
045import org.junit.jupiter.api.Test;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049/**
050 * Replication with dropped table will stuck as the default REPLICATION_DROP_ON_DELETED_TABLE_KEY is
051 * false.
052 */
053@Tag(ReplicationTests.TAG)
054@Tag(LargeTests.TAG)
055public class TestReplicationStuckWithDeletedTableCFs {
056
057  private static final Logger LOG =
058    LoggerFactory.getLogger(TestReplicationStuckWithDeletedTableCFs.class);
059
060  private static Configuration conf1 = HBaseConfiguration.create();
061  private static Configuration conf2 = HBaseConfiguration.create();
062
063  protected static HBaseTestingUtil utility1;
064  protected static HBaseTestingUtil utility2;
065
066  private static Admin admin1;
067  private static Admin admin2;
068
069  private static final TableName TABLE = TableName.valueOf("normal-table");
070  private static final byte[] ROW = Bytes.toBytes("row");
071  private static final byte[] NORMAL_FAMILY = Bytes.toBytes("nf");
072  private static final byte[] DROPPED_FAMILY = Bytes.toBytes("df");
073  private static final byte[] QUALIFIER = Bytes.toBytes("q");
074  private static final byte[] VALUE = Bytes.toBytes("value");
075
076  private static final String PEER_ID = "1";
077  private static final long SLEEP_TIME = 1000;
078  private static final int NB_RETRIES = 10;
079
080  @BeforeAll
081  public static void setUpBeforeClass() throws Exception {
082    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
083    conf1.setInt("replication.source.nb.capacity", 1);
084    utility1 = new HBaseTestingUtil(conf1);
085    utility1.startMiniZKCluster();
086    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
087    conf1 = utility1.getConfiguration();
088
089    conf2 = HBaseConfiguration.create(conf1);
090    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
091    utility2 = new HBaseTestingUtil(conf2);
092    utility2.setZkCluster(miniZK);
093
094    utility1.startMiniCluster(1);
095    utility2.startMiniCluster(1);
096
097    admin1 = utility1.getAdmin();
098    admin2 = utility2.getAdmin();
099  }
100
101  @AfterAll
102  public static void tearDownAfterClass() throws Exception {
103    utility2.shutdownMiniCluster();
104    utility1.shutdownMiniCluster();
105  }
106
107  private void createTable(TableName tableName) throws Exception {
108    TableDescriptor desc = createTableDescriptor(DROPPED_FAMILY, NORMAL_FAMILY);
109    admin1.createTable(desc);
110    admin2.createTable(desc);
111    utility1.waitUntilAllRegionsAssigned(tableName);
112    utility2.waitUntilAllRegionsAssigned(tableName);
113  }
114
115  @Test
116  public void testEditsStuckBehindDeletedCFs() throws Exception {
117    // add peer
118    ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
119      .setClusterKey(utility2.getRpcConnnectionURI()).setReplicateAllUserTables(true).build();
120    admin1.addReplicationPeer(PEER_ID, rpc);
121
122    // create table
123    createTable(TABLE);
124
125    admin1.disableReplicationPeer(PEER_ID);
126
127    try (Table droppedTable = utility1.getConnection().getTable(TABLE)) {
128      Put put = new Put(ROW);
129      put.addColumn(DROPPED_FAMILY, QUALIFIER, VALUE).addColumn(NORMAL_FAMILY, QUALIFIER, VALUE);
130      droppedTable.put(put);
131    }
132
133    // delete cf
134    TableDescriptor desc = createTableDescriptor(NORMAL_FAMILY);
135    admin1.modifyTable(desc);
136    admin2.modifyTable(desc);
137
138    admin1.enableReplicationPeer(PEER_ID);
139
140    verifyReplicationStuck();
141
142    // Remove peer
143    admin1.removeReplicationPeer(PEER_ID);
144    // Drop table
145    admin1.disableTable(TABLE);
146    admin1.deleteTable(TABLE);
147    admin2.disableTable(TABLE);
148    admin2.deleteTable(TABLE);
149  }
150
151  private void verifyReplicationStuck() throws Exception {
152    try (Table normalTable = utility1.getConnection().getTable(TABLE)) {
153      Put put = new Put(ROW);
154      put.addColumn(NORMAL_FAMILY, QUALIFIER, VALUE);
155      normalTable.put(put);
156    }
157    try (Table normalTable = utility2.getConnection().getTable(TABLE)) {
158      for (int i = 0; i < NB_RETRIES; i++) {
159        Result result = normalTable.get(new Get(ROW).addColumn(NORMAL_FAMILY, QUALIFIER));
160        if (result != null && !result.isEmpty()) {
161          fail("Edit should have been stuck behind dropped tables, but value is "
162            + Bytes.toString(result.getValue(NORMAL_FAMILY, QUALIFIER)));
163        } else {
164          LOG.info("Row not replicated, let's wait a bit more...");
165          Thread.sleep(SLEEP_TIME);
166        }
167      }
168    }
169  }
170
171  private TableDescriptor createTableDescriptor(byte[]... cfs) {
172    return TableDescriptorBuilder.newBuilder(TABLE).setColumnFamilies(Arrays.stream(cfs).map(
173      cf -> ColumnFamilyDescriptorBuilder.newBuilder(cf).setScope(REPLICATION_SCOPE_GLOBAL).build())
174      .collect(Collectors.toList())).build();
175  }
176}