001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
069
070/**
071 * Tests RegionReplicaReplicationEndpoint class by setting up region replicas and verifying
072 * async wal replication replays the edits to the secondary region in various scenarios.
073 */
074@Category({FlakeyTests.class, LargeTests.class})
075public class TestRegionReplicaReplicationEndpoint {
076
077  @ClassRule
078  public static final HBaseClassTestRule CLASS_RULE =
079      HBaseClassTestRule.forClass(TestRegionReplicaReplicationEndpoint.class);
080
081  private static final Logger LOG =
082      LoggerFactory.getLogger(TestRegionReplicaReplicationEndpoint.class);
083
084  private static final int NB_SERVERS = 2;
085
086  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
087
088  @Rule
089  public TestName name = new TestName();
090
091  @BeforeClass
092  public static void beforeClass() throws Exception {
093    Configuration conf = HTU.getConfiguration();
094    conf.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f);
095    conf.setInt("replication.source.size.capacity", 10240);
096    conf.setLong("replication.source.sleepforretries", 100);
097    conf.setInt("hbase.regionserver.maxlogs", 10);
098    conf.setLong("hbase.master.logcleaner.ttl", 10);
099    conf.setInt("zookeeper.recovery.retry", 1);
100    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
101    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
102    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
103    conf.setInt("replication.stats.thread.period.seconds", 5);
104    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
105    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); // less number of retries is needed
106    conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
107
108    HTU.startMiniCluster(NB_SERVERS);
109  }
110
111  @AfterClass
112  public static void afterClass() throws Exception {
113    HTU.shutdownMiniCluster();
114  }
115
116  @Test
117  public void testRegionReplicaReplicationPeerIsCreated() throws IOException, ReplicationException {
118    // create a table with region replicas. Check whether the replication peer is created
119    // and replication started.
120    ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());
121    String peerId = "region_replica_replication";
122
123    ReplicationPeerConfig peerConfig = null;
124    try {
125      peerConfig = admin.getPeerConfig(peerId);
126    } catch (ReplicationPeerNotFoundException e) {
127      LOG.warn("Region replica replication peer id=" + peerId + " not exist", e);
128    }
129
130    if (peerConfig != null) {
131      admin.removePeer(peerId);
132      peerConfig = null;
133    }
134
135    HTableDescriptor htd = HTU.createTableDescriptor(
136      "testReplicationPeerIsCreated_no_region_replicas");
137    HTU.getAdmin().createTable(htd);
138    try {
139      peerConfig = admin.getPeerConfig(peerId);
140      fail("Should throw ReplicationException, because replication peer id=" + peerId
141          + " not exist");
142    } catch (ReplicationPeerNotFoundException e) {
143    }
144    assertNull(peerConfig);
145
146    htd = HTU.createTableDescriptor("testReplicationPeerIsCreated");
147    htd.setRegionReplication(2);
148    HTU.getAdmin().createTable(htd);
149
150    // assert peer configuration is correct
151    peerConfig = admin.getPeerConfig(peerId);
152    assertNotNull(peerConfig);
153    assertEquals(peerConfig.getClusterKey(), ZKConfig.getZooKeeperClusterKey(
154        HTU.getConfiguration()));
155    assertEquals(RegionReplicaReplicationEndpoint.class.getName(),
156        peerConfig.getReplicationEndpointImpl());
157    admin.close();
158  }
159
160  @Test
161  public void testRegionReplicaReplicationPeerIsCreatedForModifyTable() throws Exception {
162    // modify a table by adding region replicas. Check whether the replication peer is created
163    // and replication started.
164    ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());
165    String peerId = "region_replica_replication";
166
167    ReplicationPeerConfig peerConfig = null;
168    try {
169      peerConfig = admin.getPeerConfig(peerId);
170    } catch (ReplicationPeerNotFoundException e) {
171      LOG.warn("Region replica replication peer id=" + peerId + " not exist", e);
172    }
173
174    if (peerConfig != null) {
175      admin.removePeer(peerId);
176      peerConfig = null;
177    }
178
179    HTableDescriptor htd
180      = HTU.createTableDescriptor("testRegionReplicaReplicationPeerIsCreatedForModifyTable");
181    HTU.getAdmin().createTable(htd);
182
183    // assert that replication peer is not created yet
184    try {
185      peerConfig = admin.getPeerConfig(peerId);
186      fail("Should throw ReplicationException, because replication peer id=" + peerId
187          + " not exist");
188    } catch (ReplicationPeerNotFoundException e) {
189    }
190    assertNull(peerConfig);
191
192    HTU.getAdmin().disableTable(htd.getTableName());
193    htd.setRegionReplication(2);
194    HTU.getAdmin().modifyTable(htd.getTableName(), htd);
195    HTU.getAdmin().enableTable(htd.getTableName());
196
197    // assert peer configuration is correct
198    peerConfig = admin.getPeerConfig(peerId);
199    assertNotNull(peerConfig);
200    assertEquals(peerConfig.getClusterKey(), ZKConfig.getZooKeeperClusterKey(
201        HTU.getConfiguration()));
202    assertEquals(RegionReplicaReplicationEndpoint.class.getName(),
203        peerConfig.getReplicationEndpointImpl());
204    admin.close();
205  }
206
207  public void testRegionReplicaReplication(int regionReplication) throws Exception {
208    // test region replica replication. Create a table with single region, write some data
209    // ensure that data is replicated to the secondary region
210    TableName tableName = TableName.valueOf("testRegionReplicaReplicationWithReplicas_"
211        + regionReplication);
212    HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString());
213    htd.setRegionReplication(regionReplication);
214    HTU.getAdmin().createTable(htd);
215    TableName tableNameNoReplicas =
216        TableName.valueOf("testRegionReplicaReplicationWithReplicas_NO_REPLICAS");
217    HTU.deleteTableIfAny(tableNameNoReplicas);
218    HTU.createTable(tableNameNoReplicas, HBaseTestingUtility.fam1);
219
220    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
221    Table table = connection.getTable(tableName);
222    Table tableNoReplicas = connection.getTable(tableNameNoReplicas);
223
224    try {
225      // load some data to the non-replicated table
226      HTU.loadNumericRows(tableNoReplicas, HBaseTestingUtility.fam1, 6000, 7000);
227
228      // load the data to the table
229      HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000);
230
231      verifyReplication(tableName, regionReplication, 0, 1000);
232
233    } finally {
234      table.close();
235      tableNoReplicas.close();
236      HTU.deleteTableIfAny(tableNameNoReplicas);
237      connection.close();
238    }
239  }
240
241  private void verifyReplication(TableName tableName, int regionReplication,
242      final int startRow, final int endRow) throws Exception {
243    verifyReplication(tableName, regionReplication, startRow, endRow, true);
244  }
245
246  private void verifyReplication(TableName tableName, int regionReplication,
247      final int startRow, final int endRow, final boolean present) throws Exception {
248    // find the regions
249    final Region[] regions = new Region[regionReplication];
250
251    for (int i=0; i < NB_SERVERS; i++) {
252      HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
253      List<HRegion> onlineRegions = rs.getRegions(tableName);
254      for (HRegion region : onlineRegions) {
255        regions[region.getRegionInfo().getReplicaId()] = region;
256      }
257    }
258
259    for (Region region : regions) {
260      assertNotNull(region);
261    }
262
263    for (int i = 1; i < regionReplication; i++) {
264      final Region region = regions[i];
265      // wait until all the data is replicated to all secondary regions
266      Waiter.waitFor(HTU.getConfiguration(), 90000, new Waiter.Predicate<Exception>() {
267        @Override
268        public boolean evaluate() throws Exception {
269          LOG.info("verifying replication for region replica:" + region.getRegionInfo());
270          try {
271            HTU.verifyNumericRows(region, HBaseTestingUtility.fam1, startRow, endRow, present);
272          } catch(Throwable ex) {
273            LOG.warn("Verification from secondary region is not complete yet", ex);
274            // still wait
275            return false;
276          }
277          return true;
278        }
279      });
280    }
281  }
282
283  @Test
284  public void testRegionReplicaReplicationWith2Replicas() throws Exception {
285    testRegionReplicaReplication(2);
286  }
287
288  @Test
289  public void testRegionReplicaReplicationWith3Replicas() throws Exception {
290    testRegionReplicaReplication(3);
291  }
292
293  @Test
294  public void testRegionReplicaReplicationWith10Replicas() throws Exception {
295    testRegionReplicaReplication(10);
296  }
297
298  @Test
299  public void testRegionReplicaWithoutMemstoreReplication() throws Exception {
300    int regionReplication = 3;
301    final TableName tableName = TableName.valueOf(name.getMethodName());
302    HTableDescriptor htd = HTU.createTableDescriptor(tableName);
303    htd.setRegionReplication(regionReplication);
304    htd.setRegionMemstoreReplication(false);
305    HTU.getAdmin().createTable(htd);
306
307    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
308    Table table = connection.getTable(tableName);
309    try {
310      // write data to the primary. The replicas should not receive the data
311      final int STEP = 100;
312      for (int i = 0; i < 3; ++i) {
313        final int startRow = i * STEP;
314        final int endRow = (i + 1) * STEP;
315        LOG.info("Writing data from " + startRow + " to " + endRow);
316        HTU.loadNumericRows(table, HBaseTestingUtility.fam1, startRow, endRow);
317        verifyReplication(tableName, regionReplication, startRow, endRow, false);
318
319        // Flush the table, now the data should show up in the replicas
320        LOG.info("flushing table");
321        HTU.flush(tableName);
322        verifyReplication(tableName, regionReplication, 0, endRow, true);
323      }
324    } finally {
325      table.close();
326      connection.close();
327    }
328  }
329
330  @Test
331  public void testRegionReplicaReplicationForFlushAndCompaction() throws Exception {
332    // Tests a table with region replication 3. Writes some data, and causes flushes and
333    // compactions. Verifies that the data is readable from the replicas. Note that this
334    // does not test whether the replicas actually pick up flushed files and apply compaction
335    // to their stores
336    int regionReplication = 3;
337    final TableName tableName = TableName.valueOf(name.getMethodName());
338    HTableDescriptor htd = HTU.createTableDescriptor(tableName);
339    htd.setRegionReplication(regionReplication);
340    HTU.getAdmin().createTable(htd);
341
342
343    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
344    Table table = connection.getTable(tableName);
345
346    try {
347      // load the data to the table
348
349      for (int i = 0; i < 6000; i += 1000) {
350        LOG.info("Writing data from " + i + " to " + (i+1000));
351        HTU.loadNumericRows(table, HBaseTestingUtility.fam1, i, i+1000);
352        LOG.info("flushing table");
353        HTU.flush(tableName);
354        LOG.info("compacting table");
355        HTU.compact(tableName, false);
356      }
357
358      verifyReplication(tableName, regionReplication, 0, 1000);
359    } finally {
360      table.close();
361      connection.close();
362    }
363  }
364
365  @Test
366  public void testRegionReplicaReplicationIgnoresDisabledTables() throws Exception {
367    testRegionReplicaReplicationIgnoresDisabledTables(false);
368  }
369
370  @Test
371  public void testRegionReplicaReplicationIgnoresDroppedTables() throws Exception {
372    testRegionReplicaReplicationIgnoresDisabledTables(true);
373  }
374
375  public void testRegionReplicaReplicationIgnoresDisabledTables(boolean dropTable)
376      throws Exception {
377    // tests having edits from a disabled or dropped table is handled correctly by skipping those
378    // entries and further edits after the edits from dropped/disabled table can be replicated
379    // without problems.
380    final TableName tableName = TableName.valueOf(name.getMethodName() + dropTable);
381    HTableDescriptor htd = HTU.createTableDescriptor(tableName);
382    int regionReplication = 3;
383    htd.setRegionReplication(regionReplication);
384    HTU.deleteTableIfAny(tableName);
385    HTU.getAdmin().createTable(htd);
386    TableName toBeDisabledTable = TableName.valueOf(dropTable ? "droppedTable" : "disabledTable");
387    HTU.deleteTableIfAny(toBeDisabledTable);
388    htd = HTU.createTableDescriptor(toBeDisabledTable.toString());
389    htd.setRegionReplication(regionReplication);
390    HTU.getAdmin().createTable(htd);
391
392    // both tables are created, now pause replication
393    ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());
394    admin.disablePeer(ServerRegionReplicaUtil.getReplicationPeerId());
395
396    // now that the replication is disabled, write to the table to be dropped, then drop the table.
397
398    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
399    Table table = connection.getTable(tableName);
400    Table tableToBeDisabled = connection.getTable(toBeDisabledTable);
401
402    HTU.loadNumericRows(tableToBeDisabled, HBaseTestingUtility.fam1, 6000, 7000);
403
404    AtomicLong skippedEdits = new AtomicLong();
405    RegionReplicaReplicationEndpoint.RegionReplicaOutputSink sink =
406        mock(RegionReplicaReplicationEndpoint.RegionReplicaOutputSink.class);
407    when(sink.getSkippedEditsCounter()).thenReturn(skippedEdits);
408    RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter sinkWriter =
409        new RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter(sink,
410          (ClusterConnection) connection,
411          Executors.newSingleThreadExecutor(), Integer.MAX_VALUE);
412    RegionLocator rl = connection.getRegionLocator(toBeDisabledTable);
413    HRegionLocation hrl = rl.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY);
414    byte[] encodedRegionName = hrl.getRegionInfo().getEncodedNameAsBytes();
415
416    Entry entry = new Entry(
417      new WALKeyImpl(encodedRegionName, toBeDisabledTable, 1),
418      new WALEdit());
419
420    HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
421    if (dropTable) {
422      HTU.getAdmin().deleteTable(toBeDisabledTable);
423    }
424
425    sinkWriter.append(toBeDisabledTable, encodedRegionName,
426      HConstants.EMPTY_BYTE_ARRAY, Lists.newArrayList(entry, entry));
427
428    assertEquals(2, skippedEdits.get());
429
430    try {
431      // load some data to the to-be-dropped table
432
433      // load the data to the table
434      HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000);
435
436      // now enable the replication
437      admin.enablePeer(ServerRegionReplicaUtil.getReplicationPeerId());
438
439      verifyReplication(tableName, regionReplication, 0, 1000);
440
441    } finally {
442      admin.close();
443      table.close();
444      rl.close();
445      tableToBeDisabled.close();
446      HTU.deleteTableIfAny(toBeDisabledTable);
447      connection.close();
448    }
449  }
450}