001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL;
021import static org.junit.Assert.fail;
022
023import java.util.Arrays;
024import java.util.stream.Collectors;
025
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.HBaseClassTestRule;
028import org.apache.hadoop.hbase.HBaseConfiguration;
029import org.apache.hadoop.hbase.HBaseTestingUtility;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.Admin;
033import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
034import org.apache.hadoop.hbase.client.Get;
035import org.apache.hadoop.hbase.client.Put;
036import org.apache.hadoop.hbase.client.Result;
037import org.apache.hadoop.hbase.client.Table;
038import org.apache.hadoop.hbase.client.TableDescriptor;
039import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
040import org.apache.hadoop.hbase.testclassification.LargeTests;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
043import org.junit.AfterClass;
044import org.junit.BeforeClass;
045import org.junit.ClassRule;
046import org.junit.Test;
047import org.junit.experimental.categories.Category;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051/**
052 * Replication with dropped table will stuck as the default REPLICATION_DROP_ON_DELETED_TABLE_KEY
053 * is false.
054 */
055@Category({ LargeTests.class })
056public class TestReplicationStuckWithDeletedTableCFs {
057
058  @ClassRule
059  public static final HBaseClassTestRule CLASS_RULE =
060      HBaseClassTestRule.forClass(TestReplicationStuckWithDeletedTableCFs.class);
061
062  private static final Logger LOG =
063      LoggerFactory.getLogger(TestReplicationStuckWithDeletedTableCFs.class);
064
065  private static Configuration conf1 = HBaseConfiguration.create();
066  private static Configuration conf2 = HBaseConfiguration.create();
067
068  protected static HBaseTestingUtility utility1;
069  protected static HBaseTestingUtility utility2;
070
071  private static Admin admin1;
072  private static Admin admin2;
073
074  private static final TableName TABLE = TableName.valueOf("normal-table");
075  private static final byte[] ROW = Bytes.toBytes("row");
076  private static final byte[] NORMAL_FAMILY = Bytes.toBytes("nf");
077  private static final byte[] DROPPED_FAMILY = Bytes.toBytes("df");
078  private static final byte[] QUALIFIER = Bytes.toBytes("q");
079  private static final byte[] VALUE = Bytes.toBytes("value");
080
081  private static final String PEER_ID = "1";
082  private static final long SLEEP_TIME = 1000;
083  private static final int NB_RETRIES = 10;
084
085  @BeforeClass
086  public static void setUpBeforeClass() throws Exception {
087    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
088    conf1.setInt("replication.source.nb.capacity", 1);
089    utility1 = new HBaseTestingUtility(conf1);
090    utility1.startMiniZKCluster();
091    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
092    conf1 = utility1.getConfiguration();
093
094    conf2 = HBaseConfiguration.create(conf1);
095    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
096    utility2 = new HBaseTestingUtility(conf2);
097    utility2.setZkCluster(miniZK);
098
099    utility1.startMiniCluster(1);
100    utility2.startMiniCluster(1);
101
102    admin1 = utility1.getAdmin();
103    admin2 = utility2.getAdmin();
104  }
105
106  @AfterClass
107  public static void tearDownAfterClass() throws Exception {
108    utility2.shutdownMiniCluster();
109    utility1.shutdownMiniCluster();
110  }
111
112  private void createTable(TableName tableName) throws Exception {
113    TableDescriptor desc = createTableDescriptor(DROPPED_FAMILY, NORMAL_FAMILY);
114    admin1.createTable(desc);
115    admin2.createTable(desc);
116    utility1.waitUntilAllRegionsAssigned(tableName);
117    utility2.waitUntilAllRegionsAssigned(tableName);
118  }
119
120  @Test
121  public void testEditsStuckBehindDeletedCFs() throws Exception {
122    // add peer
123    ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
124        .setClusterKey(utility2.getClusterKey()).setReplicateAllUserTables(true).build();
125    admin1.addReplicationPeer(PEER_ID, rpc);
126
127    // create table
128    createTable(TABLE);
129
130    admin1.disableReplicationPeer(PEER_ID);
131
132    try (Table droppedTable = utility1.getConnection().getTable(TABLE)) {
133      Put put = new Put(ROW);
134      put.addColumn(DROPPED_FAMILY, QUALIFIER, VALUE).addColumn(NORMAL_FAMILY, QUALIFIER, VALUE);
135      droppedTable.put(put);
136    }
137
138    // delete cf
139    TableDescriptor desc = createTableDescriptor(NORMAL_FAMILY);
140    admin1.modifyTable(desc);
141    admin2.modifyTable(desc);
142
143    admin1.enableReplicationPeer(PEER_ID);
144
145    verifyReplicationStuck();
146
147    // Remove peer
148    admin1.removeReplicationPeer(PEER_ID);
149    // Drop table
150    admin1.disableTable(TABLE);
151    admin1.deleteTable(TABLE);
152    admin2.disableTable(TABLE);
153    admin2.deleteTable(TABLE);
154  }
155
156  private void verifyReplicationStuck() throws Exception {
157    try (Table normalTable = utility1.getConnection().getTable(TABLE)) {
158      Put put = new Put(ROW);
159      put.addColumn(NORMAL_FAMILY, QUALIFIER, VALUE);
160      normalTable.put(put);
161    }
162    try (Table normalTable = utility2.getConnection().getTable(TABLE)) {
163      for (int i = 0; i < NB_RETRIES; i++) {
164        Result result = normalTable.get(new Get(ROW).addColumn(NORMAL_FAMILY, QUALIFIER));
165        if (result != null && !result.isEmpty()) {
166          fail("Edit should have been stuck behind dropped tables, but value is " + Bytes
167              .toString(result.getValue(NORMAL_FAMILY, QUALIFIER)));
168        } else {
169          LOG.info("Row not replicated, let's wait a bit more...");
170          Thread.sleep(SLEEP_TIME);
171        }
172      }
173    }
174  }
175
176  private TableDescriptor createTableDescriptor(byte[]... cfs) {
177    return TableDescriptorBuilder.newBuilder(TABLE)
178        .setColumnFamilies(Arrays.stream(cfs).map(cf ->
179            ColumnFamilyDescriptorBuilder.newBuilder(cf).setScope(REPLICATION_SCOPE_GLOBAL).build())
180            .collect(Collectors.toList())
181        ).build();
182  }
183}