/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
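
/**
 * Tests replication across chains and cycles of mini clusters (e.g. 0 -> 1 -> 0 and
 * 0 -> 1 -> 2 -> 0) for both ordinary mutations and bulk-loaded HFiles. All clusters share one
 * mini ZooKeeper cluster under distinct znode parents. The {@link CoprocessorCounter} region
 * coprocessor counts puts and deletes so that edits replicated back to their originating cluster
 * can be detected.
 */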
@Category({ReplicationTests.class, LargeTests.class})
public class TestMasterReplication {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestMasterReplication.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestMasterReplication.class);

  private Configuration baseConfiguration;

  private HBaseTestingUtility[] utilities;
  private Configuration[] configurations;
  private MiniZooKeeperCluster miniZK;

  private static final long SLEEP_TIME = 1000;
  private static final int NB_RETRIES = 120;

  private static final TableName tableName = TableName.valueOf("test");
  private static final byte[] famName = Bytes.toBytes("f");
  private static final byte[] famName1 = Bytes.toBytes("f1");
  private static final byte[] row = Bytes.toBytes("row");
  private static final byte[] row1 = Bytes.toBytes("row1");
  private static final byte[] row2 = Bytes.toBytes("row2");
  private static final byte[] row3 = Bytes.toBytes("row3");
  private static final byte[] row4 = Bytes.toBytes("row4");
  private static final byte[] noRepfamName = Bytes.toBytes("norep");

  private static final byte[] count = Bytes.toBytes("count");
  private static final byte[] put = Bytes.toBytes("put");
  private static final byte[] delete = Bytes.toBytes("delete");

  private TableDescriptor table;

  @Before
  public void setUp() throws Exception {
    baseConfiguration = HBaseConfiguration.create();
    // smaller block size and capacity to trigger more operations
    // and test them
    baseConfiguration.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20);
    baseConfiguration.setInt("replication.source.size.capacity", 1024);
    baseConfiguration.setLong("replication.source.sleepforretries", 100);
    baseConfiguration.setInt("hbase.regionserver.maxlogs", 10);
    baseConfiguration.setLong("hbase.master.logcleaner.ttl", 10);
    baseConfiguration.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
    baseConfiguration.set("hbase.replication.source.fs.conf.provider",
      TestSourceFSConfigurationProvider.class.getCanonicalName());
    baseConfiguration.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
    baseConfiguration.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
    baseConfiguration.setStrings(
        CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
        CoprocessorCounter.class.getName());
    table = TableDescriptorBuilder.newBuilder(tableName)
        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
            .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName1)
            .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
  }

  /**
   * Tests the replication scenario 0 -> 1 -> 0 by adding and deleting a row in a table on each
   * cluster and verifying that it is replicated. It also verifies that the puts and deletes are
   * not replicated back to the originating cluster.
   */
  @Test
  public void testCyclicReplication1() throws Exception {
    LOG.info("testCyclicReplication1");
    int numClusters = 2;
    Table[] htables = null;
    try {
      htables = setUpClusterTablesAndPeers(numClusters);

      int[] expectedCounts = new int[] { 2, 2 };

      // add rows to both clusters,
      // make sure they are both replicated
      putAndWait(row, famName, htables[0], htables[1]);
      putAndWait(row1, famName, htables[1], htables[0]);
      validateCounts(htables, put, expectedCounts);

      deleteAndWait(row, htables[0], htables[1]);
      deleteAndWait(row1, htables[1], htables[0]);
      validateCounts(htables, delete, expectedCounts);
    } finally {
      close(htables);
      shutDownMiniClusters();
    }
  }

  /**
   * Tests the replication scenario 0 -> 0. By default
   * {@link BaseReplicationEndpoint#canReplicateToSameCluster()} returns false, so the
   * ReplicationSource should terminate, and no further logs should get enqueued.
   */
  @Test
  public void testLoopedReplication() throws Exception {
    LOG.info("testLoopedReplication");
    startMiniClusters(1);
    createTableOnClusters(table);
    addPeer("1", 0, 0);
    Thread.sleep(SLEEP_TIME);

    // wait for source to terminate
    final ServerName rsName = utilities[0].getHBaseCluster().getRegionServer(0).getServerName();
    Waiter.waitFor(baseConfiguration, 10000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        ClusterMetrics clusterStatus = utilities[0].getAdmin()
            .getClusterMetrics(EnumSet.of(ClusterMetrics.Option.LIVE_SERVERS));
        ServerMetrics serverLoad = clusterStatus.getLiveServerMetrics().get(rsName);
        List<ReplicationLoadSource> replicationLoadSourceList =
            serverLoad.getReplicationLoadSourceList();
        return replicationLoadSourceList.isEmpty();
      }
    });

    Table[] htables = getHTablesOnClusters(tableName);
    putAndWait(row, famName, htables[0], htables[0]);
    rollWALAndWait(utilities[0], table.getTableName(), row);
    ZKWatcher zkw = utilities[0].getZooKeeperWatcher();
    String queuesZnode = ZNodePaths.joinZNode(zkw.getZNodePaths().baseZNode,
      ZNodePaths.joinZNode("replication", "rs"));
    List<String> listChildrenNoWatch =
        ZKUtil.listChildrenNoWatch(zkw, ZNodePaths.joinZNode(queuesZnode, rsName.toString()));
    assertEquals(0, listChildrenNoWatch.size());
  }

  /**
   * Tests the replication scenario 0 -> 1 -> 0 by bulk loading a set of HFiles into a table on
   * each cluster and verifying that they are replicated.
   */
  @Test
  public void testHFileCyclicReplication() throws Exception {
    LOG.info("testHFileCyclicReplication");
    int numClusters = 2;
    Table[] htables = null;
    try {
      htables = setUpClusterTablesAndPeers(numClusters);

      // Load 100 rows for each hfile range in cluster '0' and validate whether it has been
      // replicated to cluster '1'.
      byte[][][] hfileRanges =
          new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
              new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("fff") }, };
      int numOfRows = 100;
      int[] expectedCounts =
          new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };

      loadAndValidateHFileReplication("testHFileCyclicReplication_01", 0, new int[] { 1 }, row,
        famName, htables, hfileRanges, numOfRows, expectedCounts, true);

      // Load 200 rows for each hfile range in cluster '1' and validate whether it has been
      // replicated to cluster '0'.
      hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("gggg"), Bytes.toBytes("iiii") },
          new byte[][] { Bytes.toBytes("jjj"), Bytes.toBytes("lll") }, };
      numOfRows = 200;
      int[] newExpectedCounts = new int[] { hfileRanges.length * numOfRows + expectedCounts[0],
          hfileRanges.length * numOfRows + expectedCounts[1] };

      loadAndValidateHFileReplication("testHFileCyclicReplication_10", 1, new int[] { 0 }, row,
        famName, htables, hfileRanges, numOfRows, newExpectedCounts, true);

    } finally {
      close(htables);
      shutDownMiniClusters();
    }
  }
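
  /**
   * Starts {@code numClusters} mini clusters, creates the test table on each of them, and adds
   * the bidirectional peering 0 -> 1 and 1 -> 0 used by the cyclic replication tests.
   */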
  private Table[] setUpClusterTablesAndPeers(int numClusters) throws Exception {
    Table[] htables;
    startMiniClusters(numClusters);
    createTableOnClusters(table);

    htables = getHTablesOnClusters(tableName);
    // Test the replication scenarios of 0 -> 1 -> 0
    addPeer("1", 0, 1);
    addPeer("1", 1, 0);
    return htables;
  }

  /**
   * Tests the cyclic replication scenario 0 -> 1 -> 2 -> 0 by adding and deleting rows in a table
   * on each cluster and ensuring that each cluster receives the appropriate mutations. It also
   * tests the grouping scenario where a cluster needs to replicate both the edits originating
   * from itself and the edits it received via replication from a different cluster. The scenario
   * is explained in HBASE-9158.
   */
  @Test
  public void testCyclicReplication2() throws Exception {
    LOG.info("testCyclicReplication2");
    int numClusters = 3;
    Table[] htables = null;
    try {
      startMiniClusters(numClusters);
      createTableOnClusters(table);

      // Test the replication scenario of 0 -> 1 -> 2 -> 0
      addPeer("1", 0, 1);
      addPeer("1", 1, 2);
      addPeer("1", 2, 0);

      htables = getHTablesOnClusters(tableName);

      // put a row on each cluster and wait until it has made it all the way around
      putAndWait(row, famName, htables[0], htables[2]);
      putAndWait(row1, famName, htables[1], htables[0]);
      putAndWait(row2, famName, htables[2], htables[1]);

      deleteAndWait(row, htables[0], htables[2]);
      deleteAndWait(row1, htables[1], htables[0]);
      deleteAndWait(row2, htables[2], htables[1]);

      int[] expectedCounts = new int[] { 3, 3, 3 };
      validateCounts(htables, put, expectedCounts);
      validateCounts(htables, delete, expectedCounts);

      // Test HBASE-9158
      disablePeer("1", 2);
      // cluster 1 now has an edit that was replicated from cluster 0
      putAndWait(row3, famName, htables[0], htables[1]);
      // now add a local edit to cluster 1
      htables[1].put(new Put(row4).addColumn(famName, row4, row4));
      // re-enable replication from cluster 2 to cluster 0
      enablePeer("1", 2);
      // without HBASE-9158 the edit for row4 would have been marked with cluster 0's id and
      // hence not replicated to cluster 0
      wait(row4, htables[0], false);
    } finally {
      close(htables);
      shutDownMiniClusters();
    }
  }

  /**
   * Tests the multi-slave HFile replication scenario 0 -> 1, 2 by bulk loading a set of HFiles
   * into a table on the master cluster and verifying that they are replicated to its peers.
   */
  @Test
  public void testHFileMultiSlaveReplication() throws Exception {
    LOG.info("testHFileMultiSlaveReplication");
    int numClusters = 3;
    Table[] htables = null;
    try {
      startMiniClusters(numClusters);
      createTableOnClusters(table);

      // Add a slave, 0 -> 1
      addPeer("1", 0, 1);

      htables = getHTablesOnClusters(tableName);

      // Load 100 rows for each hfile range in cluster '0' and validate whether it has been
      // replicated to cluster '1'.
      byte[][][] hfileRanges =
          new byte[][][] { new byte[][] { Bytes.toBytes("mmmm"), Bytes.toBytes("oooo") },
              new byte[][] { Bytes.toBytes("ppp"), Bytes.toBytes("rrr") }, };
      int numOfRows = 100;

      int[] expectedCounts =
          new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };

      loadAndValidateHFileReplication("testHFileMultiSlaveReplication_0", 0, new int[] { 1 }, row,
        famName, htables, hfileRanges, numOfRows, expectedCounts, true);

      // Validate data is not replicated to cluster '2'.
      assertEquals(0, utilities[2].countRows(htables[2]));

      rollWALAndWait(utilities[0], htables[0].getName(), row);

      // Add one more slave, 0 -> 2
      addPeer("2", 0, 2);

      // Load 200 rows for each hfile range in cluster '0' and validate whether it has been
      // replicated to clusters '1' and '2'. The previously loaded data should not reach
      // cluster '2', since its WAL was rolled before the peer was added.
      hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("ssss"), Bytes.toBytes("uuuu") },
          new byte[][] { Bytes.toBytes("vvv"), Bytes.toBytes("xxx") }, };
      numOfRows = 200;

      int[] newExpectedCounts = new int[] { hfileRanges.length * numOfRows + expectedCounts[0],
          hfileRanges.length * numOfRows + expectedCounts[1], hfileRanges.length * numOfRows };

      loadAndValidateHFileReplication("testHFileMultiSlaveReplication_1", 0, new int[] { 1, 2 },
        row, famName, htables, hfileRanges, numOfRows, newExpectedCounts, true);

    } finally {
      close(htables);
      shutDownMiniClusters();
    }
  }

  /**
   * Tests bulk-loaded HFile replication when only explicitly specified table column families are
   * configured for replication. It bulk loads HFiles into both column families of the table but
   * configures only one column family's data to be replicated.
   */
  @Test
  public void testHFileReplicationForConfiguredTableCfs() throws Exception {
    LOG.info("testHFileReplicationForConfiguredTableCfs");
    int numClusters = 2;
    Table[] htables = null;
    try {
      startMiniClusters(numClusters);
      createTableOnClusters(table);

      htables = getHTablesOnClusters(tableName);
      // Test the replication scenario where only CF 'f' is configured for table data
      // replication, not 'f1'
      addPeer("1", 0, 1, tableName.getNameAsString() + ":" + Bytes.toString(famName));

      // Load 100 rows for each hfile range in cluster '0' for table CF 'f'
      byte[][][] hfileRanges =
          new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
              new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("fff") }, };
      int numOfRows = 100;
      int[] expectedCounts =
          new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };

      loadAndValidateHFileReplication("load_f", 0, new int[] { 1 }, row, famName, htables,
        hfileRanges, numOfRows, expectedCounts, true);

      // Load 100 rows for each hfile range in cluster '0' for table CF 'f1'
      hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("gggg"), Bytes.toBytes("iiii") },
          new byte[][] { Bytes.toBytes("jjj"), Bytes.toBytes("lll") }, };
      numOfRows = 100;

      int[] newExpectedCounts =
          new int[] { hfileRanges.length * numOfRows + expectedCounts[0], expectedCounts[1] };

      loadAndValidateHFileReplication("load_f1", 0, new int[] { 1 }, row, famName1, htables,
        hfileRanges, numOfRows, newExpectedCounts, false);

      // Validate data replication for CF 'f1'

      // Source cluster table should contain data for both column families
      wait(0, htables[0], hfileRanges.length * numOfRows + expectedCounts[0]);

      // Sleep long enough that data for the CF not configured for replication would have
      // arrived at the peer if it were being replicated
      Thread.sleep((NB_RETRIES / 2) * SLEEP_TIME);
      // Peer cluster should have data only for the configured CF
      wait(1, htables[1], expectedCounts[1]);
    } finally {
      close(htables);
      shutDownMiniClusters();
    }
  }

  /**
   * Tests the cyclic replication scenario 0 -> 1 -> 2 -> 1.
   */
  @Test
  public void testCyclicReplication3() throws Exception {
    LOG.info("testCyclicReplication3");
    int numClusters = 3;
    Table[] htables = null;
    try {
      startMiniClusters(numClusters);
      createTableOnClusters(table);

      // Test the replication scenario of 0 -> 1 -> 2 -> 1
      addPeer("1", 0, 1);
      addPeer("1", 1, 2);
      addPeer("1", 2, 1);

      htables = getHTablesOnClusters(tableName);

      // put a row on each cluster and wait until it reaches its final destination
      putAndWait(row, famName, htables[0], htables[2]);
      putAndWait(row1, famName, htables[1], htables[2]);
      putAndWait(row2, famName, htables[2], htables[1]);

      deleteAndWait(row, htables[0], htables[2]);
      deleteAndWait(row1, htables[1], htables[2]);
      deleteAndWait(row2, htables[2], htables[1]);

      int[] expectedCounts = new int[] { 1, 3, 3 };
      validateCounts(htables, put, expectedCounts);
      validateCounts(htables, delete, expectedCounts);
    } finally {
      close(htables);
      shutDownMiniClusters();
    }
  }

  @After
  public void tearDown() throws IOException {
    configurations = null;
    utilities = null;
  }
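
  /**
   * Starts {@code numClusters} mini HBase clusters. The first cluster starts a mini ZooKeeper
   * cluster that the remaining clusters share, each rooted at its own randomized znode parent so
   * that the clusters do not collide.
   */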
  @SuppressWarnings("resource")
  private void startMiniClusters(int numClusters) throws Exception {
    Random random = new Random();
    utilities = new HBaseTestingUtility[numClusters];
    configurations = new Configuration[numClusters];
    for (int i = 0; i < numClusters; i++) {
      Configuration conf = new Configuration(baseConfiguration);
      conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/" + i + random.nextInt());
      HBaseTestingUtility utility = new HBaseTestingUtility(conf);
      if (i == 0) {
        utility.startMiniZKCluster();
        miniZK = utility.getZkCluster();
      } else {
        utility.setZkCluster(miniZK);
      }
      utility.startMiniCluster();
      utilities[i] = utility;
      configurations[i] = conf;
      new ZKWatcher(conf, "cluster" + i, null, true);
    }
  }
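
  /** Shuts down the mini clusters in reverse start order, then the shared ZooKeeper cluster. */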
  private void shutDownMiniClusters() throws Exception {
    int numClusters = utilities.length;
    for (int i = numClusters - 1; i >= 0; i--) {
      if (utilities[i] != null) {
        utilities[i].shutdownMiniCluster();
      }
    }
    miniZK.shutdown();
  }

  private void createTableOnClusters(TableDescriptor table) throws Exception {
    for (HBaseTestingUtility utility : utilities) {
      utility.getAdmin().createTable(table);
    }
  }
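
  /**
   * Adds a replication peer on the master cluster pointing at the slave cluster's cluster key.
   */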
  private void addPeer(String id, int masterClusterNumber,
      int slaveClusterNumber) throws Exception {
    try (Admin admin = ConnectionFactory.createConnection(configurations[masterClusterNumber])
        .getAdmin()) {
      admin.addReplicationPeer(id,
        new ReplicationPeerConfig().setClusterKey(utilities[slaveClusterNumber].getClusterKey()));
    }
  }
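
  /**
   * Adds a replication peer that replicates only the given table column families, specified in
   * the "table:cf1,cf2" format understood by
   * {@link ReplicationPeerConfigUtil#parseTableCFsFromConfig(String)}.
   */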
  private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, String tableCfs)
      throws Exception {
    try (Admin admin =
        ConnectionFactory.createConnection(configurations[masterClusterNumber]).getAdmin()) {
      admin.addReplicationPeer(
        id,
        new ReplicationPeerConfig().setClusterKey(utilities[slaveClusterNumber].getClusterKey())
            .setReplicateAllUserTables(false)
            .setTableCFsMap(ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableCfs)));
    }
  }

  private void disablePeer(String id, int masterClusterNumber) throws Exception {
    try (Admin admin = ConnectionFactory.createConnection(configurations[masterClusterNumber])
        .getAdmin()) {
      admin.disableReplicationPeer(id);
    }
  }

  private void enablePeer(String id, int masterClusterNumber) throws Exception {
    try (Admin admin = ConnectionFactory.createConnection(configurations[masterClusterNumber])
        .getAdmin()) {
      admin.enableReplicationPeer(id);
    }
  }

  private void close(Closeable... closeables) {
    try {
      if (closeables != null) {
        for (Closeable closeable : closeables) {
          closeable.close();
        }
      }
    } catch (Exception e) {
      LOG.warn("Exception occurred while closing the object:", e);
    }
  }

  @SuppressWarnings("resource")
  private Table[] getHTablesOnClusters(TableName tableName) throws Exception {
    int numClusters = utilities.length;
    Table[] htables = new Table[numClusters];
    for (int i = 0; i < numClusters; i++) {
      Table htable = ConnectionFactory.createConnection(configurations[i]).getTable(tableName);
      htables[i] = htable;
    }
    return htables;
  }
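
  /**
   * Asserts that the put or delete counters maintained by {@link CoprocessorCounter} on each
   * cluster match the expected counts, i.e. that no mutation was replicated back to its origin.
   */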
  private void validateCounts(Table[] htables, byte[] type,
      int[] expectedCounts) throws IOException {
    for (int i = 0; i < htables.length; i++) {
      assertEquals(Bytes.toString(type) + " were replicated back ",
          expectedCounts[i], getCount(htables[i], type));
    }
  }
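
  /**
   * Reads the requested counter from {@link CoprocessorCounter} by issuing a Get that carries
   * the "count" attribute, which the coprocessor intercepts in preGetOp.
   */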
  private int getCount(Table t, byte[] type) throws IOException {
    Get test = new Get(row);
    test.setAttribute("count", new byte[] {});
    Result res = t.get(test);
    return Bytes.toInt(res.getValue(count, type));
  }

  private void deleteAndWait(byte[] row, Table source, Table target)
      throws Exception {
    Delete del = new Delete(row);
    source.delete(del);
    wait(row, target, true);
  }

  private void putAndWait(byte[] row, byte[] fam, Table source, Table target)
      throws Exception {
    Put put = new Put(row);
    put.addColumn(fam, row, row);
    source.put(put);
    wait(row, target, false);
  }
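
  /**
   * Creates one HFile per key range, bulk loads them into the master cluster's table via
   * {@link LoadIncrementalHFiles}, and optionally waits until each listed slave cluster holds
   * the expected number of rows.
   */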
  private void loadAndValidateHFileReplication(String testName, int masterNumber,
      int[] slaveNumbers, byte[] row, byte[] fam, Table[] tables, byte[][][] hfileRanges,
      int numOfRows, int[] expectedCounts, boolean toValidate) throws Exception {
    HBaseTestingUtility util = utilities[masterNumber];

    Path dir = util.getDataTestDirOnTestFS(testName);
    FileSystem fs = util.getTestFileSystem();
    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
    Path familyDir = new Path(dir, Bytes.toString(fam));

    int hfileIdx = 0;
    for (byte[][] range : hfileRanges) {
      byte[] from = range[0];
      byte[] to = range[1];
      HFileTestUtil.createHFile(util.getConfiguration(), fs,
        new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows);
    }

    Table source = tables[masterNumber];
    final TableName tableName = source.getName();
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
    String[] args = { dir.toString(), tableName.toString() };
    loader.run(args);

    if (toValidate) {
      for (int slaveClusterNumber : slaveNumbers) {
        wait(slaveClusterNumber, tables[slaveClusterNumber], expectedCounts[slaveClusterNumber]);
      }
    }
  }
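
  /**
   * Waits until the target table on the given slave cluster holds exactly
   * {@code expectedCount} rows, failing once the retries are exhausted.
   */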
  private void wait(int slaveNumber, Table target, int expectedCount)
      throws IOException, InterruptedException {
    int count = 0;
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too long for bulk-loaded data replication. Current count=" + count
            + ", expected count=" + expectedCount);
      }
      count = utilities[slaveNumber].countRows(target);
      if (count != expectedCount) {
        LOG.info("Waiting more time for bulk-loaded data replication.");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }
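
  /**
   * Waits until the given row appears in (or, if {@code isDeleted}, disappears from) the target
   * table, failing once the retries are exhausted.
   */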
  private void wait(byte[] row, Table target, boolean isDeleted) throws Exception {
    Get get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too long for replication. Row:" + Bytes.toString(row)
            + ". IsDeleteReplication:" + isDeleted);
      }
      Result res = target.get(get);
      boolean sleep = isDeleted ? res.size() > 0 : res.isEmpty();
      if (sleep) {
        LOG.info("Waiting more time for replication. Row:"
            + Bytes.toString(row) + ". IsDeleteReplication:" + isDeleted);
        Thread.sleep(SLEEP_TIME);
      } else {
        if (!isDeleted) {
          assertArrayEquals(row, res.value());
        }
        LOG.info("Obtained row:"
            + Bytes.toString(row) + ". IsDeleteReplication:" + isDeleted);
        break;
      }
    }
  }
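
  /**
   * Rolls the WAL of the region server hosting the region for the given row and blocks until
   * the roll completes, as observed through a {@link WALActionsListener}.
   */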
  private void rollWALAndWait(final HBaseTestingUtility utility, final TableName table,
      final byte[] row) throws IOException {
    final Admin admin = utility.getAdmin();
    final MiniHBaseCluster cluster = utility.getMiniHBaseCluster();

    // find the region that corresponds to the given row.
    HRegion region = null;
    for (HRegion candidate : cluster.getRegions(table)) {
      if (HRegion.rowIsInRange(candidate.getRegionInfo(), row)) {
        region = candidate;
        break;
      }
    }
    assertNotNull("Couldn't find the region for row '" + Arrays.toString(row) + "'", region);

    final CountDownLatch latch = new CountDownLatch(1);

    // listen for successful log rolls
    final WALActionsListener listener = new WALActionsListener() {
      @Override
      public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
        latch.countDown();
      }
    };
    region.getWAL().registerWALActionsListener(listener);

    // request a roll
    admin.rollWALWriter(cluster.getServerHoldingRegion(region.getTableDescriptor().getTableName(),
      region.getRegionInfo().getRegionName()));

    // wait
    try {
      latch.await();
    } catch (InterruptedException exception) {
      LOG.warn("Interrupted while waiting for the wal of '" + region + "' to roll. If later " +
          "replication tests fail, it's probably because we should still be waiting.");
      Thread.currentThread().interrupt();
    }
    region.getWAL().unregisterWALActionsListener(listener);
  }

  /**
   * Use a coprocessor to count puts and deletes. Because KVs are replicated back with the same
   * timestamp, there is otherwise no way to count them.
   */
  public static class CoprocessorCounter implements RegionCoprocessor, RegionObserver {
    /** Number of puts observed on the region. */
    private int nCount = 0;
    /** Number of deletes observed on the region. */
    private int nDelete = 0;

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
        final WALEdit edit, final Durability durability) throws IOException {
      nCount++;
    }

    @Override
    public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
        final Delete delete, final WALEdit edit, final Durability durability) throws IOException {
      nDelete++;
    }

    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c,
        final Get get, final List<Cell> result) throws IOException {
      if (get.getAttribute("count") != null) {
        result.clear();
        // order is important! Cells in a Result must be sorted, and 'delete' sorts before 'put'.
        result.add(new KeyValue(count, count, delete, Bytes.toBytes(nDelete)));
        result.add(new KeyValue(count, count, put, Bytes.toBytes(nCount)));
        c.bypass();
      }
    }
  }

}