001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.fail;
024
025import java.io.Closeable;
026import java.io.IOException;
027import java.util.Arrays;
028import java.util.List;
029import java.util.Optional;
030import java.util.Random;
031import java.util.concurrent.CountDownLatch;
032
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.FileSystem;
035import org.apache.hadoop.fs.Path;
036import org.apache.hadoop.hbase.Cell;
037import org.apache.hadoop.hbase.DoNotRetryIOException;
038import org.apache.hadoop.hbase.HBaseClassTestRule;
039import org.apache.hadoop.hbase.HBaseConfiguration;
040import org.apache.hadoop.hbase.HBaseTestingUtility;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.KeyValue;
043import org.apache.hadoop.hbase.MiniHBaseCluster;
044import org.apache.hadoop.hbase.TableName;
045import org.apache.hadoop.hbase.client.Admin;
046import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
047import org.apache.hadoop.hbase.client.Connection;
048import org.apache.hadoop.hbase.client.ConnectionFactory;
049import org.apache.hadoop.hbase.client.Delete;
050import org.apache.hadoop.hbase.client.Durability;
051import org.apache.hadoop.hbase.client.Get;
052import org.apache.hadoop.hbase.client.Put;
053import org.apache.hadoop.hbase.client.Result;
054import org.apache.hadoop.hbase.client.Table;
055import org.apache.hadoop.hbase.client.TableDescriptor;
056import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
057import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
058import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
059import org.apache.hadoop.hbase.coprocessor.ObserverContext;
060import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
061import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
062import org.apache.hadoop.hbase.coprocessor.RegionObserver;
063import org.apache.hadoop.hbase.regionserver.HRegion;
064import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
065import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
066import org.apache.hadoop.hbase.testclassification.LargeTests;
067import org.apache.hadoop.hbase.testclassification.ReplicationTests;
068import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
069import org.apache.hadoop.hbase.util.Bytes;
070import org.apache.hadoop.hbase.util.HFileTestUtil;
071import org.apache.hadoop.hbase.wal.WALEdit;
072import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
073import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
074import org.junit.After;
075import org.junit.Assert;
076import org.junit.Before;
077import org.junit.ClassRule;
078import org.junit.Test;
079import org.junit.experimental.categories.Category;
080import org.slf4j.Logger;
081import org.slf4j.LoggerFactory;
082
083@Category({ReplicationTests.class, LargeTests.class})
084public class TestMasterReplication {
085
086  @ClassRule
087  public static final HBaseClassTestRule CLASS_RULE =
088      HBaseClassTestRule.forClass(TestMasterReplication.class);
089
090  private static final Logger LOG = LoggerFactory.getLogger(TestMasterReplication.class);
091
092  private Configuration baseConfiguration;
093
094  private HBaseTestingUtility[] utilities;
095  private Configuration[] configurations;
096  private MiniZooKeeperCluster miniZK;
097
098  private static final long SLEEP_TIME = 1000;
099  private static final int NB_RETRIES = 120;
100
101  private static final TableName tableName = TableName.valueOf("test");
102  private static final byte[] famName = Bytes.toBytes("f");
103  private static final byte[] famName1 = Bytes.toBytes("f1");
104  private static final byte[] row = Bytes.toBytes("row");
105  private static final byte[] row1 = Bytes.toBytes("row1");
106  private static final byte[] row2 = Bytes.toBytes("row2");
107  private static final byte[] row3 = Bytes.toBytes("row3");
108  private static final byte[] row4 = Bytes.toBytes("row4");
109  private static final byte[] noRepfamName = Bytes.toBytes("norep");
110
111  private static final byte[] count = Bytes.toBytes("count");
112  private static final byte[] put = Bytes.toBytes("put");
113  private static final byte[] delete = Bytes.toBytes("delete");
114
115  private TableDescriptor table;
116
117  @Before
118  public void setUp() throws Exception {
119    baseConfiguration = HBaseConfiguration.create();
120    // smaller block size and capacity to trigger more operations
121    // and test them
122    baseConfiguration.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20);
123    baseConfiguration.setInt("replication.source.size.capacity", 1024);
124    baseConfiguration.setLong("replication.source.sleepforretries", 100);
125    baseConfiguration.setInt("hbase.regionserver.maxlogs", 10);
126    baseConfiguration.setLong("hbase.master.logcleaner.ttl", 10);
127    baseConfiguration.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
128    baseConfiguration.set("hbase.replication.source.fs.conf.provider",
129      TestSourceFSConfigurationProvider.class.getCanonicalName());
130    baseConfiguration.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
131    baseConfiguration.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
132    baseConfiguration.setStrings(
133        CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
134        CoprocessorCounter.class.getName());
135    table = TableDescriptorBuilder.newBuilder(tableName)
136        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
137            .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
138        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName1)
139            .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
140        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
141  }
142
143  /**
144   * It tests the replication scenario involving 0 -> 1 -> 0. It does it by
145   * adding and deleting a row to a table in each cluster, checking if it's
146   * replicated. It also tests that the puts and deletes are not replicated back
147   * to the originating cluster.
148   */
149  @Test
150  public void testCyclicReplication1() throws Exception {
151    LOG.info("testSimplePutDelete");
152    int numClusters = 2;
153    Table[] htables = null;
154    try {
155      htables = setUpClusterTablesAndPeers(numClusters);
156
157      int[] expectedCounts = new int[] { 2, 2 };
158
159      // add rows to both clusters,
160      // make sure they are both replication
161      putAndWait(row, famName, htables[0], htables[1]);
162      putAndWait(row1, famName, htables[1], htables[0]);
163      validateCounts(htables, put, expectedCounts);
164
165      deleteAndWait(row, htables[0], htables[1]);
166      deleteAndWait(row1, htables[1], htables[0]);
167      validateCounts(htables, delete, expectedCounts);
168    } finally {
169      close(htables);
170      shutDownMiniClusters();
171    }
172  }
173
174  /**
175   * Tests the replication scenario 0 -> 0. By default
176   * {@link org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint},
177   * the replication peer should not be added.
178   */
179  @Test(expected = DoNotRetryIOException.class)
180  public void testLoopedReplication()
181    throws Exception {
182    LOG.info("testLoopedReplication");
183    startMiniClusters(1);
184    createTableOnClusters(table);
185    addPeer("1", 0, 0);
186  }
187
188  /**
189   * It tests the replication scenario involving 0 -> 1 -> 0. It does it by bulk loading a set of
190   * HFiles to a table in each cluster, checking if it's replicated.
191   */
192  @Test
193  public void testHFileCyclicReplication() throws Exception {
194    LOG.info("testHFileCyclicReplication");
195    int numClusters = 2;
196    Table[] htables = null;
197    try {
198      htables = setUpClusterTablesAndPeers(numClusters);
199
200      // Load 100 rows for each hfile range in cluster '0' and validate whether its been replicated
201      // to cluster '1'.
202      byte[][][] hfileRanges =
203          new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
204              new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("fff") }, };
205      int numOfRows = 100;
206      int[] expectedCounts =
207          new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };
208
209      loadAndValidateHFileReplication("testHFileCyclicReplication_01", 0, new int[] { 1 }, row,
210        famName, htables, hfileRanges, numOfRows, expectedCounts, true);
211
212      // Load 200 rows for each hfile range in cluster '1' and validate whether its been replicated
213      // to cluster '0'.
214      hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("gggg"), Bytes.toBytes("iiii") },
215          new byte[][] { Bytes.toBytes("jjj"), Bytes.toBytes("lll") }, };
216      numOfRows = 200;
217      int[] newExpectedCounts = new int[] { hfileRanges.length * numOfRows + expectedCounts[0],
218          hfileRanges.length * numOfRows + expectedCounts[1] };
219
220      loadAndValidateHFileReplication("testHFileCyclicReplication_10", 1, new int[] { 0 }, row,
221        famName, htables, hfileRanges, numOfRows, newExpectedCounts, true);
222
223    } finally {
224      close(htables);
225      shutDownMiniClusters();
226    }
227  }
228
229  private Table[] setUpClusterTablesAndPeers(int numClusters) throws Exception {
230    Table[] htables;
231    startMiniClusters(numClusters);
232    createTableOnClusters(table);
233
234    htables = getHTablesOnClusters(tableName);
235    // Test the replication scenarios of 0 -> 1 -> 0
236    addPeer("1", 0, 1);
237    addPeer("1", 1, 0);
238    return htables;
239  }
240
241  /**
242   * Tests the cyclic replication scenario of 0 -> 1 -> 2 -> 0 by adding and deleting rows to a
243   * table in each clusters and ensuring that the each of these clusters get the appropriate
244   * mutations. It also tests the grouping scenario where a cluster needs to replicate the edits
245   * originating from itself and also the edits that it received using replication from a different
246   * cluster. The scenario is explained in HBASE-9158
247   */
248  @Test
249  public void testCyclicReplication2() throws Exception {
250    LOG.info("testCyclicReplication2");
251    int numClusters = 3;
252    Table[] htables = null;
253    try {
254      startMiniClusters(numClusters);
255      createTableOnClusters(table);
256
257      // Test the replication scenario of 0 -> 1 -> 2 -> 0
258      addPeer("1", 0, 1);
259      addPeer("1", 1, 2);
260      addPeer("1", 2, 0);
261
262      htables = getHTablesOnClusters(tableName);
263
264      // put "row" and wait 'til it got around
265      putAndWait(row, famName, htables[0], htables[2]);
266      putAndWait(row1, famName, htables[1], htables[0]);
267      putAndWait(row2, famName, htables[2], htables[1]);
268
269      deleteAndWait(row, htables[0], htables[2]);
270      deleteAndWait(row1, htables[1], htables[0]);
271      deleteAndWait(row2, htables[2], htables[1]);
272
273      int[] expectedCounts = new int[] { 3, 3, 3 };
274      validateCounts(htables, put, expectedCounts);
275      validateCounts(htables, delete, expectedCounts);
276
277      // Test HBASE-9158
278      disablePeer("1", 2);
279      // we now have an edit that was replicated into cluster originating from
280      // cluster 0
281      putAndWait(row3, famName, htables[0], htables[1]);
282      // now add a local edit to cluster 1
283      htables[1].put(new Put(row4).addColumn(famName, row4, row4));
284      // re-enable replication from cluster 2 to cluster 0
285      enablePeer("1", 2);
286      // without HBASE-9158 the edit for row4 would have been marked with
287      // cluster 0's id
288      // and hence not replicated to cluster 0
289      wait(row4, htables[0], false);
290    } finally {
291      close(htables);
292      shutDownMiniClusters();
293    }
294  }
295
296  /**
297   * It tests the multi slave hfile replication scenario involving 0 -> 1, 2. It does it by bulk
298   * loading a set of HFiles to a table in master cluster, checking if it's replicated in its peers.
299   */
300  @Test
301  public void testHFileMultiSlaveReplication() throws Exception {
302    LOG.info("testHFileMultiSlaveReplication");
303    int numClusters = 3;
304    Table[] htables = null;
305    try {
306      startMiniClusters(numClusters);
307      createTableOnClusters(table);
308
309      // Add a slave, 0 -> 1
310      addPeer("1", 0, 1);
311
312      htables = getHTablesOnClusters(tableName);
313
314      // Load 100 rows for each hfile range in cluster '0' and validate whether its been replicated
315      // to cluster '1'.
316      byte[][][] hfileRanges =
317          new byte[][][] { new byte[][] { Bytes.toBytes("mmmm"), Bytes.toBytes("oooo") },
318              new byte[][] { Bytes.toBytes("ppp"), Bytes.toBytes("rrr") }, };
319      int numOfRows = 100;
320
321      int[] expectedCounts =
322          new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };
323
324      loadAndValidateHFileReplication("testHFileCyclicReplication_0", 0, new int[] { 1 }, row,
325        famName, htables, hfileRanges, numOfRows, expectedCounts, true);
326
327      // Validate data is not replicated to cluster '2'.
328      assertEquals(0, utilities[2].countRows(htables[2]));
329
330      rollWALAndWait(utilities[0], htables[0].getName(), row);
331
332      // Add one more slave, 0 -> 2
333      addPeer("2", 0, 2);
334
335      // Load 200 rows for each hfile range in cluster '0' and validate whether its been replicated
336      // to cluster '1' and '2'. Previous data should be replicated to cluster '2'.
337      hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("ssss"), Bytes.toBytes("uuuu") },
338          new byte[][] { Bytes.toBytes("vvv"), Bytes.toBytes("xxx") }, };
339      numOfRows = 200;
340
341      int[] newExpectedCounts = new int[] { hfileRanges.length * numOfRows + expectedCounts[0],
342          hfileRanges.length * numOfRows + expectedCounts[1], hfileRanges.length * numOfRows };
343
344      loadAndValidateHFileReplication("testHFileCyclicReplication_1", 0, new int[] { 1, 2 }, row,
345        famName, htables, hfileRanges, numOfRows, newExpectedCounts, true);
346
347    } finally {
348      close(htables);
349      shutDownMiniClusters();
350    }
351  }
352
353  /**
354   * It tests the bulk loaded hfile replication scenario to only explicitly specified table column
355   * families. It does it by bulk loading a set of HFiles belonging to both the CFs of table and set
356   * only one CF data to replicate.
357   */
358  @Test
359  public void testHFileReplicationForConfiguredTableCfs() throws Exception {
360    LOG.info("testHFileReplicationForConfiguredTableCfs");
361    int numClusters = 2;
362    Table[] htables = null;
363    try {
364      startMiniClusters(numClusters);
365      createTableOnClusters(table);
366
367      htables = getHTablesOnClusters(tableName);
368      // Test the replication scenarios only 'f' is configured for table data replication not 'f1'
369      addPeer("1", 0, 1, tableName.getNameAsString() + ":" + Bytes.toString(famName));
370
371      // Load 100 rows for each hfile range in cluster '0' for table CF 'f'
372      byte[][][] hfileRanges =
373          new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
374              new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("fff") }, };
375      int numOfRows = 100;
376      int[] expectedCounts =
377          new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };
378
379      loadAndValidateHFileReplication("load_f", 0, new int[] { 1 }, row, famName, htables,
380        hfileRanges, numOfRows, expectedCounts, true);
381
382      // Load 100 rows for each hfile range in cluster '0' for table CF 'f1'
383      hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("gggg"), Bytes.toBytes("iiii") },
384          new byte[][] { Bytes.toBytes("jjj"), Bytes.toBytes("lll") }, };
385      numOfRows = 100;
386
387      int[] newExpectedCounts =
388          new int[] { hfileRanges.length * numOfRows + expectedCounts[0], expectedCounts[1] };
389
390      loadAndValidateHFileReplication("load_f1", 0, new int[] { 1 }, row, famName1, htables,
391        hfileRanges, numOfRows, newExpectedCounts, false);
392
393      // Validate data replication for CF 'f1'
394
395      // Source cluster table should contain data for the families
396      wait(0, htables[0], hfileRanges.length * numOfRows + expectedCounts[0]);
397
398      // Sleep for enough time so that the data is still not replicated for the CF which is not
399      // configured for replication
400      Thread.sleep((NB_RETRIES / 2) * SLEEP_TIME);
401      // Peer cluster should have only configured CF data
402      wait(1, htables[1], expectedCounts[1]);
403    } finally {
404      close(htables);
405      shutDownMiniClusters();
406    }
407  }
408
409  /**
410   * Tests cyclic replication scenario of 0 -> 1 -> 2 -> 1.
411   */
412  @Test
413  public void testCyclicReplication3() throws Exception {
414    LOG.info("testCyclicReplication2");
415    int numClusters = 3;
416    Table[] htables = null;
417    try {
418      startMiniClusters(numClusters);
419      createTableOnClusters(table);
420
421      // Test the replication scenario of 0 -> 1 -> 2 -> 1
422      addPeer("1", 0, 1);
423      addPeer("1", 1, 2);
424      addPeer("1", 2, 1);
425
426      htables = getHTablesOnClusters(tableName);
427
428      // put "row" and wait 'til it got around
429      putAndWait(row, famName, htables[0], htables[2]);
430      putAndWait(row1, famName, htables[1], htables[2]);
431      putAndWait(row2, famName, htables[2], htables[1]);
432
433      deleteAndWait(row, htables[0], htables[2]);
434      deleteAndWait(row1, htables[1], htables[2]);
435      deleteAndWait(row2, htables[2], htables[1]);
436
437      int[] expectedCounts = new int[] { 1, 3, 3 };
438      validateCounts(htables, put, expectedCounts);
439      validateCounts(htables, delete, expectedCounts);
440    } finally {
441      close(htables);
442      shutDownMiniClusters();
443    }
444  }
445
446  /**
447   * Tests that base replication peer configs are applied on peer creation
448   * and the configs are overriden if updated as part of updateReplicationPeerConfig()
449   *
450   */
451  @Test
452  public void testBasePeerConfigsForPeerMutations()
453    throws Exception {
454    LOG.info("testBasePeerConfigsForPeerMutations");
455    String firstCustomPeerConfigKey = "hbase.xxx.custom_config";
456    String firstCustomPeerConfigValue = "test";
457    String firstCustomPeerConfigUpdatedValue = "test_updated";
458
459    String secondCustomPeerConfigKey = "hbase.xxx.custom_second_config";
460    String secondCustomPeerConfigValue = "testSecond";
461    String secondCustomPeerConfigUpdatedValue = "testSecondUpdated";
462    try {
463      baseConfiguration.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
464        firstCustomPeerConfigKey.concat("=").concat(firstCustomPeerConfigValue));
465      startMiniClusters(2);
466      addPeer("1", 0, 1);
467      addPeer("2", 0, 1);
468      Admin admin = utilities[0].getAdmin();
469
470      // Validates base configs 1 is present for both peer.
471      Assert.assertEquals(firstCustomPeerConfigValue, admin.getReplicationPeerConfig("1").
472        getConfiguration().get(firstCustomPeerConfigKey));
473      Assert.assertEquals(firstCustomPeerConfigValue, admin.getReplicationPeerConfig("2").
474        getConfiguration().get(firstCustomPeerConfigKey));
475
476      // override value of configuration 1 for peer "1".
477      ReplicationPeerConfig updatedReplicationConfigForPeer1 = ReplicationPeerConfig.
478        newBuilder(admin.getReplicationPeerConfig("1")).
479        putConfiguration(firstCustomPeerConfigKey, firstCustomPeerConfigUpdatedValue).build();
480
481      // add configuration 2 for peer "2".
482      ReplicationPeerConfig updatedReplicationConfigForPeer2 = ReplicationPeerConfig.
483        newBuilder(admin.getReplicationPeerConfig("2")).
484        putConfiguration(secondCustomPeerConfigKey, secondCustomPeerConfigUpdatedValue).build();
485
486      admin.updateReplicationPeerConfig("1", updatedReplicationConfigForPeer1);
487      admin.updateReplicationPeerConfig("2", updatedReplicationConfigForPeer2);
488
489      // validates configuration is overridden by updateReplicationPeerConfig
490      Assert.assertEquals(firstCustomPeerConfigUpdatedValue, admin.getReplicationPeerConfig("1").
491        getConfiguration().get(firstCustomPeerConfigKey));
492      Assert.assertEquals(secondCustomPeerConfigUpdatedValue, admin.getReplicationPeerConfig("2").
493        getConfiguration().get(secondCustomPeerConfigKey));
494
495      // Add second config to base config and perform restart.
496      utilities[0].getConfiguration().set(ReplicationPeerConfigUtil.
497        HBASE_REPLICATION_PEER_BASE_CONFIG, firstCustomPeerConfigKey.concat("=").
498        concat(firstCustomPeerConfigValue).concat(";").concat(secondCustomPeerConfigKey)
499        .concat("=").concat(secondCustomPeerConfigValue));
500
501      utilities[0].shutdownMiniHBaseCluster();
502      utilities[0].restartHBaseCluster(1);
503      admin = utilities[0].getAdmin();
504
505      // Both retains the value of base configuration 1 value as before restart.
506      // Peer 1 (Update value), Peer 2 (Base Value)
507      Assert.assertEquals(firstCustomPeerConfigUpdatedValue, admin.getReplicationPeerConfig("1").
508        getConfiguration().get(firstCustomPeerConfigKey));
509      Assert.assertEquals(firstCustomPeerConfigValue, admin.getReplicationPeerConfig("2").
510        getConfiguration().get(firstCustomPeerConfigKey));
511
512      // Peer 1 gets new base config as part of restart.
513      Assert.assertEquals(secondCustomPeerConfigValue, admin.getReplicationPeerConfig("1").
514        getConfiguration().get(secondCustomPeerConfigKey));
515      // Peer 2 retains the updated value as before restart.
516      Assert.assertEquals(secondCustomPeerConfigUpdatedValue, admin.getReplicationPeerConfig("2").
517        getConfiguration().get(secondCustomPeerConfigKey));
518    } finally {
519      shutDownMiniClusters();
520      baseConfiguration.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
521    }
522  }
523
524  @After
525  public void tearDown() throws IOException {
526    configurations = null;
527    utilities = null;
528  }
529
530  @SuppressWarnings("resource")
531  private void startMiniClusters(int numClusters) throws Exception {
532    Random random = new Random();
533    utilities = new HBaseTestingUtility[numClusters];
534    configurations = new Configuration[numClusters];
535    for (int i = 0; i < numClusters; i++) {
536      Configuration conf = new Configuration(baseConfiguration);
537      conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/" + i + random.nextInt());
538      HBaseTestingUtility utility = new HBaseTestingUtility(conf);
539      if (i == 0) {
540        utility.startMiniZKCluster();
541        miniZK = utility.getZkCluster();
542      } else {
543        utility.setZkCluster(miniZK);
544      }
545      utility.startMiniCluster();
546      utilities[i] = utility;
547      configurations[i] = conf;
548      new ZKWatcher(conf, "cluster" + i, null, true);
549    }
550  }
551
552  private void shutDownMiniClusters() throws Exception {
553    int numClusters = utilities.length;
554    for (int i = numClusters - 1; i >= 0; i--) {
555      if (utilities[i] != null) {
556        utilities[i].shutdownMiniCluster();
557      }
558    }
559    miniZK.shutdown();
560  }
561
562  private void createTableOnClusters(TableDescriptor table) throws Exception {
563    for (HBaseTestingUtility utility : utilities) {
564      utility.getAdmin().createTable(table);
565    }
566  }
567
568  private void addPeer(String id, int masterClusterNumber,
569      int slaveClusterNumber) throws Exception {
570    try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]);
571      Admin admin = conn.getAdmin()) {
572      admin.addReplicationPeer(id,
573        new ReplicationPeerConfig().setClusterKey(utilities[slaveClusterNumber].getClusterKey()));
574    }
575  }
576
577  private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, String tableCfs)
578      throws Exception {
579    try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]);
580      Admin admin = conn.getAdmin()) {
581      admin.addReplicationPeer(
582        id,
583        new ReplicationPeerConfig().setClusterKey(utilities[slaveClusterNumber].getClusterKey())
584            .setReplicateAllUserTables(false)
585            .setTableCFsMap(ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableCfs)));
586    }
587  }
588
589  private void disablePeer(String id, int masterClusterNumber) throws Exception {
590    try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]);
591      Admin admin = conn.getAdmin()) {
592      admin.disableReplicationPeer(id);
593    }
594  }
595
596  private void enablePeer(String id, int masterClusterNumber) throws Exception {
597    try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]);
598      Admin admin = conn.getAdmin()) {
599      admin.enableReplicationPeer(id);
600    }
601  }
602
603  private void close(Closeable... closeables) {
604    try {
605      if (closeables != null) {
606        for (Closeable closeable : closeables) {
607          closeable.close();
608        }
609      }
610    } catch (Exception e) {
611      LOG.warn("Exception occurred while closing the object:", e);
612    }
613  }
614
615  @SuppressWarnings("resource")
616  private Table[] getHTablesOnClusters(TableName tableName) throws Exception {
617    int numClusters = utilities.length;
618    Table[] htables = new Table[numClusters];
619    for (int i = 0; i < numClusters; i++) {
620      Table htable = ConnectionFactory.createConnection(configurations[i]).getTable(tableName);
621      htables[i] = htable;
622    }
623    return htables;
624  }
625
626  private void validateCounts(Table[] htables, byte[] type,
627      int[] expectedCounts) throws IOException {
628    for (int i = 0; i < htables.length; i++) {
629      assertEquals(Bytes.toString(type) + " were replicated back ",
630          expectedCounts[i], getCount(htables[i], type));
631    }
632  }
633
634  private int getCount(Table t, byte[] type) throws IOException {
635    Get test = new Get(row);
636    test.setAttribute("count", new byte[] {});
637    Result res = t.get(test);
638    return Bytes.toInt(res.getValue(count, type));
639  }
640
641  private void deleteAndWait(byte[] row, Table source, Table target)
642      throws Exception {
643    Delete del = new Delete(row);
644    source.delete(del);
645    wait(row, target, true);
646  }
647
648  private void putAndWait(byte[] row, byte[] fam, Table source, Table target)
649      throws Exception {
650    Put put = new Put(row);
651    put.addColumn(fam, row, row);
652    source.put(put);
653    wait(row, target, false);
654  }
655
656  private void loadAndValidateHFileReplication(String testName, int masterNumber,
657      int[] slaveNumbers, byte[] row, byte[] fam, Table[] tables, byte[][][] hfileRanges,
658      int numOfRows, int[] expectedCounts, boolean toValidate) throws Exception {
659    HBaseTestingUtility util = utilities[masterNumber];
660
661    Path dir = util.getDataTestDirOnTestFS(testName);
662    FileSystem fs = util.getTestFileSystem();
663    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
664    Path familyDir = new Path(dir, Bytes.toString(fam));
665
666    int hfileIdx = 0;
667    for (byte[][] range : hfileRanges) {
668      byte[] from = range[0];
669      byte[] to = range[1];
670      HFileTestUtil.createHFile(util.getConfiguration(), fs,
671        new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows);
672    }
673
674    Table source = tables[masterNumber];
675    final TableName tableName = source.getName();
676    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
677    String[] args = { dir.toString(), tableName.toString() };
678    loader.run(args);
679
680    if (toValidate) {
681      for (int slaveClusterNumber : slaveNumbers) {
682        wait(slaveClusterNumber, tables[slaveClusterNumber], expectedCounts[slaveClusterNumber]);
683      }
684    }
685  }
686
687  private void wait(int slaveNumber, Table target, int expectedCount)
688      throws IOException, InterruptedException {
689    int count = 0;
690    for (int i = 0; i < NB_RETRIES; i++) {
691      if (i == NB_RETRIES - 1) {
692        fail("Waited too much time for bulkloaded data replication. Current count=" + count
693            + ", expected count=" + expectedCount);
694      }
695      count = utilities[slaveNumber].countRows(target);
696      if (count != expectedCount) {
697        LOG.info("Waiting more time for bulkloaded data replication.");
698        Thread.sleep(SLEEP_TIME);
699      } else {
700        break;
701      }
702    }
703  }
704
705  private void wait(byte[] row, Table target, boolean isDeleted) throws Exception {
706    Get get = new Get(row);
707    for (int i = 0; i < NB_RETRIES; i++) {
708      if (i == NB_RETRIES - 1) {
709        fail("Waited too much time for replication. Row:" + Bytes.toString(row)
710            + ". IsDeleteReplication:" + isDeleted);
711      }
712      Result res = target.get(get);
713      boolean sleep = isDeleted ? res.size() > 0 : res.isEmpty();
714      if (sleep) {
715        LOG.info("Waiting for more time for replication. Row:"
716            + Bytes.toString(row) + ". IsDeleteReplication:" + isDeleted);
717        Thread.sleep(SLEEP_TIME);
718      } else {
719        if (!isDeleted) {
720          assertArrayEquals(res.value(), row);
721        }
722        LOG.info("Obtained row:"
723            + Bytes.toString(row) + ". IsDeleteReplication:" + isDeleted);
724        break;
725      }
726    }
727  }
728
729  private void rollWALAndWait(final HBaseTestingUtility utility, final TableName table,
730      final byte[] row) throws IOException {
731    final Admin admin = utility.getAdmin();
732    final MiniHBaseCluster cluster = utility.getMiniHBaseCluster();
733
734    // find the region that corresponds to the given row.
735    HRegion region = null;
736    for (HRegion candidate : cluster.getRegions(table)) {
737      if (HRegion.rowIsInRange(candidate.getRegionInfo(), row)) {
738        region = candidate;
739        break;
740      }
741    }
742    assertNotNull("Couldn't find the region for row '" + Arrays.toString(row) + "'", region);
743
744    final CountDownLatch latch = new CountDownLatch(1);
745
746    // listen for successful log rolls
747    final WALActionsListener listener = new WALActionsListener() {
748          @Override
749          public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
750            latch.countDown();
751          }
752        };
753    region.getWAL().registerWALActionsListener(listener);
754
755    // request a roll
756    admin.rollWALWriter(cluster.getServerHoldingRegion(region.getTableDescriptor().getTableName(),
757      region.getRegionInfo().getRegionName()));
758
759    // wait
760    try {
761      latch.await();
762    } catch (InterruptedException exception) {
763      LOG.warn("Interrupted while waiting for the wal of '" + region + "' to roll. If later " +
764          "replication tests fail, it's probably because we should still be waiting.");
765      Thread.currentThread().interrupt();
766    }
767    region.getWAL().unregisterWALActionsListener(listener);
768  }
769
770  /**
771   * Use a coprocessor to count puts and deletes. as KVs would be replicated back with the same
772   * timestamp there is otherwise no way to count them.
773   */
774  public static class CoprocessorCounter implements RegionCoprocessor, RegionObserver {
775    private int nCount = 0;
776    private int nDelete = 0;
777
778    @Override
779    public Optional<RegionObserver> getRegionObserver() {
780      return Optional.of(this);
781    }
782
783    @Override
784    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
785        final WALEdit edit, final Durability durability) throws IOException {
786      nCount++;
787    }
788
789    @Override
790    public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
791        final Delete delete, final WALEdit edit, final Durability durability) throws IOException {
792      nDelete++;
793    }
794
795    @Override
796    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c,
797        final Get get, final List<Cell> result) throws IOException {
798      if (get.getAttribute("count") != null) {
799        result.clear();
800        // order is important!
801        result.add(new KeyValue(count, count, delete, Bytes.toBytes(nDelete)));
802        result.add(new KeyValue(count, count, put, Bytes.toBytes(nCount)));
803        c.bypass();
804      }
805    }
806  }
807
808}