001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.jupiter.api.Assertions.assertArrayEquals;
021import static org.junit.jupiter.api.Assertions.assertEquals;
022import static org.junit.jupiter.api.Assertions.fail;
023
024import java.util.ArrayList;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.stream.Stream;
031import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.NamespaceDescriptor;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.client.Admin;
036import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
037import org.apache.hadoop.hbase.client.Connection;
038import org.apache.hadoop.hbase.client.ConnectionFactory;
039import org.apache.hadoop.hbase.client.Delete;
040import org.apache.hadoop.hbase.client.Get;
041import org.apache.hadoop.hbase.client.Put;
042import org.apache.hadoop.hbase.client.Result;
043import org.apache.hadoop.hbase.client.Table;
044import org.apache.hadoop.hbase.client.TableDescriptor;
045import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
046import org.apache.hadoop.hbase.testclassification.LargeTests;
047import org.apache.hadoop.hbase.testclassification.ReplicationTests;
048import org.apache.hadoop.hbase.util.Bytes;
049import org.junit.jupiter.api.AfterAll;
050import org.junit.jupiter.api.BeforeAll;
051import org.junit.jupiter.api.Tag;
052import org.junit.jupiter.api.TestTemplate;
053import org.junit.jupiter.params.provider.Arguments;
054import org.slf4j.Logger;
055import org.slf4j.LoggerFactory;
056
057@Tag(ReplicationTests.TAG)
058@Tag(LargeTests.TAG)
059@HBaseParameterizedTestTemplate(name = "{index}: serialPeer={0}")
060public class TestNamespaceReplication extends TestReplicationBase {
061
062  private static final Logger LOG = LoggerFactory.getLogger(TestNamespaceReplication.class);
063
064  private static String ns1 = "ns1";
065  private static String ns2 = "ns2";
066
067  private static final TableName tabAName = TableName.valueOf("ns1:TA");
068  private static final TableName tabBName = TableName.valueOf("ns2:TB");
069
070  private static final byte[] f1Name = Bytes.toBytes("f1");
071  private static final byte[] f2Name = Bytes.toBytes("f2");
072
073  private static final byte[] val = Bytes.toBytes("myval");
074
075  private static Connection connection1;
076  private static Connection connection2;
077  private static Admin admin1;
078  private static Admin admin2;
079
080  private boolean serialPeer;
081
082  public TestNamespaceReplication(boolean serialPeer) {
083    this.serialPeer = serialPeer;
084  }
085
086  @Override
087  protected boolean isSerialPeer() {
088    return serialPeer;
089  }
090
091  public static Stream<Arguments> parameters() {
092    return Stream.of(Arguments.of(true), Arguments.of(false));
093  }
094
095  @BeforeAll
096  public static void setUpBeforeClass() throws Exception {
097    connection1 = ConnectionFactory.createConnection(CONF1);
098    connection2 = ConnectionFactory.createConnection(CONF2);
099    admin1 = connection1.getAdmin();
100    admin2 = connection2.getAdmin();
101
102    admin1.createNamespace(NamespaceDescriptor.create(ns1).build());
103    admin1.createNamespace(NamespaceDescriptor.create(ns2).build());
104    admin2.createNamespace(NamespaceDescriptor.create(ns1).build());
105    admin2.createNamespace(NamespaceDescriptor.create(ns2).build());
106
107    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tabAName);
108    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f1Name)
109      .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build());
110    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f2Name)
111      .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build());
112    TableDescriptor tabA = builder.build();
113    admin1.createTable(tabA);
114    admin2.createTable(tabA);
115
116    builder = TableDescriptorBuilder.newBuilder(tabBName);
117    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f1Name)
118      .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build());
119    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f2Name)
120      .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build());
121    TableDescriptor tabB = builder.build();
122    admin1.createTable(tabB);
123    admin2.createTable(tabB);
124  }
125
126  @AfterAll
127  public static void tearDownAfterClass() throws Exception {
128    admin1.disableTable(tabAName);
129    admin1.deleteTable(tabAName);
130    admin1.disableTable(tabBName);
131    admin1.deleteTable(tabBName);
132    admin2.disableTable(tabAName);
133    admin2.deleteTable(tabAName);
134    admin2.disableTable(tabBName);
135    admin2.deleteTable(tabBName);
136
137    admin1.deleteNamespace(ns1);
138    admin1.deleteNamespace(ns2);
139    admin2.deleteNamespace(ns1);
140    admin2.deleteNamespace(ns2);
141
142    connection1.close();
143    connection2.close();
144  }
145
146  @TestTemplate
147  public void testNamespaceReplication() throws Exception {
148    String peerId = "2";
149
150    Table htab1A = connection1.getTable(tabAName);
151    Table htab2A = connection2.getTable(tabAName);
152
153    Table htab1B = connection1.getTable(tabBName);
154    Table htab2B = connection2.getTable(tabBName);
155
156    ReplicationPeerConfig rpc = admin1.getReplicationPeerConfig(peerId);
157    admin1.updateReplicationPeerConfig(peerId,
158      ReplicationPeerConfig.newBuilder(rpc).setReplicateAllUserTables(false).build());
159
160    // add ns1 to peer config which replicate to cluster2
161    rpc = admin1.getReplicationPeerConfig(peerId);
162    Set<String> namespaces = new HashSet<>();
163    namespaces.add(ns1);
164    admin1.updateReplicationPeerConfig(peerId,
165      ReplicationPeerConfig.newBuilder(rpc).setNamespaces(namespaces).build());
166    LOG.info("update peer config");
167
168    // Table A can be replicated to cluster2
169    put(htab1A, row, f1Name, f2Name);
170    ensureRowExisted(htab2A, row, f1Name, f2Name);
171    delete(htab1A, row, f1Name, f2Name);
172    ensureRowNotExisted(htab2A, row, f1Name, f2Name);
173
174    // Table B can not be replicated to cluster2
175    put(htab1B, row, f1Name, f2Name);
176    ensureRowNotExisted(htab2B, row, f1Name, f2Name);
177
178    // add ns1:TA => 'f1' and ns2 to peer config which replicate to cluster2
179    rpc = admin1.getReplicationPeerConfig(peerId);
180    namespaces = new HashSet<>();
181    namespaces.add(ns2);
182    Map<TableName, List<String>> tableCfs = new HashMap<>();
183    tableCfs.put(tabAName, new ArrayList<>());
184    tableCfs.get(tabAName).add("f1");
185    admin1.updateReplicationPeerConfig(peerId, ReplicationPeerConfig.newBuilder(rpc)
186      .setNamespaces(namespaces).setTableCFsMap(tableCfs).build());
187    LOG.info("update peer config");
188
189    // Only family f1 of Table A can replicated to cluster2
190    put(htab1A, row, f1Name, f2Name);
191    ensureRowExisted(htab2A, row, f1Name);
192    delete(htab1A, row, f1Name, f2Name);
193    ensureRowNotExisted(htab2A, row, f1Name);
194
195    // All cfs of table B can replicated to cluster2
196    put(htab1B, row, f1Name, f2Name);
197    ensureRowExisted(htab2B, row, f1Name, f2Name);
198    delete(htab1B, row, f1Name, f2Name);
199    ensureRowNotExisted(htab2B, row, f1Name, f2Name);
200
201    admin1.removeReplicationPeer(peerId);
202  }
203
204  private void put(Table source, byte[] row, byte[]... families) throws Exception {
205    for (byte[] fam : families) {
206      Put put = new Put(row);
207      put.addColumn(fam, row, val);
208      source.put(put);
209    }
210  }
211
212  private void delete(Table source, byte[] row, byte[]... families) throws Exception {
213    for (byte[] fam : families) {
214      Delete del = new Delete(row);
215      del.addFamily(fam);
216      source.delete(del);
217    }
218  }
219
220  private void ensureRowExisted(Table target, byte[] row, byte[]... families) throws Exception {
221    for (byte[] fam : families) {
222      Get get = new Get(row);
223      get.addFamily(fam);
224      for (int i = 0; i < NB_RETRIES; i++) {
225        if (i == NB_RETRIES - 1) {
226          fail("Waited too much time for put replication");
227        }
228        Result res = target.get(get);
229        if (res.isEmpty()) {
230          LOG.info("Row not available");
231        } else {
232          assertEquals(1, res.size());
233          assertArrayEquals(val, res.value());
234          break;
235        }
236        Thread.sleep(10 * SLEEP_TIME);
237      }
238    }
239  }
240
241  private void ensureRowNotExisted(Table target, byte[] row, byte[]... families) throws Exception {
242    for (byte[] fam : families) {
243      Get get = new Get(row);
244      get.addFamily(fam);
245      for (int i = 0; i < NB_RETRIES; i++) {
246        if (i == NB_RETRIES - 1) {
247          fail("Waited too much time for delete replication");
248        }
249        Result res = target.get(get);
250        if (res.size() >= 1) {
251          LOG.info("Row not deleted");
252        } else {
253          break;
254        }
255        Thread.sleep(10 * SLEEP_TIME);
256      }
257    }
258  }
259}