/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.fail;

import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

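/**
 * Tests replication behaviour around dropped tables: depending on
 * {@link HConstants#REPLICATION_DROP_ON_DELETED_TABLE_KEY}, edits belonging to a table that has
 * been deleted are either dropped so that replication can proceed, or they stall the replication
 * queue.
 */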
@Category(LargeTests.class)
public class TestReplicationDroppedTables extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestReplicationDroppedTables.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationDroppedTables.class);

  /**
   * Rolls the WALs on the source cluster and truncates the replicated test table, then waits
   * until the resulting Deletes have been replicated, so every test starts from a clean state.
   */
  @Before
  public void setUp() throws Exception {
    // Starting and stopping replication can make us miss new logs,
    // rolling like this makes sure the most recent one gets added to the queue
    for (JVMClusterUtil.RegionServerThread r :
        utility1.getHBaseCluster().getRegionServerThreads()) {
      utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
    }
    int rowCount = utility1.countRows(tableName);
    utility1.deleteTableData(tableName);
    // Truncating the table sends one Delete per row to the slave cluster asynchronously. We
    // cannot simply call deleteTableData on utility2 as well, because late replicated writes
    // could still arrive on the slave afterwards. Instead, we truncate the table on the master
    // only and wait for all the Deletes to be replicated to the slave.
    Scan scan = new Scan();
    int lastCount = 0;
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for truncate");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(rowCount);
      scanner.close();
      if (res.length != 0) {
        if (res.length < lastCount) {
          i--; // Don't increment timeout if we make progress
        }
        lastCount = res.length;
        LOG.info("Still got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

  @Test
  public void testEditsStuckBehindDroppedTable() throws Exception {
    // Sanity check
    // Make sure by default edits for dropped tables stall the replication queue, even when the
    // table(s) in question have been deleted on both ends.
    testEditsBehindDroppedTable(false, "test_dropped");
  }

  @Test
  public void testEditsDroppedWithDroppedTable() throws Exception {
    // Make sure that, when REPLICATION_DROP_ON_DELETED_TABLE_KEY is enabled, edits for dropped
    // tables are themselves dropped when the table(s) in question have been deleted on both ends.
    testEditsBehindDroppedTable(true, "test_dropped");
  }

  @Test
  public void testEditsDroppedWithDroppedTableNS() throws Exception {
    // also try with a namespace
    // Use try-with-resources so the connections are closed and not leaked by the test.
    try (Connection connection1 = ConnectionFactory.createConnection(conf1);
        Admin admin1 = connection1.getAdmin()) {
      admin1.createNamespace(NamespaceDescriptor.create("NS").build());
    }
    try (Connection connection2 = ConnectionFactory.createConnection(conf2);
        Admin admin2 = connection2.getAdmin()) {
      admin2.createNamespace(NamespaceDescriptor.create("NS").build());
    }
    testEditsBehindDroppedTable(true, "NS:test_dropped");
  }

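  /**
   * Shared scenario: create a replicated table on both clusters, write an edit to it while the
   * peer is disabled, drop the table on both ends, then re-enable the peer. Depending on
   * {@code allowProceeding} (HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY) the orphaned edit
   * is expected to either be dropped, letting replication proceed, or to stall the queue.
   */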
  private void testEditsBehindDroppedTable(boolean allowProceeding, String tName)
      throws Exception {
    conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, allowProceeding);
    conf1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);

    // make sure we have a single region server only, so that all
    // edits for all tables go there
    utility1.shutdownMiniHBaseCluster();
    utility1.startMiniHBaseCluster(1, 1);

    TableName tablename = TableName.valueOf(tName);
    byte[] familyname = Bytes.toBytes("fam");
    byte[] row = Bytes.toBytes("row");

    HTableDescriptor table = new HTableDescriptor(tablename);
    HColumnDescriptor fam = new HColumnDescriptor(familyname);
    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
    table.addFamily(fam);

    Connection connection1 = ConnectionFactory.createConnection(conf1);
    Connection connection2 = ConnectionFactory.createConnection(conf2);
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.createTable(table);
    }
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.createTable(table);
    }
    utility1.waitUntilAllRegionsAssigned(tablename);
    utility2.waitUntilAllRegionsAssigned(tablename);

    Table lHtable1 = utility1.getConnection().getTable(tablename);

    // now suspend replication
    admin.disablePeer("2");

    // Put one edit on the table that is about to be dropped. The row key leads with 0 so the
    // edit sorts before the other table's edits in the replication batch.
    byte[] rowkey = Bytes.toBytes(0 + " put on table to be dropped");
    Put put = new Put(rowkey);
    put.addColumn(familyname, row, row);
    lHtable1.put(put);

    rowkey = Bytes.toBytes("normal put");
    put = new Put(rowkey);
    put.addColumn(famName, row, row);
    htable1.put(put);

    try (Admin admin1 = connection1.getAdmin()) {
      admin1.disableTable(tablename);
      admin1.deleteTable(tablename);
    }
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.disableTable(tablename);
      admin2.deleteTable(tablename);
    }

    admin.enablePeer("2");
    if (allowProceeding) {
      // in this case we expect the edit to make it over to the slave
      verifyReplicationProceeded(rowkey);
    } else {
      verifyReplicationStuck(rowkey);
    }
    // just to be safe
    conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
    // clean up the resources opened by this test
    lHtable1.close();
    connection1.close();
    connection2.close();
  }

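  /**
   * Same scenario as {@link #testEditsDroppedWithDroppedTable()}, but the table is dropped on the
   * source cluster only after the peer has been re-enabled, to verify that replication stays
   * stuck while the source-side table still exists and only proceeds once it is gone as well.
   */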
  @Test
  public void testEditsBehindDroppedTableTiming() throws Exception {
    conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, true);
    conf1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);

    // make sure we have a single region server only, so that all
    // edits for all tables go there
    utility1.shutdownMiniHBaseCluster();
    utility1.startMiniHBaseCluster(1, 1);

    TableName tablename = TableName.valueOf("testdroppedtimed");
    byte[] familyname = Bytes.toBytes("fam");
    byte[] row = Bytes.toBytes("row");

    HTableDescriptor table = new HTableDescriptor(tablename);
    HColumnDescriptor fam = new HColumnDescriptor(familyname);
    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
    table.addFamily(fam);

    Connection connection1 = ConnectionFactory.createConnection(conf1);
    Connection connection2 = ConnectionFactory.createConnection(conf2);
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.createTable(table);
    }
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.createTable(table);
    }
    utility1.waitUntilAllRegionsAssigned(tablename);
    utility2.waitUntilAllRegionsAssigned(tablename);

    Table lHtable1 = utility1.getConnection().getTable(tablename);

    // now suspend replication
    admin.disablePeer("2");

    // Put one edit on the table that is about to be dropped. The row key leads with 0 so the
    // edit sorts before the other table's edits in the replication batch.
    byte[] rowkey = Bytes.toBytes(0 + " put on table to be dropped");
    Put put = new Put(rowkey);
    put.addColumn(familyname, row, row);
    lHtable1.put(put);

    rowkey = Bytes.toBytes("normal put");
    put = new Put(rowkey);
    put.addColumn(famName, row, row);
    htable1.put(put);

    try (Admin admin2 = connection2.getAdmin()) {
      admin2.disableTable(tablename);
      admin2.deleteTable(tablename);
    }

    admin.enablePeer("2");
    // edit should still be stuck

    try (Admin admin1 = connection1.getAdmin()) {
      // the source table still exists, replication should be stalled
      verifyReplicationStuck(rowkey);

      admin1.disableTable(tablename);
      // still stuck, source table still exists
      verifyReplicationStuck(rowkey);

      admin1.deleteTable(tablename);
      // now that the source table is gone as well, replication should proceed and the
      // offending edits should be dropped
      verifyReplicationProceeded(rowkey);
    }
    // just to be safe
    conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
    // clean up the resources opened by this test
    lHtable1.close();
    connection1.close();
    connection2.close();
  }

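  /**
   * Waits until the given row shows up on the slave cluster, failing the test if it has not been
   * replicated after NB_RETRIES attempts.
   */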
  private void verifyReplicationProceeded(byte[] rowkey) throws Exception {
    Get get = new Get(rowkey);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      Result res = htable2.get(get);
      if (res.size() == 0) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        // assertArrayEquals takes the expected value first
        assertArrayEquals(rowkey, res.getRow());
        break;
      }
    }
  }

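  /**
   * Verifies that the given row never shows up on the slave cluster within NB_RETRIES attempts,
   * i.e. that replication is stalled behind the edits for the dropped table.
   */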
  private void verifyReplicationStuck(byte[] rowkey) throws Exception {
    Get get = new Get(rowkey);
    for (int i = 0; i < NB_RETRIES; i++) {
      Result res = htable2.get(get);
      if (res.size() >= 1) {
        fail("Edit should have been stuck behind dropped tables");
      } else {
        LOG.info("Row not replicated, let's wait a bit more...");
        Thread.sleep(SLEEP_TIME);
      }
    }
  }
}