/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNameTestExtension;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;

/**
 * Tests replication restricted by per-peer table/column-family ("table-CFs") configuration.
 * <p>
 * Two tests exercise the {@link ReplicationPeerConfigUtil} parsing and conversion helpers without
 * touching a cluster. The third starts from three mini clusters sharing one mini ZooKeeper
 * cluster (znode parents {@code /1}, {@code /2} and {@code /3}), adds clusters 2 and 3 as peers
 * of cluster 1 with different table-CFs maps, and checks which tables/families actually arrive at
 * which peer.
 */
@Tag(ReplicationTests.TAG)
@Tag(LargeTests.TAG)
public class TestPerTableCFReplication {

  private static final Logger LOG = LoggerFactory.getLogger(TestPerTableCFReplication.class);

  private static Configuration conf1;
  private static Configuration conf2;
  private static Configuration conf3;

  private static HBaseTestingUtil utility1;
  private static HBaseTestingUtil utility2;
  private static HBaseTestingUtil utility3;

  /** Pause, in milliseconds, between two polls of the target tables. */
  private static final long SLEEP_TIME = 500;
  /** Upper bound of the polling loops in the put/delete wait helpers. */
  private static final int NB_RETRIES = 100;

  private static final TableName tabAName = TableName.valueOf("TA");
  private static final TableName tabBName = TableName.valueOf("TB");
  private static final TableName tabCName = TableName.valueOf("TC");
  private static final byte[] f1Name = Bytes.toBytes("f1");
  private static final byte[] f2Name = Bytes.toBytes("f2");
  private static final byte[] f3Name = Bytes.toBytes("f3");
  private static final byte[] row1 = Bytes.toBytes("row1");
  private static final byte[] row2 = Bytes.toBytes("row2");
  private static final byte[] val = Bytes.toBytes("myval");

  private static TableDescriptor tabA;
  private static TableDescriptor tabB;
  private static TableDescriptor tabC;

  @RegisterExtension
  private final TableNameTestExtension tableNameExt = new TableNameTestExtension();

  /**
   * Builds the three configurations and table descriptors, then starts the three mini clusters on
   * top of a single shared mini ZooKeeper cluster.
   */
  @BeforeAll
  public static void setUpBeforeClass() throws Exception {
    conf1 = HBaseConfiguration.create();
    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
    // smaller block size and capacity to trigger more operations
    // and test them
    conf1.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20);
    conf1.setInt("replication.source.size.capacity", 1024);
    conf1.setLong("replication.source.sleepforretries", 100);
    conf1.setInt("hbase.regionserver.maxlogs", 10);
    conf1.setLong("hbase.master.logcleaner.ttl", 10);
    conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
    conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
      "org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter");

    utility1 = new HBaseTestingUtil(conf1);
    utility1.startMiniZKCluster();
    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
    // NOTE(review): each watcher is opened and closed straight away, presumably only to set up
    // the znode layout under that cluster's parent znode -- confirm against ZKWatcher.
    new ZKWatcher(conf1, "cluster1", null, true).close();

    // conf2 and conf3 inherit every setting of conf1 and differ only in their znode parent.
    conf2 = new Configuration(conf1);
    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");

    conf3 = new Configuration(conf1);
    conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3");

    utility2 = new HBaseTestingUtil(conf2);
    utility2.setZkCluster(miniZK);
    new ZKWatcher(conf2, "cluster2", null, true).close();

    utility3 = new HBaseTestingUtil(conf3);
    utility3.setZkCluster(miniZK);
    new ZKWatcher(conf3, "cluster3", null, true).close();

    // All three tables have the same shape: families f1, f2 and f3, each with global
    // replication scope.
    tabA = TableDescriptorBuilder.newBuilder(tabAName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f1Name)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f2Name)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f3Name)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
      .build();

    tabB = TableDescriptorBuilder.newBuilder(tabBName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f1Name)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f2Name)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f3Name)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
      .build();

    tabC = TableDescriptorBuilder.newBuilder(tabCName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f1Name)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f2Name)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f3Name)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
      .build();

    utility1.startMiniCluster();
    utility2.startMiniCluster();
    utility3.startMiniCluster();
  }

  /** Shuts the mini clusters down in the reverse of their start order. */
  @AfterAll
  public static void tearDownAfterClass() throws Exception {
    utility3.shutdownMiniCluster();
    utility2.shutdownMiniCluster();
    utility1.shutdownMiniCluster();
  }

  /**
   * Checks {@link ReplicationPeerConfigUtil#parseTableCFsFromConfig(String)} against null/blank
   * input, single-table and multi-table strings, redundant delimiters and malformed entries.
   */
  @Test
  public void testParseTableCFsFromConfig() {
    Map<TableName, List<String>> tabCFsMap = null;

    // 1. null or empty string, result should be null
    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(null);
    assertNull(tabCFsMap);

    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig("");
    assertNull(tabCFsMap);

    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig("   ");
    assertNull(tabCFsMap);

    final TableName tableName1 = tableNameExt.getTableName("1");
    final TableName tableName2 = tableNameExt.getTableName("2");
    final TableName tableName3 = tableNameExt.getTableName("3");

    // 2. single table: "tableName1" / "tableName2:cf1" / "tableName3:cf1,cf3"
    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName1.getNameAsString());
    assertEquals(1, tabCFsMap.size()); // only one table
    assertTrue(tabCFsMap.containsKey(tableName1)); // its table name is "tableName1"
    assertFalse(tabCFsMap.containsKey(tableName2)); // not other table
    assertNull(tabCFsMap.get(tableName1)); // null cf-list

    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName2 + ":cf1");
    assertEquals(1, tabCFsMap.size()); // only one table
    assertTrue(tabCFsMap.containsKey(tableName2)); // its table name is "tableName2"
    assertFalse(tabCFsMap.containsKey(tableName1)); // not other table
    assertEquals(1, tabCFsMap.get(tableName2).size()); // cf-list contains only 1 cf
    assertEquals("cf1", tabCFsMap.get(tableName2).get(0)); // the only cf is "cf1"

    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName3 + " : cf1 , cf3");
    assertEquals(1, tabCFsMap.size()); // only one table
    assertTrue(tabCFsMap.containsKey(tableName3)); // its table name is "tableName3"
    assertFalse(tabCFsMap.containsKey(tableName1)); // not other table
    assertEquals(2, tabCFsMap.get(tableName3).size()); // cf-list contains 2 cf
    assertTrue(tabCFsMap.get(tableName3).contains("cf1")); // contains "cf1"
    assertTrue(tabCFsMap.get(tableName3).contains("cf3")); // contains "cf3"

    // 3. multiple tables: "tableName1 ; tableName2:cf1 ; tableName3:cf1,cf3"
    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(
      tableName1 + " ; " + tableName2 + ":cf1 ; " + tableName3 + ":cf1,cf3");
    // 3.1 contains 3 tables : "tableName1", "tableName2" and "tableName3"
    assertEquals(3, tabCFsMap.size());
    assertTrue(tabCFsMap.containsKey(tableName1));
    assertTrue(tabCFsMap.containsKey(tableName2));
    assertTrue(tabCFsMap.containsKey(tableName3));
    // 3.2 table "tableName1" : null cf-list
    assertNull(tabCFsMap.get(tableName1));
    // 3.3 table "tableName2" : cf-list contains a single cf "cf1"
    assertEquals(1, tabCFsMap.get(tableName2).size());
    assertEquals("cf1", tabCFsMap.get(tableName2).get(0));
    // 3.4 table "tableName3" : cf-list contains "cf1" and "cf3"
    assertEquals(2, tabCFsMap.get(tableName3).size());
    assertTrue(tabCFsMap.get(tableName3).contains("cf1"));
    assertTrue(tabCFsMap.get(tableName3).contains("cf3"));

    // 4. contiguous or additional ";"(table delimiter) or ","(cf delimiter) can be tolerated
    // still use the example of multiple tables: "tableName1 ; tableName2:cf1 ; tableName3:cf1,cf3"
    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(
      tableName1 + " ; ; " + tableName2 + ":cf1 ; " + tableName3 + ":cf1,,cf3 ;");
    // 4.1 contains 3 tables : "tableName1", "tableName2" and "tableName3"
    assertEquals(3, tabCFsMap.size());
    assertTrue(tabCFsMap.containsKey(tableName1));
    assertTrue(tabCFsMap.containsKey(tableName2));
    assertTrue(tabCFsMap.containsKey(tableName3));
    // 4.2 table "tableName1" : null cf-list
    assertNull(tabCFsMap.get(tableName1));
    // 4.3 table "tableName2" : cf-list contains a single cf "cf1"
    assertEquals(1, tabCFsMap.get(tableName2).size());
    assertEquals("cf1", tabCFsMap.get(tableName2).get(0));
    // 4.4 table "tableName3" : cf-list contains "cf1" and "cf3"
    assertEquals(2, tabCFsMap.get(tableName3).size());
    assertTrue(tabCFsMap.get(tableName3).contains("cf1"));
    assertTrue(tabCFsMap.get(tableName3).contains("cf3"));

    // 5. invalid format "tableName1:tt:cf1 ; tableName2::cf1 ; tableName3:cf1,cf3"
    // "tableName1:tt:cf1" and "tableName2::cf1" are invalid and will be ignored totally
    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(
      tableName1 + ":tt:cf1 ; " + tableName2 + "::cf1 ; " + tableName3 + ":cf1,cf3");
    // 5.1 no "tableName1" and "tableName2", only "tableName3"
    assertEquals(1, tabCFsMap.size()); // only one table
    assertFalse(tabCFsMap.containsKey(tableName1));
    assertFalse(tabCFsMap.containsKey(tableName2));
    assertTrue(tabCFsMap.containsKey(tableName3));
    // 5.2 table "tableName3" : cf-list contains "cf1" and "cf3"
    assertEquals(2, tabCFsMap.get(tableName3).size());
    assertTrue(tabCFsMap.get(tableName3).contains("cf1"));
    assertTrue(tabCFsMap.get(tableName3).contains("cf3"));
  }

  /**
   * Checks the map to protobuf {@code TableCF[]} conversion
   * ({@code ReplicationPeerConfigUtil.convert}), the lookup helper {@code getTableCF}, and the
   * reverse conversion {@code convert2Map}.
   */
  @Test
  public void testTableCFsHelperConverter() {

    ReplicationProtos.TableCF[] tableCFs = null;
    Map<TableName, List<String>> tabCFsMap = null;

    // 1. a null map converts to null, an empty map converts to an empty array
    assertNull(ReplicationPeerConfigUtil.convert(tabCFsMap));

    tabCFsMap = new HashMap<>();
    tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
    assertEquals(0, tableCFs.length);

    final TableName tableName1 = tableNameExt.getTableName("1");
    final TableName tableName2 = tableNameExt.getTableName("2");
    final TableName tableName3 = tableNameExt.getTableName("3");

    // 2. single table: "tableName1" / "tableName2:cf1" / "tableName3:cf1,cf3"
    tabCFsMap.clear();
    tabCFsMap.put(tableName1, null);
    tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
    assertEquals(1, tableCFs.length); // only one table
    assertEquals(tableName1.toString(), tableCFs[0].getTableName().getQualifier().toStringUtf8());
    assertEquals(0, tableCFs[0].getFamiliesCount());

    tabCFsMap.clear();
    tabCFsMap.put(tableName2, new ArrayList<>());
    tabCFsMap.get(tableName2).add("cf1");
    tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
    assertEquals(1, tableCFs.length); // only one table
    assertEquals(tableName2.toString(), tableCFs[0].getTableName().getQualifier().toStringUtf8());
    assertEquals(1, tableCFs[0].getFamiliesCount());
    assertEquals("cf1", tableCFs[0].getFamilies(0).toStringUtf8());

    tabCFsMap.clear();
    tabCFsMap.put(tableName3, new ArrayList<>());
    tabCFsMap.get(tableName3).add("cf1");
    tabCFsMap.get(tableName3).add("cf3");
    tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
    assertEquals(1, tableCFs.length);
    assertEquals(tableName3.toString(), tableCFs[0].getTableName().getQualifier().toStringUtf8());
    assertEquals(2, tableCFs[0].getFamiliesCount());
    assertEquals("cf1", tableCFs[0].getFamilies(0).toStringUtf8());
    assertEquals("cf3", tableCFs[0].getFamilies(1).toStringUtf8());

    // 3. multiple tables in one map
    tabCFsMap.clear();
    tabCFsMap.put(tableName1, null);
    tabCFsMap.put(tableName2, new ArrayList<>());
    tabCFsMap.get(tableName2).add("cf1");
    tabCFsMap.put(tableName3, new ArrayList<>());
    tabCFsMap.get(tableName3).add("cf1");
    tabCFsMap.get(tableName3).add("cf3");

    tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
    assertEquals(3, tableCFs.length);
    assertNotNull(ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName1.toString()));
    assertNotNull(ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName2.toString()));
    assertNotNull(ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString()));

    assertEquals(0,
      ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName1.toString()).getFamiliesCount());

    assertEquals(1,
      ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName2.toString()).getFamiliesCount());
    assertEquals("cf1", ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName2.toString())
      .getFamilies(0).toStringUtf8());

    assertEquals(2,
      ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString()).getFamiliesCount());
    assertEquals("cf1", ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString())
      .getFamilies(0).toStringUtf8());
    assertEquals("cf3", ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString())
      .getFamilies(1).toStringUtf8());

    // 4. converting the array back must yield the map it was built from
    tabCFsMap = ReplicationPeerConfigUtil.convert2Map(tableCFs);
    assertEquals(3, tabCFsMap.size());
    assertTrue(tabCFsMap.containsKey(tableName1));
    assertTrue(tabCFsMap.containsKey(tableName2));
    assertTrue(tabCFsMap.containsKey(tableName3));
    // 4.1 table "tableName1" : null cf-list
    assertNull(tabCFsMap.get(tableName1));
    // 4.2 table "tableName2" : cf-list contains a single cf "cf1"
    assertEquals(1, tabCFsMap.get(tableName2).size());
    assertEquals("cf1", tabCFsMap.get(tableName2).get(0));
    // 4.3 table "tableName3" : cf-list contains "cf1" and "cf3"
    assertEquals(2, tabCFsMap.get(tableName3).size());
    assertTrue(tabCFsMap.get(tableName3).contains("cf1"));
    assertTrue(tabCFsMap.get(tableName3).contains("cf3"));
  }

  /**
   * End-to-end check: cluster 1 replicates to peers "2" and "3", each restricted by its own
   * table-CFs map. Phase A verifies the initial peer configuration, phase B verifies the
   * behavior after both peer configurations have been updated.
   */
  @Test
  public void testPerTableCFReplication() throws Exception {
    LOG.info("testPerTableCFReplication");
    try (Connection connection1 = ConnectionFactory.createConnection(conf1);
      Connection connection2 = ConnectionFactory.createConnection(conf2);
      Connection connection3 = ConnectionFactory.createConnection(conf3);
      Admin admin1 = connection1.getAdmin(); Admin admin2 = connection2.getAdmin();
      Admin admin3 = connection3.getAdmin(); Admin replicationAdmin = connection1.getAdmin()) {

      admin1.createTable(tabA);
      admin1.createTable(tabB);
      admin1.createTable(tabC);
      admin2.createTable(tabA);
      admin2.createTable(tabB);
      admin2.createTable(tabC);
      admin3.createTable(tabA);
      admin3.createTable(tabB);
      admin3.createTable(tabC);

      // Table handles are opened only after the tables exist and are released when the test
      // ends, whether it passes or fails.
      try (Table htab1A = connection1.getTable(tabAName);
        Table htab2A = connection2.getTable(tabAName);
        Table htab3A = connection3.getTable(tabAName);
        Table htab1B = connection1.getTable(tabBName);
        Table htab2B = connection2.getTable(tabBName);
        Table htab3B = connection3.getTable(tabBName);
        Table htab1C = connection1.getTable(tabCName);
        Table htab2C = connection2.getTable(tabCName);
        Table htab3C = connection3.getTable(tabCName)) {

        // A. add cluster2/cluster3 as peers to cluster1
        // peer "2": all of tableC, plus f1 and f3 of tableB
        Map<TableName, List<String>> tableCFs = new HashMap<>();
        tableCFs.put(tabCName, null);
        tableCFs.put(tabBName, new ArrayList<>());
        tableCFs.get(tabBName).add("f1");
        tableCFs.get(tabBName).add("f3");
        ReplicationPeerConfig rpc2 =
          ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getRpcConnnectionURI())
            .setReplicateAllUserTables(false).setTableCFsMap(tableCFs).build();
        replicationAdmin.addReplicationPeer("2", rpc2);

        // peer "3": all of tableA, plus f1 and f2 of tableB
        tableCFs.clear();
        tableCFs.put(tabAName, null);
        tableCFs.put(tabBName, new ArrayList<>());
        tableCFs.get(tabBName).add("f1");
        tableCFs.get(tabBName).add("f2");
        ReplicationPeerConfig rpc3 =
          ReplicationPeerConfig.newBuilder().setClusterKey(utility3.getRpcConnnectionURI())
            .setReplicateAllUserTables(false).setTableCFsMap(tableCFs).build();
        replicationAdmin.addReplicationPeer("3", rpc3);

        // A1. tableA can only be replicated to cluster3
        putAndWaitWithFamily(row1, f1Name, htab1A, htab3A);
        ensureRowNotReplicated(row1, f1Name, htab2A);
        deleteAndWaitWithFamily(row1, f1Name, htab1A, htab3A);

        putAndWaitWithFamily(row1, f2Name, htab1A, htab3A);
        ensureRowNotReplicated(row1, f2Name, htab2A);
        deleteAndWaitWithFamily(row1, f2Name, htab1A, htab3A);

        putAndWaitWithFamily(row1, f3Name, htab1A, htab3A);
        ensureRowNotReplicated(row1, f3Name, htab2A);
        deleteAndWaitWithFamily(row1, f3Name, htab1A, htab3A);

        // A2. cf 'f1' of tableB can be replicated to both cluster2 and cluster3
        putAndWaitWithFamily(row1, f1Name, htab1B, htab2B, htab3B);
        deleteAndWaitWithFamily(row1, f1Name, htab1B, htab2B, htab3B);

        // cf 'f2' of tableB can only be replicated to cluster3
        putAndWaitWithFamily(row1, f2Name, htab1B, htab3B);
        ensureRowNotReplicated(row1, f2Name, htab2B);
        deleteAndWaitWithFamily(row1, f2Name, htab1B, htab3B);

        // cf 'f3' of tableB can only be replicated to cluster2
        putAndWaitWithFamily(row1, f3Name, htab1B, htab2B);
        ensureRowNotReplicated(row1, f3Name, htab3B);
        deleteAndWaitWithFamily(row1, f3Name, htab1B, htab2B);

        // A3. tableC can only be replicated to cluster2
        putAndWaitWithFamily(row1, f1Name, htab1C, htab2C);
        ensureRowNotReplicated(row1, f1Name, htab3C);
        deleteAndWaitWithFamily(row1, f1Name, htab1C, htab2C);

        putAndWaitWithFamily(row1, f2Name, htab1C, htab2C);
        ensureRowNotReplicated(row1, f2Name, htab3C);
        deleteAndWaitWithFamily(row1, f2Name, htab1C, htab2C);

        putAndWaitWithFamily(row1, f3Name, htab1C, htab2C);
        ensureRowNotReplicated(row1, f3Name, htab3C);
        deleteAndWaitWithFamily(row1, f3Name, htab1C, htab2C);

        // B. change peers' replicable table-cf config
        // peer "2": f1 and f2 of tableA, f2 and f3 of tableC
        tableCFs.clear();
        tableCFs.put(tabAName, new ArrayList<>());
        tableCFs.get(tabAName).add("f1");
        tableCFs.get(tabAName).add("f2");
        tableCFs.put(tabCName, new ArrayList<>());
        tableCFs.get(tabCName).add("f2");
        tableCFs.get(tabCName).add("f3");
        replicationAdmin.updateReplicationPeerConfig("2",
          ReplicationPeerConfig.newBuilder(replicationAdmin.getReplicationPeerConfig("2"))
            .setTableCFsMap(tableCFs).build());

        // peer "3": all of tableB, f3 of tableC
        tableCFs.clear();
        tableCFs.put(tabBName, null);
        tableCFs.put(tabCName, new ArrayList<>());
        tableCFs.get(tabCName).add("f3");
        replicationAdmin.updateReplicationPeerConfig("3",
          ReplicationPeerConfig.newBuilder(replicationAdmin.getReplicationPeerConfig("3"))
            .setTableCFsMap(tableCFs).build());

        // B1. cf 'f1' of tableA can only be replicated to cluster2
        putAndWaitWithFamily(row2, f1Name, htab1A, htab2A);
        ensureRowNotReplicated(row2, f1Name, htab3A);
        deleteAndWaitWithFamily(row2, f1Name, htab1A, htab2A);
        // cf 'f2' of tableA can only be replicated to cluster2
        putAndWaitWithFamily(row2, f2Name, htab1A, htab2A);
        ensureRowNotReplicated(row2, f2Name, htab3A);
        deleteAndWaitWithFamily(row2, f2Name, htab1A, htab2A);
        // cf 'f3' of tableA isn't replicable to either cluster2 or cluster3
        putAndWaitWithFamily(row2, f3Name, htab1A);
        ensureRowNotReplicated(row2, f3Name, htab2A, htab3A);
        deleteAndWaitWithFamily(row2, f3Name, htab1A);

        // B2. tableB can only be replicated to cluster3
        putAndWaitWithFamily(row2, f1Name, htab1B, htab3B);
        ensureRowNotReplicated(row2, f1Name, htab2B);
        deleteAndWaitWithFamily(row2, f1Name, htab1B, htab3B);

        putAndWaitWithFamily(row2, f2Name, htab1B, htab3B);
        ensureRowNotReplicated(row2, f2Name, htab2B);
        deleteAndWaitWithFamily(row2, f2Name, htab1B, htab3B);

        putAndWaitWithFamily(row2, f3Name, htab1B, htab3B);
        ensureRowNotReplicated(row2, f3Name, htab2B);
        deleteAndWaitWithFamily(row2, f3Name, htab1B, htab3B);

        // B3. cf 'f1' of tableC is not replicable to either cluster
        putAndWaitWithFamily(row2, f1Name, htab1C);
        ensureRowNotReplicated(row2, f1Name, htab2C, htab3C);
        deleteAndWaitWithFamily(row2, f1Name, htab1C);
        // cf 'f2' of tableC can only be replicated to cluster2
        putAndWaitWithFamily(row2, f2Name, htab1C, htab2C);
        ensureRowNotReplicated(row2, f2Name, htab3C);
        deleteAndWaitWithFamily(row2, f2Name, htab1C, htab2C);
        // cf 'f3' of tableC can be replicated to cluster2 and cluster3
        putAndWaitWithFamily(row2, f3Name, htab1C, htab2C, htab3C);
        deleteAndWaitWithFamily(row2, f3Name, htab1C, htab2C, htab3C);
      }
    }
  }

  /**
   * Asserts that a single read of {@code row}/{@code fam} returns no cells from any of
   * {@code tables}. This is a one-shot check; it does not wait or retry.
   * @param row    row key to read
   * @param fam    column family to read
   * @param tables tables that must not contain the row in that family
   * @throws IOException if a read fails
   */
  private void ensureRowNotReplicated(byte[] row, byte[] fam, Table... tables) throws IOException {
    Get get = new Get(row);
    get.addFamily(fam);
    for (Table table : tables) {
      Result res = table.get(get);
      assertEquals(0, res.size());
    }
  }

  /**
   * Deletes family {@code fam} of {@code row} from {@code source}, then polls {@code targets}
   * until none of them returns cells for that row/family. Sleeps {@link #SLEEP_TIME} ms between
   * polls and fails the test once the loop reaches its last iteration, which no longer polls.
   * @param row     row key to delete
   * @param fam     column family to delete
   * @param source  table the delete is issued against
   * @param targets tables that must eventually stop returning the row; may be empty
   */
  private void deleteAndWaitWithFamily(byte[] row, byte[] fam, Table source, Table... targets)
    throws Exception {
    Delete del = new Delete(row);
    del.addFamily(fam);
    source.delete(del);

    Get get = new Get(row);
    get.addFamily(fam);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for del replication");
      }
      boolean removedFromAll = true;
      for (Table target : targets) {
        Result res = target.get(get);
        if (res.size() >= 1) {
          LOG.info("Row not deleted");
          removedFromAll = false;
          break;
        }
      }
      if (removedFromAll) {
        break;
      } else {
        Thread.sleep(SLEEP_TIME);
      }
    }
  }

  /**
   * Writes one cell to {@code source} (family {@code fam}, qualifier equal to the row key, value
   * {@link #val}), then polls {@code targets} until every one of them returns exactly that cell.
   * Sleeps {@link #SLEEP_TIME} ms between polls and fails the test once the loop reaches its
   * last iteration, which no longer polls.
   * @param row     row key to write; also used as the column qualifier
   * @param fam     column family to write
   * @param source  table the put is issued against
   * @param targets tables that must eventually return the cell; may be empty
   */
  private void putAndWaitWithFamily(byte[] row, byte[] fam, Table source, Table... targets)
    throws Exception {
    Put put = new Put(row);
    put.addColumn(fam, row, val);
    source.put(put);

    Get get = new Get(row);
    get.addFamily(fam);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      boolean replicatedToAll = true;
      for (Table target : targets) {
        Result res = target.get(get);
        if (res.isEmpty()) {
          LOG.info("Row not available");
          replicatedToAll = false;
          break;
        } else {
          assertEquals(1, res.size());
          assertArrayEquals(val, res.value());
        }
      }
      if (replicatedToAll) {
        break;
      } else {
        Thread.sleep(SLEEP_TIME);
      }
    }
  }
}