001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.jupiter.api.Assertions.fail;
021
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.hbase.HBaseConfiguration;
024import org.apache.hadoop.hbase.HBaseTestingUtil;
025import org.apache.hadoop.hbase.HConstants;
026import org.apache.hadoop.hbase.NamespaceDescriptor;
027import org.apache.hadoop.hbase.TableName;
028import org.apache.hadoop.hbase.Waiter.Predicate;
029import org.apache.hadoop.hbase.client.Admin;
030import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
031import org.apache.hadoop.hbase.client.Get;
032import org.apache.hadoop.hbase.client.Put;
033import org.apache.hadoop.hbase.client.Result;
034import org.apache.hadoop.hbase.client.Table;
035import org.apache.hadoop.hbase.client.TableDescriptor;
036import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
037import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
038import org.apache.hadoop.hbase.testclassification.LargeTests;
039import org.apache.hadoop.hbase.testclassification.ReplicationTests;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.apache.hadoop.hbase.util.JVMClusterUtil;
042import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
043import org.junit.jupiter.api.AfterAll;
044import org.junit.jupiter.api.AfterEach;
045import org.junit.jupiter.api.BeforeAll;
046import org.junit.jupiter.api.BeforeEach;
047import org.junit.jupiter.api.Tag;
048import org.junit.jupiter.api.Test;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052@Tag(ReplicationTests.TAG)
053@Tag(LargeTests.TAG)
054public class TestReplicationEditsDroppedWithDroppedTable {
055
056  private static final Logger LOG =
057    LoggerFactory.getLogger(TestReplicationEditsDroppedWithDroppedTable.class);
058
059  private static Configuration conf1 = HBaseConfiguration.create();
060  private static Configuration conf2 = HBaseConfiguration.create();
061
062  protected static HBaseTestingUtil utility1;
063  protected static HBaseTestingUtil utility2;
064
065  private static Admin admin1;
066  private static Admin admin2;
067
068  private static final String namespace = "NS";
069  private static final TableName NORMAL_TABLE = TableName.valueOf("normal-table");
070  private static final TableName DROPPED_TABLE = TableName.valueOf("dropped-table");
071  private static final TableName DROPPED_NS_TABLE = TableName.valueOf("NS:dropped-table");
072  private static final byte[] ROW = Bytes.toBytes("row");
073  private static final byte[] FAMILY = Bytes.toBytes("f");
074  private static final byte[] QUALIFIER = Bytes.toBytes("q");
075  private static final byte[] VALUE = Bytes.toBytes("value");
076
077  private static final String PEER_ID = "1";
078  private static final long SLEEP_TIME = 1000;
079  private static final int NB_RETRIES = 10;
080
081  @BeforeAll
082  public static void setUpBeforeClass() throws Exception {
083    // Set true to filter replication edits for dropped table
084    conf1.setBoolean(HBaseInterClusterReplicationEndpoint.REPLICATION_DROP_ON_DELETED_TABLE_KEY,
085      true);
086    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
087    conf1.setInt("replication.source.nb.capacity", 1);
088    utility1 = new HBaseTestingUtil(conf1);
089    utility1.startMiniZKCluster();
090    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
091    conf1 = utility1.getConfiguration();
092
093    conf2 = HBaseConfiguration.create(conf1);
094    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
095    utility2 = new HBaseTestingUtil(conf2);
096    utility2.setZkCluster(miniZK);
097
098    utility1.startMiniCluster(1);
099    utility2.startMiniCluster(1);
100
101    admin1 = utility1.getAdmin();
102    admin2 = utility2.getAdmin();
103
104    NamespaceDescriptor nsDesc = NamespaceDescriptor.create(namespace).build();
105    admin1.createNamespace(nsDesc);
106    admin2.createNamespace(nsDesc);
107  }
108
109  @AfterAll
110  public static void tearDownAfterClass() throws Exception {
111    utility2.shutdownMiniCluster();
112    utility1.shutdownMiniCluster();
113  }
114
115  @BeforeEach
116  public void setup() throws Exception {
117    // Roll log
118    for (JVMClusterUtil.RegionServerThread r : utility1.getHBaseCluster()
119      .getRegionServerThreads()) {
120      utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
121    }
122    // add peer
123    ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
124      .setClusterKey(utility2.getRpcConnnectionURI()).setReplicateAllUserTables(true).build();
125    admin1.addReplicationPeer(PEER_ID, rpc);
126    // create table
127    createTable(NORMAL_TABLE);
128  }
129
130  @AfterEach
131  public void tearDown() throws Exception {
132    // Remove peer
133    admin1.removeReplicationPeer(PEER_ID);
134    // Drop table
135    admin1.disableTable(NORMAL_TABLE);
136    admin1.deleteTable(NORMAL_TABLE);
137    admin2.disableTable(NORMAL_TABLE);
138    admin2.deleteTable(NORMAL_TABLE);
139  }
140
141  private void createTable(TableName tableName) throws Exception {
142    TableDescriptor desc =
143      TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
144        .newBuilder(FAMILY).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build();
145    admin1.createTable(desc);
146    admin2.createTable(desc);
147    utility1.waitUntilAllRegionsAssigned(tableName);
148    utility2.waitUntilAllRegionsAssigned(tableName);
149  }
150
151  @Test
152  public void testEditsDroppedWithDroppedTable() throws Exception {
153    testWithDroppedTable(DROPPED_TABLE);
154  }
155
156  @Test
157  public void testEditsDroppedWithDroppedTableNS() throws Exception {
158    testWithDroppedTable(DROPPED_NS_TABLE);
159  }
160
161  private void testWithDroppedTable(TableName droppedTableName) throws Exception {
162    createTable(droppedTableName);
163    admin1.disableReplicationPeer(PEER_ID);
164
165    try (Table droppedTable = utility1.getConnection().getTable(droppedTableName)) {
166      Put put = new Put(ROW);
167      put.addColumn(FAMILY, QUALIFIER, VALUE);
168      droppedTable.put(put);
169    }
170
171    admin1.disableTable(droppedTableName);
172    admin1.deleteTable(droppedTableName);
173    admin2.disableTable(droppedTableName);
174    admin2.deleteTable(droppedTableName);
175
176    admin1.enableReplicationPeer(PEER_ID);
177
178    verifyReplicationProceeded();
179  }
180
181  @Test
182  public void testEditsBehindDroppedTableTiming() throws Exception {
183    createTable(DROPPED_TABLE);
184    admin1.disableReplicationPeer(PEER_ID);
185
186    try (Table droppedTable = utility1.getConnection().getTable(DROPPED_TABLE)) {
187      Put put = new Put(ROW);
188      put.addColumn(FAMILY, QUALIFIER, VALUE);
189      droppedTable.put(put);
190    }
191
192    // Only delete table from peer cluster
193    admin2.disableTable(DROPPED_TABLE);
194    admin2.deleteTable(DROPPED_TABLE);
195
196    admin1.enableReplicationPeer(PEER_ID);
197
198    // the source table still exists, replication should be stalled
199    verifyReplicationStuck();
200    admin1.disableTable(DROPPED_TABLE);
201    // still stuck, source table still exists
202    verifyReplicationStuck();
203    admin1.deleteTable(DROPPED_TABLE);
204    // now the source table is gone, replication should proceed, the
205    // offending edits be dropped
206    verifyReplicationProceeded();
207  }
208
209  private void verifyReplicationProceeded() throws Exception {
210    try (Table normalTable = utility1.getConnection().getTable(NORMAL_TABLE)) {
211      Put put = new Put(ROW);
212      put.addColumn(FAMILY, QUALIFIER, VALUE);
213      normalTable.put(put);
214    }
215    utility2.waitFor(NB_RETRIES * SLEEP_TIME, (Predicate<Exception>) () -> {
216      try (Table normalTable = utility2.getConnection().getTable(NORMAL_TABLE)) {
217        Result result = normalTable.get(new Get(ROW).addColumn(FAMILY, QUALIFIER));
218        return result != null && !result.isEmpty()
219          && Bytes.equals(VALUE, result.getValue(FAMILY, QUALIFIER));
220      }
221    });
222  }
223
224  private void verifyReplicationStuck() throws Exception {
225    try (Table normalTable = utility1.getConnection().getTable(NORMAL_TABLE)) {
226      Put put = new Put(ROW);
227      put.addColumn(FAMILY, QUALIFIER, VALUE);
228      normalTable.put(put);
229    }
230    try (Table normalTable = utility2.getConnection().getTable(NORMAL_TABLE)) {
231      for (int i = 0; i < NB_RETRIES; i++) {
232        Result result = normalTable.get(new Get(ROW).addColumn(FAMILY, QUALIFIER));
233        if (result != null && !result.isEmpty()) {
234          fail("Edit should have been stuck behind dropped tables, but value is "
235            + Bytes.toString(result.getValue(FAMILY, QUALIFIER)));
236        } else {
237          LOG.info("Row not replicated, let's wait a bit more...");
238          Thread.sleep(SLEEP_TIME);
239        }
240      }
241    }
242  }
243}