001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Cell.Type;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
082
083/**
084 * Tests RegionReplicaReplicationEndpoint class by setting up region replicas and verifying async
085 * wal replication replays the edits to the secondary region in various scenarios.
086 */
087@Category({ FlakeyTests.class, LargeTests.class })
088public class TestRegionReplicaReplicationEndpoint {
089
090  @ClassRule
091  public static final HBaseClassTestRule CLASS_RULE =
092    HBaseClassTestRule.forClass(TestRegionReplicaReplicationEndpoint.class);
093
094  private static final Logger LOG =
095    LoggerFactory.getLogger(TestRegionReplicaReplicationEndpoint.class);
096
097  private static final int NB_SERVERS = 2;
098
099  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
100
101  @Rule
102  public TestName name = new TestName();
103
104  @BeforeClass
105  public static void beforeClass() throws Exception {
106    Configuration conf = HTU.getConfiguration();
107    conf.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f);
108    conf.setInt("replication.source.size.capacity", 10240);
109    conf.setLong("replication.source.sleepforretries", 100);
110    conf.setInt("hbase.regionserver.maxlogs", 10);
111    conf.setLong("hbase.master.logcleaner.ttl", 10);
112    conf.setInt("zookeeper.recovery.retry", 1);
113    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
114    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
115    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
116    conf.setInt("replication.stats.thread.period.seconds", 5);
117    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
118    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); // less number of retries is needed
119    conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
120
121    HTU.startMiniCluster(NB_SERVERS);
122  }
123
124  @AfterClass
125  public static void afterClass() throws Exception {
126    HTU.shutdownMiniCluster();
127  }
128
129  @Test
130  public void testRegionReplicaReplicationPeerIsCreated() throws IOException, ReplicationException {
131    // create a table with region replicas. Check whether the replication peer is created
132    // and replication started.
133    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
134      Admin admin = connection.getAdmin()) {
135      String peerId = ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER;
136
137      ReplicationPeerConfig peerConfig = null;
138      try {
139        peerConfig = admin.getReplicationPeerConfig(peerId);
140      } catch (ReplicationPeerNotFoundException e) {
141        LOG.warn("Region replica replication peer id=" + peerId + " not exist", e);
142      }
143
144      try {
145        peerConfig = admin.getReplicationPeerConfig(peerId);
146      } catch (ReplicationPeerNotFoundException e) {
147        LOG.warn("Region replica replication peer id=" + peerId + " not exist", e);
148      }
149
150      if (peerConfig != null) {
151        admin.removeReplicationPeer(peerId);
152        peerConfig = null;
153      }
154
155      HTableDescriptor htd =
156        HTU.createTableDescriptor("testReplicationPeerIsCreated_no_region_replicas");
157      createOrEnableTableWithRetries(htd, true);
158      try {
159        peerConfig = admin.getReplicationPeerConfig(peerId);
160        fail("Should throw ReplicationException, because replication peer id=" + peerId
161          + " not exist");
162      } catch (ReplicationPeerNotFoundException e) {
163      }
164      assertNull(peerConfig);
165
166      htd = HTU.createTableDescriptor("testReplicationPeerIsCreated");
167      htd.setRegionReplication(2);
168      createOrEnableTableWithRetries(htd, true);
169
170      // assert peer configuration is correct
171      peerConfig = admin.getReplicationPeerConfig(peerId);
172      assertNotNull(peerConfig);
173      assertEquals(peerConfig.getClusterKey(),
174        ZKConfig.getZooKeeperClusterKey(HTU.getConfiguration()));
175      assertEquals(RegionReplicaReplicationEndpoint.class.getName(),
176        peerConfig.getReplicationEndpointImpl());
177    }
178  }
179
180  @Test
181  public void testRegionReplicaReplicationPeerIsCreatedForModifyTable() throws Exception {
182    // modify a table by adding region replicas. Check whether the replication peer is created
183    // and replication started.
184    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
185      Admin admin = connection.getAdmin()) {
186      String peerId = ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER;
187      ReplicationPeerConfig peerConfig = null;
188      try {
189        peerConfig = admin.getReplicationPeerConfig(peerId);
190      } catch (ReplicationPeerNotFoundException e) {
191        LOG.warn("Region replica replication peer id=" + peerId + " not exist", e);
192      }
193
194      if (peerConfig != null) {
195        admin.removeReplicationPeer(peerId);
196        peerConfig = null;
197      }
198
199      HTableDescriptor htd =
200        HTU.createTableDescriptor("testRegionReplicaReplicationPeerIsCreatedForModifyTable");
201      createOrEnableTableWithRetries(htd, true);
202
203      // assert that replication peer is not created yet
204      try {
205        peerConfig = admin.getReplicationPeerConfig(peerId);
206        fail("Should throw ReplicationException, because replication peer id=" + peerId
207          + " not exist");
208      } catch (ReplicationPeerNotFoundException e) {
209      }
210      assertNull(peerConfig);
211
212      HTU.getAdmin().disableTable(htd.getTableName());
213      htd.setRegionReplication(2);
214      HTU.getAdmin().modifyTable(htd.getTableName(), htd);
215      createOrEnableTableWithRetries(htd, false);
216
217      // assert peer configuration is correct
218      peerConfig = admin.getReplicationPeerConfig(peerId);
219      assertNotNull(peerConfig);
220      assertEquals(peerConfig.getClusterKey(),
221        ZKConfig.getZooKeeperClusterKey(HTU.getConfiguration()));
222      assertEquals(RegionReplicaReplicationEndpoint.class.getName(),
223        peerConfig.getReplicationEndpointImpl());
224      admin.close();
225    }
226  }
227
228  public void testRegionReplicaReplication(int regionReplication) throws Exception {
229    // test region replica replication. Create a table with single region, write some data
230    // ensure that data is replicated to the secondary region
231    TableName tableName =
232      TableName.valueOf("testRegionReplicaReplicationWithReplicas_" + regionReplication);
233    HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString());
234    htd.setRegionReplication(regionReplication);
235    createOrEnableTableWithRetries(htd, true);
236    TableName tableNameNoReplicas =
237      TableName.valueOf("testRegionReplicaReplicationWithReplicas_NO_REPLICAS");
238    HTU.deleteTableIfAny(tableNameNoReplicas);
239    HTU.createTable(tableNameNoReplicas, HBaseTestingUtility.fam1);
240
241    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
242    Table table = connection.getTable(tableName);
243    Table tableNoReplicas = connection.getTable(tableNameNoReplicas);
244
245    try {
246      // load some data to the non-replicated table
247      HTU.loadNumericRows(tableNoReplicas, HBaseTestingUtility.fam1, 6000, 7000);
248
249      // load the data to the table
250      HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000);
251
252      verifyReplication(tableName, regionReplication, 0, 1000);
253
254    } finally {
255      table.close();
256      tableNoReplicas.close();
257      HTU.deleteTableIfAny(tableNameNoReplicas);
258      connection.close();
259    }
260  }
261
262  private void verifyReplication(TableName tableName, int regionReplication, final int startRow,
263    final int endRow) throws Exception {
264    verifyReplication(tableName, regionReplication, startRow, endRow, true);
265  }
266
267  private void verifyReplication(TableName tableName, int regionReplication, final int startRow,
268    final int endRow, final boolean present) throws Exception {
269    // find the regions
270    final Region[] regions = new Region[regionReplication];
271
272    for (int i = 0; i < NB_SERVERS; i++) {
273      HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
274      List<HRegion> onlineRegions = rs.getRegions(tableName);
275      for (HRegion region : onlineRegions) {
276        regions[region.getRegionInfo().getReplicaId()] = region;
277      }
278    }
279
280    for (Region region : regions) {
281      assertNotNull(region);
282    }
283
284    for (int i = 1; i < regionReplication; i++) {
285      final Region region = regions[i];
286      // wait until all the data is replicated to all secondary regions
287      Waiter.waitFor(HTU.getConfiguration(), 90000, 1000, new Waiter.Predicate<Exception>() {
288        @Override
289        public boolean evaluate() throws Exception {
290          LOG.info("verifying replication for region replica:" + region.getRegionInfo());
291          try {
292            HTU.verifyNumericRows(region, HBaseTestingUtility.fam1, startRow, endRow, present);
293          } catch (Throwable ex) {
294            LOG.warn("Verification from secondary region is not complete yet", ex);
295            // still wait
296            return false;
297          }
298          return true;
299        }
300      });
301    }
302  }
303
304  @Test
305  public void testRegionReplicaReplicationWith2Replicas() throws Exception {
306    testRegionReplicaReplication(2);
307  }
308
309  @Test
310  public void testRegionReplicaReplicationWith3Replicas() throws Exception {
311    testRegionReplicaReplication(3);
312  }
313
314  @Test
315  public void testRegionReplicaReplicationWith10Replicas() throws Exception {
316    testRegionReplicaReplication(10);
317  }
318
319  @Test
320  public void testRegionReplicaWithoutMemstoreReplication() throws Exception {
321    int regionReplication = 3;
322    final TableName tableName = TableName.valueOf(name.getMethodName());
323    HTableDescriptor htd = HTU.createTableDescriptor(tableName);
324    htd.setRegionReplication(regionReplication);
325    htd.setRegionMemstoreReplication(false);
326    createOrEnableTableWithRetries(htd, true);
327
328    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
329    Table table = connection.getTable(tableName);
330    try {
331      // write data to the primary. The replicas should not receive the data
332      final int STEP = 100;
333      for (int i = 0; i < 3; ++i) {
334        final int startRow = i * STEP;
335        final int endRow = (i + 1) * STEP;
336        LOG.info("Writing data from " + startRow + " to " + endRow);
337        HTU.loadNumericRows(table, HBaseTestingUtility.fam1, startRow, endRow);
338        verifyReplication(tableName, regionReplication, startRow, endRow, false);
339
340        // Flush the table, now the data should show up in the replicas
341        LOG.info("flushing table");
342        HTU.flush(tableName);
343        verifyReplication(tableName, regionReplication, 0, endRow, true);
344      }
345    } finally {
346      table.close();
347      connection.close();
348    }
349  }
350
351  @Test
352  public void testRegionReplicaReplicationForFlushAndCompaction() throws Exception {
353    // Tests a table with region replication 3. Writes some data, and causes flushes and
354    // compactions. Verifies that the data is readable from the replicas. Note that this
355    // does not test whether the replicas actually pick up flushed files and apply compaction
356    // to their stores
357    int regionReplication = 3;
358    final TableName tableName = TableName.valueOf(name.getMethodName());
359    HTableDescriptor htd = HTU.createTableDescriptor(tableName);
360    htd.setRegionReplication(regionReplication);
361    createOrEnableTableWithRetries(htd, true);
362
363    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
364    Table table = connection.getTable(tableName);
365    try {
366      // load the data to the table
367
368      for (int i = 0; i < 6000; i += 1000) {
369        LOG.info("Writing data from " + i + " to " + (i + 1000));
370        HTU.loadNumericRows(table, HBaseTestingUtility.fam1, i, i + 1000);
371        LOG.info("flushing table");
372        HTU.flush(tableName);
373        LOG.info("compacting table");
374        HTU.compact(tableName, false);
375      }
376
377      verifyReplication(tableName, regionReplication, 0, 1000);
378    } finally {
379      table.close();
380      connection.close();
381    }
382  }
383
384  @Test
385  public void testRegionReplicaReplicationIgnoresDisabledTables() throws Exception {
386    testRegionReplicaReplicationIgnores(false, false);
387  }
388
389  @Test
390  public void testRegionReplicaReplicationIgnoresDroppedTables() throws Exception {
391    testRegionReplicaReplicationIgnores(true, false);
392  }
393
394  @Test
395  public void testRegionReplicaReplicationIgnoresNonReplicatedTables() throws Exception {
396    testRegionReplicaReplicationIgnores(false, true);
397  }
398
399  public void testRegionReplicaReplicationIgnores(boolean dropTable, boolean disableReplication)
400    throws Exception {
401
402    // tests having edits from a disabled or dropped table is handled correctly by skipping those
403    // entries and further edits after the edits from dropped/disabled table can be replicated
404    // without problems.
405    final TableName tableName = TableName.valueOf(
406      name.getMethodName() + "_drop_" + dropTable + "_disabledReplication_" + disableReplication);
407    HTableDescriptor htd = HTU.createTableDescriptor(tableName);
408    int regionReplication = 3;
409    htd.setRegionReplication(regionReplication);
410    HTU.deleteTableIfAny(tableName);
411
412    createOrEnableTableWithRetries(htd, true);
413    TableName toBeDisabledTable = TableName.valueOf(
414      dropTable ? "droppedTable" : (disableReplication ? "disableReplication" : "disabledTable"));
415    HTU.deleteTableIfAny(toBeDisabledTable);
416    htd = HTU.createTableDescriptor(toBeDisabledTable.toString());
417    htd.setRegionReplication(regionReplication);
418    createOrEnableTableWithRetries(htd, true);
419
420    // both tables are created, now pause replication
421    HTU.getAdmin().disableReplicationPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER);
422
423    // now that the replication is disabled, write to the table to be dropped, then drop the table.
424
425    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
426    Table table = connection.getTable(tableName);
427    Table tableToBeDisabled = connection.getTable(toBeDisabledTable);
428
429    HTU.loadNumericRows(tableToBeDisabled, HBaseTestingUtility.fam1, 6000, 7000);
430
431    AtomicLong skippedEdits = new AtomicLong();
432    RegionReplicaReplicationEndpoint.RegionReplicaOutputSink sink =
433      mock(RegionReplicaReplicationEndpoint.RegionReplicaOutputSink.class);
434    when(sink.getSkippedEditsCounter()).thenReturn(skippedEdits);
435    FSTableDescriptors fstd =
436      new FSTableDescriptors(FileSystem.get(HTU.getConfiguration()), HTU.getDefaultRootDirPath());
437    RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter sinkWriter =
438      new RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter(sink,
439        (ClusterConnection) connection, Executors.newSingleThreadExecutor(), Integer.MAX_VALUE,
440        fstd);
441    RegionLocator rl = connection.getRegionLocator(toBeDisabledTable);
442    HRegionLocation hrl = rl.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY);
443    byte[] encodedRegionName = hrl.getRegionInfo().getEncodedNameAsBytes();
444
445    Cell cell = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(Bytes.toBytes("A"))
446      .setFamily(HTU.fam1).setValue(Bytes.toBytes("VAL")).setType(Type.Put).build();
447    Entry entry =
448      new Entry(new WALKeyImpl(encodedRegionName, toBeDisabledTable, 1), new WALEdit().add(cell));
449
450    HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
451    if (dropTable) {
452      HTU.getAdmin().deleteTable(toBeDisabledTable);
453    } else if (disableReplication) {
454      htd.setRegionReplication(regionReplication - 2);
455      HTU.getAdmin().modifyTable(toBeDisabledTable, htd);
456      createOrEnableTableWithRetries(htd, false);
457    }
458    sinkWriter.append(toBeDisabledTable, encodedRegionName, HConstants.EMPTY_BYTE_ARRAY,
459      Lists.newArrayList(entry, entry));
460
461    assertEquals(2, skippedEdits.get());
462
463    HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(0);
464    MetricsSource metrics = mock(MetricsSource.class);
465    ReplicationEndpoint.Context ctx = new ReplicationEndpoint.Context(HTU.getConfiguration(),
466      HTU.getConfiguration(), HTU.getTestFileSystem(),
467      ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER, UUID.fromString(rs.getClusterId()),
468      rs.getReplicationSourceService().getReplicationManager().getReplicationPeers()
469        .getPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER),
470      metrics, rs.getTableDescriptors(), rs);
471    RegionReplicaReplicationEndpoint rrpe = new RegionReplicaReplicationEndpoint();
472    rrpe.init(ctx);
473    rrpe.start();
474    ReplicationEndpoint.ReplicateContext repCtx = new ReplicationEndpoint.ReplicateContext();
475    repCtx.setEntries(Lists.newArrayList(entry, entry));
476    assertTrue(rrpe.replicate(repCtx));
477    /*
478     * Come back here. There is a difference on how counting is done here and in master branch.
479     * St.Ack Mockito.verify(metrics, Mockito.times(1)). incrLogEditsFiltered(Mockito.eq(2L));
480     */
481    rrpe.stop();
482    if (disableReplication) {
483      // enable replication again so that we can verify replication
484      HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
485      htd.setRegionReplication(regionReplication);
486      HTU.getAdmin().modifyTable(toBeDisabledTable, htd);
487      createOrEnableTableWithRetries(htd, false);
488    }
489
490    try {
491      // load some data to the to-be-dropped table
492
493      // load the data to the table
494      HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000);
495
496      // now enable the replication
497      HTU.getAdmin().enableReplicationPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER);
498
499      verifyReplication(tableName, regionReplication, 0, 1000);
500
501    } finally {
502      table.close();
503      rl.close();
504      tableToBeDisabled.close();
505      HTU.deleteTableIfAny(toBeDisabledTable);
506      connection.close();
507    }
508  }
509
510  private void createOrEnableTableWithRetries(TableDescriptor htd, boolean createTableOperation) {
511    // Helper function to run create/enable table operations with a retry feature
512    boolean continueToRetry = true;
513    int tries = 0;
514    while (continueToRetry && tries < 50) {
515      try {
516        continueToRetry = false;
517        if (createTableOperation) {
518          HTU.getAdmin().createTable(htd);
519        } else {
520          HTU.getAdmin().enableTable(htd.getTableName());
521        }
522      } catch (IOException e) {
523        if (e.getCause() instanceof ReplicationException) {
524          continueToRetry = true;
525          tries++;
526          Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
527        }
528      }
529    }
530  }
531}