/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint.REPLICATION_DROP_ON_DELETED_TABLE_KEY;
import static org.junit.Assert.fail;

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationDroppedTables extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestReplicationDroppedTables.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationDroppedTables.class);
  private static final int ROWS_COUNT = 1000;

  @Before
  public void setUpBase() throws Exception {
    // Starting and stopping replication can make us miss new logs,
    // rolling like this makes sure the most recent one gets added to the queue
    for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster()
        .getRegionServerThreads()) {
      UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
    }
    // Initialize the peer after wal rolling, so that we will abandon the stuck WALs.
    super.setUpBase();
    int rowCount = UTIL1.countRows(tableName);
    UTIL1.deleteTableData(tableName);
    // truncating the table will send one Delete per row to the slave cluster
    // in an async fashion, which is why we cannot just call deleteTableData on
    // utility2 since late writes could make it to the slave in some way.
    // Instead, we truncate the first table and wait for all the Deletes to
    // make it to the slave.
    Scan scan = new Scan();
    int lastCount = 0;
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for truncate");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(rowCount);
      scanner.close();
      if (res.length != 0) {
        if (res.length < lastCount) {
          i--; // Don't increment timeout if we make progress
        }
        lastCount = res.length;
        LOG.info("Still got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
    // Set the max request size to a tiny 10K so the replication WAL entries are split into
    // multiple batches. The default max request size is 256M, so all replication entries would
    // fit into a single batch; at the sink side the entries are applied grouped by table name,
    // so the edits of the test table might be applied before those of the test_dropped table
    // and we would wrongly conclude that replication is not stuck (HBASE-20475).
    CONF1.setInt(RpcServer.MAX_REQUEST_SIZE, 10 * 1024);
  }

  @Test
  public void testEditsStuckBehindDroppedTable() throws Exception {
    // Sanity check. Make sure that by default edits for dropped tables stall the replication
    // queue, even when the table(s) in question have been deleted on both ends.
    testEditsBehindDroppedTable(false, "test_dropped");
  }

  @Test
  public void testEditsDroppedWithDroppedTable() throws Exception {
    // Make sure edits for dropped tables are themselves dropped when dropping edits for
    // deleted tables is enabled and the table(s) in question have been deleted on both ends.
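    // allowProceeding=true makes the helper below set REPLICATION_DROP_ON_DELETED_TABLE_KEY in
    // the source cluster's configuration before it restarts the source cluster.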
    testEditsBehindDroppedTable(true, "test_dropped");
  }

  @Test
  public void testEditsDroppedWithDroppedTableNS() throws Exception {
    // also try with a namespace
    UTIL1.getAdmin().createNamespace(NamespaceDescriptor.create("NS").build());
    UTIL2.getAdmin().createNamespace(NamespaceDescriptor.create("NS").build());
    try {
      testEditsBehindDroppedTable(true, "NS:test_dropped");
    } finally {
      UTIL1.getAdmin().deleteNamespace("NS");
      UTIL2.getAdmin().deleteNamespace("NS");
    }
  }

  private byte[] generateRowKey(int id) {
    return Bytes.toBytes(String.format("NormalPut%03d", id));
  }

  private void testEditsBehindDroppedTable(boolean allowProceeding, String tName)
      throws Exception {
    CONF1.setBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, allowProceeding);
    CONF1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);

    // make sure we have a single region server only, so that all
    // edits for all tables go there
    restartSourceCluster(1);

    TableName tablename = TableName.valueOf(tName);
    byte[] familyName = Bytes.toBytes("fam");
    byte[] row = Bytes.toBytes("row");

    TableDescriptor table = TableDescriptorBuilder.newBuilder(tablename)
        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(familyName)
            .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
        .build();

    Connection connection1 = ConnectionFactory.createConnection(UTIL1.getConfiguration());
    Connection connection2 = ConnectionFactory.createConnection(UTIL2.getConfiguration());
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.createTable(table);
    }
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.createTable(table);
    }
    UTIL1.waitUntilAllRegionsAssigned(tablename);
    UTIL2.waitUntilAllRegionsAssigned(tablename);

    // now suspend replication
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.disableReplicationPeer(PEER_ID2);
    }

    // put some data (lead with 0 so the edit gets sorted before the other table's edits
    // in the replication batch), then write a bunch of edits, making sure we fill a batch
    try (Table droppedTable = connection1.getTable(tablename)) {
      byte[] rowKey = Bytes.toBytes(0 + " put on table to be dropped");
      Put put = new Put(rowKey);
      put.addColumn(familyName, row, row);
      droppedTable.put(put);
    }

    try (Table table1 = connection1.getTable(tableName)) {
      for (int i = 0; i < ROWS_COUNT; i++) {
        Put put = new Put(generateRowKey(i)).addColumn(famName, row, row);
        table1.put(put);
      }
    }

    try (Admin admin1 = connection1.getAdmin()) {
      admin1.disableTable(tablename);
      admin1.deleteTable(tablename);
    }
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.disableTable(tablename);
      admin2.deleteTable(tablename);
    }

    try (Admin admin1 = connection1.getAdmin()) {
      admin1.enableReplicationPeer(PEER_ID2);
    }

    if (allowProceeding) {
      // in this case we'd expect the edits to make it over
      verifyReplicationProceeded();
    } else {
      verifyReplicationStuck();
    }
    // just to be safe
    CONF1.setBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
  }

  @Test
  public void testEditsBehindDroppedTableTiming() throws Exception {
    CONF1.setBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, true);
    CONF1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);

    // make sure we have a single region server only, so that all
    // edits for all tables go there
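    // The restart also makes the CONF1 changes above (drop-on-deleted-table and the single
    // replication source thread) take effect on the source region server.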
    restartSourceCluster(1);

    TableName tablename = TableName.valueOf("testdroppedtimed");
    byte[] familyName = Bytes.toBytes("fam");
    byte[] row = Bytes.toBytes("row");

    TableDescriptor table = TableDescriptorBuilder.newBuilder(tablename)
        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(familyName)
            .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
        .build();

    Connection connection1 = ConnectionFactory.createConnection(CONF1);
    Connection connection2 = ConnectionFactory.createConnection(CONF2);
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.createTable(table);
    }
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.createTable(table);
    }
    UTIL1.waitUntilAllRegionsAssigned(tablename);
    UTIL2.waitUntilAllRegionsAssigned(tablename);

    // now suspend replication
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.disableReplicationPeer(PEER_ID2);
    }

    // put some data (lead with 0 so the edit gets sorted before the other table's edits
    // in the replication batch), then write a bunch of edits, making sure we fill a batch
    try (Table droppedTable = connection1.getTable(tablename)) {
      byte[] rowKey = Bytes.toBytes(0 + " put on table to be dropped");
      Put put = new Put(rowKey);
      put.addColumn(familyName, row, row);
      droppedTable.put(put);
    }

    try (Table table1 = connection1.getTable(tableName)) {
      for (int i = 0; i < ROWS_COUNT; i++) {
        Put put = new Put(generateRowKey(i)).addColumn(famName, row, row);
        table1.put(put);
      }
    }

    try (Admin admin2 = connection2.getAdmin()) {
      admin2.disableTable(tablename);
      admin2.deleteTable(tablename);
    }

    // the edits should still be stuck
    try (Admin admin1 = connection1.getAdmin()) {
      // enable the replication peer.
      admin1.enableReplicationPeer(PEER_ID2);
      // the source table still exists, replication should be stalled
      verifyReplicationStuck();

      admin1.disableTable(tablename);
      // still stuck, source table still exists
      verifyReplicationStuck();

      admin1.deleteTable(tablename);
      // now the source table is gone, replication should proceed and the
      // offending edits be dropped
      verifyReplicationProceeded();
    }
    // just to be safe
    CONF1.setBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
  }

  private boolean peerHasAllNormalRows() throws IOException {
    try (ResultScanner scanner = htable2.getScanner(new Scan())) {
      Result[] results = scanner.next(ROWS_COUNT);
      if (results.length != ROWS_COUNT) {
        return false;
      }
      for (int i = 0; i < results.length; i++) {
        Assert.assertArrayEquals(generateRowKey(i), results[i].getRow());
      }
      return true;
    }
  }

  private void verifyReplicationProceeded() throws Exception {
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      if (!peerHasAllNormalRows()) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

  private void verifyReplicationStuck() throws Exception {
    for (int i = 0; i < NB_RETRIES; i++) {
      if (peerHasAllNormalRows()) {
        fail("Edit should have been stuck behind dropped tables");
      } else {
        LOG.info("Row not replicated, let's wait a bit more...");
        Thread.sleep(SLEEP_TIME);
      }
    }
  }
}