001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.jupiter.api.Assertions.fail; 021 022import org.apache.hadoop.conf.Configuration; 023import org.apache.hadoop.hbase.HBaseConfiguration; 024import org.apache.hadoop.hbase.HBaseTestingUtil; 025import org.apache.hadoop.hbase.HConstants; 026import org.apache.hadoop.hbase.NamespaceDescriptor; 027import org.apache.hadoop.hbase.TableName; 028import org.apache.hadoop.hbase.Waiter.Predicate; 029import org.apache.hadoop.hbase.client.Admin; 030import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 031import org.apache.hadoop.hbase.client.Get; 032import org.apache.hadoop.hbase.client.Put; 033import org.apache.hadoop.hbase.client.Result; 034import org.apache.hadoop.hbase.client.Table; 035import org.apache.hadoop.hbase.client.TableDescriptor; 036import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 037import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; 038import org.apache.hadoop.hbase.testclassification.LargeTests; 039import org.apache.hadoop.hbase.testclassification.ReplicationTests; 040import org.apache.hadoop.hbase.util.Bytes; 041import org.apache.hadoop.hbase.util.JVMClusterUtil; 042import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; 043import org.junit.jupiter.api.AfterAll; 044import org.junit.jupiter.api.AfterEach; 045import org.junit.jupiter.api.BeforeAll; 046import org.junit.jupiter.api.BeforeEach; 047import org.junit.jupiter.api.Tag; 048import org.junit.jupiter.api.Test; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052@Tag(ReplicationTests.TAG) 053@Tag(LargeTests.TAG) 054public class TestReplicationEditsDroppedWithDroppedTable { 055 056 private static final Logger LOG = 057 LoggerFactory.getLogger(TestReplicationEditsDroppedWithDroppedTable.class); 058 059 private static Configuration conf1 = HBaseConfiguration.create(); 060 private static Configuration conf2 = HBaseConfiguration.create(); 061 062 protected static HBaseTestingUtil utility1; 063 protected static HBaseTestingUtil utility2; 064 065 private static Admin admin1; 066 private static Admin admin2; 067 068 private static final String namespace = "NS"; 069 private static final TableName NORMAL_TABLE = TableName.valueOf("normal-table"); 070 private static final TableName DROPPED_TABLE = TableName.valueOf("dropped-table"); 071 private static final TableName DROPPED_NS_TABLE = TableName.valueOf("NS:dropped-table"); 072 private static final byte[] ROW = Bytes.toBytes("row"); 073 private static final byte[] FAMILY = Bytes.toBytes("f"); 074 private static final byte[] QUALIFIER = Bytes.toBytes("q"); 075 private static final byte[] VALUE = Bytes.toBytes("value"); 076 077 private static final String PEER_ID = "1"; 078 private static final long SLEEP_TIME = 1000; 079 private static final int NB_RETRIES = 10; 080 081 @BeforeAll 082 public static void setUpBeforeClass() throws Exception { 083 // Set true to filter replication edits for dropped table 084 conf1.setBoolean(HBaseInterClusterReplicationEndpoint.REPLICATION_DROP_ON_DELETED_TABLE_KEY, 085 true); 086 conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); 087 conf1.setInt("replication.source.nb.capacity", 1); 088 utility1 = new HBaseTestingUtil(conf1); 089 utility1.startMiniZKCluster(); 090 MiniZooKeeperCluster miniZK = utility1.getZkCluster(); 091 conf1 = utility1.getConfiguration(); 092 093 conf2 = HBaseConfiguration.create(conf1); 094 conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); 095 utility2 = new HBaseTestingUtil(conf2); 096 utility2.setZkCluster(miniZK); 097 098 utility1.startMiniCluster(1); 099 utility2.startMiniCluster(1); 100 101 admin1 = utility1.getAdmin(); 102 admin2 = utility2.getAdmin(); 103 104 NamespaceDescriptor nsDesc = NamespaceDescriptor.create(namespace).build(); 105 admin1.createNamespace(nsDesc); 106 admin2.createNamespace(nsDesc); 107 } 108 109 @AfterAll 110 public static void tearDownAfterClass() throws Exception { 111 utility2.shutdownMiniCluster(); 112 utility1.shutdownMiniCluster(); 113 } 114 115 @BeforeEach 116 public void setup() throws Exception { 117 // Roll log 118 for (JVMClusterUtil.RegionServerThread r : utility1.getHBaseCluster() 119 .getRegionServerThreads()) { 120 utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName()); 121 } 122 // add peer 123 ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder() 124 .setClusterKey(utility2.getRpcConnnectionURI()).setReplicateAllUserTables(true).build(); 125 admin1.addReplicationPeer(PEER_ID, rpc); 126 // create table 127 createTable(NORMAL_TABLE); 128 } 129 130 @AfterEach 131 public void tearDown() throws Exception { 132 // Remove peer 133 admin1.removeReplicationPeer(PEER_ID); 134 // Drop table 135 admin1.disableTable(NORMAL_TABLE); 136 admin1.deleteTable(NORMAL_TABLE); 137 admin2.disableTable(NORMAL_TABLE); 138 admin2.deleteTable(NORMAL_TABLE); 139 } 140 141 private void createTable(TableName tableName) throws Exception { 142 TableDescriptor desc = 143 TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder 144 .newBuilder(FAMILY).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build(); 145 admin1.createTable(desc); 146 admin2.createTable(desc); 147 utility1.waitUntilAllRegionsAssigned(tableName); 148 utility2.waitUntilAllRegionsAssigned(tableName); 149 } 150 151 @Test 152 public void testEditsDroppedWithDroppedTable() throws Exception { 153 testWithDroppedTable(DROPPED_TABLE); 154 } 155 156 @Test 157 public void testEditsDroppedWithDroppedTableNS() throws Exception { 158 testWithDroppedTable(DROPPED_NS_TABLE); 159 } 160 161 private void testWithDroppedTable(TableName droppedTableName) throws Exception { 162 createTable(droppedTableName); 163 admin1.disableReplicationPeer(PEER_ID); 164 165 try (Table droppedTable = utility1.getConnection().getTable(droppedTableName)) { 166 Put put = new Put(ROW); 167 put.addColumn(FAMILY, QUALIFIER, VALUE); 168 droppedTable.put(put); 169 } 170 171 admin1.disableTable(droppedTableName); 172 admin1.deleteTable(droppedTableName); 173 admin2.disableTable(droppedTableName); 174 admin2.deleteTable(droppedTableName); 175 176 admin1.enableReplicationPeer(PEER_ID); 177 178 verifyReplicationProceeded(); 179 } 180 181 @Test 182 public void testEditsBehindDroppedTableTiming() throws Exception { 183 createTable(DROPPED_TABLE); 184 admin1.disableReplicationPeer(PEER_ID); 185 186 try (Table droppedTable = utility1.getConnection().getTable(DROPPED_TABLE)) { 187 Put put = new Put(ROW); 188 put.addColumn(FAMILY, QUALIFIER, VALUE); 189 droppedTable.put(put); 190 } 191 192 // Only delete table from peer cluster 193 admin2.disableTable(DROPPED_TABLE); 194 admin2.deleteTable(DROPPED_TABLE); 195 196 admin1.enableReplicationPeer(PEER_ID); 197 198 // the source table still exists, replication should be stalled 199 verifyReplicationStuck(); 200 admin1.disableTable(DROPPED_TABLE); 201 // still stuck, source table still exists 202 verifyReplicationStuck(); 203 admin1.deleteTable(DROPPED_TABLE); 204 // now the source table is gone, replication should proceed, the 205 // offending edits be dropped 206 verifyReplicationProceeded(); 207 } 208 209 private void verifyReplicationProceeded() throws Exception { 210 try (Table normalTable = utility1.getConnection().getTable(NORMAL_TABLE)) { 211 Put put = new Put(ROW); 212 put.addColumn(FAMILY, QUALIFIER, VALUE); 213 normalTable.put(put); 214 } 215 utility2.waitFor(NB_RETRIES * SLEEP_TIME, (Predicate<Exception>) () -> { 216 try (Table normalTable = utility2.getConnection().getTable(NORMAL_TABLE)) { 217 Result result = normalTable.get(new Get(ROW).addColumn(FAMILY, QUALIFIER)); 218 return result != null && !result.isEmpty() 219 && Bytes.equals(VALUE, result.getValue(FAMILY, QUALIFIER)); 220 } 221 }); 222 } 223 224 private void verifyReplicationStuck() throws Exception { 225 try (Table normalTable = utility1.getConnection().getTable(NORMAL_TABLE)) { 226 Put put = new Put(ROW); 227 put.addColumn(FAMILY, QUALIFIER, VALUE); 228 normalTable.put(put); 229 } 230 try (Table normalTable = utility2.getConnection().getTable(NORMAL_TABLE)) { 231 for (int i = 0; i < NB_RETRIES; i++) { 232 Result result = normalTable.get(new Get(ROW).addColumn(FAMILY, QUALIFIER)); 233 if (result != null && !result.isEmpty()) { 234 fail("Edit should have been stuck behind dropped tables, but value is " 235 + Bytes.toString(result.getValue(FAMILY, QUALIFIER))); 236 } else { 237 LOG.info("Row not replicated, let's wait a bit more..."); 238 Thread.sleep(SLEEP_TIME); 239 } 240 } 241 } 242 } 243}