001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.Assert.fail; 021 022import java.io.IOException; 023 024import org.apache.hadoop.hbase.HBaseClassTestRule; 025import org.apache.hadoop.hbase.HConstants; 026import org.apache.hadoop.hbase.NamespaceDescriptor; 027import org.apache.hadoop.hbase.TableName; 028import org.apache.hadoop.hbase.client.Admin; 029import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 030import org.apache.hadoop.hbase.client.Connection; 031import org.apache.hadoop.hbase.client.ConnectionFactory; 032import org.apache.hadoop.hbase.client.Put; 033import org.apache.hadoop.hbase.client.Result; 034import org.apache.hadoop.hbase.client.ResultScanner; 035import org.apache.hadoop.hbase.client.Scan; 036import org.apache.hadoop.hbase.client.Table; 037import org.apache.hadoop.hbase.client.TableDescriptor; 038import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 039import org.apache.hadoop.hbase.ipc.RpcServer; 040import org.apache.hadoop.hbase.testclassification.LargeTests; 041import org.apache.hadoop.hbase.testclassification.ReplicationTests; 042import org.apache.hadoop.hbase.util.Bytes; 043import org.apache.hadoop.hbase.util.JVMClusterUtil; 044import org.junit.Assert; 045import org.junit.Before; 046import org.junit.ClassRule; 047import org.junit.Test; 048import org.junit.experimental.categories.Category; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052@Category({ ReplicationTests.class, LargeTests.class }) 053public class TestReplicationDroppedTables extends TestReplicationBase { 054 055 @ClassRule 056 public static final HBaseClassTestRule CLASS_RULE = 057 HBaseClassTestRule.forClass(TestReplicationDroppedTables.class); 058 059 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationDroppedTables.class); 060 private static final int ROWS_COUNT = 1000; 061 062 @Before 063 public void setUpBase() throws Exception { 064 // Starting and stopping replication can make us miss new logs, 065 // rolling like this makes sure the most recent one gets added to the queue 066 for (JVMClusterUtil.RegionServerThread r : utility1.getHBaseCluster() 067 .getRegionServerThreads()) { 068 utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName()); 069 } 070 // Initialize the peer after wal rolling, so that we will abandon the stuck WALs. 071 super.setUpBase(); 072 int rowCount = utility1.countRows(tableName); 073 utility1.deleteTableData(tableName); 074 // truncating the table will send one Delete per row to the slave cluster 075 // in an async fashion, which is why we cannot just call deleteTableData on 076 // utility2 since late writes could make it to the slave in some way. 077 // Instead, we truncate the first table and wait for all the Deletes to 078 // make it to the slave. 079 Scan scan = new Scan(); 080 int lastCount = 0; 081 for (int i = 0; i < NB_RETRIES; i++) { 082 if (i == NB_RETRIES - 1) { 083 fail("Waited too much time for truncate"); 084 } 085 ResultScanner scanner = htable2.getScanner(scan); 086 Result[] res = scanner.next(rowCount); 087 scanner.close(); 088 if (res.length != 0) { 089 if (res.length < lastCount) { 090 i--; // Don't increment timeout if we make progress 091 } 092 lastCount = res.length; 093 LOG.info("Still got " + res.length + " rows"); 094 Thread.sleep(SLEEP_TIME); 095 } else { 096 break; 097 } 098 } 099 // Set the max request size to a tiny 10K for dividing the replication WAL entries into multiple 100 // batches. the default max request size is 256M, so all replication entries are in a batch, but 101 // when replicate at sink side, it'll apply to rs group by table name, so the WAL of test table 102 // may apply first, and then test_dropped table, and we will believe that the replication is not 103 // got stuck (HBASE-20475). 104 conf1.setInt(RpcServer.MAX_REQUEST_SIZE, 10 * 1024); 105 } 106 107 @Test 108 public void testEditsStuckBehindDroppedTable() throws Exception { 109 // Sanity check Make sure by default edits for dropped tables stall the replication queue, even 110 // when the table(s) in question have been deleted on both ends. 111 testEditsBehindDroppedTable(false, "test_dropped"); 112 } 113 114 @Test 115 public void testEditsDroppedWithDroppedTable() throws Exception { 116 // Make sure by default edits for dropped tables are themselves dropped when the 117 // table(s) in question have been deleted on both ends. 118 testEditsBehindDroppedTable(true, "test_dropped"); 119 } 120 121 @Test 122 public void testEditsDroppedWithDroppedTableNS() throws Exception { 123 // also try with a namespace 124 Connection connection1 = ConnectionFactory.createConnection(conf1); 125 try (Admin admin1 = connection1.getAdmin()) { 126 admin1.createNamespace(NamespaceDescriptor.create("NS").build()); 127 } 128 Connection connection2 = ConnectionFactory.createConnection(conf2); 129 try (Admin admin2 = connection2.getAdmin()) { 130 admin2.createNamespace(NamespaceDescriptor.create("NS").build()); 131 } 132 testEditsBehindDroppedTable(true, "NS:test_dropped"); 133 try (Admin admin1 = connection1.getAdmin()) { 134 admin1.deleteNamespace("NS"); 135 } 136 try (Admin admin2 = connection2.getAdmin()) { 137 admin2.deleteNamespace("NS"); 138 } 139 } 140 141 private byte[] generateRowKey(int id) { 142 return Bytes.toBytes(String.format("NormalPut%03d", id)); 143 } 144 145 private void testEditsBehindDroppedTable(boolean allowProceeding, String tName) throws Exception { 146 conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, allowProceeding); 147 conf1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1); 148 149 // make sure we have a single region server only, so that all 150 // edits for all tables go there 151 utility1.shutdownMiniHBaseCluster(); 152 utility1.startMiniHBaseCluster(); 153 154 TableName tablename = TableName.valueOf(tName); 155 byte[] familyName = Bytes.toBytes("fam"); 156 byte[] row = Bytes.toBytes("row"); 157 158 TableDescriptor table = 159 TableDescriptorBuilder 160 .newBuilder(tablename).setColumnFamily(ColumnFamilyDescriptorBuilder 161 .newBuilder(familyName).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) 162 .build(); 163 164 Connection connection1 = ConnectionFactory.createConnection(conf1); 165 Connection connection2 = ConnectionFactory.createConnection(conf2); 166 try (Admin admin1 = connection1.getAdmin()) { 167 admin1.createTable(table); 168 } 169 try (Admin admin2 = connection2.getAdmin()) { 170 admin2.createTable(table); 171 } 172 utility1.waitUntilAllRegionsAssigned(tablename); 173 utility2.waitUntilAllRegionsAssigned(tablename); 174 175 // now suspend replication 176 try (Admin admin1 = connection1.getAdmin()) { 177 admin1.disableReplicationPeer(PEER_ID2); 178 } 179 180 // put some data (lead with 0 so the edit gets sorted before the other table's edits 181 // in the replication batch) write a bunch of edits, making sure we fill a batch 182 try (Table droppedTable = connection1.getTable(tablename)) { 183 byte[] rowKey = Bytes.toBytes(0 + " put on table to be dropped"); 184 Put put = new Put(rowKey); 185 put.addColumn(familyName, row, row); 186 droppedTable.put(put); 187 } 188 189 try (Table table1 = connection1.getTable(tableName)) { 190 for (int i = 0; i < ROWS_COUNT; i++) { 191 Put put = new Put(generateRowKey(i)).addColumn(famName, row, row); 192 table1.put(put); 193 } 194 } 195 196 try (Admin admin1 = connection1.getAdmin()) { 197 admin1.disableTable(tablename); 198 admin1.deleteTable(tablename); 199 } 200 try (Admin admin2 = connection2.getAdmin()) { 201 admin2.disableTable(tablename); 202 admin2.deleteTable(tablename); 203 } 204 205 try (Admin admin1 = connection1.getAdmin()) { 206 admin1.enableReplicationPeer(PEER_ID2); 207 } 208 209 if (allowProceeding) { 210 // in this we'd expect the key to make it over 211 verifyReplicationProceeded(); 212 } else { 213 verifyReplicationStuck(); 214 } 215 // just to be safe 216 conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false); 217 } 218 219 @Test 220 public void testEditsBehindDroppedTableTiming() throws Exception { 221 conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, true); 222 conf1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1); 223 224 // make sure we have a single region server only, so that all 225 // edits for all tables go there 226 utility1.shutdownMiniHBaseCluster(); 227 utility1.startMiniHBaseCluster(); 228 229 TableName tablename = TableName.valueOf("testdroppedtimed"); 230 byte[] familyName = Bytes.toBytes("fam"); 231 byte[] row = Bytes.toBytes("row"); 232 233 TableDescriptor table = 234 TableDescriptorBuilder 235 .newBuilder(tablename).setColumnFamily(ColumnFamilyDescriptorBuilder 236 .newBuilder(familyName).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) 237 .build(); 238 239 Connection connection1 = ConnectionFactory.createConnection(conf1); 240 Connection connection2 = ConnectionFactory.createConnection(conf2); 241 try (Admin admin1 = connection1.getAdmin()) { 242 admin1.createTable(table); 243 } 244 try (Admin admin2 = connection2.getAdmin()) { 245 admin2.createTable(table); 246 } 247 utility1.waitUntilAllRegionsAssigned(tablename); 248 utility2.waitUntilAllRegionsAssigned(tablename); 249 250 // now suspend replication 251 try (Admin admin1 = connection1.getAdmin()) { 252 admin1.disableReplicationPeer(PEER_ID2); 253 } 254 255 // put some data (lead with 0 so the edit gets sorted before the other table's edits 256 // in the replication batch) write a bunch of edits, making sure we fill a batch 257 try (Table droppedTable = connection1.getTable(tablename)) { 258 byte[] rowKey = Bytes.toBytes(0 + " put on table to be dropped"); 259 Put put = new Put(rowKey); 260 put.addColumn(familyName, row, row); 261 droppedTable.put(put); 262 } 263 264 try (Table table1 = connection1.getTable(tableName)) { 265 for (int i = 0; i < ROWS_COUNT; i++) { 266 Put put = new Put(generateRowKey(i)).addColumn(famName, row, row); 267 table1.put(put); 268 } 269 } 270 271 try (Admin admin2 = connection2.getAdmin()) { 272 admin2.disableTable(tablename); 273 admin2.deleteTable(tablename); 274 } 275 276 // edit should still be stuck 277 try (Admin admin1 = connection1.getAdmin()) { 278 // enable the replication peer. 279 admin1.enableReplicationPeer(PEER_ID2); 280 // the source table still exists, replication should be stalled 281 verifyReplicationStuck(); 282 283 admin1.disableTable(tablename); 284 // still stuck, source table still exists 285 verifyReplicationStuck(); 286 287 admin1.deleteTable(tablename); 288 // now the source table is gone, replication should proceed, the 289 // offending edits be dropped 290 verifyReplicationProceeded(); 291 } 292 // just to be safe 293 conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false); 294 } 295 296 private boolean peerHasAllNormalRows() throws IOException { 297 try (ResultScanner scanner = htable2.getScanner(new Scan())) { 298 Result[] results = scanner.next(ROWS_COUNT); 299 if (results.length != ROWS_COUNT) { 300 return false; 301 } 302 for (int i = 0; i < results.length; i++) { 303 Assert.assertArrayEquals(generateRowKey(i), results[i].getRow()); 304 } 305 return true; 306 } 307 } 308 309 private void verifyReplicationProceeded() throws Exception { 310 for (int i = 0; i < NB_RETRIES; i++) { 311 if (i == NB_RETRIES - 1) { 312 fail("Waited too much time for put replication"); 313 } 314 if (!peerHasAllNormalRows()) { 315 LOG.info("Row not available"); 316 Thread.sleep(SLEEP_TIME); 317 } else { 318 break; 319 } 320 } 321 } 322 323 private void verifyReplicationStuck() throws Exception { 324 for (int i = 0; i < NB_RETRIES; i++) { 325 if (peerHasAllNormalRows()) { 326 fail("Edit should have been stuck behind dropped tables"); 327 } else { 328 LOG.info("Row not replicated, let's wait a bit more..."); 329 Thread.sleep(SLEEP_TIME); 330 } 331 } 332 } 333}