001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Cell.Type;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
076
077/**
078 * Tests RegionReplicaReplicationEndpoint class by setting up region replicas and verifying
079 * async wal replication replays the edits to the secondary region in various scenarios.
080 */
081@Category({FlakeyTests.class, LargeTests.class})
082public class TestRegionReplicaReplicationEndpoint {
083
084  @ClassRule
085  public static final HBaseClassTestRule CLASS_RULE =
086      HBaseClassTestRule.forClass(TestRegionReplicaReplicationEndpoint.class);
087
088  private static final Logger LOG =
089      LoggerFactory.getLogger(TestRegionReplicaReplicationEndpoint.class);
090
091  private static final int NB_SERVERS = 2;
092
093  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
094
095  @Rule
096  public TestName name = new TestName();
097
098  @BeforeClass
099  public static void beforeClass() throws Exception {
100    Configuration conf = HTU.getConfiguration();
101    conf.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f);
102    conf.setInt("replication.source.size.capacity", 10240);
103    conf.setLong("replication.source.sleepforretries", 100);
104    conf.setInt("hbase.regionserver.maxlogs", 10);
105    conf.setLong("hbase.master.logcleaner.ttl", 10);
106    conf.setInt("zookeeper.recovery.retry", 1);
107    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
108    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
109    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
110    conf.setInt("replication.stats.thread.period.seconds", 5);
111    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
112    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); // less number of retries is needed
113    conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
114
115    HTU.startMiniCluster(NB_SERVERS);
116  }
117
118  @AfterClass
119  public static void afterClass() throws Exception {
120    HTU.shutdownMiniCluster();
121  }
122
123  @Test
124  public void testRegionReplicaReplicationPeerIsCreated() throws IOException, ReplicationException {
125    // create a table with region replicas. Check whether the replication peer is created
126    // and replication started.
127    ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());
128    String peerId = "region_replica_replication";
129
130    ReplicationPeerConfig peerConfig = null;
131    try {
132      peerConfig = admin.getPeerConfig(peerId);
133    } catch (ReplicationPeerNotFoundException e) {
134      LOG.warn("Region replica replication peer id=" + peerId + " not exist", e);
135    }
136
137    if (peerConfig != null) {
138      admin.removePeer(peerId);
139      peerConfig = null;
140    }
141
142    HTableDescriptor htd = HTU.createTableDescriptor(
143      "testReplicationPeerIsCreated_no_region_replicas");
144    HTU.getAdmin().createTable(htd);
145    try {
146      peerConfig = admin.getPeerConfig(peerId);
147      fail("Should throw ReplicationException, because replication peer id=" + peerId
148          + " not exist");
149    } catch (ReplicationPeerNotFoundException e) {
150    }
151    assertNull(peerConfig);
152
153    htd = HTU.createTableDescriptor("testReplicationPeerIsCreated");
154    htd.setRegionReplication(2);
155    HTU.getAdmin().createTable(htd);
156
157    // assert peer configuration is correct
158    peerConfig = admin.getPeerConfig(peerId);
159    assertNotNull(peerConfig);
160    assertEquals(peerConfig.getClusterKey(), ZKConfig.getZooKeeperClusterKey(
161        HTU.getConfiguration()));
162    assertEquals(RegionReplicaReplicationEndpoint.class.getName(),
163        peerConfig.getReplicationEndpointImpl());
164    admin.close();
165  }
166
167  @Test
168  public void testRegionReplicaReplicationPeerIsCreatedForModifyTable() throws Exception {
169    // modify a table by adding region replicas. Check whether the replication peer is created
170    // and replication started.
171    ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());
172    String peerId = "region_replica_replication";
173
174    ReplicationPeerConfig peerConfig = null;
175    try {
176      peerConfig = admin.getPeerConfig(peerId);
177    } catch (ReplicationPeerNotFoundException e) {
178      LOG.warn("Region replica replication peer id=" + peerId + " not exist", e);
179    }
180
181    if (peerConfig != null) {
182      admin.removePeer(peerId);
183      peerConfig = null;
184    }
185
186    HTableDescriptor htd
187      = HTU.createTableDescriptor("testRegionReplicaReplicationPeerIsCreatedForModifyTable");
188    HTU.getAdmin().createTable(htd);
189
190    // assert that replication peer is not created yet
191    try {
192      peerConfig = admin.getPeerConfig(peerId);
193      fail("Should throw ReplicationException, because replication peer id=" + peerId
194          + " not exist");
195    } catch (ReplicationPeerNotFoundException e) {
196    }
197    assertNull(peerConfig);
198
199    HTU.getAdmin().disableTable(htd.getTableName());
200    htd.setRegionReplication(2);
201    HTU.getAdmin().modifyTable(htd.getTableName(), htd);
202    HTU.getAdmin().enableTable(htd.getTableName());
203
204    // assert peer configuration is correct
205    peerConfig = admin.getPeerConfig(peerId);
206    assertNotNull(peerConfig);
207    assertEquals(peerConfig.getClusterKey(), ZKConfig.getZooKeeperClusterKey(
208        HTU.getConfiguration()));
209    assertEquals(RegionReplicaReplicationEndpoint.class.getName(),
210        peerConfig.getReplicationEndpointImpl());
211    admin.close();
212  }
213
214  public void testRegionReplicaReplication(int regionReplication) throws Exception {
215    // test region replica replication. Create a table with single region, write some data
216    // ensure that data is replicated to the secondary region
217    TableName tableName = TableName.valueOf("testRegionReplicaReplicationWithReplicas_"
218        + regionReplication);
219    HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString());
220    htd.setRegionReplication(regionReplication);
221    HTU.getAdmin().createTable(htd);
222    TableName tableNameNoReplicas =
223        TableName.valueOf("testRegionReplicaReplicationWithReplicas_NO_REPLICAS");
224    HTU.deleteTableIfAny(tableNameNoReplicas);
225    HTU.createTable(tableNameNoReplicas, HBaseTestingUtility.fam1);
226
227    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
228    Table table = connection.getTable(tableName);
229    Table tableNoReplicas = connection.getTable(tableNameNoReplicas);
230
231    try {
232      // load some data to the non-replicated table
233      HTU.loadNumericRows(tableNoReplicas, HBaseTestingUtility.fam1, 6000, 7000);
234
235      // load the data to the table
236      HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000);
237
238      verifyReplication(tableName, regionReplication, 0, 1000);
239
240    } finally {
241      table.close();
242      tableNoReplicas.close();
243      HTU.deleteTableIfAny(tableNameNoReplicas);
244      connection.close();
245    }
246  }
247
248  private void verifyReplication(TableName tableName, int regionReplication,
249      final int startRow, final int endRow) throws Exception {
250    verifyReplication(tableName, regionReplication, startRow, endRow, true);
251  }
252
253  private void verifyReplication(TableName tableName, int regionReplication,
254      final int startRow, final int endRow, final boolean present) throws Exception {
255    // find the regions
256    final Region[] regions = new Region[regionReplication];
257
258    for (int i=0; i < NB_SERVERS; i++) {
259      HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
260      List<HRegion> onlineRegions = rs.getRegions(tableName);
261      for (HRegion region : onlineRegions) {
262        regions[region.getRegionInfo().getReplicaId()] = region;
263      }
264    }
265
266    for (Region region : regions) {
267      assertNotNull(region);
268    }
269
270    for (int i = 1; i < regionReplication; i++) {
271      final Region region = regions[i];
272      // wait until all the data is replicated to all secondary regions
273      Waiter.waitFor(HTU.getConfiguration(), 90000, 1000, new Waiter.Predicate<Exception>() {
274        @Override
275        public boolean evaluate() throws Exception {
276          LOG.info("verifying replication for region replica:" + region.getRegionInfo());
277          try {
278            HTU.verifyNumericRows(region, HBaseTestingUtility.fam1, startRow, endRow, present);
279          } catch(Throwable ex) {
280            LOG.warn("Verification from secondary region is not complete yet", ex);
281            // still wait
282            return false;
283          }
284          return true;
285        }
286      });
287    }
288  }
289
290  @Test
291  public void testRegionReplicaReplicationWith2Replicas() throws Exception {
292    testRegionReplicaReplication(2);
293  }
294
295  @Test
296  public void testRegionReplicaReplicationWith3Replicas() throws Exception {
297    testRegionReplicaReplication(3);
298  }
299
300  @Test
301  public void testRegionReplicaReplicationWith10Replicas() throws Exception {
302    testRegionReplicaReplication(10);
303  }
304
305  @Test
306  public void testRegionReplicaWithoutMemstoreReplication() throws Exception {
307    int regionReplication = 3;
308    final TableName tableName = TableName.valueOf(name.getMethodName());
309    HTableDescriptor htd = HTU.createTableDescriptor(tableName);
310    htd.setRegionReplication(regionReplication);
311    htd.setRegionMemstoreReplication(false);
312    HTU.getAdmin().createTable(htd);
313
314    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
315    Table table = connection.getTable(tableName);
316    try {
317      // write data to the primary. The replicas should not receive the data
318      final int STEP = 100;
319      for (int i = 0; i < 3; ++i) {
320        final int startRow = i * STEP;
321        final int endRow = (i + 1) * STEP;
322        LOG.info("Writing data from " + startRow + " to " + endRow);
323        HTU.loadNumericRows(table, HBaseTestingUtility.fam1, startRow, endRow);
324        verifyReplication(tableName, regionReplication, startRow, endRow, false);
325
326        // Flush the table, now the data should show up in the replicas
327        LOG.info("flushing table");
328        HTU.flush(tableName);
329        verifyReplication(tableName, regionReplication, 0, endRow, true);
330      }
331    } finally {
332      table.close();
333      connection.close();
334    }
335  }
336
337  @Test
338  public void testRegionReplicaReplicationForFlushAndCompaction() throws Exception {
339    // Tests a table with region replication 3. Writes some data, and causes flushes and
340    // compactions. Verifies that the data is readable from the replicas. Note that this
341    // does not test whether the replicas actually pick up flushed files and apply compaction
342    // to their stores
343    int regionReplication = 3;
344    final TableName tableName = TableName.valueOf(name.getMethodName());
345    HTableDescriptor htd = HTU.createTableDescriptor(tableName);
346    htd.setRegionReplication(regionReplication);
347    HTU.getAdmin().createTable(htd);
348
349
350    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
351    Table table = connection.getTable(tableName);
352    try {
353      // load the data to the table
354
355      for (int i = 0; i < 6000; i += 1000) {
356        LOG.info("Writing data from " + i + " to " + (i+1000));
357        HTU.loadNumericRows(table, HBaseTestingUtility.fam1, i, i+1000);
358        LOG.info("flushing table");
359        HTU.flush(tableName);
360        LOG.info("compacting table");
361        HTU.compact(tableName, false);
362      }
363
364      verifyReplication(tableName, regionReplication, 0, 1000);
365    } finally {
366      table.close();
367      connection.close();
368    }
369  }
370
371  @Test
372  public void testRegionReplicaReplicationIgnoresDisabledTables() throws Exception {
373    testRegionReplicaReplicationIgnores(false, false);
374  }
375
376  @Test
377  public void testRegionReplicaReplicationIgnoresDroppedTables() throws Exception {
378    testRegionReplicaReplicationIgnores(true, false);
379  }
380
381  @Test
382  public void testRegionReplicaReplicationIgnoresNonReplicatedTables() throws Exception {
383    testRegionReplicaReplicationIgnores(false, true);
384  }
385
386  public void testRegionReplicaReplicationIgnores(boolean dropTable, boolean disableReplication)
387      throws Exception {
388
389    // tests having edits from a disabled or dropped table is handled correctly by skipping those
390    // entries and further edits after the edits from dropped/disabled table can be replicated
391    // without problems.
392    final TableName tableName = TableName.valueOf(
393      name.getMethodName() + "_drop_" + dropTable + "_disabledReplication_" + disableReplication);
394    HTableDescriptor htd = HTU.createTableDescriptor(tableName);
395    int regionReplication = 3;
396    htd.setRegionReplication(regionReplication);
397    HTU.deleteTableIfAny(tableName);
398
399    HTU.getAdmin().createTable(htd);
400    TableName toBeDisabledTable = TableName.valueOf(
401      dropTable ? "droppedTable" : (disableReplication ? "disableReplication" : "disabledTable"));
402    HTU.deleteTableIfAny(toBeDisabledTable);
403    htd = HTU.createTableDescriptor(toBeDisabledTable.toString());
404    htd.setRegionReplication(regionReplication);
405    HTU.getAdmin().createTable(htd);
406
407    // both tables are created, now pause replication
408    ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());
409    admin.disablePeer(ServerRegionReplicaUtil.getReplicationPeerId());
410
411    // now that the replication is disabled, write to the table to be dropped, then drop the table.
412
413    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
414    Table table = connection.getTable(tableName);
415    Table tableToBeDisabled = connection.getTable(toBeDisabledTable);
416
417    HTU.loadNumericRows(tableToBeDisabled, HBaseTestingUtility.fam1, 6000, 7000);
418
419    AtomicLong skippedEdits = new AtomicLong();
420    RegionReplicaReplicationEndpoint.RegionReplicaOutputSink sink =
421        mock(RegionReplicaReplicationEndpoint.RegionReplicaOutputSink.class);
422    when(sink.getSkippedEditsCounter()).thenReturn(skippedEdits);
423    FSTableDescriptors fstd = new FSTableDescriptors(HTU.getConfiguration(),
424        FileSystem.get(HTU.getConfiguration()), HTU.getDefaultRootDirPath());
425    RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter sinkWriter =
426        new RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter(sink,
427            (ClusterConnection) connection, Executors.newSingleThreadExecutor(), Integer.MAX_VALUE,
428            fstd);
429    RegionLocator rl = connection.getRegionLocator(toBeDisabledTable);
430    HRegionLocation hrl = rl.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY);
431    byte[] encodedRegionName = hrl.getRegionInfo().getEncodedNameAsBytes();
432
433    Cell cell = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(Bytes.toBytes("A"))
434        .setFamily(HTU.fam1).setValue(Bytes.toBytes("VAL")).setType(Type.Put).build();
435    Entry entry = new Entry(
436      new WALKeyImpl(encodedRegionName, toBeDisabledTable, 1),
437        new WALEdit()
438            .add(cell));
439
440    HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
441    if (dropTable) {
442      HTU.getAdmin().deleteTable(toBeDisabledTable);
443    } else if (disableReplication) {
444      htd.setRegionReplication(regionReplication - 2);
445      HTU.getAdmin().modifyTable(toBeDisabledTable, htd);
446      HTU.getAdmin().enableTable(toBeDisabledTable);
447    }
448    sinkWriter.append(toBeDisabledTable, encodedRegionName,
449      HConstants.EMPTY_BYTE_ARRAY, Lists.newArrayList(entry, entry));
450
451    assertEquals(2, skippedEdits.get());
452
453    if (disableReplication) {
454      // enable replication again so that we can verify replication
455      HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
456      htd.setRegionReplication(regionReplication);
457      HTU.getAdmin().modifyTable(toBeDisabledTable, htd);
458      HTU.getAdmin().enableTable(toBeDisabledTable);
459    }
460
461    try {
462      // load some data to the to-be-dropped table
463
464      // load the data to the table
465      HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000);
466
467      // now enable the replication
468      admin.enablePeer(ServerRegionReplicaUtil.getReplicationPeerId());
469
470      verifyReplication(tableName, regionReplication, 0, 1000);
471
472    } finally {
473      admin.close();
474      table.close();
475      rl.close();
476      tableToBeDisabled.close();
477      HTU.deleteTableIfAny(toBeDisabledTable);
478      connection.close();
479    }
480  }
481}