/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL;
import static org.junit.Assert.fail;

import java.util.Arrays;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Replication will get stuck when a column family of the replicated table is deleted, as the
 * default REPLICATION_DROP_ON_DELETED_TABLE_KEY is false.
 */
@Category({ LargeTests.class })
public class TestReplicationStuckWithDeletedTableCFs {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicationStuckWithDeletedTableCFs.class);

  private static final Logger LOG =
    LoggerFactory.getLogger(TestReplicationStuckWithDeletedTableCFs.class);

  private static Configuration conf1 = HBaseConfiguration.create();
  private static Configuration conf2 = HBaseConfiguration.create();

  protected static HBaseTestingUtil utility1;
  protected static HBaseTestingUtil utility2;

  private static Admin admin1;
  private static Admin admin2;

  private static final TableName TABLE = TableName.valueOf("normal-table");
  private static final byte[] ROW = Bytes.toBytes("row");
  private static final byte[] NORMAL_FAMILY = Bytes.toBytes("nf");
  private static final byte[] DROPPED_FAMILY = Bytes.toBytes("df");
  private static final byte[] QUALIFIER = Bytes.toBytes("q");
  private static final byte[] VALUE = Bytes.toBytes("value");

  private static final String PEER_ID = "1";
  private static final long SLEEP_TIME = 1000;
  private static final int NB_RETRIES = 10;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
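    // The two mini clusters share a single mini ZooKeeper cluster and are kept apart by their
    // znode parents ("/1" for the source, "/2" for the sink). Setting
    // replication.source.nb.capacity to 1 limits each replication batch to a single WAL entry,
    // so one un-shippable edit is enough to block everything queued behind it.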
    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
    conf1.setInt("replication.source.nb.capacity", 1);
    utility1 = new HBaseTestingUtil(conf1);
    utility1.startMiniZKCluster();
    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
    conf1 = utility1.getConfiguration();

    conf2 = HBaseConfiguration.create(conf1);
    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
    utility2 = new HBaseTestingUtil(conf2);
    utility2.setZkCluster(miniZK);

    utility1.startMiniCluster(1);
    utility2.startMiniCluster(1);

    admin1 = utility1.getAdmin();
    admin2 = utility2.getAdmin();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    utility2.shutdownMiniCluster();
    utility1.shutdownMiniCluster();
  }

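  /**
   * Creates the test table, with both families at global replication scope, on both the source
   * and the sink cluster, and waits until all regions are assigned.
   */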
  private void createTable(TableName tableName) throws Exception {
    TableDescriptor desc = createTableDescriptor(DROPPED_FAMILY, NORMAL_FAMILY);
    admin1.createTable(desc);
    admin2.createTable(desc);
    utility1.waitUntilAllRegionsAssigned(tableName);
    utility2.waitUntilAllRegionsAssigned(tableName);
  }

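  /**
   * Scenario: while the peer is disabled, write an edit that touches both families, then delete
   * one family on both clusters and re-enable the peer. The queued edit refers to a family that
   * no longer exists on the sink, so replication should stall and the follow-up edit to the
   * surviving family must not show up on the sink.
   */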
  @Test
  public void testEditsStuckBehindDeletedCFs() throws Exception {
    // add peer
    ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
      .setClusterKey(utility2.getClusterKey()).setReplicateAllUserTables(true).build();
    admin1.addReplicationPeer(PEER_ID, rpc);

    // create table
    createTable(TABLE);

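    // disable the peer so the next put is queued in the WAL but not yet shipped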
    admin1.disableReplicationPeer(PEER_ID);

    try (Table droppedTable = utility1.getConnection().getTable(TABLE)) {
      Put put = new Put(ROW);
      put.addColumn(DROPPED_FAMILY, QUALIFIER, VALUE).addColumn(NORMAL_FAMILY, QUALIFIER, VALUE);
      droppedTable.put(put);
    }

    // delete the dropped family ("df") from both clusters
    TableDescriptor desc = createTableDescriptor(NORMAL_FAMILY);
    admin1.modifyTable(desc);
    admin2.modifyTable(desc);

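    // re-enable the peer; the source now tries to ship the queued edit, which refers to a
    // family that no longer exists on the sink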
    admin1.enableReplicationPeer(PEER_ID);

    verifyReplicationStuck();

    // Remove peer
    admin1.removeReplicationPeer(PEER_ID);
    // Drop table
    admin1.disableTable(TABLE);
    admin1.deleteTable(TABLE);
    admin2.disableTable(TABLE);
    admin2.deleteTable(TABLE);
  }

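  /**
   * Writes a fresh edit to the surviving family on the source and asserts that it never reaches
   * the sink within the retry window, i.e. replication is stuck behind the edit for the deleted
   * family.
   */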
  private void verifyReplicationStuck() throws Exception {
    try (Table normalTable = utility1.getConnection().getTable(TABLE)) {
      Put put = new Put(ROW);
      put.addColumn(NORMAL_FAMILY, QUALIFIER, VALUE);
      normalTable.put(put);
    }
    try (Table normalTable = utility2.getConnection().getTable(TABLE)) {
      for (int i = 0; i < NB_RETRIES; i++) {
        Result result = normalTable.get(new Get(ROW).addColumn(NORMAL_FAMILY, QUALIFIER));
        if (result != null && !result.isEmpty()) {
          fail("Edit should have been stuck behind the deleted column family, but value is "
            + Bytes.toString(result.getValue(NORMAL_FAMILY, QUALIFIER)));
        } else {
          LOG.info("Row not replicated, let's wait a bit more...");
          Thread.sleep(SLEEP_TIME);
        }
      }
    }
  }

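  /**
   * Builds a descriptor for the test table containing the given column families, each with
   * global replication scope.
   */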
  private TableDescriptor createTableDescriptor(byte[]... cfs) {
    return TableDescriptorBuilder.newBuilder(TABLE).setColumnFamilies(Arrays.stream(cfs).map(
      cf -> ColumnFamilyDescriptorBuilder.newBuilder(cf).setScope(REPLICATION_SCOPE_GLOBAL).build())
      .collect(Collectors.toList())).build();
  }
}