001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.mockito.Mockito.mock;
021import static org.mockito.Mockito.verify;
022import static org.mockito.Mockito.when;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.UUID;
028import java.util.concurrent.atomic.AtomicBoolean;
029import java.util.concurrent.atomic.AtomicInteger;
030import java.util.concurrent.atomic.AtomicReference;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.HBaseClassTestRule;
033import org.apache.hadoop.hbase.Waiter;
034import org.apache.hadoop.hbase.client.Connection;
035import org.apache.hadoop.hbase.client.ConnectionFactory;
036import org.apache.hadoop.hbase.client.Put;
037import org.apache.hadoop.hbase.client.RegionInfo;
038import org.apache.hadoop.hbase.client.Table;
039import org.apache.hadoop.hbase.regionserver.HRegion;
040import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
041import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
042import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl;
043import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource;
044import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl;
045import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
046import org.apache.hadoop.hbase.testclassification.MediumTests;
047import org.apache.hadoop.hbase.testclassification.ReplicationTests;
048import org.apache.hadoop.hbase.util.Bytes;
049import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
050import org.apache.hadoop.hbase.util.Threads;
051import org.apache.hadoop.hbase.wal.WAL.Entry;
052import org.apache.hadoop.hbase.zookeeper.ZKConfig;
053import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
054import org.junit.AfterClass;
055import org.junit.Assert;
056import org.junit.Before;
057import org.junit.BeforeClass;
058import org.junit.ClassRule;
059import org.junit.Test;
060import org.junit.experimental.categories.Category;
061import org.slf4j.Logger;
062import org.slf4j.LoggerFactory;
063
064/**
065 * Tests ReplicationSource and ReplicationEndpoint interactions
066 */
067@Category({ReplicationTests.class, MediumTests.class})
068public class TestReplicationEndpoint extends TestReplicationBase {
069
070  @ClassRule
071  public static final HBaseClassTestRule CLASS_RULE =
072      HBaseClassTestRule.forClass(TestReplicationEndpoint.class);
073
074  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationEndpoint.class);
075
076  static int numRegionServers;
077
078  @BeforeClass
079  public static void setUpBeforeClass() throws Exception {
080    TestReplicationBase.setUpBeforeClass();
081    admin.removePeer("2");
082    numRegionServers = utility1.getHBaseCluster().getRegionServerThreads().size();
083  }
084
085  @AfterClass
086  public static void tearDownAfterClass() throws Exception {
087    TestReplicationBase.tearDownAfterClass();
088    // check stop is called
089    Assert.assertTrue(ReplicationEndpointForTest.stoppedCount.get() > 0);
090  }
091
092  @Before
093  public void setup() throws Exception {
094    ReplicationEndpointForTest.contructedCount.set(0);
095    ReplicationEndpointForTest.startedCount.set(0);
096    ReplicationEndpointForTest.replicateCount.set(0);
097    ReplicationEndpointReturningFalse.replicated.set(false);
098    ReplicationEndpointForTest.lastEntries = null;
099    final List<RegionServerThread> rsThreads =
100        utility1.getMiniHBaseCluster().getRegionServerThreads();
101    for (RegionServerThread rs : rsThreads) {
102      utility1.getAdmin().rollWALWriter(rs.getRegionServer().getServerName());
103    }
104    // Wait for  all log roll to finish
105    utility1.waitFor(3000, new Waiter.ExplainingPredicate<Exception>() {
106      @Override
107      public boolean evaluate() throws Exception {
108        for (RegionServerThread rs : rsThreads) {
109          if (!rs.getRegionServer().walRollRequestFinished()) {
110            return false;
111          }
112        }
113        return true;
114      }
115
116      @Override
117      public String explainFailure() throws Exception {
118        List<String> logRollInProgressRsList = new ArrayList<>();
119        for (RegionServerThread rs : rsThreads) {
120          if (!rs.getRegionServer().walRollRequestFinished()) {
121            logRollInProgressRsList.add(rs.getRegionServer().toString());
122          }
123        }
124        return "Still waiting for log roll on regionservers: " + logRollInProgressRsList;
125      }
126    });
127  }
128
129  @Test
130  public void testCustomReplicationEndpoint() throws Exception {
131    // test installing a custom replication endpoint other than the default one.
132    admin.addPeer("testCustomReplicationEndpoint",
133        new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
134            .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
135
136    // check whether the class has been constructed and started
137    Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
138      @Override
139      public boolean evaluate() throws Exception {
140        return ReplicationEndpointForTest.contructedCount.get() >= numRegionServers;
141      }
142    });
143
144    Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
145      @Override
146      public boolean evaluate() throws Exception {
147        return ReplicationEndpointForTest.startedCount.get() >= numRegionServers;
148      }
149    });
150
151    Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
152
153    // now replicate some data.
154    doPut(Bytes.toBytes("row42"));
155
156    Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
157      @Override
158      public boolean evaluate() throws Exception {
159        return ReplicationEndpointForTest.replicateCount.get() >= 1;
160      }
161    });
162
163    doAssert(Bytes.toBytes("row42"));
164
165    admin.removePeer("testCustomReplicationEndpoint");
166  }
167
168  @Test
169  public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception {
170    Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
171    Assert.assertTrue(!ReplicationEndpointReturningFalse.replicated.get());
172    int peerCount = admin.getPeersCount();
173    final String id = "testReplicationEndpointReturnsFalseOnReplicate";
174    admin.addPeer(id,
175      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
176        .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null);
177    // This test is flakey and then there is so much stuff flying around in here its, hard to
178    // debug.  Peer needs to be up for the edit to make it across. This wait on
179    // peer count seems to be a hack that has us not progress till peer is up.
180    if (admin.getPeersCount() <= peerCount) {
181      LOG.info("Waiting on peercount to go up from " + peerCount);
182      Threads.sleep(100);
183    }
184    // now replicate some data
185    doPut(row);
186
187    Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
188      @Override
189      public boolean evaluate() throws Exception {
190        // Looks like replication endpoint returns false unless we put more than 10 edits. We
191        // only send over one edit.
192        int count = ReplicationEndpointForTest.replicateCount.get();
193        LOG.info("count=" + count);
194        return ReplicationEndpointReturningFalse.replicated.get();
195      }
196    });
197    if (ReplicationEndpointReturningFalse.ex.get() != null) {
198      throw ReplicationEndpointReturningFalse.ex.get();
199    }
200
201    admin.removePeer("testReplicationEndpointReturnsFalseOnReplicate");
202  }
203
204  @Test
205  public void testInterClusterReplication() throws Exception {
206    final String id = "testInterClusterReplication";
207
208    List<HRegion> regions = utility1.getHBaseCluster().getRegions(tableName);
209    int totEdits = 0;
210
211    // Make sure edits are spread across regions because we do region based batching
212    // before shipping edits.
213    for(HRegion region: regions) {
214      RegionInfo hri = region.getRegionInfo();
215      byte[] row = hri.getStartKey();
216      for (int i = 0; i < 100; i++) {
217        if (row.length > 0) {
218          Put put = new Put(row);
219          put.addColumn(famName, row, row);
220          region.put(put);
221          totEdits++;
222        }
223      }
224    }
225
226    admin.addPeer(id,
227        new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf2))
228            .setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()),
229        null);
230
231    final int numEdits = totEdits;
232    Waiter.waitFor(conf1, 30000, new Waiter.ExplainingPredicate<Exception>() {
233      @Override
234      public boolean evaluate() throws Exception {
235        return InterClusterReplicationEndpointForTest.replicateCount.get() == numEdits;
236      }
237
238      @Override
239      public String explainFailure() throws Exception {
240        String failure = "Failed to replicate all edits, expected = " + numEdits
241            + " replicated = " + InterClusterReplicationEndpointForTest.replicateCount.get();
242        return failure;
243      }
244    });
245
246    admin.removePeer("testInterClusterReplication");
247    utility1.deleteTableData(tableName);
248  }
249
250  @Test
251  public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
252    ReplicationPeerConfig rpc =  new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
253        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
254    //test that we can create mutliple WALFilters reflectively
255    rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
256        EverythingPassesWALEntryFilter.class.getName() +
257            "," + EverythingPassesWALEntryFilterSubclass.class.getName());
258    admin.addPeer("testWALEntryFilterFromReplicationEndpoint", rpc);
259    // now replicate some data.
260    try (Connection connection = ConnectionFactory.createConnection(conf1)) {
261      doPut(connection, Bytes.toBytes("row1"));
262      doPut(connection, row);
263      doPut(connection, Bytes.toBytes("row2"));
264    }
265
266    Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
267      @Override
268      public boolean evaluate() throws Exception {
269        return ReplicationEndpointForTest.replicateCount.get() >= 1;
270      }
271    });
272
273    Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get());
274    //make sure our reflectively created filter is in the filter chain
275    Assert.assertTrue(EverythingPassesWALEntryFilter.hasPassedAnEntry());
276    admin.removePeer("testWALEntryFilterFromReplicationEndpoint");
277  }
278
279  @Test (expected=IOException.class)
280  public void testWALEntryFilterAddValidation() throws Exception {
281    ReplicationPeerConfig rpc =  new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
282        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
283    //test that we can create mutliple WALFilters reflectively
284    rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
285        "IAmNotARealWalEntryFilter");
286    admin.addPeer("testWALEntryFilterAddValidation", rpc);
287  }
288
289  @Test (expected=IOException.class)
290  public void testWALEntryFilterUpdateValidation() throws Exception {
291    ReplicationPeerConfig rpc =  new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
292        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
293    //test that we can create mutliple WALFilters reflectively
294    rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
295        "IAmNotARealWalEntryFilter");
296    admin.updatePeerConfig("testWALEntryFilterUpdateValidation", rpc);
297  }
298
299
300  @Test
301  public void testMetricsSourceBaseSourcePassthrough(){
302    /*
303    The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl
304    and a MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces.
305    Both of those classes wrap a MetricsReplicationSourceImpl that implements BaseSource, which
306    allows for custom JMX metrics.
307    This test checks to make sure the BaseSource decorator logic on MetricsSource actually calls down through
308    the two layers of wrapping to the actual BaseSource.
309    */
310    String id = "id";
311    DynamicMetricsRegistry mockRegistry = mock(DynamicMetricsRegistry.class);
312    MetricsReplicationSourceImpl singleRms = mock(MetricsReplicationSourceImpl.class);
313    when(singleRms.getMetricsRegistry()).thenReturn(mockRegistry);
314    MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class);
315    when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry);
316
317    MetricsReplicationSourceSource singleSourceSource = new MetricsReplicationSourceSourceImpl(singleRms, id);
318    MetricsReplicationSourceSource globalSourceSource = new MetricsReplicationGlobalSourceSource(globalRms);
319    MetricsSource source = new MetricsSource(id, singleSourceSource, globalSourceSource);
320    String gaugeName = "gauge";
321    String singleGaugeName = "source.id." + gaugeName;
322    String globalGaugeName = "source." + gaugeName;
323    long delta = 1;
324    String counterName = "counter";
325    String singleCounterName = "source.id." + counterName;
326    String globalCounterName = "source." + counterName;
327    long count = 2;
328    source.decGauge(gaugeName, delta);
329    source.getMetricsContext();
330    source.getMetricsDescription();
331    source.getMetricsJmxContext();
332    source.getMetricsName();
333    source.incCounters(counterName, count);
334    source.incGauge(gaugeName, delta);
335    source.init();
336    source.removeMetric(gaugeName);
337    source.setGauge(gaugeName, delta);
338    source.updateHistogram(counterName, count);
339
340    verify(singleRms).decGauge(singleGaugeName, delta);
341    verify(globalRms).decGauge(globalGaugeName, delta);
342    verify(globalRms).getMetricsContext();
343    verify(globalRms).getMetricsJmxContext();
344    verify(globalRms).getMetricsName();
345    verify(singleRms).incCounters(singleCounterName, count);
346    verify(globalRms).incCounters(globalCounterName, count);
347    verify(singleRms).incGauge(singleGaugeName, delta);
348    verify(globalRms).incGauge(globalGaugeName, delta);
349    verify(globalRms).init();
350    verify(singleRms).removeMetric(singleGaugeName);
351    verify(globalRms).removeMetric(globalGaugeName);
352    verify(singleRms).setGauge(singleGaugeName, delta);
353    verify(globalRms).setGauge(globalGaugeName, delta);
354    verify(singleRms).updateHistogram(singleCounterName, count);
355    verify(globalRms).updateHistogram(globalCounterName, count);
356  }
357
358  private void doPut(byte[] row) throws IOException {
359    try (Connection connection = ConnectionFactory.createConnection(conf1)) {
360      doPut(connection, row);
361    }
362  }
363
364  private void doPut(final Connection connection, final byte [] row) throws IOException {
365    try (Table t = connection.getTable(tableName)) {
366      Put put = new Put(row);
367      put.addColumn(famName, row, row);
368      t.put(put);
369    }
370  }
371
372  private static void doAssert(byte[] row) throws Exception {
373    if (ReplicationEndpointForTest.lastEntries == null) {
374      return; // first call
375    }
376    Assert.assertEquals(1, ReplicationEndpointForTest.lastEntries.size());
377    List<Cell> cells = ReplicationEndpointForTest.lastEntries.get(0).getEdit().getCells();
378    Assert.assertEquals(1, cells.size());
379    Assert.assertTrue(Bytes.equals(cells.get(0).getRowArray(), cells.get(0).getRowOffset(),
380      cells.get(0).getRowLength(), row, 0, row.length));
381  }
382
383  public static class ReplicationEndpointForTest extends BaseReplicationEndpoint {
384    static UUID uuid = UUID.randomUUID();
385    static AtomicInteger contructedCount = new AtomicInteger();
386    static AtomicInteger startedCount = new AtomicInteger();
387    static AtomicInteger stoppedCount = new AtomicInteger();
388    static AtomicInteger replicateCount = new AtomicInteger();
389    static volatile List<Entry> lastEntries = null;
390
391    public ReplicationEndpointForTest() {
392      contructedCount.incrementAndGet();
393    }
394
395    @Override
396    public UUID getPeerUUID() {
397      return uuid;
398    }
399
400    @Override
401    public boolean replicate(ReplicateContext replicateContext) {
402      replicateCount.incrementAndGet();
403      lastEntries = new ArrayList<>(replicateContext.entries);
404      return true;
405    }
406
407    @Override
408    public void start() {
409      startAsync();
410    }
411
412    @Override
413    public void stop() {
414      stopAsync();
415    }
416
417    @Override
418    protected void doStart() {
419      startedCount.incrementAndGet();
420      notifyStarted();
421    }
422
423    @Override
424    protected void doStop() {
425      stoppedCount.incrementAndGet();
426      notifyStopped();
427    }
428  }
429
430  public static class InterClusterReplicationEndpointForTest
431      extends HBaseInterClusterReplicationEndpoint {
432
433    static AtomicInteger replicateCount = new AtomicInteger();
434    static boolean failedOnce;
435
436    @Override
437    public boolean replicate(ReplicateContext replicateContext) {
438      boolean success = super.replicate(replicateContext);
439      if (success) {
440        replicateCount.addAndGet(replicateContext.entries.size());
441      }
442      return success;
443    }
444
445    @Override
446    protected Replicator createReplicator(List<Entry> entries, int ordinal) {
447      // Fail only once, we don't want to slow down the test.
448      if (failedOnce) {
449        return new DummyReplicator(entries, ordinal);
450      } else {
451        failedOnce = true;
452        return new FailingDummyReplicator(entries, ordinal);
453      }
454    }
455
456    protected class DummyReplicator extends Replicator {
457
458      private int ordinal;
459
460      public DummyReplicator(List<Entry> entries, int ordinal) {
461        super(entries, ordinal);
462        this.ordinal = ordinal;
463      }
464
465      @Override
466      public Integer call() throws IOException {
467        return ordinal;
468      }
469    }
470
471    protected class FailingDummyReplicator extends DummyReplicator {
472
473      public FailingDummyReplicator(List<Entry> entries, int ordinal) {
474        super(entries, ordinal);
475      }
476
477      @Override
478      public Integer call() throws IOException {
479        throw new IOException("Sample Exception: Failed to replicate.");
480      }
481    }
482  }
483
484  public static class ReplicationEndpointReturningFalse extends ReplicationEndpointForTest {
485    static int COUNT = 10;
486    static AtomicReference<Exception> ex = new AtomicReference<>(null);
487    static AtomicBoolean replicated = new AtomicBoolean(false);
488    @Override
489    public boolean replicate(ReplicateContext replicateContext) {
490      try {
491        // check row
492        doAssert(row);
493      } catch (Exception e) {
494        ex.set(e);
495      }
496
497      super.replicate(replicateContext);
498      LOG.info("Replicated " + Bytes.toString(row) + ", count=" + replicateCount.get());
499
500      replicated.set(replicateCount.get() > COUNT); // first 10 times, we return false
501      return replicated.get();
502    }
503  }
504
505  // return a WALEntry filter which only accepts "row", but not other rows
506  public static class ReplicationEndpointWithWALEntryFilter extends ReplicationEndpointForTest {
507    static AtomicReference<Exception> ex = new AtomicReference<>(null);
508
509    @Override
510    public boolean replicate(ReplicateContext replicateContext) {
511      try {
512        super.replicate(replicateContext);
513        doAssert(row);
514      } catch (Exception e) {
515        ex.set(e);
516      }
517      return true;
518    }
519
520    @Override
521    public WALEntryFilter getWALEntryfilter() {
522      return new ChainWALEntryFilter(super.getWALEntryfilter(), new WALEntryFilter() {
523        @Override
524        public Entry filter(Entry entry) {
525          ArrayList<Cell> cells = entry.getEdit().getCells();
526          int size = cells.size();
527          for (int i = size-1; i >= 0; i--) {
528            Cell cell = cells.get(i);
529            if (!Bytes.equals(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
530              row, 0, row.length)) {
531              cells.remove(i);
532            }
533          }
534          return entry;
535        }
536      });
537    }
538  }
539
540  public static class EverythingPassesWALEntryFilter implements WALEntryFilter {
541    private static boolean passedEntry = false;
542    @Override
543    public Entry filter(Entry entry) {
544      passedEntry = true;
545      return entry;
546    }
547
548    public static boolean hasPassedAnEntry(){
549      return passedEntry;
550    }
551  }
552
553  public static class EverythingPassesWALEntryFilterSubclass extends EverythingPassesWALEntryFilter {
554
555  }
556}