/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.fail;

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

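/**
 * Tests the behaviour of replication when a replicated table has been dropped: by default, edits
 * for a dropped table stall the replication queue, while with
 * {@link HConstants#REPLICATION_DROP_ON_DELETED_TABLE_KEY} enabled they are themselves dropped so
 * that replication can proceed (HBASE-20475).
 */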
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationDroppedTables extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestReplicationDroppedTables.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationDroppedTables.class);
  private static final int ROWS_COUNT = 1000;

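  /**
   * Rolls the WALs on every region server before the replication peer is set up, so that stale
   * WALs are not picked up, then truncates the test table on the source cluster and waits for the
   * resulting Deletes to reach the peer. Finally lowers the RPC max request size so that the
   * replication WAL entries are shipped in multiple batches.
   */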
  @Before
  public void setUpBase() throws Exception {
    // Starting and stopping replication can make us miss new logs;
    // rolling like this makes sure the most recent one gets added to the queue.
    for (JVMClusterUtil.RegionServerThread r : utility1.getHBaseCluster()
        .getRegionServerThreads()) {
      utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
    }
    // Initialize the peer after WAL rolling, so that we will abandon the stuck WALs.
    super.setUpBase();
    int rowCount = utility1.countRows(tableName);
    utility1.deleteTableData(tableName);
    // Truncating the table sends one Delete per row to the slave cluster in an async fashion,
    // which is why we cannot just call deleteTableData on utility2: late writes from the master
    // could still make it to the slave. Instead, we truncate the table on the master cluster and
    // wait for all the Deletes to make it to the slave.
    Scan scan = new Scan();
    int lastCount = 0;
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for truncate");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(rowCount);
      scanner.close();
      if (res.length != 0) {
        if (res.length < lastCount) {
          i--; // Don't increment timeout if we make progress
        }
        lastCount = res.length;
        LOG.info("Still got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
    // Set the max request size to a tiny 10K so that the replication WAL entries are split into
    // multiple batches. The default max request size is 256M, so all replication entries would fit
    // into a single batch. At the sink side the entries are grouped by table before being applied,
    // so the edits for the test table may be applied before those for the test_dropped table,
    // which would make us wrongly believe that replication is not stuck (HBASE-20475).
    conf1.setInt(RpcServer.MAX_REQUEST_SIZE, 10 * 1024);
  }

  @Test
  public void testEditsStuckBehindDroppedTable() throws Exception {
    // Sanity check: make sure that, by default, edits for dropped tables stall the replication
    // queue, even when the table(s) in question have been deleted on both ends.
    testEditsBehindDroppedTable(false, "test_dropped");
  }

  @Test
  public void testEditsDroppedWithDroppedTable() throws Exception {
    // Make sure that, when REPLICATION_DROP_ON_DELETED_TABLE_KEY is enabled, edits for dropped
    // tables are themselves dropped when the table(s) in question have been deleted on both ends.
    testEditsBehindDroppedTable(true, "test_dropped");
  }

  @Test
  public void testEditsDroppedWithDroppedTableNS() throws Exception {
    // Also exercise the same scenario with a table in a non-default namespace.
    Connection connection1 = ConnectionFactory.createConnection(conf1);
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.createNamespace(NamespaceDescriptor.create("NS").build());
    }
    Connection connection2 = ConnectionFactory.createConnection(conf2);
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.createNamespace(NamespaceDescriptor.create("NS").build());
    }
    testEditsBehindDroppedTable(true, "NS:test_dropped");
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.deleteNamespace("NS");
    }
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.deleteNamespace("NS");
    }
    connection1.close();
    connection2.close();
  }

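  /**
   * Generates a deterministic row key of the form NormalPut000 ... NormalPut999 so that the rows
   * replicated to the peer can be verified in order.
   */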
  private byte[] generateRowKey(int id) {
    return Bytes.toBytes(String.format("NormalPut%03d", id));
  }

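  /**
   * Shared scenario: create a replicated table on both clusters, disable the peer, write one edit
   * to the table that will be dropped plus ROWS_COUNT edits to the regular test table, drop the
   * table on both clusters, re-enable the peer and then check whether replication proceeds (the
   * offending edits are dropped) or stays stuck, depending on {@code allowProceeding}.
   */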
  private void testEditsBehindDroppedTable(boolean allowProceeding, String tName)
      throws Exception {
    conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, allowProceeding);
    conf1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);

    // make sure we have a single region server only, so that all
    // edits for all tables go there
    utility1.shutdownMiniHBaseCluster();
    utility1.startMiniHBaseCluster();

    TableName tablename = TableName.valueOf(tName);
    byte[] familyName = Bytes.toBytes("fam");
    byte[] row = Bytes.toBytes("row");

    TableDescriptor table =
        TableDescriptorBuilder
            .newBuilder(tablename).setColumnFamily(ColumnFamilyDescriptorBuilder
                .newBuilder(familyName).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
            .build();

    Connection connection1 = ConnectionFactory.createConnection(conf1);
    Connection connection2 = ConnectionFactory.createConnection(conf2);
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.createTable(table);
    }
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.createTable(table);
    }
    utility1.waitUntilAllRegionsAssigned(tablename);
    utility2.waitUntilAllRegionsAssigned(tablename);

    // now suspend replication
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.disableReplicationPeer(PEER_ID2);
    }

    // Put some data: the row key leads with 0 so this edit gets sorted before the other table's
    // edits in the replication batch. Then write a bunch of edits to the other table, making sure
    // we fill at least one batch.
    try (Table droppedTable = connection1.getTable(tablename)) {
      byte[] rowKey = Bytes.toBytes(0 + " put on table to be dropped");
      Put put = new Put(rowKey);
      put.addColumn(familyName, row, row);
      droppedTable.put(put);
    }

    try (Table table1 = connection1.getTable(tableName)) {
      for (int i = 0; i < ROWS_COUNT; i++) {
        Put put = new Put(generateRowKey(i)).addColumn(famName, row, row);
        table1.put(put);
      }
    }

    try (Admin admin1 = connection1.getAdmin()) {
      admin1.disableTable(tablename);
      admin1.deleteTable(tablename);
    }
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.disableTable(tablename);
      admin2.deleteTable(tablename);
    }

    try (Admin admin1 = connection1.getAdmin()) {
      admin1.enableReplicationPeer(PEER_ID2);
    }
    connection1.close();
    connection2.close();

    if (allowProceeding) {
      // in this case we'd expect the rows to make it over
      verifyReplicationProceeded();
    } else {
      verifyReplicationStuck();
    }
    // just to be safe
    conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
  }

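  /**
   * Same scenario as above, but the table is dropped on the sink cluster first: replication must
   * stay stuck while the source table still exists (even when disabled) and only proceed once the
   * source table has been deleted as well.
   */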
  @Test
  public void testEditsBehindDroppedTableTiming() throws Exception {
    conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, true);
    conf1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);

    // make sure we have a single region server only, so that all
    // edits for all tables go there
    utility1.shutdownMiniHBaseCluster();
    utility1.startMiniHBaseCluster();

    TableName tablename = TableName.valueOf("testdroppedtimed");
    byte[] familyName = Bytes.toBytes("fam");
    byte[] row = Bytes.toBytes("row");

    TableDescriptor table =
        TableDescriptorBuilder
            .newBuilder(tablename).setColumnFamily(ColumnFamilyDescriptorBuilder
                .newBuilder(familyName).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
            .build();

    Connection connection1 = ConnectionFactory.createConnection(conf1);
    Connection connection2 = ConnectionFactory.createConnection(conf2);
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.createTable(table);
    }
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.createTable(table);
    }
    utility1.waitUntilAllRegionsAssigned(tablename);
    utility2.waitUntilAllRegionsAssigned(tablename);

    // now suspend replication
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.disableReplicationPeer(PEER_ID2);
    }

    // Put some data: the row key leads with 0 so this edit gets sorted before the other table's
    // edits in the replication batch. Then write a bunch of edits to the other table, making sure
    // we fill at least one batch.
    try (Table droppedTable = connection1.getTable(tablename)) {
      byte[] rowKey = Bytes.toBytes(0 + " put on table to be dropped");
      Put put = new Put(rowKey);
      put.addColumn(familyName, row, row);
      droppedTable.put(put);
    }

    try (Table table1 = connection1.getTable(tableName)) {
      for (int i = 0; i < ROWS_COUNT; i++) {
        Put put = new Put(generateRowKey(i)).addColumn(famName, row, row);
        table1.put(put);
      }
    }

    // drop the table on the sink cluster only, for now
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.disableTable(tablename);
      admin2.deleteTable(tablename);
    }

    // the edit should still be stuck
    try (Admin admin1 = connection1.getAdmin()) {
      // enable the replication peer.
      admin1.enableReplicationPeer(PEER_ID2);
      // the source table still exists, replication should be stalled
      verifyReplicationStuck();

      admin1.disableTable(tablename);
      // still stuck, source table still exists
      verifyReplicationStuck();

      admin1.deleteTable(tablename);
      // now that the source table is gone as well, replication should proceed and the
      // offending edits be dropped
      verifyReplicationProceeded();
    }
    connection1.close();
    connection2.close();
    // just to be safe
    conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
  }

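  /**
   * Returns true once the peer cluster has all ROWS_COUNT normal rows, asserting that their row
   * keys match the ones written by the source cluster.
   */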
  private boolean peerHasAllNormalRows() throws IOException {
    try (ResultScanner scanner = htable2.getScanner(new Scan())) {
      Result[] results = scanner.next(ROWS_COUNT);
      if (results.length != ROWS_COUNT) {
        return false;
      }
      for (int i = 0; i < results.length; i++) {
        Assert.assertArrayEquals(generateRowKey(i), results[i].getRow());
      }
      return true;
    }
  }

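  /**
   * Waits for all normal rows to show up on the peer, retrying up to NB_RETRIES times with
   * SLEEP_TIME between attempts, and fails the test if they never arrive.
   */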
  private void verifyReplicationProceeded() throws Exception {
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      if (!peerHasAllNormalRows()) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

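  /**
   * Checks NB_RETRIES times that the normal rows never fully arrive on the peer, i.e. that
   * replication stays stuck behind the edit for the dropped table.
   */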
  private void verifyReplicationStuck() throws Exception {
    for (int i = 0; i < NB_RETRIES; i++) {
      if (peerHasAllNormalRows()) {
        fail("Edit should have been stuck behind dropped tables");
      } else {
        LOG.info("Row not replicated, let's wait a bit more...");
        Thread.sleep(SLEEP_TIME);
      }
    }
  }
}