001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.mockito.Mockito.doNothing;
021import static org.mockito.Mockito.mock;
022import static org.mockito.Mockito.spy;
023import static org.mockito.Mockito.verify;
024import static org.mockito.Mockito.when;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.HashMap;
029import java.util.List;
030import java.util.Map;
031import java.util.TreeMap;
032import java.util.UUID;
033import java.util.concurrent.Callable;
034import java.util.concurrent.atomic.AtomicBoolean;
035import java.util.concurrent.atomic.AtomicInteger;
036import java.util.concurrent.atomic.AtomicReference;
037import org.apache.hadoop.hbase.Cell;
038import org.apache.hadoop.hbase.HBaseClassTestRule;
039import org.apache.hadoop.hbase.KeyValue;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.Waiter;
042import org.apache.hadoop.hbase.client.Connection;
043import org.apache.hadoop.hbase.client.ConnectionFactory;
044import org.apache.hadoop.hbase.client.Put;
045import org.apache.hadoop.hbase.client.RegionInfo;
046import org.apache.hadoop.hbase.client.Table;
047import org.apache.hadoop.hbase.regionserver.HRegion;
048import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
049import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
050import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSourceImpl;
051import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl;
052import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource;
053import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl;
054import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationTableSource;
055import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
056import org.apache.hadoop.hbase.testclassification.MediumTests;
057import org.apache.hadoop.hbase.testclassification.ReplicationTests;
058import org.apache.hadoop.hbase.util.Bytes;
059import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
060import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
061import org.apache.hadoop.hbase.util.Pair;
062import org.apache.hadoop.hbase.util.Threads;
063import org.apache.hadoop.hbase.wal.WAL.Entry;
064import org.apache.hadoop.hbase.wal.WALEdit;
065import org.apache.hadoop.hbase.wal.WALKeyImpl;
066import org.apache.hadoop.hbase.zookeeper.ZKConfig;
067import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
068import org.junit.AfterClass;
069import org.junit.Assert;
070import org.junit.Before;
071import org.junit.BeforeClass;
072import org.junit.ClassRule;
073import org.junit.Test;
074import org.junit.experimental.categories.Category;
075import org.slf4j.Logger;
076import org.slf4j.LoggerFactory;
077
078/**
079 * Tests ReplicationSource and ReplicationEndpoint interactions
080 */
081@Category({ ReplicationTests.class, MediumTests.class })
082public class TestReplicationEndpoint extends TestReplicationBase {
083
084  @ClassRule
085  public static final HBaseClassTestRule CLASS_RULE =
086    HBaseClassTestRule.forClass(TestReplicationEndpoint.class);
087
088  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationEndpoint.class);
089
090  static int numRegionServers;
091
092  @BeforeClass
093  public static void setUpBeforeClass() throws Exception {
094    TestReplicationBase.setUpBeforeClass();
095    numRegionServers = UTIL1.getHBaseCluster().getRegionServerThreads().size();
096  }
097
098  @AfterClass
099  public static void tearDownAfterClass() throws Exception {
100    TestReplicationBase.tearDownAfterClass();
101    // check stop is called
102    Assert.assertTrue(ReplicationEndpointForTest.stoppedCount.get() > 0);
103  }
104
105  @Before
106  public void setup() throws Exception {
107    ReplicationEndpointForTest.contructedCount.set(0);
108    ReplicationEndpointForTest.startedCount.set(0);
109    ReplicationEndpointForTest.replicateCount.set(0);
110    ReplicationEndpointReturningFalse.replicated.set(false);
111    ReplicationEndpointForTest.lastEntries = null;
112    final List<RegionServerThread> rsThreads = UTIL1.getMiniHBaseCluster().getRegionServerThreads();
113    for (RegionServerThread rs : rsThreads) {
114      UTIL1.getAdmin().rollWALWriter(rs.getRegionServer().getServerName());
115    }
116    // Wait for all log roll to finish
117    UTIL1.waitFor(3000, new Waiter.ExplainingPredicate<Exception>() {
118      @Override
119      public boolean evaluate() throws Exception {
120        for (RegionServerThread rs : rsThreads) {
121          if (!rs.getRegionServer().walRollRequestFinished()) {
122            return false;
123          }
124        }
125        return true;
126      }
127
128      @Override
129      public String explainFailure() throws Exception {
130        List<String> logRollInProgressRsList = new ArrayList<>();
131        for (RegionServerThread rs : rsThreads) {
132          if (!rs.getRegionServer().walRollRequestFinished()) {
133            logRollInProgressRsList.add(rs.getRegionServer().toString());
134          }
135        }
136        return "Still waiting for log roll on regionservers: " + logRollInProgressRsList;
137      }
138    });
139  }
140
141  @Test
142  public void testCustomReplicationEndpoint() throws Exception {
143    // test installing a custom replication endpoint other than the default one.
144    hbaseAdmin.addReplicationPeer("testCustomReplicationEndpoint",
145      ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
146        .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()).build());
147
148    // check whether the class has been constructed and started
149    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
150      @Override
151      public boolean evaluate() throws Exception {
152        return ReplicationEndpointForTest.contructedCount.get() >= numRegionServers;
153      }
154    });
155
156    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
157      @Override
158      public boolean evaluate() throws Exception {
159        return ReplicationEndpointForTest.startedCount.get() >= numRegionServers;
160      }
161    });
162
163    Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
164
165    // now replicate some data.
166    doPut(Bytes.toBytes("row42"));
167
168    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
169      @Override
170      public boolean evaluate() throws Exception {
171        return ReplicationEndpointForTest.replicateCount.get() >= 1;
172      }
173    });
174
175    doAssert(Bytes.toBytes("row42"));
176
177    hbaseAdmin.removeReplicationPeer("testCustomReplicationEndpoint");
178  }
179
180  @Test
181  public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception {
182    Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
183    Assert.assertTrue(!ReplicationEndpointReturningFalse.replicated.get());
184    int peerCount = hbaseAdmin.listReplicationPeers().size();
185    final String id = "testReplicationEndpointReturnsFalseOnReplicate";
186    hbaseAdmin.addReplicationPeer(id,
187      ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
188        .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()).build());
189    // This test is flakey and then there is so much stuff flying around in here its, hard to
190    // debug. Peer needs to be up for the edit to make it across. This wait on
191    // peer count seems to be a hack that has us not progress till peer is up.
192    if (hbaseAdmin.listReplicationPeers().size() <= peerCount) {
193      LOG.info("Waiting on peercount to go up from " + peerCount);
194      Threads.sleep(100);
195    }
196    // now replicate some data
197    doPut(row);
198
199    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
200      @Override
201      public boolean evaluate() throws Exception {
202        // Looks like replication endpoint returns false unless we put more than 10 edits. We
203        // only send over one edit.
204        int count = ReplicationEndpointForTest.replicateCount.get();
205        LOG.info("count=" + count);
206        return ReplicationEndpointReturningFalse.replicated.get();
207      }
208    });
209    if (ReplicationEndpointReturningFalse.ex.get() != null) {
210      throw ReplicationEndpointReturningFalse.ex.get();
211    }
212
213    hbaseAdmin.removeReplicationPeer("testReplicationEndpointReturnsFalseOnReplicate");
214  }
215
216  @Test
217  public void testInterClusterReplication() throws Exception {
218    final String id = "testInterClusterReplication";
219
220    List<HRegion> regions = UTIL1.getHBaseCluster().getRegions(tableName);
221    int totEdits = 0;
222
223    // Make sure edits are spread across regions because we do region based batching
224    // before shipping edits.
225    for (HRegion region : regions) {
226      RegionInfo hri = region.getRegionInfo();
227      byte[] row = hri.getStartKey();
228      for (int i = 0; i < 100; i++) {
229        if (row.length > 0) {
230          Put put = new Put(row);
231          put.addColumn(famName, row, row);
232          region.put(put);
233          totEdits++;
234        }
235      }
236    }
237
238    hbaseAdmin.addReplicationPeer(id,
239      ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF2))
240        .setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName())
241        .build());
242
243    final int numEdits = totEdits;
244    Waiter.waitFor(CONF1, 30000, new Waiter.ExplainingPredicate<Exception>() {
245      @Override
246      public boolean evaluate() throws Exception {
247        return InterClusterReplicationEndpointForTest.replicateCount.get() == numEdits;
248      }
249
250      @Override
251      public String explainFailure() throws Exception {
252        String failure = "Failed to replicate all edits, expected = " + numEdits + " replicated = "
253          + InterClusterReplicationEndpointForTest.replicateCount.get();
254        return failure;
255      }
256    });
257
258    hbaseAdmin.removeReplicationPeer("testInterClusterReplication");
259    UTIL1.deleteTableData(tableName);
260  }
261
262  @Test
263  public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
264    ReplicationPeerConfig rpc =
265      ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
266        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName())
267        // test that we can create mutliple WALFilters reflectively
268        .putConfiguration(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
269          EverythingPassesWALEntryFilter.class.getName() + ","
270            + EverythingPassesWALEntryFilterSubclass.class.getName())
271        .build();
272
273    hbaseAdmin.addReplicationPeer("testWALEntryFilterFromReplicationEndpoint", rpc);
274    // now replicate some data.
275    try (Connection connection = ConnectionFactory.createConnection(CONF1)) {
276      doPut(connection, Bytes.toBytes("row1"));
277      doPut(connection, row);
278      doPut(connection, Bytes.toBytes("row2"));
279    }
280
281    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
282      @Override
283      public boolean evaluate() throws Exception {
284        return ReplicationEndpointForTest.replicateCount.get() >= 1;
285      }
286    });
287
288    Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get());
289    // make sure our reflectively created filter is in the filter chain
290    Assert.assertTrue(EverythingPassesWALEntryFilter.hasPassedAnEntry());
291    hbaseAdmin.removeReplicationPeer("testWALEntryFilterFromReplicationEndpoint");
292  }
293
294  @Test(expected = IOException.class)
295  public void testWALEntryFilterAddValidation() throws Exception {
296    ReplicationPeerConfig rpc =
297      ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
298        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName())
299        // test that we can create mutliple WALFilters reflectively
300        .putConfiguration(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
301          "IAmNotARealWalEntryFilter")
302        .build();
303    hbaseAdmin.addReplicationPeer("testWALEntryFilterAddValidation", rpc);
304  }
305
306  @Test(expected = IOException.class)
307  public void testWALEntryFilterUpdateValidation() throws Exception {
308    ReplicationPeerConfig rpc =
309      ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
310        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName())
311        // test that we can create mutliple WALFilters reflectively
312        .putConfiguration(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
313          "IAmNotARealWalEntryFilter")
314        .build();
315    hbaseAdmin.updateReplicationPeerConfig("testWALEntryFilterUpdateValidation", rpc);
316  }
317
318  @Test
319  public void testMetricsSourceBaseSourcePassThrough() {
320    /*
321     * The replication MetricsSource wraps a MetricsReplicationTableSourceImpl,
322     * MetricsReplicationSourceSourceImpl and a MetricsReplicationGlobalSourceSource, so that
323     * metrics get written to both namespaces. Both of those classes wrap a
324     * MetricsReplicationSourceImpl that implements BaseSource, which allows for custom JMX metrics.
325     * This test checks to make sure the BaseSource decorator logic on MetricsSource actually calls
326     * down through the two layers of wrapping to the actual BaseSource.
327     */
328    String id = "id";
329    DynamicMetricsRegistry mockRegistry = mock(DynamicMetricsRegistry.class);
330    MetricsReplicationSourceImpl singleRms = mock(MetricsReplicationSourceImpl.class);
331    when(singleRms.getMetricsRegistry()).thenReturn(mockRegistry);
332    MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class);
333    when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry);
334
335    MetricsReplicationSourceSource singleSourceSource =
336      new MetricsReplicationSourceSourceImpl(singleRms, id);
337    MetricsReplicationGlobalSourceSource globalSourceSource =
338      new MetricsReplicationGlobalSourceSourceImpl(globalRms);
339    MetricsReplicationGlobalSourceSource spyglobalSourceSource = spy(globalSourceSource);
340    doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue();
341
342    Map<String, MetricsReplicationTableSource> singleSourceSourceByTable = new HashMap<>();
343    MetricsSource source =
344      new MetricsSource(id, singleSourceSource, spyglobalSourceSource, singleSourceSourceByTable);
345
346    String gaugeName = "gauge";
347    String singleGaugeName = "source.id." + gaugeName;
348    String globalGaugeName = "source." + gaugeName;
349    long delta = 1;
350    String counterName = "counter";
351    String singleCounterName = "source.id." + counterName;
352    String globalCounterName = "source." + counterName;
353    long count = 2;
354    source.decGauge(gaugeName, delta);
355    source.getMetricsContext();
356    source.getMetricsDescription();
357    source.getMetricsJmxContext();
358    source.getMetricsName();
359    source.incCounters(counterName, count);
360    source.incGauge(gaugeName, delta);
361    source.init();
362    source.removeMetric(gaugeName);
363    source.setGauge(gaugeName, delta);
364    source.updateHistogram(counterName, count);
365    source.incrFailedRecoveryQueue();
366
367    verify(singleRms).decGauge(singleGaugeName, delta);
368    verify(globalRms).decGauge(globalGaugeName, delta);
369    verify(globalRms).getMetricsContext();
370    verify(globalRms).getMetricsJmxContext();
371    verify(globalRms).getMetricsName();
372    verify(singleRms).incCounters(singleCounterName, count);
373    verify(globalRms).incCounters(globalCounterName, count);
374    verify(singleRms).incGauge(singleGaugeName, delta);
375    verify(globalRms).incGauge(globalGaugeName, delta);
376    verify(globalRms).init();
377    verify(singleRms).removeMetric(singleGaugeName);
378    verify(globalRms).removeMetric(globalGaugeName);
379    verify(singleRms).setGauge(singleGaugeName, delta);
380    verify(globalRms).setGauge(globalGaugeName, delta);
381    verify(singleRms).updateHistogram(singleCounterName, count);
382    verify(globalRms).updateHistogram(globalCounterName, count);
383    verify(spyglobalSourceSource).incrFailedRecoveryQueue();
384
385    // check singleSourceSourceByTable metrics.
386    // singleSourceSourceByTable map entry will be created only
387    // after calling #setAgeOfLastShippedOpByTable
388    boolean containsRandomNewTable =
389      source.getSingleSourceSourceByTable().containsKey("RandomNewTable");
390    Assert.assertEquals(false, containsRandomNewTable);
391    source.updateTableLevelMetrics(createWALEntriesWithSize("RandomNewTable"));
392    containsRandomNewTable = source.getSingleSourceSourceByTable().containsKey("RandomNewTable");
393    Assert.assertEquals(true, containsRandomNewTable);
394    MetricsReplicationTableSource msr = source.getSingleSourceSourceByTable().get("RandomNewTable");
395
396    // age should be greater than zero we created the entry with time in the past
397    Assert.assertTrue(msr.getLastShippedAge() > 0);
398    Assert.assertTrue(msr.getShippedBytes() > 0);
399
400  }
401
402  private List<Pair<Entry, Long>> createWALEntriesWithSize(String tableName) {
403    List<Pair<Entry, Long>> walEntriesWithSize = new ArrayList<>();
404    byte[] a = new byte[] { 'a' };
405    Entry entry = createEntry(tableName, null, a);
406    walEntriesWithSize.add(new Pair<>(entry, 10L));
407    return walEntriesWithSize;
408  }
409
410  private Entry createEntry(String tableName, TreeMap<byte[], Integer> scopes, byte[]... kvs) {
411    WALKeyImpl key1 = new WALKeyImpl(new byte[0], TableName.valueOf(tableName),
412      EnvironmentEdgeManager.currentTime() - 1L, scopes);
413    WALEdit edit1 = new WALEdit();
414
415    for (byte[] kv : kvs) {
416      edit1.add(new KeyValue(kv, kv, kv));
417    }
418    return new Entry(key1, edit1);
419  }
420
421  private void doPut(byte[] row) throws IOException {
422    try (Connection connection = ConnectionFactory.createConnection(CONF1)) {
423      doPut(connection, row);
424    }
425  }
426
427  private void doPut(final Connection connection, final byte[] row) throws IOException {
428    try (Table t = connection.getTable(tableName)) {
429      Put put = new Put(row);
430      put.addColumn(famName, row, row);
431      t.put(put);
432    }
433  }
434
435  private static void doAssert(byte[] row) throws Exception {
436    if (ReplicationEndpointForTest.lastEntries == null) {
437      return; // first call
438    }
439    Assert.assertEquals(1, ReplicationEndpointForTest.lastEntries.size());
440    List<Cell> cells = ReplicationEndpointForTest.lastEntries.get(0).getEdit().getCells();
441    Assert.assertEquals(1, cells.size());
442    Assert.assertTrue(Bytes.equals(cells.get(0).getRowArray(), cells.get(0).getRowOffset(),
443      cells.get(0).getRowLength(), row, 0, row.length));
444  }
445
446  public static class ReplicationEndpointForTest extends BaseReplicationEndpoint {
447    static UUID uuid = UTIL1.getRandomUUID();
448    static AtomicInteger contructedCount = new AtomicInteger();
449    static AtomicInteger startedCount = new AtomicInteger();
450    static AtomicInteger stoppedCount = new AtomicInteger();
451    static AtomicInteger replicateCount = new AtomicInteger();
452    static volatile List<Entry> lastEntries = null;
453
454    public ReplicationEndpointForTest() {
455      replicateCount.set(0);
456      contructedCount.incrementAndGet();
457    }
458
459    @Override
460    public UUID getPeerUUID() {
461      return uuid;
462    }
463
464    @Override
465    public boolean replicate(ReplicateContext replicateContext) {
466      replicateCount.incrementAndGet();
467      lastEntries = new ArrayList<>(replicateContext.entries);
468      return true;
469    }
470
471    @Override
472    public void start() {
473      startAsync();
474    }
475
476    @Override
477    public void stop() {
478      stopAsync();
479    }
480
481    @Override
482    protected void doStart() {
483      startedCount.incrementAndGet();
484      notifyStarted();
485    }
486
487    @Override
488    protected void doStop() {
489      stoppedCount.incrementAndGet();
490      notifyStopped();
491    }
492
493    @Override
494    public boolean canReplicateToSameCluster() {
495      return true;
496    }
497  }
498
499  /**
500   * Not used by unit tests, helpful for manual testing with replication.
501   * <p>
502   * Snippet for `hbase shell`:
503   *
504   * <pre>
505   * create 't', 'f'
506   * add_peer '1', ENDPOINT_CLASSNAME =&gt; 'org.apache.hadoop.hbase.replication.' + \
507   *    'TestReplicationEndpoint$SleepingReplicationEndpointForTest'
508   * alter 't', {NAME=&gt;'f', REPLICATION_SCOPE=&gt;1}
509   * </pre>
510   */
511  public static class SleepingReplicationEndpointForTest extends ReplicationEndpointForTest {
512    private long duration;
513
514    public SleepingReplicationEndpointForTest() {
515      super();
516    }
517
518    @Override
519    public void init(Context context) throws IOException {
520      super.init(context);
521      if (this.ctx != null) {
522        duration = this.ctx.getConfiguration()
523          .getLong("hbase.test.sleep.replication.endpoint.duration.millis", 5000L);
524      }
525    }
526
527    @Override
528    public boolean replicate(ReplicateContext context) {
529      try {
530        Thread.sleep(duration);
531      } catch (InterruptedException e) {
532        Thread.currentThread().interrupt();
533        return false;
534      }
535      return super.replicate(context);
536    }
537  }
538
539  public static class InterClusterReplicationEndpointForTest
540    extends HBaseInterClusterReplicationEndpoint {
541
542    static AtomicInteger replicateCount = new AtomicInteger();
543    static boolean failedOnce;
544
545    public InterClusterReplicationEndpointForTest() {
546      replicateCount.set(0);
547    }
548
549    @Override
550    public boolean replicate(ReplicateContext replicateContext) {
551      boolean success = super.replicate(replicateContext);
552      if (success) {
553        replicateCount.addAndGet(replicateContext.entries.size());
554      }
555      return success;
556    }
557
558    @Override
559    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
560      // Fail only once, we don't want to slow down the test.
561      if (failedOnce) {
562        return () -> ordinal;
563      } else {
564        failedOnce = true;
565        return () -> {
566          throw new IOException("Sample Exception: Failed to replicate.");
567        };
568      }
569    }
570  }
571
572  public static class ReplicationEndpointReturningFalse extends ReplicationEndpointForTest {
573    static int COUNT = 10;
574    static AtomicReference<Exception> ex = new AtomicReference<>(null);
575    static AtomicBoolean replicated = new AtomicBoolean(false);
576
577    @Override
578    public boolean replicate(ReplicateContext replicateContext) {
579      try {
580        // check row
581        doAssert(row);
582      } catch (Exception e) {
583        ex.set(e);
584      }
585
586      super.replicate(replicateContext);
587      LOG.info("Replicated " + Bytes.toString(row) + ", count=" + replicateCount.get());
588
589      replicated.set(replicateCount.get() > COUNT); // first 10 times, we return false
590      return replicated.get();
591    }
592  }
593
594  // return a WALEntry filter which only accepts "row", but not other rows
595  public static class ReplicationEndpointWithWALEntryFilter extends ReplicationEndpointForTest {
596    static AtomicReference<Exception> ex = new AtomicReference<>(null);
597
598    @Override
599    public boolean replicate(ReplicateContext replicateContext) {
600      try {
601        super.replicate(replicateContext);
602        doAssert(row);
603      } catch (Exception e) {
604        ex.set(e);
605      }
606      return true;
607    }
608
609    @Override
610    public WALEntryFilter getWALEntryfilter() {
611      return new ChainWALEntryFilter(super.getWALEntryfilter(), new WALEntryFilter() {
612        @Override
613        public Entry filter(Entry entry) {
614          ArrayList<Cell> cells = entry.getEdit().getCells();
615          int size = cells.size();
616          for (int i = size - 1; i >= 0; i--) {
617            Cell cell = cells.get(i);
618            if (
619              !Bytes.equals(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), row, 0,
620                row.length)
621            ) {
622              cells.remove(i);
623            }
624          }
625          return entry;
626        }
627      });
628    }
629  }
630
631  public static class EverythingPassesWALEntryFilter implements WALEntryFilter {
632    private static boolean passedEntry = false;
633
634    @Override
635    public Entry filter(Entry entry) {
636      passedEntry = true;
637      return entry;
638    }
639
640    public static boolean hasPassedAnEntry() {
641      return passedEntry;
642    }
643  }
644
645  public static class EverythingPassesWALEntryFilterSubclass
646    extends EverythingPassesWALEntryFilter {
647  }
648}