001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.jupiter.api.Assertions.assertArrayEquals;
021import static org.junit.jupiter.api.Assertions.assertEquals;
022import static org.junit.jupiter.api.Assertions.assertNotNull;
023import static org.junit.jupiter.api.Assertions.assertNull;
024import static org.junit.jupiter.api.Assertions.assertThrows;
025import static org.junit.jupiter.api.Assertions.fail;
026
027import java.io.Closeable;
028import java.io.IOException;
029import java.util.Arrays;
030import java.util.List;
031import java.util.Optional;
032import java.util.concurrent.CountDownLatch;
033import java.util.concurrent.ThreadLocalRandom;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.hbase.Cell;
038import org.apache.hadoop.hbase.DoNotRetryIOException;
039import org.apache.hadoop.hbase.HBaseConfiguration;
040import org.apache.hadoop.hbase.HBaseTestingUtil;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.KeyValue;
043import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
044import org.apache.hadoop.hbase.TableName;
045import org.apache.hadoop.hbase.client.Admin;
046import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
047import org.apache.hadoop.hbase.client.Connection;
048import org.apache.hadoop.hbase.client.ConnectionFactory;
049import org.apache.hadoop.hbase.client.Delete;
050import org.apache.hadoop.hbase.client.Durability;
051import org.apache.hadoop.hbase.client.Get;
052import org.apache.hadoop.hbase.client.Put;
053import org.apache.hadoop.hbase.client.Result;
054import org.apache.hadoop.hbase.client.Table;
055import org.apache.hadoop.hbase.client.TableDescriptor;
056import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
057import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
058import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
059import org.apache.hadoop.hbase.coprocessor.ObserverContext;
060import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
061import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
062import org.apache.hadoop.hbase.coprocessor.RegionObserver;
063import org.apache.hadoop.hbase.regionserver.HRegion;
064import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
065import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
066import org.apache.hadoop.hbase.testclassification.LargeTests;
067import org.apache.hadoop.hbase.testclassification.ReplicationTests;
068import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
069import org.apache.hadoop.hbase.util.Bytes;
070import org.apache.hadoop.hbase.util.HFileTestUtil;
071import org.apache.hadoop.hbase.wal.WALEdit;
072import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
073import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
074import org.junit.jupiter.api.AfterEach;
075import org.junit.jupiter.api.BeforeEach;
076import org.junit.jupiter.api.Tag;
077import org.junit.jupiter.api.Test;
078import org.slf4j.Logger;
079import org.slf4j.LoggerFactory;
080
081@Tag(ReplicationTests.TAG)
082@Tag(LargeTests.TAG)
083public class TestMasterReplication {
084
  /** Logger shared by all test cases of this class. */
  private static final Logger LOG = LoggerFactory.getLogger(TestMasterReplication.class);

  // Configuration template: populated in setUp() and copied once per cluster in
  // startMiniClusters().
  private Configuration baseConfiguration;

  // Per-cluster testing utilities and configurations, indexed by cluster number.
  private HBaseTestingUtil[] utilities;
  private Configuration[] configurations;
  // ZooKeeper mini cluster started by cluster 0 and handed over to the other clusters.
  private MiniZooKeeperCluster miniZK;

  // Sleep between two polls (ms) and number of polling rounds used by the wait(...) helpers.
  private static final long SLEEP_TIME = 1000;
  private static final int NB_RETRIES = 120;

  // Test table, its two globally replicated families, and the row keys used by the tests.
  private static final TableName tableName = TableName.valueOf("test");
  private static final byte[] famName = Bytes.toBytes("f");
  private static final byte[] famName1 = Bytes.toBytes("f1");
  private static final byte[] row = Bytes.toBytes("row");
  private static final byte[] row1 = Bytes.toBytes("row1");
  private static final byte[] row2 = Bytes.toBytes("row2");
  private static final byte[] row3 = Bytes.toBytes("row3");
  private static final byte[] row4 = Bytes.toBytes("row4");
  // Family added to the table descriptor without an explicit replication scope.
  private static final byte[] noRepfamName = Bytes.toBytes("norep");

  // Family ("count") and qualifiers ("put"/"delete") that getCount() reads from the result of its
  // special Get. NOTE(review): presumably answered by CoprocessorCounter, which is defined
  // outside this view - confirm.
  private static final byte[] count = Bytes.toBytes("count");
  private static final byte[] put = Bytes.toBytes("put");
  private static final byte[] delete = Bytes.toBytes("delete");

  // Descriptor of the test table, rebuilt in setUp() before every test.
  private TableDescriptor table;
111
112  @BeforeEach
113  public void setUp() throws Exception {
114    baseConfiguration = HBaseConfiguration.create();
115    // smaller block size and capacity to trigger more operations
116    // and test them
117    baseConfiguration.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20);
118    baseConfiguration.setInt("replication.source.size.capacity", 1024);
119    baseConfiguration.setLong("replication.source.sleepforretries", 100);
120    baseConfiguration.setInt("hbase.regionserver.maxlogs", 10);
121    baseConfiguration.setLong("hbase.master.logcleaner.ttl", 10);
122    baseConfiguration.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
123    baseConfiguration.set("hbase.replication.source.fs.conf.provider",
124      TestSourceFSConfigurationProvider.class.getCanonicalName());
125    baseConfiguration.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
126    baseConfiguration.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
127    baseConfiguration.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
128      CoprocessorCounter.class.getName());
129    table = TableDescriptorBuilder.newBuilder(tableName)
130      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
131        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
132      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName1)
133        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
134      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
135  }
136
137  /**
138   * It tests the replication scenario involving 0 -> 1 -> 0. It does it by adding and deleting a
139   * row to a table in each cluster, checking if it's replicated. It also tests that the puts and
140   * deletes are not replicated back to the originating cluster.
141   */
142  @Test
143  public void testCyclicReplication1() throws Exception {
144    LOG.info("testSimplePutDelete");
145    int numClusters = 2;
146    Table[] htables = null;
147    try {
148      htables = setUpClusterTablesAndPeers(numClusters);
149
150      int[] expectedCounts = new int[] { 2, 2 };
151
152      // add rows to both clusters,
153      // make sure they are both replication
154      putAndWait(row, famName, htables[0], htables[1]);
155      putAndWait(row1, famName, htables[1], htables[0]);
156      validateCounts(htables, put, expectedCounts);
157
158      deleteAndWait(row, htables[0], htables[1]);
159      deleteAndWait(row1, htables[1], htables[0]);
160      validateCounts(htables, delete, expectedCounts);
161    } finally {
162      close(htables);
163      shutDownMiniClusters();
164    }
165  }
166
167  /**
168   * Tests the replication scenario 0 -> 0. By default
169   * {@link org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint},
170   * the replication peer should not be added.
171   */
172  @Test
173  public void testLoopedReplication() throws Exception {
174    LOG.info("testLoopedReplication");
175    startMiniClusters(1);
176    createTableOnClusters(table);
177    assertThrows(DoNotRetryIOException.class, () -> addPeer("1", 0, 0));
178  }
179
180  /**
181   * It tests the replication scenario involving 0 -> 1 -> 0. It does it by bulk loading a set of
182   * HFiles to a table in each cluster, checking if it's replicated.
183   */
184  @Test
185  public void testHFileCyclicReplication() throws Exception {
186    LOG.info("testHFileCyclicReplication");
187    int numClusters = 2;
188    Table[] htables = null;
189    try {
190      htables = setUpClusterTablesAndPeers(numClusters);
191
192      // Load 100 rows for each hfile range in cluster '0' and validate whether its been replicated
193      // to cluster '1'.
194      byte[][][] hfileRanges =
195        new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
196          new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("fff") }, };
197      int numOfRows = 100;
198      int[] expectedCounts =
199        new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };
200
201      loadAndValidateHFileReplication("testHFileCyclicReplication_01", 0, new int[] { 1 }, row,
202        famName, htables, hfileRanges, numOfRows, expectedCounts, true);
203
204      // Load 200 rows for each hfile range in cluster '1' and validate whether its been replicated
205      // to cluster '0'.
206      hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("gggg"), Bytes.toBytes("iiii") },
207        new byte[][] { Bytes.toBytes("jjj"), Bytes.toBytes("lll") }, };
208      numOfRows = 200;
209      int[] newExpectedCounts = new int[] { hfileRanges.length * numOfRows + expectedCounts[0],
210        hfileRanges.length * numOfRows + expectedCounts[1] };
211
212      loadAndValidateHFileReplication("testHFileCyclicReplication_10", 1, new int[] { 0 }, row,
213        famName, htables, hfileRanges, numOfRows, newExpectedCounts, true);
214
215    } finally {
216      close(htables);
217      shutDownMiniClusters();
218    }
219  }
220
221  private Table[] setUpClusterTablesAndPeers(int numClusters) throws Exception {
222    Table[] htables;
223    startMiniClusters(numClusters);
224    createTableOnClusters(table);
225
226    htables = getHTablesOnClusters(tableName);
227    // Test the replication scenarios of 0 -> 1 -> 0
228    addPeer("1", 0, 1);
229    addPeer("1", 1, 0);
230    return htables;
231  }
232
233  /**
234   * Tests the cyclic replication scenario of 0 -> 1 -> 2 -> 0 by adding and deleting rows to a
235   * table in each clusters and ensuring that the each of these clusters get the appropriate
236   * mutations. It also tests the grouping scenario where a cluster needs to replicate the edits
237   * originating from itself and also the edits that it received using replication from a different
238   * cluster. The scenario is explained in HBASE-9158
239   */
240  @Test
241  public void testCyclicReplication2() throws Exception {
242    LOG.info("testCyclicReplication2");
243    int numClusters = 3;
244    Table[] htables = null;
245    try {
246      startMiniClusters(numClusters);
247      createTableOnClusters(table);
248
249      // Test the replication scenario of 0 -> 1 -> 2 -> 0
250      addPeer("1", 0, 1);
251      addPeer("1", 1, 2);
252      addPeer("1", 2, 0);
253
254      htables = getHTablesOnClusters(tableName);
255
256      // put "row" and wait 'til it got around
257      putAndWait(row, famName, htables[0], htables[2]);
258      putAndWait(row1, famName, htables[1], htables[0]);
259      putAndWait(row2, famName, htables[2], htables[1]);
260
261      deleteAndWait(row, htables[0], htables[2]);
262      deleteAndWait(row1, htables[1], htables[0]);
263      deleteAndWait(row2, htables[2], htables[1]);
264
265      int[] expectedCounts = new int[] { 3, 3, 3 };
266      validateCounts(htables, put, expectedCounts);
267      validateCounts(htables, delete, expectedCounts);
268
269      // Test HBASE-9158
270      disablePeer("1", 2);
271      // we now have an edit that was replicated into cluster originating from
272      // cluster 0
273      putAndWait(row3, famName, htables[0], htables[1]);
274      // now add a local edit to cluster 1
275      htables[1].put(new Put(row4).addColumn(famName, row4, row4));
276      // re-enable replication from cluster 2 to cluster 0
277      enablePeer("1", 2);
278      // without HBASE-9158 the edit for row4 would have been marked with
279      // cluster 0's id
280      // and hence not replicated to cluster 0
281      wait(row4, htables[0], false);
282    } finally {
283      close(htables);
284      shutDownMiniClusters();
285    }
286  }
287
288  /**
289   * It tests the multi slave hfile replication scenario involving 0 -> 1, 2. It does it by bulk
290   * loading a set of HFiles to a table in master cluster, checking if it's replicated in its peers.
291   */
292  @Test
293  public void testHFileMultiSlaveReplication() throws Exception {
294    LOG.info("testHFileMultiSlaveReplication");
295    int numClusters = 3;
296    Table[] htables = null;
297    try {
298      startMiniClusters(numClusters);
299      createTableOnClusters(table);
300
301      // Add a slave, 0 -> 1
302      addPeer("1", 0, 1);
303
304      htables = getHTablesOnClusters(tableName);
305
306      // Load 100 rows for each hfile range in cluster '0' and validate whether its been replicated
307      // to cluster '1'.
308      byte[][][] hfileRanges =
309        new byte[][][] { new byte[][] { Bytes.toBytes("mmmm"), Bytes.toBytes("oooo") },
310          new byte[][] { Bytes.toBytes("ppp"), Bytes.toBytes("rrr") }, };
311      int numOfRows = 100;
312
313      int[] expectedCounts =
314        new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };
315
316      loadAndValidateHFileReplication("testHFileCyclicReplication_0", 0, new int[] { 1 }, row,
317        famName, htables, hfileRanges, numOfRows, expectedCounts, true);
318
319      // Validate data is not replicated to cluster '2'.
320      assertEquals(0, HBaseTestingUtil.countRows(htables[2]));
321
322      rollWALAndWait(utilities[0], htables[0].getName(), row);
323
324      // Add one more slave, 0 -> 2
325      addPeer("2", 0, 2);
326
327      // Load 200 rows for each hfile range in cluster '0' and validate whether its been replicated
328      // to cluster '1' and '2'. Previous data should be replicated to cluster '2'.
329      hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("ssss"), Bytes.toBytes("uuuu") },
330        new byte[][] { Bytes.toBytes("vvv"), Bytes.toBytes("xxx") }, };
331      numOfRows = 200;
332
333      int[] newExpectedCounts = new int[] { hfileRanges.length * numOfRows + expectedCounts[0],
334        hfileRanges.length * numOfRows + expectedCounts[1], hfileRanges.length * numOfRows };
335
336      loadAndValidateHFileReplication("testHFileCyclicReplication_1", 0, new int[] { 1, 2 }, row,
337        famName, htables, hfileRanges, numOfRows, newExpectedCounts, true);
338
339    } finally {
340      close(htables);
341      shutDownMiniClusters();
342    }
343  }
344
345  /**
346   * It tests the bulk loaded hfile replication scenario to only explicitly specified table column
347   * families. It does it by bulk loading a set of HFiles belonging to both the CFs of table and set
348   * only one CF data to replicate.
349   */
350  @Test
351  public void testHFileReplicationForConfiguredTableCfs() throws Exception {
352    LOG.info("testHFileReplicationForConfiguredTableCfs");
353    int numClusters = 2;
354    Table[] htables = null;
355    try {
356      startMiniClusters(numClusters);
357      createTableOnClusters(table);
358
359      htables = getHTablesOnClusters(tableName);
360      // Test the replication scenarios only 'f' is configured for table data replication not 'f1'
361      addPeer("1", 0, 1, tableName.getNameAsString() + ":" + Bytes.toString(famName));
362
363      // Load 100 rows for each hfile range in cluster '0' for table CF 'f'
364      byte[][][] hfileRanges =
365        new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
366          new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("fff") }, };
367      int numOfRows = 100;
368      int[] expectedCounts =
369        new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };
370
371      loadAndValidateHFileReplication("load_f", 0, new int[] { 1 }, row, famName, htables,
372        hfileRanges, numOfRows, expectedCounts, true);
373
374      // Load 100 rows for each hfile range in cluster '0' for table CF 'f1'
375      hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("gggg"), Bytes.toBytes("iiii") },
376        new byte[][] { Bytes.toBytes("jjj"), Bytes.toBytes("lll") }, };
377      numOfRows = 100;
378
379      int[] newExpectedCounts =
380        new int[] { hfileRanges.length * numOfRows + expectedCounts[0], expectedCounts[1] };
381
382      loadAndValidateHFileReplication("load_f1", 0, new int[] { 1 }, row, famName1, htables,
383        hfileRanges, numOfRows, newExpectedCounts, false);
384
385      // Validate data replication for CF 'f1'
386
387      // Source cluster table should contain data for the families
388      wait(0, htables[0], hfileRanges.length * numOfRows + expectedCounts[0]);
389
390      // Sleep for enough time so that the data is still not replicated for the CF which is not
391      // configured for replication
392      Thread.sleep((NB_RETRIES / 2) * SLEEP_TIME);
393      // Peer cluster should have only configured CF data
394      wait(1, htables[1], expectedCounts[1]);
395    } finally {
396      close(htables);
397      shutDownMiniClusters();
398    }
399  }
400
401  /**
402   * Tests cyclic replication scenario of 0 -> 1 -> 2 -> 1.
403   */
404  @Test
405  public void testCyclicReplication3() throws Exception {
406    LOG.info("testCyclicReplication2");
407    int numClusters = 3;
408    Table[] htables = null;
409    try {
410      startMiniClusters(numClusters);
411      createTableOnClusters(table);
412
413      // Test the replication scenario of 0 -> 1 -> 2 -> 1
414      addPeer("1", 0, 1);
415      addPeer("1", 1, 2);
416      addPeer("1", 2, 1);
417
418      htables = getHTablesOnClusters(tableName);
419
420      // put "row" and wait 'til it got around
421      putAndWait(row, famName, htables[0], htables[2]);
422      putAndWait(row1, famName, htables[1], htables[2]);
423      putAndWait(row2, famName, htables[2], htables[1]);
424
425      deleteAndWait(row, htables[0], htables[2]);
426      deleteAndWait(row1, htables[1], htables[2]);
427      deleteAndWait(row2, htables[2], htables[1]);
428
429      int[] expectedCounts = new int[] { 1, 3, 3 };
430      validateCounts(htables, put, expectedCounts);
431      validateCounts(htables, delete, expectedCounts);
432    } finally {
433      close(htables);
434      shutDownMiniClusters();
435    }
436  }
437
438  /**
439   * Tests that base replication peer configs are applied on peer creation and the configs are
440   * overriden if updated as part of updateReplicationPeerConfig()
441   */
442  @Test
443  public void testBasePeerConfigsForReplicationPeer() throws Exception {
444    LOG.info("testBasePeerConfigsForPeerMutations");
445    String firstCustomPeerConfigKey = "hbase.xxx.custom_config";
446    String firstCustomPeerConfigValue = "test";
447    String firstCustomPeerConfigUpdatedValue = "test_updated";
448
449    String secondCustomPeerConfigKey = "hbase.xxx.custom_second_config";
450    String secondCustomPeerConfigValue = "testSecond";
451    String secondCustomPeerConfigUpdatedValue = "testSecondUpdated";
452    try {
453      baseConfiguration.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
454        firstCustomPeerConfigKey.concat("=").concat(firstCustomPeerConfigValue));
455      startMiniClusters(2);
456      addPeer("1", 0, 1);
457      addPeer("2", 0, 1);
458      Admin admin = utilities[0].getAdmin();
459
460      // Validates base configs 1 is present for both peer.
461      assertEquals(firstCustomPeerConfigValue,
462        admin.getReplicationPeerConfig("1").getConfiguration().get(firstCustomPeerConfigKey));
463      assertEquals(firstCustomPeerConfigValue,
464        admin.getReplicationPeerConfig("2").getConfiguration().get(firstCustomPeerConfigKey));
465
466      // override value of configuration 1 for peer "1".
467      ReplicationPeerConfig updatedReplicationConfigForPeer1 =
468        ReplicationPeerConfig.newBuilder(admin.getReplicationPeerConfig("1"))
469          .putConfiguration(firstCustomPeerConfigKey, firstCustomPeerConfigUpdatedValue).build();
470
471      // add configuration 2 for peer "2".
472      ReplicationPeerConfig updatedReplicationConfigForPeer2 =
473        ReplicationPeerConfig.newBuilder(admin.getReplicationPeerConfig("2"))
474          .putConfiguration(secondCustomPeerConfigKey, secondCustomPeerConfigUpdatedValue).build();
475
476      admin.updateReplicationPeerConfig("1", updatedReplicationConfigForPeer1);
477      admin.updateReplicationPeerConfig("2", updatedReplicationConfigForPeer2);
478
479      // validates configuration is overridden by updateReplicationPeerConfig
480      assertEquals(firstCustomPeerConfigUpdatedValue,
481        admin.getReplicationPeerConfig("1").getConfiguration().get(firstCustomPeerConfigKey));
482      assertEquals(secondCustomPeerConfigUpdatedValue,
483        admin.getReplicationPeerConfig("2").getConfiguration().get(secondCustomPeerConfigKey));
484
485      // Add second config to base config and perform restart.
486      utilities[0].getConfiguration().set(
487        ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
488        firstCustomPeerConfigKey.concat("=").concat(firstCustomPeerConfigValue).concat(";")
489          .concat(secondCustomPeerConfigKey).concat("=").concat(secondCustomPeerConfigValue));
490
491      utilities[0].shutdownMiniHBaseCluster();
492      utilities[0].restartHBaseCluster(1);
493      admin = utilities[0].getAdmin();
494
495      // Configurations should be updated after restart again
496      assertEquals(firstCustomPeerConfigValue,
497        admin.getReplicationPeerConfig("1").getConfiguration().get(firstCustomPeerConfigKey));
498      assertEquals(firstCustomPeerConfigValue,
499        admin.getReplicationPeerConfig("2").getConfiguration().get(firstCustomPeerConfigKey));
500
501      assertEquals(secondCustomPeerConfigValue,
502        admin.getReplicationPeerConfig("1").getConfiguration().get(secondCustomPeerConfigKey));
503      assertEquals(secondCustomPeerConfigValue,
504        admin.getReplicationPeerConfig("2").getConfiguration().get(secondCustomPeerConfigKey));
505    } finally {
506      shutDownMiniClusters();
507      baseConfiguration.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
508    }
509  }
510
511  @Test
512  public void testBasePeerConfigsRemovalForReplicationPeer() throws Exception {
513    LOG.info("testBasePeerConfigsForPeerMutations");
514    String firstCustomPeerConfigKey = "hbase.xxx.custom_config";
515    String firstCustomPeerConfigValue = "test";
516
517    try {
518      baseConfiguration.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
519        firstCustomPeerConfigKey.concat("=").concat(firstCustomPeerConfigValue));
520      startMiniClusters(2);
521      addPeer("1", 0, 1);
522      Admin admin = utilities[0].getAdmin();
523
524      // Validates base configs 1 is present for both peer.
525      assertEquals(firstCustomPeerConfigValue,
526        admin.getReplicationPeerConfig("1").getConfiguration().get(firstCustomPeerConfigKey));
527
528      utilities[0].getConfiguration()
529        .unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
530      utilities[0].getConfiguration().set(
531        ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
532        firstCustomPeerConfigKey.concat("=").concat(""));
533
534      utilities[0].shutdownMiniHBaseCluster();
535      utilities[0].restartHBaseCluster(1);
536      admin = utilities[0].getAdmin();
537
538      // Configurations should be removed after restart again
539      assertNull(
540        admin.getReplicationPeerConfig("1").getConfiguration().get(firstCustomPeerConfigKey));
541    } finally {
542      shutDownMiniClusters();
543      baseConfiguration.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
544    }
545  }
546
547  @Test
548  public void testRemoveBasePeerConfigWithoutExistingConfigForReplicationPeer() throws Exception {
549    LOG.info("testBasePeerConfigsForPeerMutations");
550    String firstCustomPeerConfigKey = "hbase.xxx.custom_config";
551
552    try {
553      baseConfiguration.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
554        firstCustomPeerConfigKey.concat("=").concat(""));
555      startMiniClusters(2);
556      addPeer("1", 0, 1);
557      Admin admin = utilities[0].getAdmin();
558
559      assertNull(
560        admin.getReplicationPeerConfig("1").getConfiguration().get(firstCustomPeerConfigKey),
561        "Config should not be there");
562    } finally {
563      shutDownMiniClusters();
564      baseConfiguration.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
565    }
566  }
567
568  @AfterEach
569  public void tearDown() throws IOException {
570    configurations = null;
571    utilities = null;
572  }
573
574  @SuppressWarnings("resource")
575  private void startMiniClusters(int numClusters) throws Exception {
576    utilities = new HBaseTestingUtil[numClusters];
577    configurations = new Configuration[numClusters];
578    for (int i = 0; i < numClusters; i++) {
579      Configuration conf = new Configuration(baseConfiguration);
580      conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/" + i + ThreadLocalRandom.current().nextInt());
581      HBaseTestingUtil utility = new HBaseTestingUtil(conf);
582      if (i == 0) {
583        utility.startMiniZKCluster();
584        miniZK = utility.getZkCluster();
585      } else {
586        utility.setZkCluster(miniZK);
587      }
588      utility.startMiniCluster();
589      utilities[i] = utility;
590      configurations[i] = conf;
591      new ZKWatcher(conf, "cluster" + i, null, true);
592    }
593  }
594
595  private void shutDownMiniClusters() throws Exception {
596    int numClusters = utilities.length;
597    for (int i = numClusters - 1; i >= 0; i--) {
598      if (utilities[i] != null) {
599        utilities[i].shutdownMiniCluster();
600      }
601    }
602    miniZK.shutdown();
603  }
604
605  private void createTableOnClusters(TableDescriptor table) throws Exception {
606    for (HBaseTestingUtil utility : utilities) {
607      utility.getAdmin().createTable(table);
608    }
609  }
610
611  private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber)
612    throws Exception {
613    try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]);
614      Admin admin = conn.getAdmin()) {
615      admin.addReplicationPeer(id, ReplicationPeerConfig.newBuilder()
616        .setClusterKey(utilities[slaveClusterNumber].getRpcConnnectionURI()).build());
617    }
618  }
619
620  private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, String tableCfs)
621    throws Exception {
622    try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]);
623      Admin admin = conn.getAdmin()) {
624      admin.addReplicationPeer(id,
625        ReplicationPeerConfig.newBuilder()
626          .setClusterKey(utilities[slaveClusterNumber].getRpcConnnectionURI())
627          .setReplicateAllUserTables(false)
628          .setTableCFsMap(ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableCfs)).build());
629    }
630  }
631
632  private void disablePeer(String id, int masterClusterNumber) throws Exception {
633    try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]);
634      Admin admin = conn.getAdmin()) {
635      admin.disableReplicationPeer(id);
636    }
637  }
638
639  private void enablePeer(String id, int masterClusterNumber) throws Exception {
640    try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]);
641      Admin admin = conn.getAdmin()) {
642      admin.enableReplicationPeer(id);
643    }
644  }
645
646  private void close(Closeable... closeables) {
647    try {
648      if (closeables != null) {
649        for (Closeable closeable : closeables) {
650          closeable.close();
651        }
652      }
653    } catch (Exception e) {
654      LOG.warn("Exception occurred while closing the object:", e);
655    }
656  }
657
658  @SuppressWarnings("resource")
659  private Table[] getHTablesOnClusters(TableName tableName) throws Exception {
660    int numClusters = utilities.length;
661    Table[] htables = new Table[numClusters];
662    for (int i = 0; i < numClusters; i++) {
663      Table htable = ConnectionFactory.createConnection(configurations[i]).getTable(tableName);
664      htables[i] = htable;
665    }
666    return htables;
667  }
668
669  private void validateCounts(Table[] htables, byte[] type, int[] expectedCounts)
670    throws IOException {
671    for (int i = 0; i < htables.length; i++) {
672      assertEquals(expectedCounts[i], getCount(htables[i], type),
673        Bytes.toString(type) + " were replicated back");
674    }
675  }
676
677  private int getCount(Table t, byte[] type) throws IOException {
678    Get test = new Get(row);
679    test.setAttribute("count", new byte[] {});
680    Result res = t.get(test);
681    return Bytes.toInt(res.getValue(count, type));
682  }
683
684  private void deleteAndWait(byte[] row, Table source, Table target) throws Exception {
685    Delete del = new Delete(row);
686    source.delete(del);
687    wait(row, target, true);
688  }
689
690  private void putAndWait(byte[] row, byte[] fam, Table source, Table target) throws Exception {
691    Put put = new Put(row);
692    put.addColumn(fam, row, row);
693    source.put(put);
694    wait(row, target, false);
695  }
696
697  private void loadAndValidateHFileReplication(String testName, int masterNumber,
698    int[] slaveNumbers, byte[] row, byte[] fam, Table[] tables, byte[][][] hfileRanges,
699    int numOfRows, int[] expectedCounts, boolean toValidate) throws Exception {
700    HBaseTestingUtil util = utilities[masterNumber];
701
702    Path dir = util.getDataTestDirOnTestFS(testName);
703    FileSystem fs = util.getTestFileSystem();
704    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
705    Path familyDir = new Path(dir, Bytes.toString(fam));
706
707    int hfileIdx = 0;
708    for (byte[][] range : hfileRanges) {
709      byte[] from = range[0];
710      byte[] to = range[1];
711      HFileTestUtil.createHFile(util.getConfiguration(), fs,
712        new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows);
713    }
714
715    Table source = tables[masterNumber];
716    final TableName tableName = source.getName();
717    BulkLoadHFiles.create(util.getConfiguration()).bulkLoad(tableName, dir);
718
719    if (toValidate) {
720      for (int slaveClusterNumber : slaveNumbers) {
721        wait(slaveClusterNumber, tables[slaveClusterNumber], expectedCounts[slaveClusterNumber]);
722      }
723    }
724  }
725
726  private void wait(int slaveNumber, Table target, int expectedCount)
727    throws IOException, InterruptedException {
728    int count = 0;
729    for (int i = 0; i < NB_RETRIES; i++) {
730      if (i == NB_RETRIES - 1) {
731        fail("Waited too much time for bulkloaded data replication. Current count=" + count
732          + ", expected count=" + expectedCount);
733      }
734      count = HBaseTestingUtil.countRows(target);
735      if (count != expectedCount) {
736        LOG.info("Waiting more time for bulkloaded data replication.");
737        Thread.sleep(SLEEP_TIME);
738      } else {
739        break;
740      }
741    }
742  }
743
744  private void wait(byte[] row, Table target, boolean isDeleted) throws Exception {
745    Get get = new Get(row);
746    for (int i = 0; i < NB_RETRIES; i++) {
747      if (i == NB_RETRIES - 1) {
748        fail("Waited too much time for replication. Row:" + Bytes.toString(row)
749          + ". IsDeleteReplication:" + isDeleted);
750      }
751      Result res = target.get(get);
752      boolean sleep = isDeleted ? res.size() > 0 : res.isEmpty();
753      if (sleep) {
754        LOG.info("Waiting for more time for replication. Row:" + Bytes.toString(row)
755          + ". IsDeleteReplication:" + isDeleted);
756        Thread.sleep(SLEEP_TIME);
757      } else {
758        if (!isDeleted) {
759          assertArrayEquals(res.value(), row);
760        }
761        LOG.info("Obtained row:" + Bytes.toString(row) + ". IsDeleteReplication:" + isDeleted);
762        break;
763      }
764    }
765  }
766
767  private void rollWALAndWait(final HBaseTestingUtil utility, final TableName table,
768    final byte[] row) throws IOException {
769    final Admin admin = utility.getAdmin();
770    final SingleProcessHBaseCluster cluster = utility.getMiniHBaseCluster();
771
772    // find the region that corresponds to the given row.
773    HRegion region = null;
774    for (HRegion candidate : cluster.getRegions(table)) {
775      if (HRegion.rowIsInRange(candidate.getRegionInfo(), row)) {
776        region = candidate;
777        break;
778      }
779    }
780    assertNotNull(region, "Couldn't find the region for row '" + Arrays.toString(row) + "'");
781
782    final CountDownLatch latch = new CountDownLatch(1);
783
784    // listen for successful log rolls
785    final WALActionsListener listener = new WALActionsListener() {
786      @Override
787      public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
788        latch.countDown();
789      }
790    };
791    region.getWAL().registerWALActionsListener(listener);
792
793    // request a roll
794    admin.rollWALWriter(cluster.getServerHoldingRegion(region.getTableDescriptor().getTableName(),
795      region.getRegionInfo().getRegionName()));
796
797    // wait
798    try {
799      latch.await();
800    } catch (InterruptedException exception) {
801      LOG.warn("Interrupted while waiting for the wal of '" + region + "' to roll. If later "
802        + "replication tests fail, it's probably because we should still be waiting.");
803      Thread.currentThread().interrupt();
804    }
805    region.getWAL().unregisterWALActionsListener(listener);
806  }
807
808  /**
809   * Use a coprocessor to count puts and deletes. as KVs would be replicated back with the same
810   * timestamp there is otherwise no way to count them.
811   */
812  public static class CoprocessorCounter implements RegionCoprocessor, RegionObserver {
813    private int nCount = 0;
814    private int nDelete = 0;
815
816    @Override
817    public Optional<RegionObserver> getRegionObserver() {
818      return Optional.of(this);
819    }
820
821    @Override
822    public void prePut(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
823      final Put put, final WALEdit edit, final Durability durability) throws IOException {
824      nCount++;
825    }
826
827    @Override
828    public void postDelete(final ObserverContext<? extends RegionCoprocessorEnvironment> c,
829      final Delete delete, final WALEdit edit, final Durability durability) throws IOException {
830      nDelete++;
831    }
832
833    @Override
834    public void preGetOp(final ObserverContext<? extends RegionCoprocessorEnvironment> c,
835      final Get get, final List<Cell> result) throws IOException {
836      if (get.getAttribute("count") != null) {
837        result.clear();
838        // order is important!
839        result.add(new KeyValue(count, count, delete, Bytes.toBytes(nDelete)));
840        result.add(new KeyValue(count, count, put, Bytes.toBytes(nCount)));
841        c.bypass();
842      }
843    }
844  }
845
846}