001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.jupiter.api.Assertions.assertArrayEquals;
021import static org.junit.jupiter.api.Assertions.assertEquals;
022import static org.junit.jupiter.api.Assertions.assertFalse;
023import static org.junit.jupiter.api.Assertions.assertNotNull;
024import static org.junit.jupiter.api.Assertions.assertNull;
025import static org.junit.jupiter.api.Assertions.assertTrue;
026import static org.junit.jupiter.api.Assertions.fail;
027
028import java.io.IOException;
029import java.util.ArrayList;
030import java.util.HashMap;
031import java.util.List;
032import java.util.Map;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.HBaseConfiguration;
035import org.apache.hadoop.hbase.HBaseTestingUtil;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.TableNameTestExtension;
039import org.apache.hadoop.hbase.client.Admin;
040import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
041import org.apache.hadoop.hbase.client.Connection;
042import org.apache.hadoop.hbase.client.ConnectionFactory;
043import org.apache.hadoop.hbase.client.Delete;
044import org.apache.hadoop.hbase.client.Get;
045import org.apache.hadoop.hbase.client.Put;
046import org.apache.hadoop.hbase.client.Result;
047import org.apache.hadoop.hbase.client.Table;
048import org.apache.hadoop.hbase.client.TableDescriptor;
049import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
050import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
051import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
052import org.apache.hadoop.hbase.testclassification.LargeTests;
053import org.apache.hadoop.hbase.testclassification.ReplicationTests;
054import org.apache.hadoop.hbase.util.Bytes;
055import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
056import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
057import org.junit.jupiter.api.AfterAll;
058import org.junit.jupiter.api.BeforeAll;
059import org.junit.jupiter.api.Tag;
060import org.junit.jupiter.api.Test;
061import org.junit.jupiter.api.extension.RegisterExtension;
062import org.slf4j.Logger;
063import org.slf4j.LoggerFactory;
064
065import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
066
067@Tag(ReplicationTests.TAG)
068@Tag(LargeTests.TAG)
069public class TestPerTableCFReplication {
070
071  private static final Logger LOG = LoggerFactory.getLogger(TestPerTableCFReplication.class);
072
073  private static Configuration conf1;
074  private static Configuration conf2;
075  private static Configuration conf3;
076
077  private static HBaseTestingUtil utility1;
078  private static HBaseTestingUtil utility2;
079  private static HBaseTestingUtil utility3;
080  private static final long SLEEP_TIME = 500;
081  private static final int NB_RETRIES = 100;
082
083  private static final TableName tabAName = TableName.valueOf("TA");
084  private static final TableName tabBName = TableName.valueOf("TB");
085  private static final TableName tabCName = TableName.valueOf("TC");
086  private static final byte[] f1Name = Bytes.toBytes("f1");
087  private static final byte[] f2Name = Bytes.toBytes("f2");
088  private static final byte[] f3Name = Bytes.toBytes("f3");
089  private static final byte[] row1 = Bytes.toBytes("row1");
090  private static final byte[] row2 = Bytes.toBytes("row2");
091  private static final byte[] val = Bytes.toBytes("myval");
092
093  private static TableDescriptor tabA;
094  private static TableDescriptor tabB;
095  private static TableDescriptor tabC;
096
097  @RegisterExtension
098  private final TableNameTestExtension tableNameExt = new TableNameTestExtension();
099
100  @BeforeAll
101  public static void setUpBeforeClass() throws Exception {
102    conf1 = HBaseConfiguration.create();
103    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
104    // smaller block size and capacity to trigger more operations
105    // and test them
106    conf1.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20);
107    conf1.setInt("replication.source.size.capacity", 1024);
108    conf1.setLong("replication.source.sleepforretries", 100);
109    conf1.setInt("hbase.regionserver.maxlogs", 10);
110    conf1.setLong("hbase.master.logcleaner.ttl", 10);
111    conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
112    conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
113      "org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter");
114
115    utility1 = new HBaseTestingUtil(conf1);
116    utility1.startMiniZKCluster();
117    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
118    new ZKWatcher(conf1, "cluster1", null, true).close();
119
120    conf2 = new Configuration(conf1);
121    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
122
123    conf3 = new Configuration(conf1);
124    conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3");
125
126    utility2 = new HBaseTestingUtil(conf2);
127    utility2.setZkCluster(miniZK);
128    new ZKWatcher(conf2, "cluster3", null, true).close();
129
130    utility3 = new HBaseTestingUtil(conf3);
131    utility3.setZkCluster(miniZK);
132    new ZKWatcher(conf3, "cluster3", null, true).close();
133
134    tabA = TableDescriptorBuilder.newBuilder(tabAName)
135      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f1Name)
136        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
137      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f2Name)
138        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
139      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f3Name)
140        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
141      .build();
142
143    tabB = TableDescriptorBuilder.newBuilder(tabBName)
144      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f1Name)
145        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
146      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f2Name)
147        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
148      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f3Name)
149        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
150      .build();
151
152    tabC = TableDescriptorBuilder.newBuilder(tabCName)
153      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f1Name)
154        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
155      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f2Name)
156        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
157      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f3Name)
158        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
159      .build();
160
161    utility1.startMiniCluster();
162    utility2.startMiniCluster();
163    utility3.startMiniCluster();
164  }
165
166  @AfterAll
167  public static void tearDownAfterClass() throws Exception {
168    utility3.shutdownMiniCluster();
169    utility2.shutdownMiniCluster();
170    utility1.shutdownMiniCluster();
171  }
172
173  @Test
174  public void testParseTableCFsFromConfig() {
175    Map<TableName, List<String>> tabCFsMap = null;
176
177    // 1. null or empty string, result should be null
178    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(null);
179    assertEquals(null, tabCFsMap);
180
181    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig("");
182    assertEquals(null, tabCFsMap);
183
184    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig("   ");
185    assertEquals(null, tabCFsMap);
186
187    final TableName tableName1 = tableNameExt.getTableName("1");
188    final TableName tableName2 = tableNameExt.getTableName("2");
189    final TableName tableName3 = tableNameExt.getTableName("3");
190
191    // 2. single table: "tableName1" / "tableName2:cf1" / "tableName3:cf1,cf3"
192    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName1.getNameAsString());
193    assertEquals(1, tabCFsMap.size()); // only one table
194    assertTrue(tabCFsMap.containsKey(tableName1)); // its table name is "tableName1"
195    assertFalse(tabCFsMap.containsKey(tableName2)); // not other table
196    assertEquals(null, tabCFsMap.get(tableName1)); // null cf-list,
197
198    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName2 + ":cf1");
199    assertEquals(1, tabCFsMap.size()); // only one table
200    assertTrue(tabCFsMap.containsKey(tableName2)); // its table name is "tableName2"
201    assertFalse(tabCFsMap.containsKey(tableName1)); // not other table
202    assertEquals(1, tabCFsMap.get(tableName2).size()); // cf-list contains only 1 cf
203    assertEquals("cf1", tabCFsMap.get(tableName2).get(0));// the only cf is "cf1"
204
205    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName3 + " : cf1 , cf3");
206    assertEquals(1, tabCFsMap.size()); // only one table
207    assertTrue(tabCFsMap.containsKey(tableName3)); // its table name is "tableName2"
208    assertFalse(tabCFsMap.containsKey(tableName1)); // not other table
209    assertEquals(2, tabCFsMap.get(tableName3).size()); // cf-list contains 2 cf
210    assertTrue(tabCFsMap.get(tableName3).contains("cf1"));// contains "cf1"
211    assertTrue(tabCFsMap.get(tableName3).contains("cf3"));// contains "cf3"
212
213    // 3. multiple tables: "tableName1 ; tableName2:cf1 ; tableName3:cf1,cf3"
214    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(
215      tableName1 + " ; " + tableName2 + ":cf1 ; " + tableName3 + ":cf1,cf3");
216    // 3.1 contains 3 tables : "tableName1", "tableName2" and "tableName3"
217    assertEquals(3, tabCFsMap.size());
218    assertTrue(tabCFsMap.containsKey(tableName1));
219    assertTrue(tabCFsMap.containsKey(tableName2));
220    assertTrue(tabCFsMap.containsKey(tableName3));
221    // 3.2 table "tab1" : null cf-list
222    assertEquals(null, tabCFsMap.get(tableName1));
223    // 3.3 table "tab2" : cf-list contains a single cf "cf1"
224    assertEquals(1, tabCFsMap.get(tableName2).size());
225    assertEquals("cf1", tabCFsMap.get(tableName2).get(0));
226    // 3.4 table "tab3" : cf-list contains "cf1" and "cf3"
227    assertEquals(2, tabCFsMap.get(tableName3).size());
228    assertTrue(tabCFsMap.get(tableName3).contains("cf1"));
229    assertTrue(tabCFsMap.get(tableName3).contains("cf3"));
230
231    // 4. contiguous or additional ";"(table delimiter) or ","(cf delimiter) can be tolerated
232    // still use the example of multiple tables: "tableName1 ; tableName2:cf1 ; tableName3:cf1,cf3"
233    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(
234      tableName1 + " ; ; " + tableName2 + ":cf1 ; " + tableName3 + ":cf1,,cf3 ;");
235    // 4.1 contains 3 tables : "tableName1", "tableName2" and "tableName3"
236    assertEquals(3, tabCFsMap.size());
237    assertTrue(tabCFsMap.containsKey(tableName1));
238    assertTrue(tabCFsMap.containsKey(tableName2));
239    assertTrue(tabCFsMap.containsKey(tableName3));
240    // 4.2 table "tab1" : null cf-list
241    assertEquals(null, tabCFsMap.get(tableName1));
242    // 4.3 table "tab2" : cf-list contains a single cf "cf1"
243    assertEquals(1, tabCFsMap.get(tableName2).size());
244    assertEquals("cf1", tabCFsMap.get(tableName2).get(0));
245    // 4.4 table "tab3" : cf-list contains "cf1" and "cf3"
246    assertEquals(2, tabCFsMap.get(tableName3).size());
247    assertTrue(tabCFsMap.get(tableName3).contains("cf1"));
248    assertTrue(tabCFsMap.get(tableName3).contains("cf3"));
249
250    // 5. invalid format "tableName1:tt:cf1 ; tableName2::cf1 ; tableName3:cf1,cf3"
251    // "tableName1:tt:cf1" and "tableName2::cf1" are invalid and will be ignored totally
252    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(
253      tableName1 + ":tt:cf1 ; " + tableName2 + "::cf1 ; " + tableName3 + ":cf1,cf3");
254    // 5.1 no "tableName1" and "tableName2", only "tableName3"
255    assertEquals(1, tabCFsMap.size()); // only one table
256    assertFalse(tabCFsMap.containsKey(tableName1));
257    assertFalse(tabCFsMap.containsKey(tableName2));
258    assertTrue(tabCFsMap.containsKey(tableName3));
259    // 5.2 table "tableName3" : cf-list contains "cf1" and "cf3"
260    assertEquals(2, tabCFsMap.get(tableName3).size());
261    assertTrue(tabCFsMap.get(tableName3).contains("cf1"));
262    assertTrue(tabCFsMap.get(tableName3).contains("cf3"));
263  }
264
265  @Test
266  public void testTableCFsHelperConverter() {
267
268    ReplicationProtos.TableCF[] tableCFs = null;
269    Map<TableName, List<String>> tabCFsMap = null;
270
271    // 1. null or empty string, result should be null
272    assertNull(ReplicationPeerConfigUtil.convert(tabCFsMap));
273
274    tabCFsMap = new HashMap<>();
275    tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
276    assertEquals(0, tableCFs.length);
277
278    final TableName tableName1 = tableNameExt.getTableName("1");
279    final TableName tableName2 = tableNameExt.getTableName("2");
280    final TableName tableName3 = tableNameExt.getTableName("3");
281
282    // 2. single table: "tab1" / "tab2:cf1" / "tab3:cf1,cf3"
283    tabCFsMap.clear();
284    tabCFsMap.put(tableName1, null);
285    tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
286    assertEquals(1, tableCFs.length); // only one table
287    assertEquals(tableName1.toString(), tableCFs[0].getTableName().getQualifier().toStringUtf8());
288    assertEquals(0, tableCFs[0].getFamiliesCount());
289
290    tabCFsMap.clear();
291    tabCFsMap.put(tableName2, new ArrayList<>());
292    tabCFsMap.get(tableName2).add("cf1");
293    tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
294    assertEquals(1, tableCFs.length); // only one table
295    assertEquals(tableName2.toString(), tableCFs[0].getTableName().getQualifier().toStringUtf8());
296    assertEquals(1, tableCFs[0].getFamiliesCount());
297    assertEquals("cf1", tableCFs[0].getFamilies(0).toStringUtf8());
298
299    tabCFsMap.clear();
300    tabCFsMap.put(tableName3, new ArrayList<>());
301    tabCFsMap.get(tableName3).add("cf1");
302    tabCFsMap.get(tableName3).add("cf3");
303    tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
304    assertEquals(1, tableCFs.length);
305    assertEquals(tableName3.toString(), tableCFs[0].getTableName().getQualifier().toStringUtf8());
306    assertEquals(2, tableCFs[0].getFamiliesCount());
307    assertEquals("cf1", tableCFs[0].getFamilies(0).toStringUtf8());
308    assertEquals("cf3", tableCFs[0].getFamilies(1).toStringUtf8());
309
310    tabCFsMap.clear();
311    tabCFsMap.put(tableName1, null);
312    tabCFsMap.put(tableName2, new ArrayList<>());
313    tabCFsMap.get(tableName2).add("cf1");
314    tabCFsMap.put(tableName3, new ArrayList<>());
315    tabCFsMap.get(tableName3).add("cf1");
316    tabCFsMap.get(tableName3).add("cf3");
317
318    tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
319    assertEquals(3, tableCFs.length);
320    assertNotNull(ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName1.toString()));
321    assertNotNull(ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName2.toString()));
322    assertNotNull(ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString()));
323
324    assertEquals(0,
325      ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName1.toString()).getFamiliesCount());
326
327    assertEquals(1,
328      ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName2.toString()).getFamiliesCount());
329    assertEquals("cf1", ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName2.toString())
330      .getFamilies(0).toStringUtf8());
331
332    assertEquals(2,
333      ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString()).getFamiliesCount());
334    assertEquals("cf1", ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString())
335      .getFamilies(0).toStringUtf8());
336    assertEquals("cf3", ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString())
337      .getFamilies(1).toStringUtf8());
338
339    tabCFsMap = ReplicationPeerConfigUtil.convert2Map(tableCFs);
340    assertEquals(3, tabCFsMap.size());
341    assertTrue(tabCFsMap.containsKey(tableName1));
342    assertTrue(tabCFsMap.containsKey(tableName2));
343    assertTrue(tabCFsMap.containsKey(tableName3));
344    // 3.2 table "tab1" : null cf-list
345    assertEquals(null, tabCFsMap.get(tableName1));
346    // 3.3 table "tab2" : cf-list contains a single cf "cf1"
347    assertEquals(1, tabCFsMap.get(tableName2).size());
348    assertEquals("cf1", tabCFsMap.get(tableName2).get(0));
349    // 3.4 table "tab3" : cf-list contains "cf1" and "cf3"
350    assertEquals(2, tabCFsMap.get(tableName3).size());
351    assertTrue(tabCFsMap.get(tableName3).contains("cf1"));
352    assertTrue(tabCFsMap.get(tableName3).contains("cf3"));
353  }
354
355  @Test
356  public void testPerTableCFReplication() throws Exception {
357    LOG.info("testPerTableCFReplication");
358    try (Connection connection1 = ConnectionFactory.createConnection(conf1);
359      Connection connection2 = ConnectionFactory.createConnection(conf2);
360      Connection connection3 = ConnectionFactory.createConnection(conf3);
361      Admin admin1 = connection1.getAdmin(); Admin admin2 = connection2.getAdmin();
362      Admin admin3 = connection3.getAdmin(); Admin replicationAdmin = connection1.getAdmin()) {
363
364      admin1.createTable(tabA);
365      admin1.createTable(tabB);
366      admin1.createTable(tabC);
367      admin2.createTable(tabA);
368      admin2.createTable(tabB);
369      admin2.createTable(tabC);
370      admin3.createTable(tabA);
371      admin3.createTable(tabB);
372      admin3.createTable(tabC);
373
374      Table htab1A = connection1.getTable(tabAName);
375      Table htab2A = connection2.getTable(tabAName);
376      Table htab3A = connection3.getTable(tabAName);
377
378      Table htab1B = connection1.getTable(tabBName);
379      Table htab2B = connection2.getTable(tabBName);
380      Table htab3B = connection3.getTable(tabBName);
381
382      Table htab1C = connection1.getTable(tabCName);
383      Table htab2C = connection2.getTable(tabCName);
384      Table htab3C = connection3.getTable(tabCName);
385
386      // A. add cluster2/cluster3 as peers to cluster1
387      Map<TableName, List<String>> tableCFs = new HashMap<>();
388      tableCFs.put(tabCName, null);
389      tableCFs.put(tabBName, new ArrayList<>());
390      tableCFs.get(tabBName).add("f1");
391      tableCFs.get(tabBName).add("f3");
392      ReplicationPeerConfig rpc2 =
393        ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getRpcConnnectionURI())
394          .setReplicateAllUserTables(false).setTableCFsMap(tableCFs).build();
395      replicationAdmin.addReplicationPeer("2", rpc2);
396
397      tableCFs.clear();
398      tableCFs.put(tabAName, null);
399      tableCFs.put(tabBName, new ArrayList<>());
400      tableCFs.get(tabBName).add("f1");
401      tableCFs.get(tabBName).add("f2");
402      ReplicationPeerConfig rpc3 =
403        ReplicationPeerConfig.newBuilder().setClusterKey(utility3.getRpcConnnectionURI())
404          .setReplicateAllUserTables(false).setTableCFsMap(tableCFs).build();
405      replicationAdmin.addReplicationPeer("3", rpc3);
406
407      // A1. tableA can only replicated to cluster3
408      putAndWaitWithFamily(row1, f1Name, htab1A, htab3A);
409      ensureRowNotReplicated(row1, f1Name, htab2A);
410      deleteAndWaitWithFamily(row1, f1Name, htab1A, htab3A);
411
412      putAndWaitWithFamily(row1, f2Name, htab1A, htab3A);
413      ensureRowNotReplicated(row1, f2Name, htab2A);
414      deleteAndWaitWithFamily(row1, f2Name, htab1A, htab3A);
415
416      putAndWaitWithFamily(row1, f3Name, htab1A, htab3A);
417      ensureRowNotReplicated(row1, f3Name, htab2A);
418      deleteAndWaitWithFamily(row1, f3Name, htab1A, htab3A);
419
420      // A2. cf 'f1' of tableB can replicated to both cluster2 and cluster3
421      putAndWaitWithFamily(row1, f1Name, htab1B, htab2B, htab3B);
422      deleteAndWaitWithFamily(row1, f1Name, htab1B, htab2B, htab3B);
423
424      // cf 'f2' of tableB can only replicated to cluster3
425      putAndWaitWithFamily(row1, f2Name, htab1B, htab3B);
426      ensureRowNotReplicated(row1, f2Name, htab2B);
427      deleteAndWaitWithFamily(row1, f2Name, htab1B, htab3B);
428
429      // cf 'f3' of tableB can only replicated to cluster2
430      putAndWaitWithFamily(row1, f3Name, htab1B, htab2B);
431      ensureRowNotReplicated(row1, f3Name, htab3B);
432      deleteAndWaitWithFamily(row1, f3Name, htab1B, htab2B);
433
434      // A3. tableC can only replicated to cluster2
435      putAndWaitWithFamily(row1, f1Name, htab1C, htab2C);
436      ensureRowNotReplicated(row1, f1Name, htab3C);
437      deleteAndWaitWithFamily(row1, f1Name, htab1C, htab2C);
438
439      putAndWaitWithFamily(row1, f2Name, htab1C, htab2C);
440      ensureRowNotReplicated(row1, f2Name, htab3C);
441      deleteAndWaitWithFamily(row1, f2Name, htab1C, htab2C);
442
443      putAndWaitWithFamily(row1, f3Name, htab1C, htab2C);
444      ensureRowNotReplicated(row1, f3Name, htab3C);
445      deleteAndWaitWithFamily(row1, f3Name, htab1C, htab2C);
446
447      // B. change peers' replicable table-cf config
448      tableCFs.clear();
449      tableCFs.put(tabAName, new ArrayList<>());
450      tableCFs.get(tabAName).add("f1");
451      tableCFs.get(tabAName).add("f2");
452      tableCFs.put(tabCName, new ArrayList<>());
453      tableCFs.get(tabCName).add("f2");
454      tableCFs.get(tabCName).add("f3");
455      replicationAdmin.updateReplicationPeerConfig("2",
456        ReplicationPeerConfig.newBuilder(replicationAdmin.getReplicationPeerConfig("2"))
457          .setTableCFsMap(tableCFs).build());
458
459      tableCFs.clear();
460      tableCFs.put(tabBName, null);
461      tableCFs.put(tabCName, new ArrayList<>());
462      tableCFs.get(tabCName).add("f3");
463      replicationAdmin.updateReplicationPeerConfig("3",
464        ReplicationPeerConfig.newBuilder(replicationAdmin.getReplicationPeerConfig("3"))
465          .setTableCFsMap(tableCFs).build());
466
467      // B1. cf 'f1' of tableA can only replicated to cluster2
468      putAndWaitWithFamily(row2, f1Name, htab1A, htab2A);
469      ensureRowNotReplicated(row2, f1Name, htab3A);
470      deleteAndWaitWithFamily(row2, f1Name, htab1A, htab2A);
471      // cf 'f2' of tableA can only replicated to cluster2
472      putAndWaitWithFamily(row2, f2Name, htab1A, htab2A);
473      ensureRowNotReplicated(row2, f2Name, htab3A);
474      deleteAndWaitWithFamily(row2, f2Name, htab1A, htab2A);
475      // cf 'f3' of tableA isn't replicable to either cluster2 or cluster3
476      putAndWaitWithFamily(row2, f3Name, htab1A);
477      ensureRowNotReplicated(row2, f3Name, htab2A, htab3A);
478      deleteAndWaitWithFamily(row2, f3Name, htab1A);
479
480      // B2. tableB can only replicated to cluster3
481      putAndWaitWithFamily(row2, f1Name, htab1B, htab3B);
482      ensureRowNotReplicated(row2, f1Name, htab2B);
483      deleteAndWaitWithFamily(row2, f1Name, htab1B, htab3B);
484
485      putAndWaitWithFamily(row2, f2Name, htab1B, htab3B);
486      ensureRowNotReplicated(row2, f2Name, htab2B);
487      deleteAndWaitWithFamily(row2, f2Name, htab1B, htab3B);
488
489      putAndWaitWithFamily(row2, f3Name, htab1B, htab3B);
490      ensureRowNotReplicated(row2, f3Name, htab2B);
491      deleteAndWaitWithFamily(row2, f3Name, htab1B, htab3B);
492
493      // B3. cf 'f1' of tableC non-replicable to either cluster
494      putAndWaitWithFamily(row2, f1Name, htab1C);
495      ensureRowNotReplicated(row2, f1Name, htab2C, htab3C);
496      deleteAndWaitWithFamily(row2, f1Name, htab1C);
497      // cf 'f2' of tableC can only replicated to cluster2
498      putAndWaitWithFamily(row2, f2Name, htab1C, htab2C);
499      ensureRowNotReplicated(row2, f2Name, htab3C);
500      deleteAndWaitWithFamily(row2, f2Name, htab1C, htab2C);
501      // cf 'f3' of tableC can replicated to cluster2 and cluster3
502      putAndWaitWithFamily(row2, f3Name, htab1C, htab2C, htab3C);
503      deleteAndWaitWithFamily(row2, f3Name, htab1C, htab2C, htab3C);
504    }
505  }
506
507  private void ensureRowNotReplicated(byte[] row, byte[] fam, Table... tables) throws IOException {
508    Get get = new Get(row);
509    get.addFamily(fam);
510    for (Table table : tables) {
511      Result res = table.get(get);
512      assertEquals(0, res.size());
513    }
514  }
515
516  private void deleteAndWaitWithFamily(byte[] row, byte[] fam, Table source, Table... targets)
517    throws Exception {
518    Delete del = new Delete(row);
519    del.addFamily(fam);
520    source.delete(del);
521
522    Get get = new Get(row);
523    get.addFamily(fam);
524    for (int i = 0; i < NB_RETRIES; i++) {
525      if (i == NB_RETRIES - 1) {
526        fail("Waited too much time for del replication");
527      }
528      boolean removedFromAll = true;
529      for (Table target : targets) {
530        Result res = target.get(get);
531        if (res.size() >= 1) {
532          LOG.info("Row not deleted");
533          removedFromAll = false;
534          break;
535        }
536      }
537      if (removedFromAll) {
538        break;
539      } else {
540        Thread.sleep(SLEEP_TIME);
541      }
542    }
543  }
544
545  private void putAndWaitWithFamily(byte[] row, byte[] fam, Table source, Table... targets)
546    throws Exception {
547    Put put = new Put(row);
548    put.addColumn(fam, row, val);
549    source.put(put);
550
551    Get get = new Get(row);
552    get.addFamily(fam);
553    for (int i = 0; i < NB_RETRIES; i++) {
554      if (i == NB_RETRIES - 1) {
555        fail("Waited too much time for put replication");
556      }
557      boolean replicatedToAll = true;
558      for (Table target : targets) {
559        Result res = target.get(get);
560        if (res.isEmpty()) {
561          LOG.info("Row not available");
562          replicatedToAll = false;
563          break;
564        } else {
565          assertEquals(1, res.size());
566          assertArrayEquals(val, res.value());
567        }
568      }
569      if (replicatedToAll) {
570        break;
571      } else {
572        Thread.sleep(SLEEP_TIME);
573      }
574    }
575  }
576}