001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.fail;
024
025import java.io.Closeable;
026import java.io.IOException;
027import java.util.Arrays;
028import java.util.List;
029import java.util.Optional;
030import java.util.concurrent.CountDownLatch;
031import java.util.concurrent.ThreadLocalRandom;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.hbase.Cell;
036import org.apache.hadoop.hbase.DoNotRetryIOException;
037import org.apache.hadoop.hbase.HBaseClassTestRule;
038import org.apache.hadoop.hbase.HBaseConfiguration;
039import org.apache.hadoop.hbase.HBaseTestingUtility;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.KeyValue;
042import org.apache.hadoop.hbase.MiniHBaseCluster;
043import org.apache.hadoop.hbase.TableName;
044import org.apache.hadoop.hbase.client.Admin;
045import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
046import org.apache.hadoop.hbase.client.Connection;
047import org.apache.hadoop.hbase.client.ConnectionFactory;
048import org.apache.hadoop.hbase.client.Delete;
049import org.apache.hadoop.hbase.client.Durability;
050import org.apache.hadoop.hbase.client.Get;
051import org.apache.hadoop.hbase.client.Put;
052import org.apache.hadoop.hbase.client.Result;
053import org.apache.hadoop.hbase.client.Table;
054import org.apache.hadoop.hbase.client.TableDescriptor;
055import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
056import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
057import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
058import org.apache.hadoop.hbase.coprocessor.ObserverContext;
059import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
060import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
061import org.apache.hadoop.hbase.coprocessor.RegionObserver;
062import org.apache.hadoop.hbase.regionserver.HRegion;
063import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
064import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
065import org.apache.hadoop.hbase.testclassification.LargeTests;
066import org.apache.hadoop.hbase.testclassification.ReplicationTests;
067import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
068import org.apache.hadoop.hbase.util.Bytes;
069import org.apache.hadoop.hbase.util.HFileTestUtil;
070import org.apache.hadoop.hbase.wal.WALEdit;
071import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
072import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
073import org.junit.After;
074import org.junit.Assert;
075import org.junit.Before;
076import org.junit.ClassRule;
077import org.junit.Test;
078import org.junit.experimental.categories.Category;
079import org.slf4j.Logger;
080import org.slf4j.LoggerFactory;
081
082@Category({ ReplicationTests.class, LargeTests.class })
083public class TestMasterReplication {
084
085  @ClassRule
086  public static final HBaseClassTestRule CLASS_RULE =
087    HBaseClassTestRule.forClass(TestMasterReplication.class);
088
089  private static final Logger LOG = LoggerFactory.getLogger(TestMasterReplication.class);
090
  // Configuration template built in setUp(); every mini cluster is started from a copy of it.
  private Configuration baseConfiguration;

  // Per-cluster testing utility and configuration, indexed by cluster number
  // (both arrays are allocated and filled by startMiniClusters()).
  private HBaseTestingUtility[] utilities;
  private Configuration[] configurations;
  // ZooKeeper mini cluster started by cluster 0 and handed to every other cluster.
  private MiniZooKeeperCluster miniZK;

  // Pause, in milliseconds, between two polls of the wait(...) helpers.
  private static final long SLEEP_TIME = 1000;
  // Loop bound of the wait(...) helpers; they fail once the last iteration is reached.
  private static final int NB_RETRIES = 120;

  private static final TableName tableName = TableName.valueOf("test");
  // Column families created with REPLICATION_SCOPE_GLOBAL in setUp().
  private static final byte[] famName = Bytes.toBytes("f");
  private static final byte[] famName1 = Bytes.toBytes("f1");
  // Row keys used by the put/delete tests; getCount() always queries "row".
  private static final byte[] row = Bytes.toBytes("row");
  private static final byte[] row1 = Bytes.toBytes("row1");
  private static final byte[] row2 = Bytes.toBytes("row2");
  private static final byte[] row3 = Bytes.toBytes("row3");
  private static final byte[] row4 = Bytes.toBytes("row4");
  // Column family added in setUp() without an explicit replication scope.
  private static final byte[] noRepfamName = Bytes.toBytes("norep");

  // Family ("count") and qualifiers ("put" / "delete") that getCount() reads from the Result of a
  // Get carrying the "count" attribute.
  // NOTE(review): presumably answered by the CoprocessorCounter coprocessor registered in
  // setUp() -- confirm against its implementation.
  private static final byte[] count = Bytes.toBytes("count");
  private static final byte[] put = Bytes.toBytes("put");
  private static final byte[] delete = Bytes.toBytes("delete");

  // Descriptor of the test table, built in setUp() and created on every cluster.
  private TableDescriptor table;
115
116  @Before
117  public void setUp() throws Exception {
118    baseConfiguration = HBaseConfiguration.create();
119    // smaller block size and capacity to trigger more operations
120    // and test them
121    baseConfiguration.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20);
122    baseConfiguration.setInt("replication.source.size.capacity", 1024);
123    baseConfiguration.setLong("replication.source.sleepforretries", 100);
124    baseConfiguration.setInt("hbase.regionserver.maxlogs", 10);
125    baseConfiguration.setLong("hbase.master.logcleaner.ttl", 10);
126    baseConfiguration.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
127    baseConfiguration.set("hbase.replication.source.fs.conf.provider",
128      TestSourceFSConfigurationProvider.class.getCanonicalName());
129    baseConfiguration.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
130    baseConfiguration.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
131    baseConfiguration.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
132      CoprocessorCounter.class.getName());
133    table = TableDescriptorBuilder.newBuilder(tableName)
134      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
135        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
136      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName1)
137        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
138      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
139  }
140
141  /**
142   * It tests the replication scenario involving 0 -> 1 -> 0. It does it by adding and deleting a
143   * row to a table in each cluster, checking if it's replicated. It also tests that the puts and
144   * deletes are not replicated back to the originating cluster.
145   */
146  @Test
147  public void testCyclicReplication1() throws Exception {
148    LOG.info("testSimplePutDelete");
149    int numClusters = 2;
150    Table[] htables = null;
151    try {
152      htables = setUpClusterTablesAndPeers(numClusters);
153
154      int[] expectedCounts = new int[] { 2, 2 };
155
156      // add rows to both clusters,
157      // make sure they are both replication
158      putAndWait(row, famName, htables[0], htables[1]);
159      putAndWait(row1, famName, htables[1], htables[0]);
160      validateCounts(htables, put, expectedCounts);
161
162      deleteAndWait(row, htables[0], htables[1]);
163      deleteAndWait(row1, htables[1], htables[0]);
164      validateCounts(htables, delete, expectedCounts);
165    } finally {
166      close(htables);
167      shutDownMiniClusters();
168    }
169  }
170
171  /**
172   * Tests the replication scenario 0 -> 0. By default
173   * {@link org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint},
174   * the replication peer should not be added.
175   */
176  @Test(expected = DoNotRetryIOException.class)
177  public void testLoopedReplication() throws Exception {
178    LOG.info("testLoopedReplication");
179    startMiniClusters(1);
180    createTableOnClusters(table);
181    addPeer("1", 0, 0);
182  }
183
184  /**
185   * It tests the replication scenario involving 0 -> 1 -> 0. It does it by bulk loading a set of
186   * HFiles to a table in each cluster, checking if it's replicated.
187   */
188  @Test
189  public void testHFileCyclicReplication() throws Exception {
190    LOG.info("testHFileCyclicReplication");
191    int numClusters = 2;
192    Table[] htables = null;
193    try {
194      htables = setUpClusterTablesAndPeers(numClusters);
195
196      // Load 100 rows for each hfile range in cluster '0' and validate whether its been replicated
197      // to cluster '1'.
198      byte[][][] hfileRanges =
199        new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
200          new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("fff") }, };
201      int numOfRows = 100;
202      int[] expectedCounts =
203        new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };
204
205      loadAndValidateHFileReplication("testHFileCyclicReplication_01", 0, new int[] { 1 }, row,
206        famName, htables, hfileRanges, numOfRows, expectedCounts, true);
207
208      // Load 200 rows for each hfile range in cluster '1' and validate whether its been replicated
209      // to cluster '0'.
210      hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("gggg"), Bytes.toBytes("iiii") },
211        new byte[][] { Bytes.toBytes("jjj"), Bytes.toBytes("lll") }, };
212      numOfRows = 200;
213      int[] newExpectedCounts = new int[] { hfileRanges.length * numOfRows + expectedCounts[0],
214        hfileRanges.length * numOfRows + expectedCounts[1] };
215
216      loadAndValidateHFileReplication("testHFileCyclicReplication_10", 1, new int[] { 0 }, row,
217        famName, htables, hfileRanges, numOfRows, newExpectedCounts, true);
218
219    } finally {
220      close(htables);
221      shutDownMiniClusters();
222    }
223  }
224
225  private Table[] setUpClusterTablesAndPeers(int numClusters) throws Exception {
226    Table[] htables;
227    startMiniClusters(numClusters);
228    createTableOnClusters(table);
229
230    htables = getHTablesOnClusters(tableName);
231    // Test the replication scenarios of 0 -> 1 -> 0
232    addPeer("1", 0, 1);
233    addPeer("1", 1, 0);
234    return htables;
235  }
236
237  /**
238   * Tests the cyclic replication scenario of 0 -> 1 -> 2 -> 0 by adding and deleting rows to a
239   * table in each clusters and ensuring that the each of these clusters get the appropriate
240   * mutations. It also tests the grouping scenario where a cluster needs to replicate the edits
241   * originating from itself and also the edits that it received using replication from a different
242   * cluster. The scenario is explained in HBASE-9158
243   */
244  @Test
245  public void testCyclicReplication2() throws Exception {
246    LOG.info("testCyclicReplication2");
247    int numClusters = 3;
248    Table[] htables = null;
249    try {
250      startMiniClusters(numClusters);
251      createTableOnClusters(table);
252
253      // Test the replication scenario of 0 -> 1 -> 2 -> 0
254      addPeer("1", 0, 1);
255      addPeer("1", 1, 2);
256      addPeer("1", 2, 0);
257
258      htables = getHTablesOnClusters(tableName);
259
260      // put "row" and wait 'til it got around
261      putAndWait(row, famName, htables[0], htables[2]);
262      putAndWait(row1, famName, htables[1], htables[0]);
263      putAndWait(row2, famName, htables[2], htables[1]);
264
265      deleteAndWait(row, htables[0], htables[2]);
266      deleteAndWait(row1, htables[1], htables[0]);
267      deleteAndWait(row2, htables[2], htables[1]);
268
269      int[] expectedCounts = new int[] { 3, 3, 3 };
270      validateCounts(htables, put, expectedCounts);
271      validateCounts(htables, delete, expectedCounts);
272
273      // Test HBASE-9158
274      disablePeer("1", 2);
275      // we now have an edit that was replicated into cluster originating from
276      // cluster 0
277      putAndWait(row3, famName, htables[0], htables[1]);
278      // now add a local edit to cluster 1
279      htables[1].put(new Put(row4).addColumn(famName, row4, row4));
280      // re-enable replication from cluster 2 to cluster 0
281      enablePeer("1", 2);
282      // without HBASE-9158 the edit for row4 would have been marked with
283      // cluster 0's id
284      // and hence not replicated to cluster 0
285      wait(row4, htables[0], false);
286    } finally {
287      close(htables);
288      shutDownMiniClusters();
289    }
290  }
291
292  /**
293   * It tests the multi slave hfile replication scenario involving 0 -> 1, 2. It does it by bulk
294   * loading a set of HFiles to a table in master cluster, checking if it's replicated in its peers.
295   */
296  @Test
297  public void testHFileMultiSlaveReplication() throws Exception {
298    LOG.info("testHFileMultiSlaveReplication");
299    int numClusters = 3;
300    Table[] htables = null;
301    try {
302      startMiniClusters(numClusters);
303      createTableOnClusters(table);
304
305      // Add a slave, 0 -> 1
306      addPeer("1", 0, 1);
307
308      htables = getHTablesOnClusters(tableName);
309
310      // Load 100 rows for each hfile range in cluster '0' and validate whether its been replicated
311      // to cluster '1'.
312      byte[][][] hfileRanges =
313        new byte[][][] { new byte[][] { Bytes.toBytes("mmmm"), Bytes.toBytes("oooo") },
314          new byte[][] { Bytes.toBytes("ppp"), Bytes.toBytes("rrr") }, };
315      int numOfRows = 100;
316
317      int[] expectedCounts =
318        new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };
319
320      loadAndValidateHFileReplication("testHFileCyclicReplication_0", 0, new int[] { 1 }, row,
321        famName, htables, hfileRanges, numOfRows, expectedCounts, true);
322
323      // Validate data is not replicated to cluster '2'.
324      assertEquals(0, utilities[2].countRows(htables[2]));
325
326      rollWALAndWait(utilities[0], htables[0].getName(), row);
327
328      // Add one more slave, 0 -> 2
329      addPeer("2", 0, 2);
330
331      // Load 200 rows for each hfile range in cluster '0' and validate whether its been replicated
332      // to cluster '1' and '2'. Previous data should be replicated to cluster '2'.
333      hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("ssss"), Bytes.toBytes("uuuu") },
334        new byte[][] { Bytes.toBytes("vvv"), Bytes.toBytes("xxx") }, };
335      numOfRows = 200;
336
337      int[] newExpectedCounts = new int[] { hfileRanges.length * numOfRows + expectedCounts[0],
338        hfileRanges.length * numOfRows + expectedCounts[1], hfileRanges.length * numOfRows };
339
340      loadAndValidateHFileReplication("testHFileCyclicReplication_1", 0, new int[] { 1, 2 }, row,
341        famName, htables, hfileRanges, numOfRows, newExpectedCounts, true);
342
343    } finally {
344      close(htables);
345      shutDownMiniClusters();
346    }
347  }
348
349  /**
350   * It tests the bulk loaded hfile replication scenario to only explicitly specified table column
351   * families. It does it by bulk loading a set of HFiles belonging to both the CFs of table and set
352   * only one CF data to replicate.
353   */
354  @Test
355  public void testHFileReplicationForConfiguredTableCfs() throws Exception {
356    LOG.info("testHFileReplicationForConfiguredTableCfs");
357    int numClusters = 2;
358    Table[] htables = null;
359    try {
360      startMiniClusters(numClusters);
361      createTableOnClusters(table);
362
363      htables = getHTablesOnClusters(tableName);
364      // Test the replication scenarios only 'f' is configured for table data replication not 'f1'
365      addPeer("1", 0, 1, tableName.getNameAsString() + ":" + Bytes.toString(famName));
366
367      // Load 100 rows for each hfile range in cluster '0' for table CF 'f'
368      byte[][][] hfileRanges =
369        new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
370          new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("fff") }, };
371      int numOfRows = 100;
372      int[] expectedCounts =
373        new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };
374
375      loadAndValidateHFileReplication("load_f", 0, new int[] { 1 }, row, famName, htables,
376        hfileRanges, numOfRows, expectedCounts, true);
377
378      // Load 100 rows for each hfile range in cluster '0' for table CF 'f1'
379      hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("gggg"), Bytes.toBytes("iiii") },
380        new byte[][] { Bytes.toBytes("jjj"), Bytes.toBytes("lll") }, };
381      numOfRows = 100;
382
383      int[] newExpectedCounts =
384        new int[] { hfileRanges.length * numOfRows + expectedCounts[0], expectedCounts[1] };
385
386      loadAndValidateHFileReplication("load_f1", 0, new int[] { 1 }, row, famName1, htables,
387        hfileRanges, numOfRows, newExpectedCounts, false);
388
389      // Validate data replication for CF 'f1'
390
391      // Source cluster table should contain data for the families
392      wait(0, htables[0], hfileRanges.length * numOfRows + expectedCounts[0]);
393
394      // Sleep for enough time so that the data is still not replicated for the CF which is not
395      // configured for replication
396      Thread.sleep((NB_RETRIES / 2) * SLEEP_TIME);
397      // Peer cluster should have only configured CF data
398      wait(1, htables[1], expectedCounts[1]);
399    } finally {
400      close(htables);
401      shutDownMiniClusters();
402    }
403  }
404
405  /**
406   * Tests cyclic replication scenario of 0 -> 1 -> 2 -> 1.
407   */
408  @Test
409  public void testCyclicReplication3() throws Exception {
410    LOG.info("testCyclicReplication2");
411    int numClusters = 3;
412    Table[] htables = null;
413    try {
414      startMiniClusters(numClusters);
415      createTableOnClusters(table);
416
417      // Test the replication scenario of 0 -> 1 -> 2 -> 1
418      addPeer("1", 0, 1);
419      addPeer("1", 1, 2);
420      addPeer("1", 2, 1);
421
422      htables = getHTablesOnClusters(tableName);
423
424      // put "row" and wait 'til it got around
425      putAndWait(row, famName, htables[0], htables[2]);
426      putAndWait(row1, famName, htables[1], htables[2]);
427      putAndWait(row2, famName, htables[2], htables[1]);
428
429      deleteAndWait(row, htables[0], htables[2]);
430      deleteAndWait(row1, htables[1], htables[2]);
431      deleteAndWait(row2, htables[2], htables[1]);
432
433      int[] expectedCounts = new int[] { 1, 3, 3 };
434      validateCounts(htables, put, expectedCounts);
435      validateCounts(htables, delete, expectedCounts);
436    } finally {
437      close(htables);
438      shutDownMiniClusters();
439    }
440  }
441
442  /**
443   * Tests that base replication peer configs are applied on peer creation and the configs are
444   * overriden if updated as part of updateReplicationPeerConfig()
445   */
446  @Test
447  public void testBasePeerConfigsForReplicationPeer() throws Exception {
448    LOG.info("testBasePeerConfigsForPeerMutations");
449    String firstCustomPeerConfigKey = "hbase.xxx.custom_config";
450    String firstCustomPeerConfigValue = "test";
451    String firstCustomPeerConfigUpdatedValue = "test_updated";
452
453    String secondCustomPeerConfigKey = "hbase.xxx.custom_second_config";
454    String secondCustomPeerConfigValue = "testSecond";
455    String secondCustomPeerConfigUpdatedValue = "testSecondUpdated";
456    try {
457      baseConfiguration.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
458        firstCustomPeerConfigKey.concat("=").concat(firstCustomPeerConfigValue));
459      startMiniClusters(2);
460      addPeer("1", 0, 1);
461      addPeer("2", 0, 1);
462      Admin admin = utilities[0].getAdmin();
463
464      // Validates base configs 1 is present for both peer.
465      Assert.assertEquals(firstCustomPeerConfigValue,
466        admin.getReplicationPeerConfig("1").getConfiguration().get(firstCustomPeerConfigKey));
467      Assert.assertEquals(firstCustomPeerConfigValue,
468        admin.getReplicationPeerConfig("2").getConfiguration().get(firstCustomPeerConfigKey));
469
470      // override value of configuration 1 for peer "1".
471      ReplicationPeerConfig updatedReplicationConfigForPeer1 =
472        ReplicationPeerConfig.newBuilder(admin.getReplicationPeerConfig("1"))
473          .putConfiguration(firstCustomPeerConfigKey, firstCustomPeerConfigUpdatedValue).build();
474
475      // add configuration 2 for peer "2".
476      ReplicationPeerConfig updatedReplicationConfigForPeer2 =
477        ReplicationPeerConfig.newBuilder(admin.getReplicationPeerConfig("2"))
478          .putConfiguration(secondCustomPeerConfigKey, secondCustomPeerConfigUpdatedValue).build();
479
480      admin.updateReplicationPeerConfig("1", updatedReplicationConfigForPeer1);
481      admin.updateReplicationPeerConfig("2", updatedReplicationConfigForPeer2);
482
483      // validates configuration is overridden by updateReplicationPeerConfig
484      Assert.assertEquals(firstCustomPeerConfigUpdatedValue,
485        admin.getReplicationPeerConfig("1").getConfiguration().get(firstCustomPeerConfigKey));
486      Assert.assertEquals(secondCustomPeerConfigUpdatedValue,
487        admin.getReplicationPeerConfig("2").getConfiguration().get(secondCustomPeerConfigKey));
488
489      // Add second config to base config and perform restart.
490      utilities[0].getConfiguration().set(
491        ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
492        firstCustomPeerConfigKey.concat("=").concat(firstCustomPeerConfigValue).concat(";")
493          .concat(secondCustomPeerConfigKey).concat("=").concat(secondCustomPeerConfigValue));
494
495      utilities[0].shutdownMiniHBaseCluster();
496      utilities[0].restartHBaseCluster(1);
497      admin = utilities[0].getAdmin();
498
499      // Configurations should be updated after restart again
500      Assert.assertEquals(firstCustomPeerConfigValue,
501        admin.getReplicationPeerConfig("1").getConfiguration().get(firstCustomPeerConfigKey));
502      Assert.assertEquals(firstCustomPeerConfigValue,
503        admin.getReplicationPeerConfig("2").getConfiguration().get(firstCustomPeerConfigKey));
504
505      Assert.assertEquals(secondCustomPeerConfigValue,
506        admin.getReplicationPeerConfig("1").getConfiguration().get(secondCustomPeerConfigKey));
507      Assert.assertEquals(secondCustomPeerConfigValue,
508        admin.getReplicationPeerConfig("2").getConfiguration().get(secondCustomPeerConfigKey));
509    } finally {
510      shutDownMiniClusters();
511      baseConfiguration.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
512    }
513  }
514
515  @Test
516  public void testBasePeerConfigsRemovalForReplicationPeer() throws Exception {
517    LOG.info("testBasePeerConfigsForPeerMutations");
518    String firstCustomPeerConfigKey = "hbase.xxx.custom_config";
519    String firstCustomPeerConfigValue = "test";
520
521    try {
522      baseConfiguration.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
523        firstCustomPeerConfigKey.concat("=").concat(firstCustomPeerConfigValue));
524      startMiniClusters(2);
525      addPeer("1", 0, 1);
526      Admin admin = utilities[0].getAdmin();
527
528      // Validates base configs 1 is present for both peer.
529      Assert.assertEquals(firstCustomPeerConfigValue,
530        admin.getReplicationPeerConfig("1").getConfiguration().get(firstCustomPeerConfigKey));
531
532      utilities[0].getConfiguration()
533        .unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
534      utilities[0].getConfiguration().set(
535        ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
536        firstCustomPeerConfigKey.concat("=").concat(""));
537
538      utilities[0].shutdownMiniHBaseCluster();
539      utilities[0].restartHBaseCluster(1);
540      admin = utilities[0].getAdmin();
541
542      // Configurations should be removed after restart again
543      Assert.assertNull(
544        admin.getReplicationPeerConfig("1").getConfiguration().get(firstCustomPeerConfigKey));
545    } finally {
546      shutDownMiniClusters();
547      baseConfiguration.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
548    }
549  }
550
551  @Test
552  public void testRemoveBasePeerConfigWithoutExistingConfigForReplicationPeer() throws Exception {
553    LOG.info("testBasePeerConfigsForPeerMutations");
554    String firstCustomPeerConfigKey = "hbase.xxx.custom_config";
555
556    try {
557      baseConfiguration.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
558        firstCustomPeerConfigKey.concat("=").concat(""));
559      startMiniClusters(2);
560      addPeer("1", 0, 1);
561      Admin admin = utilities[0].getAdmin();
562
563      Assert.assertNull("Config should not be there",
564        admin.getReplicationPeerConfig("1").getConfiguration().get(firstCustomPeerConfigKey));
565    } finally {
566      shutDownMiniClusters();
567      baseConfiguration.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
568    }
569  }
570
571  @After
572  public void tearDown() throws IOException {
573    configurations = null;
574    utilities = null;
575  }
576
577  @SuppressWarnings("resource")
578  private void startMiniClusters(int numClusters) throws Exception {
579    utilities = new HBaseTestingUtility[numClusters];
580    configurations = new Configuration[numClusters];
581    for (int i = 0; i < numClusters; i++) {
582      Configuration conf = new Configuration(baseConfiguration);
583      conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/" + i + ThreadLocalRandom.current().nextInt());
584      HBaseTestingUtility utility = new HBaseTestingUtility(conf);
585      if (i == 0) {
586        utility.startMiniZKCluster();
587        miniZK = utility.getZkCluster();
588      } else {
589        utility.setZkCluster(miniZK);
590      }
591      utility.startMiniCluster();
592      utilities[i] = utility;
593      configurations[i] = conf;
594      new ZKWatcher(conf, "cluster" + i, null, true);
595    }
596  }
597
598  private void shutDownMiniClusters() throws Exception {
599    int numClusters = utilities.length;
600    for (int i = numClusters - 1; i >= 0; i--) {
601      if (utilities[i] != null) {
602        utilities[i].shutdownMiniCluster();
603      }
604    }
605    miniZK.shutdown();
606  }
607
608  private void createTableOnClusters(TableDescriptor table) throws Exception {
609    for (HBaseTestingUtility utility : utilities) {
610      utility.getAdmin().createTable(table);
611    }
612  }
613
614  private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber)
615    throws Exception {
616    try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]);
617      Admin admin = conn.getAdmin()) {
618      admin.addReplicationPeer(id,
619        new ReplicationPeerConfig().setClusterKey(utilities[slaveClusterNumber].getClusterKey()));
620    }
621  }
622
623  private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, String tableCfs)
624    throws Exception {
625    try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]);
626      Admin admin = conn.getAdmin()) {
627      admin.addReplicationPeer(id,
628        new ReplicationPeerConfig().setClusterKey(utilities[slaveClusterNumber].getClusterKey())
629          .setReplicateAllUserTables(false)
630          .setTableCFsMap(ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableCfs)));
631    }
632  }
633
634  private void disablePeer(String id, int masterClusterNumber) throws Exception {
635    try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]);
636      Admin admin = conn.getAdmin()) {
637      admin.disableReplicationPeer(id);
638    }
639  }
640
641  private void enablePeer(String id, int masterClusterNumber) throws Exception {
642    try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]);
643      Admin admin = conn.getAdmin()) {
644      admin.enableReplicationPeer(id);
645    }
646  }
647
648  private void close(Closeable... closeables) {
649    try {
650      if (closeables != null) {
651        for (Closeable closeable : closeables) {
652          closeable.close();
653        }
654      }
655    } catch (Exception e) {
656      LOG.warn("Exception occurred while closing the object:", e);
657    }
658  }
659
660  @SuppressWarnings("resource")
661  private Table[] getHTablesOnClusters(TableName tableName) throws Exception {
662    int numClusters = utilities.length;
663    Table[] htables = new Table[numClusters];
664    for (int i = 0; i < numClusters; i++) {
665      Table htable = ConnectionFactory.createConnection(configurations[i]).getTable(tableName);
666      htables[i] = htable;
667    }
668    return htables;
669  }
670
671  private void validateCounts(Table[] htables, byte[] type, int[] expectedCounts)
672    throws IOException {
673    for (int i = 0; i < htables.length; i++) {
674      assertEquals(Bytes.toString(type) + " were replicated back ", expectedCounts[i],
675        getCount(htables[i], type));
676    }
677  }
678
679  private int getCount(Table t, byte[] type) throws IOException {
680    Get test = new Get(row);
681    test.setAttribute("count", new byte[] {});
682    Result res = t.get(test);
683    return Bytes.toInt(res.getValue(count, type));
684  }
685
686  private void deleteAndWait(byte[] row, Table source, Table target) throws Exception {
687    Delete del = new Delete(row);
688    source.delete(del);
689    wait(row, target, true);
690  }
691
692  private void putAndWait(byte[] row, byte[] fam, Table source, Table target) throws Exception {
693    Put put = new Put(row);
694    put.addColumn(fam, row, row);
695    source.put(put);
696    wait(row, target, false);
697  }
698
699  private void loadAndValidateHFileReplication(String testName, int masterNumber,
700    int[] slaveNumbers, byte[] row, byte[] fam, Table[] tables, byte[][][] hfileRanges,
701    int numOfRows, int[] expectedCounts, boolean toValidate) throws Exception {
702    HBaseTestingUtility util = utilities[masterNumber];
703
704    Path dir = util.getDataTestDirOnTestFS(testName);
705    FileSystem fs = util.getTestFileSystem();
706    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
707    Path familyDir = new Path(dir, Bytes.toString(fam));
708
709    int hfileIdx = 0;
710    for (byte[][] range : hfileRanges) {
711      byte[] from = range[0];
712      byte[] to = range[1];
713      HFileTestUtil.createHFile(util.getConfiguration(), fs,
714        new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows);
715    }
716
717    Table source = tables[masterNumber];
718    final TableName tableName = source.getName();
719    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
720    String[] args = { dir.toString(), tableName.toString() };
721    loader.run(args);
722
723    if (toValidate) {
724      for (int slaveClusterNumber : slaveNumbers) {
725        wait(slaveClusterNumber, tables[slaveClusterNumber], expectedCounts[slaveClusterNumber]);
726      }
727    }
728  }
729
730  private void wait(int slaveNumber, Table target, int expectedCount)
731    throws IOException, InterruptedException {
732    int count = 0;
733    for (int i = 0; i < NB_RETRIES; i++) {
734      if (i == NB_RETRIES - 1) {
735        fail("Waited too much time for bulkloaded data replication. Current count=" + count
736          + ", expected count=" + expectedCount);
737      }
738      count = utilities[slaveNumber].countRows(target);
739      if (count != expectedCount) {
740        LOG.info("Waiting more time for bulkloaded data replication.");
741        Thread.sleep(SLEEP_TIME);
742      } else {
743        break;
744      }
745    }
746  }
747
748  private void wait(byte[] row, Table target, boolean isDeleted) throws Exception {
749    Get get = new Get(row);
750    for (int i = 0; i < NB_RETRIES; i++) {
751      if (i == NB_RETRIES - 1) {
752        fail("Waited too much time for replication. Row:" + Bytes.toString(row)
753          + ". IsDeleteReplication:" + isDeleted);
754      }
755      Result res = target.get(get);
756      boolean sleep = isDeleted ? res.size() > 0 : res.isEmpty();
757      if (sleep) {
758        LOG.info("Waiting for more time for replication. Row:" + Bytes.toString(row)
759          + ". IsDeleteReplication:" + isDeleted);
760        Thread.sleep(SLEEP_TIME);
761      } else {
762        if (!isDeleted) {
763          assertArrayEquals(res.value(), row);
764        }
765        LOG.info("Obtained row:" + Bytes.toString(row) + ". IsDeleteReplication:" + isDeleted);
766        break;
767      }
768    }
769  }
770
771  private void rollWALAndWait(final HBaseTestingUtility utility, final TableName table,
772    final byte[] row) throws IOException {
773    final Admin admin = utility.getAdmin();
774    final MiniHBaseCluster cluster = utility.getMiniHBaseCluster();
775
776    // find the region that corresponds to the given row.
777    HRegion region = null;
778    for (HRegion candidate : cluster.getRegions(table)) {
779      if (HRegion.rowIsInRange(candidate.getRegionInfo(), row)) {
780        region = candidate;
781        break;
782      }
783    }
784    assertNotNull("Couldn't find the region for row '" + Arrays.toString(row) + "'", region);
785
786    final CountDownLatch latch = new CountDownLatch(1);
787
788    // listen for successful log rolls
789    final WALActionsListener listener = new WALActionsListener() {
790      @Override
791      public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
792        latch.countDown();
793      }
794    };
795    region.getWAL().registerWALActionsListener(listener);
796
797    // request a roll
798    admin.rollWALWriter(cluster.getServerHoldingRegion(region.getTableDescriptor().getTableName(),
799      region.getRegionInfo().getRegionName()));
800
801    // wait
802    try {
803      latch.await();
804    } catch (InterruptedException exception) {
805      LOG.warn("Interrupted while waiting for the wal of '" + region + "' to roll. If later "
806        + "replication tests fail, it's probably because we should still be waiting.");
807      Thread.currentThread().interrupt();
808    }
809    region.getWAL().unregisterWALActionsListener(listener);
810  }
811
812  /**
813   * Use a coprocessor to count puts and deletes. as KVs would be replicated back with the same
814   * timestamp there is otherwise no way to count them.
815   */
816  public static class CoprocessorCounter implements RegionCoprocessor, RegionObserver {
817    private int nCount = 0;
818    private int nDelete = 0;
819
820    @Override
821    public Optional<RegionObserver> getRegionObserver() {
822      return Optional.of(this);
823    }
824
825    @Override
826    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
827      final WALEdit edit, final Durability durability) throws IOException {
828      nCount++;
829    }
830
831    @Override
832    public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
833      final Delete delete, final WALEdit edit, final Durability durability) throws IOException {
834      nDelete++;
835    }
836
837    @Override
838    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c, final Get get,
839      final List<Cell> result) throws IOException {
840      if (get.getAttribute("count") != null) {
841        result.clear();
842        // order is important!
843        result.add(new KeyValue(count, count, delete, Bytes.toBytes(nDelete)));
844        result.add(new KeyValue(count, count, put, Bytes.toBytes(nCount)));
845        c.bypass();
846      }
847    }
848  }
849
850}