001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.mockito.Mockito.doNothing;
021import static org.mockito.Mockito.mock;
022import static org.mockito.Mockito.spy;
023import static org.mockito.Mockito.verify;
024import static org.mockito.Mockito.when;
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.HashMap;
028import java.util.List;
029import java.util.Map;
030import java.util.TreeMap;
031import java.util.UUID;
032import java.util.concurrent.Callable;
033import java.util.concurrent.atomic.AtomicBoolean;
034import java.util.concurrent.atomic.AtomicInteger;
035import java.util.concurrent.atomic.AtomicReference;
036import org.apache.hadoop.hbase.Cell;
037import org.apache.hadoop.hbase.HBaseClassTestRule;
038import org.apache.hadoop.hbase.KeyValue;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.Waiter;
041import org.apache.hadoop.hbase.client.Connection;
042import org.apache.hadoop.hbase.client.ConnectionFactory;
043import org.apache.hadoop.hbase.client.Put;
044import org.apache.hadoop.hbase.client.RegionInfo;
045import org.apache.hadoop.hbase.client.Table;
046import org.apache.hadoop.hbase.regionserver.HRegion;
047import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
048import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
049import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSourceImpl;
050import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl;
051import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource;
052import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl;
053import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationTableSource;
054import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
055import org.apache.hadoop.hbase.testclassification.MediumTests;
056import org.apache.hadoop.hbase.testclassification.ReplicationTests;
057import org.apache.hadoop.hbase.util.Bytes;
058import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
059import org.apache.hadoop.hbase.util.Pair;
060import org.apache.hadoop.hbase.util.Threads;
061import org.apache.hadoop.hbase.wal.WAL.Entry;
062import org.apache.hadoop.hbase.wal.WALEdit;
063import org.apache.hadoop.hbase.wal.WALKeyImpl;
064import org.apache.hadoop.hbase.zookeeper.ZKConfig;
065import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
066import org.junit.AfterClass;
067import org.junit.Assert;
068import org.junit.Before;
069import org.junit.BeforeClass;
070import org.junit.ClassRule;
071import org.junit.Test;
072import org.junit.experimental.categories.Category;
073import org.slf4j.Logger;
074import org.slf4j.LoggerFactory;
075
076/**
077 * Tests ReplicationSource and ReplicationEndpoint interactions
078 */
079@Category({ ReplicationTests.class, MediumTests.class })
080public class TestReplicationEndpoint extends TestReplicationBase {
081
082  @ClassRule
083  public static final HBaseClassTestRule CLASS_RULE =
084      HBaseClassTestRule.forClass(TestReplicationEndpoint.class);
085
086  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationEndpoint.class);
087
088  static int numRegionServers;
089
090  @BeforeClass
091  public static void setUpBeforeClass() throws Exception {
092    TestReplicationBase.setUpBeforeClass();
093    numRegionServers = UTIL1.getHBaseCluster().getRegionServerThreads().size();
094  }
095
096  @AfterClass
097  public static void tearDownAfterClass() throws Exception {
098    TestReplicationBase.tearDownAfterClass();
099    // check stop is called
100    Assert.assertTrue(ReplicationEndpointForTest.stoppedCount.get() > 0);
101  }
102
103  @Before
104  public void setup() throws Exception {
105    ReplicationEndpointForTest.contructedCount.set(0);
106    ReplicationEndpointForTest.startedCount.set(0);
107    ReplicationEndpointForTest.replicateCount.set(0);
108    ReplicationEndpointReturningFalse.replicated.set(false);
109    ReplicationEndpointForTest.lastEntries = null;
110    final List<RegionServerThread> rsThreads =
111        UTIL1.getMiniHBaseCluster().getRegionServerThreads();
112    for (RegionServerThread rs : rsThreads) {
113      UTIL1.getAdmin().rollWALWriter(rs.getRegionServer().getServerName());
114    }
115    // Wait for  all log roll to finish
116    UTIL1.waitFor(3000, new Waiter.ExplainingPredicate<Exception>() {
117      @Override
118      public boolean evaluate() throws Exception {
119        for (RegionServerThread rs : rsThreads) {
120          if (!rs.getRegionServer().walRollRequestFinished()) {
121            return false;
122          }
123        }
124        return true;
125      }
126
127      @Override
128      public String explainFailure() throws Exception {
129        List<String> logRollInProgressRsList = new ArrayList<>();
130        for (RegionServerThread rs : rsThreads) {
131          if (!rs.getRegionServer().walRollRequestFinished()) {
132            logRollInProgressRsList.add(rs.getRegionServer().toString());
133          }
134        }
135        return "Still waiting for log roll on regionservers: " + logRollInProgressRsList;
136      }
137    });
138  }
139
140  @Test
141  public void testCustomReplicationEndpoint() throws Exception {
142    // test installing a custom replication endpoint other than the default one.
143    admin.addPeer("testCustomReplicationEndpoint",
144        new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
145            .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
146
147    // check whether the class has been constructed and started
148    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
149      @Override
150      public boolean evaluate() throws Exception {
151        return ReplicationEndpointForTest.contructedCount.get() >= numRegionServers;
152      }
153    });
154
155    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
156      @Override
157      public boolean evaluate() throws Exception {
158        return ReplicationEndpointForTest.startedCount.get() >= numRegionServers;
159      }
160    });
161
162    Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
163
164    // now replicate some data.
165    doPut(Bytes.toBytes("row42"));
166
167    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
168      @Override
169      public boolean evaluate() throws Exception {
170        return ReplicationEndpointForTest.replicateCount.get() >= 1;
171      }
172    });
173
174    doAssert(Bytes.toBytes("row42"));
175
176    admin.removePeer("testCustomReplicationEndpoint");
177  }
178
179  @Test
180  public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception {
181    Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
182    Assert.assertTrue(!ReplicationEndpointReturningFalse.replicated.get());
183    int peerCount = admin.getPeersCount();
184    final String id = "testReplicationEndpointReturnsFalseOnReplicate";
185    admin.addPeer(id,
186      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
187        .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null);
188    // This test is flakey and then there is so much stuff flying around in here its, hard to
189    // debug.  Peer needs to be up for the edit to make it across. This wait on
190    // peer count seems to be a hack that has us not progress till peer is up.
191    if (admin.getPeersCount() <= peerCount) {
192      LOG.info("Waiting on peercount to go up from " + peerCount);
193      Threads.sleep(100);
194    }
195    // now replicate some data
196    doPut(row);
197
198    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
199      @Override
200      public boolean evaluate() throws Exception {
201        // Looks like replication endpoint returns false unless we put more than 10 edits. We
202        // only send over one edit.
203        int count = ReplicationEndpointForTest.replicateCount.get();
204        LOG.info("count=" + count);
205        return ReplicationEndpointReturningFalse.replicated.get();
206      }
207    });
208    if (ReplicationEndpointReturningFalse.ex.get() != null) {
209      throw ReplicationEndpointReturningFalse.ex.get();
210    }
211
212    admin.removePeer("testReplicationEndpointReturnsFalseOnReplicate");
213  }
214
215  @Test
216  public void testInterClusterReplication() throws Exception {
217    final String id = "testInterClusterReplication";
218
219    List<HRegion> regions = UTIL1.getHBaseCluster().getRegions(tableName);
220    // This trick of waiting on peer to show up is taken from test above.
221    int peerCount = admin.getPeersCount();
222    admin.addPeer(id,
223        new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF2))
224            .setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()),
225        null);
226    // This test is flakey and then there is so much stuff flying around in here its, hard to
227    // debug.  Peer needs to be up for the edit to make it across. This wait on
228    // peer count seems to be a hack that has us not progress till peer is up.
229    if (admin.getPeersCount() <= peerCount) {
230      LOG.info("Waiting on peercount to go up from " + peerCount);
231      Threads.sleep(100);
232    }
233
234    int totEdits = 0;
235
236    // Make sure edits are spread across regions because we do region based batching
237    // before shipping edits.
238    for (HRegion region: regions) {
239      RegionInfo hri = region.getRegionInfo();
240      byte[] row = hri.getStartKey();
241      for (int i = 0; i < 100; i++) {
242        if (row.length > 0) {
243          Put put = new Put(row);
244          put.addColumn(famName, row, row);
245          region.put(put);
246          totEdits++;
247        }
248      }
249    }
250
251    final int numEdits = totEdits;
252    LOG.info("Waiting on replication of {}", numEdits);
253    Waiter.waitFor(CONF1, 30000, new Waiter.ExplainingPredicate<Exception>() {
254      @Override
255      public boolean evaluate() throws Exception {
256        return InterClusterReplicationEndpointForTest.replicateCount.get() == numEdits;
257      }
258
259      @Override
260      public String explainFailure() throws Exception {
261        String failure = "Failed to replicate all edits, expected = " + numEdits
262            + " replicated = " + InterClusterReplicationEndpointForTest.replicateCount.get();
263        return failure;
264      }
265    });
266
267    admin.removePeer("testInterClusterReplication");
268    UTIL1.deleteTableData(tableName);
269  }
270
271  @Test
272  public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
273    ReplicationPeerConfig rpc =
274      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
275        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
276    // test that we can create mutliple WALFilters reflectively
277    rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
278      EverythingPassesWALEntryFilter.class.getName() + "," +
279        EverythingPassesWALEntryFilterSubclass.class.getName());
280    admin.addPeer("testWALEntryFilterFromReplicationEndpoint", rpc);
281    // now replicate some data.
282    try (Connection connection = ConnectionFactory.createConnection(CONF1)) {
283      doPut(connection, Bytes.toBytes("row1"));
284      doPut(connection, row);
285      doPut(connection, Bytes.toBytes("row2"));
286    }
287
288    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
289      @Override
290      public boolean evaluate() throws Exception {
291        return ReplicationEndpointForTest.replicateCount.get() >= 1;
292      }
293    });
294
295    Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get());
296    //make sure our reflectively created filter is in the filter chain
297    Assert.assertTrue(EverythingPassesWALEntryFilter.hasPassedAnEntry());
298    admin.removePeer("testWALEntryFilterFromReplicationEndpoint");
299  }
300
301  @Test(expected = IOException.class)
302  public void testWALEntryFilterAddValidation() throws Exception {
303    ReplicationPeerConfig rpc =
304      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
305        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
306    // test that we can create mutliple WALFilters reflectively
307    rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
308      "IAmNotARealWalEntryFilter");
309    admin.addPeer("testWALEntryFilterAddValidation", rpc);
310  }
311
312  @Test(expected = IOException.class)
313  public void testWALEntryFilterUpdateValidation() throws Exception {
314    ReplicationPeerConfig rpc =
315      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
316        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
317    // test that we can create mutliple WALFilters reflectively
318    rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
319      "IAmNotARealWalEntryFilter");
320    admin.updatePeerConfig("testWALEntryFilterUpdateValidation", rpc);
321  }
322
323  @Test
324  public void testMetricsSourceBaseSourcePassThrough() {
325    /*
326     * The replication MetricsSource wraps a MetricsReplicationTableSourceImpl,
327     * MetricsReplicationSourceSourceImpl and a MetricsReplicationGlobalSourceSource,
328     * so that metrics get written to both namespaces. Both of those classes wrap a
329     * MetricsReplicationSourceImpl that implements BaseSource, which allows
330     * for custom JMX metrics. This test checks to make sure the BaseSource decorator logic on
331     * MetricsSource actually calls down through the two layers of wrapping to the actual
332     * BaseSource.
333     */
334    String id = "id";
335    DynamicMetricsRegistry mockRegistry = mock(DynamicMetricsRegistry.class);
336    MetricsReplicationSourceImpl singleRms = mock(MetricsReplicationSourceImpl.class);
337    when(singleRms.getMetricsRegistry()).thenReturn(mockRegistry);
338    MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class);
339    when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry);
340
341    MetricsReplicationSourceSource singleSourceSource =
342      new MetricsReplicationSourceSourceImpl(singleRms, id);
343    MetricsReplicationGlobalSourceSource globalSourceSource =
344      new MetricsReplicationGlobalSourceSourceImpl(globalRms);
345    MetricsReplicationGlobalSourceSource spyglobalSourceSource = spy(globalSourceSource);
346    doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue();
347
348    Map<String, MetricsReplicationTableSource> singleSourceSourceByTable =
349      new HashMap<>();
350    MetricsSource source = new MetricsSource(id, singleSourceSource,
351      spyglobalSourceSource, singleSourceSourceByTable);
352
353
354    String gaugeName = "gauge";
355    String singleGaugeName = "source.id." + gaugeName;
356    String globalGaugeName = "source." + gaugeName;
357    long delta = 1;
358    String counterName = "counter";
359    String singleCounterName = "source.id." + counterName;
360    String globalCounterName = "source." + counterName;
361    long count = 2;
362    source.decGauge(gaugeName, delta);
363    source.getMetricsContext();
364    source.getMetricsDescription();
365    source.getMetricsJmxContext();
366    source.getMetricsName();
367    source.incCounters(counterName, count);
368    source.incGauge(gaugeName, delta);
369    source.init();
370    source.removeMetric(gaugeName);
371    source.setGauge(gaugeName, delta);
372    source.updateHistogram(counterName, count);
373    source.incrFailedRecoveryQueue();
374
375
376    verify(singleRms).decGauge(singleGaugeName, delta);
377    verify(globalRms).decGauge(globalGaugeName, delta);
378    verify(globalRms).getMetricsContext();
379    verify(globalRms).getMetricsJmxContext();
380    verify(globalRms).getMetricsName();
381    verify(singleRms).incCounters(singleCounterName, count);
382    verify(globalRms).incCounters(globalCounterName, count);
383    verify(singleRms).incGauge(singleGaugeName, delta);
384    verify(globalRms).incGauge(globalGaugeName, delta);
385    verify(globalRms).init();
386    verify(singleRms).removeMetric(singleGaugeName);
387    verify(globalRms).removeMetric(globalGaugeName);
388    verify(singleRms).setGauge(singleGaugeName, delta);
389    verify(globalRms).setGauge(globalGaugeName, delta);
390    verify(singleRms).updateHistogram(singleCounterName, count);
391    verify(globalRms).updateHistogram(globalCounterName, count);
392    verify(spyglobalSourceSource).incrFailedRecoveryQueue();
393
394    //check singleSourceSourceByTable metrics.
395    // singleSourceSourceByTable map entry will be created only
396    // after calling #setAgeOfLastShippedOpByTable
397    boolean containsRandomNewTable = source.getSingleSourceSourceByTable()
398        .containsKey("RandomNewTable");
399    Assert.assertEquals(false, containsRandomNewTable);
400    source.updateTableLevelMetrics(createWALEntriesWithSize("RandomNewTable"));
401    containsRandomNewTable = source.getSingleSourceSourceByTable()
402        .containsKey("RandomNewTable");
403    Assert.assertEquals(true, containsRandomNewTable);
404    MetricsReplicationTableSource msr = source.getSingleSourceSourceByTable()
405        .get("RandomNewTable");
406
407    // age should be greater than zero we created the entry with time in the past
408    Assert.assertTrue(msr.getLastShippedAge() > 0);
409    Assert.assertTrue(msr.getShippedBytes() > 0);
410
411  }
412
413  private List<Pair<Entry, Long>> createWALEntriesWithSize(String tableName) {
414    List<Pair<Entry, Long>> walEntriesWithSize = new ArrayList<>();
415    byte[] a = new byte[] { 'a' };
416    Entry entry = createEntry(tableName, null, a);
417    walEntriesWithSize.add(new Pair<>(entry, 10L));
418    return walEntriesWithSize;
419  }
420
421  private Entry createEntry(String tableName, TreeMap<byte[], Integer> scopes, byte[]... kvs) {
422    WALKeyImpl key1 = new WALKeyImpl(new byte[0], TableName.valueOf(tableName),
423        System.currentTimeMillis() - 1L,
424        scopes);
425    WALEdit edit1 = new WALEdit();
426
427    for (byte[] kv : kvs) {
428      edit1.add(new KeyValue(kv, kv, kv));
429    }
430    return new Entry(key1, edit1);
431  }
432
433  private void doPut(byte[] row) throws IOException {
434    try (Connection connection = ConnectionFactory.createConnection(CONF1)) {
435      doPut(connection, row);
436    }
437  }
438
439  private void doPut(final Connection connection, final byte [] row) throws IOException {
440    try (Table t = connection.getTable(tableName)) {
441      Put put = new Put(row);
442      put.addColumn(famName, row, row);
443      t.put(put);
444    }
445  }
446
447  private static void doAssert(byte[] row) throws Exception {
448    if (ReplicationEndpointForTest.lastEntries == null) {
449      return; // first call
450    }
451    Assert.assertEquals(1, ReplicationEndpointForTest.lastEntries.size());
452    List<Cell> cells = ReplicationEndpointForTest.lastEntries.get(0).getEdit().getCells();
453    Assert.assertEquals(1, cells.size());
454    Assert.assertTrue(Bytes.equals(cells.get(0).getRowArray(), cells.get(0).getRowOffset(),
455      cells.get(0).getRowLength(), row, 0, row.length));
456  }
457
458  public static class ReplicationEndpointForTest extends BaseReplicationEndpoint {
459    static UUID uuid = UTIL1.getRandomUUID();
460    static AtomicInteger contructedCount = new AtomicInteger();
461    static AtomicInteger startedCount = new AtomicInteger();
462    static AtomicInteger stoppedCount = new AtomicInteger();
463    static AtomicInteger replicateCount = new AtomicInteger();
464    static volatile List<Entry> lastEntries = null;
465
466    public ReplicationEndpointForTest() {
467      replicateCount.set(0);
468      contructedCount.incrementAndGet();
469    }
470
471    @Override
472    public UUID getPeerUUID() {
473      return uuid;
474    }
475
476    @Override
477    public boolean replicate(ReplicateContext replicateContext) {
478      replicateCount.incrementAndGet();
479      lastEntries = new ArrayList<>(replicateContext.entries);
480      return true;
481    }
482
483    @Override
484    public void start() {
485      startAsync();
486    }
487
488    @Override
489    public void stop() {
490      stopAsync();
491    }
492
493    @Override
494    protected void doStart() {
495      startedCount.incrementAndGet();
496      notifyStarted();
497    }
498
499    @Override
500    protected void doStop() {
501      stoppedCount.incrementAndGet();
502      notifyStopped();
503    }
504
505    @Override
506    public boolean canReplicateToSameCluster() {
507      return true;
508    }
509  }
510
511  /**
512   * Not used by unit tests, helpful for manual testing with replication.
513   * <p>
514   * Snippet for `hbase shell`:
515   * <pre>
516   * create 't', 'f'
517   * add_peer '1', ENDPOINT_CLASSNAME =&gt; 'org.apache.hadoop.hbase.replication.' + \
518   *    'TestReplicationEndpoint$SleepingReplicationEndpointForTest'
519   * alter 't', {NAME=&gt;'f', REPLICATION_SCOPE=&gt;1}
520   * </pre>
521   */
522  public static class SleepingReplicationEndpointForTest extends ReplicationEndpointForTest {
523    private long duration;
524    public SleepingReplicationEndpointForTest() {
525      super();
526    }
527
528    @Override
529    public void init(Context context) throws IOException {
530      super.init(context);
531      if (this.ctx != null) {
532        duration = this.ctx.getConfiguration().getLong(
533            "hbase.test.sleep.replication.endpoint.duration.millis", 5000L);
534      }
535    }
536
537    @Override
538    public boolean replicate(ReplicateContext context) {
539      try {
540        Thread.sleep(duration);
541      } catch (InterruptedException e) {
542        Thread.currentThread().interrupt();
543        return false;
544      }
545      return super.replicate(context);
546    }
547  }
548
549  public static class InterClusterReplicationEndpointForTest
550      extends HBaseInterClusterReplicationEndpoint {
551
552    static AtomicInteger replicateCount = new AtomicInteger();
553    static boolean failedOnce;
554
555    public InterClusterReplicationEndpointForTest() {
556      replicateCount.set(0);
557    }
558
559    @Override
560    public boolean replicate(ReplicateContext replicateContext) {
561      boolean success = super.replicate(replicateContext);
562      if (success) {
563        replicateCount.addAndGet(replicateContext.entries.size());
564      }
565      return success;
566    }
567
568    @Override
569    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
570      // Fail only once, we don't want to slow down the test.
571      if (failedOnce) {
572        return () -> ordinal;
573      } else {
574        failedOnce = true;
575        return () -> {
576          throw new IOException("Sample Exception: Failed to replicate.");
577        };
578      }
579    }
580  }
581
582  public static class ReplicationEndpointReturningFalse extends ReplicationEndpointForTest {
583    static int COUNT = 10;
584    static AtomicReference<Exception> ex = new AtomicReference<>(null);
585    static AtomicBoolean replicated = new AtomicBoolean(false);
586    @Override
587    public boolean replicate(ReplicateContext replicateContext) {
588      try {
589        // check row
590        doAssert(row);
591      } catch (Exception e) {
592        ex.set(e);
593      }
594
595      super.replicate(replicateContext);
596      LOG.info("Replicated " + Bytes.toString(row) + ", count=" + replicateCount.get());
597
598      replicated.set(replicateCount.get() > COUNT); // first 10 times, we return false
599      return replicated.get();
600    }
601  }
602
603  // return a WALEntry filter which only accepts "row", but not other rows
604  public static class ReplicationEndpointWithWALEntryFilter extends ReplicationEndpointForTest {
605    static AtomicReference<Exception> ex = new AtomicReference<>(null);
606
607    @Override
608    public boolean replicate(ReplicateContext replicateContext) {
609      try {
610        super.replicate(replicateContext);
611        doAssert(row);
612      } catch (Exception e) {
613        ex.set(e);
614      }
615      return true;
616    }
617
618    @Override
619    public WALEntryFilter getWALEntryfilter() {
620      return new ChainWALEntryFilter(super.getWALEntryfilter(), new WALEntryFilter() {
621        @Override
622        public Entry filter(Entry entry) {
623          ArrayList<Cell> cells = entry.getEdit().getCells();
624          int size = cells.size();
625          for (int i = size-1; i >= 0; i--) {
626            Cell cell = cells.get(i);
627            if (!Bytes.equals(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
628              row, 0, row.length)) {
629              cells.remove(i);
630            }
631          }
632          return entry;
633        }
634      });
635    }
636  }
637
638  public static class EverythingPassesWALEntryFilter implements WALEntryFilter {
639    private static boolean passedEntry = false;
640    @Override
641    public Entry filter(Entry entry) {
642      passedEntry = true;
643      return entry;
644    }
645
646    public static boolean hasPassedAnEntry() {
647      return passedEntry;
648    }
649  }
650
651  public static class EverythingPassesWALEntryFilterSubclass
652      extends EverythingPassesWALEntryFilter {
653  }
654}