001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.jupiter.api.Assertions.assertTrue; 021 022import java.util.HashMap; 023import java.util.HashSet; 024import java.util.List; 025import java.util.Map; 026import java.util.Set; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.HBaseTestingUtil; 029import org.apache.hadoop.hbase.HConstants; 030import org.apache.hadoop.hbase.NamespaceDescriptor; 031import org.apache.hadoop.hbase.TableName; 032import org.apache.hadoop.hbase.client.Admin; 033import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 034import org.apache.hadoop.hbase.client.Connection; 035import org.apache.hadoop.hbase.client.ConnectionFactory; 036import org.apache.hadoop.hbase.client.Table; 037import org.apache.hadoop.hbase.client.TableDescriptor; 038import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 039import org.apache.hadoop.hbase.testclassification.LargeTests; 040import org.apache.hadoop.hbase.testclassification.ReplicationTests; 041import org.apache.hadoop.hbase.util.Bytes; 042import org.junit.jupiter.api.AfterAll; 043import org.junit.jupiter.api.BeforeAll; 044import org.junit.jupiter.api.Tag; 045import org.junit.jupiter.api.Test; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049/** 050 * Testcase for HBASE-23098 051 */ 052@Tag(ReplicationTests.TAG) 053@Tag(LargeTests.TAG) 054public final class TestNamespaceReplicationWithBulkLoadedData extends TestBulkLoadReplication { 055 056 private static final Logger LOG = 057 LoggerFactory.getLogger(TestNamespaceReplicationWithBulkLoadedData.class); 058 059 private static final HBaseTestingUtil UTIL4 = new HBaseTestingUtil(); 060 private static final String PEER4_CLUSTER_ID = "peer4"; 061 private static final String PEER4_NS = "ns_peer1"; 062 private static final String PEER4_NS_TABLE = "ns_peer2"; 063 064 private static final Configuration CONF4 = UTIL4.getConfiguration(); 065 066 private static final String NS1 = "ns1"; 067 private static final String NS2 = "ns2"; 068 069 private static final TableName NS1_TABLE = TableName.valueOf(NS1 + ":t1_syncup"); 070 private static final TableName NS2_TABLE = TableName.valueOf(NS2 + ":t2_syncup"); 071 072 @BeforeAll 073 public static void setUpBeforeAll() throws Exception { 074 setupBulkLoadConfigsForCluster(CONF4, PEER4_CLUSTER_ID); 075 setupConfig(UTIL4, "/4"); 076 startFourthCluster(); 077 078 /** 079 * Parent class already sets peer1 from 1 <-> 2 <-> 3 and this test add the fourth cluster. So 080 * we have following topology: 1 / \ 2 4 / 3 The 1 -> 4 has two peers, ns_peer1: ns1 -> ns1 081 * (validate this peer hfile-refs) ns_peer1 configuration is NAMESPACES => ["ns1"] ns_peer2: 082 * ns2:t2_syncup -> ns2:t2_syncup, this peers is ns_peer2 configuration is NAMESPACES => 083 * ["ns2"], TABLE_CFS => { "ns2:t2_syncup" => []} The 1 -> 2 has one peer, this peer 084 * configuration is add_peer '2', CLUSTER_KEY => "server1.cie.com:2181:/hbase" 085 */ 086 // Create tables 087 TableDescriptor table1 = TableDescriptorBuilder.newBuilder(NS1_TABLE) 088 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName) 089 .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) 090 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build(); 091 092 TableDescriptor table2 = TableDescriptorBuilder.newBuilder(NS2_TABLE) 093 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName) 094 .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) 095 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build(); 096 097 Admin admin1 = UTIL1.getAdmin(); 098 admin1.createNamespace(NamespaceDescriptor.create(NS1).build()); 099 admin1.createNamespace(NamespaceDescriptor.create(NS2).build()); 100 admin1.createTable(table1); 101 admin1.createTable(table2); 102 103 Admin admin2 = UTIL2.getAdmin(); 104 admin2.createNamespace(NamespaceDescriptor.create(NS1).build()); 105 admin2.createNamespace(NamespaceDescriptor.create(NS2).build()); 106 admin2.createTable(table1); 107 admin2.createTable(table2); 108 109 Admin admin3 = UTIL3.getAdmin(); 110 admin3.createNamespace(NamespaceDescriptor.create(NS1).build()); 111 admin3.createNamespace(NamespaceDescriptor.create(NS2).build()); 112 admin3.createTable(table1); 113 admin3.createTable(table2); 114 115 Admin admin4 = UTIL4.getAdmin(); 116 admin4.createNamespace(NamespaceDescriptor.create(NS1).build()); 117 admin4.createNamespace(NamespaceDescriptor.create(NS2).build()); 118 admin4.createTable(table1); 119 admin4.createTable(table2); 120 121 /** 122 * Set ns_peer1 1: ns1 -> 2: ns1 add_peer 'ns_peer1', CLUSTER_KEY => 123 * "zk1,zk2,zk3:2182:/hbase-prod", NAMESPACES => ["ns1"] 124 */ 125 Set<String> namespaces = new HashSet<>(); 126 namespaces.add(NS1); 127 ReplicationPeerConfig rpc4_ns = 128 ReplicationPeerConfig.newBuilder().setClusterKey(UTIL4.getRpcConnnectionURI()) 129 .setReplicateAllUserTables(false).setNamespaces(namespaces).build(); 130 admin1.addReplicationPeer(PEER4_NS, rpc4_ns); 131 132 /** 133 * Set ns_peer2 1: ns2:t2_syncup -> 4: ns2:t2_syncup add_peer 'ns_peer2', CLUSTER_KEY => 134 * "zk1,zk2,zk3:2182:/hbase-prod", NAMESPACES => ["ns2"], TABLE_CFS => { "ns2:t2_syncup" => [] } 135 */ 136 Map<TableName, List<String>> tableCFsMap = new HashMap<>(); 137 tableCFsMap.put(NS2_TABLE, null); 138 ReplicationPeerConfig rpc4_ns_table = 139 ReplicationPeerConfig.newBuilder().setClusterKey(UTIL4.getRpcConnnectionURI()) 140 .setReplicateAllUserTables(false).setTableCFsMap(tableCFsMap).build(); 141 admin1.addReplicationPeer(PEER4_NS_TABLE, rpc4_ns_table); 142 } 143 144 @AfterAll 145 public static void tearDownAfterAll() throws Exception { 146 UTIL4.shutdownMiniCluster(); 147 } 148 149 private static void startFourthCluster() throws Exception { 150 LOG.info("Setup Zk to same one from UTIL1 and UTIL2 and UTIL3"); 151 UTIL4.setZkCluster(UTIL1.getZkCluster()); 152 UTIL4.startMiniCluster(NUM_SLAVES1); 153 154 TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName) 155 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100) 156 .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) 157 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build(); 158 159 Connection connection4 = ConnectionFactory.createConnection(CONF4); 160 try (Admin admin4 = connection4.getAdmin()) { 161 admin4.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE); 162 } 163 UTIL4.waitUntilAllRegionsAssigned(tableName); 164 } 165 166 @Test 167 @Override 168 public void testBulkLoadReplicationActiveActive() throws Exception { 169 Table peer1TestTable = UTIL1.getConnection().getTable(TestReplicationBase.tableName); 170 Table peer2TestTable = UTIL2.getConnection().getTable(TestReplicationBase.tableName); 171 Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName); 172 Table notPeerTable = UTIL4.getConnection().getTable(TestReplicationBase.tableName); 173 Table ns1Table = UTIL4.getConnection().getTable(NS1_TABLE); 174 Table ns2Table = UTIL4.getConnection().getTable(NS2_TABLE); 175 176 // case1: The ns1 tables will be replicate to cluster4 177 byte[] row = Bytes.toBytes("002_ns_peer"); 178 byte[] value = Bytes.toBytes("v2"); 179 bulkLoadOnCluster(ns1Table.getName(), row, value, UTIL1); 180 waitForReplication(ns1Table, 1, NB_RETRIES); 181 assertTableHasValue(ns1Table, row, value); 182 183 // case2: The ns2:t2_syncup will be replicate to cluster4 184 // If it's not fix HBASE-23098 the ns_peer1's hfile-refs(zk) will be backlog 185 row = Bytes.toBytes("003_ns_table_peer"); 186 value = Bytes.toBytes("v2"); 187 bulkLoadOnCluster(ns2Table.getName(), row, value, UTIL1); 188 waitForReplication(ns2Table, 1, NB_RETRIES); 189 assertTableHasValue(ns2Table, row, value); 190 191 // case3: The table test will be replicate to cluster1,cluster2,cluster3 192 // not replicate to cluster4, because we not set other peer for that tables. 193 row = Bytes.toBytes("001_nopeer"); 194 value = Bytes.toBytes("v1"); 195 assertBulkLoadConditions(tableName, row, value, UTIL1, peer1TestTable, peer2TestTable, 196 peer3TestTable); 197 assertTableNoValue(notPeerTable, row, value); // 1 -> 4, table is empty 198 199 // Verify hfile-refs for 1:ns_peer1, expect is empty 200 ReplicationQueueStorage replicationQueueStorage = ReplicationStorageFactory 201 .getReplicationQueueStorage(UTIL1.getConnection(), UTIL1.getConfiguration()); 202 Set<String> hfiles = replicationQueueStorage.getAllHFileRefs(); 203 assertTrue(hfiles.isEmpty()); 204 } 205}