001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL; 021import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT; 022import static org.junit.jupiter.api.Assertions.fail; 023 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.hbase.HBaseConfiguration; 026import org.apache.hadoop.hbase.HBaseTestingUtil; 027import org.apache.hadoop.hbase.TableName; 028import org.apache.hadoop.hbase.client.Admin; 029import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 030import org.apache.hadoop.hbase.client.Get; 031import org.apache.hadoop.hbase.client.Put; 032import org.apache.hadoop.hbase.client.Result; 033import org.apache.hadoop.hbase.client.Table; 034import org.apache.hadoop.hbase.client.TableDescriptor; 035import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 036import org.apache.hadoop.hbase.testclassification.LargeTests; 037import org.apache.hadoop.hbase.testclassification.ReplicationTests; 038import org.apache.hadoop.hbase.util.Bytes; 039import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; 040import org.junit.jupiter.api.AfterAll; 041import org.junit.jupiter.api.BeforeAll; 042import org.junit.jupiter.api.Tag; 043import org.junit.jupiter.api.Test; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047/** 048 * Replication with dropped table will stuck as the default REPLICATION_DROP_ON_DELETED_TABLE_KEY is 049 * false. 050 */ 051@Tag(ReplicationTests.TAG) 052@Tag(LargeTests.TAG) 053public class TestReplicationStuckWithDroppedTable { 054 055 private static final Logger LOG = 056 LoggerFactory.getLogger(TestReplicationStuckWithDroppedTable.class); 057 058 private static Configuration conf1 = HBaseConfiguration.create(); 059 private static Configuration conf2 = HBaseConfiguration.create(); 060 061 protected static HBaseTestingUtil utility1; 062 protected static HBaseTestingUtil utility2; 063 064 private static Admin admin1; 065 private static Admin admin2; 066 067 private static final TableName NORMAL_TABLE = TableName.valueOf("normal-table"); 068 private static final TableName DROPPED_TABLE = TableName.valueOf("dropped-table"); 069 private static final byte[] ROW = Bytes.toBytes("row"); 070 private static final byte[] FAMILY = Bytes.toBytes("f"); 071 private static final byte[] QUALIFIER = Bytes.toBytes("q"); 072 private static final byte[] VALUE = Bytes.toBytes("value"); 073 074 private static final String PEER_ID = "1"; 075 private static final long SLEEP_TIME = 1000; 076 private static final int NB_RETRIES = 10; 077 078 @BeforeAll 079 public static void setUpBeforeClass() throws Exception { 080 conf1.set(ZOOKEEPER_ZNODE_PARENT, "/1"); 081 conf1.setInt("replication.source.nb.capacity", 1); 082 utility1 = new HBaseTestingUtil(conf1); 083 utility1.startMiniZKCluster(); 084 MiniZooKeeperCluster miniZK = utility1.getZkCluster(); 085 conf1 = utility1.getConfiguration(); 086 087 conf2 = HBaseConfiguration.create(conf1); 088 conf2.set(ZOOKEEPER_ZNODE_PARENT, "/2"); 089 utility2 = new HBaseTestingUtil(conf2); 090 utility2.setZkCluster(miniZK); 091 092 utility1.startMiniCluster(1); 093 utility2.startMiniCluster(1); 094 095 admin1 = utility1.getAdmin(); 096 admin2 = utility2.getAdmin(); 097 } 098 099 @AfterAll 100 public static void tearDownAfterClass() throws Exception { 101 utility2.shutdownMiniCluster(); 102 utility1.shutdownMiniCluster(); 103 } 104 105 private void createTable(TableName tableName) throws Exception { 106 TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName) 107 .setColumnFamily( 108 ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setScope(REPLICATION_SCOPE_GLOBAL).build()) 109 .build(); 110 admin1.createTable(desc); 111 admin2.createTable(desc); 112 utility1.waitUntilAllRegionsAssigned(tableName); 113 utility2.waitUntilAllRegionsAssigned(tableName); 114 } 115 116 @Test 117 public void testEditsStuckBehindDroppedTable() throws Exception { 118 // add peer 119 ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder() 120 .setClusterKey(utility2.getRpcConnnectionURI()).setReplicateAllUserTables(true).build(); 121 admin1.addReplicationPeer(PEER_ID, rpc); 122 123 // create table 124 createTable(NORMAL_TABLE); 125 createTable(DROPPED_TABLE); 126 127 admin1.disableReplicationPeer(PEER_ID); 128 129 try (Table droppedTable = utility1.getConnection().getTable(DROPPED_TABLE)) { 130 Put put = new Put(ROW); 131 put.addColumn(FAMILY, QUALIFIER, VALUE); 132 droppedTable.put(put); 133 } 134 135 admin1.disableTable(DROPPED_TABLE); 136 admin1.deleteTable(DROPPED_TABLE); 137 admin2.disableTable(DROPPED_TABLE); 138 admin2.deleteTable(DROPPED_TABLE); 139 140 admin1.enableReplicationPeer(PEER_ID); 141 142 verifyReplicationStuck(); 143 144 // Remove peer 145 admin1.removeReplicationPeer(PEER_ID); 146 // Drop table 147 admin1.disableTable(NORMAL_TABLE); 148 admin1.deleteTable(NORMAL_TABLE); 149 admin2.disableTable(NORMAL_TABLE); 150 admin2.deleteTable(NORMAL_TABLE); 151 } 152 153 private void verifyReplicationStuck() throws Exception { 154 try (Table normalTable = utility1.getConnection().getTable(NORMAL_TABLE)) { 155 Put put = new Put(ROW); 156 put.addColumn(FAMILY, QUALIFIER, VALUE); 157 normalTable.put(put); 158 } 159 try (Table normalTable = utility2.getConnection().getTable(NORMAL_TABLE)) { 160 for (int i = 0; i < NB_RETRIES; i++) { 161 Result result = normalTable.get(new Get(ROW).addColumn(FAMILY, QUALIFIER)); 162 if (result != null && !result.isEmpty()) { 163 fail("Edit should have been stuck behind dropped tables, but value is " 164 + Bytes.toString(result.getValue(FAMILY, QUALIFIER))); 165 } else { 166 LOG.info("Row not replicated, let's wait a bit more..."); 167 Thread.sleep(SLEEP_TIME); 168 } 169 } 170 } 171 } 172}