001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotNull;
022import static org.junit.Assert.assertNull;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025import static org.mockito.Mockito.mock;
026import static org.mockito.Mockito.when;
027
028import java.io.IOException;
029import java.util.List;
030import java.util.UUID;
031import java.util.concurrent.Executors;
032import java.util.concurrent.atomic.AtomicLong;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.FileSystem;
035import org.apache.hadoop.hbase.Cell;
036import org.apache.hadoop.hbase.Cell.Type;
037import org.apache.hadoop.hbase.CellBuilderFactory;
038import org.apache.hadoop.hbase.CellBuilderType;
039import org.apache.hadoop.hbase.HBaseClassTestRule;
040import org.apache.hadoop.hbase.HBaseTestingUtility;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.HRegionLocation;
043import org.apache.hadoop.hbase.HTableDescriptor;
044import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
045import org.apache.hadoop.hbase.TableName;
046import org.apache.hadoop.hbase.Waiter;
047import org.apache.hadoop.hbase.client.Admin;
048import org.apache.hadoop.hbase.client.ClusterConnection;
049import org.apache.hadoop.hbase.client.Connection;
050import org.apache.hadoop.hbase.client.ConnectionFactory;
051import org.apache.hadoop.hbase.client.RegionLocator;
052import org.apache.hadoop.hbase.client.Table;
053import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
054import org.apache.hadoop.hbase.regionserver.HRegion;
055import org.apache.hadoop.hbase.regionserver.HRegionServer;
056import org.apache.hadoop.hbase.regionserver.Region;
057import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
058import org.apache.hadoop.hbase.replication.ReplicationException;
059import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
060import org.apache.hadoop.hbase.testclassification.FlakeyTests;
061import org.apache.hadoop.hbase.testclassification.LargeTests;
062import org.apache.hadoop.hbase.util.Bytes;
063import org.apache.hadoop.hbase.util.FSTableDescriptors;
064import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
065import org.apache.hadoop.hbase.wal.WAL.Entry;
066import org.apache.hadoop.hbase.wal.WALEdit;
067import org.apache.hadoop.hbase.wal.WALKeyImpl;
068import org.apache.hadoop.hbase.zookeeper.ZKConfig;
069import org.junit.AfterClass;
070import org.junit.BeforeClass;
071import org.junit.ClassRule;
072import org.junit.Rule;
073import org.junit.Test;
074import org.junit.experimental.categories.Category;
075import org.junit.rules.TestName;
076import org.mockito.Mockito;
077import org.slf4j.Logger;
078import org.slf4j.LoggerFactory;
079
080import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
081
082/**
083 * Tests RegionReplicaReplicationEndpoint class by setting up region replicas and verifying
084 * async wal replication replays the edits to the secondary region in various scenarios.
085 */
086@Category({FlakeyTests.class, LargeTests.class})
087public class TestRegionReplicaReplicationEndpoint {
088
089  @ClassRule
090  public static final HBaseClassTestRule CLASS_RULE =
091      HBaseClassTestRule.forClass(TestRegionReplicaReplicationEndpoint.class);
092
093  private static final Logger LOG =
094      LoggerFactory.getLogger(TestRegionReplicaReplicationEndpoint.class);
095
096  private static final int NB_SERVERS = 2;
097
098  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
099
100  @Rule
101  public TestName name = new TestName();
102
103  @BeforeClass
104  public static void beforeClass() throws Exception {
105    Configuration conf = HTU.getConfiguration();
106    conf.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f);
107    conf.setInt("replication.source.size.capacity", 10240);
108    conf.setLong("replication.source.sleepforretries", 100);
109    conf.setInt("hbase.regionserver.maxlogs", 10);
110    conf.setLong("hbase.master.logcleaner.ttl", 10);
111    conf.setInt("zookeeper.recovery.retry", 1);
112    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
113    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
114    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
115    conf.setInt("replication.stats.thread.period.seconds", 5);
116    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
117    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); // less number of retries is needed
118    conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
119
120    HTU.startMiniCluster(NB_SERVERS);
121  }
122
123  @AfterClass
124  public static void afterClass() throws Exception {
125    HTU.shutdownMiniCluster();
126  }
127
128  @Test
129  public void testRegionReplicaReplicationPeerIsCreated() throws IOException, ReplicationException {
130    // create a table with region replicas. Check whether the replication peer is created
131    // and replication started.
132    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
133      Admin admin = connection.getAdmin()) {
134      String peerId = ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER;
135
136      ReplicationPeerConfig peerConfig = null;
137      try {
138        peerConfig = admin.getReplicationPeerConfig(peerId);
139      } catch (ReplicationPeerNotFoundException e) {
140        LOG.warn("Region replica replication peer id=" + peerId + " not exist", e);
141      }
142
143      try {
144        peerConfig = admin.getReplicationPeerConfig(peerId);
145      } catch (ReplicationPeerNotFoundException e) {
146        LOG.warn("Region replica replication peer id=" + peerId + " not exist", e);
147      }
148
149      if (peerConfig != null) {
150        admin.removeReplicationPeer(peerId);
151        peerConfig = null;
152      }
153
154      HTableDescriptor htd = HTU.createTableDescriptor(
155        "testReplicationPeerIsCreated_no_region_replicas");
156      HTU.getAdmin().createTable(htd);
157      try {
158        peerConfig = admin.getReplicationPeerConfig(peerId);
159        fail("Should throw ReplicationException, because replication peer id=" + peerId
160            + " not exist");
161      } catch (ReplicationPeerNotFoundException e) {
162      }
163      assertNull(peerConfig);
164
165      htd = HTU.createTableDescriptor("testReplicationPeerIsCreated");
166      htd.setRegionReplication(2);
167      HTU.getAdmin().createTable(htd);
168
169      // assert peer configuration is correct
170      peerConfig = admin.getReplicationPeerConfig(peerId);
171      assertNotNull(peerConfig);
172      assertEquals(peerConfig.getClusterKey(), ZKConfig.getZooKeeperClusterKey(
173          HTU.getConfiguration()));
174      assertEquals(RegionReplicaReplicationEndpoint.class.getName(),
175          peerConfig.getReplicationEndpointImpl());
176    }  
177  }
178
179  @Test
180  public void testRegionReplicaReplicationPeerIsCreatedForModifyTable() throws Exception {
181    // modify a table by adding region replicas. Check whether the replication peer is created
182    // and replication started.
183    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
184      Admin admin = connection.getAdmin()) {
185      String peerId = ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER;
186      ReplicationPeerConfig peerConfig = null;
187      try {
188        peerConfig = admin.getReplicationPeerConfig(peerId);
189      } catch (ReplicationPeerNotFoundException e) {
190        LOG.warn("Region replica replication peer id=" + peerId + " not exist", e);
191      }
192
193      if (peerConfig != null) {
194        admin.removeReplicationPeer(peerId);
195        peerConfig = null;
196      }
197
198      HTableDescriptor htd = HTU.createTableDescriptor("testRegionReplicaReplicationPeerIsCreatedForModifyTable");
199      HTU.getAdmin().createTable(htd);
200
201      // assert that replication peer is not created yet
202      try {
203        peerConfig = admin.getReplicationPeerConfig(peerId);
204        fail("Should throw ReplicationException, because replication peer id=" + peerId
205          + " not exist");
206      } catch (ReplicationPeerNotFoundException e) {
207      }
208      assertNull(peerConfig);
209
210      HTU.getAdmin().disableTable(htd.getTableName());
211      htd.setRegionReplication(2);
212      HTU.getAdmin().modifyTable(htd.getTableName(), htd);
213      HTU.getAdmin().enableTable(htd.getTableName());
214
215      // assert peer configuration is correct
216      peerConfig = admin.getReplicationPeerConfig(peerId);
217      assertNotNull(peerConfig);
218      assertEquals(peerConfig.getClusterKey(), ZKConfig.getZooKeeperClusterKey(HTU.getConfiguration()));
219      assertEquals(RegionReplicaReplicationEndpoint.class.getName(),
220        peerConfig.getReplicationEndpointImpl());
221      admin.close();
222    }
223  }
224
225  public void testRegionReplicaReplication(int regionReplication) throws Exception {
226    // test region replica replication. Create a table with single region, write some data
227    // ensure that data is replicated to the secondary region
228    TableName tableName = TableName.valueOf("testRegionReplicaReplicationWithReplicas_"
229        + regionReplication);
230    HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString());
231    htd.setRegionReplication(regionReplication);
232    HTU.getAdmin().createTable(htd);
233    TableName tableNameNoReplicas =
234        TableName.valueOf("testRegionReplicaReplicationWithReplicas_NO_REPLICAS");
235    HTU.deleteTableIfAny(tableNameNoReplicas);
236    HTU.createTable(tableNameNoReplicas, HBaseTestingUtility.fam1);
237
238    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
239    Table table = connection.getTable(tableName);
240    Table tableNoReplicas = connection.getTable(tableNameNoReplicas);
241
242    try {
243      // load some data to the non-replicated table
244      HTU.loadNumericRows(tableNoReplicas, HBaseTestingUtility.fam1, 6000, 7000);
245
246      // load the data to the table
247      HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000);
248
249      verifyReplication(tableName, regionReplication, 0, 1000);
250
251    } finally {
252      table.close();
253      tableNoReplicas.close();
254      HTU.deleteTableIfAny(tableNameNoReplicas);
255      connection.close();
256    }
257  }
258
259  private void verifyReplication(TableName tableName, int regionReplication,
260      final int startRow, final int endRow) throws Exception {
261    verifyReplication(tableName, regionReplication, startRow, endRow, true);
262  }
263
264  private void verifyReplication(TableName tableName, int regionReplication,
265      final int startRow, final int endRow, final boolean present) throws Exception {
266    // find the regions
267    final Region[] regions = new Region[regionReplication];
268
269    for (int i=0; i < NB_SERVERS; i++) {
270      HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
271      List<HRegion> onlineRegions = rs.getRegions(tableName);
272      for (HRegion region : onlineRegions) {
273        regions[region.getRegionInfo().getReplicaId()] = region;
274      }
275    }
276
277    for (Region region : regions) {
278      assertNotNull(region);
279    }
280
281    for (int i = 1; i < regionReplication; i++) {
282      final Region region = regions[i];
283      // wait until all the data is replicated to all secondary regions
284      Waiter.waitFor(HTU.getConfiguration(), 90000, 1000, new Waiter.Predicate<Exception>() {
285        @Override
286        public boolean evaluate() throws Exception {
287          LOG.info("verifying replication for region replica:" + region.getRegionInfo());
288          try {
289            HTU.verifyNumericRows(region, HBaseTestingUtility.fam1, startRow, endRow, present);
290          } catch(Throwable ex) {
291            LOG.warn("Verification from secondary region is not complete yet", ex);
292            // still wait
293            return false;
294          }
295          return true;
296        }
297      });
298    }
299  }
300
301  @Test
302  public void testRegionReplicaReplicationWith2Replicas() throws Exception {
303    testRegionReplicaReplication(2);
304  }
305
306  @Test
307  public void testRegionReplicaReplicationWith3Replicas() throws Exception {
308    testRegionReplicaReplication(3);
309  }
310
311  @Test
312  public void testRegionReplicaReplicationWith10Replicas() throws Exception {
313    testRegionReplicaReplication(10);
314  }
315
316  @Test
317  public void testRegionReplicaWithoutMemstoreReplication() throws Exception {
318    int regionReplication = 3;
319    final TableName tableName = TableName.valueOf(name.getMethodName());
320    HTableDescriptor htd = HTU.createTableDescriptor(tableName);
321    htd.setRegionReplication(regionReplication);
322    htd.setRegionMemstoreReplication(false);
323    HTU.getAdmin().createTable(htd);
324
325    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
326    Table table = connection.getTable(tableName);
327    try {
328      // write data to the primary. The replicas should not receive the data
329      final int STEP = 100;
330      for (int i = 0; i < 3; ++i) {
331        final int startRow = i * STEP;
332        final int endRow = (i + 1) * STEP;
333        LOG.info("Writing data from " + startRow + " to " + endRow);
334        HTU.loadNumericRows(table, HBaseTestingUtility.fam1, startRow, endRow);
335        verifyReplication(tableName, regionReplication, startRow, endRow, false);
336
337        // Flush the table, now the data should show up in the replicas
338        LOG.info("flushing table");
339        HTU.flush(tableName);
340        verifyReplication(tableName, regionReplication, 0, endRow, true);
341      }
342    } finally {
343      table.close();
344      connection.close();
345    }
346  }
347
348  @Test
349  public void testRegionReplicaReplicationForFlushAndCompaction() throws Exception {
350    // Tests a table with region replication 3. Writes some data, and causes flushes and
351    // compactions. Verifies that the data is readable from the replicas. Note that this
352    // does not test whether the replicas actually pick up flushed files and apply compaction
353    // to their stores
354    int regionReplication = 3;
355    final TableName tableName = TableName.valueOf(name.getMethodName());
356    HTableDescriptor htd = HTU.createTableDescriptor(tableName);
357    htd.setRegionReplication(regionReplication);
358    HTU.getAdmin().createTable(htd);
359
360
361    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
362    Table table = connection.getTable(tableName);
363    try {
364      // load the data to the table
365
366      for (int i = 0; i < 6000; i += 1000) {
367        LOG.info("Writing data from " + i + " to " + (i+1000));
368        HTU.loadNumericRows(table, HBaseTestingUtility.fam1, i, i+1000);
369        LOG.info("flushing table");
370        HTU.flush(tableName);
371        LOG.info("compacting table");
372        HTU.compact(tableName, false);
373      }
374
375      verifyReplication(tableName, regionReplication, 0, 1000);
376    } finally {
377      table.close();
378      connection.close();
379    }
380  }
381
382  @Test
383  public void testRegionReplicaReplicationIgnoresDisabledTables() throws Exception {
384    testRegionReplicaReplicationIgnores(false, false);
385  }
386
387  @Test
388  public void testRegionReplicaReplicationIgnoresDroppedTables() throws Exception {
389    testRegionReplicaReplicationIgnores(true, false);
390  }
391
392  @Test
393  public void testRegionReplicaReplicationIgnoresNonReplicatedTables() throws Exception {
394    testRegionReplicaReplicationIgnores(false, true);
395  }
396
397  public void testRegionReplicaReplicationIgnores(boolean dropTable, boolean disableReplication)
398      throws Exception {
399
400    // tests having edits from a disabled or dropped table is handled correctly by skipping those
401    // entries and further edits after the edits from dropped/disabled table can be replicated
402    // without problems.
403    final TableName tableName = TableName.valueOf(
404      name.getMethodName() + "_drop_" + dropTable + "_disabledReplication_" + disableReplication);
405    HTableDescriptor htd = HTU.createTableDescriptor(tableName);
406    int regionReplication = 3;
407    htd.setRegionReplication(regionReplication);
408    HTU.deleteTableIfAny(tableName);
409
410    HTU.getAdmin().createTable(htd);
411    TableName toBeDisabledTable = TableName.valueOf(
412      dropTable ? "droppedTable" : (disableReplication ? "disableReplication" : "disabledTable"));
413    HTU.deleteTableIfAny(toBeDisabledTable);
414    htd = HTU.createTableDescriptor(toBeDisabledTable.toString());
415    htd.setRegionReplication(regionReplication);
416    HTU.getAdmin().createTable(htd);
417
418    // both tables are created, now pause replication
419    HTU.getAdmin().disableReplicationPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER);
420
421    // now that the replication is disabled, write to the table to be dropped, then drop the table.
422
423    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
424    Table table = connection.getTable(tableName);
425    Table tableToBeDisabled = connection.getTable(toBeDisabledTable);
426
427    HTU.loadNumericRows(tableToBeDisabled, HBaseTestingUtility.fam1, 6000, 7000);
428
429    AtomicLong skippedEdits = new AtomicLong();
430    RegionReplicaReplicationEndpoint.RegionReplicaOutputSink sink =
431        mock(RegionReplicaReplicationEndpoint.RegionReplicaOutputSink.class);
432    when(sink.getSkippedEditsCounter()).thenReturn(skippedEdits);
433    FSTableDescriptors fstd =
434        new FSTableDescriptors(FileSystem.get(HTU.getConfiguration()), HTU.getDefaultRootDirPath());
435    RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter sinkWriter =
436        new RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter(sink,
437            (ClusterConnection) connection, Executors.newSingleThreadExecutor(), Integer.MAX_VALUE,
438            fstd);
439    RegionLocator rl = connection.getRegionLocator(toBeDisabledTable);
440    HRegionLocation hrl = rl.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY);
441    byte[] encodedRegionName = hrl.getRegionInfo().getEncodedNameAsBytes();
442
443    Cell cell = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(Bytes.toBytes("A"))
444        .setFamily(HTU.fam1).setValue(Bytes.toBytes("VAL")).setType(Type.Put).build();
445    Entry entry = new Entry(
446      new WALKeyImpl(encodedRegionName, toBeDisabledTable, 1),
447        new WALEdit()
448            .add(cell));
449
450    HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
451    if (dropTable) {
452      HTU.getAdmin().deleteTable(toBeDisabledTable);
453    } else if (disableReplication) {
454      htd.setRegionReplication(regionReplication - 2);
455      HTU.getAdmin().modifyTable(toBeDisabledTable, htd);
456      HTU.getAdmin().enableTable(toBeDisabledTable);
457    }
458    sinkWriter.append(toBeDisabledTable, encodedRegionName,
459      HConstants.EMPTY_BYTE_ARRAY, Lists.newArrayList(entry, entry));
460
461    assertEquals(2, skippedEdits.get());
462
463    HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(0);
464    MetricsSource metrics = mock(MetricsSource.class);
465    ReplicationEndpoint.Context ctx =
466      new ReplicationEndpoint.Context(HTU.getConfiguration(), HTU.getConfiguration(),
467        HTU.getTestFileSystem(), ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER,
468        UUID.fromString(rs.getClusterId()), rs.getReplicationSourceService().
469        getReplicationManager().getReplicationPeers()
470          .getPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER),
471        metrics, rs.getTableDescriptors(), rs);
472    RegionReplicaReplicationEndpoint rrpe = new RegionReplicaReplicationEndpoint();
473    rrpe.init(ctx);
474    rrpe.start();
475    ReplicationEndpoint.ReplicateContext repCtx = new ReplicationEndpoint.ReplicateContext();
476    repCtx.setEntries(Lists.newArrayList(entry, entry));
477    assertTrue(rrpe.replicate(repCtx));
478    /* Come back here. There is a difference on how counting is done here and in master branch.
479       St.Ack
480    Mockito.verify(metrics, Mockito.times(1)).
481      incrLogEditsFiltered(Mockito.eq(2L));
482     */
483    rrpe.stop();
484    if (disableReplication) {
485      // enable replication again so that we can verify replication
486      HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
487      htd.setRegionReplication(regionReplication);
488      HTU.getAdmin().modifyTable(toBeDisabledTable, htd);
489      HTU.getAdmin().enableTable(toBeDisabledTable);
490    }
491
492    try {
493      // load some data to the to-be-dropped table
494
495      // load the data to the table
496      HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000);
497
498      // now enable the replication
499      HTU.getAdmin().enableReplicationPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER);
500
501      verifyReplication(tableName, regionReplication, 0, 1000);
502
503    } finally {
504      table.close();
505      rl.close();
506      tableToBeDisabled.close();
507      HTU.deleteTableIfAny(toBeDisabledTable);
508      connection.close();
509    }
510  }
511}