/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertTrue;
021
022import java.util.HashMap;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Map;
026import java.util.Set;
027
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseTestingUtility;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.NamespaceDescriptor;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.client.Admin;
035import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
036import org.apache.hadoop.hbase.client.Connection;
037import org.apache.hadoop.hbase.client.ConnectionFactory;
038import org.apache.hadoop.hbase.client.Table;
039import org.apache.hadoop.hbase.client.TableDescriptor;
040import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
041import org.apache.hadoop.hbase.regionserver.TestBulkLoadReplication;
042import org.apache.hadoop.hbase.testclassification.LargeTests;
043import org.apache.hadoop.hbase.testclassification.ReplicationTests;
044import org.apache.hadoop.hbase.util.Bytes;
045import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
046import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
047import org.apache.hadoop.hbase.zookeeper.ZKUtil;
048import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
049
050import org.junit.After;
051import org.junit.Before;
052import org.junit.BeforeClass;
053import org.junit.ClassRule;
054import org.junit.Test;
055import org.junit.experimental.categories.Category;
056import org.slf4j.Logger;
057import org.slf4j.LoggerFactory;
058
059/**
060 * Testcase for HBASE-23098
061 */
062// LargeTest because spins up four clusters.
063@Category({ ReplicationTests.class, LargeTests.class })
064public final class TestNamespaceReplicationWithBulkLoadedData extends TestBulkLoadReplication {
065  @ClassRule
066  public static final HBaseClassTestRule CLASS_RULE =
067      HBaseClassTestRule.forClass(TestNamespaceReplicationWithBulkLoadedData.class);
068  private static final Logger LOG =
069      LoggerFactory.getLogger(TestNamespaceReplicationWithBulkLoadedData.class);
070
071  private static final HBaseTestingUtility UTIL4 = new HBaseTestingUtility();
072  private static final String PEER4_CLUSTER_ID = "peer4";
073  private static final String PEER4_NS = "ns_peer1";
074  private static final String PEER4_NS_TABLE = "ns_peer2";
075
076  private static final Configuration CONF4 = UTIL4.getConfiguration();
077
078  private static final String NS1 = "ns1";
079  private static final String NS2 = "ns2";
080
081  private static final TableName NS1_TABLE = TableName.valueOf(NS1 + ":t1_syncup");
082  private static final TableName NS2_TABLE = TableName.valueOf(NS2 + ":t2_syncup");
083
084  @BeforeClass
085  public static void setUpBeforeClass() throws Exception {
086    setupBulkLoadConfigsForCluster(CONF4, PEER4_CLUSTER_ID);
087    setupConfig(UTIL4, "/4");
088    TestBulkLoadReplication.setUpBeforeClass();
089    startFourthCluster();
090  }
091
092  private static void startFourthCluster() throws Exception {
093    LOG.info("Setup Zk to same one from UTIL1 and UTIL2 and UTIL3");
094    UTIL4.setZkCluster(UTIL1.getZkCluster());
095    UTIL4.startMiniCluster(NUM_SLAVES1);
096
097    TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
098        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
099            .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
100        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
101
102    Connection connection4 = ConnectionFactory.createConnection(CONF4);
103    try (Admin admin4 = connection4.getAdmin()) {
104      admin4.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
105    }
106    UTIL4.waitUntilAllRegionsAssigned(tableName);
107  }
108
109  @Before
110  @Override
111  public void setUpBase() throws Exception {
112    /** "super.setUpBase()" already sets peer1 from 1 <-> 2 <-> 3
113     * and this test add the fourth cluster.
114     * So we have following topology:
115     *      1
116     *     / \
117     *    2   4
118     *   /
119     *  3
120     *
121     *  The 1 -> 4 has two peers,
122     *  ns_peer1:  ns1 -> ns1 (validate this peer hfile-refs)
123     *             ns_peer1 configuration is NAMESPACES => ["ns1"]
124     *
125     *  ns_peer2:  ns2:t2_syncup -> ns2:t2_syncup, this peers is
126     *             ns_peer2 configuration is NAMESPACES => ["ns2"],
127     *                       TABLE_CFS => { "ns2:t2_syncup" => []}
128     *
129     *  The 1 -> 2 has one peer, this peer configuration is
130     *             add_peer '2', CLUSTER_KEY => "server1.cie.com:2181:/hbase"
131     *
132     */
133    super.setUpBase();
134
135    // Create tables
136    TableDescriptor table1 = TableDescriptorBuilder.newBuilder(NS1_TABLE)
137        .setColumnFamily(
138            ColumnFamilyDescriptorBuilder.newBuilder(famName)
139                .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
140        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
141
142    TableDescriptor table2 = TableDescriptorBuilder.newBuilder(NS2_TABLE)
143        .setColumnFamily(
144            ColumnFamilyDescriptorBuilder.newBuilder(famName)
145                .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
146        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
147
148    Admin admin1 = UTIL1.getAdmin();
149    admin1.createNamespace(NamespaceDescriptor.create(NS1).build());
150    admin1.createNamespace(NamespaceDescriptor.create(NS2).build());
151    admin1.createTable(table1);
152    admin1.createTable(table2);
153
154    Admin admin2 = UTIL2.getAdmin();
155    admin2.createNamespace(NamespaceDescriptor.create(NS1).build());
156    admin2.createNamespace(NamespaceDescriptor.create(NS2).build());
157    admin2.createTable(table1);
158    admin2.createTable(table2);
159
160    Admin admin3 = UTIL3.getAdmin();
161    admin3.createNamespace(NamespaceDescriptor.create(NS1).build());
162    admin3.createNamespace(NamespaceDescriptor.create(NS2).build());
163    admin3.createTable(table1);
164    admin3.createTable(table2);
165
166    Admin admin4 = UTIL4.getAdmin();
167    admin4.createNamespace(NamespaceDescriptor.create(NS1).build());
168    admin4.createNamespace(NamespaceDescriptor.create(NS2).build());
169    admin4.createTable(table1);
170    admin4.createTable(table2);
171
172    /**
173     *  Set ns_peer1 1: ns1 -> 2: ns1
174     *
175     *  add_peer 'ns_peer1', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod",
176     *     NAMESPACES => ["ns1"]
177     */
178    Set<String> namespaces = new HashSet<>();
179    namespaces.add(NS1);
180    ReplicationPeerConfig rpc4_ns =
181        ReplicationPeerConfig.newBuilder().setClusterKey(UTIL4.getClusterKey())
182            .setReplicateAllUserTables(false).setNamespaces(namespaces).build();
183    admin1.addReplicationPeer(PEER4_NS, rpc4_ns);
184
185    /**
186     * Set ns_peer2 1: ns2:t2_syncup -> 4: ns2:t2_syncup
187     *
188     * add_peer 'ns_peer2', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod",
189     *          NAMESPACES => ["ns2"], TABLE_CFS => { "ns2:t2_syncup" => [] }
190     */
191    Map<TableName, List<String>> tableCFsMap = new HashMap<>();
192    tableCFsMap.put(NS2_TABLE, null);
193    ReplicationPeerConfig rpc4_ns_table =
194        ReplicationPeerConfig.newBuilder().setClusterKey(UTIL4.getClusterKey())
195            .setReplicateAllUserTables(false).setTableCFsMap(tableCFsMap).build();
196    admin1.addReplicationPeer(PEER4_NS_TABLE, rpc4_ns_table);
197  }
198
199  @After
200  @Override
201  public void tearDownBase() throws Exception {
202    super.tearDownBase();
203    TableDescriptor table1 = TableDescriptorBuilder.newBuilder(NS1_TABLE)
204        .setColumnFamily(
205            ColumnFamilyDescriptorBuilder.newBuilder(famName)
206                .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
207        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
208
209    TableDescriptor table2 = TableDescriptorBuilder.newBuilder(NS2_TABLE)
210        .setColumnFamily(
211            ColumnFamilyDescriptorBuilder.newBuilder(famName)
212                .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
213        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
214    Admin admin1 = UTIL1.getAdmin();
215    admin1.disableTable(table1.getTableName());
216    admin1.deleteTable(table1.getTableName());
217    admin1.disableTable(table2.getTableName());
218    admin1.deleteTable(table2.getTableName());
219    admin1.deleteNamespace(NS1);
220    admin1.deleteNamespace(NS2);
221
222    Admin admin2 = UTIL2.getAdmin();
223    admin2.disableTable(table1.getTableName());
224    admin2.deleteTable(table1.getTableName());
225    admin2.disableTable(table2.getTableName());
226    admin2.deleteTable(table2.getTableName());
227    admin2.deleteNamespace(NS1);
228    admin2.deleteNamespace(NS2);
229
230    Admin admin3 = UTIL3.getAdmin();
231    admin3.disableTable(table1.getTableName());
232    admin3.deleteTable(table1.getTableName());
233    admin3.disableTable(table2.getTableName());
234    admin3.deleteTable(table2.getTableName());
235    admin3.deleteNamespace(NS1);
236    admin3.deleteNamespace(NS2);
237
238    Admin admin4 = UTIL4.getAdmin();
239    admin4.disableTable(table1.getTableName());
240    admin4.deleteTable(table1.getTableName());
241    admin4.disableTable(table2.getTableName());
242    admin4.deleteTable(table2.getTableName());
243    admin4.deleteNamespace(NS1);
244    admin4.deleteNamespace(NS2);
245    UTIL1.getAdmin().removeReplicationPeer(PEER4_NS);
246    UTIL1.getAdmin().removeReplicationPeer(PEER4_NS_TABLE);
247  }
248
249  @Test
250  @Override
251  public void testBulkLoadReplicationActiveActive() throws Exception {
252    Table peer1TestTable = UTIL1.getConnection().getTable(TestReplicationBase.tableName);
253    Table peer2TestTable = UTIL2.getConnection().getTable(TestReplicationBase.tableName);
254    Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName);
255    Table notPeerTable = UTIL4.getConnection().getTable(TestReplicationBase.tableName);
256    Table ns1Table = UTIL4.getConnection().getTable(NS1_TABLE);
257    Table ns2Table = UTIL4.getConnection().getTable(NS2_TABLE);
258
259    // case1: The ns1 tables will be replicate to cluster4
260    byte[] row = Bytes.toBytes("002_ns_peer");
261    byte[] value = Bytes.toBytes("v2");
262    bulkLoadOnCluster(ns1Table.getName(), row, value, UTIL1);
263    waitForReplication(ns1Table, 1, NB_RETRIES);
264    assertTableHasValue(ns1Table, row, value);
265
266    // case2: The ns2:t2_syncup will be replicate to cluster4
267    // If it's not fix HBASE-23098 the ns_peer1's hfile-refs(zk) will be backlog
268    row = Bytes.toBytes("003_ns_table_peer");
269    value = Bytes.toBytes("v2");
270    bulkLoadOnCluster(ns2Table.getName(), row, value, UTIL1);
271    waitForReplication(ns2Table, 1, NB_RETRIES);
272    assertTableHasValue(ns2Table, row, value);
273
274    // case3: The table test will be replicate to cluster1,cluster2,cluster3
275    //        not replicate to cluster4, because we not set other peer for that tables.
276    row = Bytes.toBytes("001_nopeer");
277    value = Bytes.toBytes("v1");
278    assertBulkLoadConditions(tableName, row, value, UTIL1, peer1TestTable,
279        peer2TestTable, peer3TestTable);
280    assertTableNoValue(notPeerTable, row, value); // 1 -> 4, table is empty
281
282    // Verify hfile-refs for 1:ns_peer1, expect is empty
283    MiniZooKeeperCluster zkCluster = UTIL1.getZkCluster();
284    ZKWatcher watcher = new ZKWatcher(UTIL1.getConfiguration(), "TestZnodeHFiles-refs", null);
285    RecoverableZooKeeper zk = ZKUtil.connect(UTIL1.getConfiguration(), watcher);
286    ZKReplicationQueueStorage replicationQueueStorage =
287        new ZKReplicationQueueStorage(watcher, UTIL1.getConfiguration());
288    Set<String> hfiles = replicationQueueStorage.getAllHFileRefs();
289    assertTrue(hfiles.isEmpty());
290  }
291}