/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertTrue;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.TestBulkLoadReplication;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;

import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Testcase for HBASE-23098.
 * <p>
 * Adds a fourth cluster to the clusters used by {@link TestBulkLoadReplication} and attaches it
 * to cluster 1 through two peers: one configured with a namespace and one configured with a
 * table-CFs map. After bulk loading data on cluster 1, the test checks that the data reaches
 * cluster 4 and that no hfile-refs are left behind in ZooKeeper.
 */
// LargeTest because spins up four clusters.
@Category({ ReplicationTests.class, LargeTests.class })
public final class TestNamespaceReplicationWithBulkLoadedData extends TestBulkLoadReplication {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestNamespaceReplicationWithBulkLoadedData.class);
  private static final Logger LOG =
    LoggerFactory.getLogger(TestNamespaceReplicationWithBulkLoadedData.class);

  private static final HBaseTestingUtility UTIL4 = new HBaseTestingUtility();
  private static final String PEER4_CLUSTER_ID = "peer4";
  // Id of the peer 1 -> 4 that is configured with a namespace.
  private static final String PEER4_NS = "ns_peer1";
  // Id of the peer 1 -> 4 that is configured with a table-CFs map.
  private static final String PEER4_NS_TABLE = "ns_peer2";

  private static final Configuration CONF4 = UTIL4.getConfiguration();

  private static final String NS1 = "ns1";
  private static final String NS2 = "ns2";

  private static final TableName NS1_TABLE = TableName.valueOf(NS1 + ":t1_syncup");
  private static final TableName NS2_TABLE = TableName.valueOf(NS2 + ":t2_syncup");

  /**
   * Configures the fourth cluster, runs the parent class setup and then starts the fourth
   * cluster.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    setupBulkLoadConfigsForCluster(CONF4, PEER4_CLUSTER_ID);
    setupConfig(UTIL4, "/4");
    TestBulkLoadReplication.setUpBeforeClass();
    startFourthCluster();
  }

  /**
   * Starts the fourth mini cluster on the ZooKeeper cluster used by UTIL1 and creates the base
   * test table on it.
   */
  private static void startFourthCluster() throws Exception {
    LOG.info("Setup Zk to same one from UTIL1 and UTIL2 and UTIL3");
    UTIL4.setZkCluster(UTIL1.getZkCluster());
    UTIL4.startMiniCluster(NUM_SLAVES1);

    TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();

    // The connection is only needed to create the table, so close it together with the admin.
    try (Connection connection4 = ConnectionFactory.createConnection(CONF4);
      Admin admin4 = connection4.getAdmin()) {
      admin4.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
    }
    UTIL4.waitUntilAllRegionsAssigned(tableName);
  }

  /**
   * Builds the descriptor shared by the namespace test tables: family {@code famName} with
   * global replication scope plus family {@code noRepfamName} with default settings.
   * @param name name of the table to describe
   * @return the table descriptor
   */
  private static TableDescriptor newTestTableDescriptor(TableName name) {
    return TableDescriptorBuilder.newBuilder(name)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
  }

  /**
   * Creates namespaces {@link #NS1} and {@link #NS2} and one test table in each of them.
   * @param admin admin of the cluster to create the namespaces and tables on
   */
  private static void createNamespacesAndTables(Admin admin) throws Exception {
    admin.createNamespace(NamespaceDescriptor.create(NS1).build());
    admin.createNamespace(NamespaceDescriptor.create(NS2).build());
    admin.createTable(newTestTableDescriptor(NS1_TABLE));
    admin.createTable(newTestTableDescriptor(NS2_TABLE));
  }

  /**
   * Disables and deletes the namespace test tables, then deletes namespaces {@link #NS1} and
   * {@link #NS2}.
   * @param admin admin of the cluster to clean up
   */
  private static void dropNamespacesAndTables(Admin admin) throws Exception {
    admin.disableTable(NS1_TABLE);
    admin.deleteTable(NS1_TABLE);
    admin.disableTable(NS2_TABLE);
    admin.deleteTable(NS2_TABLE);
    admin.deleteNamespace(NS1);
    admin.deleteNamespace(NS2);
  }

  /**
   * Runs the parent setup, creates the namespace tables on all four clusters and adds the two
   * peers from cluster 1 to cluster 4.
   */
  @Before
  @Override
  public void setUpBase() throws Exception {
    /*
     * "super.setUpBase()" already sets peer1 from 1 <-> 2 <-> 3
     * and this test adds the fourth cluster.
     * So we have the following topology:
     *      1
     *     / \
     *    2   4
     *   /
     *  3
     *
     * The 1 -> 4 link has two peers:
     *  ns_peer1: ns1 -> ns1 (the hfile-refs of this peer are validated)
     *    ns_peer1 configuration is NAMESPACES => ["ns1"]
     *
     *  ns_peer2: ns2:t2_syncup -> ns2:t2_syncup
     *    ns_peer2 configuration is TABLE_CFS => { "ns2:t2_syncup" => [] }
     *
     * The 1 -> 2 link has one peer, whose configuration is
     *  add_peer '2', CLUSTER_KEY => "server1.cie.com:2181:/hbase"
     */
    super.setUpBase();

    // Create the namespaces and tables on every cluster.
    Admin admin1 = UTIL1.getAdmin();
    createNamespacesAndTables(admin1);
    createNamespacesAndTables(UTIL2.getAdmin());
    createNamespacesAndTables(UTIL3.getAdmin());
    createNamespacesAndTables(UTIL4.getAdmin());

    /*
     * Set ns_peer1 1: ns1 -> 4: ns1
     *
     * add_peer 'ns_peer1', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod",
     *   NAMESPACES => ["ns1"]
     */
    Set<String> namespaces = new HashSet<>();
    namespaces.add(NS1);
    ReplicationPeerConfig namespacePeerConfig = ReplicationPeerConfig.newBuilder()
      .setClusterKey(UTIL4.getClusterKey())
      .setReplicateAllUserTables(false)
      .setNamespaces(namespaces)
      .build();
    admin1.addReplicationPeer(PEER4_NS, namespacePeerConfig);

    /*
     * Set ns_peer2 1: ns2:t2_syncup -> 4: ns2:t2_syncup
     *
     * add_peer 'ns_peer2', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod",
     *   TABLE_CFS => { "ns2:t2_syncup" => [] }
     */
    Map<TableName, List<String>> tableCFsMap = new HashMap<>();
    // No column family list is given for the table.
    tableCFsMap.put(NS2_TABLE, null);
    ReplicationPeerConfig tablePeerConfig = ReplicationPeerConfig.newBuilder()
      .setClusterKey(UTIL4.getClusterKey())
      .setReplicateAllUserTables(false)
      .setTableCFsMap(tableCFsMap)
      .build();
    admin1.addReplicationPeer(PEER4_NS_TABLE, tablePeerConfig);
  }

  /**
   * Runs the parent teardown, drops the namespace tables and namespaces on all four clusters and
   * removes the two peers that {@link #setUpBase()} added on cluster 1.
   */
  @After
  @Override
  public void tearDownBase() throws Exception {
    super.tearDownBase();

    Admin admin1 = UTIL1.getAdmin();
    dropNamespacesAndTables(admin1);
    dropNamespacesAndTables(UTIL2.getAdmin());
    dropNamespacesAndTables(UTIL3.getAdmin());
    dropNamespacesAndTables(UTIL4.getAdmin());

    admin1.removeReplicationPeer(PEER4_NS);
    admin1.removeReplicationPeer(PEER4_NS_TABLE);
  }

  /**
   * Bulk loads data on cluster 1 and verifies which clusters receive it, then verifies that no
   * hfile-refs remain in ZooKeeper.
   */
  @Test
  @Override
  public void testBulkLoadReplicationActiveActive() throws Exception {
    try (Table peer1TestTable = UTIL1.getConnection().getTable(TestReplicationBase.tableName);
      Table peer2TestTable = UTIL2.getConnection().getTable(TestReplicationBase.tableName);
      Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName);
      Table notPeerTable = UTIL4.getConnection().getTable(TestReplicationBase.tableName);
      Table ns1Table = UTIL4.getConnection().getTable(NS1_TABLE);
      Table ns2Table = UTIL4.getConnection().getTable(NS2_TABLE)) {

      // case1: The ns1 tables will be replicated to cluster4
      byte[] row = Bytes.toBytes("002_ns_peer");
      byte[] value = Bytes.toBytes("v2");
      bulkLoadOnCluster(ns1Table.getName(), row, value, UTIL1);
      waitForReplication(ns1Table, 1, NB_RETRIES);
      assertTableHasValue(ns1Table, row, value);

      // case2: The ns2:t2_syncup table will be replicated to cluster4.
      // Without the fix for HBASE-23098 the hfile-refs (zk) of ns_peer1 would be backlogged.
      row = Bytes.toBytes("003_ns_table_peer");
      value = Bytes.toBytes("v2");
      bulkLoadOnCluster(ns2Table.getName(), row, value, UTIL1);
      waitForReplication(ns2Table, 1, NB_RETRIES);
      assertTableHasValue(ns2Table, row, value);

      // case3: The test table will be replicated to cluster1, cluster2 and cluster3, but not
      // to cluster4, because no peer to cluster4 is configured for that table.
      row = Bytes.toBytes("001_nopeer");
      value = Bytes.toBytes("v1");
      assertBulkLoadConditions(tableName, row, value, UTIL1, peer1TestTable,
        peer2TestTable, peer3TestTable);
      assertTableNoValue(notPeerTable, row, value); // 1 -> 4, table is empty
    }

    // Verify the hfile-refs on cluster 1, expected to be empty.
    Configuration conf1 = UTIL1.getConfiguration();
    try (ZKWatcher watcher = new ZKWatcher(conf1, "TestZnodeHFiles-refs", null)) {
      ZKReplicationQueueStorage replicationQueueStorage =
        new ZKReplicationQueueStorage(watcher, conf1);
      Set<String> hfiles = replicationQueueStorage.getAllHFileRefs();
      assertTrue("Expected no hfile-refs to be left, but found: " + hfiles, hfiles.isEmpty());
    }
  }
}