001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertTrue;
021
022import java.util.HashMap;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Map;
026import java.util.Set;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseTestingUtility;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.NamespaceDescriptor;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.client.Admin;
034import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
035import org.apache.hadoop.hbase.client.Connection;
036import org.apache.hadoop.hbase.client.ConnectionFactory;
037import org.apache.hadoop.hbase.client.Table;
038import org.apache.hadoop.hbase.client.TableDescriptor;
039import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
040import org.apache.hadoop.hbase.regionserver.TestBulkLoadReplication;
041import org.apache.hadoop.hbase.testclassification.LargeTests;
042import org.apache.hadoop.hbase.testclassification.ReplicationTests;
043import org.apache.hadoop.hbase.util.Bytes;
044import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
045import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
046import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
047import org.junit.After;
048import org.junit.Before;
049import org.junit.BeforeClass;
050import org.junit.ClassRule;
051import org.junit.Test;
052import org.junit.experimental.categories.Category;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056/**
057 * Testcase for HBASE-23098
058 */
059// LargeTest because spins up four clusters.
060@Category({ ReplicationTests.class, LargeTests.class })
061public final class TestNamespaceReplicationWithBulkLoadedData extends TestBulkLoadReplication {
062  @ClassRule
063  public static final HBaseClassTestRule CLASS_RULE =
064    HBaseClassTestRule.forClass(TestNamespaceReplicationWithBulkLoadedData.class);
065  private static final Logger LOG =
066    LoggerFactory.getLogger(TestNamespaceReplicationWithBulkLoadedData.class);
067
068  private static final HBaseTestingUtility UTIL4 = new HBaseTestingUtility();
069  private static final String PEER4_CLUSTER_ID = "peer4";
070  private static final String PEER4_NS = "ns_peer1";
071  private static final String PEER4_NS_TABLE = "ns_peer2";
072
073  private static final Configuration CONF4 = UTIL4.getConfiguration();
074
075  private static final String NS1 = "ns1";
076  private static final String NS2 = "ns2";
077
078  private static final TableName NS1_TABLE = TableName.valueOf(NS1 + ":t1_syncup");
079  private static final TableName NS2_TABLE = TableName.valueOf(NS2 + ":t2_syncup");
080
081  @BeforeClass
082  public static void setUpBeforeClass() throws Exception {
083    setupBulkLoadConfigsForCluster(CONF4, PEER4_CLUSTER_ID);
084    setupConfig(UTIL4, "/4");
085    TestBulkLoadReplication.setUpBeforeClass();
086    startFourthCluster();
087  }
088
089  private static void startFourthCluster() throws Exception {
090    LOG.info("Setup Zk to same one from UTIL1 and UTIL2 and UTIL3");
091    UTIL4.setZkCluster(UTIL1.getZkCluster());
092    UTIL4.startMiniCluster(NUM_SLAVES1);
093
094    TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
095      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
096        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
097      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
098
099    Connection connection4 = ConnectionFactory.createConnection(CONF4);
100    try (Admin admin4 = connection4.getAdmin()) {
101      admin4.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
102    }
103    UTIL4.waitUntilAllRegionsAssigned(tableName);
104  }
105
106  @Before
107  @Override
108  public void setUpBase() throws Exception {
109    /**
110     * "super.setUpBase()" already sets peer1 from 1 <-> 2 <-> 3 and this test add the fourth
111     * cluster. So we have following topology: 1 / \ 2 4 / 3 The 1 -> 4 has two peers, ns_peer1: ns1
112     * -> ns1 (validate this peer hfile-refs) ns_peer1 configuration is NAMESPACES => ["ns1"]
113     * ns_peer2: ns2:t2_syncup -> ns2:t2_syncup, this peers is ns_peer2 configuration is NAMESPACES
114     * => ["ns2"], TABLE_CFS => { "ns2:t2_syncup" => []} The 1 -> 2 has one peer, this peer
115     * configuration is add_peer '2', CLUSTER_KEY => "server1.cie.com:2181:/hbase"
116     */
117    super.setUpBase();
118
119    // Create tables
120    TableDescriptor table1 = TableDescriptorBuilder.newBuilder(NS1_TABLE)
121      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
122        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
123      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
124
125    TableDescriptor table2 = TableDescriptorBuilder.newBuilder(NS2_TABLE)
126      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
127        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
128      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
129
130    Admin admin1 = UTIL1.getAdmin();
131    admin1.createNamespace(NamespaceDescriptor.create(NS1).build());
132    admin1.createNamespace(NamespaceDescriptor.create(NS2).build());
133    admin1.createTable(table1);
134    admin1.createTable(table2);
135
136    Admin admin2 = UTIL2.getAdmin();
137    admin2.createNamespace(NamespaceDescriptor.create(NS1).build());
138    admin2.createNamespace(NamespaceDescriptor.create(NS2).build());
139    admin2.createTable(table1);
140    admin2.createTable(table2);
141
142    Admin admin3 = UTIL3.getAdmin();
143    admin3.createNamespace(NamespaceDescriptor.create(NS1).build());
144    admin3.createNamespace(NamespaceDescriptor.create(NS2).build());
145    admin3.createTable(table1);
146    admin3.createTable(table2);
147
148    Admin admin4 = UTIL4.getAdmin();
149    admin4.createNamespace(NamespaceDescriptor.create(NS1).build());
150    admin4.createNamespace(NamespaceDescriptor.create(NS2).build());
151    admin4.createTable(table1);
152    admin4.createTable(table2);
153
154    /**
155     * Set ns_peer1 1: ns1 -> 2: ns1 add_peer 'ns_peer1', CLUSTER_KEY =>
156     * "zk1,zk2,zk3:2182:/hbase-prod", NAMESPACES => ["ns1"]
157     */
158    Set<String> namespaces = new HashSet<>();
159    namespaces.add(NS1);
160    ReplicationPeerConfig rpc4_ns =
161      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL4.getClusterKey())
162        .setReplicateAllUserTables(false).setNamespaces(namespaces).build();
163    admin1.addReplicationPeer(PEER4_NS, rpc4_ns);
164
165    /**
166     * Set ns_peer2 1: ns2:t2_syncup -> 4: ns2:t2_syncup add_peer 'ns_peer2', CLUSTER_KEY =>
167     * "zk1,zk2,zk3:2182:/hbase-prod", NAMESPACES => ["ns2"], TABLE_CFS => { "ns2:t2_syncup" => [] }
168     */
169    Map<TableName, List<String>> tableCFsMap = new HashMap<>();
170    tableCFsMap.put(NS2_TABLE, null);
171    ReplicationPeerConfig rpc4_ns_table =
172      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL4.getClusterKey())
173        .setReplicateAllUserTables(false).setTableCFsMap(tableCFsMap).build();
174    admin1.addReplicationPeer(PEER4_NS_TABLE, rpc4_ns_table);
175  }
176
177  @After
178  @Override
179  public void tearDownBase() throws Exception {
180    super.tearDownBase();
181    TableDescriptor table1 = TableDescriptorBuilder.newBuilder(NS1_TABLE)
182      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
183        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
184      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
185
186    TableDescriptor table2 = TableDescriptorBuilder.newBuilder(NS2_TABLE)
187      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
188        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
189      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
190    Admin admin1 = UTIL1.getAdmin();
191    admin1.disableTable(table1.getTableName());
192    admin1.deleteTable(table1.getTableName());
193    admin1.disableTable(table2.getTableName());
194    admin1.deleteTable(table2.getTableName());
195    admin1.deleteNamespace(NS1);
196    admin1.deleteNamespace(NS2);
197
198    Admin admin2 = UTIL2.getAdmin();
199    admin2.disableTable(table1.getTableName());
200    admin2.deleteTable(table1.getTableName());
201    admin2.disableTable(table2.getTableName());
202    admin2.deleteTable(table2.getTableName());
203    admin2.deleteNamespace(NS1);
204    admin2.deleteNamespace(NS2);
205
206    Admin admin3 = UTIL3.getAdmin();
207    admin3.disableTable(table1.getTableName());
208    admin3.deleteTable(table1.getTableName());
209    admin3.disableTable(table2.getTableName());
210    admin3.deleteTable(table2.getTableName());
211    admin3.deleteNamespace(NS1);
212    admin3.deleteNamespace(NS2);
213
214    Admin admin4 = UTIL4.getAdmin();
215    admin4.disableTable(table1.getTableName());
216    admin4.deleteTable(table1.getTableName());
217    admin4.disableTable(table2.getTableName());
218    admin4.deleteTable(table2.getTableName());
219    admin4.deleteNamespace(NS1);
220    admin4.deleteNamespace(NS2);
221    UTIL1.getAdmin().removeReplicationPeer(PEER4_NS);
222    UTIL1.getAdmin().removeReplicationPeer(PEER4_NS_TABLE);
223  }
224
225  @Test
226  @Override
227  public void testBulkLoadReplicationActiveActive() throws Exception {
228    Table peer1TestTable = UTIL1.getConnection().getTable(TestReplicationBase.tableName);
229    Table peer2TestTable = UTIL2.getConnection().getTable(TestReplicationBase.tableName);
230    Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName);
231    Table notPeerTable = UTIL4.getConnection().getTable(TestReplicationBase.tableName);
232    Table ns1Table = UTIL4.getConnection().getTable(NS1_TABLE);
233    Table ns2Table = UTIL4.getConnection().getTable(NS2_TABLE);
234
235    // case1: The ns1 tables will be replicate to cluster4
236    byte[] row = Bytes.toBytes("002_ns_peer");
237    byte[] value = Bytes.toBytes("v2");
238    bulkLoadOnCluster(ns1Table.getName(), row, value, UTIL1);
239    waitForReplication(ns1Table, 1, NB_RETRIES);
240    assertTableHasValue(ns1Table, row, value);
241
242    // case2: The ns2:t2_syncup will be replicate to cluster4
243    // If it's not fix HBASE-23098 the ns_peer1's hfile-refs(zk) will be backlog
244    row = Bytes.toBytes("003_ns_table_peer");
245    value = Bytes.toBytes("v2");
246    bulkLoadOnCluster(ns2Table.getName(), row, value, UTIL1);
247    waitForReplication(ns2Table, 1, NB_RETRIES);
248    assertTableHasValue(ns2Table, row, value);
249
250    // case3: The table test will be replicate to cluster1,cluster2,cluster3
251    // not replicate to cluster4, because we not set other peer for that tables.
252    row = Bytes.toBytes("001_nopeer");
253    value = Bytes.toBytes("v1");
254    assertBulkLoadConditions(tableName, row, value, UTIL1, peer1TestTable, peer2TestTable,
255      peer3TestTable);
256    assertTableNoValue(notPeerTable, row, value); // 1 -> 4, table is empty
257
258    // Verify hfile-refs for 1:ns_peer1, expect is empty
259    MiniZooKeeperCluster zkCluster = UTIL1.getZkCluster();
260    ZKWatcher watcher = new ZKWatcher(UTIL1.getConfiguration(), "TestZnodeHFiles-refs", null);
261    RecoverableZooKeeper zk = RecoverableZooKeeper.connect(UTIL1.getConfiguration(), watcher);
262    ZKReplicationQueueStorage replicationQueueStorage =
263      new ZKReplicationQueueStorage(watcher, UTIL1.getConfiguration());
264    Set<String> hfiles = replicationQueueStorage.getAllHFileRefs();
265    assertTrue(hfiles.isEmpty());
266  }
267}