/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.*;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;

@Category({FlakeyTests.class, LargeTests.class})
public class TestPerTableCFReplication {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestPerTableCFReplication.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestPerTableCFReplication.class);

  private static Configuration conf1;
  private static Configuration conf2;
  private static Configuration conf3;

  private static HBaseTestingUtility utility1;
  private static HBaseTestingUtility utility2;
  private static HBaseTestingUtility utility3;
  private static final long SLEEP_TIME = 500;
  private static final int NB_RETRIES = 100;

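  // Table and column-family fixtures shared by the tests below: "test" carries one
  // globally replicated family ("f") and one non-replicated family ("norep"), while
  // TA, TB and TC each carry the globally scoped families f1, f2 and f3.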
TableName.valueOf("test"); 083 private static final TableName tabAName = TableName.valueOf("TA"); 084 private static final TableName tabBName = TableName.valueOf("TB"); 085 private static final TableName tabCName = TableName.valueOf("TC"); 086 private static final byte[] famName = Bytes.toBytes("f"); 087 private static final byte[] f1Name = Bytes.toBytes("f1"); 088 private static final byte[] f2Name = Bytes.toBytes("f2"); 089 private static final byte[] f3Name = Bytes.toBytes("f3"); 090 private static final byte[] row1 = Bytes.toBytes("row1"); 091 private static final byte[] row2 = Bytes.toBytes("row2"); 092 private static final byte[] noRepfamName = Bytes.toBytes("norep"); 093 private static final byte[] val = Bytes.toBytes("myval"); 094 095 private static HTableDescriptor table; 096 private static HTableDescriptor tabA; 097 private static HTableDescriptor tabB; 098 private static HTableDescriptor tabC; 099 100 @Rule 101 public TestName name = new TestName(); 102 103 @BeforeClass 104 public static void setUpBeforeClass() throws Exception { 105 conf1 = HBaseConfiguration.create(); 106 conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); 107 // smaller block size and capacity to trigger more operations 108 // and test them 109 conf1.setInt("hbase.regionserver.hlog.blocksize", 1024*20); 110 conf1.setInt("replication.source.size.capacity", 1024); 111 conf1.setLong("replication.source.sleepforretries", 100); 112 conf1.setInt("hbase.regionserver.maxlogs", 10); 113 conf1.setLong("hbase.master.logcleaner.ttl", 10); 114 conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); 115 conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, 116 "org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter"); 117 118 utility1 = new HBaseTestingUtility(conf1); 119 utility1.startMiniZKCluster(); 120 MiniZooKeeperCluster miniZK = utility1.getZkCluster(); 121 new ZKWatcher(conf1, "cluster1", null, true); 122 123 conf2 = new Configuration(conf1); 124 conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); 125 126 conf3 = new Configuration(conf1); 127 conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3"); 128 129 utility2 = new HBaseTestingUtility(conf2); 130 utility2.setZkCluster(miniZK); 131 new ZKWatcher(conf2, "cluster3", null, true); 132 133 utility3 = new HBaseTestingUtility(conf3); 134 utility3.setZkCluster(miniZK); 135 new ZKWatcher(conf3, "cluster3", null, true); 136 137 table = new HTableDescriptor(tableName); 138 HColumnDescriptor fam = new HColumnDescriptor(famName); 139 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); 140 table.addFamily(fam); 141 fam = new HColumnDescriptor(noRepfamName); 142 table.addFamily(fam); 143 144 tabA = new HTableDescriptor(tabAName); 145 fam = new HColumnDescriptor(f1Name); 146 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); 147 tabA.addFamily(fam); 148 fam = new HColumnDescriptor(f2Name); 149 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); 150 tabA.addFamily(fam); 151 fam = new HColumnDescriptor(f3Name); 152 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); 153 tabA.addFamily(fam); 154 155 tabB = new HTableDescriptor(tabBName); 156 fam = new HColumnDescriptor(f1Name); 157 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); 158 tabB.addFamily(fam); 159 fam = new HColumnDescriptor(f2Name); 160 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); 161 tabB.addFamily(fam); 162 fam = new HColumnDescriptor(f3Name); 163 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); 164 tabB.addFamily(fam); 165 166 tabC = new HTableDescriptor(tabCName); 167 fam = new 
    tabC = new HTableDescriptor(tabCName);
    fam = new HColumnDescriptor(f1Name);
    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
    tabC.addFamily(fam);
    fam = new HColumnDescriptor(f2Name);
    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
    tabC.addFamily(fam);
    fam = new HColumnDescriptor(f3Name);
    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
    tabC.addFamily(fam);

    utility1.startMiniCluster();
    utility2.startMiniCluster();
    utility3.startMiniCluster();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    utility3.shutdownMiniCluster();
    utility2.shutdownMiniCluster();
    utility1.shutdownMiniCluster();
  }

  @Test
  public void testParseTableCFsFromConfig() {
    Map<TableName, List<String>> tabCFsMap = null;

    // 1. null or empty string, result should be null
    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(null);
    assertEquals(null, tabCFsMap);

    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig("");
    assertEquals(null, tabCFsMap);

    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(" ");
    assertEquals(null, tabCFsMap);

    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2");
    final TableName tableName3 = TableName.valueOf(name.getMethodName() + "3");

    // 2. single table: "tableName1" / "tableName2:cf1" / "tableName3:cf1,cf3"
    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName1.getNameAsString());
    assertEquals(1, tabCFsMap.size());                     // only one table
    assertTrue(tabCFsMap.containsKey(tableName1));         // its table name is "tableName1"
    assertFalse(tabCFsMap.containsKey(tableName2));        // not other table
    assertEquals(null, tabCFsMap.get(tableName1));         // null cf-list

    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName2 + ":cf1");
    assertEquals(1, tabCFsMap.size());                     // only one table
    assertTrue(tabCFsMap.containsKey(tableName2));         // its table name is "tableName2"
    assertFalse(tabCFsMap.containsKey(tableName1));        // not other table
    assertEquals(1, tabCFsMap.get(tableName2).size());     // cf-list contains only 1 cf
    assertEquals("cf1", tabCFsMap.get(tableName2).get(0)); // the only cf is "cf1"

    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName3 + " : cf1 , cf3");
    assertEquals(1, tabCFsMap.size());                     // only one table
    assertTrue(tabCFsMap.containsKey(tableName3));         // its table name is "tableName3"
    assertFalse(tabCFsMap.containsKey(tableName1));        // not other table
    assertEquals(2, tabCFsMap.get(tableName3).size());     // cf-list contains 2 cfs
    assertTrue(tabCFsMap.get(tableName3).contains("cf1")); // contains "cf1"
    assertTrue(tabCFsMap.get(tableName3).contains("cf3")); // contains "cf3"

    // 3. multiple tables: "tableName1 ; tableName2:cf1 ; tableName3:cf1,cf3"
    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName1 + " ; " + tableName2
        + ":cf1 ; " + tableName3 + ":cf1,cf3");
    // 3.1 contains 3 tables : "tableName1", "tableName2" and "tableName3"
    assertEquals(3, tabCFsMap.size());
    assertTrue(tabCFsMap.containsKey(tableName1));
    assertTrue(tabCFsMap.containsKey(tableName2));
    assertTrue(tabCFsMap.containsKey(tableName3));
    // 3.2 table "tableName1" : null cf-list
    assertEquals(null, tabCFsMap.get(tableName1));
    // 3.3 table "tableName2" : cf-list contains a single cf "cf1"
    assertEquals(1, tabCFsMap.get(tableName2).size());
    assertEquals("cf1", tabCFsMap.get(tableName2).get(0));
    // 3.4 table "tableName3" : cf-list contains "cf1" and "cf3"
    assertEquals(2, tabCFsMap.get(tableName3).size());
    assertTrue(tabCFsMap.get(tableName3).contains("cf1"));
    assertTrue(tabCFsMap.get(tableName3).contains("cf3"));

    // 4. repeated or trailing ";" (table delimiter) and "," (cf delimiter) are tolerated
    //    still using the multiple-table example: "tableName1 ; tableName2:cf1 ; tableName3:cf1,cf3"
    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(
      tableName1 + " ; ; " + tableName2 + ":cf1 ; " + tableName3 + ":cf1,,cf3 ;");
    // 4.1 contains 3 tables : "tableName1", "tableName2" and "tableName3"
    assertEquals(3, tabCFsMap.size());
    assertTrue(tabCFsMap.containsKey(tableName1));
    assertTrue(tabCFsMap.containsKey(tableName2));
    assertTrue(tabCFsMap.containsKey(tableName3));
    // 4.2 table "tableName1" : null cf-list
    assertEquals(null, tabCFsMap.get(tableName1));
    // 4.3 table "tableName2" : cf-list contains a single cf "cf1"
    assertEquals(1, tabCFsMap.get(tableName2).size());
    assertEquals("cf1", tabCFsMap.get(tableName2).get(0));
    // 4.4 table "tableName3" : cf-list contains "cf1" and "cf3"
    assertEquals(2, tabCFsMap.get(tableName3).size());
    assertTrue(tabCFsMap.get(tableName3).contains("cf1"));
    assertTrue(tabCFsMap.get(tableName3).contains("cf3"));

    // 5. invalid format "tableName1:tt:cf1 ; tableName2::cf1 ; tableName3:cf1,cf3"
    //    "tableName1:tt:cf1" and "tableName2::cf1" are invalid and will be ignored entirely
    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(
      tableName1 + ":tt:cf1 ; " + tableName2 + "::cf1 ; " + tableName3 + ":cf1,cf3");
    // 5.1 no "tableName1" and "tableName2", only "tableName3"
    assertEquals(1, tabCFsMap.size()); // only one table
    assertFalse(tabCFsMap.containsKey(tableName1));
    assertFalse(tabCFsMap.containsKey(tableName2));
    assertTrue(tabCFsMap.containsKey(tableName3));
    // 5.2 table "tableName3" : cf-list contains "cf1" and "cf3"
    assertEquals(2, tabCFsMap.get(tableName3).size());
    assertTrue(tabCFsMap.get(tableName3).contains("cf1"));
    assertTrue(tabCFsMap.get(tableName3).contains("cf3"));
  }

  @Test
  public void testTableCFsHelperConverter() {

    ReplicationProtos.TableCF[] tableCFs = null;
    Map<TableName, List<String>> tabCFsMap = null;

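    // Round-trip the table-CFs map through the protobuf TableCF[] form:
    // convert(Map) builds the array, getTableCF() looks entries up by table name,
    // and convert2Map() turns the array back into a map.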
    // 1. null or empty map: convert() returns null for a null map and an empty
    //    array for an empty map
    assertNull(ReplicationPeerConfigUtil.convert(tabCFsMap));

    tabCFsMap = new HashMap<>();
    tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
    assertEquals(0, tableCFs.length);

    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2");
    final TableName tableName3 = TableName.valueOf(name.getMethodName() + "3");

    // 2. single table: "tableName1" / "tableName2:cf1" / "tableName3:cf1,cf3"
    tabCFsMap.clear();
    tabCFsMap.put(tableName1, null);
    tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
    assertEquals(1, tableCFs.length); // only one table
    assertEquals(tableName1.toString(),
        tableCFs[0].getTableName().getQualifier().toStringUtf8());
    assertEquals(0, tableCFs[0].getFamiliesCount());

    tabCFsMap.clear();
    tabCFsMap.put(tableName2, new ArrayList<>());
    tabCFsMap.get(tableName2).add("cf1");
    tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
    assertEquals(1, tableCFs.length); // only one table
    assertEquals(tableName2.toString(),
        tableCFs[0].getTableName().getQualifier().toStringUtf8());
    assertEquals(1, tableCFs[0].getFamiliesCount());
    assertEquals("cf1", tableCFs[0].getFamilies(0).toStringUtf8());

    tabCFsMap.clear();
    tabCFsMap.put(tableName3, new ArrayList<>());
    tabCFsMap.get(tableName3).add("cf1");
    tabCFsMap.get(tableName3).add("cf3");
    tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
    assertEquals(1, tableCFs.length);
    assertEquals(tableName3.toString(),
        tableCFs[0].getTableName().getQualifier().toStringUtf8());
    assertEquals(2, tableCFs[0].getFamiliesCount());
    assertEquals("cf1", tableCFs[0].getFamilies(0).toStringUtf8());
    assertEquals("cf3", tableCFs[0].getFamilies(1).toStringUtf8());

    // 3. multiple tables: tableName1, tableName2:cf1 and tableName3:cf1,cf3
    tabCFsMap.clear();
    tabCFsMap.put(tableName1, null);
    tabCFsMap.put(tableName2, new ArrayList<>());
    tabCFsMap.get(tableName2).add("cf1");
    tabCFsMap.put(tableName3, new ArrayList<>());
    tabCFsMap.get(tableName3).add("cf1");
    tabCFsMap.get(tableName3).add("cf3");

    tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
    assertEquals(3, tableCFs.length);
    assertNotNull(ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName1.toString()));
    assertNotNull(ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName2.toString()));
    assertNotNull(ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString()));

    assertEquals(0,
        ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName1.toString()).getFamiliesCount());

    assertEquals(1, ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName2.toString())
        .getFamiliesCount());
    assertEquals("cf1", ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName2.toString())
        .getFamilies(0).toStringUtf8());

    assertEquals(2, ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString())
        .getFamiliesCount());
    assertEquals("cf1", ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString())
        .getFamilies(0).toStringUtf8());
    assertEquals("cf3", ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString())
        .getFamilies(1).toStringUtf8());

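    // Converting the array back with convert2Map() should reproduce the original map.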
    tabCFsMap = ReplicationPeerConfigUtil.convert2Map(tableCFs);
    assertEquals(3, tabCFsMap.size());
    assertTrue(tabCFsMap.containsKey(tableName1));
    assertTrue(tabCFsMap.containsKey(tableName2));
    assertTrue(tabCFsMap.containsKey(tableName3));
    // 3.2 table "tableName1" : null cf-list
    assertEquals(null, tabCFsMap.get(tableName1));
    // 3.3 table "tableName2" : cf-list contains a single cf "cf1"
    assertEquals(1, tabCFsMap.get(tableName2).size());
    assertEquals("cf1", tabCFsMap.get(tableName2).get(0));
    // 3.4 table "tableName3" : cf-list contains "cf1" and "cf3"
    assertEquals(2, tabCFsMap.get(tableName3).size());
    assertTrue(tabCFsMap.get(tableName3).contains("cf1"));
    assertTrue(tabCFsMap.get(tableName3).contains("cf3"));
  }

  @Test
  public void testPerTableCFReplication() throws Exception {
    LOG.info("testPerTableCFReplication");
    ReplicationAdmin replicationAdmin = new ReplicationAdmin(conf1);
    Connection connection1 = ConnectionFactory.createConnection(conf1);
    Connection connection2 = ConnectionFactory.createConnection(conf2);
    Connection connection3 = ConnectionFactory.createConnection(conf3);
    try {
      Admin admin1 = connection1.getAdmin();
      Admin admin2 = connection2.getAdmin();
      Admin admin3 = connection3.getAdmin();

      admin1.createTable(tabA);
      admin1.createTable(tabB);
      admin1.createTable(tabC);
      admin2.createTable(tabA);
      admin2.createTable(tabB);
      admin2.createTable(tabC);
      admin3.createTable(tabA);
      admin3.createTable(tabB);
      admin3.createTable(tabC);

      Table htab1A = connection1.getTable(tabAName);
      Table htab2A = connection2.getTable(tabAName);
      Table htab3A = connection3.getTable(tabAName);

      Table htab1B = connection1.getTable(tabBName);
      Table htab2B = connection2.getTable(tabBName);
      Table htab3B = connection3.getTable(tabBName);

      Table htab1C = connection1.getTable(tabCName);
      Table htab2C = connection2.getTable(tabCName);
      Table htab3C = connection3.getTable(tabCName);

      // A. add cluster2/cluster3 as peers to cluster1:
      //    peer "2" replicates all CFs of tabC plus f1 and f3 of tabB,
      //    peer "3" replicates all CFs of tabA plus f1 and f2 of tabB
      ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
      rpc2.setClusterKey(utility2.getClusterKey());
      rpc2.setReplicateAllUserTables(false);
      Map<TableName, List<String>> tableCFs = new HashMap<>();
      tableCFs.put(tabCName, null);
      tableCFs.put(tabBName, new ArrayList<>());
      tableCFs.get(tabBName).add("f1");
      tableCFs.get(tabBName).add("f3");
      replicationAdmin.addPeer("2", rpc2, tableCFs);

      ReplicationPeerConfig rpc3 = new ReplicationPeerConfig();
      rpc3.setClusterKey(utility3.getClusterKey());
      rpc3.setReplicateAllUserTables(false);
      tableCFs.clear();
      tableCFs.put(tabAName, null);
      tableCFs.put(tabBName, new ArrayList<>());
      tableCFs.get(tabBName).add("f1");
      tableCFs.get(tabBName).add("f2");
      replicationAdmin.addPeer("3", rpc3, tableCFs);

      // A1. tableA can only be replicated to cluster3
      putAndWaitWithFamily(row1, f1Name, htab1A, htab3A);
      ensureRowNotReplicated(row1, f1Name, htab2A);
      deleteAndWaitWithFamily(row1, f1Name, htab1A, htab3A);

      putAndWaitWithFamily(row1, f2Name, htab1A, htab3A);
      ensureRowNotReplicated(row1, f2Name, htab2A);
      deleteAndWaitWithFamily(row1, f2Name, htab1A, htab3A);

      putAndWaitWithFamily(row1, f3Name, htab1A, htab3A);
      ensureRowNotReplicated(row1, f3Name, htab2A);
      deleteAndWaitWithFamily(row1, f3Name, htab1A, htab3A);

      // A2. cf 'f1' of tableB can be replicated to both cluster2 and cluster3
      putAndWaitWithFamily(row1, f1Name, htab1B, htab2B, htab3B);
      deleteAndWaitWithFamily(row1, f1Name, htab1B, htab2B, htab3B);

      // cf 'f2' of tableB can only be replicated to cluster3
      putAndWaitWithFamily(row1, f2Name, htab1B, htab3B);
      ensureRowNotReplicated(row1, f2Name, htab2B);
      deleteAndWaitWithFamily(row1, f2Name, htab1B, htab3B);

      // cf 'f3' of tableB can only be replicated to cluster2
      putAndWaitWithFamily(row1, f3Name, htab1B, htab2B);
      ensureRowNotReplicated(row1, f3Name, htab3B);
      deleteAndWaitWithFamily(row1, f3Name, htab1B, htab2B);

      // A3. tableC can only be replicated to cluster2
      putAndWaitWithFamily(row1, f1Name, htab1C, htab2C);
      ensureRowNotReplicated(row1, f1Name, htab3C);
      deleteAndWaitWithFamily(row1, f1Name, htab1C, htab2C);

      putAndWaitWithFamily(row1, f2Name, htab1C, htab2C);
      ensureRowNotReplicated(row1, f2Name, htab3C);
      deleteAndWaitWithFamily(row1, f2Name, htab1C, htab2C);

      putAndWaitWithFamily(row1, f3Name, htab1C, htab2C);
      ensureRowNotReplicated(row1, f3Name, htab3C);
      deleteAndWaitWithFamily(row1, f3Name, htab1C, htab2C);

      // B. change the peers' replicable table-cf configs:
      //    peer "2" now replicates f1 and f2 of tabA plus f2 and f3 of tabC,
      //    peer "3" now replicates all CFs of tabB plus f3 of tabC
      tableCFs.clear();
      tableCFs.put(tabAName, new ArrayList<>());
      tableCFs.get(tabAName).add("f1");
      tableCFs.get(tabAName).add("f2");
      tableCFs.put(tabCName, new ArrayList<>());
      tableCFs.get(tabCName).add("f2");
      tableCFs.get(tabCName).add("f3");
      replicationAdmin.setPeerTableCFs("2", tableCFs);

      tableCFs.clear();
      tableCFs.put(tabBName, null);
      tableCFs.put(tabCName, new ArrayList<>());
      tableCFs.get(tabCName).add("f3");
      replicationAdmin.setPeerTableCFs("3", tableCFs);

      // B1. cf 'f1' of tableA can only be replicated to cluster2
      putAndWaitWithFamily(row2, f1Name, htab1A, htab2A);
      ensureRowNotReplicated(row2, f1Name, htab3A);
      deleteAndWaitWithFamily(row2, f1Name, htab1A, htab2A);
      // cf 'f2' of tableA can only be replicated to cluster2
      putAndWaitWithFamily(row2, f2Name, htab1A, htab2A);
      ensureRowNotReplicated(row2, f2Name, htab3A);
      deleteAndWaitWithFamily(row2, f2Name, htab1A, htab2A);
      // cf 'f3' of tableA isn't replicable to either cluster2 or cluster3
      putAndWaitWithFamily(row2, f3Name, htab1A);
      ensureRowNotReplicated(row2, f3Name, htab2A, htab3A);
      deleteAndWaitWithFamily(row2, f3Name, htab1A);

      // B2. tableB can only be replicated to cluster3
      putAndWaitWithFamily(row2, f1Name, htab1B, htab3B);
      ensureRowNotReplicated(row2, f1Name, htab2B);
      deleteAndWaitWithFamily(row2, f1Name, htab1B, htab3B);

      putAndWaitWithFamily(row2, f2Name, htab1B, htab3B);
      ensureRowNotReplicated(row2, f2Name, htab2B);
      deleteAndWaitWithFamily(row2, f2Name, htab1B, htab3B);

      putAndWaitWithFamily(row2, f3Name, htab1B, htab3B);
      ensureRowNotReplicated(row2, f3Name, htab2B);
      deleteAndWaitWithFamily(row2, f3Name, htab1B, htab3B);

      // B3. cf 'f1' of tableC isn't replicable to either cluster
      putAndWaitWithFamily(row2, f1Name, htab1C);
      ensureRowNotReplicated(row2, f1Name, htab2C, htab3C);
      deleteAndWaitWithFamily(row2, f1Name, htab1C);
      // cf 'f2' of tableC can only be replicated to cluster2
      putAndWaitWithFamily(row2, f2Name, htab1C, htab2C);
      ensureRowNotReplicated(row2, f2Name, htab3C);
      deleteAndWaitWithFamily(row2, f2Name, htab1C, htab2C);
      // cf 'f3' of tableC can be replicated to cluster2 and cluster3
      putAndWaitWithFamily(row2, f3Name, htab1C, htab2C, htab3C);
      deleteAndWaitWithFamily(row2, f3Name, htab1C, htab2C, htab3C);
    } finally {
      connection1.close();
      connection2.close();
      connection3.close();
    }
  }

  /**
   * Asserts that the given row/family has not shown up on any of the given tables.
   */
  private void ensureRowNotReplicated(byte[] row, byte[] fam, Table... tables) throws IOException {
    Get get = new Get(row);
    get.addFamily(fam);
    for (Table table : tables) {
      Result res = table.get(get);
      assertEquals(0, res.size());
    }
  }

  /**
   * Deletes the family from the source table and waits until the delete has been
   * replicated to all target tables, failing after NB_RETRIES attempts.
   */
  private void deleteAndWaitWithFamily(byte[] row, byte[] fam,
      Table source, Table... targets)
      throws Exception {
    Delete del = new Delete(row);
    del.addFamily(fam);
    source.delete(del);

    Get get = new Get(row);
    get.addFamily(fam);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for del replication");
      }
      boolean removedFromAll = true;
      for (Table target : targets) {
        Result res = target.get(get);
        if (res.size() >= 1) {
          LOG.info("Row not deleted");
          removedFromAll = false;
          break;
        }
      }
      if (removedFromAll) {
        break;
      } else {
        Thread.sleep(SLEEP_TIME);
      }
    }
  }

  /**
   * Puts a cell into the source table and waits until it has been replicated to
   * all target tables, failing after NB_RETRIES attempts.
   */
  private void putAndWaitWithFamily(byte[] row, byte[] fam,
      Table source, Table... targets)
      throws Exception {
    Put put = new Put(row);
    put.addColumn(fam, row, val);
    source.put(put);

    Get get = new Get(row);
    get.addFamily(fam);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      boolean replicatedToAll = true;
      for (Table target : targets) {
        Result res = target.get(get);
        if (res.isEmpty()) {
          LOG.info("Row not available");
          replicatedToAll = false;
          break;
        } else {
          assertEquals(1, res.size());
          assertArrayEquals(val, res.value());
        }
      }
      if (replicatedToAll) {
        break;
      } else {
        Thread.sleep(SLEEP_TIME);
      }
    }
  }
}