001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.jupiter.api.Assertions.assertArrayEquals; 021import static org.junit.jupiter.api.Assertions.assertEquals; 022import static org.junit.jupiter.api.Assertions.fail; 023 024import java.util.ArrayList; 025import java.util.HashMap; 026import java.util.HashSet; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030import java.util.stream.Stream; 031import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.NamespaceDescriptor; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.client.Admin; 036import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 037import org.apache.hadoop.hbase.client.Connection; 038import org.apache.hadoop.hbase.client.ConnectionFactory; 039import org.apache.hadoop.hbase.client.Delete; 040import org.apache.hadoop.hbase.client.Get; 041import org.apache.hadoop.hbase.client.Put; 042import org.apache.hadoop.hbase.client.Result; 043import org.apache.hadoop.hbase.client.Table; 044import org.apache.hadoop.hbase.client.TableDescriptor; 045import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 046import org.apache.hadoop.hbase.testclassification.LargeTests; 047import org.apache.hadoop.hbase.testclassification.ReplicationTests; 048import org.apache.hadoop.hbase.util.Bytes; 049import org.junit.jupiter.api.AfterAll; 050import org.junit.jupiter.api.BeforeAll; 051import org.junit.jupiter.api.Tag; 052import org.junit.jupiter.api.TestTemplate; 053import org.junit.jupiter.params.provider.Arguments; 054import org.slf4j.Logger; 055import org.slf4j.LoggerFactory; 056 057@Tag(ReplicationTests.TAG) 058@Tag(LargeTests.TAG) 059@HBaseParameterizedTestTemplate(name = "{index}: serialPeer={0}") 060public class TestNamespaceReplication extends TestReplicationBase { 061 062 private static final Logger LOG = LoggerFactory.getLogger(TestNamespaceReplication.class); 063 064 private static String ns1 = "ns1"; 065 private static String ns2 = "ns2"; 066 067 private static final TableName tabAName = TableName.valueOf("ns1:TA"); 068 private static final TableName tabBName = TableName.valueOf("ns2:TB"); 069 070 private static final byte[] f1Name = Bytes.toBytes("f1"); 071 private static final byte[] f2Name = Bytes.toBytes("f2"); 072 073 private static final byte[] val = Bytes.toBytes("myval"); 074 075 private static Connection connection1; 076 private static Connection connection2; 077 private static Admin admin1; 078 private static Admin admin2; 079 080 private boolean serialPeer; 081 082 public TestNamespaceReplication(boolean serialPeer) { 083 this.serialPeer = serialPeer; 084 } 085 086 @Override 087 protected boolean isSerialPeer() { 088 return serialPeer; 089 } 090 091 public static Stream<Arguments> parameters() { 092 return Stream.of(Arguments.of(true), Arguments.of(false)); 093 } 094 095 @BeforeAll 096 public static void setUpBeforeClass() throws Exception { 097 connection1 = ConnectionFactory.createConnection(CONF1); 098 connection2 = ConnectionFactory.createConnection(CONF2); 099 admin1 = connection1.getAdmin(); 100 admin2 = connection2.getAdmin(); 101 102 admin1.createNamespace(NamespaceDescriptor.create(ns1).build()); 103 admin1.createNamespace(NamespaceDescriptor.create(ns2).build()); 104 admin2.createNamespace(NamespaceDescriptor.create(ns1).build()); 105 admin2.createNamespace(NamespaceDescriptor.create(ns2).build()); 106 107 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tabAName); 108 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f1Name) 109 .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()); 110 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f2Name) 111 .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()); 112 TableDescriptor tabA = builder.build(); 113 admin1.createTable(tabA); 114 admin2.createTable(tabA); 115 116 builder = TableDescriptorBuilder.newBuilder(tabBName); 117 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f1Name) 118 .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()); 119 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f2Name) 120 .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()); 121 TableDescriptor tabB = builder.build(); 122 admin1.createTable(tabB); 123 admin2.createTable(tabB); 124 } 125 126 @AfterAll 127 public static void tearDownAfterClass() throws Exception { 128 admin1.disableTable(tabAName); 129 admin1.deleteTable(tabAName); 130 admin1.disableTable(tabBName); 131 admin1.deleteTable(tabBName); 132 admin2.disableTable(tabAName); 133 admin2.deleteTable(tabAName); 134 admin2.disableTable(tabBName); 135 admin2.deleteTable(tabBName); 136 137 admin1.deleteNamespace(ns1); 138 admin1.deleteNamespace(ns2); 139 admin2.deleteNamespace(ns1); 140 admin2.deleteNamespace(ns2); 141 142 connection1.close(); 143 connection2.close(); 144 } 145 146 @TestTemplate 147 public void testNamespaceReplication() throws Exception { 148 String peerId = "2"; 149 150 Table htab1A = connection1.getTable(tabAName); 151 Table htab2A = connection2.getTable(tabAName); 152 153 Table htab1B = connection1.getTable(tabBName); 154 Table htab2B = connection2.getTable(tabBName); 155 156 ReplicationPeerConfig rpc = admin1.getReplicationPeerConfig(peerId); 157 admin1.updateReplicationPeerConfig(peerId, 158 ReplicationPeerConfig.newBuilder(rpc).setReplicateAllUserTables(false).build()); 159 160 // add ns1 to peer config which replicate to cluster2 161 rpc = admin1.getReplicationPeerConfig(peerId); 162 Set<String> namespaces = new HashSet<>(); 163 namespaces.add(ns1); 164 admin1.updateReplicationPeerConfig(peerId, 165 ReplicationPeerConfig.newBuilder(rpc).setNamespaces(namespaces).build()); 166 LOG.info("update peer config"); 167 168 // Table A can be replicated to cluster2 169 put(htab1A, row, f1Name, f2Name); 170 ensureRowExisted(htab2A, row, f1Name, f2Name); 171 delete(htab1A, row, f1Name, f2Name); 172 ensureRowNotExisted(htab2A, row, f1Name, f2Name); 173 174 // Table B can not be replicated to cluster2 175 put(htab1B, row, f1Name, f2Name); 176 ensureRowNotExisted(htab2B, row, f1Name, f2Name); 177 178 // add ns1:TA => 'f1' and ns2 to peer config which replicate to cluster2 179 rpc = admin1.getReplicationPeerConfig(peerId); 180 namespaces = new HashSet<>(); 181 namespaces.add(ns2); 182 Map<TableName, List<String>> tableCfs = new HashMap<>(); 183 tableCfs.put(tabAName, new ArrayList<>()); 184 tableCfs.get(tabAName).add("f1"); 185 admin1.updateReplicationPeerConfig(peerId, ReplicationPeerConfig.newBuilder(rpc) 186 .setNamespaces(namespaces).setTableCFsMap(tableCfs).build()); 187 LOG.info("update peer config"); 188 189 // Only family f1 of Table A can replicated to cluster2 190 put(htab1A, row, f1Name, f2Name); 191 ensureRowExisted(htab2A, row, f1Name); 192 delete(htab1A, row, f1Name, f2Name); 193 ensureRowNotExisted(htab2A, row, f1Name); 194 195 // All cfs of table B can replicated to cluster2 196 put(htab1B, row, f1Name, f2Name); 197 ensureRowExisted(htab2B, row, f1Name, f2Name); 198 delete(htab1B, row, f1Name, f2Name); 199 ensureRowNotExisted(htab2B, row, f1Name, f2Name); 200 201 admin1.removeReplicationPeer(peerId); 202 } 203 204 private void put(Table source, byte[] row, byte[]... families) throws Exception { 205 for (byte[] fam : families) { 206 Put put = new Put(row); 207 put.addColumn(fam, row, val); 208 source.put(put); 209 } 210 } 211 212 private void delete(Table source, byte[] row, byte[]... families) throws Exception { 213 for (byte[] fam : families) { 214 Delete del = new Delete(row); 215 del.addFamily(fam); 216 source.delete(del); 217 } 218 } 219 220 private void ensureRowExisted(Table target, byte[] row, byte[]... families) throws Exception { 221 for (byte[] fam : families) { 222 Get get = new Get(row); 223 get.addFamily(fam); 224 for (int i = 0; i < NB_RETRIES; i++) { 225 if (i == NB_RETRIES - 1) { 226 fail("Waited too much time for put replication"); 227 } 228 Result res = target.get(get); 229 if (res.isEmpty()) { 230 LOG.info("Row not available"); 231 } else { 232 assertEquals(1, res.size()); 233 assertArrayEquals(val, res.value()); 234 break; 235 } 236 Thread.sleep(10 * SLEEP_TIME); 237 } 238 } 239 } 240 241 private void ensureRowNotExisted(Table target, byte[] row, byte[]... families) throws Exception { 242 for (byte[] fam : families) { 243 Get get = new Get(row); 244 get.addFamily(fam); 245 for (int i = 0; i < NB_RETRIES; i++) { 246 if (i == NB_RETRIES - 1) { 247 fail("Waited too much time for delete replication"); 248 } 249 Result res = target.get(get); 250 if (res.size() >= 1) { 251 LOG.info("Row not deleted"); 252 } else { 253 break; 254 } 255 Thread.sleep(10 * SLEEP_TIME); 256 } 257 } 258 } 259}