/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint.REPLICATION_DROP_ON_DELETED_TABLE_KEY;
import static org.junit.Assert.fail;

import java.io.IOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

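/**
 * Tests how replication behaves when the WAL still contains edits for a table that has since been
 * dropped: by default the replication queue stalls behind those edits, while with
 * REPLICATION_DROP_ON_DELETED_TABLE_KEY enabled the offending edits are skipped and replication
 * proceeds.
 */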
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationDroppedTables extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicationDroppedTables.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationDroppedTables.class);
  private static final int ROWS_COUNT = 1000;

  @Before
  public void setUpBase() throws Exception {
    // Starting and stopping replication can make us miss new logs;
    // rolling the WALs like this makes sure the most recent one gets added to the queue.
    for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster().getRegionServerThreads()) {
      UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
    }
    // Initialize the peer after WAL rolling, so that we will abandon the stuck WALs.
    super.setUpBase();
    int rowCount = UTIL1.countRows(tableName);
    UTIL1.deleteTableData(tableName);
    // Truncating the table will send one Delete per row to the slave cluster in an async
    // fashion, which is why we cannot just call deleteTableData on utility2, since late writes
    // could still make it to the slave. Instead, we truncate the first table and wait for all
    // the Deletes to make it to the slave.
    Scan scan = new Scan();
    int lastCount = 0;
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for truncate");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(rowCount);
      scanner.close();
      if (res.length != 0) {
        if (res.length < lastCount) {
          i--; // Don't increment the retry count if we are making progress
        }
        lastCount = res.length;
        LOG.info("Still got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
    // Set the max request size to a tiny 10K so that the replication WAL entries are split into
    // multiple batches. The default max request size is 256M, so all replication entries would
    // land in a single batch; but when replicating on the sink side the entries are applied
    // grouped by table name, so the edits of the test table may be applied first and those of the
    // test_dropped table later, which would make us wrongly conclude that replication is not
    // stuck (HBASE-20475).
    CONF1.setInt(RpcServer.MAX_REQUEST_SIZE, 10 * 1024);
  }

  @Test
  public void testEditsStuckBehindDroppedTable() throws Exception {
    // Sanity check: make sure that, by default, edits for dropped tables stall the replication
    // queue, even when the table(s) in question have been deleted on both ends.
    testEditsBehindDroppedTable(false, "test_dropped");
  }

  @Test
  public void testEditsDroppedWithDroppedTable() throws Exception {
    // Make sure that, when configured to do so, edits for dropped tables are themselves dropped
    // once the table(s) in question have been deleted on both ends.
    testEditsBehindDroppedTable(true, "test_dropped");
  }

  @Test
  public void testEditsDroppedWithDroppedTableNS() throws Exception {
    // Also try with a namespaced table.
    UTIL1.getAdmin().createNamespace(NamespaceDescriptor.create("NS").build());
    UTIL2.getAdmin().createNamespace(NamespaceDescriptor.create("NS").build());
    try {
      testEditsBehindDroppedTable(true, "NS:test_dropped");
    } finally {
      UTIL1.getAdmin().deleteNamespace("NS");
      UTIL2.getAdmin().deleteNamespace("NS");
    }
  }

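  /**
   * Builds a zero-padded row key (e.g. "NormalPut000") so that the generated keys sort in the
   * same order as they were written.
   */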
  private byte[] generateRowKey(int id) {
    return Bytes.toBytes(String.format("NormalPut%03d", id));
  }

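  /**
   * Creates the given table on both clusters, disables the peer, writes one edit to that table
   * and ROWS_COUNT edits to the regular test table, drops the table on both ends, and then
   * re-enables the peer. Depending on allowProceeding, replication is expected either to drop the
   * orphaned edits and proceed, or to stay stuck behind them.
   */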
  private void testEditsBehindDroppedTable(boolean allowProceeding, String tName) throws Exception {
    CONF1.setBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, allowProceeding);
    CONF1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);

    // Make sure we have a single region server only, so that all
    // edits for all tables go there.
    restartSourceCluster(1);

    TableName tablename = TableName.valueOf(tName);
    byte[] familyName = Bytes.toBytes("fam");
    byte[] row = Bytes.toBytes("row");

    TableDescriptor table =
      TableDescriptorBuilder.newBuilder(tablename).setColumnFamily(ColumnFamilyDescriptorBuilder
        .newBuilder(familyName).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build();

    Connection connection1 = ConnectionFactory.createConnection(UTIL1.getConfiguration());
    Connection connection2 = ConnectionFactory.createConnection(UTIL2.getConfiguration());
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.createTable(table);
    }
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.createTable(table);
    }
    UTIL1.waitUntilAllRegionsAssigned(tablename);
    UTIL2.waitUntilAllRegionsAssigned(tablename);

    // Now suspend replication.
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.disableReplicationPeer(PEER_ID2);
    }

    // Put one edit on the table to be dropped (lead with 0 so the edit gets sorted before the
    // other table's edits in the replication batch), then write a bunch of edits to the other
    // table, making sure we fill a batch.
    try (Table droppedTable = connection1.getTable(tablename)) {
      byte[] rowKey = Bytes.toBytes(0 + " put on table to be dropped");
      Put put = new Put(rowKey);
      put.addColumn(familyName, row, row);
      droppedTable.put(put);
    }

    try (Table table1 = connection1.getTable(tableName)) {
      for (int i = 0; i < ROWS_COUNT; i++) {
        Put put = new Put(generateRowKey(i)).addColumn(famName, row, row);
        table1.put(put);
      }
    }

    try (Admin admin1 = connection1.getAdmin()) {
      admin1.disableTable(tablename);
      admin1.deleteTable(tablename);
    }
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.disableTable(tablename);
      admin2.deleteTable(tablename);
    }

    try (Admin admin1 = connection1.getAdmin()) {
      admin1.enableReplicationPeer(PEER_ID2);
    }

    if (allowProceeding) {
      // In this case we'd expect the edits to make it over.
      verifyReplicationProceeded();
    } else {
      verifyReplicationStuck();
    }
    // Just to be safe.
    CONF1.setBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
  }
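  /**
   * Verifies the timing of the drop: with REPLICATION_DROP_ON_DELETED_TABLE_KEY enabled,
   * replication stays stuck while the table still exists on the source cluster (even after it has
   * been dropped on the peer), and only proceeds once the source-side table has been deleted too.
   */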
  @Test
  public void testEditsBehindDroppedTableTiming() throws Exception {
    CONF1.setBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, true);
    CONF1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);

    // Make sure we have a single region server only, so that all
    // edits for all tables go there.
    restartSourceCluster(1);

    TableName tablename = TableName.valueOf("testdroppedtimed");
    byte[] familyName = Bytes.toBytes("fam");
    byte[] row = Bytes.toBytes("row");

    TableDescriptor table =
      TableDescriptorBuilder.newBuilder(tablename).setColumnFamily(ColumnFamilyDescriptorBuilder
        .newBuilder(familyName).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build();

    Connection connection1 = ConnectionFactory.createConnection(CONF1);
    Connection connection2 = ConnectionFactory.createConnection(CONF2);
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.createTable(table);
    }
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.createTable(table);
    }
    UTIL1.waitUntilAllRegionsAssigned(tablename);
    UTIL2.waitUntilAllRegionsAssigned(tablename);

    // Now suspend replication.
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.disableReplicationPeer(PEER_ID2);
    }

    // Put one edit on the table to be dropped (lead with 0 so the edit gets sorted before the
    // other table's edits in the replication batch), then write a bunch of edits to the other
    // table, making sure we fill a batch.
    try (Table droppedTable = connection1.getTable(tablename)) {
      byte[] rowKey = Bytes.toBytes(0 + " put on table to be dropped");
      Put put = new Put(rowKey);
      put.addColumn(familyName, row, row);
      droppedTable.put(put);
    }

    try (Table table1 = connection1.getTable(tableName)) {
      for (int i = 0; i < ROWS_COUNT; i++) {
        Put put = new Put(generateRowKey(i)).addColumn(famName, row, row);
        table1.put(put);
      }
    }

    try (Admin admin2 = connection2.getAdmin()) {
      admin2.disableTable(tablename);
      admin2.deleteTable(tablename);
    }

    // The edits should still be stuck.
    try (Admin admin1 = connection1.getAdmin()) {
      // Enable the replication peer.
      admin1.enableReplicationPeer(PEER_ID2);
      // The source table still exists, so replication should be stalled.
      verifyReplicationStuck();

      admin1.disableTable(tablename);
      // Still stuck, the source table still exists.
      verifyReplicationStuck();

      admin1.deleteTable(tablename);
      // Now the source table is gone, so replication should proceed and the
      // offending edits should be dropped.
      verifyReplicationProceeded();
    }
    // Just to be safe.
    CONF1.setBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
  }

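  /**
   * Returns true once the peer cluster has received all ROWS_COUNT rows of the regular test
   * table; asserts that the returned row keys match the generated ones.
   */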
  private boolean peerHasAllNormalRows() throws IOException {
    try (ResultScanner scanner = htable2.getScanner(new Scan())) {
      Result[] results = scanner.next(ROWS_COUNT);
      if (results.length != ROWS_COUNT) {
        return false;
      }
      for (int i = 0; i < results.length; i++) {
        Assert.assertArrayEquals(generateRowKey(i), results[i].getRow());
      }
      return true;
    }
  }

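  /**
   * Waits until all normal rows show up on the peer cluster, failing the test if they do not
   * arrive within NB_RETRIES attempts.
   */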
  private void verifyReplicationProceeded() throws Exception {
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      if (!peerHasAllNormalRows()) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

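  /**
   * Asserts that the normal rows never show up on the peer cluster during NB_RETRIES attempts,
   * i.e. that replication is stalled behind the edits of the dropped table.
   */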
  private void verifyReplicationStuck() throws Exception {
    for (int i = 0; i < NB_RETRIES; i++) {
      if (peerHasAllNormalRows()) {
        fail("Edit should have been stuck behind dropped tables");
      } else {
        LOG.info("Row not replicated, let's wait a bit more...");
        Thread.sleep(SLEEP_TIME);
      }
    }
  }
}