001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.Assert.assertArrayEquals; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.fail; 023 024import java.util.ArrayList; 025import java.util.HashMap; 026import java.util.HashSet; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HColumnDescriptor; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.HTableDescriptor; 034import org.apache.hadoop.hbase.NamespaceDescriptor; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.client.Admin; 037import org.apache.hadoop.hbase.client.Connection; 038import org.apache.hadoop.hbase.client.ConnectionFactory; 039import org.apache.hadoop.hbase.client.Delete; 040import org.apache.hadoop.hbase.client.Get; 041import org.apache.hadoop.hbase.client.Put; 042import org.apache.hadoop.hbase.client.Result; 043import org.apache.hadoop.hbase.client.Table; 044import org.apache.hadoop.hbase.testclassification.MediumTests; 045import org.apache.hadoop.hbase.util.Bytes; 046import org.junit.AfterClass; 047import org.junit.BeforeClass; 048import org.junit.ClassRule; 049import org.junit.Test; 050import org.junit.experimental.categories.Category; 051import org.slf4j.Logger; 052import org.slf4j.LoggerFactory; 053 054@Category({MediumTests.class}) 055public class TestNamespaceReplication extends TestReplicationBase { 056 057 @ClassRule 058 public static final HBaseClassTestRule CLASS_RULE = 059 HBaseClassTestRule.forClass(TestNamespaceReplication.class); 060 061 private static final Logger LOG = LoggerFactory.getLogger(TestNamespaceReplication.class); 062 063 private static String ns1 = "ns1"; 064 private static String ns2 = "ns2"; 065 066 private static final TableName tabAName = TableName.valueOf("ns1:TA"); 067 private static final TableName tabBName = TableName.valueOf("ns2:TB"); 068 069 private static final byte[] f1Name = Bytes.toBytes("f1"); 070 private static final byte[] f2Name = Bytes.toBytes("f2"); 071 072 private static final byte[] val = Bytes.toBytes("myval"); 073 074 private static HTableDescriptor tabA; 075 private static HTableDescriptor tabB; 076 077 private static Connection connection1; 078 private static Connection connection2; 079 private static Admin admin1; 080 private static Admin admin2; 081 082 @BeforeClass 083 public static void setUpBeforeClass() throws Exception { 084 TestReplicationBase.setUpBeforeClass(); 085 086 connection1 = ConnectionFactory.createConnection(conf1); 087 connection2 = ConnectionFactory.createConnection(conf2); 088 admin1 = connection1.getAdmin(); 089 admin2 = connection2.getAdmin(); 090 091 admin1.createNamespace(NamespaceDescriptor.create(ns1).build()); 092 admin1.createNamespace(NamespaceDescriptor.create(ns2).build()); 093 admin2.createNamespace(NamespaceDescriptor.create(ns1).build()); 094 admin2.createNamespace(NamespaceDescriptor.create(ns2).build()); 095 096 tabA = new HTableDescriptor(tabAName); 097 HColumnDescriptor fam = new HColumnDescriptor(f1Name); 098 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); 099 tabA.addFamily(fam); 100 fam = new HColumnDescriptor(f2Name); 101 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); 102 tabA.addFamily(fam); 103 admin1.createTable(tabA); 104 admin2.createTable(tabA); 105 106 tabB = new HTableDescriptor(tabBName); 107 fam = new HColumnDescriptor(f1Name); 108 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); 109 tabB.addFamily(fam); 110 fam = new HColumnDescriptor(f2Name); 111 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); 112 tabB.addFamily(fam); 113 admin1.createTable(tabB); 114 admin2.createTable(tabB); 115 } 116 117 @AfterClass 118 public static void tearDownAfterClass() throws Exception { 119 admin1.disableTable(tabAName); 120 admin1.deleteTable(tabAName); 121 admin1.disableTable(tabBName); 122 admin1.deleteTable(tabBName); 123 admin2.disableTable(tabAName); 124 admin2.deleteTable(tabAName); 125 admin2.disableTable(tabBName); 126 admin2.deleteTable(tabBName); 127 128 admin1.deleteNamespace(ns1); 129 admin1.deleteNamespace(ns2); 130 admin2.deleteNamespace(ns1); 131 admin2.deleteNamespace(ns2); 132 133 connection1.close(); 134 connection2.close(); 135 TestReplicationBase.tearDownAfterClass(); 136 } 137 138 @Test 139 public void testNamespaceReplication() throws Exception { 140 Table htab1A = connection1.getTable(tabAName); 141 Table htab2A = connection2.getTable(tabAName); 142 143 Table htab1B = connection1.getTable(tabBName); 144 Table htab2B = connection2.getTable(tabBName); 145 146 ReplicationPeerConfig rpc = admin.getPeerConfig("2"); 147 rpc.setReplicateAllUserTables(false); 148 admin.updatePeerConfig("2", rpc); 149 150 // add ns1 to peer config which replicate to cluster2 151 rpc = admin.getPeerConfig("2"); 152 Set<String> namespaces = new HashSet<>(); 153 namespaces.add(ns1); 154 rpc.setNamespaces(namespaces); 155 admin.updatePeerConfig("2", rpc); 156 LOG.info("update peer config"); 157 158 // Table A can be replicated to cluster2 159 put(htab1A, row, f1Name, f2Name); 160 ensureRowExisted(htab2A, row, f1Name, f2Name); 161 delete(htab1A, row, f1Name, f2Name); 162 ensureRowNotExisted(htab2A, row, f1Name, f2Name); 163 164 // Table B can not be replicated to cluster2 165 put(htab1B, row, f1Name, f2Name); 166 ensureRowNotExisted(htab2B, row, f1Name, f2Name); 167 168 // add ns1:TA => 'f1' and ns2 to peer config which replicate to cluster2 169 rpc = admin.getPeerConfig("2"); 170 namespaces = new HashSet<>(); 171 namespaces.add(ns2); 172 rpc.setNamespaces(namespaces); 173 Map<TableName, List<String>> tableCfs = new HashMap<>(); 174 tableCfs.put(tabAName, new ArrayList<>()); 175 tableCfs.get(tabAName).add("f1"); 176 rpc.setTableCFsMap(tableCfs); 177 admin.updatePeerConfig("2", rpc); 178 LOG.info("update peer config"); 179 180 // Only family f1 of Table A can replicated to cluster2 181 put(htab1A, row, f1Name, f2Name); 182 ensureRowExisted(htab2A, row, f1Name); 183 delete(htab1A, row, f1Name, f2Name); 184 ensureRowNotExisted(htab2A, row, f1Name); 185 186 // All cfs of table B can replicated to cluster2 187 put(htab1B, row, f1Name, f2Name); 188 ensureRowExisted(htab2B, row, f1Name, f2Name); 189 delete(htab1B, row, f1Name, f2Name); 190 ensureRowNotExisted(htab2B, row, f1Name, f2Name); 191 192 admin.removePeer("2"); 193 } 194 195 private void put(Table source, byte[] row, byte[]... families) 196 throws Exception { 197 for (byte[] fam : families) { 198 Put put = new Put(row); 199 put.addColumn(fam, row, val); 200 source.put(put); 201 } 202 } 203 204 private void delete(Table source, byte[] row, byte[]... families) 205 throws Exception { 206 for (byte[] fam : families) { 207 Delete del = new Delete(row); 208 del.addFamily(fam); 209 source.delete(del); 210 } 211 } 212 213 private void ensureRowExisted(Table target, byte[] row, byte[]... families) 214 throws Exception { 215 for (byte[] fam : families) { 216 Get get = new Get(row); 217 get.addFamily(fam); 218 for (int i = 0; i < NB_RETRIES; i++) { 219 if (i == NB_RETRIES - 1) { 220 fail("Waited too much time for put replication"); 221 } 222 Result res = target.get(get); 223 if (res.isEmpty()) { 224 LOG.info("Row not available"); 225 } else { 226 assertEquals(1, res.size()); 227 assertArrayEquals(val, res.value()); 228 break; 229 } 230 Thread.sleep(SLEEP_TIME); 231 } 232 } 233 } 234 235 private void ensureRowNotExisted(Table target, byte[] row, byte[]... families) 236 throws Exception { 237 for (byte[] fam : families) { 238 Get get = new Get(row); 239 get.addFamily(fam); 240 for (int i = 0; i < NB_RETRIES; i++) { 241 if (i == NB_RETRIES - 1) { 242 fail("Waited too much time for delete replication"); 243 } 244 Result res = target.get(get); 245 if (res.size() >= 1) { 246 LOG.info("Row not deleted"); 247 } else { 248 break; 249 } 250 Thread.sleep(SLEEP_TIME); 251 } 252 } 253 } 254}