001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.mockito.Mockito.doNothing;
021import static org.mockito.Mockito.mock;
022import static org.mockito.Mockito.spy;
023import static org.mockito.Mockito.verify;
024import static org.mockito.Mockito.when;
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.HashMap;
028import java.util.List;
029import java.util.Map;
030import java.util.UUID;
031import java.util.concurrent.Callable;
032import java.util.concurrent.atomic.AtomicBoolean;
033import java.util.concurrent.atomic.AtomicInteger;
034import java.util.concurrent.atomic.AtomicReference;
035import org.apache.hadoop.hbase.Cell;
036import org.apache.hadoop.hbase.HBaseClassTestRule;
037import org.apache.hadoop.hbase.Waiter;
038import org.apache.hadoop.hbase.client.Connection;
039import org.apache.hadoop.hbase.client.ConnectionFactory;
040import org.apache.hadoop.hbase.client.Put;
041import org.apache.hadoop.hbase.client.RegionInfo;
042import org.apache.hadoop.hbase.client.Table;
043import org.apache.hadoop.hbase.regionserver.HRegion;
044import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
045import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
046import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl;
047import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource;
048import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl;
049import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
050import org.apache.hadoop.hbase.testclassification.MediumTests;
051import org.apache.hadoop.hbase.testclassification.ReplicationTests;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
054import org.apache.hadoop.hbase.util.Threads;
055import org.apache.hadoop.hbase.wal.WAL.Entry;
056import org.apache.hadoop.hbase.zookeeper.ZKConfig;
057import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
058import org.junit.AfterClass;
059import org.junit.Assert;
060import org.junit.Before;
061import org.junit.BeforeClass;
062import org.junit.ClassRule;
063import org.junit.Test;
064import org.junit.experimental.categories.Category;
065import org.slf4j.Logger;
066import org.slf4j.LoggerFactory;
067
068/**
069 * Tests ReplicationSource and ReplicationEndpoint interactions
070 */
071@Category({ ReplicationTests.class, MediumTests.class })
072public class TestReplicationEndpoint extends TestReplicationBase {
073
074  @ClassRule
075  public static final HBaseClassTestRule CLASS_RULE =
076      HBaseClassTestRule.forClass(TestReplicationEndpoint.class);
077
078  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationEndpoint.class);
079
080  static int numRegionServers;
081
082  @BeforeClass
083  public static void setUpBeforeClass() throws Exception {
084    TestReplicationBase.setUpBeforeClass();
085    numRegionServers = UTIL1.getHBaseCluster().getRegionServerThreads().size();
086  }
087
088  @AfterClass
089  public static void tearDownAfterClass() throws Exception {
090    TestReplicationBase.tearDownAfterClass();
091    // check stop is called
092    Assert.assertTrue(ReplicationEndpointForTest.stoppedCount.get() > 0);
093  }
094
095  @Before
096  public void setup() throws Exception {
097    ReplicationEndpointForTest.contructedCount.set(0);
098    ReplicationEndpointForTest.startedCount.set(0);
099    ReplicationEndpointForTest.replicateCount.set(0);
100    ReplicationEndpointReturningFalse.replicated.set(false);
101    ReplicationEndpointForTest.lastEntries = null;
102    final List<RegionServerThread> rsThreads =
103        UTIL1.getMiniHBaseCluster().getRegionServerThreads();
104    for (RegionServerThread rs : rsThreads) {
105      UTIL1.getAdmin().rollWALWriter(rs.getRegionServer().getServerName());
106    }
107    // Wait for  all log roll to finish
108    UTIL1.waitFor(3000, new Waiter.ExplainingPredicate<Exception>() {
109      @Override
110      public boolean evaluate() throws Exception {
111        for (RegionServerThread rs : rsThreads) {
112          if (!rs.getRegionServer().walRollRequestFinished()) {
113            return false;
114          }
115        }
116        return true;
117      }
118
119      @Override
120      public String explainFailure() throws Exception {
121        List<String> logRollInProgressRsList = new ArrayList<>();
122        for (RegionServerThread rs : rsThreads) {
123          if (!rs.getRegionServer().walRollRequestFinished()) {
124            logRollInProgressRsList.add(rs.getRegionServer().toString());
125          }
126        }
127        return "Still waiting for log roll on regionservers: " + logRollInProgressRsList;
128      }
129    });
130  }
131
132  @Test
133  public void testCustomReplicationEndpoint() throws Exception {
134    // test installing a custom replication endpoint other than the default one.
135    admin.addPeer("testCustomReplicationEndpoint",
136        new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
137            .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
138
139    // check whether the class has been constructed and started
140    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
141      @Override
142      public boolean evaluate() throws Exception {
143        return ReplicationEndpointForTest.contructedCount.get() >= numRegionServers;
144      }
145    });
146
147    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
148      @Override
149      public boolean evaluate() throws Exception {
150        return ReplicationEndpointForTest.startedCount.get() >= numRegionServers;
151      }
152    });
153
154    Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
155
156    // now replicate some data.
157    doPut(Bytes.toBytes("row42"));
158
159    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
160      @Override
161      public boolean evaluate() throws Exception {
162        return ReplicationEndpointForTest.replicateCount.get() >= 1;
163      }
164    });
165
166    doAssert(Bytes.toBytes("row42"));
167
168    admin.removePeer("testCustomReplicationEndpoint");
169  }
170
171  @Test
172  public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception {
173    Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
174    Assert.assertTrue(!ReplicationEndpointReturningFalse.replicated.get());
175    int peerCount = admin.getPeersCount();
176    final String id = "testReplicationEndpointReturnsFalseOnReplicate";
177    admin.addPeer(id,
178      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
179        .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null);
180    // This test is flakey and then there is so much stuff flying around in here its, hard to
181    // debug.  Peer needs to be up for the edit to make it across. This wait on
182    // peer count seems to be a hack that has us not progress till peer is up.
183    if (admin.getPeersCount() <= peerCount) {
184      LOG.info("Waiting on peercount to go up from " + peerCount);
185      Threads.sleep(100);
186    }
187    // now replicate some data
188    doPut(row);
189
190    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
191      @Override
192      public boolean evaluate() throws Exception {
193        // Looks like replication endpoint returns false unless we put more than 10 edits. We
194        // only send over one edit.
195        int count = ReplicationEndpointForTest.replicateCount.get();
196        LOG.info("count=" + count);
197        return ReplicationEndpointReturningFalse.replicated.get();
198      }
199    });
200    if (ReplicationEndpointReturningFalse.ex.get() != null) {
201      throw ReplicationEndpointReturningFalse.ex.get();
202    }
203
204    admin.removePeer("testReplicationEndpointReturnsFalseOnReplicate");
205  }
206
207  @Test
208  public void testInterClusterReplication() throws Exception {
209    final String id = "testInterClusterReplication";
210
211    List<HRegion> regions = UTIL1.getHBaseCluster().getRegions(tableName);
212    // This trick of waiting on peer to show up is taken from test above.
213    int peerCount = admin.getPeersCount();
214    admin.addPeer(id,
215        new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF2))
216            .setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()),
217        null);
218    // This test is flakey and then there is so much stuff flying around in here its, hard to
219    // debug.  Peer needs to be up for the edit to make it across. This wait on
220    // peer count seems to be a hack that has us not progress till peer is up.
221    if (admin.getPeersCount() <= peerCount) {
222      LOG.info("Waiting on peercount to go up from " + peerCount);
223      Threads.sleep(100);
224    }
225
226    int totEdits = 0;
227
228    // Make sure edits are spread across regions because we do region based batching
229    // before shipping edits.
230    for (HRegion region: regions) {
231      RegionInfo hri = region.getRegionInfo();
232      byte[] row = hri.getStartKey();
233      for (int i = 0; i < 100; i++) {
234        if (row.length > 0) {
235          Put put = new Put(row);
236          put.addColumn(famName, row, row);
237          region.put(put);
238          totEdits++;
239        }
240      }
241    }
242
243    final int numEdits = totEdits;
244    LOG.info("Waiting on replication of {}", numEdits);
245    Waiter.waitFor(CONF1, 30000, new Waiter.ExplainingPredicate<Exception>() {
246      @Override
247      public boolean evaluate() throws Exception {
248        return InterClusterReplicationEndpointForTest.replicateCount.get() == numEdits;
249      }
250
251      @Override
252      public String explainFailure() throws Exception {
253        String failure = "Failed to replicate all edits, expected = " + numEdits
254            + " replicated = " + InterClusterReplicationEndpointForTest.replicateCount.get();
255        return failure;
256      }
257    });
258
259    admin.removePeer("testInterClusterReplication");
260    UTIL1.deleteTableData(tableName);
261  }
262
263  @Test
264  public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
265    ReplicationPeerConfig rpc =
266      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
267        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
268    // test that we can create mutliple WALFilters reflectively
269    rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
270      EverythingPassesWALEntryFilter.class.getName() + "," +
271        EverythingPassesWALEntryFilterSubclass.class.getName());
272    admin.addPeer("testWALEntryFilterFromReplicationEndpoint", rpc);
273    // now replicate some data.
274    try (Connection connection = ConnectionFactory.createConnection(CONF1)) {
275      doPut(connection, Bytes.toBytes("row1"));
276      doPut(connection, row);
277      doPut(connection, Bytes.toBytes("row2"));
278    }
279
280    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
281      @Override
282      public boolean evaluate() throws Exception {
283        return ReplicationEndpointForTest.replicateCount.get() >= 1;
284      }
285    });
286
287    Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get());
288    //make sure our reflectively created filter is in the filter chain
289    Assert.assertTrue(EverythingPassesWALEntryFilter.hasPassedAnEntry());
290    admin.removePeer("testWALEntryFilterFromReplicationEndpoint");
291  }
292
293  @Test(expected = IOException.class)
294  public void testWALEntryFilterAddValidation() throws Exception {
295    ReplicationPeerConfig rpc =
296      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
297        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
298    // test that we can create mutliple WALFilters reflectively
299    rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
300      "IAmNotARealWalEntryFilter");
301    admin.addPeer("testWALEntryFilterAddValidation", rpc);
302  }
303
304  @Test(expected = IOException.class)
305  public void testWALEntryFilterUpdateValidation() throws Exception {
306    ReplicationPeerConfig rpc =
307      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
308        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
309    // test that we can create mutliple WALFilters reflectively
310    rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
311      "IAmNotARealWalEntryFilter");
312    admin.updatePeerConfig("testWALEntryFilterUpdateValidation", rpc);
313  }
314
315  @Test
316  public void testMetricsSourceBaseSourcePassthrough() {
317    /*
318     * The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl and a
319     * MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces. Both of
320     * those classes wrap a MetricsReplicationSourceImpl that implements BaseSource, which allows
321     * for custom JMX metrics. This test checks to make sure the BaseSource decorator logic on
322     * MetricsSource actually calls down through the two layers of wrapping to the actual
323     * BaseSource.
324     */
325    String id = "id";
326    DynamicMetricsRegistry mockRegistry = mock(DynamicMetricsRegistry.class);
327    MetricsReplicationSourceImpl singleRms = mock(MetricsReplicationSourceImpl.class);
328    when(singleRms.getMetricsRegistry()).thenReturn(mockRegistry);
329    MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class);
330    when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry);
331
332    MetricsReplicationSourceSource singleSourceSource =
333      new MetricsReplicationSourceSourceImpl(singleRms, id);
334    MetricsReplicationSourceSource globalSourceSource =
335      new MetricsReplicationGlobalSourceSource(globalRms);
336    MetricsReplicationSourceSource spyglobalSourceSource = spy(globalSourceSource);
337    doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue();
338
339    Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable = new HashMap<>();
340    MetricsSource source =
341      new MetricsSource(id, singleSourceSource, spyglobalSourceSource, singleSourceSourceByTable);
342
343
344    String gaugeName = "gauge";
345    String singleGaugeName = "source.id." + gaugeName;
346    String globalGaugeName = "source." + gaugeName;
347    long delta = 1;
348    String counterName = "counter";
349    String singleCounterName = "source.id." + counterName;
350    String globalCounterName = "source." + counterName;
351    long count = 2;
352    source.decGauge(gaugeName, delta);
353    source.getMetricsContext();
354    source.getMetricsDescription();
355    source.getMetricsJmxContext();
356    source.getMetricsName();
357    source.incCounters(counterName, count);
358    source.incGauge(gaugeName, delta);
359    source.init();
360    source.removeMetric(gaugeName);
361    source.setGauge(gaugeName, delta);
362    source.updateHistogram(counterName, count);
363    source.incrFailedRecoveryQueue();
364
365
366    verify(singleRms).decGauge(singleGaugeName, delta);
367    verify(globalRms).decGauge(globalGaugeName, delta);
368    verify(globalRms).getMetricsContext();
369    verify(globalRms).getMetricsJmxContext();
370    verify(globalRms).getMetricsName();
371    verify(singleRms).incCounters(singleCounterName, count);
372    verify(globalRms).incCounters(globalCounterName, count);
373    verify(singleRms).incGauge(singleGaugeName, delta);
374    verify(globalRms).incGauge(globalGaugeName, delta);
375    verify(globalRms).init();
376    verify(singleRms).removeMetric(singleGaugeName);
377    verify(globalRms).removeMetric(globalGaugeName);
378    verify(singleRms).setGauge(singleGaugeName, delta);
379    verify(globalRms).setGauge(globalGaugeName, delta);
380    verify(singleRms).updateHistogram(singleCounterName, count);
381    verify(globalRms).updateHistogram(globalCounterName, count);
382    verify(spyglobalSourceSource).incrFailedRecoveryQueue();
383
384    //check singleSourceSourceByTable metrics.
385    // singleSourceSourceByTable map entry will be created only
386    // after calling #setAgeOfLastShippedOpByTable
387    boolean containsRandomNewTable = source.getSingleSourceSourceByTable()
388        .containsKey("RandomNewTable");
389    Assert.assertEquals(false, containsRandomNewTable);
390    source.setAgeOfLastShippedOpByTable(123L, "RandomNewTable");
391    containsRandomNewTable = source.getSingleSourceSourceByTable()
392        .containsKey("RandomNewTable");
393    Assert.assertEquals(true, containsRandomNewTable);
394    MetricsReplicationSourceSource msr = source.getSingleSourceSourceByTable()
395        .get("RandomNewTable");
396    // cannot put more concreate value here to verify because the age is arbitrary.
397    // as long as it's greater than 0, we see it as correct answer.
398    Assert.assertTrue(msr.getLastShippedAge() > 0);
399
400  }
401
402  private void doPut(byte[] row) throws IOException {
403    try (Connection connection = ConnectionFactory.createConnection(CONF1)) {
404      doPut(connection, row);
405    }
406  }
407
408  private void doPut(final Connection connection, final byte [] row) throws IOException {
409    try (Table t = connection.getTable(tableName)) {
410      Put put = new Put(row);
411      put.addColumn(famName, row, row);
412      t.put(put);
413    }
414  }
415
416  private static void doAssert(byte[] row) throws Exception {
417    if (ReplicationEndpointForTest.lastEntries == null) {
418      return; // first call
419    }
420    Assert.assertEquals(1, ReplicationEndpointForTest.lastEntries.size());
421    List<Cell> cells = ReplicationEndpointForTest.lastEntries.get(0).getEdit().getCells();
422    Assert.assertEquals(1, cells.size());
423    Assert.assertTrue(Bytes.equals(cells.get(0).getRowArray(), cells.get(0).getRowOffset(),
424      cells.get(0).getRowLength(), row, 0, row.length));
425  }
426
427  public static class ReplicationEndpointForTest extends BaseReplicationEndpoint {
428    static UUID uuid = UTIL1.getRandomUUID();
429    static AtomicInteger contructedCount = new AtomicInteger();
430    static AtomicInteger startedCount = new AtomicInteger();
431    static AtomicInteger stoppedCount = new AtomicInteger();
432    static AtomicInteger replicateCount = new AtomicInteger();
433    static volatile List<Entry> lastEntries = null;
434
435    public ReplicationEndpointForTest() {
436      replicateCount.set(0);
437      contructedCount.incrementAndGet();
438    }
439
440    @Override
441    public UUID getPeerUUID() {
442      return uuid;
443    }
444
445    @Override
446    public boolean replicate(ReplicateContext replicateContext) {
447      replicateCount.incrementAndGet();
448      lastEntries = new ArrayList<>(replicateContext.entries);
449      return true;
450    }
451
452    @Override
453    public void start() {
454      startAsync();
455    }
456
457    @Override
458    public void stop() {
459      stopAsync();
460    }
461
462    @Override
463    protected void doStart() {
464      startedCount.incrementAndGet();
465      notifyStarted();
466    }
467
468    @Override
469    protected void doStop() {
470      stoppedCount.incrementAndGet();
471      notifyStopped();
472    }
473  }
474
475  public static class InterClusterReplicationEndpointForTest
476      extends HBaseInterClusterReplicationEndpoint {
477
478    static AtomicInteger replicateCount = new AtomicInteger();
479    static boolean failedOnce;
480
481    public InterClusterReplicationEndpointForTest() {
482      replicateCount.set(0);
483    }
484
485    @Override
486    public boolean replicate(ReplicateContext replicateContext) {
487      boolean success = super.replicate(replicateContext);
488      if (success) {
489        replicateCount.addAndGet(replicateContext.entries.size());
490      }
491      return success;
492    }
493
494    @Override
495    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
496      // Fail only once, we don't want to slow down the test.
497      if (failedOnce) {
498        return () -> ordinal;
499      } else {
500        failedOnce = true;
501        return () -> {
502          throw new IOException("Sample Exception: Failed to replicate.");
503        };
504      }
505    }
506  }
507
508  public static class ReplicationEndpointReturningFalse extends ReplicationEndpointForTest {
509    static int COUNT = 10;
510    static AtomicReference<Exception> ex = new AtomicReference<>(null);
511    static AtomicBoolean replicated = new AtomicBoolean(false);
512    @Override
513    public boolean replicate(ReplicateContext replicateContext) {
514      try {
515        // check row
516        doAssert(row);
517      } catch (Exception e) {
518        ex.set(e);
519      }
520
521      super.replicate(replicateContext);
522      LOG.info("Replicated " + Bytes.toString(row) + ", count=" + replicateCount.get());
523
524      replicated.set(replicateCount.get() > COUNT); // first 10 times, we return false
525      return replicated.get();
526    }
527  }
528
529  // return a WALEntry filter which only accepts "row", but not other rows
530  public static class ReplicationEndpointWithWALEntryFilter extends ReplicationEndpointForTest {
531    static AtomicReference<Exception> ex = new AtomicReference<>(null);
532
533    @Override
534    public boolean replicate(ReplicateContext replicateContext) {
535      try {
536        super.replicate(replicateContext);
537        doAssert(row);
538      } catch (Exception e) {
539        ex.set(e);
540      }
541      return true;
542    }
543
544    @Override
545    public WALEntryFilter getWALEntryfilter() {
546      return new ChainWALEntryFilter(super.getWALEntryfilter(), new WALEntryFilter() {
547        @Override
548        public Entry filter(Entry entry) {
549          ArrayList<Cell> cells = entry.getEdit().getCells();
550          int size = cells.size();
551          for (int i = size-1; i >= 0; i--) {
552            Cell cell = cells.get(i);
553            if (!Bytes.equals(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
554              row, 0, row.length)) {
555              cells.remove(i);
556            }
557          }
558          return entry;
559        }
560      });
561    }
562  }
563
564  public static class EverythingPassesWALEntryFilter implements WALEntryFilter {
565    private static boolean passedEntry = false;
566    @Override
567    public Entry filter(Entry entry) {
568      passedEntry = true;
569      return entry;
570    }
571
572    public static boolean hasPassedAnEntry() {
573      return passedEntry;
574    }
575  }
576
577  public static class EverythingPassesWALEntryFilterSubclass
578      extends EverythingPassesWALEntryFilter {
579  }
580}