001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertNull;
022import static org.junit.jupiter.api.Assertions.assertThrows;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024import static org.mockito.Mockito.doNothing;
025import static org.mockito.Mockito.mock;
026import static org.mockito.Mockito.spy;
027import static org.mockito.Mockito.verify;
028import static org.mockito.Mockito.when;
029
030import java.io.IOException;
031import java.util.ArrayList;
032import java.util.HashMap;
033import java.util.List;
034import java.util.Map;
035import java.util.TreeMap;
036import java.util.UUID;
037import java.util.concurrent.CompletableFuture;
038import java.util.concurrent.atomic.AtomicBoolean;
039import java.util.concurrent.atomic.AtomicInteger;
040import java.util.concurrent.atomic.AtomicReference;
041import org.apache.hadoop.hbase.Cell;
042import org.apache.hadoop.hbase.HBaseTestingUtil;
043import org.apache.hadoop.hbase.KeyValue;
044import org.apache.hadoop.hbase.TableName;
045import org.apache.hadoop.hbase.Waiter;
046import org.apache.hadoop.hbase.client.Connection;
047import org.apache.hadoop.hbase.client.ConnectionFactory;
048import org.apache.hadoop.hbase.client.Put;
049import org.apache.hadoop.hbase.client.RegionInfo;
050import org.apache.hadoop.hbase.client.Table;
051import org.apache.hadoop.hbase.regionserver.HRegion;
052import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
053import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
054import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSourceImpl;
055import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl;
056import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource;
057import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl;
058import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationTableSource;
059import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
060import org.apache.hadoop.hbase.util.Bytes;
061import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
062import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
063import org.apache.hadoop.hbase.util.Pair;
064import org.apache.hadoop.hbase.util.Threads;
065import org.apache.hadoop.hbase.wal.WAL.Entry;
066import org.apache.hadoop.hbase.wal.WALEdit;
067import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
068import org.apache.hadoop.hbase.wal.WALKeyImpl;
069import org.apache.hadoop.hbase.zookeeper.ZKConfig;
070import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
071import org.junit.jupiter.api.AfterAll;
072import org.junit.jupiter.api.BeforeEach;
073import org.junit.jupiter.api.Test;
074import org.slf4j.Logger;
075import org.slf4j.LoggerFactory;
076
077/**
078 * Tests ReplicationSource and ReplicationEndpoint interactions
079 */
080public class ReplicationEndpointTestBase extends TestReplicationBaseNoBeforeAll {
081
082  private static final Logger LOG = LoggerFactory.getLogger(ReplicationEndpointTestBase.class);
083
084  static int numRegionServers;
085
086  protected static void setUpBeforeClass() throws Exception {
087    configureClusters(UTIL1, UTIL2);
088    startClusters();
089    numRegionServers = UTIL1.getHBaseCluster().getRegionServerThreads().size();
090  }
091
092  @AfterAll
093  public static void assertStopped() {
094    // check stop is called
095    assertTrue(ReplicationEndpointForTest.stoppedCount.get() > 0);
096  }
097
098  @BeforeEach
099  public void setup() throws Exception {
100    ReplicationEndpointForTest.contructedCount.set(0);
101    ReplicationEndpointForTest.startedCount.set(0);
102    ReplicationEndpointForTest.replicateCount.set(0);
103    ReplicationEndpointReturningFalse.replicated.set(false);
104    ReplicationEndpointForTest.lastEntries = null;
105    final List<RegionServerThread> rsThreads = UTIL1.getMiniHBaseCluster().getRegionServerThreads();
106    for (RegionServerThread rs : rsThreads) {
107      UTIL1.getAdmin().rollWALWriter(rs.getRegionServer().getServerName());
108    }
109    // Wait for all log roll to finish
110    UTIL1.waitFor(3000, new Waiter.ExplainingPredicate<Exception>() {
111      @Override
112      public boolean evaluate() throws Exception {
113        for (RegionServerThread rs : rsThreads) {
114          if (!rs.getRegionServer().walRollRequestFinished()) {
115            return false;
116          }
117        }
118        return true;
119      }
120
121      @Override
122      public String explainFailure() throws Exception {
123        List<String> logRollInProgressRsList = new ArrayList<>();
124        for (RegionServerThread rs : rsThreads) {
125          if (!rs.getRegionServer().walRollRequestFinished()) {
126            logRollInProgressRsList.add(rs.getRegionServer().toString());
127          }
128        }
129        return "Still waiting for log roll on regionservers: " + logRollInProgressRsList;
130      }
131    });
132  }
133
134  @Test
135  public void testCustomReplicationEndpoint() throws Exception {
136    // test installing a custom replication endpoint other than the default one.
137    hbaseAdmin.addReplicationPeer("testCustomReplicationEndpoint",
138      ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
139        .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()).build());
140
141    // check whether the class has been constructed and started
142    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
143      @Override
144      public boolean evaluate() throws Exception {
145        return ReplicationEndpointForTest.contructedCount.get() >= numRegionServers;
146      }
147    });
148
149    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
150      @Override
151      public boolean evaluate() throws Exception {
152        return ReplicationEndpointForTest.startedCount.get() >= numRegionServers;
153      }
154    });
155
156    assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
157
158    // now replicate some data.
159    doPut(Bytes.toBytes("row42"));
160
161    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
162      @Override
163      public boolean evaluate() throws Exception {
164        return ReplicationEndpointForTest.replicateCount.get() >= 1;
165      }
166    });
167
168    doAssert(Bytes.toBytes("row42"));
169
170    hbaseAdmin.removeReplicationPeer("testCustomReplicationEndpoint");
171  }
172
173  @Test
174  public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception {
175    assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
176    assertTrue(!ReplicationEndpointReturningFalse.replicated.get());
177    int peerCount = hbaseAdmin.listReplicationPeers().size();
178    final String id = "testReplicationEndpointReturnsFalseOnReplicate";
179    hbaseAdmin.addReplicationPeer(id,
180      ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
181        .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()).build());
182    // This test is flakey and then there is so much stuff flying around in here its, hard to
183    // debug. Peer needs to be up for the edit to make it across. This wait on
184    // peer count seems to be a hack that has us not progress till peer is up.
185    if (hbaseAdmin.listReplicationPeers().size() <= peerCount) {
186      LOG.info("Waiting on peercount to go up from " + peerCount);
187      Threads.sleep(100);
188    }
189    // now replicate some data
190    doPut(row);
191
192    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
193      @Override
194      public boolean evaluate() throws Exception {
195        // Looks like replication endpoint returns false unless we put more than 10 edits. We
196        // only send over one edit.
197        int count = ReplicationEndpointForTest.replicateCount.get();
198        LOG.info("count=" + count);
199        return ReplicationEndpointReturningFalse.replicated.get();
200      }
201    });
202    if (ReplicationEndpointReturningFalse.ex.get() != null) {
203      throw ReplicationEndpointReturningFalse.ex.get();
204    }
205
206    hbaseAdmin.removeReplicationPeer("testReplicationEndpointReturnsFalseOnReplicate");
207  }
208
209  @Test
210  public void testInterClusterReplication() throws Exception {
211    final String id = "testInterClusterReplication";
212
213    List<HRegion> regions = UTIL1.getHBaseCluster().getRegions(tableName);
214    int totEdits = 0;
215
216    // Make sure edits are spread across regions because we do region based batching
217    // before shipping edits.
218    for (HRegion region : regions) {
219      RegionInfo hri = region.getRegionInfo();
220      byte[] row = hri.getStartKey();
221      for (int i = 0; i < 100; i++) {
222        if (row.length > 0) {
223          Put put = new Put(row);
224          put.addColumn(famName, row, row);
225          region.put(put);
226          totEdits++;
227        }
228      }
229    }
230
231    hbaseAdmin.addReplicationPeer(id,
232      ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF2))
233        .setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName())
234        .build());
235
236    final int numEdits = totEdits;
237    Waiter.waitFor(CONF1, 30000, new Waiter.ExplainingPredicate<Exception>() {
238      @Override
239      public boolean evaluate() throws Exception {
240        return InterClusterReplicationEndpointForTest.replicateCount.get() == numEdits;
241      }
242
243      @Override
244      public String explainFailure() throws Exception {
245        String failure = "Failed to replicate all edits, expected = " + numEdits + " replicated = "
246          + InterClusterReplicationEndpointForTest.replicateCount.get();
247        return failure;
248      }
249    });
250
251    hbaseAdmin.removeReplicationPeer("testInterClusterReplication");
252    UTIL1.deleteTableData(tableName);
253  }
254
255  @Test
256  public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
257    ReplicationPeerConfig rpc =
258      ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
259        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName())
260        // test that we can create mutliple WALFilters reflectively
261        .putConfiguration(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
262          EverythingPassesWALEntryFilter.class.getName() + ","
263            + EverythingPassesWALEntryFilterSubclass.class.getName())
264        .build();
265
266    hbaseAdmin.addReplicationPeer("testWALEntryFilterFromReplicationEndpoint", rpc);
267    // now replicate some data.
268    try (Connection connection = ConnectionFactory.createConnection(CONF1)) {
269      doPut(connection, Bytes.toBytes("row1"));
270      doPut(connection, row);
271      doPut(connection, Bytes.toBytes("row2"));
272    }
273
274    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
275      @Override
276      public boolean evaluate() throws Exception {
277        return ReplicationEndpointForTest.replicateCount.get() >= 1;
278      }
279    });
280
281    assertNull(ReplicationEndpointWithWALEntryFilter.ex.get());
282    // make sure our reflectively created filter is in the filter chain
283    assertTrue(EverythingPassesWALEntryFilter.hasPassedAnEntry());
284    hbaseAdmin.removeReplicationPeer("testWALEntryFilterFromReplicationEndpoint");
285  }
286
287  @Test
288  public void testWALEntryFilterAddValidation() throws Exception {
289    ReplicationPeerConfig rpc =
290      ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
291        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName())
292        // test that we can create mutliple WALFilters reflectively
293        .putConfiguration(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
294          "IAmNotARealWalEntryFilter")
295        .build();
296    assertThrows(IOException.class,
297      () -> hbaseAdmin.addReplicationPeer("testWALEntryFilterAddValidation", rpc));
298  }
299
300  @Test
301  public void testWALEntryFilterUpdateValidation() throws Exception {
302    ReplicationPeerConfig rpc =
303      ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
304        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName())
305        // test that we can create mutliple WALFilters reflectively
306        .putConfiguration(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
307          "IAmNotARealWalEntryFilter")
308        .build();
309    assertThrows(IOException.class,
310      () -> hbaseAdmin.updateReplicationPeerConfig("testWALEntryFilterUpdateValidation", rpc));
311  }
312
  /**
   * Verifies that the BaseSource-style calls made on a {@link MetricsSource} are forwarded to the
   * mocked {@link MetricsReplicationSourceImpl} instances underneath it, and that table level
   * metrics are created on demand.
   */
  @Test
  public void testMetricsSourceBaseSourcePassThrough() {
    /*
     * The replication MetricsSource wraps a MetricsReplicationTableSourceImpl,
     * MetricsReplicationSourceSourceImpl and a MetricsReplicationGlobalSourceSource, so that
     * metrics get written to both namespaces. Both of those classes wrap a
     * MetricsReplicationSourceImpl that implements BaseSource, which allows for custom JMX metrics.
     * This test checks to make sure the BaseSource decorator logic on MetricsSource actually calls
     * down through the two layers of wrapping to the actual BaseSource.
     */
    String id = "id";
    // Both mocked sources hand out the same mocked registry.
    DynamicMetricsRegistry mockRegistry = mock(DynamicMetricsRegistry.class);
    MetricsReplicationSourceImpl singleRms = mock(MetricsReplicationSourceImpl.class);
    when(singleRms.getMetricsRegistry()).thenReturn(mockRegistry);
    MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class);
    when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry);

    MetricsReplicationSourceSource singleSourceSource =
      new MetricsReplicationSourceSourceImpl(singleRms, id);
    MetricsReplicationGlobalSourceSource globalSourceSource =
      new MetricsReplicationGlobalSourceSourceImpl(globalRms);
    // Spy on the global source so incrFailedRecoveryQueue() can be stubbed out and verified.
    MetricsReplicationGlobalSourceSource spyglobalSourceSource = spy(globalSourceSource);
    doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue();

    Map<String, MetricsReplicationTableSource> singleSourceSourceByTable = new HashMap<>();
    MetricsSource source =
      new MetricsSource(id, singleSourceSource, spyglobalSourceSource, singleSourceSourceByTable);

    // Names as the verifications below expect them to reach the mocks: "source.id." prefixed for
    // the per-source mock and "source." prefixed for the global one.
    String gaugeName = "gauge";
    String singleGaugeName = "source.id." + gaugeName;
    String globalGaugeName = "source." + gaugeName;
    long delta = 1;
    String counterName = "counter";
    String singleCounterName = "source.id." + counterName;
    String globalCounterName = "source." + counterName;
    long count = 2;
    // Exercise the pass-through methods on the MetricsSource.
    source.decGauge(gaugeName, delta);
    source.getMetricsContext();
    source.getMetricsDescription();
    source.getMetricsJmxContext();
    source.getMetricsName();
    source.incCounters(counterName, count);
    source.incGauge(gaugeName, delta);
    source.init();
    source.removeMetric(gaugeName);
    source.setGauge(gaugeName, delta);
    source.updateHistogram(counterName, count);
    source.incrFailedRecoveryQueue();

    // Check that the calls reached the mocks with the expected names. Note that
    // getMetricsDescription() is invoked above but has no matching verification here.
    verify(singleRms).decGauge(singleGaugeName, delta);
    verify(globalRms).decGauge(globalGaugeName, delta);
    verify(globalRms).getMetricsContext();
    verify(globalRms).getMetricsJmxContext();
    verify(globalRms).getMetricsName();
    verify(singleRms).incCounters(singleCounterName, count);
    verify(globalRms).incCounters(globalCounterName, count);
    verify(singleRms).incGauge(singleGaugeName, delta);
    verify(globalRms).incGauge(globalGaugeName, delta);
    verify(globalRms).init();
    verify(singleRms).removeMetric(singleGaugeName);
    verify(globalRms).removeMetric(globalGaugeName);
    verify(singleRms).setGauge(singleGaugeName, delta);
    verify(globalRms).setGauge(globalGaugeName, delta);
    verify(singleRms).updateHistogram(singleCounterName, count);
    verify(globalRms).updateHistogram(globalCounterName, count);
    verify(spyglobalSourceSource).incrFailedRecoveryQueue();

    // Check the singleSourceSourceByTable metrics: the map has no entry for the table up front,
    // and one is present after updateTableLevelMetrics has been called with an entry for it.
    boolean containsRandomNewTable =
      source.getSingleSourceSourceByTable().containsKey("RandomNewTable");
    assertEquals(false, containsRandomNewTable);
    source.updateTableLevelMetrics(createWALEntriesWithSize("RandomNewTable"));
    containsRandomNewTable = source.getSingleSourceSourceByTable().containsKey("RandomNewTable");
    assertEquals(true, containsRandomNewTable);
    MetricsReplicationTableSource msr = source.getSingleSourceSourceByTable().get("RandomNewTable");

    // age should be greater than zero because we created the entry with a time in the past
    assertTrue(msr.getLastShippedAge() > 0);
    assertTrue(msr.getShippedBytes() > 0);

  }
396
397  private List<Pair<Entry, Long>> createWALEntriesWithSize(String tableName) {
398    List<Pair<Entry, Long>> walEntriesWithSize = new ArrayList<>();
399    byte[] a = new byte[] { 'a' };
400    Entry entry = createEntry(tableName, null, a);
401    walEntriesWithSize.add(new Pair<>(entry, 10L));
402    return walEntriesWithSize;
403  }
404
405  private Entry createEntry(String tableName, TreeMap<byte[], Integer> scopes, byte[]... kvs) {
406    WALKeyImpl key1 = new WALKeyImpl(new byte[0], TableName.valueOf(tableName),
407      EnvironmentEdgeManager.currentTime() - 1L, scopes);
408    WALEdit edit1 = new WALEdit();
409
410    for (byte[] kv : kvs) {
411      WALEditInternalHelper.addExtendedCell(edit1, new KeyValue(kv, kv, kv));
412    }
413    return new Entry(key1, edit1);
414  }
415
416  private void doPut(byte[] row) throws IOException {
417    try (Connection connection = ConnectionFactory.createConnection(CONF1)) {
418      doPut(connection, row);
419    }
420  }
421
422  private void doPut(final Connection connection, final byte[] row) throws IOException {
423    try (Table t = connection.getTable(tableName)) {
424      Put put = new Put(row);
425      put.addColumn(famName, row, row);
426      t.put(put);
427    }
428  }
429
430  private static void doAssert(byte[] row) throws Exception {
431    if (ReplicationEndpointForTest.lastEntries == null) {
432      return; // first call
433    }
434    assertEquals(1, ReplicationEndpointForTest.lastEntries.size());
435    List<Cell> cells = ReplicationEndpointForTest.lastEntries.get(0).getEdit().getCells();
436    assertEquals(1, cells.size());
437    assertTrue(Bytes.equals(cells.get(0).getRowArray(), cells.get(0).getRowOffset(),
438      cells.get(0).getRowLength(), row, 0, row.length));
439  }
440
441  public static class ReplicationEndpointForTest extends BaseReplicationEndpoint {
442    static UUID uuid = HBaseTestingUtil.getRandomUUID();
443    static AtomicInteger contructedCount = new AtomicInteger();
444    static AtomicInteger startedCount = new AtomicInteger();
445    static AtomicInteger stoppedCount = new AtomicInteger();
446    static AtomicInteger replicateCount = new AtomicInteger();
447    static volatile List<Entry> lastEntries = null;
448
449    public ReplicationEndpointForTest() {
450      replicateCount.set(0);
451      contructedCount.incrementAndGet();
452    }
453
454    @Override
455    public UUID getPeerUUID() {
456      return uuid;
457    }
458
459    @Override
460    public boolean replicate(ReplicateContext replicateContext) {
461      replicateCount.incrementAndGet();
462      lastEntries = new ArrayList<>(replicateContext.entries);
463      return true;
464    }
465
466    @Override
467    public void start() {
468      startAsync();
469    }
470
471    @Override
472    public void stop() {
473      stopAsync();
474    }
475
476    @Override
477    protected void doStart() {
478      startedCount.incrementAndGet();
479      notifyStarted();
480    }
481
482    @Override
483    protected void doStop() {
484      stoppedCount.incrementAndGet();
485      notifyStopped();
486    }
487
488    @Override
489    public boolean canReplicateToSameCluster() {
490      return true;
491    }
492  }
493
494  /**
495   * Not used by unit tests, helpful for manual testing with replication.
496   * <p>
497   * Snippet for `hbase shell`:
498   *
499   * <pre>
500   * create 't', 'f'
501   * add_peer '1', ENDPOINT_CLASSNAME =&gt; 'org.apache.hadoop.hbase.replication.' + \
502   *    'ReplicationEndpointTestBase$SleepingReplicationEndpointForTest'
503   * alter 't', {NAME=&gt;'f', REPLICATION_SCOPE=&gt;1}
504   * </pre>
505   */
506  public static class SleepingReplicationEndpointForTest extends ReplicationEndpointForTest {
507    private long duration;
508
509    public SleepingReplicationEndpointForTest() {
510      super();
511    }
512
513    @Override
514    public void init(Context context) throws IOException {
515      super.init(context);
516      if (this.ctx != null) {
517        duration = this.ctx.getConfiguration()
518          .getLong("hbase.test.sleep.replication.endpoint.duration.millis", 5000L);
519      }
520    }
521
522    @Override
523    public boolean replicate(ReplicateContext context) {
524      try {
525        Thread.sleep(duration);
526      } catch (InterruptedException e) {
527        Thread.currentThread().interrupt();
528        return false;
529      }
530      return super.replicate(context);
531    }
532  }
533
534  public static class InterClusterReplicationEndpointForTest
535    extends HBaseInterClusterReplicationEndpoint {
536
537    static AtomicInteger replicateCount = new AtomicInteger();
538    static boolean failedOnce;
539
540    public InterClusterReplicationEndpointForTest() {
541      replicateCount.set(0);
542    }
543
544    @Override
545    public boolean replicate(ReplicateContext replicateContext) {
546      boolean success = super.replicate(replicateContext);
547      if (success) {
548        replicateCount.addAndGet(replicateContext.entries.size());
549      }
550      return success;
551    }
552
553    @Override
554    protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
555      int timeout) {
556      // Fail only once, we don't want to slow down the test.
557      if (failedOnce) {
558        return CompletableFuture.completedFuture(ordinal);
559      } else {
560        failedOnce = true;
561        CompletableFuture<Integer> future = new CompletableFuture<Integer>();
562        future.completeExceptionally(new IOException("Sample Exception: Failed to replicate."));
563        return future;
564      }
565    }
566  }
567
568  public static class ReplicationEndpointReturningFalse extends ReplicationEndpointForTest {
569    static int COUNT = 10;
570    static AtomicReference<Exception> ex = new AtomicReference<>(null);
571    static AtomicBoolean replicated = new AtomicBoolean(false);
572
573    @Override
574    public boolean replicate(ReplicateContext replicateContext) {
575      try {
576        // check row
577        doAssert(row);
578      } catch (Exception e) {
579        ex.set(e);
580      }
581
582      super.replicate(replicateContext);
583      LOG.info("Replicated " + Bytes.toString(row) + ", count=" + replicateCount.get());
584
585      replicated.set(replicateCount.get() > COUNT); // first 10 times, we return false
586      return replicated.get();
587    }
588  }
589
590  // return a WALEntry filter which only accepts "row", but not other rows
591  public static class ReplicationEndpointWithWALEntryFilter extends ReplicationEndpointForTest {
592    static AtomicReference<Exception> ex = new AtomicReference<>(null);
593
594    @Override
595    public boolean replicate(ReplicateContext replicateContext) {
596      try {
597        super.replicate(replicateContext);
598        doAssert(row);
599      } catch (Exception e) {
600        ex.set(e);
601      }
602      return true;
603    }
604
605    @Override
606    public WALEntryFilter getWALEntryfilter() {
607      return new ChainWALEntryFilter(super.getWALEntryfilter(), new WALEntryFilter() {
608        @Override
609        public Entry filter(Entry entry) {
610          ArrayList<Cell> cells = entry.getEdit().getCells();
611          int size = cells.size();
612          for (int i = size - 1; i >= 0; i--) {
613            Cell cell = cells.get(i);
614            if (
615              !Bytes.equals(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), row, 0,
616                row.length)
617            ) {
618              cells.remove(i);
619            }
620          }
621          return entry;
622        }
623      });
624    }
625  }
626
627  public static class EverythingPassesWALEntryFilter implements WALEntryFilter {
628    private static boolean passedEntry = false;
629
630    @Override
631    public Entry filter(Entry entry) {
632      passedEntry = true;
633      return entry;
634    }
635
636    public static boolean hasPassedAnEntry() {
637      return passedEntry;
638    }
639  }
640
641  public static class EverythingPassesWALEntryFilterSubclass
642    extends EverythingPassesWALEntryFilter {
643  }
644}