/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.fail;

import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(LargeTests.class)
public class TestReplicationDroppedTables extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestReplicationDroppedTables.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationDroppedTables.class);

  @Before
  public void setUp() throws Exception {
    // Starting and stopping replication can make us miss new logs;
    // rolling like this makes sure the most recent one gets added to the queue.
    for (JVMClusterUtil.RegionServerThread r :
        utility1.getHBaseCluster().getRegionServerThreads()) {
      utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
    }
    int rowCount = utility1.countRows(tableName);
    utility1.deleteTableData(tableName);
    // Truncating the table sends one Delete per row to the slave cluster
    // asynchronously, which is why we cannot just call deleteTableData on
    // utility2: late writes could still make it to the slave. Instead, we
    // truncate the first table and wait for all the Deletes to reach the slave.
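    // Poll the slave with a full scan until it comes back empty; a shrinking
    // row count means the Deletes are still arriving, so the retry counter
    // below is rewound whenever progress is observed.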
    Scan scan = new Scan();
    int lastCount = 0;
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for truncate");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(rowCount);
      scanner.close();
      if (res.length != 0) {
        if (res.length < lastCount) {
          i--; // Don't increment the retry counter if we are making progress
        }
        lastCount = res.length;
        LOG.info("Still got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

  @Test
  public void testEditsStuckBehindDroppedTable() throws Exception {
    // Sanity check: make sure that by default edits for dropped tables stall the
    // replication queue, even when the table(s) in question have been deleted
    // on both ends.
    testEditsBehindDroppedTable(false, "test_dropped");
  }

  @Test
  public void testEditsDroppedWithDroppedTable() throws Exception {
    // Make sure that, with REPLICATION_DROP_ON_DELETED_TABLE_KEY enabled, edits
    // for dropped tables are themselves dropped when the table(s) in question
    // have been deleted on both ends.
    testEditsBehindDroppedTable(true, "test_dropped");
  }

  @Test
  public void testEditsDroppedWithDroppedTableNS() throws Exception {
    // Also try with a namespaced table.
    Connection connection1 = ConnectionFactory.createConnection(conf1);
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.createNamespace(NamespaceDescriptor.create("NS").build());
    }
    Connection connection2 = ConnectionFactory.createConnection(conf2);
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.createNamespace(NamespaceDescriptor.create("NS").build());
    }
    testEditsBehindDroppedTable(true, "NS:test_dropped");
  }

  private void testEditsBehindDroppedTable(boolean allowProceeding, String tName)
      throws Exception {
    conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, allowProceeding);
    conf1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);

    // Make sure we have a single region server only, so that all
    // edits for all tables go there.
    utility1.shutdownMiniHBaseCluster();
    utility1.startMiniHBaseCluster(1, 1);

    TableName tablename = TableName.valueOf(tName);
    byte[] familyname = Bytes.toBytes("fam");
    byte[] row = Bytes.toBytes("row");

    HTableDescriptor table = new HTableDescriptor(tablename);
    HColumnDescriptor fam = new HColumnDescriptor(familyname);
    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
    table.addFamily(fam);

    Connection connection1 = ConnectionFactory.createConnection(conf1);
    Connection connection2 = ConnectionFactory.createConnection(conf2);
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.createTable(table);
    }
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.createTable(table);
    }
    utility1.waitUntilAllRegionsAssigned(tablename);
    utility2.waitUntilAllRegionsAssigned(tablename);

    Table lHtable1 = utility1.getConnection().getTable(tablename);

    // Now suspend replication.
    admin.disablePeer("2");

    // Put some data (lead with 0 so the edit gets sorted before the other
    // table's edits in the replication batch); write a bunch of edits,
    // making sure we fill a batch.
    byte[] rowkey = Bytes.toBytes(0 + " put on table to be dropped");
    Put put = new Put(rowkey);
    put.addColumn(familyname, row, row);
    lHtable1.put(put);

    rowkey = Bytes.toBytes("normal put");
    put = new Put(rowkey);
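    // famName and htable1 (inherited from TestReplicationBase) refer to the
    // regular replicated test table, so this edit queues up behind the one
    // for the soon-to-be-dropped table in the same replication batch.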
    put.addColumn(famName, row, row);
    htable1.put(put);

    try (Admin admin1 = connection1.getAdmin()) {
      admin1.disableTable(tablename);
      admin1.deleteTable(tablename);
    }
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.disableTable(tablename);
      admin2.deleteTable(tablename);
    }

    admin.enablePeer("2");
    if (allowProceeding) {
      // In this case we'd expect the key to make it over.
      verifyReplicationProceeded(rowkey);
    } else {
      verifyReplicationStuck(rowkey);
    }
    // Just to be safe.
    conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
  }

  @Test
  public void testEditsBehindDroppedTableTiming() throws Exception {
    conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, true);
    conf1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);

    // Make sure we have a single region server only, so that all
    // edits for all tables go there.
    utility1.shutdownMiniHBaseCluster();
    utility1.startMiniHBaseCluster(1, 1);

    TableName tablename = TableName.valueOf("testdroppedtimed");
    byte[] familyname = Bytes.toBytes("fam");
    byte[] row = Bytes.toBytes("row");

    HTableDescriptor table = new HTableDescriptor(tablename);
    HColumnDescriptor fam = new HColumnDescriptor(familyname);
    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
    table.addFamily(fam);

    Connection connection1 = ConnectionFactory.createConnection(conf1);
    Connection connection2 = ConnectionFactory.createConnection(conf2);
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.createTable(table);
    }
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.createTable(table);
    }
    utility1.waitUntilAllRegionsAssigned(tablename);
    utility2.waitUntilAllRegionsAssigned(tablename);

    Table lHtable1 = utility1.getConnection().getTable(tablename);

    // Now suspend replication.
    admin.disablePeer("2");

    // Put some data (lead with 0 so the edit gets sorted before the other
    // table's edits in the replication batch); write a bunch of edits,
    // making sure we fill a batch.
    byte[] rowkey = Bytes.toBytes(0 + " put on table to be dropped");
    Put put = new Put(rowkey);
    put.addColumn(familyname, row, row);
    lHtable1.put(put);

    rowkey = Bytes.toBytes("normal put");
    put = new Put(rowkey);
    put.addColumn(famName, row, row);
    htable1.put(put);

    // Delete the table on the sink side only for now.
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.disableTable(tablename);
      admin2.deleteTable(tablename);
    }

    admin.enablePeer("2");

    try (Admin admin1 = connection1.getAdmin()) {
      // The source table still exists; replication should be stalled.
      verifyReplicationStuck(rowkey);

      admin1.disableTable(tablename);
      // Still stuck: the source table still exists.
      verifyReplicationStuck(rowkey);

      admin1.deleteTable(tablename);
      // Now the source table is gone too; replication should proceed and the
      // offending edits be dropped.
      verifyReplicationProceeded(rowkey);
    }
    // Just to be safe.
    conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
  }

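  /**
   * Wait for the given row to show up on the slave cluster, retrying up to
   * NB_RETRIES times with SLEEP_TIME between attempts, and fail the test if
   * it never arrives.
   */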
available"); 275 Thread.sleep(SLEEP_TIME); 276 } else { 277 assertArrayEquals(res.getRow(), rowkey); 278 break; 279 } 280 } 281 } 282 283 private void verifyReplicationStuck(byte[] rowkey) throws Exception { 284 Get get = new Get(rowkey); 285 for (int i = 0; i < NB_RETRIES; i++) { 286 Result res = htable2.get(get); 287 if (res.size() >= 1) { 288 fail("Edit should have been stuck behind dropped tables"); 289 } else { 290 LOG.info("Row not replicated, let's wait a bit more..."); 291 Thread.sleep(SLEEP_TIME); 292 } 293 } 294 } 295}