001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.*;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.HashMap;
025import java.util.List;
026import java.util.Map;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseConfiguration;
030import org.apache.hadoop.hbase.HBaseTestingUtility;
031import org.apache.hadoop.hbase.HColumnDescriptor;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.HTableDescriptor;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.client.Admin;
036import org.apache.hadoop.hbase.client.Connection;
037import org.apache.hadoop.hbase.client.ConnectionFactory;
038import org.apache.hadoop.hbase.client.Delete;
039import org.apache.hadoop.hbase.client.Get;
040import org.apache.hadoop.hbase.client.Put;
041import org.apache.hadoop.hbase.client.Result;
042import org.apache.hadoop.hbase.client.Table;
043import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
044import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
045import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
046import org.apache.hadoop.hbase.testclassification.FlakeyTests;
047import org.apache.hadoop.hbase.testclassification.LargeTests;
048import org.apache.hadoop.hbase.util.Bytes;
049import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
050import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
051import org.junit.AfterClass;
052import org.junit.BeforeClass;
053import org.junit.ClassRule;
054import org.junit.Rule;
055import org.junit.Test;
056import org.junit.experimental.categories.Category;
057import org.junit.rules.TestName;
058import org.slf4j.Logger;
059import org.slf4j.LoggerFactory;
060
061import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
062
063@Category({FlakeyTests.class, LargeTests.class})
064public class TestPerTableCFReplication {
065
066  @ClassRule
067  public static final HBaseClassTestRule CLASS_RULE =
068      HBaseClassTestRule.forClass(TestPerTableCFReplication.class);
069
070  private static final Logger LOG = LoggerFactory.getLogger(TestPerTableCFReplication.class);
071
072  private static Configuration conf1;
073  private static Configuration conf2;
074  private static Configuration conf3;
075
076  private static HBaseTestingUtility utility1;
077  private static HBaseTestingUtility utility2;
078  private static HBaseTestingUtility utility3;
079  private static final long SLEEP_TIME = 500;
080  private static final int NB_RETRIES = 100;
081
082  private static final TableName tableName = TableName.valueOf("test");
083  private static final TableName tabAName = TableName.valueOf("TA");
084  private static final TableName tabBName = TableName.valueOf("TB");
085  private static final TableName tabCName = TableName.valueOf("TC");
086  private static final byte[] famName = Bytes.toBytes("f");
087  private static final byte[] f1Name = Bytes.toBytes("f1");
088  private static final byte[] f2Name = Bytes.toBytes("f2");
089  private static final byte[] f3Name = Bytes.toBytes("f3");
090  private static final byte[] row1 = Bytes.toBytes("row1");
091  private static final byte[] row2 = Bytes.toBytes("row2");
092  private static final byte[] noRepfamName = Bytes.toBytes("norep");
093  private static final byte[] val = Bytes.toBytes("myval");
094
095  private static HTableDescriptor table;
096  private static HTableDescriptor tabA;
097  private static HTableDescriptor tabB;
098  private static HTableDescriptor tabC;
099
100  @Rule
101  public TestName name = new TestName();
102
103  @BeforeClass
104  public static void setUpBeforeClass() throws Exception {
105    conf1 = HBaseConfiguration.create();
106    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
107    // smaller block size and capacity to trigger more operations
108    // and test them
109    conf1.setInt("hbase.regionserver.hlog.blocksize", 1024*20);
110    conf1.setInt("replication.source.size.capacity", 1024);
111    conf1.setLong("replication.source.sleepforretries", 100);
112    conf1.setInt("hbase.regionserver.maxlogs", 10);
113    conf1.setLong("hbase.master.logcleaner.ttl", 10);
114    conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
115    conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
116        "org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter");
117
118    utility1 = new HBaseTestingUtility(conf1);
119    utility1.startMiniZKCluster();
120    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
121    new ZKWatcher(conf1, "cluster1", null, true);
122
123    conf2 = new Configuration(conf1);
124    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
125
126    conf3 = new Configuration(conf1);
127    conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3");
128
129    utility2 = new HBaseTestingUtility(conf2);
130    utility2.setZkCluster(miniZK);
131    new ZKWatcher(conf2, "cluster3", null, true);
132
133    utility3 = new HBaseTestingUtility(conf3);
134    utility3.setZkCluster(miniZK);
135    new ZKWatcher(conf3, "cluster3", null, true);
136
137    table = new HTableDescriptor(tableName);
138    HColumnDescriptor fam = new HColumnDescriptor(famName);
139    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
140    table.addFamily(fam);
141    fam = new HColumnDescriptor(noRepfamName);
142    table.addFamily(fam);
143
144    tabA = new HTableDescriptor(tabAName);
145    fam = new HColumnDescriptor(f1Name);
146    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
147    tabA.addFamily(fam);
148    fam = new HColumnDescriptor(f2Name);
149    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
150    tabA.addFamily(fam);
151    fam = new HColumnDescriptor(f3Name);
152    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
153    tabA.addFamily(fam);
154
155    tabB = new HTableDescriptor(tabBName);
156    fam = new HColumnDescriptor(f1Name);
157    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
158    tabB.addFamily(fam);
159    fam = new HColumnDescriptor(f2Name);
160    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
161    tabB.addFamily(fam);
162    fam = new HColumnDescriptor(f3Name);
163    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
164    tabB.addFamily(fam);
165
166    tabC = new HTableDescriptor(tabCName);
167    fam = new HColumnDescriptor(f1Name);
168    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
169    tabC.addFamily(fam);
170    fam = new HColumnDescriptor(f2Name);
171    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
172    tabC.addFamily(fam);
173    fam = new HColumnDescriptor(f3Name);
174    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
175    tabC.addFamily(fam);
176
177    utility1.startMiniCluster();
178    utility2.startMiniCluster();
179    utility3.startMiniCluster();
180  }
181
182  @AfterClass
183  public static void tearDownAfterClass() throws Exception {
184    utility3.shutdownMiniCluster();
185    utility2.shutdownMiniCluster();
186    utility1.shutdownMiniCluster();
187  }
188
189  @Test
190  public void testParseTableCFsFromConfig() {
191    Map<TableName, List<String>> tabCFsMap = null;
192
193    // 1. null or empty string, result should be null
194    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(null);
195    assertEquals(null, tabCFsMap);
196
197    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig("");
198    assertEquals(null, tabCFsMap);
199
200    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig("   ");
201    assertEquals(null, tabCFsMap);
202
203    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
204    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2");
205    final TableName tableName3 = TableName.valueOf(name.getMethodName() + "3");
206
207    // 2. single table: "tableName1" / "tableName2:cf1" / "tableName3:cf1,cf3"
208    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName1.getNameAsString());
209    assertEquals(1, tabCFsMap.size()); // only one table
210    assertTrue(tabCFsMap.containsKey(tableName1));   // its table name is "tableName1"
211    assertFalse(tabCFsMap.containsKey(tableName2));  // not other table
212    assertEquals(null, tabCFsMap.get(tableName1));   // null cf-list,
213
214    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName2 + ":cf1");
215    assertEquals(1, tabCFsMap.size()); // only one table
216    assertTrue(tabCFsMap.containsKey(tableName2));   // its table name is "tableName2"
217    assertFalse(tabCFsMap.containsKey(tableName1));  // not other table
218    assertEquals(1, tabCFsMap.get(tableName2).size());   // cf-list contains only 1 cf
219    assertEquals("cf1", tabCFsMap.get(tableName2).get(0));// the only cf is "cf1"
220
221    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName3 + " : cf1 , cf3");
222    assertEquals(1, tabCFsMap.size()); // only one table
223    assertTrue(tabCFsMap.containsKey(tableName3));   // its table name is "tableName2"
224    assertFalse(tabCFsMap.containsKey(tableName1));  // not other table
225    assertEquals(2, tabCFsMap.get(tableName3).size());   // cf-list contains 2 cf
226    assertTrue(tabCFsMap.get(tableName3).contains("cf1"));// contains "cf1"
227    assertTrue(tabCFsMap.get(tableName3).contains("cf3"));// contains "cf3"
228
229    // 3. multiple tables: "tableName1 ; tableName2:cf1 ; tableName3:cf1,cf3"
230    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName1 + " ; " + tableName2
231            + ":cf1 ; " + tableName3 + ":cf1,cf3");
232    // 3.1 contains 3 tables : "tableName1", "tableName2" and "tableName3"
233    assertEquals(3, tabCFsMap.size());
234    assertTrue(tabCFsMap.containsKey(tableName1));
235    assertTrue(tabCFsMap.containsKey(tableName2));
236    assertTrue(tabCFsMap.containsKey(tableName3));
237    // 3.2 table "tab1" : null cf-list
238    assertEquals(null, tabCFsMap.get(tableName1));
239    // 3.3 table "tab2" : cf-list contains a single cf "cf1"
240    assertEquals(1, tabCFsMap.get(tableName2).size());
241    assertEquals("cf1", tabCFsMap.get(tableName2).get(0));
242    // 3.4 table "tab3" : cf-list contains "cf1" and "cf3"
243    assertEquals(2, tabCFsMap.get(tableName3).size());
244    assertTrue(tabCFsMap.get(tableName3).contains("cf1"));
245    assertTrue(tabCFsMap.get(tableName3).contains("cf3"));
246
247    // 4. contiguous or additional ";"(table delimiter) or ","(cf delimiter) can be tolerated
248    // still use the example of multiple tables: "tableName1 ; tableName2:cf1 ; tableName3:cf1,cf3"
249    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(
250      tableName1 + " ; ; " + tableName2 + ":cf1 ; " + tableName3 + ":cf1,,cf3 ;");
251    // 4.1 contains 3 tables : "tableName1", "tableName2" and "tableName3"
252    assertEquals(3, tabCFsMap.size());
253    assertTrue(tabCFsMap.containsKey(tableName1));
254    assertTrue(tabCFsMap.containsKey(tableName2));
255    assertTrue(tabCFsMap.containsKey(tableName3));
256    // 4.2 table "tab1" : null cf-list
257    assertEquals(null, tabCFsMap.get(tableName1));
258    // 4.3 table "tab2" : cf-list contains a single cf "cf1"
259    assertEquals(1, tabCFsMap.get(tableName2).size());
260    assertEquals("cf1", tabCFsMap.get(tableName2).get(0));
261    // 4.4 table "tab3" : cf-list contains "cf1" and "cf3"
262    assertEquals(2, tabCFsMap.get(tableName3).size());
263    assertTrue(tabCFsMap.get(tableName3).contains("cf1"));
264    assertTrue(tabCFsMap.get(tableName3).contains("cf3"));
265
266    // 5. invalid format "tableName1:tt:cf1 ; tableName2::cf1 ; tableName3:cf1,cf3"
267    //    "tableName1:tt:cf1" and "tableName2::cf1" are invalid and will be ignored totally
268    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(
269      tableName1 + ":tt:cf1 ; " + tableName2 + "::cf1 ; " + tableName3 + ":cf1,cf3");
270    // 5.1 no "tableName1" and "tableName2", only "tableName3"
271    assertEquals(1, tabCFsMap.size()); // only one table
272    assertFalse(tabCFsMap.containsKey(tableName1));
273    assertFalse(tabCFsMap.containsKey(tableName2));
274    assertTrue(tabCFsMap.containsKey(tableName3));
275   // 5.2 table "tableName3" : cf-list contains "cf1" and "cf3"
276    assertEquals(2, tabCFsMap.get(tableName3).size());
277    assertTrue(tabCFsMap.get(tableName3).contains("cf1"));
278    assertTrue(tabCFsMap.get(tableName3).contains("cf3"));
279 }
280
281  @Test
282  public void testTableCFsHelperConverter() {
283
284    ReplicationProtos.TableCF[] tableCFs = null;
285    Map<TableName, List<String>> tabCFsMap = null;
286
287    // 1. null or empty string, result should be null
288    assertNull(ReplicationPeerConfigUtil.convert(tabCFsMap));
289
290    tabCFsMap = new HashMap<>();
291    tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
292    assertEquals(0, tableCFs.length);
293
294    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
295    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2");
296    final TableName tableName3 = TableName.valueOf(name.getMethodName() + "3");
297
298    // 2. single table: "tab1" / "tab2:cf1" / "tab3:cf1,cf3"
299    tabCFsMap.clear();
300    tabCFsMap.put(tableName1, null);
301    tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
302    assertEquals(1, tableCFs.length); // only one table
303    assertEquals(tableName1.toString(),
304        tableCFs[0].getTableName().getQualifier().toStringUtf8());
305    assertEquals(0, tableCFs[0].getFamiliesCount());
306
307    tabCFsMap.clear();
308    tabCFsMap.put(tableName2, new ArrayList<>());
309    tabCFsMap.get(tableName2).add("cf1");
310    tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
311    assertEquals(1, tableCFs.length); // only one table
312    assertEquals(tableName2.toString(),
313        tableCFs[0].getTableName().getQualifier().toStringUtf8());
314    assertEquals(1, tableCFs[0].getFamiliesCount());
315    assertEquals("cf1", tableCFs[0].getFamilies(0).toStringUtf8());
316
317    tabCFsMap.clear();
318    tabCFsMap.put(tableName3, new ArrayList<>());
319    tabCFsMap.get(tableName3).add("cf1");
320    tabCFsMap.get(tableName3).add("cf3");
321    tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
322    assertEquals(1, tableCFs.length);
323    assertEquals(tableName3.toString(),
324        tableCFs[0].getTableName().getQualifier().toStringUtf8());
325    assertEquals(2, tableCFs[0].getFamiliesCount());
326    assertEquals("cf1", tableCFs[0].getFamilies(0).toStringUtf8());
327    assertEquals("cf3", tableCFs[0].getFamilies(1).toStringUtf8());
328
329    tabCFsMap.clear();
330    tabCFsMap.put(tableName1, null);
331    tabCFsMap.put(tableName2, new ArrayList<>());
332    tabCFsMap.get(tableName2).add("cf1");
333    tabCFsMap.put(tableName3, new ArrayList<>());
334    tabCFsMap.get(tableName3).add("cf1");
335    tabCFsMap.get(tableName3).add("cf3");
336
337    tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
338    assertEquals(3, tableCFs.length);
339    assertNotNull(ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName1.toString()));
340    assertNotNull(ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName2.toString()));
341    assertNotNull(ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString()));
342
343    assertEquals(0,
344        ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName1.toString()).getFamiliesCount());
345
346    assertEquals(1, ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName2.toString())
347        .getFamiliesCount());
348    assertEquals("cf1", ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName2.toString())
349        .getFamilies(0).toStringUtf8());
350
351    assertEquals(2, ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString())
352        .getFamiliesCount());
353    assertEquals("cf1", ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString())
354        .getFamilies(0).toStringUtf8());
355    assertEquals("cf3", ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString())
356        .getFamilies(1).toStringUtf8());
357
358    tabCFsMap = ReplicationPeerConfigUtil.convert2Map(tableCFs);
359    assertEquals(3, tabCFsMap.size());
360    assertTrue(tabCFsMap.containsKey(tableName1));
361    assertTrue(tabCFsMap.containsKey(tableName2));
362    assertTrue(tabCFsMap.containsKey(tableName3));
363    // 3.2 table "tab1" : null cf-list
364    assertEquals(null, tabCFsMap.get(tableName1));
365    // 3.3 table "tab2" : cf-list contains a single cf "cf1"
366    assertEquals(1, tabCFsMap.get(tableName2).size());
367    assertEquals("cf1", tabCFsMap.get(tableName2).get(0));
368    // 3.4 table "tab3" : cf-list contains "cf1" and "cf3"
369    assertEquals(2, tabCFsMap.get(tableName3).size());
370    assertTrue(tabCFsMap.get(tableName3).contains("cf1"));
371    assertTrue(tabCFsMap.get(tableName3).contains("cf3"));
372  }
373
374  @Test
375  public void testPerTableCFReplication() throws Exception {
376    LOG.info("testPerTableCFReplication");
377    ReplicationAdmin replicationAdmin = new ReplicationAdmin(conf1);
378    Connection connection1 = ConnectionFactory.createConnection(conf1);
379    Connection connection2 = ConnectionFactory.createConnection(conf2);
380    Connection connection3 = ConnectionFactory.createConnection(conf3);
381    try {
382      Admin admin1 = connection1.getAdmin();
383      Admin admin2 = connection2.getAdmin();
384      Admin admin3 = connection3.getAdmin();
385
386      admin1.createTable(tabA);
387      admin1.createTable(tabB);
388      admin1.createTable(tabC);
389      admin2.createTable(tabA);
390      admin2.createTable(tabB);
391      admin2.createTable(tabC);
392      admin3.createTable(tabA);
393      admin3.createTable(tabB);
394      admin3.createTable(tabC);
395
396      Table htab1A = connection1.getTable(tabAName);
397      Table htab2A = connection2.getTable(tabAName);
398      Table htab3A = connection3.getTable(tabAName);
399
400      Table htab1B = connection1.getTable(tabBName);
401      Table htab2B = connection2.getTable(tabBName);
402      Table htab3B = connection3.getTable(tabBName);
403
404      Table htab1C = connection1.getTable(tabCName);
405      Table htab2C = connection2.getTable(tabCName);
406      Table htab3C = connection3.getTable(tabCName);
407
408      // A. add cluster2/cluster3 as peers to cluster1
409      ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
410      rpc2.setClusterKey(utility2.getClusterKey());
411      rpc2.setReplicateAllUserTables(false);
412      Map<TableName, List<String>> tableCFs = new HashMap<>();
413      tableCFs.put(tabCName, null);
414      tableCFs.put(tabBName, new ArrayList<>());
415      tableCFs.get(tabBName).add("f1");
416      tableCFs.get(tabBName).add("f3");
417      replicationAdmin.addPeer("2", rpc2, tableCFs);
418
419      ReplicationPeerConfig rpc3 = new ReplicationPeerConfig();
420      rpc3.setClusterKey(utility3.getClusterKey());
421      rpc3.setReplicateAllUserTables(false);
422      tableCFs.clear();
423      tableCFs.put(tabAName, null);
424      tableCFs.put(tabBName, new ArrayList<>());
425      tableCFs.get(tabBName).add("f1");
426      tableCFs.get(tabBName).add("f2");
427      replicationAdmin.addPeer("3", rpc3, tableCFs);
428
429      // A1. tableA can only replicated to cluster3
430      putAndWaitWithFamily(row1, f1Name, htab1A, htab3A);
431      ensureRowNotReplicated(row1, f1Name, htab2A);
432      deleteAndWaitWithFamily(row1, f1Name, htab1A, htab3A);
433
434      putAndWaitWithFamily(row1, f2Name, htab1A, htab3A);
435      ensureRowNotReplicated(row1, f2Name, htab2A);
436      deleteAndWaitWithFamily(row1, f2Name, htab1A, htab3A);
437
438      putAndWaitWithFamily(row1, f3Name, htab1A, htab3A);
439      ensureRowNotReplicated(row1, f3Name, htab2A);
440      deleteAndWaitWithFamily(row1, f3Name, htab1A, htab3A);
441
442      // A2. cf 'f1' of tableB can replicated to both cluster2 and cluster3
443      putAndWaitWithFamily(row1, f1Name, htab1B, htab2B, htab3B);
444      deleteAndWaitWithFamily(row1, f1Name, htab1B, htab2B, htab3B);
445
446      //  cf 'f2' of tableB can only replicated to cluster3
447      putAndWaitWithFamily(row1, f2Name, htab1B, htab3B);
448      ensureRowNotReplicated(row1, f2Name, htab2B);
449      deleteAndWaitWithFamily(row1, f2Name, htab1B, htab3B);
450
451      //  cf 'f3' of tableB can only replicated to cluster2
452      putAndWaitWithFamily(row1, f3Name, htab1B, htab2B);
453      ensureRowNotReplicated(row1, f3Name, htab3B);
454      deleteAndWaitWithFamily(row1, f3Name, htab1B, htab2B);
455
456      // A3. tableC can only replicated to cluster2
457      putAndWaitWithFamily(row1, f1Name, htab1C, htab2C);
458      ensureRowNotReplicated(row1, f1Name, htab3C);
459      deleteAndWaitWithFamily(row1, f1Name, htab1C, htab2C);
460
461      putAndWaitWithFamily(row1, f2Name, htab1C, htab2C);
462      ensureRowNotReplicated(row1, f2Name, htab3C);
463      deleteAndWaitWithFamily(row1, f2Name, htab1C, htab2C);
464
465      putAndWaitWithFamily(row1, f3Name, htab1C, htab2C);
466      ensureRowNotReplicated(row1, f3Name, htab3C);
467      deleteAndWaitWithFamily(row1, f3Name, htab1C, htab2C);
468
469      // B. change peers' replicable table-cf config
470      tableCFs.clear();
471      tableCFs.put(tabAName, new ArrayList<>());
472      tableCFs.get(tabAName).add("f1");
473      tableCFs.get(tabAName).add("f2");
474      tableCFs.put(tabCName, new ArrayList<>());
475      tableCFs.get(tabCName).add("f2");
476      tableCFs.get(tabCName).add("f3");
477      replicationAdmin.setPeerTableCFs("2", tableCFs);
478
479      tableCFs.clear();
480      tableCFs.put(tabBName, null);
481      tableCFs.put(tabCName, new ArrayList<>());
482      tableCFs.get(tabCName).add("f3");
483      replicationAdmin.setPeerTableCFs("3", tableCFs);
484
485      // B1. cf 'f1' of tableA can only replicated to cluster2
486      putAndWaitWithFamily(row2, f1Name, htab1A, htab2A);
487      ensureRowNotReplicated(row2, f1Name, htab3A);
488      deleteAndWaitWithFamily(row2, f1Name, htab1A, htab2A);
489      //     cf 'f2' of tableA can only replicated to cluster2
490      putAndWaitWithFamily(row2, f2Name, htab1A, htab2A);
491      ensureRowNotReplicated(row2, f2Name, htab3A);
492      deleteAndWaitWithFamily(row2, f2Name, htab1A, htab2A);
493      //     cf 'f3' of tableA isn't replicable to either cluster2 or cluster3
494      putAndWaitWithFamily(row2, f3Name, htab1A);
495      ensureRowNotReplicated(row2, f3Name, htab2A, htab3A);
496      deleteAndWaitWithFamily(row2, f3Name, htab1A);
497
498      // B2. tableB can only replicated to cluster3
499      putAndWaitWithFamily(row2, f1Name, htab1B, htab3B);
500      ensureRowNotReplicated(row2, f1Name, htab2B);
501      deleteAndWaitWithFamily(row2, f1Name, htab1B, htab3B);
502
503      putAndWaitWithFamily(row2, f2Name, htab1B, htab3B);
504      ensureRowNotReplicated(row2, f2Name, htab2B);
505      deleteAndWaitWithFamily(row2, f2Name, htab1B, htab3B);
506
507      putAndWaitWithFamily(row2, f3Name, htab1B, htab3B);
508      ensureRowNotReplicated(row2, f3Name, htab2B);
509      deleteAndWaitWithFamily(row2, f3Name, htab1B, htab3B);
510
511      // B3. cf 'f1' of tableC non-replicable to either cluster
512      putAndWaitWithFamily(row2, f1Name, htab1C);
513      ensureRowNotReplicated(row2, f1Name, htab2C, htab3C);
514      deleteAndWaitWithFamily(row2, f1Name, htab1C);
515      //     cf 'f2' of tableC can only replicated to cluster2
516      putAndWaitWithFamily(row2, f2Name, htab1C, htab2C);
517      ensureRowNotReplicated(row2, f2Name, htab3C);
518      deleteAndWaitWithFamily(row2, f2Name, htab1C, htab2C);
519      //     cf 'f3' of tableC can replicated to cluster2 and cluster3
520      putAndWaitWithFamily(row2, f3Name, htab1C, htab2C, htab3C);
521      deleteAndWaitWithFamily(row2, f3Name, htab1C, htab2C, htab3C);
522    } finally {
523      connection1.close();
524      connection2.close();
525      connection3.close();
526    }
527  }
528
529  private void ensureRowNotReplicated(byte[] row, byte[] fam, Table... tables) throws IOException {
530    Get get = new Get(row);
531    get.addFamily(fam);
532    for (Table table : tables) {
533      Result res = table.get(get);
534      assertEquals(0, res.size());
535    }
536  }
537
538  private void deleteAndWaitWithFamily(byte[] row, byte[] fam,
539      Table source, Table... targets)
540    throws Exception {
541    Delete del = new Delete(row);
542    del.addFamily(fam);
543    source.delete(del);
544
545    Get get = new Get(row);
546    get.addFamily(fam);
547    for (int i = 0; i < NB_RETRIES; i++) {
548      if (i==NB_RETRIES-1) {
549        fail("Waited too much time for del replication");
550      }
551      boolean removedFromAll = true;
552      for (Table target : targets) {
553        Result res = target.get(get);
554        if (res.size() >= 1) {
555          LOG.info("Row not deleted");
556          removedFromAll = false;
557          break;
558        }
559      }
560      if (removedFromAll) {
561        break;
562      } else {
563        Thread.sleep(SLEEP_TIME);
564      }
565    }
566  }
567
568  private void putAndWaitWithFamily(byte[] row, byte[] fam,
569      Table source, Table... targets)
570    throws Exception {
571    Put put = new Put(row);
572    put.addColumn(fam, row, val);
573    source.put(put);
574
575    Get get = new Get(row);
576    get.addFamily(fam);
577    for (int i = 0; i < NB_RETRIES; i++) {
578      if (i==NB_RETRIES-1) {
579        fail("Waited too much time for put replication");
580      }
581      boolean replicatedToAll = true;
582      for (Table target : targets) {
583        Result res = target.get(get);
584        if (res.isEmpty()) {
585          LOG.info("Row not available");
586          replicatedToAll = false;
587          break;
588        } else {
589          assertEquals(1, res.size());
590          assertArrayEquals(val, res.value());
591        }
592      }
593      if (replicatedToAll) {
594        break;
595      } else {
596        Thread.sleep(SLEEP_TIME);
597      }
598    }
599  }
600}