002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
020import static org.mockito.Mockito.mock;
021import static org.mockito.Mockito.verify;
022import static org.mockito.Mockito.when;
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.UUID;
028import java.util.concurrent.atomic.AtomicBoolean;
029import java.util.concurrent.atomic.AtomicInteger;
030import java.util.concurrent.atomic.AtomicReference;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.HBaseClassTestRule;
033import org.apache.hadoop.hbase.Waiter;
034import org.apache.hadoop.hbase.client.Connection;
035import org.apache.hadoop.hbase.client.ConnectionFactory;
036import org.apache.hadoop.hbase.client.Put;
037import org.apache.hadoop.hbase.client.RegionInfo;
038import org.apache.hadoop.hbase.client.Table;
039import org.apache.hadoop.hbase.regionserver.HRegion;
040import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
041import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
042import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl;
043import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource;
044import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl;
045import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
046import org.apache.hadoop.hbase.testclassification.MediumTests;
047import org.apache.hadoop.hbase.testclassification.ReplicationTests;
048import org.apache.hadoop.hbase.util.Bytes;
049import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
050import org.apache.hadoop.hbase.util.Threads;
051import org.apache.hadoop.hbase.wal.WAL.Entry;
052import org.apache.hadoop.hbase.zookeeper.ZKConfig;
053import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
054import org.junit.AfterClass;
055import org.junit.Assert;
056import org.junit.Before;
057import org.junit.BeforeClass;
058import org.junit.ClassRule;
059import org.junit.Test;
060import org.junit.experimental.categories.Category;
061import org.slf4j.Logger;
062import org.slf4j.LoggerFactory;
065 * Tests ReplicationSource and ReplicationEndpoint interactions
066 */
067@Category({ReplicationTests.class, MediumTests.class})
068public class TestReplicationEndpoint extends TestReplicationBase {
  // Class-level JUnit rule built by HBaseClassTestRule for this test class.
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestReplicationEndpoint.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationEndpoint.class);

  /**
   * Number of region server threads in the source cluster (utility1); assigned once in
   * setUpBeforeClass and used as the expected number of endpoint instances per peer.
   */
  static int numRegionServers;
078  @BeforeClass
079  public static void setUpBeforeClass() throws Exception {
080    TestReplicationBase.setUpBeforeClass();
081    admin.removePeer("2");
082    numRegionServers = utility1.getHBaseCluster().getRegionServerThreads().size();
083  }
085  @AfterClass
086  public static void tearDownAfterClass() throws Exception {
087    TestReplicationBase.tearDownAfterClass();
088    // check stop is called
089    Assert.assertTrue(ReplicationEndpointForTest.stoppedCount.get() > 0);
090  }
092  @Before
093  public void setup() throws Exception {
094    ReplicationEndpointForTest.contructedCount.set(0);
095    ReplicationEndpointForTest.startedCount.set(0);
096    ReplicationEndpointForTest.replicateCount.set(0);
097    ReplicationEndpointReturningFalse.replicated.set(false);
098    ReplicationEndpointForTest.lastEntries = null;
099    final List<RegionServerThread> rsThreads =
100        utility1.getMiniHBaseCluster().getRegionServerThreads();
101    for (RegionServerThread rs : rsThreads) {
102      utility1.getAdmin().rollWALWriter(rs.getRegionServer().getServerName());
103    }
104    // Wait for  all log roll to finish
105    utility1.waitFor(3000, new Waiter.ExplainingPredicate<Exception>() {
106      @Override
107      public boolean evaluate() throws Exception {
108        for (RegionServerThread rs : rsThreads) {
109          if (!rs.getRegionServer().walRollRequestFinished()) {
110            return false;
111          }
112        }
113        return true;
114      }
116      @Override
117      public String explainFailure() throws Exception {
118        List<String> logRollInProgressRsList = new ArrayList<>();
119        for (RegionServerThread rs : rsThreads) {
120          if (!rs.getRegionServer().walRollRequestFinished()) {
121            logRollInProgressRsList.add(rs.getRegionServer().toString());
122          }
123        }
124        return "Still waiting for log roll on regionservers: " + logRollInProgressRsList;
125      }
126    });
127  }
129  @Test
130  public void testCustomReplicationEndpoint() throws Exception {
131    // test installing a custom replication endpoint other than the default one.
132    admin.addPeer("testCustomReplicationEndpoint",
133        new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
134            .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
136    // check whether the class has been constructed and started
137    Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
138      @Override
139      public boolean evaluate() throws Exception {
140        return ReplicationEndpointForTest.contructedCount.get() >= numRegionServers;
141      }
142    });
144    Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
145      @Override
146      public boolean evaluate() throws Exception {
147        return ReplicationEndpointForTest.startedCount.get() >= numRegionServers;
148      }
149    });
151    Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
153    // now replicate some data.
154    doPut(Bytes.toBytes("row42"));
156    Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
157      @Override
158      public boolean evaluate() throws Exception {
159        return ReplicationEndpointForTest.replicateCount.get() >= 1;
160      }
161    });
163    doAssert(Bytes.toBytes("row42"));
165    admin.removePeer("testCustomReplicationEndpoint");
166  }
168  @Test
169  public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception {
170    Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
171    Assert.assertTrue(!ReplicationEndpointReturningFalse.replicated.get());
172    int peerCount = admin.getPeersCount();
173    final String id = "testReplicationEndpointReturnsFalseOnReplicate";
174    admin.addPeer(id,
175      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
176        .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null);
177    // This test is flakey and then there is so much stuff flying around in here its, hard to
178    // debug.  Peer needs to be up for the edit to make it across. This wait on
179    // peer count seems to be a hack that has us not progress till peer is up.
180    if (admin.getPeersCount() <= peerCount) {
181      LOG.info("Waiting on peercount to go up from " + peerCount);
182      Threads.sleep(100);
183    }
184    // now replicate some data
185    doPut(row);
187    Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
188      @Override
189      public boolean evaluate() throws Exception {
190        // Looks like replication endpoint returns false unless we put more than 10 edits. We
191        // only send over one edit.
192        int count = ReplicationEndpointForTest.replicateCount.get();
193        LOG.info("count=" + count);
194        return ReplicationEndpointReturningFalse.replicated.get();
195      }
196    });
197    if (ReplicationEndpointReturningFalse.ex.get() != null) {
198      throw ReplicationEndpointReturningFalse.ex.get();
199    }
201    admin.removePeer("testReplicationEndpointReturnsFalseOnReplicate");
202  }
204  @Test
205  public void testInterClusterReplication() throws Exception {
206    final String id = "testInterClusterReplication";
208    List<HRegion> regions = utility1.getHBaseCluster().getRegions(tableName);
209    int totEdits = 0;
211    // Make sure edits are spread across regions because we do region based batching
212    // before shipping edits.
213    for(HRegion region: regions) {
214      RegionInfo hri = region.getRegionInfo();
215      byte[] row = hri.getStartKey();
216      for (int i = 0; i < 100; i++) {
217        if (row.length > 0) {
218          Put put = new Put(row);
219          put.addColumn(famName, row, row);
220          region.put(put);
221          totEdits++;
222        }
223      }
224    }
226    admin.addPeer(id,
227        new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf2))
228            .setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()),
229        null);
231    final int numEdits = totEdits;
232    Waiter.waitFor(conf1, 30000, new Waiter.ExplainingPredicate<Exception>() {
233      @Override
234      public boolean evaluate() throws Exception {
235        return InterClusterReplicationEndpointForTest.replicateCount.get() == numEdits;
236      }
238      @Override
239      public String explainFailure() throws Exception {
240        String failure = "Failed to replicate all edits, expected = " + numEdits
241            + " replicated = " + InterClusterReplicationEndpointForTest.replicateCount.get();
242        return failure;
243      }
244    });
246    admin.removePeer("testInterClusterReplication");
247    utility1.deleteTableData(tableName);
248  }
250  @Test
251  public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
252    ReplicationPeerConfig rpc =  new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
253        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
254    //test that we can create mutliple WALFilters reflectively
255    rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
256        EverythingPassesWALEntryFilter.class.getName() +
257            "," + EverythingPassesWALEntryFilterSubclass.class.getName());
258    admin.addPeer("testWALEntryFilterFromReplicationEndpoint", rpc);
259    // now replicate some data.
260    try (Connection connection = ConnectionFactory.createConnection(conf1)) {
261      doPut(connection, Bytes.toBytes("row1"));
262      doPut(connection, row);
263      doPut(connection, Bytes.toBytes("row2"));
264    }
266    Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
267      @Override
268      public boolean evaluate() throws Exception {
269        return ReplicationEndpointForTest.replicateCount.get() >= 1;
270      }
271    });
273    Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get());
274    //make sure our reflectively created filter is in the filter chain
275    Assert.assertTrue(EverythingPassesWALEntryFilter.hasPassedAnEntry());
276    admin.removePeer("testWALEntryFilterFromReplicationEndpoint");
277  }
279  @Test (expected=IOException.class)
280  public void testWALEntryFilterAddValidation() throws Exception {
281    ReplicationPeerConfig rpc =  new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
282        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
283    //test that we can create mutliple WALFilters reflectively
284    rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
285        "IAmNotARealWalEntryFilter");
286    admin.addPeer("testWALEntryFilterAddValidation", rpc);
287  }
289  @Test (expected=IOException.class)
290  public void testWALEntryFilterUpdateValidation() throws Exception {
291    ReplicationPeerConfig rpc =  new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
292        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
293    //test that we can create mutliple WALFilters reflectively
294    rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
295        "IAmNotARealWalEntryFilter");
296    admin.updatePeerConfig("testWALEntryFilterUpdateValidation", rpc);
297  }
300  @Test
301  public void testMetricsSourceBaseSourcePassthrough(){
302    /*
303    The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl
304    and a MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces.
305    Both of those classes wrap a MetricsReplicationSourceImpl that implements BaseSource, which
306    allows for custom JMX metrics.
307    This test checks to make sure the BaseSource decorator logic on MetricsSource actually calls down through
308    the two layers of wrapping to the actual BaseSource.
309    */
310    String id = "id";
311    DynamicMetricsRegistry mockRegistry = mock(DynamicMetricsRegistry.class);
312    MetricsReplicationSourceImpl singleRms = mock(MetricsReplicationSourceImpl.class);
313    when(singleRms.getMetricsRegistry()).thenReturn(mockRegistry);
314    MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class);
315    when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry);
317    MetricsReplicationSourceSource singleSourceSource = new MetricsReplicationSourceSourceImpl(singleRms, id);
318    MetricsReplicationSourceSource globalSourceSource = new MetricsReplicationGlobalSourceSource(globalRms);
319    MetricsSource source = new MetricsSource(id, singleSourceSource, globalSourceSource);
320    String gaugeName = "gauge";
321    String singleGaugeName = "source.id." + gaugeName;
322    String globalGaugeName = "source." + gaugeName;
323    long delta = 1;
324    String counterName = "counter";
325    String singleCounterName = "source.id." + counterName;
326    String globalCounterName = "source." + counterName;
327    long count = 2;
328    source.decGauge(gaugeName, delta);
329    source.getMetricsContext();
330    source.getMetricsDescription();
331    source.getMetricsJmxContext();
332    source.getMetricsName();
333    source.incCounters(counterName, count);
334    source.incGauge(gaugeName, delta);
335    source.init();
336    source.removeMetric(gaugeName);
337    source.setGauge(gaugeName, delta);
338    source.updateHistogram(counterName, count);
340    verify(singleRms).decGauge(singleGaugeName, delta);
341    verify(globalRms).decGauge(globalGaugeName, delta);
342    verify(globalRms).getMetricsContext();
343    verify(globalRms).getMetricsJmxContext();
344    verify(globalRms).getMetricsName();
345    verify(singleRms).incCounters(singleCounterName, count);
346    verify(globalRms).incCounters(globalCounterName, count);
347    verify(singleRms).incGauge(singleGaugeName, delta);
348    verify(globalRms).incGauge(globalGaugeName, delta);
349    verify(globalRms).init();
350    verify(singleRms).removeMetric(singleGaugeName);
351    verify(globalRms).removeMetric(globalGaugeName);
352    verify(singleRms).setGauge(singleGaugeName, delta);
353    verify(globalRms).setGauge(globalGaugeName, delta);
354    verify(singleRms).updateHistogram(singleCounterName, count);
355    verify(globalRms).updateHistogram(globalCounterName, count);
356  }
358  private void doPut(byte[] row) throws IOException {
359    try (Connection connection = ConnectionFactory.createConnection(conf1)) {
360      doPut(connection, row);
361    }
362  }
364  private void doPut(final Connection connection, final byte [] row) throws IOException {
365    try (Table t = connection.getTable(tableName)) {
366      Put put = new Put(row);
367      put.addColumn(famName, row, row);
368      t.put(put);
369    }
370  }
372  private static void doAssert(byte[] row) throws Exception {
373    if (ReplicationEndpointForTest.lastEntries == null) {
374      return; // first call
375    }
376    Assert.assertEquals(1, ReplicationEndpointForTest.lastEntries.size());
377    List<Cell> cells = ReplicationEndpointForTest.lastEntries.get(0).getEdit().getCells();
378    Assert.assertEquals(1, cells.size());
379    Assert.assertTrue(Bytes.equals(cells.get(0).getRowArray(), cells.get(0).getRowOffset(),
380      cells.get(0).getRowLength(), row, 0, row.length));
381  }
383  public static class ReplicationEndpointForTest extends BaseReplicationEndpoint {
384    static UUID uuid = UUID.randomUUID();
385    static AtomicInteger contructedCount = new AtomicInteger();
386    static AtomicInteger startedCount = new AtomicInteger();
387    static AtomicInteger stoppedCount = new AtomicInteger();
388    static AtomicInteger replicateCount = new AtomicInteger();
389    static volatile List<Entry> lastEntries = null;
391    public ReplicationEndpointForTest() {
392      contructedCount.incrementAndGet();
393    }
395    @Override
396    public UUID getPeerUUID() {
397      return uuid;
398    }
400    @Override
401    public boolean replicate(ReplicateContext replicateContext) {
402      replicateCount.incrementAndGet();
403      lastEntries = new ArrayList<>(replicateContext.entries);
404      return true;
405    }
407    @Override
408    public void start() {
409      startAsync();
410    }
412    @Override
413    public void stop() {
414      stopAsync();
415    }
417    @Override
418    protected void doStart() {
419      startedCount.incrementAndGet();
420      notifyStarted();
421    }
423    @Override
424    protected void doStop() {
425      stoppedCount.incrementAndGet();
426      notifyStopped();
427    }
428  }
430  public static class InterClusterReplicationEndpointForTest
431      extends HBaseInterClusterReplicationEndpoint {
433    static AtomicInteger replicateCount = new AtomicInteger();
434    static boolean failedOnce;
436    @Override
437    public boolean replicate(ReplicateContext replicateContext) {
438      boolean success = super.replicate(replicateContext);
439      if (success) {
440        replicateCount.addAndGet(replicateContext.entries.size());
441      }
442      return success;
443    }
445    @Override
446    protected Replicator createReplicator(List<Entry> entries, int ordinal) {
447      // Fail only once, we don't want to slow down the test.
448      if (failedOnce) {
449        return new DummyReplicator(entries, ordinal);
450      } else {
451        failedOnce = true;
452        return new FailingDummyReplicator(entries, ordinal);
453      }
454    }
456    protected class DummyReplicator extends Replicator {
458      private int ordinal;
460      public DummyReplicator(List<Entry> entries, int ordinal) {
461        super(entries, ordinal);
462        this.ordinal = ordinal;
463      }
465      @Override
466      public Integer call() throws IOException {
467        return ordinal;
468      }
469    }
471    protected class FailingDummyReplicator extends DummyReplicator {
473      public FailingDummyReplicator(List<Entry> entries, int ordinal) {
474        super(entries, ordinal);
475      }
477      @Override
478      public Integer call() throws IOException {
479        throw new IOException("Sample Exception: Failed to replicate.");
480      }
481    }
482  }
484  public static class ReplicationEndpointReturningFalse extends ReplicationEndpointForTest {
485    static int COUNT = 10;
486    static AtomicReference<Exception> ex = new AtomicReference<>(null);
487    static AtomicBoolean replicated = new AtomicBoolean(false);
488    @Override
489    public boolean replicate(ReplicateContext replicateContext) {
490      try {
491        // check row
492        doAssert(row);
493      } catch (Exception e) {
494        ex.set(e);
495      }
497      super.replicate(replicateContext);
498      LOG.info("Replicated " + Bytes.toString(row) + ", count=" + replicateCount.get());
500      replicated.set(replicateCount.get() > COUNT); // first 10 times, we return false
501      return replicated.get();
502    }
503  }
505  // return a WALEntry filter which only accepts "row", but not other rows
506  public static class ReplicationEndpointWithWALEntryFilter extends ReplicationEndpointForTest {
507    static AtomicReference<Exception> ex = new AtomicReference<>(null);
509    @Override
510    public boolean replicate(ReplicateContext replicateContext) {
511      try {
512        super.replicate(replicateContext);
513        doAssert(row);
514      } catch (Exception e) {
515        ex.set(e);
516      }
517      return true;
518    }
520    @Override
521    public WALEntryFilter getWALEntryfilter() {
522      return new ChainWALEntryFilter(super.getWALEntryfilter(), new WALEntryFilter() {
523        @Override
524        public Entry filter(Entry entry) {
525          ArrayList<Cell> cells = entry.getEdit().getCells();
526          int size = cells.size();
527          for (int i = size-1; i >= 0; i--) {
528            Cell cell = cells.get(i);
529            if (!Bytes.equals(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
530              row, 0, row.length)) {
531              cells.remove(i);
532            }
533          }
534          return entry;
535        }
536      });
537    }
538  }
540  public static class EverythingPassesWALEntryFilter implements WALEntryFilter {
541    private static boolean passedEntry = false;
542    @Override
543    public Entry filter(Entry entry) {
544      passedEntry = true;
545      return entry;
546    }
548    public static boolean hasPassedAnEntry(){
549      return passedEntry;
550    }
551  }
  /**
   * Empty subclass of {@link EverythingPassesWALEntryFilter}. It lets a test list a second,
   * distinct filter class name in the WAL entry filter configuration.
   */
  public static class EverythingPassesWALEntryFilterSubclass extends EverythingPassesWALEntryFilter {
  }