001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.fail;
021
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.hbase.HBaseClassTestRule;
024import org.apache.hadoop.hbase.HBaseConfiguration;
025import org.apache.hadoop.hbase.HBaseTestingUtility;
026import org.apache.hadoop.hbase.HConstants;
027import org.apache.hadoop.hbase.NamespaceDescriptor;
028import org.apache.hadoop.hbase.TableName;
029import org.apache.hadoop.hbase.Waiter.Predicate;
030import org.apache.hadoop.hbase.client.Admin;
031import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
032import org.apache.hadoop.hbase.client.Get;
033import org.apache.hadoop.hbase.client.Put;
034import org.apache.hadoop.hbase.client.Result;
035import org.apache.hadoop.hbase.client.Table;
036import org.apache.hadoop.hbase.client.TableDescriptor;
037import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
038import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
039import org.apache.hadoop.hbase.testclassification.LargeTests;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.apache.hadoop.hbase.util.JVMClusterUtil;
042import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
043import org.junit.After;
044import org.junit.AfterClass;
045import org.junit.Before;
046import org.junit.BeforeClass;
047import org.junit.ClassRule;
048import org.junit.Test;
049import org.junit.experimental.categories.Category;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053@Category({ LargeTests.class })
054public class TestReplicationEditsDroppedWithDroppedTable {
055
056  @ClassRule
057  public static final HBaseClassTestRule CLASS_RULE =
058      HBaseClassTestRule.forClass(TestReplicationEditsDroppedWithDroppedTable.class);
059
060  private static final Logger LOG =
061      LoggerFactory.getLogger(TestReplicationEditsDroppedWithDroppedTable.class);
062
063  private static Configuration conf1 = HBaseConfiguration.create();
064  private static Configuration conf2 = HBaseConfiguration.create();
065
066  protected static HBaseTestingUtility utility1;
067  protected static HBaseTestingUtility utility2;
068
069  private static Admin admin1;
070  private static Admin admin2;
071
072  private static final String namespace = "NS";
073  private static final TableName NORMAL_TABLE = TableName.valueOf("normal-table");
074  private static final TableName DROPPED_TABLE = TableName.valueOf("dropped-table");
075  private static final TableName DROPPED_NS_TABLE = TableName.valueOf("NS:dropped-table");
076  private static final byte[] ROW = Bytes.toBytes("row");
077  private static final byte[] FAMILY = Bytes.toBytes("f");
078  private static final byte[] QUALIFIER = Bytes.toBytes("q");
079  private static final byte[] VALUE = Bytes.toBytes("value");
080
081  private static final String PEER_ID = "1";
082  private static final long SLEEP_TIME = 1000;
083  private static final int NB_RETRIES = 10;
084
085  @BeforeClass
086  public static void setUpBeforeClass() throws Exception {
087    // Set true to filter replication edits for dropped table
088    conf1.setBoolean(HBaseInterClusterReplicationEndpoint.REPLICATION_DROP_ON_DELETED_TABLE_KEY,
089        true);
090    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
091    conf1.setInt("replication.source.nb.capacity", 1);
092    utility1 = new HBaseTestingUtility(conf1);
093    utility1.startMiniZKCluster();
094    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
095    conf1 = utility1.getConfiguration();
096
097    conf2 = HBaseConfiguration.create(conf1);
098    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
099    utility2 = new HBaseTestingUtility(conf2);
100    utility2.setZkCluster(miniZK);
101
102    utility1.startMiniCluster(1);
103    utility2.startMiniCluster(1);
104
105    admin1 = utility1.getAdmin();
106    admin2 = utility2.getAdmin();
107
108    NamespaceDescriptor nsDesc = NamespaceDescriptor.create(namespace).build();
109    admin1.createNamespace(nsDesc);
110    admin2.createNamespace(nsDesc);
111  }
112
113  @AfterClass
114  public static void tearDownAfterClass() throws Exception {
115    utility2.shutdownMiniCluster();
116    utility1.shutdownMiniCluster();
117  }
118
119  @Before
120  public void setup() throws Exception {
121    // Roll log
122    for (JVMClusterUtil.RegionServerThread r : utility1.getHBaseCluster()
123        .getRegionServerThreads()) {
124      utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
125    }
126    // add peer
127    ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
128        .setClusterKey(utility2.getClusterKey())
129        .setReplicateAllUserTables(true).build();
130    admin1.addReplicationPeer(PEER_ID, rpc);
131    // create table
132    createTable(NORMAL_TABLE);
133  }
134
135  @After
136  public void tearDown() throws Exception {
137    // Remove peer
138    admin1.removeReplicationPeer(PEER_ID);
139    // Drop table
140    admin1.disableTable(NORMAL_TABLE);
141    admin1.deleteTable(NORMAL_TABLE);
142    admin2.disableTable(NORMAL_TABLE);
143    admin2.deleteTable(NORMAL_TABLE);
144  }
145
146  private void createTable(TableName tableName) throws Exception {
147    TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(
148        ColumnFamilyDescriptorBuilder.newBuilder(FAMILY)
149            .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()
150    ).build();
151    admin1.createTable(desc);
152    admin2.createTable(desc);
153    utility1.waitUntilAllRegionsAssigned(tableName);
154    utility2.waitUntilAllRegionsAssigned(tableName);
155  }
156
157  @Test
158  public void testEditsDroppedWithDroppedTable() throws Exception {
159    testWithDroppedTable(DROPPED_TABLE);
160  }
161
162  @Test
163  public void testEditsDroppedWithDroppedTableNS() throws Exception {
164    testWithDroppedTable(DROPPED_NS_TABLE);
165  }
166
167  private void testWithDroppedTable(TableName droppedTableName) throws Exception {
168    createTable(droppedTableName);
169    admin1.disableReplicationPeer(PEER_ID);
170
171    try (Table droppedTable = utility1.getConnection().getTable(droppedTableName)) {
172      Put put = new Put(ROW);
173      put.addColumn(FAMILY, QUALIFIER, VALUE);
174      droppedTable.put(put);
175    }
176
177    admin1.disableTable(droppedTableName);
178    admin1.deleteTable(droppedTableName);
179    admin2.disableTable(droppedTableName);
180    admin2.deleteTable(droppedTableName);
181
182    admin1.enableReplicationPeer(PEER_ID);
183
184    verifyReplicationProceeded();
185  }
186
187  @Test
188  public void testEditsBehindDroppedTableTiming() throws Exception {
189    createTable(DROPPED_TABLE);
190    admin1.disableReplicationPeer(PEER_ID);
191
192    try (Table droppedTable = utility1.getConnection().getTable(DROPPED_TABLE)) {
193      Put put = new Put(ROW);
194      put.addColumn(FAMILY, QUALIFIER, VALUE);
195      droppedTable.put(put);
196    }
197
198    // Only delete table from peer cluster
199    admin2.disableTable(DROPPED_TABLE);
200    admin2.deleteTable(DROPPED_TABLE);
201
202    admin1.enableReplicationPeer(PEER_ID);
203
204    // the source table still exists, replication should be stalled
205    verifyReplicationStuck();
206    admin1.disableTable(DROPPED_TABLE);
207    // still stuck, source table still exists
208    verifyReplicationStuck();
209    admin1.deleteTable(DROPPED_TABLE);
210    // now the source table is gone, replication should proceed, the
211    // offending edits be dropped
212    verifyReplicationProceeded();
213  }
214
215  private void verifyReplicationProceeded() throws Exception {
216    try (Table normalTable = utility1.getConnection().getTable(NORMAL_TABLE)) {
217      Put put = new Put(ROW);
218      put.addColumn(FAMILY, QUALIFIER, VALUE);
219      normalTable.put(put);
220    }
221    utility2.waitFor(NB_RETRIES * SLEEP_TIME, (Predicate<Exception>) () -> {
222      try (Table normalTable = utility2.getConnection().getTable(NORMAL_TABLE)) {
223        Result result = normalTable.get(new Get(ROW).addColumn(FAMILY, QUALIFIER));
224        return result != null && !result.isEmpty()
225            && Bytes.equals(VALUE, result.getValue(FAMILY, QUALIFIER));
226      }
227    });
228  }
229
230  private void verifyReplicationStuck() throws Exception {
231    try (Table normalTable = utility1.getConnection().getTable(NORMAL_TABLE)) {
232      Put put = new Put(ROW);
233      put.addColumn(FAMILY, QUALIFIER, VALUE);
234      normalTable.put(put);
235    }
236    try (Table normalTable = utility2.getConnection().getTable(NORMAL_TABLE)) {
237      for (int i = 0; i < NB_RETRIES; i++) {
238        Result result = normalTable.get(new Get(ROW).addColumn(FAMILY, QUALIFIER));
239        if (result != null && !result.isEmpty()) {
240          fail("Edit should have been stuck behind dropped tables, but value is " + Bytes
241              .toString(result.getValue(FAMILY, QUALIFIER)));
242        } else {
243          LOG.info("Row not replicated, let's wait a bit more...");
244          Thread.sleep(SLEEP_TIME);
245        }
246      }
247    }
248  }
249}