001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.Assert.*; 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.HashMap; 025import java.util.List; 026import java.util.Map; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.HBaseClassTestRule; 029import org.apache.hadoop.hbase.HBaseConfiguration; 030import org.apache.hadoop.hbase.HBaseTestingUtility; 031import org.apache.hadoop.hbase.HColumnDescriptor; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.HTableDescriptor; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.client.Admin; 036import org.apache.hadoop.hbase.client.Connection; 037import org.apache.hadoop.hbase.client.ConnectionFactory; 038import org.apache.hadoop.hbase.client.Delete; 039import org.apache.hadoop.hbase.client.Get; 040import org.apache.hadoop.hbase.client.Put; 041import org.apache.hadoop.hbase.client.Result; 042import org.apache.hadoop.hbase.client.Table; 043import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; 044import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; 045import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 046import org.apache.hadoop.hbase.testclassification.FlakeyTests; 047import org.apache.hadoop.hbase.testclassification.LargeTests; 048import org.apache.hadoop.hbase.util.Bytes; 049import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; 050import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 051import org.junit.AfterClass; 052import org.junit.BeforeClass; 053import org.junit.ClassRule; 054import org.junit.Rule; 055import org.junit.Test; 056import org.junit.experimental.categories.Category; 057import org.junit.rules.TestName; 058import org.slf4j.Logger; 059import org.slf4j.LoggerFactory; 060 061import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; 062 063@Category({ FlakeyTests.class, LargeTests.class }) 064public class TestPerTableCFReplication { 065 066 @ClassRule 067 public static final HBaseClassTestRule CLASS_RULE = 068 HBaseClassTestRule.forClass(TestPerTableCFReplication.class); 069 070 private static final Logger LOG = LoggerFactory.getLogger(TestPerTableCFReplication.class); 071 072 private static Configuration conf1; 073 private static Configuration conf2; 074 private static Configuration conf3; 075 076 private static HBaseTestingUtility utility1; 077 private static HBaseTestingUtility utility2; 078 private static HBaseTestingUtility utility3; 079 private static final long SLEEP_TIME = 500; 080 private static final int NB_RETRIES = 100; 081 082 private static final TableName tableName = TableName.valueOf("test"); 083 private static final TableName tabAName = TableName.valueOf("TA"); 084 private static final TableName tabBName = TableName.valueOf("TB"); 085 private static final TableName tabCName = TableName.valueOf("TC"); 086 private static final byte[] famName = Bytes.toBytes("f"); 087 private static final byte[] f1Name = Bytes.toBytes("f1"); 088 private static final byte[] f2Name = Bytes.toBytes("f2"); 089 private static final byte[] f3Name = Bytes.toBytes("f3"); 090 private static final byte[] row1 = Bytes.toBytes("row1"); 091 private static final byte[] row2 = Bytes.toBytes("row2"); 092 private static final byte[] noRepfamName = Bytes.toBytes("norep"); 093 private static final byte[] val = Bytes.toBytes("myval"); 094 095 private static HTableDescriptor table; 096 private static HTableDescriptor tabA; 097 private static HTableDescriptor tabB; 098 private static HTableDescriptor tabC; 099 100 @Rule 101 public TestName name = new TestName(); 102 103 @BeforeClass 104 public static void setUpBeforeClass() throws Exception { 105 conf1 = HBaseConfiguration.create(); 106 conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); 107 // smaller block size and capacity to trigger more operations 108 // and test them 109 conf1.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20); 110 conf1.setInt("replication.source.size.capacity", 1024); 111 conf1.setLong("replication.source.sleepforretries", 100); 112 conf1.setInt("hbase.regionserver.maxlogs", 10); 113 conf1.setLong("hbase.master.logcleaner.ttl", 10); 114 conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); 115 conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, 116 "org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter"); 117 118 utility1 = new HBaseTestingUtility(conf1); 119 utility1.startMiniZKCluster(); 120 MiniZooKeeperCluster miniZK = utility1.getZkCluster(); 121 new ZKWatcher(conf1, "cluster1", null, true); 122 123 conf2 = new Configuration(conf1); 124 conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); 125 126 conf3 = new Configuration(conf1); 127 conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3"); 128 129 utility2 = new HBaseTestingUtility(conf2); 130 utility2.setZkCluster(miniZK); 131 new ZKWatcher(conf2, "cluster3", null, true); 132 133 utility3 = new HBaseTestingUtility(conf3); 134 utility3.setZkCluster(miniZK); 135 new ZKWatcher(conf3, "cluster3", null, true); 136 137 table = new HTableDescriptor(tableName); 138 HColumnDescriptor fam = new HColumnDescriptor(famName); 139 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); 140 table.addFamily(fam); 141 fam = new HColumnDescriptor(noRepfamName); 142 table.addFamily(fam); 143 144 tabA = new HTableDescriptor(tabAName); 145 fam = new HColumnDescriptor(f1Name); 146 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); 147 tabA.addFamily(fam); 148 fam = new HColumnDescriptor(f2Name); 149 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); 150 tabA.addFamily(fam); 151 fam = new HColumnDescriptor(f3Name); 152 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); 153 tabA.addFamily(fam); 154 155 tabB = new HTableDescriptor(tabBName); 156 fam = new HColumnDescriptor(f1Name); 157 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); 158 tabB.addFamily(fam); 159 fam = new HColumnDescriptor(f2Name); 160 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); 161 tabB.addFamily(fam); 162 fam = new HColumnDescriptor(f3Name); 163 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); 164 tabB.addFamily(fam); 165 166 tabC = new HTableDescriptor(tabCName); 167 fam = new HColumnDescriptor(f1Name); 168 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); 169 tabC.addFamily(fam); 170 fam = new HColumnDescriptor(f2Name); 171 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); 172 tabC.addFamily(fam); 173 fam = new HColumnDescriptor(f3Name); 174 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); 175 tabC.addFamily(fam); 176 177 utility1.startMiniCluster(); 178 utility2.startMiniCluster(); 179 utility3.startMiniCluster(); 180 } 181 182 @AfterClass 183 public static void tearDownAfterClass() throws Exception { 184 utility3.shutdownMiniCluster(); 185 utility2.shutdownMiniCluster(); 186 utility1.shutdownMiniCluster(); 187 } 188 189 @Test 190 public void testParseTableCFsFromConfig() { 191 Map<TableName, List<String>> tabCFsMap = null; 192 193 // 1. null or empty string, result should be null 194 tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(null); 195 assertEquals(null, tabCFsMap); 196 197 tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(""); 198 assertEquals(null, tabCFsMap); 199 200 tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(" "); 201 assertEquals(null, tabCFsMap); 202 203 final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1"); 204 final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2"); 205 final TableName tableName3 = TableName.valueOf(name.getMethodName() + "3"); 206 207 // 2. single table: "tableName1" / "tableName2:cf1" / "tableName3:cf1,cf3" 208 tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName1.getNameAsString()); 209 assertEquals(1, tabCFsMap.size()); // only one table 210 assertTrue(tabCFsMap.containsKey(tableName1)); // its table name is "tableName1" 211 assertFalse(tabCFsMap.containsKey(tableName2)); // not other table 212 assertEquals(null, tabCFsMap.get(tableName1)); // null cf-list, 213 214 tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName2 + ":cf1"); 215 assertEquals(1, tabCFsMap.size()); // only one table 216 assertTrue(tabCFsMap.containsKey(tableName2)); // its table name is "tableName2" 217 assertFalse(tabCFsMap.containsKey(tableName1)); // not other table 218 assertEquals(1, tabCFsMap.get(tableName2).size()); // cf-list contains only 1 cf 219 assertEquals("cf1", tabCFsMap.get(tableName2).get(0));// the only cf is "cf1" 220 221 tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName3 + " : cf1 , cf3"); 222 assertEquals(1, tabCFsMap.size()); // only one table 223 assertTrue(tabCFsMap.containsKey(tableName3)); // its table name is "tableName2" 224 assertFalse(tabCFsMap.containsKey(tableName1)); // not other table 225 assertEquals(2, tabCFsMap.get(tableName3).size()); // cf-list contains 2 cf 226 assertTrue(tabCFsMap.get(tableName3).contains("cf1"));// contains "cf1" 227 assertTrue(tabCFsMap.get(tableName3).contains("cf3"));// contains "cf3" 228 229 // 3. multiple tables: "tableName1 ; tableName2:cf1 ; tableName3:cf1,cf3" 230 tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig( 231 tableName1 + " ; " + tableName2 + ":cf1 ; " + tableName3 + ":cf1,cf3"); 232 // 3.1 contains 3 tables : "tableName1", "tableName2" and "tableName3" 233 assertEquals(3, tabCFsMap.size()); 234 assertTrue(tabCFsMap.containsKey(tableName1)); 235 assertTrue(tabCFsMap.containsKey(tableName2)); 236 assertTrue(tabCFsMap.containsKey(tableName3)); 237 // 3.2 table "tab1" : null cf-list 238 assertEquals(null, tabCFsMap.get(tableName1)); 239 // 3.3 table "tab2" : cf-list contains a single cf "cf1" 240 assertEquals(1, tabCFsMap.get(tableName2).size()); 241 assertEquals("cf1", tabCFsMap.get(tableName2).get(0)); 242 // 3.4 table "tab3" : cf-list contains "cf1" and "cf3" 243 assertEquals(2, tabCFsMap.get(tableName3).size()); 244 assertTrue(tabCFsMap.get(tableName3).contains("cf1")); 245 assertTrue(tabCFsMap.get(tableName3).contains("cf3")); 246 247 // 4. contiguous or additional ";"(table delimiter) or ","(cf delimiter) can be tolerated 248 // still use the example of multiple tables: "tableName1 ; tableName2:cf1 ; tableName3:cf1,cf3" 249 tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig( 250 tableName1 + " ; ; " + tableName2 + ":cf1 ; " + tableName3 + ":cf1,,cf3 ;"); 251 // 4.1 contains 3 tables : "tableName1", "tableName2" and "tableName3" 252 assertEquals(3, tabCFsMap.size()); 253 assertTrue(tabCFsMap.containsKey(tableName1)); 254 assertTrue(tabCFsMap.containsKey(tableName2)); 255 assertTrue(tabCFsMap.containsKey(tableName3)); 256 // 4.2 table "tab1" : null cf-list 257 assertEquals(null, tabCFsMap.get(tableName1)); 258 // 4.3 table "tab2" : cf-list contains a single cf "cf1" 259 assertEquals(1, tabCFsMap.get(tableName2).size()); 260 assertEquals("cf1", tabCFsMap.get(tableName2).get(0)); 261 // 4.4 table "tab3" : cf-list contains "cf1" and "cf3" 262 assertEquals(2, tabCFsMap.get(tableName3).size()); 263 assertTrue(tabCFsMap.get(tableName3).contains("cf1")); 264 assertTrue(tabCFsMap.get(tableName3).contains("cf3")); 265 266 // 5. invalid format "tableName1:tt:cf1 ; tableName2::cf1 ; tableName3:cf1,cf3" 267 // "tableName1:tt:cf1" and "tableName2::cf1" are invalid and will be ignored totally 268 tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig( 269 tableName1 + ":tt:cf1 ; " + tableName2 + "::cf1 ; " + tableName3 + ":cf1,cf3"); 270 // 5.1 no "tableName1" and "tableName2", only "tableName3" 271 assertEquals(1, tabCFsMap.size()); // only one table 272 assertFalse(tabCFsMap.containsKey(tableName1)); 273 assertFalse(tabCFsMap.containsKey(tableName2)); 274 assertTrue(tabCFsMap.containsKey(tableName3)); 275 // 5.2 table "tableName3" : cf-list contains "cf1" and "cf3" 276 assertEquals(2, tabCFsMap.get(tableName3).size()); 277 assertTrue(tabCFsMap.get(tableName3).contains("cf1")); 278 assertTrue(tabCFsMap.get(tableName3).contains("cf3")); 279 } 280 281 @Test 282 public void testTableCFsHelperConverter() { 283 284 ReplicationProtos.TableCF[] tableCFs = null; 285 Map<TableName, List<String>> tabCFsMap = null; 286 287 // 1. null or empty string, result should be null 288 assertNull(ReplicationPeerConfigUtil.convert(tabCFsMap)); 289 290 tabCFsMap = new HashMap<>(); 291 tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap); 292 assertEquals(0, tableCFs.length); 293 294 final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1"); 295 final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2"); 296 final TableName tableName3 = TableName.valueOf(name.getMethodName() + "3"); 297 298 // 2. single table: "tab1" / "tab2:cf1" / "tab3:cf1,cf3" 299 tabCFsMap.clear(); 300 tabCFsMap.put(tableName1, null); 301 tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap); 302 assertEquals(1, tableCFs.length); // only one table 303 assertEquals(tableName1.toString(), tableCFs[0].getTableName().getQualifier().toStringUtf8()); 304 assertEquals(0, tableCFs[0].getFamiliesCount()); 305 306 tabCFsMap.clear(); 307 tabCFsMap.put(tableName2, new ArrayList<>()); 308 tabCFsMap.get(tableName2).add("cf1"); 309 tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap); 310 assertEquals(1, tableCFs.length); // only one table 311 assertEquals(tableName2.toString(), tableCFs[0].getTableName().getQualifier().toStringUtf8()); 312 assertEquals(1, tableCFs[0].getFamiliesCount()); 313 assertEquals("cf1", tableCFs[0].getFamilies(0).toStringUtf8()); 314 315 tabCFsMap.clear(); 316 tabCFsMap.put(tableName3, new ArrayList<>()); 317 tabCFsMap.get(tableName3).add("cf1"); 318 tabCFsMap.get(tableName3).add("cf3"); 319 tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap); 320 assertEquals(1, tableCFs.length); 321 assertEquals(tableName3.toString(), tableCFs[0].getTableName().getQualifier().toStringUtf8()); 322 assertEquals(2, tableCFs[0].getFamiliesCount()); 323 assertEquals("cf1", tableCFs[0].getFamilies(0).toStringUtf8()); 324 assertEquals("cf3", tableCFs[0].getFamilies(1).toStringUtf8()); 325 326 tabCFsMap.clear(); 327 tabCFsMap.put(tableName1, null); 328 tabCFsMap.put(tableName2, new ArrayList<>()); 329 tabCFsMap.get(tableName2).add("cf1"); 330 tabCFsMap.put(tableName3, new ArrayList<>()); 331 tabCFsMap.get(tableName3).add("cf1"); 332 tabCFsMap.get(tableName3).add("cf3"); 333 334 tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap); 335 assertEquals(3, tableCFs.length); 336 assertNotNull(ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName1.toString())); 337 assertNotNull(ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName2.toString())); 338 assertNotNull(ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString())); 339 340 assertEquals(0, 341 ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName1.toString()).getFamiliesCount()); 342 343 assertEquals(1, 344 ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName2.toString()).getFamiliesCount()); 345 assertEquals("cf1", ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName2.toString()) 346 .getFamilies(0).toStringUtf8()); 347 348 assertEquals(2, 349 ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString()).getFamiliesCount()); 350 assertEquals("cf1", ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString()) 351 .getFamilies(0).toStringUtf8()); 352 assertEquals("cf3", ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString()) 353 .getFamilies(1).toStringUtf8()); 354 355 tabCFsMap = ReplicationPeerConfigUtil.convert2Map(tableCFs); 356 assertEquals(3, tabCFsMap.size()); 357 assertTrue(tabCFsMap.containsKey(tableName1)); 358 assertTrue(tabCFsMap.containsKey(tableName2)); 359 assertTrue(tabCFsMap.containsKey(tableName3)); 360 // 3.2 table "tab1" : null cf-list 361 assertEquals(null, tabCFsMap.get(tableName1)); 362 // 3.3 table "tab2" : cf-list contains a single cf "cf1" 363 assertEquals(1, tabCFsMap.get(tableName2).size()); 364 assertEquals("cf1", tabCFsMap.get(tableName2).get(0)); 365 // 3.4 table "tab3" : cf-list contains "cf1" and "cf3" 366 assertEquals(2, tabCFsMap.get(tableName3).size()); 367 assertTrue(tabCFsMap.get(tableName3).contains("cf1")); 368 assertTrue(tabCFsMap.get(tableName3).contains("cf3")); 369 } 370 371 @Test 372 public void testPerTableCFReplication() throws Exception { 373 LOG.info("testPerTableCFReplication"); 374 ReplicationAdmin replicationAdmin = new ReplicationAdmin(conf1); 375 Connection connection1 = ConnectionFactory.createConnection(conf1); 376 Connection connection2 = ConnectionFactory.createConnection(conf2); 377 Connection connection3 = ConnectionFactory.createConnection(conf3); 378 try { 379 Admin admin1 = connection1.getAdmin(); 380 Admin admin2 = connection2.getAdmin(); 381 Admin admin3 = connection3.getAdmin(); 382 383 admin1.createTable(tabA); 384 admin1.createTable(tabB); 385 admin1.createTable(tabC); 386 admin2.createTable(tabA); 387 admin2.createTable(tabB); 388 admin2.createTable(tabC); 389 admin3.createTable(tabA); 390 admin3.createTable(tabB); 391 admin3.createTable(tabC); 392 393 Table htab1A = connection1.getTable(tabAName); 394 Table htab2A = connection2.getTable(tabAName); 395 Table htab3A = connection3.getTable(tabAName); 396 397 Table htab1B = connection1.getTable(tabBName); 398 Table htab2B = connection2.getTable(tabBName); 399 Table htab3B = connection3.getTable(tabBName); 400 401 Table htab1C = connection1.getTable(tabCName); 402 Table htab2C = connection2.getTable(tabCName); 403 Table htab3C = connection3.getTable(tabCName); 404 405 // A. add cluster2/cluster3 as peers to cluster1 406 ReplicationPeerConfig rpc2 = new ReplicationPeerConfig(); 407 rpc2.setClusterKey(utility2.getClusterKey()); 408 rpc2.setReplicateAllUserTables(false); 409 Map<TableName, List<String>> tableCFs = new HashMap<>(); 410 tableCFs.put(tabCName, null); 411 tableCFs.put(tabBName, new ArrayList<>()); 412 tableCFs.get(tabBName).add("f1"); 413 tableCFs.get(tabBName).add("f3"); 414 replicationAdmin.addPeer("2", rpc2, tableCFs); 415 416 ReplicationPeerConfig rpc3 = new ReplicationPeerConfig(); 417 rpc3.setClusterKey(utility3.getClusterKey()); 418 rpc3.setReplicateAllUserTables(false); 419 tableCFs.clear(); 420 tableCFs.put(tabAName, null); 421 tableCFs.put(tabBName, new ArrayList<>()); 422 tableCFs.get(tabBName).add("f1"); 423 tableCFs.get(tabBName).add("f2"); 424 replicationAdmin.addPeer("3", rpc3, tableCFs); 425 426 // A1. tableA can only replicated to cluster3 427 putAndWaitWithFamily(row1, f1Name, htab1A, htab3A); 428 ensureRowNotReplicated(row1, f1Name, htab2A); 429 deleteAndWaitWithFamily(row1, f1Name, htab1A, htab3A); 430 431 putAndWaitWithFamily(row1, f2Name, htab1A, htab3A); 432 ensureRowNotReplicated(row1, f2Name, htab2A); 433 deleteAndWaitWithFamily(row1, f2Name, htab1A, htab3A); 434 435 putAndWaitWithFamily(row1, f3Name, htab1A, htab3A); 436 ensureRowNotReplicated(row1, f3Name, htab2A); 437 deleteAndWaitWithFamily(row1, f3Name, htab1A, htab3A); 438 439 // A2. cf 'f1' of tableB can replicated to both cluster2 and cluster3 440 putAndWaitWithFamily(row1, f1Name, htab1B, htab2B, htab3B); 441 deleteAndWaitWithFamily(row1, f1Name, htab1B, htab2B, htab3B); 442 443 // cf 'f2' of tableB can only replicated to cluster3 444 putAndWaitWithFamily(row1, f2Name, htab1B, htab3B); 445 ensureRowNotReplicated(row1, f2Name, htab2B); 446 deleteAndWaitWithFamily(row1, f2Name, htab1B, htab3B); 447 448 // cf 'f3' of tableB can only replicated to cluster2 449 putAndWaitWithFamily(row1, f3Name, htab1B, htab2B); 450 ensureRowNotReplicated(row1, f3Name, htab3B); 451 deleteAndWaitWithFamily(row1, f3Name, htab1B, htab2B); 452 453 // A3. tableC can only replicated to cluster2 454 putAndWaitWithFamily(row1, f1Name, htab1C, htab2C); 455 ensureRowNotReplicated(row1, f1Name, htab3C); 456 deleteAndWaitWithFamily(row1, f1Name, htab1C, htab2C); 457 458 putAndWaitWithFamily(row1, f2Name, htab1C, htab2C); 459 ensureRowNotReplicated(row1, f2Name, htab3C); 460 deleteAndWaitWithFamily(row1, f2Name, htab1C, htab2C); 461 462 putAndWaitWithFamily(row1, f3Name, htab1C, htab2C); 463 ensureRowNotReplicated(row1, f3Name, htab3C); 464 deleteAndWaitWithFamily(row1, f3Name, htab1C, htab2C); 465 466 // B. change peers' replicable table-cf config 467 tableCFs.clear(); 468 tableCFs.put(tabAName, new ArrayList<>()); 469 tableCFs.get(tabAName).add("f1"); 470 tableCFs.get(tabAName).add("f2"); 471 tableCFs.put(tabCName, new ArrayList<>()); 472 tableCFs.get(tabCName).add("f2"); 473 tableCFs.get(tabCName).add("f3"); 474 replicationAdmin.setPeerTableCFs("2", tableCFs); 475 476 tableCFs.clear(); 477 tableCFs.put(tabBName, null); 478 tableCFs.put(tabCName, new ArrayList<>()); 479 tableCFs.get(tabCName).add("f3"); 480 replicationAdmin.setPeerTableCFs("3", tableCFs); 481 482 // B1. cf 'f1' of tableA can only replicated to cluster2 483 putAndWaitWithFamily(row2, f1Name, htab1A, htab2A); 484 ensureRowNotReplicated(row2, f1Name, htab3A); 485 deleteAndWaitWithFamily(row2, f1Name, htab1A, htab2A); 486 // cf 'f2' of tableA can only replicated to cluster2 487 putAndWaitWithFamily(row2, f2Name, htab1A, htab2A); 488 ensureRowNotReplicated(row2, f2Name, htab3A); 489 deleteAndWaitWithFamily(row2, f2Name, htab1A, htab2A); 490 // cf 'f3' of tableA isn't replicable to either cluster2 or cluster3 491 putAndWaitWithFamily(row2, f3Name, htab1A); 492 ensureRowNotReplicated(row2, f3Name, htab2A, htab3A); 493 deleteAndWaitWithFamily(row2, f3Name, htab1A); 494 495 // B2. tableB can only replicated to cluster3 496 putAndWaitWithFamily(row2, f1Name, htab1B, htab3B); 497 ensureRowNotReplicated(row2, f1Name, htab2B); 498 deleteAndWaitWithFamily(row2, f1Name, htab1B, htab3B); 499 500 putAndWaitWithFamily(row2, f2Name, htab1B, htab3B); 501 ensureRowNotReplicated(row2, f2Name, htab2B); 502 deleteAndWaitWithFamily(row2, f2Name, htab1B, htab3B); 503 504 putAndWaitWithFamily(row2, f3Name, htab1B, htab3B); 505 ensureRowNotReplicated(row2, f3Name, htab2B); 506 deleteAndWaitWithFamily(row2, f3Name, htab1B, htab3B); 507 508 // B3. cf 'f1' of tableC non-replicable to either cluster 509 putAndWaitWithFamily(row2, f1Name, htab1C); 510 ensureRowNotReplicated(row2, f1Name, htab2C, htab3C); 511 deleteAndWaitWithFamily(row2, f1Name, htab1C); 512 // cf 'f2' of tableC can only replicated to cluster2 513 putAndWaitWithFamily(row2, f2Name, htab1C, htab2C); 514 ensureRowNotReplicated(row2, f2Name, htab3C); 515 deleteAndWaitWithFamily(row2, f2Name, htab1C, htab2C); 516 // cf 'f3' of tableC can replicated to cluster2 and cluster3 517 putAndWaitWithFamily(row2, f3Name, htab1C, htab2C, htab3C); 518 deleteAndWaitWithFamily(row2, f3Name, htab1C, htab2C, htab3C); 519 } finally { 520 connection1.close(); 521 connection2.close(); 522 connection3.close(); 523 } 524 } 525 526 private void ensureRowNotReplicated(byte[] row, byte[] fam, Table... tables) throws IOException { 527 Get get = new Get(row); 528 get.addFamily(fam); 529 for (Table table : tables) { 530 Result res = table.get(get); 531 assertEquals(0, res.size()); 532 } 533 } 534 535 private void deleteAndWaitWithFamily(byte[] row, byte[] fam, Table source, Table... targets) 536 throws Exception { 537 Delete del = new Delete(row); 538 del.addFamily(fam); 539 source.delete(del); 540 541 Get get = new Get(row); 542 get.addFamily(fam); 543 for (int i = 0; i < NB_RETRIES; i++) { 544 if (i == NB_RETRIES - 1) { 545 fail("Waited too much time for del replication"); 546 } 547 boolean removedFromAll = true; 548 for (Table target : targets) { 549 Result res = target.get(get); 550 if (res.size() >= 1) { 551 LOG.info("Row not deleted"); 552 removedFromAll = false; 553 break; 554 } 555 } 556 if (removedFromAll) { 557 break; 558 } else { 559 Thread.sleep(SLEEP_TIME); 560 } 561 } 562 } 563 564 private void putAndWaitWithFamily(byte[] row, byte[] fam, Table source, Table... targets) 565 throws Exception { 566 Put put = new Put(row); 567 put.addColumn(fam, row, val); 568 source.put(put); 569 570 Get get = new Get(row); 571 get.addFamily(fam); 572 for (int i = 0; i < NB_RETRIES; i++) { 573 if (i == NB_RETRIES - 1) { 574 fail("Waited too much time for put replication"); 575 } 576 boolean replicatedToAll = true; 577 for (Table target : targets) { 578 Result res = target.get(get); 579 if (res.isEmpty()) { 580 LOG.info("Row not available"); 581 replicatedToAll = false; 582 break; 583 } else { 584 assertEquals(1, res.size()); 585 assertArrayEquals(val, res.value()); 586 } 587 } 588 if (replicatedToAll) { 589 break; 590 } else { 591 Thread.sleep(SLEEP_TIME); 592 } 593 } 594 } 595}