001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.Assert.assertArrayEquals; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.fail; 023 024import java.util.ArrayList; 025import java.util.HashMap; 026import java.util.HashSet; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.NamespaceDescriptor; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.client.Admin; 035import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 036import org.apache.hadoop.hbase.client.Connection; 037import org.apache.hadoop.hbase.client.ConnectionFactory; 038import org.apache.hadoop.hbase.client.Delete; 039import org.apache.hadoop.hbase.client.Get; 040import org.apache.hadoop.hbase.client.Put; 041import org.apache.hadoop.hbase.client.Result; 042import org.apache.hadoop.hbase.client.Table; 043import org.apache.hadoop.hbase.client.TableDescriptor; 044import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 045import org.apache.hadoop.hbase.testclassification.LargeTests; 046import org.apache.hadoop.hbase.util.Bytes; 047import org.junit.AfterClass; 048import org.junit.BeforeClass; 049import org.junit.ClassRule; 050import org.junit.Test; 051import org.junit.experimental.categories.Category; 052import org.junit.runner.RunWith; 053import org.junit.runners.Parameterized; 054import org.junit.runners.Parameterized.Parameter; 055import org.junit.runners.Parameterized.Parameters; 056import org.slf4j.Logger; 057import org.slf4j.LoggerFactory; 058 059import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 060 061@RunWith(Parameterized.class) 062@Category({ LargeTests.class }) 063public class TestNamespaceReplication extends TestReplicationBase { 064 065 @ClassRule 066 public static final HBaseClassTestRule CLASS_RULE = 067 HBaseClassTestRule.forClass(TestNamespaceReplication.class); 068 069 private static final Logger LOG = LoggerFactory.getLogger(TestNamespaceReplication.class); 070 071 private static String ns1 = "ns1"; 072 private static String ns2 = "ns2"; 073 074 private static final TableName tabAName = TableName.valueOf("ns1:TA"); 075 private static final TableName tabBName = TableName.valueOf("ns2:TB"); 076 077 private static final byte[] f1Name = Bytes.toBytes("f1"); 078 private static final byte[] f2Name = Bytes.toBytes("f2"); 079 080 private static final byte[] val = Bytes.toBytes("myval"); 081 082 private static Connection connection1; 083 private static Connection connection2; 084 private static Admin admin1; 085 private static Admin admin2; 086 087 @Parameter 088 public boolean serialPeer; 089 090 @Override 091 protected boolean isSerialPeer() { 092 return serialPeer; 093 } 094 095 @Parameters(name = "{index}: serialPeer={0}") 096 public static List<Boolean> parameters() { 097 return ImmutableList.of(true, false); 098 } 099 100 @BeforeClass 101 public static void setUpBeforeClass() throws Exception { 102 TestReplicationBase.setUpBeforeClass(); 103 104 connection1 = ConnectionFactory.createConnection(CONF1); 105 connection2 = ConnectionFactory.createConnection(CONF2); 106 admin1 = connection1.getAdmin(); 107 admin2 = connection2.getAdmin(); 108 109 admin1.createNamespace(NamespaceDescriptor.create(ns1).build()); 110 admin1.createNamespace(NamespaceDescriptor.create(ns2).build()); 111 admin2.createNamespace(NamespaceDescriptor.create(ns1).build()); 112 admin2.createNamespace(NamespaceDescriptor.create(ns2).build()); 113 114 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tabAName); 115 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f1Name) 116 .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()); 117 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f2Name) 118 .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()); 119 TableDescriptor tabA = builder.build(); 120 admin1.createTable(tabA); 121 admin2.createTable(tabA); 122 123 builder = TableDescriptorBuilder.newBuilder(tabBName); 124 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f1Name) 125 .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()); 126 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f2Name) 127 .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()); 128 TableDescriptor tabB = builder.build(); 129 admin1.createTable(tabB); 130 admin2.createTable(tabB); 131 } 132 133 @AfterClass 134 public static void tearDownAfterClass() throws Exception { 135 admin1.disableTable(tabAName); 136 admin1.deleteTable(tabAName); 137 admin1.disableTable(tabBName); 138 admin1.deleteTable(tabBName); 139 admin2.disableTable(tabAName); 140 admin2.deleteTable(tabAName); 141 admin2.disableTable(tabBName); 142 admin2.deleteTable(tabBName); 143 144 admin1.deleteNamespace(ns1); 145 admin1.deleteNamespace(ns2); 146 admin2.deleteNamespace(ns1); 147 admin2.deleteNamespace(ns2); 148 149 connection1.close(); 150 connection2.close(); 151 TestReplicationBase.tearDownAfterClass(); 152 } 153 154 @Test 155 public void testNamespaceReplication() throws Exception { 156 String peerId = "2"; 157 158 Table htab1A = connection1.getTable(tabAName); 159 Table htab2A = connection2.getTable(tabAName); 160 161 Table htab1B = connection1.getTable(tabBName); 162 Table htab2B = connection2.getTable(tabBName); 163 164 ReplicationPeerConfig rpc = admin1.getReplicationPeerConfig(peerId); 165 admin1.updateReplicationPeerConfig(peerId, 166 ReplicationPeerConfig.newBuilder(rpc).setReplicateAllUserTables(false).build()); 167 168 // add ns1 to peer config which replicate to cluster2 169 rpc = admin1.getReplicationPeerConfig(peerId); 170 Set<String> namespaces = new HashSet<>(); 171 namespaces.add(ns1); 172 admin1.updateReplicationPeerConfig(peerId, 173 ReplicationPeerConfig.newBuilder(rpc).setNamespaces(namespaces).build()); 174 LOG.info("update peer config"); 175 176 // Table A can be replicated to cluster2 177 put(htab1A, row, f1Name, f2Name); 178 ensureRowExisted(htab2A, row, f1Name, f2Name); 179 delete(htab1A, row, f1Name, f2Name); 180 ensureRowNotExisted(htab2A, row, f1Name, f2Name); 181 182 // Table B can not be replicated to cluster2 183 put(htab1B, row, f1Name, f2Name); 184 ensureRowNotExisted(htab2B, row, f1Name, f2Name); 185 186 // add ns1:TA => 'f1' and ns2 to peer config which replicate to cluster2 187 rpc = admin1.getReplicationPeerConfig(peerId); 188 namespaces = new HashSet<>(); 189 namespaces.add(ns2); 190 Map<TableName, List<String>> tableCfs = new HashMap<>(); 191 tableCfs.put(tabAName, new ArrayList<>()); 192 tableCfs.get(tabAName).add("f1"); 193 admin1.updateReplicationPeerConfig(peerId, ReplicationPeerConfig.newBuilder(rpc) 194 .setNamespaces(namespaces).setTableCFsMap(tableCfs).build()); 195 LOG.info("update peer config"); 196 197 // Only family f1 of Table A can replicated to cluster2 198 put(htab1A, row, f1Name, f2Name); 199 ensureRowExisted(htab2A, row, f1Name); 200 delete(htab1A, row, f1Name, f2Name); 201 ensureRowNotExisted(htab2A, row, f1Name); 202 203 // All cfs of table B can replicated to cluster2 204 put(htab1B, row, f1Name, f2Name); 205 ensureRowExisted(htab2B, row, f1Name, f2Name); 206 delete(htab1B, row, f1Name, f2Name); 207 ensureRowNotExisted(htab2B, row, f1Name, f2Name); 208 209 admin1.removeReplicationPeer(peerId); 210 } 211 212 private void put(Table source, byte[] row, byte[]... families) throws Exception { 213 for (byte[] fam : families) { 214 Put put = new Put(row); 215 put.addColumn(fam, row, val); 216 source.put(put); 217 } 218 } 219 220 private void delete(Table source, byte[] row, byte[]... families) throws Exception { 221 for (byte[] fam : families) { 222 Delete del = new Delete(row); 223 del.addFamily(fam); 224 source.delete(del); 225 } 226 } 227 228 private void ensureRowExisted(Table target, byte[] row, byte[]... families) throws Exception { 229 for (byte[] fam : families) { 230 Get get = new Get(row); 231 get.addFamily(fam); 232 for (int i = 0; i < NB_RETRIES; i++) { 233 if (i == NB_RETRIES - 1) { 234 fail("Waited too much time for put replication"); 235 } 236 Result res = target.get(get); 237 if (res.isEmpty()) { 238 LOG.info("Row not available"); 239 } else { 240 assertEquals(1, res.size()); 241 assertArrayEquals(val, res.value()); 242 break; 243 } 244 Thread.sleep(10 * SLEEP_TIME); 245 } 246 } 247 } 248 249 private void ensureRowNotExisted(Table target, byte[] row, byte[]... families) throws Exception { 250 for (byte[] fam : families) { 251 Get get = new Get(row); 252 get.addFamily(fam); 253 for (int i = 0; i < NB_RETRIES; i++) { 254 if (i == NB_RETRIES - 1) { 255 fail("Waited too much time for delete replication"); 256 } 257 Result res = target.get(get); 258 if (res.size() >= 1) { 259 LOG.info("Row not deleted"); 260 } else { 261 break; 262 } 263 Thread.sleep(10 * SLEEP_TIME); 264 } 265 } 266 } 267}