001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.fail;
024import java.io.Closeable;
025import java.io.IOException;
026import java.util.Arrays;
027import java.util.List;
028import java.util.Optional;
029import java.util.Random;
030import java.util.concurrent.CountDownLatch;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.hbase.Cell;
035import org.apache.hadoop.hbase.DoNotRetryIOException;
036import org.apache.hadoop.hbase.HBaseClassTestRule;
037import org.apache.hadoop.hbase.HBaseConfiguration;
038import org.apache.hadoop.hbase.HBaseTestingUtil;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.KeyValue;
041import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.client.Admin;
044import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
045import org.apache.hadoop.hbase.client.Connection;
046import org.apache.hadoop.hbase.client.ConnectionFactory;
047import org.apache.hadoop.hbase.client.Delete;
048import org.apache.hadoop.hbase.client.Durability;
049import org.apache.hadoop.hbase.client.Get;
050import org.apache.hadoop.hbase.client.Put;
051import org.apache.hadoop.hbase.client.Result;
052import org.apache.hadoop.hbase.client.Table;
053import org.apache.hadoop.hbase.client.TableDescriptor;
054import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
055import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
056import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
057import org.apache.hadoop.hbase.coprocessor.ObserverContext;
058import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
059import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
060import org.apache.hadoop.hbase.coprocessor.RegionObserver;
061import org.apache.hadoop.hbase.regionserver.HRegion;
062import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
063import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
064import org.apache.hadoop.hbase.testclassification.LargeTests;
065import org.apache.hadoop.hbase.testclassification.ReplicationTests;
066import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
067import org.apache.hadoop.hbase.util.Bytes;
068import org.apache.hadoop.hbase.util.HFileTestUtil;
069import org.apache.hadoop.hbase.wal.WALEdit;
070import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
071import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
072import org.junit.After;
073import org.junit.Assert;
074import org.junit.Before;
075import org.junit.ClassRule;
076import org.junit.Test;
077import org.junit.experimental.categories.Category;
078import org.slf4j.Logger;
079import org.slf4j.LoggerFactory;
080
081@Category({ReplicationTests.class, LargeTests.class})
082public class TestMasterReplication {
083
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestMasterReplication.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestMasterReplication.class);

  // Template configuration cloned for every mini cluster started by a test.
  private Configuration baseConfiguration;

  // One entry per running mini cluster; populated by startMiniClusters().
  private HBaseTestingUtil[] utilities;
  private Configuration[] configurations;
  // ZooKeeper ensemble shared by all mini clusters (owned by cluster 0).
  private MiniZooKeeperCluster miniZK;

  // Polling parameters for the wait*() helpers: up to NB_RETRIES attempts,
  // SLEEP_TIME milliseconds apart.
  private static final long SLEEP_TIME = 1000;
  private static final int NB_RETRIES = 120;

  private static final TableName tableName = TableName.valueOf("test");
  // Column families: 'f' and 'f1' are globally replicated, 'norep' is not
  // (scopes are assigned in setUp()).
  private static final byte[] famName = Bytes.toBytes("f");
  private static final byte[] famName1 = Bytes.toBytes("f1");
  private static final byte[] row = Bytes.toBytes("row");
  private static final byte[] row1 = Bytes.toBytes("row1");
  private static final byte[] row2 = Bytes.toBytes("row2");
  private static final byte[] row3 = Bytes.toBytes("row3");
  private static final byte[] row4 = Bytes.toBytes("row4");
  private static final byte[] noRepfamName = Bytes.toBytes("norep");

  // Family/qualifier keys used by getCount() to read the mutation tallies
  // gathered via the CoprocessorCounter coprocessor registered in setUp().
  private static final byte[] count = Bytes.toBytes("count");
  private static final byte[] put = Bytes.toBytes("put");
  private static final byte[] delete = Bytes.toBytes("delete");

  // Descriptor of the test table; rebuilt fresh for each test in setUp().
  private TableDescriptor table;
114
  /**
   * Builds the shared base configuration cloned for every mini cluster and the
   * descriptor of the test table (two replicated families plus one
   * non-replicated family).
   */
  @Before
  public void setUp() throws Exception {
    baseConfiguration = HBaseConfiguration.create();
    // smaller block size and capacity to trigger more operations
    // and test them
    baseConfiguration.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20);
    baseConfiguration.setInt("replication.source.size.capacity", 1024);
    baseConfiguration.setLong("replication.source.sleepforretries", 100);
    baseConfiguration.setInt("hbase.regionserver.maxlogs", 10);
    baseConfiguration.setLong("hbase.master.logcleaner.ttl", 10);
    // enable replication of bulk loaded HFiles; required by the HFile tests below
    baseConfiguration.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
    baseConfiguration.set("hbase.replication.source.fs.conf.provider",
      TestSourceFSConfigurationProvider.class.getCanonicalName());
    baseConfiguration.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
    baseConfiguration.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
    // register the counting coprocessor whose tallies getCount() reads back
    baseConfiguration.setStrings(
        CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
        CoprocessorCounter.class.getName());
    // 'f' and 'f1' are replicated globally; 'norep' stays local to its cluster
    table = TableDescriptorBuilder.newBuilder(tableName)
        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
            .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName1)
            .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
  }
140
141  /**
142   * It tests the replication scenario involving 0 -> 1 -> 0. It does it by
143   * adding and deleting a row to a table in each cluster, checking if it's
144   * replicated. It also tests that the puts and deletes are not replicated back
145   * to the originating cluster.
146   */
147  @Test
148  public void testCyclicReplication1() throws Exception {
149    LOG.info("testSimplePutDelete");
150    int numClusters = 2;
151    Table[] htables = null;
152    try {
153      htables = setUpClusterTablesAndPeers(numClusters);
154
155      int[] expectedCounts = new int[] { 2, 2 };
156
157      // add rows to both clusters,
158      // make sure they are both replication
159      putAndWait(row, famName, htables[0], htables[1]);
160      putAndWait(row1, famName, htables[1], htables[0]);
161      validateCounts(htables, put, expectedCounts);
162
163      deleteAndWait(row, htables[0], htables[1]);
164      deleteAndWait(row1, htables[1], htables[0]);
165      validateCounts(htables, delete, expectedCounts);
166    } finally {
167      close(htables);
168      shutDownMiniClusters();
169    }
170  }
171
172  /**
173   * Tests the replication scenario 0 -> 0. By default
174   * {@link org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint},
175   * the replication peer should not be added.
176   */
177  @Test(expected = DoNotRetryIOException.class)
178  public void testLoopedReplication()
179    throws Exception {
180    LOG.info("testLoopedReplication");
181    startMiniClusters(1);
182    createTableOnClusters(table);
183    addPeer("1", 0, 0);
184  }
185
186  /**
187   * It tests the replication scenario involving 0 -> 1 -> 0. It does it by bulk loading a set of
188   * HFiles to a table in each cluster, checking if it's replicated.
189   */
190  @Test
191  public void testHFileCyclicReplication() throws Exception {
192    LOG.info("testHFileCyclicReplication");
193    int numClusters = 2;
194    Table[] htables = null;
195    try {
196      htables = setUpClusterTablesAndPeers(numClusters);
197
198      // Load 100 rows for each hfile range in cluster '0' and validate whether its been replicated
199      // to cluster '1'.
200      byte[][][] hfileRanges =
201        new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
202          new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("fff") }, };
203      int numOfRows = 100;
204      int[] expectedCounts =
205          new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };
206
207      loadAndValidateHFileReplication("testHFileCyclicReplication_01", 0, new int[] { 1 }, row,
208        famName, htables, hfileRanges, numOfRows, expectedCounts, true);
209
210      // Load 200 rows for each hfile range in cluster '1' and validate whether its been replicated
211      // to cluster '0'.
212      hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("gggg"), Bytes.toBytes("iiii") },
213        new byte[][] { Bytes.toBytes("jjj"), Bytes.toBytes("lll") }, };
214      numOfRows = 200;
215      int[] newExpectedCounts = new int[] { hfileRanges.length * numOfRows + expectedCounts[0],
216        hfileRanges.length * numOfRows + expectedCounts[1] };
217
218      loadAndValidateHFileReplication("testHFileCyclicReplication_10", 1, new int[] { 0 }, row,
219        famName, htables, hfileRanges, numOfRows, newExpectedCounts, true);
220
221    } finally {
222      close(htables);
223      shutDownMiniClusters();
224    }
225  }
226
227  private Table[] setUpClusterTablesAndPeers(int numClusters) throws Exception {
228    Table[] htables;
229    startMiniClusters(numClusters);
230    createTableOnClusters(table);
231
232    htables = getHTablesOnClusters(tableName);
233    // Test the replication scenarios of 0 -> 1 -> 0
234    addPeer("1", 0, 1);
235    addPeer("1", 1, 0);
236    return htables;
237  }
238
239  /**
240   * Tests the cyclic replication scenario of 0 -> 1 -> 2 -> 0 by adding and deleting rows to a
241   * table in each clusters and ensuring that the each of these clusters get the appropriate
242   * mutations. It also tests the grouping scenario where a cluster needs to replicate the edits
243   * originating from itself and also the edits that it received using replication from a different
244   * cluster. The scenario is explained in HBASE-9158
245   */
246  @Test
247  public void testCyclicReplication2() throws Exception {
248    LOG.info("testCyclicReplication2");
249    int numClusters = 3;
250    Table[] htables = null;
251    try {
252      startMiniClusters(numClusters);
253      createTableOnClusters(table);
254
255      // Test the replication scenario of 0 -> 1 -> 2 -> 0
256      addPeer("1", 0, 1);
257      addPeer("1", 1, 2);
258      addPeer("1", 2, 0);
259
260      htables = getHTablesOnClusters(tableName);
261
262      // put "row" and wait 'til it got around
263      putAndWait(row, famName, htables[0], htables[2]);
264      putAndWait(row1, famName, htables[1], htables[0]);
265      putAndWait(row2, famName, htables[2], htables[1]);
266
267      deleteAndWait(row, htables[0], htables[2]);
268      deleteAndWait(row1, htables[1], htables[0]);
269      deleteAndWait(row2, htables[2], htables[1]);
270
271      int[] expectedCounts = new int[] { 3, 3, 3 };
272      validateCounts(htables, put, expectedCounts);
273      validateCounts(htables, delete, expectedCounts);
274
275      // Test HBASE-9158
276      disablePeer("1", 2);
277      // we now have an edit that was replicated into cluster originating from
278      // cluster 0
279      putAndWait(row3, famName, htables[0], htables[1]);
280      // now add a local edit to cluster 1
281      htables[1].put(new Put(row4).addColumn(famName, row4, row4));
282      // re-enable replication from cluster 2 to cluster 0
283      enablePeer("1", 2);
284      // without HBASE-9158 the edit for row4 would have been marked with
285      // cluster 0's id
286      // and hence not replicated to cluster 0
287      wait(row4, htables[0], false);
288    } finally {
289      close(htables);
290      shutDownMiniClusters();
291    }
292  }
293
294  /**
295   * It tests the multi slave hfile replication scenario involving 0 -> 1, 2. It does it by bulk
296   * loading a set of HFiles to a table in master cluster, checking if it's replicated in its peers.
297   */
298  @Test
299  public void testHFileMultiSlaveReplication() throws Exception {
300    LOG.info("testHFileMultiSlaveReplication");
301    int numClusters = 3;
302    Table[] htables = null;
303    try {
304      startMiniClusters(numClusters);
305      createTableOnClusters(table);
306
307      // Add a slave, 0 -> 1
308      addPeer("1", 0, 1);
309
310      htables = getHTablesOnClusters(tableName);
311
312      // Load 100 rows for each hfile range in cluster '0' and validate whether its been replicated
313      // to cluster '1'.
314      byte[][][] hfileRanges =
315        new byte[][][] { new byte[][] { Bytes.toBytes("mmmm"), Bytes.toBytes("oooo") },
316          new byte[][] { Bytes.toBytes("ppp"), Bytes.toBytes("rrr") }, };
317      int numOfRows = 100;
318
319      int[] expectedCounts =
320        new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };
321
322      loadAndValidateHFileReplication("testHFileCyclicReplication_0", 0, new int[] { 1 }, row,
323        famName, htables, hfileRanges, numOfRows, expectedCounts, true);
324
325      // Validate data is not replicated to cluster '2'.
326      assertEquals(0, utilities[2].countRows(htables[2]));
327
328      rollWALAndWait(utilities[0], htables[0].getName(), row);
329
330      // Add one more slave, 0 -> 2
331      addPeer("2", 0, 2);
332
333      // Load 200 rows for each hfile range in cluster '0' and validate whether its been replicated
334      // to cluster '1' and '2'. Previous data should be replicated to cluster '2'.
335      hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("ssss"), Bytes.toBytes("uuuu") },
336        new byte[][] { Bytes.toBytes("vvv"), Bytes.toBytes("xxx") }, };
337      numOfRows = 200;
338
339      int[] newExpectedCounts = new int[] { hfileRanges.length * numOfRows + expectedCounts[0],
340        hfileRanges.length * numOfRows + expectedCounts[1], hfileRanges.length * numOfRows };
341
342      loadAndValidateHFileReplication("testHFileCyclicReplication_1", 0, new int[] { 1, 2 }, row,
343        famName, htables, hfileRanges, numOfRows, newExpectedCounts, true);
344
345    } finally {
346      close(htables);
347      shutDownMiniClusters();
348    }
349  }
350
351  /**
352   * It tests the bulk loaded hfile replication scenario to only explicitly specified table column
353   * families. It does it by bulk loading a set of HFiles belonging to both the CFs of table and set
354   * only one CF data to replicate.
355   */
356  @Test
357  public void testHFileReplicationForConfiguredTableCfs() throws Exception {
358    LOG.info("testHFileReplicationForConfiguredTableCfs");
359    int numClusters = 2;
360    Table[] htables = null;
361    try {
362      startMiniClusters(numClusters);
363      createTableOnClusters(table);
364
365      htables = getHTablesOnClusters(tableName);
366      // Test the replication scenarios only 'f' is configured for table data replication not 'f1'
367      addPeer("1", 0, 1, tableName.getNameAsString() + ":" + Bytes.toString(famName));
368
369      // Load 100 rows for each hfile range in cluster '0' for table CF 'f'
370      byte[][][] hfileRanges =
371        new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
372          new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("fff") }, };
373      int numOfRows = 100;
374      int[] expectedCounts =
375          new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };
376
377      loadAndValidateHFileReplication("load_f", 0, new int[] { 1 }, row, famName, htables,
378        hfileRanges, numOfRows, expectedCounts, true);
379
380      // Load 100 rows for each hfile range in cluster '0' for table CF 'f1'
381      hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("gggg"), Bytes.toBytes("iiii") },
382        new byte[][] { Bytes.toBytes("jjj"), Bytes.toBytes("lll") }, };
383      numOfRows = 100;
384
385      int[] newExpectedCounts =
386        new int[] { hfileRanges.length * numOfRows + expectedCounts[0], expectedCounts[1] };
387
388      loadAndValidateHFileReplication("load_f1", 0, new int[] { 1 }, row, famName1, htables,
389        hfileRanges, numOfRows, newExpectedCounts, false);
390
391      // Validate data replication for CF 'f1'
392
393      // Source cluster table should contain data for the families
394      wait(0, htables[0], hfileRanges.length * numOfRows + expectedCounts[0]);
395
396      // Sleep for enough time so that the data is still not replicated for the CF which is not
397      // configured for replication
398      Thread.sleep((NB_RETRIES / 2) * SLEEP_TIME);
399      // Peer cluster should have only configured CF data
400      wait(1, htables[1], expectedCounts[1]);
401    } finally {
402      close(htables);
403      shutDownMiniClusters();
404    }
405  }
406
407  /**
408   * Tests cyclic replication scenario of 0 -> 1 -> 2 -> 1.
409   */
410  @Test
411  public void testCyclicReplication3() throws Exception {
412    LOG.info("testCyclicReplication2");
413    int numClusters = 3;
414    Table[] htables = null;
415    try {
416      startMiniClusters(numClusters);
417      createTableOnClusters(table);
418
419      // Test the replication scenario of 0 -> 1 -> 2 -> 1
420      addPeer("1", 0, 1);
421      addPeer("1", 1, 2);
422      addPeer("1", 2, 1);
423
424      htables = getHTablesOnClusters(tableName);
425
426      // put "row" and wait 'til it got around
427      putAndWait(row, famName, htables[0], htables[2]);
428      putAndWait(row1, famName, htables[1], htables[2]);
429      putAndWait(row2, famName, htables[2], htables[1]);
430
431      deleteAndWait(row, htables[0], htables[2]);
432      deleteAndWait(row1, htables[1], htables[2]);
433      deleteAndWait(row2, htables[2], htables[1]);
434
435      int[] expectedCounts = new int[] { 1, 3, 3 };
436      validateCounts(htables, put, expectedCounts);
437      validateCounts(htables, delete, expectedCounts);
438    } finally {
439      close(htables);
440      shutDownMiniClusters();
441    }
442  }
443
444  /**
445   * Tests that base replication peer configs are applied on peer creation
446   * and the configs are overriden if updated as part of updateReplicationPeerConfig()
447   *
448   */
449  @Test
450  public void testBasePeerConfigsForReplicationPeer()
451    throws Exception {
452    LOG.info("testBasePeerConfigsForPeerMutations");
453    String firstCustomPeerConfigKey = "hbase.xxx.custom_config";
454    String firstCustomPeerConfigValue = "test";
455    String firstCustomPeerConfigUpdatedValue = "test_updated";
456
457    String secondCustomPeerConfigKey = "hbase.xxx.custom_second_config";
458    String secondCustomPeerConfigValue = "testSecond";
459    String secondCustomPeerConfigUpdatedValue = "testSecondUpdated";
460    try {
461      baseConfiguration.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
462        firstCustomPeerConfigKey.concat("=").concat(firstCustomPeerConfigValue));
463      startMiniClusters(2);
464      addPeer("1", 0, 1);
465      addPeer("2", 0, 1);
466      Admin admin = utilities[0].getAdmin();
467
468      // Validates base configs 1 is present for both peer.
469      Assert.assertEquals(firstCustomPeerConfigValue, admin.getReplicationPeerConfig("1").
470        getConfiguration().get(firstCustomPeerConfigKey));
471      Assert.assertEquals(firstCustomPeerConfigValue, admin.getReplicationPeerConfig("2").
472        getConfiguration().get(firstCustomPeerConfigKey));
473
474      // override value of configuration 1 for peer "1".
475      ReplicationPeerConfig updatedReplicationConfigForPeer1 = ReplicationPeerConfig.
476        newBuilder(admin.getReplicationPeerConfig("1")).
477        putConfiguration(firstCustomPeerConfigKey, firstCustomPeerConfigUpdatedValue).build();
478
479      // add configuration 2 for peer "2".
480      ReplicationPeerConfig updatedReplicationConfigForPeer2 = ReplicationPeerConfig.
481        newBuilder(admin.getReplicationPeerConfig("2")).
482        putConfiguration(secondCustomPeerConfigKey, secondCustomPeerConfigUpdatedValue).build();
483
484      admin.updateReplicationPeerConfig("1", updatedReplicationConfigForPeer1);
485      admin.updateReplicationPeerConfig("2", updatedReplicationConfigForPeer2);
486
487      // validates configuration is overridden by updateReplicationPeerConfig
488      Assert.assertEquals(firstCustomPeerConfigUpdatedValue, admin.getReplicationPeerConfig("1").
489        getConfiguration().get(firstCustomPeerConfigKey));
490      Assert.assertEquals(secondCustomPeerConfigUpdatedValue, admin.getReplicationPeerConfig("2").
491        getConfiguration().get(secondCustomPeerConfigKey));
492
493      // Add second config to base config and perform restart.
494      utilities[0].getConfiguration().set(ReplicationPeerConfigUtil.
495        HBASE_REPLICATION_PEER_BASE_CONFIG, firstCustomPeerConfigKey.concat("=").
496        concat(firstCustomPeerConfigValue).concat(";").concat(secondCustomPeerConfigKey)
497        .concat("=").concat(secondCustomPeerConfigValue));
498
499      utilities[0].shutdownMiniHBaseCluster();
500      utilities[0].restartHBaseCluster(1);
501      admin = utilities[0].getAdmin();
502
503      // Configurations should be updated after restart again
504      Assert.assertEquals(firstCustomPeerConfigValue, admin.getReplicationPeerConfig("1").
505        getConfiguration().get(firstCustomPeerConfigKey));
506      Assert.assertEquals(firstCustomPeerConfigValue, admin.getReplicationPeerConfig("2").
507        getConfiguration().get(firstCustomPeerConfigKey));
508
509      Assert.assertEquals(secondCustomPeerConfigValue, admin.getReplicationPeerConfig("1").
510        getConfiguration().get(secondCustomPeerConfigKey));
511      Assert.assertEquals(secondCustomPeerConfigValue, admin.getReplicationPeerConfig("2").
512        getConfiguration().get(secondCustomPeerConfigKey));
513    } finally {
514      shutDownMiniClusters();
515      baseConfiguration.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
516    }
517  }
518
519  @Test
520  public void testBasePeerConfigsRemovalForReplicationPeer()
521    throws Exception {
522    LOG.info("testBasePeerConfigsForPeerMutations");
523    String firstCustomPeerConfigKey = "hbase.xxx.custom_config";
524    String firstCustomPeerConfigValue = "test";
525
526    try {
527      baseConfiguration.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
528        firstCustomPeerConfigKey.concat("=").concat(firstCustomPeerConfigValue));
529      startMiniClusters(2);
530      addPeer("1", 0, 1);
531      Admin admin = utilities[0].getAdmin();
532
533      // Validates base configs 1 is present for both peer.
534      Assert.assertEquals(firstCustomPeerConfigValue, admin.getReplicationPeerConfig("1").
535        getConfiguration().get(firstCustomPeerConfigKey));
536
537      utilities[0].getConfiguration().unset(ReplicationPeerConfigUtil.
538        HBASE_REPLICATION_PEER_BASE_CONFIG);
539      utilities[0].getConfiguration().set(ReplicationPeerConfigUtil.
540        HBASE_REPLICATION_PEER_BASE_CONFIG, firstCustomPeerConfigKey.concat("=").concat(""));
541
542
543      utilities[0].shutdownMiniHBaseCluster();
544      utilities[0].restartHBaseCluster(1);
545      admin = utilities[0].getAdmin();
546
547      // Configurations should be removed after restart again
548      Assert.assertNull(admin.getReplicationPeerConfig("1")
549        .getConfiguration().get(firstCustomPeerConfigKey));
550    } finally {
551      shutDownMiniClusters();
552      baseConfiguration.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
553    }
554  }
555
556  @Test
557  public void testRemoveBasePeerConfigWithoutExistingConfigForReplicationPeer()
558    throws Exception {
559    LOG.info("testBasePeerConfigsForPeerMutations");
560    String firstCustomPeerConfigKey = "hbase.xxx.custom_config";
561
562    try {
563      baseConfiguration.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
564        firstCustomPeerConfigKey.concat("=").concat(""));
565      startMiniClusters(2);
566      addPeer("1", 0, 1);
567      Admin admin = utilities[0].getAdmin();
568
569      Assert.assertNull("Config should not be there", admin.getReplicationPeerConfig("1").
570        getConfiguration().get(firstCustomPeerConfigKey));
571    } finally {
572      shutDownMiniClusters();
573      baseConfiguration.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
574    }
575  }
576
577  @After
578  public void tearDown() throws IOException {
579    configurations = null;
580    utilities = null;
581  }
582
  /**
   * Starts {@code numClusters} mini HBase clusters that all share the first
   * cluster's mini ZooKeeper ensemble, each under a randomized znode parent so
   * they do not collide. Populates {@link #utilities} and
   * {@link #configurations} in cluster order.
   */
  @SuppressWarnings("resource")
  private void startMiniClusters(int numClusters) throws Exception {
    Random random = new Random();
    utilities = new HBaseTestingUtil[numClusters];
    configurations = new Configuration[numClusters];
    for (int i = 0; i < numClusters; i++) {
      Configuration conf = new Configuration(baseConfiguration);
      // distinct znode parent per cluster so all clusters can share one ensemble
      conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/" + i + random.nextInt());
      HBaseTestingUtil utility = new HBaseTestingUtil(conf);
      if (i == 0) {
        // the first cluster owns the ZK ensemble; later clusters reuse it
        utility.startMiniZKCluster();
        miniZK = utility.getZkCluster();
      } else {
        utility.setZkCluster(miniZK);
      }
      utility.startMiniCluster();
      utilities[i] = utility;
      configurations[i] = conf;
      // NOTE(review): this watcher is never closed nor referenced afterwards —
      // presumably created for its connection side effect; confirm it is not
      // an accidental leak.
      new ZKWatcher(conf, "cluster" + i, null, true);
    }
  }
604
605  private void shutDownMiniClusters() throws Exception {
606    int numClusters = utilities.length;
607    for (int i = numClusters - 1; i >= 0; i--) {
608      if (utilities[i] != null) {
609        utilities[i].shutdownMiniCluster();
610      }
611    }
612    miniZK.shutdown();
613  }
614
615  private void createTableOnClusters(TableDescriptor table) throws Exception {
616    for (HBaseTestingUtil utility : utilities) {
617      utility.getAdmin().createTable(table);
618    }
619  }
620
621  private void addPeer(String id, int masterClusterNumber,
622      int slaveClusterNumber) throws Exception {
623    try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]);
624      Admin admin = conn.getAdmin()) {
625      admin.addReplicationPeer(id,
626        ReplicationPeerConfig.newBuilder().
627          setClusterKey(utilities[slaveClusterNumber].getClusterKey()).build());
628    }
629  }
630
631  private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, String tableCfs)
632      throws Exception {
633    try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]);
634      Admin admin = conn.getAdmin()) {
635      admin.addReplicationPeer(
636        id,
637        ReplicationPeerConfig.newBuilder()
638          .setClusterKey(utilities[slaveClusterNumber].getClusterKey())
639          .setReplicateAllUserTables(false)
640          .setTableCFsMap(ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableCfs)).build());
641    }
642  }
643
644  private void disablePeer(String id, int masterClusterNumber) throws Exception {
645    try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]);
646      Admin admin = conn.getAdmin()) {
647      admin.disableReplicationPeer(id);
648    }
649  }
650
651  private void enablePeer(String id, int masterClusterNumber) throws Exception {
652    try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]);
653      Admin admin = conn.getAdmin()) {
654      admin.enableReplicationPeer(id);
655    }
656  }
657
658  private void close(Closeable... closeables) {
659    try {
660      if (closeables != null) {
661        for (Closeable closeable : closeables) {
662          closeable.close();
663        }
664      }
665    } catch (Exception e) {
666      LOG.warn("Exception occurred while closing the object:", e);
667    }
668  }
669
670  @SuppressWarnings("resource")
671  private Table[] getHTablesOnClusters(TableName tableName) throws Exception {
672    int numClusters = utilities.length;
673    Table[] htables = new Table[numClusters];
674    for (int i = 0; i < numClusters; i++) {
675      Table htable = ConnectionFactory.createConnection(configurations[i]).getTable(tableName);
676      htables[i] = htable;
677    }
678    return htables;
679  }
680
681  private void validateCounts(Table[] htables, byte[] type,
682      int[] expectedCounts) throws IOException {
683    for (int i = 0; i < htables.length; i++) {
684      assertEquals(Bytes.toString(type) + " were replicated back ",
685          expectedCounts[i], getCount(htables[i], type));
686    }
687  }
688
689  private int getCount(Table t, byte[] type) throws IOException {
690    Get test = new Get(row);
691    test.setAttribute("count", new byte[] {});
692    Result res = t.get(test);
693    return Bytes.toInt(res.getValue(count, type));
694  }
695
696  private void deleteAndWait(byte[] row, Table source, Table target)
697      throws Exception {
698    Delete del = new Delete(row);
699    source.delete(del);
700    wait(row, target, true);
701  }
702
703  private void putAndWait(byte[] row, byte[] fam, Table source, Table target)
704      throws Exception {
705    Put put = new Put(row);
706    put.addColumn(fam, row, row);
707    source.put(put);
708    wait(row, target, false);
709  }
710
711  private void loadAndValidateHFileReplication(String testName, int masterNumber,
712      int[] slaveNumbers, byte[] row, byte[] fam, Table[] tables, byte[][][] hfileRanges,
713      int numOfRows, int[] expectedCounts, boolean toValidate) throws Exception {
714    HBaseTestingUtil util = utilities[masterNumber];
715
716    Path dir = util.getDataTestDirOnTestFS(testName);
717    FileSystem fs = util.getTestFileSystem();
718    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
719    Path familyDir = new Path(dir, Bytes.toString(fam));
720
721    int hfileIdx = 0;
722    for (byte[][] range : hfileRanges) {
723      byte[] from = range[0];
724      byte[] to = range[1];
725      HFileTestUtil.createHFile(util.getConfiguration(), fs,
726        new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows);
727    }
728
729    Table source = tables[masterNumber];
730    final TableName tableName = source.getName();
731    BulkLoadHFiles.create(util.getConfiguration()).bulkLoad(tableName, dir);
732
733    if (toValidate) {
734      for (int slaveClusterNumber : slaveNumbers) {
735        wait(slaveClusterNumber, tables[slaveClusterNumber], expectedCounts[slaveClusterNumber]);
736      }
737    }
738  }
739
740  private void wait(int slaveNumber, Table target, int expectedCount)
741      throws IOException, InterruptedException {
742    int count = 0;
743    for (int i = 0; i < NB_RETRIES; i++) {
744      if (i == NB_RETRIES - 1) {
745        fail("Waited too much time for bulkloaded data replication. Current count=" + count
746            + ", expected count=" + expectedCount);
747      }
748      count = utilities[slaveNumber].countRows(target);
749      if (count != expectedCount) {
750        LOG.info("Waiting more time for bulkloaded data replication.");
751        Thread.sleep(SLEEP_TIME);
752      } else {
753        break;
754      }
755    }
756  }
757
758  private void wait(byte[] row, Table target, boolean isDeleted) throws Exception {
759    Get get = new Get(row);
760    for (int i = 0; i < NB_RETRIES; i++) {
761      if (i == NB_RETRIES - 1) {
762        fail("Waited too much time for replication. Row:" + Bytes.toString(row)
763            + ". IsDeleteReplication:" + isDeleted);
764      }
765      Result res = target.get(get);
766      boolean sleep = isDeleted ? res.size() > 0 : res.isEmpty();
767      if (sleep) {
768        LOG.info("Waiting for more time for replication. Row:"
769            + Bytes.toString(row) + ". IsDeleteReplication:" + isDeleted);
770        Thread.sleep(SLEEP_TIME);
771      } else {
772        if (!isDeleted) {
773          assertArrayEquals(res.value(), row);
774        }
775        LOG.info("Obtained row:"
776            + Bytes.toString(row) + ". IsDeleteReplication:" + isDeleted);
777        break;
778      }
779    }
780  }
781
782  private void rollWALAndWait(final HBaseTestingUtil utility, final TableName table,
783      final byte[] row) throws IOException {
784    final Admin admin = utility.getAdmin();
785    final SingleProcessHBaseCluster cluster = utility.getMiniHBaseCluster();
786
787    // find the region that corresponds to the given row.
788    HRegion region = null;
789    for (HRegion candidate : cluster.getRegions(table)) {
790      if (HRegion.rowIsInRange(candidate.getRegionInfo(), row)) {
791        region = candidate;
792        break;
793      }
794    }
795    assertNotNull("Couldn't find the region for row '" + Arrays.toString(row) + "'", region);
796
797    final CountDownLatch latch = new CountDownLatch(1);
798
799    // listen for successful log rolls
800    final WALActionsListener listener = new WALActionsListener() {
801      @Override
802      public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
803        latch.countDown();
804      }
805    };
806    region.getWAL().registerWALActionsListener(listener);
807
808    // request a roll
809    admin.rollWALWriter(cluster.getServerHoldingRegion(region.getTableDescriptor().getTableName(),
810      region.getRegionInfo().getRegionName()));
811
812    // wait
813    try {
814      latch.await();
815    } catch (InterruptedException exception) {
816      LOG.warn("Interrupted while waiting for the wal of '" + region + "' to roll. If later " +
817          "replication tests fail, it's probably because we should still be waiting.");
818      Thread.currentThread().interrupt();
819    }
820    region.getWAL().unregisterWALActionsListener(listener);
821  }
822
823  /**
824   * Use a coprocessor to count puts and deletes. as KVs would be replicated back with the same
825   * timestamp there is otherwise no way to count them.
826   */
827  public static class CoprocessorCounter implements RegionCoprocessor, RegionObserver {
828    private int nCount = 0;
829    private int nDelete = 0;
830
831    @Override
832    public Optional<RegionObserver> getRegionObserver() {
833      return Optional.of(this);
834    }
835
836    @Override
837    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
838        final WALEdit edit, final Durability durability) throws IOException {
839      nCount++;
840    }
841
842    @Override
843    public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
844        final Delete delete, final WALEdit edit, final Durability durability) throws IOException {
845      nDelete++;
846    }
847
848    @Override
849    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c,
850        final Get get, final List<Cell> result) throws IOException {
851      if (get.getAttribute("count") != null) {
852        result.clear();
853        // order is important!
854        result.add(new KeyValue(count, count, delete, Bytes.toBytes(nDelete)));
855        result.add(new KeyValue(count, count, put, Bytes.toBytes(nCount)));
856        c.bypass();
857      }
858    }
859  }
860
861}