/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint.REPLICATION_DROP_ON_DELETED_TABLE_KEY;
import static org.junit.Assert.fail;

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

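/**
 * Tests replication behaviour when WAL edits are queued for a table that has since been dropped:
 * by default such edits stall the replication queue, while with
 * {@code REPLICATION_DROP_ON_DELETED_TABLE_KEY} enabled they are skipped so that replication can
 * proceed (see HBASE-20475).
 */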
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationDroppedTables extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestReplicationDroppedTables.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationDroppedTables.class);
  private static final int ROWS_COUNT = 1000;

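  /**
   * Rolls the WALs on the source cluster, sets up the replication peer, truncates the shared test
   * table, waits for the resulting Deletes to reach the sink, and shrinks the RPC max request size
   * so that replication entries are shipped in multiple batches.
   */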
  @Before
  public void setUpBase() throws Exception {
    // Starting and stopping replication can make us miss new logs;
    // rolling like this makes sure the most recent one gets added to the queue.
    for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster()
        .getRegionServerThreads()) {
      UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
    }
    // Initialize the peer after WAL rolling, so that we will abandon the stuck WALs.
    super.setUpBase();
    int rowCount = UTIL1.countRows(tableName);
    UTIL1.deleteTableData(tableName);
    // Truncating the table will send one Delete per row to the slave cluster in an async fashion,
    // which is why we cannot just call deleteTableData on UTIL2: late writes could still make it
    // to the slave. Instead, we truncate the first table and wait for all the Deletes to make it
    // to the slave.
    Scan scan = new Scan();
    int lastCount = 0;
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for truncate");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(rowCount);
      scanner.close();
      if (res.length != 0) {
        if (res.length < lastCount) {
          i--; // Don't increment timeout if we make progress
        }
        lastCount = res.length;
        LOG.info("Still got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
    // Set the max request size to a tiny 10K to split the replication WAL entries into multiple
    // batches. The default max request size is 256M, so all replication entries would end up in a
    // single batch; at the sink side that batch is applied grouped by table name, so the edits of
    // the test table may be applied before those of test_dropped, and we would wrongly conclude
    // that replication did not get stuck (HBASE-20475).
    CONF1.setInt(RpcServer.MAX_REQUEST_SIZE, 10 * 1024);
  }

  @Test
  public void testEditsStuckBehindDroppedTable() throws Exception {
    // Sanity check: make sure that, by default, edits for dropped tables stall the replication
    // queue, even when the table(s) in question have been deleted on both ends.
    testEditsBehindDroppedTable(false, "test_dropped");
  }

  @Test
  public void testEditsDroppedWithDroppedTable() throws Exception {
    // Make sure that, with REPLICATION_DROP_ON_DELETED_TABLE_KEY enabled, edits for dropped
    // tables are themselves dropped when the table(s) in question have been deleted on both ends.
    testEditsBehindDroppedTable(true, "test_dropped");
  }

  @Test
  public void testEditsDroppedWithDroppedTableNS() throws Exception {
    // also try with a namespace
    UTIL1.getAdmin().createNamespace(NamespaceDescriptor.create("NS").build());
    UTIL2.getAdmin().createNamespace(NamespaceDescriptor.create("NS").build());
    try {
      testEditsBehindDroppedTable(true, "NS:test_dropped");
    } finally {
      UTIL1.getAdmin().deleteNamespace("NS");
      UTIL2.getAdmin().deleteNamespace("NS");
    }
  }

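  /**
   * Builds a deterministic row key for the i-th "normal" put, so replicated rows can be verified
   * in order.
   */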
  private byte[] generateRowKey(int id) {
    return Bytes.toBytes(String.format("NormalPut%03d", id));
  }

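  /**
   * Creates a replicated table on both clusters, suspends the peer, queues one edit for the table
   * to be dropped plus a batch of edits for the normal test table, drops the table on both ends,
   * re-enables the peer, and then verifies whether replication proceeds or stalls depending on
   * {@code allowProceeding}.
   */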
  private void testEditsBehindDroppedTable(boolean allowProceeding, String tName) throws Exception {
    CONF1.setBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, allowProceeding);
    CONF1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);

    // make sure we have a single region server only, so that all
    // edits for all tables go there
    restartSourceCluster(1);

    TableName tablename = TableName.valueOf(tName);
    byte[] familyName = Bytes.toBytes("fam");
    byte[] row = Bytes.toBytes("row");

    TableDescriptor table =
        TableDescriptorBuilder
            .newBuilder(tablename).setColumnFamily(ColumnFamilyDescriptorBuilder
                .newBuilder(familyName).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
            .build();

    Connection connection1 = ConnectionFactory.createConnection(UTIL1.getConfiguration());
    Connection connection2 = ConnectionFactory.createConnection(UTIL2.getConfiguration());
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.createTable(table);
    }
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.createTable(table);
    }
    UTIL1.waitUntilAllRegionsAssigned(tablename);
    UTIL2.waitUntilAllRegionsAssigned(tablename);

    // now suspend replication
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.disableReplicationPeer(PEER_ID2);
    }

    // Put some data into the table to be dropped (the row key leads with 0 so the edit sorts
    // before the other table's edits in the replication batch), then write a bunch of edits to
    // the normal table, making sure we fill at least one batch.
    try (Table droppedTable = connection1.getTable(tablename)) {
      byte[] rowKey = Bytes.toBytes(0 + " put on table to be dropped");
      Put put = new Put(rowKey);
      put.addColumn(familyName, row, row);
      droppedTable.put(put);
    }

    try (Table table1 = connection1.getTable(tableName)) {
      for (int i = 0; i < ROWS_COUNT; i++) {
        Put put = new Put(generateRowKey(i)).addColumn(famName, row, row);
        table1.put(put);
      }
    }

    try (Admin admin1 = connection1.getAdmin()) {
      admin1.disableTable(tablename);
      admin1.deleteTable(tablename);
    }
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.disableTable(tablename);
      admin2.deleteTable(tablename);
    }

    try (Admin admin1 = connection1.getAdmin()) {
      admin1.enableReplicationPeer(PEER_ID2);
    }

    if (allowProceeding) {
      // In this case we'd expect the edits to make it over.
      verifyReplicationProceeded();
    } else {
      verifyReplicationStuck();
    }
    // Just to be safe, reset the config.
    CONF1.setBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
  }

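  /**
   * Same scenario, but exercises the timing: the table is first dropped only on the sink, so
   * replication must stay stuck while the source table still exists, and may only proceed (with
   * the offending edits dropped) once the table is gone on the source side as well.
   */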
  @Test
  public void testEditsBehindDroppedTableTiming() throws Exception {
    CONF1.setBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, true);
    CONF1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);

    // make sure we have a single region server only, so that all
    // edits for all tables go there
    restartSourceCluster(1);

    TableName tablename = TableName.valueOf("testdroppedtimed");
    byte[] familyName = Bytes.toBytes("fam");
    byte[] row = Bytes.toBytes("row");

    TableDescriptor table =
        TableDescriptorBuilder
            .newBuilder(tablename).setColumnFamily(ColumnFamilyDescriptorBuilder
                .newBuilder(familyName).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
            .build();

    Connection connection1 = ConnectionFactory.createConnection(CONF1);
    Connection connection2 = ConnectionFactory.createConnection(CONF2);
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.createTable(table);
    }
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.createTable(table);
    }
    UTIL1.waitUntilAllRegionsAssigned(tablename);
    UTIL2.waitUntilAllRegionsAssigned(tablename);

    // now suspend replication
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.disableReplicationPeer(PEER_ID2);
    }

    // Put some data into the table to be dropped (the row key leads with 0 so the edit sorts
    // before the other table's edits in the replication batch), then write a bunch of edits to
    // the normal table, making sure we fill at least one batch.
    try (Table droppedTable = connection1.getTable(tablename)) {
      byte[] rowKey = Bytes.toBytes(0 + " put on table to be dropped");
      Put put = new Put(rowKey);
      put.addColumn(familyName, row, row);
      droppedTable.put(put);
    }

    try (Table table1 = connection1.getTable(tableName)) {
      for (int i = 0; i < ROWS_COUNT; i++) {
        Put put = new Put(generateRowKey(i)).addColumn(famName, row, row);
        table1.put(put);
      }
    }

    // Drop the table on the sink side only for now.
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.disableTable(tablename);
      admin2.deleteTable(tablename);
    }

    // The edits should still be stuck behind the dropped table.
    try (Admin admin1 = connection1.getAdmin()) {
      // enable the replication peer.
      admin1.enableReplicationPeer(PEER_ID2);
      // The source table still exists, so replication should be stalled.
      verifyReplicationStuck();

      admin1.disableTable(tablename);
      // Still stuck; the source table still exists.
      verifyReplicationStuck();

      admin1.deleteTable(tablename);
      // Now the source table is gone as well, so replication should proceed and the offending
      // edits should be dropped.
      verifyReplicationProceeded();
    }
    // Just to be safe, reset the config.
    CONF1.setBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
  }

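  /**
   * Returns true once the sink cluster has all ROWS_COUNT normal rows, asserting that their row
   * keys match the expected generated keys.
   */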
  private boolean peerHasAllNormalRows() throws IOException {
    try (ResultScanner scanner = htable2.getScanner(new Scan())) {
      Result[] results = scanner.next(ROWS_COUNT);
      if (results.length != ROWS_COUNT) {
        return false;
      }
      for (int i = 0; i < results.length; i++) {
        Assert.assertArrayEquals(generateRowKey(i), results[i].getRow());
      }
      return true;
    }
  }

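  /** Waits until the normal rows show up on the peer, failing the test if they never arrive. */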
  private void verifyReplicationProceeded() throws Exception {
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      if (!peerHasAllNormalRows()) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

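  /**
   * Verifies that the normal rows never show up on the peer within NB_RETRIES, i.e. replication
   * is stuck behind the dropped table's edit.
   */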
  private void verifyReplicationStuck() throws Exception {
    for (int i = 0; i < NB_RETRIES; i++) {
      if (peerHasAllNormalRows()) {
        fail("Edit should have been stuck behind dropped tables");
      } else {
        LOG.info("Row not replicated, let's wait a bit more...");
        Thread.sleep(SLEEP_TIME);
      }
    }
  }
}