001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.mockito.Mockito.doNothing;
021import static org.mockito.Mockito.mock;
022import static org.mockito.Mockito.spy;
023import static org.mockito.Mockito.verify;
024import static org.mockito.Mockito.when;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.HashMap;
029import java.util.List;
030import java.util.Map;
031import java.util.TreeMap;
032import java.util.UUID;
033import java.util.concurrent.Callable;
034import java.util.concurrent.atomic.AtomicBoolean;
035import java.util.concurrent.atomic.AtomicInteger;
036import java.util.concurrent.atomic.AtomicReference;
037import org.apache.hadoop.hbase.Cell;
038import org.apache.hadoop.hbase.HBaseClassTestRule;
039import org.apache.hadoop.hbase.KeyValue;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.Waiter;
042import org.apache.hadoop.hbase.client.Connection;
043import org.apache.hadoop.hbase.client.ConnectionFactory;
044import org.apache.hadoop.hbase.client.Put;
045import org.apache.hadoop.hbase.client.RegionInfo;
046import org.apache.hadoop.hbase.client.Table;
047import org.apache.hadoop.hbase.regionserver.HRegion;
048import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
049import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
050import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSourceImpl;
051import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl;
052import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource;
053import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl;
054import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationTableSource;
055import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
056import org.apache.hadoop.hbase.testclassification.MediumTests;
057import org.apache.hadoop.hbase.testclassification.ReplicationTests;
058import org.apache.hadoop.hbase.util.Bytes;
059import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
060import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
061import org.apache.hadoop.hbase.util.Pair;
062import org.apache.hadoop.hbase.util.Threads;
063import org.apache.hadoop.hbase.wal.WAL.Entry;
064import org.apache.hadoop.hbase.wal.WALEdit;
065import org.apache.hadoop.hbase.wal.WALKeyImpl;
066import org.apache.hadoop.hbase.zookeeper.ZKConfig;
067import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
068import org.junit.AfterClass;
069import org.junit.Assert;
070import org.junit.Before;
071import org.junit.BeforeClass;
072import org.junit.ClassRule;
073import org.junit.Test;
074import org.junit.experimental.categories.Category;
075import org.slf4j.Logger;
076import org.slf4j.LoggerFactory;
077
078/**
079 * Tests ReplicationSource and ReplicationEndpoint interactions
080 */
081@Category({ ReplicationTests.class, MediumTests.class })
082public class TestReplicationEndpoint extends TestReplicationBase {
083
084  @ClassRule
085  public static final HBaseClassTestRule CLASS_RULE =
086      HBaseClassTestRule.forClass(TestReplicationEndpoint.class);
087
088  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationEndpoint.class);
089
090  static int numRegionServers;
091
092  @BeforeClass
093  public static void setUpBeforeClass() throws Exception {
094    TestReplicationBase.setUpBeforeClass();
095    numRegionServers = UTIL1.getHBaseCluster().getRegionServerThreads().size();
096  }
097
098  @AfterClass
099  public static void tearDownAfterClass() throws Exception {
100    TestReplicationBase.tearDownAfterClass();
101    // check stop is called
102    Assert.assertTrue(ReplicationEndpointForTest.stoppedCount.get() > 0);
103  }
104
105  @Before
106  public void setup() throws Exception {
107    ReplicationEndpointForTest.contructedCount.set(0);
108    ReplicationEndpointForTest.startedCount.set(0);
109    ReplicationEndpointForTest.replicateCount.set(0);
110    ReplicationEndpointReturningFalse.replicated.set(false);
111    ReplicationEndpointForTest.lastEntries = null;
112    final List<RegionServerThread> rsThreads =
113        UTIL1.getMiniHBaseCluster().getRegionServerThreads();
114    for (RegionServerThread rs : rsThreads) {
115      UTIL1.getAdmin().rollWALWriter(rs.getRegionServer().getServerName());
116    }
117    // Wait for  all log roll to finish
118    UTIL1.waitFor(3000, new Waiter.ExplainingPredicate<Exception>() {
119      @Override
120      public boolean evaluate() throws Exception {
121        for (RegionServerThread rs : rsThreads) {
122          if (!rs.getRegionServer().walRollRequestFinished()) {
123            return false;
124          }
125        }
126        return true;
127      }
128
129      @Override
130      public String explainFailure() throws Exception {
131        List<String> logRollInProgressRsList = new ArrayList<>();
132        for (RegionServerThread rs : rsThreads) {
133          if (!rs.getRegionServer().walRollRequestFinished()) {
134            logRollInProgressRsList.add(rs.getRegionServer().toString());
135          }
136        }
137        return "Still waiting for log roll on regionservers: " + logRollInProgressRsList;
138      }
139    });
140  }
141
142  @Test
143  public void testCustomReplicationEndpoint() throws Exception {
144    // test installing a custom replication endpoint other than the default one.
145    hbaseAdmin.addReplicationPeer("testCustomReplicationEndpoint",
146      ReplicationPeerConfig.newBuilder()
147        .setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
148        .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName())
149        .build());
150
151    // check whether the class has been constructed and started
152    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
153      @Override
154      public boolean evaluate() throws Exception {
155        return ReplicationEndpointForTest.contructedCount.get() >= numRegionServers;
156      }
157    });
158
159    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
160      @Override
161      public boolean evaluate() throws Exception {
162        return ReplicationEndpointForTest.startedCount.get() >= numRegionServers;
163      }
164    });
165
166    Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
167
168    // now replicate some data.
169    doPut(Bytes.toBytes("row42"));
170
171    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
172      @Override
173      public boolean evaluate() throws Exception {
174        return ReplicationEndpointForTest.replicateCount.get() >= 1;
175      }
176    });
177
178    doAssert(Bytes.toBytes("row42"));
179
180    hbaseAdmin.removeReplicationPeer("testCustomReplicationEndpoint");
181  }
182
183  @Test
184  public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception {
185    Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
186    Assert.assertTrue(!ReplicationEndpointReturningFalse.replicated.get());
187    int peerCount = hbaseAdmin.listReplicationPeers().size();
188    final String id = "testReplicationEndpointReturnsFalseOnReplicate";
189    hbaseAdmin.addReplicationPeer(id,
190      ReplicationPeerConfig.newBuilder()
191        .setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
192        .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName())
193        .build());
194    // This test is flakey and then there is so much stuff flying around in here its, hard to
195    // debug.  Peer needs to be up for the edit to make it across. This wait on
196    // peer count seems to be a hack that has us not progress till peer is up.
197    if (hbaseAdmin.listReplicationPeers().size() <= peerCount) {
198      LOG.info("Waiting on peercount to go up from " + peerCount);
199      Threads.sleep(100);
200    }
201    // now replicate some data
202    doPut(row);
203
204    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
205      @Override
206      public boolean evaluate() throws Exception {
207        // Looks like replication endpoint returns false unless we put more than 10 edits. We
208        // only send over one edit.
209        int count = ReplicationEndpointForTest.replicateCount.get();
210        LOG.info("count=" + count);
211        return ReplicationEndpointReturningFalse.replicated.get();
212      }
213    });
214    if (ReplicationEndpointReturningFalse.ex.get() != null) {
215      throw ReplicationEndpointReturningFalse.ex.get();
216    }
217
218    hbaseAdmin.removeReplicationPeer("testReplicationEndpointReturnsFalseOnReplicate");
219  }
220
221  @Test
222  public void testInterClusterReplication() throws Exception {
223    final String id = "testInterClusterReplication";
224
225    List<HRegion> regions = UTIL1.getHBaseCluster().getRegions(tableName);
226    int totEdits = 0;
227
228    // Make sure edits are spread across regions because we do region based batching
229    // before shipping edits.
230    for(HRegion region: regions) {
231      RegionInfo hri = region.getRegionInfo();
232      byte[] row = hri.getStartKey();
233      for (int i = 0; i < 100; i++) {
234        if (row.length > 0) {
235          Put put = new Put(row);
236          put.addColumn(famName, row, row);
237          region.put(put);
238          totEdits++;
239        }
240      }
241    }
242
243    hbaseAdmin.addReplicationPeer(id,
244      ReplicationPeerConfig.newBuilder()
245        .setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF2))
246        .setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName())
247        .build());
248
249    final int numEdits = totEdits;
250    Waiter.waitFor(CONF1, 30000, new Waiter.ExplainingPredicate<Exception>() {
251      @Override
252      public boolean evaluate() throws Exception {
253        return InterClusterReplicationEndpointForTest.replicateCount.get() == numEdits;
254      }
255
256      @Override
257      public String explainFailure() throws Exception {
258        String failure = "Failed to replicate all edits, expected = " + numEdits
259            + " replicated = " + InterClusterReplicationEndpointForTest.replicateCount.get();
260        return failure;
261      }
262    });
263
264    hbaseAdmin.removeReplicationPeer("testInterClusterReplication");
265    UTIL1.deleteTableData(tableName);
266  }
267
268  @Test
269  public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
270    ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
271      .setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
272      .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName())
273      // test that we can create mutliple WALFilters reflectively
274      .putConfiguration(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
275        EverythingPassesWALEntryFilter.class.getName() + "," +
276          EverythingPassesWALEntryFilterSubclass.class.getName())
277      .build();
278
279    hbaseAdmin.addReplicationPeer("testWALEntryFilterFromReplicationEndpoint", rpc);
280    // now replicate some data.
281    try (Connection connection = ConnectionFactory.createConnection(CONF1)) {
282      doPut(connection, Bytes.toBytes("row1"));
283      doPut(connection, row);
284      doPut(connection, Bytes.toBytes("row2"));
285    }
286
287    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
288      @Override
289      public boolean evaluate() throws Exception {
290        return ReplicationEndpointForTest.replicateCount.get() >= 1;
291      }
292    });
293
294    Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get());
295    //make sure our reflectively created filter is in the filter chain
296    Assert.assertTrue(EverythingPassesWALEntryFilter.hasPassedAnEntry());
297    hbaseAdmin.removeReplicationPeer("testWALEntryFilterFromReplicationEndpoint");
298  }
299
  @Test(expected = IOException.class)
  public void testWALEntryFilterAddValidation() throws Exception {
    ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
      .setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
      .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName())
      // a class name that cannot be resolved to a WALEntryFilter; adding the
      // peer must fail validation with an IOException
      .putConfiguration(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
        "IAmNotARealWalEntryFilter")
      .build();
    hbaseAdmin.addReplicationPeer("testWALEntryFilterAddValidation", rpc);
  }
311
  @Test(expected = IOException.class)
  public void testWALEntryFilterUpdateValidation() throws Exception {
    ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
      .setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
      .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName())
      // a class name that cannot be resolved to a WALEntryFilter; updating the
      // peer config must fail validation with an IOException
      .putConfiguration(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
        "IAmNotARealWalEntryFilter")
      .build();
    hbaseAdmin.updateReplicationPeerConfig("testWALEntryFilterUpdateValidation", rpc);
  }
323
  @Test
  public void testMetricsSourceBaseSourcePassThrough() {
    /*
     * The replication MetricsSource wraps a MetricsReplicationTableSourceImpl,
     * MetricsReplicationSourceSourceImpl and a MetricsReplicationGlobalSourceSource,
     * so that metrics get written to both namespaces. Both of those classes wrap a
     * MetricsReplicationSourceImpl that implements BaseSource, which allows
     * for custom JMX metrics. This test checks to make sure the BaseSource decorator logic on
     * MetricsSource actually calls down through the two layers of wrapping to the actual
     * BaseSource.
     */
    String id = "id";
    DynamicMetricsRegistry mockRegistry = mock(DynamicMetricsRegistry.class);
    // Two mocked BaseSources: one backing the per-peer ("single") namespace,
    // one backing the global namespace.
    MetricsReplicationSourceImpl singleRms = mock(MetricsReplicationSourceImpl.class);
    when(singleRms.getMetricsRegistry()).thenReturn(mockRegistry);
    MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class);
    when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry);

    MetricsReplicationSourceSource singleSourceSource =
      new MetricsReplicationSourceSourceImpl(singleRms, id);
    MetricsReplicationGlobalSourceSource globalSourceSource =
      new MetricsReplicationGlobalSourceSourceImpl(globalRms);
    // Spy so incrFailedRecoveryQueue() can be verified without hitting real metrics.
    MetricsReplicationGlobalSourceSource spyglobalSourceSource = spy(globalSourceSource);
    doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue();

    Map<String, MetricsReplicationTableSource> singleSourceSourceByTable =
      new HashMap<>();
    MetricsSource source = new MetricsSource(id, singleSourceSource,
      spyglobalSourceSource, singleSourceSourceByTable);


    // Expected metric names as they appear in the per-peer ("source.id.") and
    // global ("source.") namespaces.
    String gaugeName = "gauge";
    String singleGaugeName = "source.id." + gaugeName;
    String globalGaugeName = "source." + gaugeName;
    long delta = 1;
    String counterName = "counter";
    String singleCounterName = "source.id." + counterName;
    String globalCounterName = "source." + counterName;
    long count = 2;
    // Exercise each BaseSource pass-through method once.
    source.decGauge(gaugeName, delta);
    source.getMetricsContext();
    source.getMetricsDescription();
    source.getMetricsJmxContext();
    source.getMetricsName();
    source.incCounters(counterName, count);
    source.incGauge(gaugeName, delta);
    source.init();
    source.removeMetric(gaugeName);
    source.setGauge(gaugeName, delta);
    source.updateHistogram(counterName, count);
    source.incrFailedRecoveryQueue();


    // Each call above must have reached BOTH underlying BaseSources (with the
    // namespace-qualified name), except the read-only getters, which only need
    // to hit the global source, and incrFailedRecoveryQueue, which is global-only.
    verify(singleRms).decGauge(singleGaugeName, delta);
    verify(globalRms).decGauge(globalGaugeName, delta);
    verify(globalRms).getMetricsContext();
    verify(globalRms).getMetricsJmxContext();
    verify(globalRms).getMetricsName();
    verify(singleRms).incCounters(singleCounterName, count);
    verify(globalRms).incCounters(globalCounterName, count);
    verify(singleRms).incGauge(singleGaugeName, delta);
    verify(globalRms).incGauge(globalGaugeName, delta);
    verify(globalRms).init();
    verify(singleRms).removeMetric(singleGaugeName);
    verify(globalRms).removeMetric(globalGaugeName);
    verify(singleRms).setGauge(singleGaugeName, delta);
    verify(globalRms).setGauge(globalGaugeName, delta);
    verify(singleRms).updateHistogram(singleCounterName, count);
    verify(globalRms).updateHistogram(globalCounterName, count);
    verify(spyglobalSourceSource).incrFailedRecoveryQueue();

    //check singleSourceSourceByTable metrics.
    // singleSourceSourceByTable map entry will be created only
    // after calling #setAgeOfLastShippedOpByTable
    boolean containsRandomNewTable = source.getSingleSourceSourceByTable()
        .containsKey("RandomNewTable");
    Assert.assertEquals(false, containsRandomNewTable);
    source.updateTableLevelMetrics(createWALEntriesWithSize("RandomNewTable"));
    containsRandomNewTable = source.getSingleSourceSourceByTable()
        .containsKey("RandomNewTable");
    Assert.assertEquals(true, containsRandomNewTable);
    MetricsReplicationTableSource msr = source.getSingleSourceSourceByTable()
        .get("RandomNewTable");

    // age should be greater than zero we created the entry with time in the past
    Assert.assertTrue(msr.getLastShippedAge() > 0);
    Assert.assertTrue(msr.getShippedBytes() > 0);

  }
413
414  private List<Pair<Entry, Long>> createWALEntriesWithSize(String tableName) {
415    List<Pair<Entry, Long>> walEntriesWithSize = new ArrayList<>();
416    byte[] a = new byte[] { 'a' };
417    Entry entry = createEntry(tableName, null, a);
418    walEntriesWithSize.add(new Pair<>(entry, 10L));
419    return walEntriesWithSize;
420  }
421
422  private Entry createEntry(String tableName, TreeMap<byte[], Integer> scopes, byte[]... kvs) {
423    WALKeyImpl key1 = new WALKeyImpl(new byte[0], TableName.valueOf(tableName),
424      EnvironmentEdgeManager.currentTime() - 1L,
425        scopes);
426    WALEdit edit1 = new WALEdit();
427
428    for (byte[] kv : kvs) {
429      edit1.add(new KeyValue(kv, kv, kv));
430    }
431    return new Entry(key1, edit1);
432  }
433
  /** Writes one row to the test table over a throwaway connection to the source cluster. */
  private void doPut(byte[] row) throws IOException {
    try (Connection connection = ConnectionFactory.createConnection(CONF1)) {
      doPut(connection, row);
    }
  }
439
440  private void doPut(final Connection connection, final byte [] row) throws IOException {
441    try (Table t = connection.getTable(tableName)) {
442      Put put = new Put(row);
443      put.addColumn(famName, row, row);
444      t.put(put);
445    }
446  }
447
448  private static void doAssert(byte[] row) throws Exception {
449    if (ReplicationEndpointForTest.lastEntries == null) {
450      return; // first call
451    }
452    Assert.assertEquals(1, ReplicationEndpointForTest.lastEntries.size());
453    List<Cell> cells = ReplicationEndpointForTest.lastEntries.get(0).getEdit().getCells();
454    Assert.assertEquals(1, cells.size());
455    Assert.assertTrue(Bytes.equals(cells.get(0).getRowArray(), cells.get(0).getRowOffset(),
456      cells.get(0).getRowLength(), row, 0, row.length));
457  }
458
  /**
   * Test endpoint that records its lifecycle (construction, start, stop) and
   * every replicate() call in static counters, and remembers the last batch of
   * entries it was asked to ship. Replication to the same cluster is allowed so
   * tests can peer UTIL1 with itself.
   */
  public static class ReplicationEndpointForTest extends BaseReplicationEndpoint {
    static UUID uuid = UTIL1.getRandomUUID();
    // Incremented once per constructor run (one endpoint per regionserver source).
    static AtomicInteger contructedCount = new AtomicInteger();
    static AtomicInteger startedCount = new AtomicInteger();
    static AtomicInteger stoppedCount = new AtomicInteger();
    static AtomicInteger replicateCount = new AtomicInteger();
    // Last batch handed to replicate(); volatile so the test thread sees updates
    // made on the replication thread.
    static volatile List<Entry> lastEntries = null;

    public ReplicationEndpointForTest() {
      // Note: constructing a new endpoint resets replicateCount for everyone.
      replicateCount.set(0);
      contructedCount.incrementAndGet();
    }

    @Override
    public UUID getPeerUUID() {
      return uuid;
    }

    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      // Record the batch and always report success.
      replicateCount.incrementAndGet();
      lastEntries = new ArrayList<>(replicateContext.entries);
      return true;
    }

    @Override
    public void start() {
      startAsync();
    }

    @Override
    public void stop() {
      stopAsync();
    }

    @Override
    protected void doStart() {
      startedCount.incrementAndGet();
      notifyStarted();
    }

    @Override
    protected void doStop() {
      stoppedCount.incrementAndGet();
      notifyStopped();
    }

    @Override
    public boolean canReplicateToSameCluster() {
      // Allows peering a cluster with itself, which several tests here rely on.
      return true;
    }
  }
511
512  /**
513   * Not used by unit tests, helpful for manual testing with replication.
514   * <p>
515   * Snippet for `hbase shell`:
516   * <pre>
517   * create 't', 'f'
518   * add_peer '1', ENDPOINT_CLASSNAME =&gt; 'org.apache.hadoop.hbase.replication.' + \
519   *    'TestReplicationEndpoint$SleepingReplicationEndpointForTest'
520   * alter 't', {NAME=&gt;'f', REPLICATION_SCOPE=&gt;1}
521   * </pre>
522   */
523  public static class SleepingReplicationEndpointForTest extends ReplicationEndpointForTest {
524    private long duration;
525    public SleepingReplicationEndpointForTest() {
526      super();
527    }
528
529    @Override
530    public void init(Context context) throws IOException {
531      super.init(context);
532      if (this.ctx != null) {
533        duration = this.ctx.getConfiguration().getLong(
534            "hbase.test.sleep.replication.endpoint.duration.millis", 5000L);
535      }
536    }
537
538    @Override
539    public boolean replicate(ReplicateContext context) {
540      try {
541        Thread.sleep(duration);
542      } catch (InterruptedException e) {
543        Thread.currentThread().interrupt();
544        return false;
545      }
546      return super.replicate(context);
547    }
548  }
549
550  public static class InterClusterReplicationEndpointForTest
551      extends HBaseInterClusterReplicationEndpoint {
552
553    static AtomicInteger replicateCount = new AtomicInteger();
554    static boolean failedOnce;
555
556    public InterClusterReplicationEndpointForTest() {
557      replicateCount.set(0);
558    }
559
560    @Override
561    public boolean replicate(ReplicateContext replicateContext) {
562      boolean success = super.replicate(replicateContext);
563      if (success) {
564        replicateCount.addAndGet(replicateContext.entries.size());
565      }
566      return success;
567    }
568
569    @Override
570    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
571      // Fail only once, we don't want to slow down the test.
572      if (failedOnce) {
573        return () -> ordinal;
574      } else {
575        failedOnce = true;
576        return () -> {
577          throw new IOException("Sample Exception: Failed to replicate.");
578        };
579      }
580    }
581  }
582
  /**
   * Endpoint that reports failure (returns false) until more than {@link #COUNT}
   * replicate() calls have been made, forcing the source to retry the same
   * batch; {@link #replicated} flips to true once it finally reports success.
   * Any assertion error seen while checking the shipped row is stashed in
   * {@link #ex} for the test thread to rethrow.
   */
  public static class ReplicationEndpointReturningFalse extends ReplicationEndpointForTest {
    static int COUNT = 10;
    // Assertion failure captured on the replication thread, if any.
    static AtomicReference<Exception> ex = new AtomicReference<>(null);
    // Becomes true once replicate() has returned true at least once.
    static AtomicBoolean replicated = new AtomicBoolean(false);
    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      try {
        // check row
        doAssert(row);
      } catch (Exception e) {
        ex.set(e);
      }

      super.replicate(replicateContext);
      LOG.info("Replicated " + Bytes.toString(row) + ", count=" + replicateCount.get());

      replicated.set(replicateCount.get() > COUNT); // first 10 times, we return false
      return replicated.get();
    }
  }
603
  // return a WALEntry filter which only accepts "row", but not other rows
  /**
   * Endpoint whose filter chain strips every cell whose row is not {@code row},
   * so replicate() should only ever see entries for that single row. Assertion
   * failures raised on the replication thread are stashed in {@link #ex} for
   * the test thread to inspect.
   */
  public static class ReplicationEndpointWithWALEntryFilter extends ReplicationEndpointForTest {
    static AtomicReference<Exception> ex = new AtomicReference<>(null);

    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      try {
        super.replicate(replicateContext);
        doAssert(row);
      } catch (Exception e) {
        // Remember the failure but still report success to the source.
        ex.set(e);
      }
      return true;
    }

    @Override
    public WALEntryFilter getWALEntryfilter() {
      // Chain the base endpoint's filter (may be null) with a cell-level filter
      // that drops every cell not belonging to 'row'.
      return new ChainWALEntryFilter(super.getWALEntryfilter(), new WALEntryFilter() {
        @Override
        public Entry filter(Entry entry) {
          ArrayList<Cell> cells = entry.getEdit().getCells();
          int size = cells.size();
          // Iterate backwards so removal does not shift indices still to visit.
          for (int i = size-1; i >= 0; i--) {
            Cell cell = cells.get(i);
            if (!Bytes.equals(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
              row, 0, row.length)) {
              cells.remove(i);
            }
          }
          return entry;
        }
      });
    }
  }
638
639  public static class EverythingPassesWALEntryFilter implements WALEntryFilter {
640    private static boolean passedEntry = false;
641    @Override
642    public Entry filter(Entry entry) {
643      passedEntry = true;
644      return entry;
645    }
646
647    public static boolean hasPassedAnEntry(){
648      return passedEntry;
649    }
650  }
651
  /**
   * Distinct second filter class so the comma-separated filter config in
   * testWALEntryFilterFromReplicationEndpoint exercises reflective creation of
   * more than one WALEntryFilter.
   */
  public static class EverythingPassesWALEntryFilterSubclass
      extends EverythingPassesWALEntryFilter {
  }
655}