001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
027
028import java.io.IOException;
029import java.util.OptionalLong;
030import java.util.UUID;
031import java.util.concurrent.ExecutorService;
032import java.util.concurrent.Executors;
033import java.util.concurrent.Future;
034import java.util.concurrent.PriorityBlockingQueue;
035import java.util.concurrent.atomic.AtomicLong;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.fs.FileSystem;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.hbase.Cell;
040import org.apache.hadoop.hbase.CellBuilderFactory;
041import org.apache.hadoop.hbase.CellBuilderType;
042import org.apache.hadoop.hbase.HBaseClassTestRule;
043import org.apache.hadoop.hbase.HBaseConfiguration;
044import org.apache.hadoop.hbase.HBaseTestingUtility;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.KeyValue;
047import org.apache.hadoop.hbase.MiniHBaseCluster;
048import org.apache.hadoop.hbase.Server;
049import org.apache.hadoop.hbase.ServerName;
050import org.apache.hadoop.hbase.TableName;
051import org.apache.hadoop.hbase.Waiter;
052import org.apache.hadoop.hbase.client.Admin;
053import org.apache.hadoop.hbase.regionserver.HRegionServer;
054import org.apache.hadoop.hbase.regionserver.RegionServerServices;
055import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
056import org.apache.hadoop.hbase.replication.ReplicationPeer;
057import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
058import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
059import org.apache.hadoop.hbase.replication.WALEntryFilter;
060import org.apache.hadoop.hbase.testclassification.MediumTests;
061import org.apache.hadoop.hbase.testclassification.ReplicationTests;
062import org.apache.hadoop.hbase.util.Bytes;
063import org.apache.hadoop.hbase.wal.WAL;
064import org.apache.hadoop.hbase.wal.WALEdit;
065import org.apache.hadoop.hbase.wal.WALFactory;
066import org.apache.hadoop.hbase.wal.WALKeyImpl;
067import org.apache.hadoop.hbase.wal.WALProvider;
068import org.junit.AfterClass;
069import org.junit.BeforeClass;
070import org.junit.ClassRule;
071import org.junit.Test;
072import org.junit.experimental.categories.Category;
073import org.mockito.Mockito;
074import org.slf4j.Logger;
075import org.slf4j.LoggerFactory;
076
077@Category({ReplicationTests.class, MediumTests.class})
078public class TestReplicationSource {
079
080  @ClassRule
081  public static final HBaseClassTestRule CLASS_RULE =
082      HBaseClassTestRule.forClass(TestReplicationSource.class);
083
084  private static final Logger LOG =
085      LoggerFactory.getLogger(TestReplicationSource.class);
086  private final static HBaseTestingUtility TEST_UTIL =
087      new HBaseTestingUtility();
088  private final static HBaseTestingUtility TEST_UTIL_PEER =
089      new HBaseTestingUtility();
090  private static FileSystem FS;
091  private static Path oldLogDir;
092  private static Path logDir;
093  private static Configuration conf = TEST_UTIL.getConfiguration();
094
095  @BeforeClass
096  public static void setUpBeforeClass() throws Exception {
097    TEST_UTIL.startMiniDFSCluster(1);
098    FS = TEST_UTIL.getDFSCluster().getFileSystem();
099    Path rootDir = TEST_UTIL.createRootDir();
100    oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
101    if (FS.exists(oldLogDir)) {
102      FS.delete(oldLogDir, true);
103    }
104    logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
105    if (FS.exists(logDir)) {
106      FS.delete(logDir, true);
107    }
108  }
109
110  @AfterClass
111  public static void tearDownAfterClass() throws Exception {
112    TEST_UTIL_PEER.shutdownMiniHBaseCluster();
113    TEST_UTIL.shutdownMiniHBaseCluster();
114    TEST_UTIL.shutdownMiniDFSCluster();
115  }
116
117  /**
118   * Test the default ReplicationSource skips queuing hbase:meta WAL files.
119   */
120  @Test
121  public void testDefaultSkipsMetaWAL() throws IOException {
122    ReplicationSource rs = new ReplicationSource();
123    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
124    conf.setInt("replication.source.maxretriesmultiplier", 1);
125    ReplicationPeer mockPeer = mock(ReplicationPeer.class);
126    when(mockPeer.getConfiguration()).thenReturn(conf);
127    when(mockPeer.getPeerBandwidth()).thenReturn(0L);
128    ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);
129    when(peerConfig.getReplicationEndpointImpl()).
130      thenReturn(DoNothingReplicationEndpoint.class.getName());
131    when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
132    ReplicationSourceManager manager = mock(ReplicationSourceManager.class);
133    when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
134    String queueId = "qid";
135    RegionServerServices rss =
136      TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
137    rs.init(conf, null, manager, null, mockPeer, rss, queueId, null,
138      p -> OptionalLong.empty(), new MetricsSource(queueId));
139    try {
140      rs.startup();
141      assertTrue(rs.isSourceActive());
142      assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
143      rs.enqueueLog(new Path("a.1" + META_WAL_PROVIDER_ID));
144      assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
145      rs.enqueueLog(new Path("a.1"));
146      assertEquals(1, rs.getSourceMetrics().getSizeOfLogQueue());
147    } finally {
148      rs.terminate("Done");
149      rss.stop("Done");
150    }
151  }
152
153  /**
154   * Test that we filter out meta edits, etc.
155   */
156  @Test
157  public void testWALEntryFilter() throws IOException {
158    // To get the fully constructed default WALEntryFilter, need to create a ReplicationSource
159    // instance and init it.
160    ReplicationSource rs = new ReplicationSource();
161    UUID uuid = UUID.randomUUID();
162    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
163    ReplicationPeer mockPeer = mock(ReplicationPeer.class);
164    when(mockPeer.getConfiguration()).thenReturn(conf);
165    when(mockPeer.getPeerBandwidth()).thenReturn(0L);
166    ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);
167    when(peerConfig.getReplicationEndpointImpl()).
168      thenReturn(DoNothingReplicationEndpoint.class.getName());
169    when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
170    ReplicationSourceManager manager = mock(ReplicationSourceManager.class);
171    when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
172    String queueId = "qid";
173    RegionServerServices rss =
174      TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
175    rs.init(conf, null, manager, null, mockPeer, rss, queueId,
176      uuid, p -> OptionalLong.empty(), new MetricsSource(queueId));
177    try {
178      rs.startup();
179      TEST_UTIL.waitFor(30000, () -> rs.getWalEntryFilter() != null);
180      WALEntryFilter wef = rs.getWalEntryFilter();
181      // Test non-system WAL edit.
182      WALEdit we = new WALEdit().add(CellBuilderFactory.create(CellBuilderType.DEEP_COPY).
183        setRow(HConstants.EMPTY_START_ROW).
184        setFamily(HConstants.CATALOG_FAMILY).
185        setType(Cell.Type.Put).build());
186      WAL.Entry e = new WAL.Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY,
187        TableName.valueOf("test"), -1, -1, uuid), we);
188      assertTrue(wef.filter(e) == e);
189      // Test system WAL edit.
190      e = new WAL.Entry(
191        new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TableName.META_TABLE_NAME, -1, -1, uuid),
192          we);
193      assertNull(wef.filter(e));
194    } finally {
195      rs.terminate("Done");
196      rss.stop("Done");
197    }
198  }
199
200  /**
201   * Sanity check that we can move logs around while we are reading
202   * from them. Should this test fail, ReplicationSource would have a hard
203   * time reading logs that are being archived.
204   */
205  // This tests doesn't belong in here... it is not about ReplicationSource.
206  @Test
207  public void testLogMoving() throws Exception{
208    Path logPath = new Path(logDir, "log");
209    if (!FS.exists(logDir)) {
210      FS.mkdirs(logDir);
211    }
212    if (!FS.exists(oldLogDir)) {
213      FS.mkdirs(oldLogDir);
214    }
215    WALProvider.Writer writer = WALFactory.createWALWriter(FS, logPath,
216        TEST_UTIL.getConfiguration());
217    for(int i = 0; i < 3; i++) {
218      byte[] b = Bytes.toBytes(Integer.toString(i));
219      KeyValue kv = new KeyValue(b,b,b);
220      WALEdit edit = new WALEdit();
221      edit.add(kv);
222      WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0,
223          HConstants.DEFAULT_CLUSTER_ID);
224      writer.append(new WAL.Entry(key, edit));
225      writer.sync(false);
226    }
227    writer.close();
228
229    WAL.Reader reader = WALFactory.createReader(FS, logPath, TEST_UTIL.getConfiguration());
230    WAL.Entry entry = reader.next();
231    assertNotNull(entry);
232
233    Path oldLogPath = new Path(oldLogDir, "log");
234    FS.rename(logPath, oldLogPath);
235
236    entry = reader.next();
237    assertNotNull(entry);
238
239    reader.next();
240    entry = reader.next();
241
242    assertNull(entry);
243    reader.close();
244  }
245
246  /**
247   * Tests that {@link ReplicationSource#terminate(String)} will timeout properly
248   * Moved here from TestReplicationSource because doesn't need cluster.
249   */
250  @Test
251  public void testTerminateTimeout() throws Exception {
252    ReplicationSource source = new ReplicationSource();
253    ReplicationEndpoint
254      replicationEndpoint = new DoNothingReplicationEndpoint();
255    try {
256      replicationEndpoint.start();
257      ReplicationPeer mockPeer = mock(ReplicationPeer.class);
258      when(mockPeer.getPeerBandwidth()).thenReturn(0L);
259      Configuration testConf = HBaseConfiguration.create();
260      testConf.setInt("replication.source.maxretriesmultiplier", 1);
261      ReplicationSourceManager manager = mock(ReplicationSourceManager.class);
262      when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
263      source.init(testConf, null, manager, null, mockPeer, null, "testPeer",
264        null, p -> OptionalLong.empty(), null);
265      ExecutorService executor = Executors.newSingleThreadExecutor();
266      Future<?> future = executor.submit(
267        () -> source.terminate("testing source termination"));
268      long sleepForRetries = testConf.getLong("replication.source.sleepforretries", 1000);
269      Waiter.waitFor(testConf, sleepForRetries * 2, (Waiter.Predicate<Exception>) future::isDone);
270    } finally {
271      replicationEndpoint.stop();
272    }
273  }
274
275  /**
276   * Tests that recovered queues are preserved on a regionserver shutdown.
277   * See HBASE-18192
278   */
279  @Test
280  public void testServerShutdownRecoveredQueue() throws Exception {
281    try {
282      // Ensure single-threaded WAL
283      conf.set("hbase.wal.provider", "defaultProvider");
284      conf.setInt("replication.sleep.before.failover", 2000);
285      // Introduces a delay in regionserver shutdown to give the race condition a chance to kick in.
286      conf.set(HConstants.REGION_SERVER_IMPL, ShutdownDelayRegionServer.class.getName());
287      MiniHBaseCluster cluster = TEST_UTIL.startMiniCluster(2);
288      TEST_UTIL_PEER.startMiniCluster(1);
289
290      HRegionServer serverA = cluster.getRegionServer(0);
291      final ReplicationSourceManager managerA =
292          serverA.getReplicationSourceService().getReplicationManager();
293      HRegionServer serverB = cluster.getRegionServer(1);
294      final ReplicationSourceManager managerB =
295          serverB.getReplicationSourceService().getReplicationManager();
296      final Admin admin = TEST_UTIL.getAdmin();
297
298      final String peerId = "TestPeer";
299      admin.addReplicationPeer(peerId,
300        ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL_PEER.getClusterKey()).build());
301      // Wait for replication sources to come up
302      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
303        @Override public boolean evaluate() {
304          return !(managerA.getSources().isEmpty() || managerB.getSources().isEmpty());
305        }
306      });
307      // Disabling peer makes sure there is at least one log to claim when the server dies
308      // The recovered queue will also stay there until the peer is disabled even if the
309      // WALs it contains have no data.
310      admin.disableReplicationPeer(peerId);
311
312      // Stopping serverA
313      // It's queues should be claimed by the only other alive server i.e. serverB
314      cluster.stopRegionServer(serverA.getServerName());
315      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
316        @Override public boolean evaluate() throws Exception {
317          return managerB.getOldSources().size() == 1;
318        }
319      });
320
321      final HRegionServer serverC = cluster.startRegionServer().getRegionServer();
322      serverC.waitForServerOnline();
323      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
324        @Override public boolean evaluate() throws Exception {
325          return serverC.getReplicationSourceService() != null;
326        }
327      });
328      final ReplicationSourceManager managerC =
329          ((Replication) serverC.getReplicationSourceService()).getReplicationManager();
330      // Sanity check
331      assertEquals(0, managerC.getOldSources().size());
332
333      // Stopping serverB
334      // Now serverC should have two recovered queues:
335      // 1. The serverB's normal queue
336      // 2. serverA's recovered queue on serverB
337      cluster.stopRegionServer(serverB.getServerName());
338      Waiter.waitFor(conf, 20000,
339        (Waiter.Predicate<Exception>) () -> managerC.getOldSources().size() == 2);
340      admin.enableReplicationPeer(peerId);
341      Waiter.waitFor(conf, 20000,
342        (Waiter.Predicate<Exception>) () -> managerC.getOldSources().size() == 0);
343    } finally {
344      conf.set(HConstants.REGION_SERVER_IMPL, HRegionServer.class.getName());
345    }
346  }
347
348  /**
349   * Regionserver implementation that adds a delay on the graceful shutdown.
350   */
351  public static class ShutdownDelayRegionServer extends HRegionServer {
352    public ShutdownDelayRegionServer(Configuration conf) throws IOException {
353      super(conf);
354    }
355
356    @Override
357    protected void stopServiceThreads() {
358      // Add a delay before service threads are shutdown.
359      // This will keep the zookeeper connection alive for the duration of the delay.
360      LOG.info("Adding a delay to the regionserver shutdown");
361      try {
362        Thread.sleep(2000);
363      } catch (InterruptedException ex) {
364        LOG.error("Interrupted while sleeping");
365      }
366      super.stopServiceThreads();
367    }
368  }
369
370  /**
371   * Deadend Endpoint. Does nothing.
372   */
373  public static class DoNothingReplicationEndpoint extends HBaseInterClusterReplicationEndpoint {
374    private final UUID uuid = UUID.randomUUID();
375
376    @Override public void init(Context context) throws IOException {
377      this.ctx = context;
378    }
379
380    @Override public WALEntryFilter getWALEntryfilter() {
381      return null;
382    }
383
384    @Override public synchronized UUID getPeerUUID() {
385      return this.uuid;
386    }
387
388    @Override
389    protected void doStart() {
390      notifyStarted();
391    }
392
393    @Override
394    protected void doStop() {
395      notifyStopped();
396    }
397
398    @Override public boolean canReplicateToSameCluster() {
399      return true;
400    }
401  }
402
403  /**
404   * Deadend Endpoint. Does nothing.
405   */
406  public static class FlakyReplicationEndpoint extends DoNothingReplicationEndpoint {
407
408    static int count = 0;
409
410    @Override
411    public synchronized UUID getPeerUUID() {
412      if(count==0) {
413        count++;
414        throw new RuntimeException();
415      } else {
416        return super.getPeerUUID();
417      }
418    }
419
420  }
421
422  public static class FaultyReplicationEndpoint extends DoNothingReplicationEndpoint {
423
424    static int count = 0;
425
426    @Override
427    public synchronized UUID getPeerUUID() {
428      throw new RuntimeException();
429    }
430
431  }
432
433  /**
434   * Test HBASE-20497
435   * Moved here from TestReplicationSource because doesn't need cluster.
436   */
437  @Test
438  public void testRecoveredReplicationSourceShipperGetPosition() throws Exception {
439    String walGroupId = "fake-wal-group-id";
440    ServerName serverName = ServerName.valueOf("www.example.com", 12006, 1524679704418L);
441    ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L);
442    PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>();
443    queue.put(new Path("/www/html/test"));
444    RecoveredReplicationSource source = mock(RecoveredReplicationSource.class);
445    Server server = mock(Server.class);
446    when(server.getServerName()).thenReturn(serverName);
447    when(source.getServer()).thenReturn(server);
448    when(source.getServerWALsBelongTo()).thenReturn(deadServer);
449    ReplicationQueueStorage storage = mock(ReplicationQueueStorage.class);
450    when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any()))
451      .thenReturn(1001L);
452    when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any()))
453      .thenReturn(-1L);
454    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
455    conf.setInt("replication.source.maxretriesmultiplier", -1);
456    RecoveredReplicationSourceShipper shipper =
457      new RecoveredReplicationSourceShipper(conf, walGroupId, queue, source, storage);
458    assertEquals(1001L, shipper.getStartPosition());
459  }
460
461  private RegionServerServices setupForAbortTests(ReplicationSource rs, Configuration conf,
462      String endpointName) throws IOException {
463    conf.setInt("replication.source.maxretriesmultiplier", 1);
464    ReplicationPeer mockPeer = mock(ReplicationPeer.class);
465    when(mockPeer.getConfiguration()).thenReturn(conf);
466    when(mockPeer.getPeerBandwidth()).thenReturn(0L);
467    ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);
468    FaultyReplicationEndpoint.count = 0;
469    when(peerConfig.getReplicationEndpointImpl()).
470      thenReturn(endpointName);
471    when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
472    ReplicationSourceManager manager = mock(ReplicationSourceManager.class);
473    when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
474    String queueId = "qid";
475    RegionServerServices rss =
476      TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
477    rs.init(conf, null, manager, null, mockPeer, rss, queueId, null,
478      p -> OptionalLong.empty(), new MetricsSource(queueId));
479    return rss;
480  }
481
482  /**
483   * Test ReplicationSource retries startup once an uncaught exception happens
484   * during initialization and <b>eplication.source.regionserver.abort</b> is set to false.
485   */
486  @Test
487  public void testAbortFalseOnError() throws IOException {
488    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
489    conf.setBoolean("replication.source.regionserver.abort", false);
490    ReplicationSource rs = new ReplicationSource();
491    RegionServerServices rss = setupForAbortTests(rs, conf,
492      FlakyReplicationEndpoint.class.getName());
493    try {
494      rs.startup();
495      assertTrue(rs.isSourceActive());
496      assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
497      rs.enqueueLog(new Path("a.1" + META_WAL_PROVIDER_ID));
498      assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
499      rs.enqueueLog(new Path("a.1"));
500      assertEquals(1, rs.getSourceMetrics().getSizeOfLogQueue());
501    } finally {
502      rs.terminate("Done");
503      rss.stop("Done");
504    }
505  }
506
507  /**
508   * Test ReplicationSource keeps retrying startup indefinitely without blocking the main thread,
509   * when <b>eplication.source.regionserver.abort</b> is set to false.
510   */
511  @Test
512  public void testAbortFalseOnErrorDoesntBlockMainThread() throws IOException {
513    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
514    ReplicationSource rs = new ReplicationSource();
515    RegionServerServices rss = setupForAbortTests(rs, conf,
516      FaultyReplicationEndpoint.class.getName());
517    try {
518      rs.startup();
519      assertTrue(true);
520    } finally {
521      rs.terminate("Done");
522      rss.stop("Done");
523    }
524  }
525
526  /**
527   * Test ReplicationSource retries startup once an uncaught exception happens
528   * during initialization and <b>replication.source.regionserver.abort</b> is set to true.
529   */
530  @Test
531  public void testAbortTrueOnError() throws IOException {
532    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
533    ReplicationSource rs = new ReplicationSource();
534    RegionServerServices rss = setupForAbortTests(rs, conf,
535      FlakyReplicationEndpoint.class.getName());
536    try {
537      rs.startup();
538      assertTrue(rs.isSourceActive());
539      Waiter.waitFor(conf, 1000, () -> rss.isAborted());
540      assertTrue(rss.isAborted());
541      Waiter.waitFor(conf, 1000, () -> !rs.isSourceActive());
542      assertFalse(rs.isSourceActive());
543    } finally {
544      rs.terminate("Done");
545      rss.stop("Done");
546    }
547  }
548}
549