/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;

/**
 * Tests namespace-scoped replication: verifies that a peer configured with
 * {@code replicateAllUserTables=false} plus a namespace set (and optionally a
 * table-CFs map) replicates exactly the namespaces/column families selected.
 * Parameterized to run against both serial and non-serial peers.
 */
@RunWith(Parameterized.class)
@Category({ LargeTests.class })
public class TestNamespaceReplication extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestNamespaceReplication.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestNamespaceReplication.class);

  private static String ns1 = "ns1";
  private static String ns2 = "ns2";

  // One table per namespace so namespace-level and table-level selection can
  // be exercised independently.
  private static final TableName tabAName = TableName.valueOf("ns1:TA");
  private static final TableName tabBName = TableName.valueOf("ns2:TB");

  private static final byte[] f1Name = Bytes.toBytes("f1");
  private static final byte[] f2Name = Bytes.toBytes("f2");

  private static final byte[] val = Bytes.toBytes("myval");

  private static Connection connection1;
  private static Connection connection2;
  private static Admin admin1;
  private static Admin admin2;

  @Parameter
  public boolean serialPeer;

  @Override
  protected boolean isSerialPeer() {
    return serialPeer;
  }

  @Parameters(name = "{index}: serialPeer={0}")
  public static List<Boolean> parameters() {
    return ImmutableList.of(true, false);
  }

  /**
   * Creates the two namespaces and one two-family table (both families with
   * global replication scope) in each namespace, on both clusters.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TestReplicationBase.setUpBeforeClass();

    connection1 = ConnectionFactory.createConnection(CONF1);
    connection2 = ConnectionFactory.createConnection(CONF2);
    admin1 = connection1.getAdmin();
    admin2 = connection2.getAdmin();

    admin1.createNamespace(NamespaceDescriptor.create(ns1).build());
    admin1.createNamespace(NamespaceDescriptor.create(ns2).build());
    admin2.createNamespace(NamespaceDescriptor.create(ns1).build());
    admin2.createNamespace(NamespaceDescriptor.create(ns2).build());

    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tabAName);
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder
        .newBuilder(f1Name).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build());
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder
        .newBuilder(f2Name).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build());
    TableDescriptor tabA = builder.build();
    admin1.createTable(tabA);
    admin2.createTable(tabA);

    builder = TableDescriptorBuilder.newBuilder(tabBName);
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder
        .newBuilder(f1Name).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build());
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder
        .newBuilder(f2Name).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build());
    TableDescriptor tabB = builder.build();
    admin1.createTable(tabB);
    admin2.createTable(tabB);
  }

  /**
   * Drops the tables and namespaces created in {@link #setUpBeforeClass()},
   * then releases the admins and connections (admins first — they are
   * Closeable resources obtained from the connections).
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    admin1.disableTable(tabAName);
    admin1.deleteTable(tabAName);
    admin1.disableTable(tabBName);
    admin1.deleteTable(tabBName);
    admin2.disableTable(tabAName);
    admin2.deleteTable(tabAName);
    admin2.disableTable(tabBName);
    admin2.deleteTable(tabBName);

    admin1.deleteNamespace(ns1);
    admin1.deleteNamespace(ns2);
    admin2.deleteNamespace(ns1);
    admin2.deleteNamespace(ns2);

    // Close the Admin instances before their owning connections; previously
    // they were leaked.
    admin1.close();
    admin2.close();

    connection1.close();
    connection2.close();
    TestReplicationBase.tearDownAfterClass();
  }

  @Test
  public void testNamespaceReplication() throws Exception {
    String peerId = "2";

    // Table handles are lightweight but Closeable; use try-with-resources so
    // they are released even when an assertion fails mid-test.
    try (Table htab1A = connection1.getTable(tabAName);
        Table htab2A = connection2.getTable(tabAName);
        Table htab1B = connection1.getTable(tabBName);
        Table htab2B = connection2.getTable(tabBName)) {

      ReplicationPeerConfig rpc = admin1.getReplicationPeerConfig(peerId);
      admin1.updateReplicationPeerConfig(peerId,
        ReplicationPeerConfig.newBuilder(rpc).setReplicateAllUserTables(false).build());

      // add ns1 to peer config which replicate to cluster2
      rpc = admin1.getReplicationPeerConfig(peerId);
      Set<String> namespaces = new HashSet<>();
      namespaces.add(ns1);
      admin1.updateReplicationPeerConfig(peerId,
        ReplicationPeerConfig.newBuilder(rpc).setNamespaces(namespaces).build());
      LOG.info("update peer config");

      // Table A can be replicated to cluster2
      put(htab1A, row, f1Name, f2Name);
      ensureRowExisted(htab2A, row, f1Name, f2Name);
      delete(htab1A, row, f1Name, f2Name);
      ensureRowNotExisted(htab2A, row, f1Name, f2Name);

      // Table B can not be replicated to cluster2
      put(htab1B, row, f1Name, f2Name);
      ensureRowNotExisted(htab2B, row, f1Name, f2Name);

      // add ns1:TA => 'f1' and ns2 to peer config which replicate to cluster2
      rpc = admin1.getReplicationPeerConfig(peerId);
      namespaces = new HashSet<>();
      namespaces.add(ns2);
      Map<TableName, List<String>> tableCfs = new HashMap<>();
      tableCfs.put(tabAName, new ArrayList<>());
      tableCfs.get(tabAName).add("f1");
      admin1.updateReplicationPeerConfig(peerId, ReplicationPeerConfig.newBuilder(rpc)
        .setNamespaces(namespaces).setTableCFsMap(tableCfs).build());
      LOG.info("update peer config");

      // Only family f1 of Table A can replicated to cluster2
      put(htab1A, row, f1Name, f2Name);
      ensureRowExisted(htab2A, row, f1Name);
      delete(htab1A, row, f1Name, f2Name);
      ensureRowNotExisted(htab2A, row, f1Name);

      // All cfs of table B can replicated to cluster2
      put(htab1B, row, f1Name, f2Name);
      ensureRowExisted(htab2B, row, f1Name, f2Name);
      delete(htab1B, row, f1Name, f2Name);
      ensureRowNotExisted(htab2B, row, f1Name, f2Name);

      admin1.removeReplicationPeer(peerId);
    }
  }

  /** Writes one cell (qualifier = row, value = {@code val}) per family. */
  private void put(Table source, byte[] row, byte[]... families)
      throws Exception {
    for (byte[] fam : families) {
      Put put = new Put(row);
      put.addColumn(fam, row, val);
      source.put(put);
    }
  }

  /** Deletes the given families of the row, one Delete per family. */
  private void delete(Table source, byte[] row, byte[]... families)
      throws Exception {
    for (byte[] fam : families) {
      Delete del = new Delete(row);
      del.addFamily(fam);
      source.delete(del);
    }
  }

  /**
   * Polls the target table until each family of the row has replicated,
   * failing after {@code NB_RETRIES} attempts.
   */
  private void ensureRowExisted(Table target, byte[] row, byte[]... families)
      throws Exception {
    for (byte[] fam : families) {
      Get get = new Get(row);
      get.addFamily(fam);
      for (int i = 0; i < NB_RETRIES; i++) {
        if (i == NB_RETRIES - 1) {
          fail("Waited too much time for put replication");
        }
        Result res = target.get(get);
        if (res.isEmpty()) {
          LOG.info("Row not available");
        } else {
          assertEquals(1, res.size());
          assertArrayEquals(val, res.value());
          break;
        }
        Thread.sleep(10 * SLEEP_TIME);
      }
    }
  }

  /**
   * Polls the target table until each family of the row is absent (delete
   * replicated, or never replicated), failing after {@code NB_RETRIES}
   * attempts.
   */
  private void ensureRowNotExisted(Table target, byte[] row, byte[]... families)
      throws Exception {
    for (byte[] fam : families) {
      Get get = new Get(row);
      get.addFamily(fam);
      for (int i = 0; i < NB_RETRIES; i++) {
        if (i == NB_RETRIES - 1) {
          fail("Waited too much time for delete replication");
        }
        Result res = target.get(get);
        if (res.size() >= 1) {
          LOG.info("Row not deleted");
        } else {
          break;
        }
        Thread.sleep(10 * SLEEP_TIME);
      }
    }
  }
}