/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint.REPLICATION_DROP_ON_DELETED_TABLE_KEY;
import static org.junit.Assert.fail;

import java.io.IOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
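/**
 * Test that WAL edits for a table that has since been dropped either stall the replication
 * queue (the default) or are skipped so that replication can proceed, depending on
 * {@code REPLICATION_DROP_ON_DELETED_TABLE_KEY}.
 */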
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationDroppedTables extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicationDroppedTables.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationDroppedTables.class);
  private static final int ROWS_COUNT = 1000;

  @Before
  public void setUpBase() throws Exception {
    // Starting and stopping replication can make us miss new logs;
    // rolling like this makes sure the most recent one gets added to the queue.
    for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster().getRegionServerThreads()) {
      UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
    }
    // Initialize the peer after wal rolling, so that we will abandon the stuck WALs.
    super.setUpBase();
    int rowCount = UTIL1.countRows(tableName);
    UTIL1.deleteTableData(tableName);
    // Truncating the table will send one Delete per row to the slave cluster in an async
    // fashion, which is why we cannot just call deleteTableData on UTIL2: late writes could
    // still make it to the slave. Instead, we truncate the first table and wait for all the
    // Deletes to make it to the slave.
    Scan scan = new Scan();
    int lastCount = 0;
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for truncate");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(rowCount);
      scanner.close();
      if (res.length != 0) {
        if (res.length < lastCount) {
          i--; // Don't increment the timeout if we are making progress
        }
        lastCount = res.length;
        LOG.info("Still got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
    // Set the max request size to a tiny 10K to divide the replication WAL entries into multiple
    // batches. The default max request size is 256M, so all replication entries would fit into a
    // single batch. At the sink side a batch is applied per table, so the WAL entries of the test
    // table may be applied before those of the test_dropped table, which would make us believe
    // that replication is not stuck (HBASE-20475).
    CONF1.setInt(RpcServer.MAX_REQUEST_SIZE, 10 * 1024);
  }

  @Test
  public void testEditsStuckBehindDroppedTable() throws Exception {
    // Sanity check: make sure that, by default, edits for dropped tables stall the replication
    // queue, even when the table(s) in question have been deleted on both ends.
    testEditsBehindDroppedTable(false, "test_dropped");
  }

  @Test
  public void testEditsDroppedWithDroppedTable() throws Exception {
    // Make sure that, with REPLICATION_DROP_ON_DELETED_TABLE_KEY enabled, edits for dropped
    // tables are themselves dropped once the table(s) in question have been deleted on both ends.
    testEditsBehindDroppedTable(true, "test_dropped");
  }

  @Test
  public void testEditsDroppedWithDroppedTableNS() throws Exception {
    // Also try with a namespace.
    UTIL1.getAdmin().createNamespace(NamespaceDescriptor.create("NS").build());
    UTIL2.getAdmin().createNamespace(NamespaceDescriptor.create("NS").build());
    try {
      testEditsBehindDroppedTable(true, "NS:test_dropped");
    } finally {
      UTIL1.getAdmin().deleteNamespace("NS");
      UTIL2.getAdmin().deleteNamespace("NS");
    }
  }

  private byte[] generateRowKey(int id) {
    return Bytes.toBytes(String.format("NormalPut%03d", id));
  }
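  /**
   * Writes one edit for a soon-to-be-dropped table ahead of {@code ROWS_COUNT} edits for the
   * regular test table while the peer is disabled, drops the table on both clusters, then
   * re-enables the peer and checks whether replication proceeds past the orphaned edit
   * ({@code allowProceeding}) or stays stuck behind it.
   */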
  private void testEditsBehindDroppedTable(boolean allowProceeding, String tName)
    throws Exception {
    CONF1.setBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, allowProceeding);
    CONF1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);

    // Make sure we have a single region server only, so that all
    // edits for all tables go there.
    restartSourceCluster(1);

    TableName tablename = TableName.valueOf(tName);
    byte[] familyName = Bytes.toBytes("fam");
    byte[] row = Bytes.toBytes("row");

    TableDescriptor table =
      TableDescriptorBuilder.newBuilder(tablename).setColumnFamily(ColumnFamilyDescriptorBuilder
        .newBuilder(familyName).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build();

    Connection connection1 = ConnectionFactory.createConnection(UTIL1.getConfiguration());
    Connection connection2 = ConnectionFactory.createConnection(UTIL2.getConfiguration());
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.createTable(table);
    }
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.createTable(table);
    }
    UTIL1.waitUntilAllRegionsAssigned(tablename);
    UTIL2.waitUntilAllRegionsAssigned(tablename);

    // Now suspend replication.
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.disableReplicationPeer(PEER_ID2);
    }

    // Put some data; lead with 0 so the edit sorts before the other table's edits in the
    // replication batch. Then write a bunch of edits, making sure we fill a batch.
    try (Table droppedTable = connection1.getTable(tablename)) {
      byte[] rowKey = Bytes.toBytes(0 + " put on table to be dropped");
      Put put = new Put(rowKey);
      put.addColumn(familyName, row, row);
      droppedTable.put(put);
    }

    try (Table table1 = connection1.getTable(tableName)) {
      for (int i = 0; i < ROWS_COUNT; i++) {
        Put put = new Put(generateRowKey(i)).addColumn(famName, row, row);
        table1.put(put);
      }
    }

    try (Admin admin1 = connection1.getAdmin()) {
      admin1.disableTable(tablename);
      admin1.deleteTable(tablename);
    }
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.disableTable(tablename);
      admin2.deleteTable(tablename);
    }

    try (Admin admin1 = connection1.getAdmin()) {
      admin1.enableReplicationPeer(PEER_ID2);
    }

    if (allowProceeding) {
      // In this case we'd expect the edits to make it over.
      verifyReplicationProceeded();
    } else {
      verifyReplicationStuck();
    }
    // Just to be safe.
    CONF1.setBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
  }
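  /**
   * Same scenario as above, but the table is dropped on the sink cluster first: replication must
   * stay stuck while the source table still exists, and only proceed (dropping the orphaned
   * edits) once the table has been deleted on the source as well.
   */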
  @Test
  public void testEditsBehindDroppedTableTiming() throws Exception {
    CONF1.setBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, true);
    CONF1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);

    // Make sure we have a single region server only, so that all
    // edits for all tables go there.
    restartSourceCluster(1);

    TableName tablename = TableName.valueOf("testdroppedtimed");
    byte[] familyName = Bytes.toBytes("fam");
    byte[] row = Bytes.toBytes("row");

    TableDescriptor table =
      TableDescriptorBuilder.newBuilder(tablename).setColumnFamily(ColumnFamilyDescriptorBuilder
        .newBuilder(familyName).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build();

    Connection connection1 = ConnectionFactory.createConnection(CONF1);
    Connection connection2 = ConnectionFactory.createConnection(CONF2);
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.createTable(table);
    }
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.createTable(table);
    }
    UTIL1.waitUntilAllRegionsAssigned(tablename);
    UTIL2.waitUntilAllRegionsAssigned(tablename);

    // Now suspend replication.
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.disableReplicationPeer(PEER_ID2);
    }

    // Put some data; lead with 0 so the edit sorts before the other table's edits in the
    // replication batch. Then write a bunch of edits, making sure we fill a batch.
    try (Table droppedTable = connection1.getTable(tablename)) {
      byte[] rowKey = Bytes.toBytes(0 + " put on table to be dropped");
      Put put = new Put(rowKey);
      put.addColumn(familyName, row, row);
      droppedTable.put(put);
    }

    try (Table table1 = connection1.getTable(tableName)) {
      for (int i = 0; i < ROWS_COUNT; i++) {
        Put put = new Put(generateRowKey(i)).addColumn(famName, row, row);
        table1.put(put);
      }
    }

    // Drop the table on the sink cluster only.
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.disableTable(tablename);
      admin2.deleteTable(tablename);
    }

    // The edits should still be stuck.
    try (Admin admin1 = connection1.getAdmin()) {
      // Enable the replication peer.
      admin1.enableReplicationPeer(PEER_ID2);
      // The source table still exists, so replication should be stalled.
      verifyReplicationStuck();

      admin1.disableTable(tablename);
      // Still stuck, the source table still exists.
      verifyReplicationStuck();

      admin1.deleteTable(tablename);
      // Now the source table is gone, so replication should proceed and the
      // offending edits be dropped.
      verifyReplicationProceeded();
    }
    // Just to be safe.
    CONF1.setBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
  }

  private boolean peerHasAllNormalRows() throws IOException {
    try (ResultScanner scanner = htable2.getScanner(new Scan())) {
      Result[] results = scanner.next(ROWS_COUNT);
      if (results.length != ROWS_COUNT) {
        return false;
      }
      for (int i = 0; i < results.length; i++) {
        Assert.assertArrayEquals(generateRowKey(i), results[i].getRow());
      }
      return true;
    }
  }

  private void verifyReplicationProceeded() throws Exception {
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      if (!peerHasAllNormalRows()) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

  private void verifyReplicationStuck() throws Exception {
    for (int i = 0; i < NB_RETRIES; i++) {
      if (peerHasAllNormalRows()) {
        fail("Edit should have been stuck behind dropped tables");
      } else {
        LOG.info("Row not replicated, let's wait a bit more...");
        Thread.sleep(SLEEP_TIME);
      }
    }
  }
}