001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.jupiter.api.Assertions.assertTrue;
021
022import java.util.HashMap;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Map;
026import java.util.Set;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.HBaseTestingUtil;
029import org.apache.hadoop.hbase.HConstants;
030import org.apache.hadoop.hbase.NamespaceDescriptor;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.Admin;
033import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
034import org.apache.hadoop.hbase.client.Connection;
035import org.apache.hadoop.hbase.client.ConnectionFactory;
036import org.apache.hadoop.hbase.client.Table;
037import org.apache.hadoop.hbase.client.TableDescriptor;
038import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
039import org.apache.hadoop.hbase.testclassification.LargeTests;
040import org.apache.hadoop.hbase.testclassification.ReplicationTests;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.junit.jupiter.api.AfterAll;
043import org.junit.jupiter.api.BeforeAll;
044import org.junit.jupiter.api.Tag;
045import org.junit.jupiter.api.Test;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049/**
050 * Testcase for HBASE-23098
051 */
052@Tag(ReplicationTests.TAG)
053@Tag(LargeTests.TAG)
054public final class TestNamespaceReplicationWithBulkLoadedData extends TestBulkLoadReplication {
055
056  private static final Logger LOG =
057    LoggerFactory.getLogger(TestNamespaceReplicationWithBulkLoadedData.class);
058
059  private static final HBaseTestingUtil UTIL4 = new HBaseTestingUtil();
060  private static final String PEER4_CLUSTER_ID = "peer4";
061  private static final String PEER4_NS = "ns_peer1";
062  private static final String PEER4_NS_TABLE = "ns_peer2";
063
064  private static final Configuration CONF4 = UTIL4.getConfiguration();
065
066  private static final String NS1 = "ns1";
067  private static final String NS2 = "ns2";
068
069  private static final TableName NS1_TABLE = TableName.valueOf(NS1 + ":t1_syncup");
070  private static final TableName NS2_TABLE = TableName.valueOf(NS2 + ":t2_syncup");
071
072  @BeforeAll
073  public static void setUpBeforeAll() throws Exception {
074    setupBulkLoadConfigsForCluster(CONF4, PEER4_CLUSTER_ID);
075    setupConfig(UTIL4, "/4");
076    startFourthCluster();
077
078    /**
079     * Parent class already sets peer1 from 1 <-> 2 <-> 3 and this test add the fourth cluster. So
080     * we have following topology: 1 / \ 2 4 / 3 The 1 -> 4 has two peers, ns_peer1: ns1 -> ns1
081     * (validate this peer hfile-refs) ns_peer1 configuration is NAMESPACES => ["ns1"] ns_peer2:
082     * ns2:t2_syncup -> ns2:t2_syncup, this peers is ns_peer2 configuration is NAMESPACES =>
083     * ["ns2"], TABLE_CFS => { "ns2:t2_syncup" => []} The 1 -> 2 has one peer, this peer
084     * configuration is add_peer '2', CLUSTER_KEY => "server1.cie.com:2181:/hbase"
085     */
086    // Create tables
087    TableDescriptor table1 = TableDescriptorBuilder.newBuilder(NS1_TABLE)
088      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
089        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
090      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
091
092    TableDescriptor table2 = TableDescriptorBuilder.newBuilder(NS2_TABLE)
093      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
094        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
095      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
096
097    Admin admin1 = UTIL1.getAdmin();
098    admin1.createNamespace(NamespaceDescriptor.create(NS1).build());
099    admin1.createNamespace(NamespaceDescriptor.create(NS2).build());
100    admin1.createTable(table1);
101    admin1.createTable(table2);
102
103    Admin admin2 = UTIL2.getAdmin();
104    admin2.createNamespace(NamespaceDescriptor.create(NS1).build());
105    admin2.createNamespace(NamespaceDescriptor.create(NS2).build());
106    admin2.createTable(table1);
107    admin2.createTable(table2);
108
109    Admin admin3 = UTIL3.getAdmin();
110    admin3.createNamespace(NamespaceDescriptor.create(NS1).build());
111    admin3.createNamespace(NamespaceDescriptor.create(NS2).build());
112    admin3.createTable(table1);
113    admin3.createTable(table2);
114
115    Admin admin4 = UTIL4.getAdmin();
116    admin4.createNamespace(NamespaceDescriptor.create(NS1).build());
117    admin4.createNamespace(NamespaceDescriptor.create(NS2).build());
118    admin4.createTable(table1);
119    admin4.createTable(table2);
120
121    /**
122     * Set ns_peer1 1: ns1 -> 2: ns1 add_peer 'ns_peer1', CLUSTER_KEY =>
123     * "zk1,zk2,zk3:2182:/hbase-prod", NAMESPACES => ["ns1"]
124     */
125    Set<String> namespaces = new HashSet<>();
126    namespaces.add(NS1);
127    ReplicationPeerConfig rpc4_ns =
128      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL4.getRpcConnnectionURI())
129        .setReplicateAllUserTables(false).setNamespaces(namespaces).build();
130    admin1.addReplicationPeer(PEER4_NS, rpc4_ns);
131
132    /**
133     * Set ns_peer2 1: ns2:t2_syncup -> 4: ns2:t2_syncup add_peer 'ns_peer2', CLUSTER_KEY =>
134     * "zk1,zk2,zk3:2182:/hbase-prod", NAMESPACES => ["ns2"], TABLE_CFS => { "ns2:t2_syncup" => [] }
135     */
136    Map<TableName, List<String>> tableCFsMap = new HashMap<>();
137    tableCFsMap.put(NS2_TABLE, null);
138    ReplicationPeerConfig rpc4_ns_table =
139      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL4.getRpcConnnectionURI())
140        .setReplicateAllUserTables(false).setTableCFsMap(tableCFsMap).build();
141    admin1.addReplicationPeer(PEER4_NS_TABLE, rpc4_ns_table);
142  }
143
144  @AfterAll
145  public static void tearDownAfterAll() throws Exception {
146    UTIL4.shutdownMiniCluster();
147  }
148
149  private static void startFourthCluster() throws Exception {
150    LOG.info("Setup Zk to same one from UTIL1 and UTIL2 and UTIL3");
151    UTIL4.setZkCluster(UTIL1.getZkCluster());
152    UTIL4.startMiniCluster(NUM_SLAVES1);
153
154    TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
155      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
156        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
157      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
158
159    Connection connection4 = ConnectionFactory.createConnection(CONF4);
160    try (Admin admin4 = connection4.getAdmin()) {
161      admin4.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
162    }
163    UTIL4.waitUntilAllRegionsAssigned(tableName);
164  }
165
166  @Test
167  @Override
168  public void testBulkLoadReplicationActiveActive() throws Exception {
169    Table peer1TestTable = UTIL1.getConnection().getTable(TestReplicationBase.tableName);
170    Table peer2TestTable = UTIL2.getConnection().getTable(TestReplicationBase.tableName);
171    Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName);
172    Table notPeerTable = UTIL4.getConnection().getTable(TestReplicationBase.tableName);
173    Table ns1Table = UTIL4.getConnection().getTable(NS1_TABLE);
174    Table ns2Table = UTIL4.getConnection().getTable(NS2_TABLE);
175
176    // case1: The ns1 tables will be replicate to cluster4
177    byte[] row = Bytes.toBytes("002_ns_peer");
178    byte[] value = Bytes.toBytes("v2");
179    bulkLoadOnCluster(ns1Table.getName(), row, value, UTIL1);
180    waitForReplication(ns1Table, 1, NB_RETRIES);
181    assertTableHasValue(ns1Table, row, value);
182
183    // case2: The ns2:t2_syncup will be replicate to cluster4
184    // If it's not fix HBASE-23098 the ns_peer1's hfile-refs(zk) will be backlog
185    row = Bytes.toBytes("003_ns_table_peer");
186    value = Bytes.toBytes("v2");
187    bulkLoadOnCluster(ns2Table.getName(), row, value, UTIL1);
188    waitForReplication(ns2Table, 1, NB_RETRIES);
189    assertTableHasValue(ns2Table, row, value);
190
191    // case3: The table test will be replicate to cluster1,cluster2,cluster3
192    // not replicate to cluster4, because we not set other peer for that tables.
193    row = Bytes.toBytes("001_nopeer");
194    value = Bytes.toBytes("v1");
195    assertBulkLoadConditions(tableName, row, value, UTIL1, peer1TestTable, peer2TestTable,
196      peer3TestTable);
197    assertTableNoValue(notPeerTable, row, value); // 1 -> 4, table is empty
198
199    // Verify hfile-refs for 1:ns_peer1, expect is empty
200    ReplicationQueueStorage replicationQueueStorage = ReplicationStorageFactory
201      .getReplicationQueueStorage(UTIL1.getConnection(), UTIL1.getConfiguration());
202    Set<String> hfiles = replicationQueueStorage.getAllHFileRefs();
203    assertTrue(hfiles.isEmpty());
204  }
205}