001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertTrue;
022
023import java.io.IOException;
024import java.net.SocketAddress;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.concurrent.CompletableFuture;
028import java.util.concurrent.atomic.AtomicInteger;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.Cell;
031import org.apache.hadoop.hbase.CellBuilderType;
032import org.apache.hadoop.hbase.ExtendedCell;
033import org.apache.hadoop.hbase.ExtendedCellBuilder;
034import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
035import org.apache.hadoop.hbase.ExtendedCellScanner;
036import org.apache.hadoop.hbase.HBaseConfiguration;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
040import org.apache.hadoop.hbase.client.AsyncClusterConnection;
041import org.apache.hadoop.hbase.client.AsyncConnection;
042import org.apache.hadoop.hbase.client.AsyncTable;
043import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
044import org.apache.hadoop.hbase.client.ConnectionRegistry;
045import org.apache.hadoop.hbase.client.DoNothingConnectionRegistry;
046import org.apache.hadoop.hbase.client.DummyAsyncClusterConnection;
047import org.apache.hadoop.hbase.client.DummyAsyncTable;
048import org.apache.hadoop.hbase.client.Row;
049import org.apache.hadoop.hbase.security.User;
050import org.apache.hadoop.hbase.testclassification.ReplicationTests;
051import org.apache.hadoop.hbase.testclassification.SmallTests;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.junit.jupiter.api.Tag;
054import org.junit.jupiter.api.Test;
055import org.junit.jupiter.api.TestInfo;
056
057import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
058
059import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
060
061/**
062 * Simple test of sink-side wal entry filter facility.
063 */
064@Tag(ReplicationTests.TAG)
065@Tag(SmallTests.TAG)
066public class TestWALEntrySinkFilter {
067
068  static final int BOUNDARY = 5;
069  static final AtomicInteger UNFILTERED = new AtomicInteger();
070  static final AtomicInteger FILTERED = new AtomicInteger();
071
072  /**
073   * Test filter. Filter will filter out any write time that is <= 5 (BOUNDARY). We count how many
074   * items we filter out and we count how many cells make it through for distribution way down below
075   * in the Table#batch implementation. Puts in place a custom DevNullConnection so we can insert
076   * our counting Table.
077   */
078  @Test
079  public void testWALEntryFilter(TestInfo testInfo) throws IOException {
080    String testName = testInfo.getTestMethod().get().getName();
081    Configuration conf = HBaseConfiguration.create();
082    // Make it so our filter is instantiated on construction of ReplicationSink.
083    conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
084      DevNullConnectionRegistry.class, ConnectionRegistry.class);
085    conf.setClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY,
086      IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl.class, WALEntrySinkFilter.class);
087    conf.setClass(ClusterConnectionFactory.HBASE_SERVER_CLUSTER_CONNECTION_IMPL,
088      DevNullAsyncClusterConnection.class, AsyncClusterConnection.class);
089    ReplicationSink sink = new ReplicationSink(conf, null);
090    // Create some dumb walentries.
091    List<AdminProtos.WALEntry> entries = new ArrayList<>();
092    AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder();
093    // Need a tablename.
094    ByteString tableName = ByteString.copyFromUtf8(TableName.valueOf(testName).toString());
095    // Add WALEdit Cells to Cells List. The way edits arrive at the sink is with protos
096    // describing the edit with all Cells from all edits aggregated in a single CellScanner.
097    final List<ExtendedCell> cells = new ArrayList<>();
098    int count = BOUNDARY * 2;
099    for (int i = 0; i < count; i++) {
100      byte[] bytes = Bytes.toBytes(i);
101      // Create a wal entry. Everything is set to the current index as bytes or int/long.
102      entryBuilder.clear();
103      entryBuilder.setKey(entryBuilder.getKeyBuilder().setLogSequenceNumber(i)
104        .setEncodedRegionName(ByteString.copyFrom(bytes)).setWriteTime(i).setTableName(tableName)
105        .build());
106      // Lets have one Cell associated with each WALEdit.
107      entryBuilder.setAssociatedCellCount(1);
108      entries.add(entryBuilder.build());
109      // We need to add a Cell per WALEdit to the cells array.
110      ExtendedCellBuilder cellBuilder =
111        ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY);
112      // Make cells whose row, family, cell, value, and ts are == 'i'.
113      ExtendedCell cell = cellBuilder.setRow(bytes).setFamily(bytes).setQualifier(bytes)
114        .setType(Cell.Type.Put).setTimestamp(i).setValue(bytes).build();
115      cells.add(cell);
116    }
117    // Now wrap our cells array in a CellScanner that we can pass in to replicateEntries. It has
118    // all Cells from all the WALEntries made above.
119    ExtendedCellScanner cellScanner = new ExtendedCellScanner() {
120      // Set to -1 because advance gets called before current.
121      int index = -1;
122
123      @Override
124      public ExtendedCell current() {
125        return cells.get(index);
126      }
127
128      @Override
129      public boolean advance() throws IOException {
130        index++;
131        return index < cells.size();
132      }
133    };
134    // Call our sink.
135    sink.replicateEntries(entries, cellScanner, null, null, null);
136    // Check what made it through and what was filtered.
137    assertTrue(FILTERED.get() > 0);
138    assertTrue(UNFILTERED.get() > 0);
139    assertEquals(count, FILTERED.get() + UNFILTERED.get());
140  }
141
142  /**
143   * Simple filter that will filter out any entry wholse writeTime is <= 5.
144   */
145  public static class IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl
146    implements WALEntrySinkFilter {
147    public IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl() {
148    }
149
150    @Override
151    public void init(AsyncConnection conn) {
152      // Do nothing.
153    }
154
155    @Override
156    public boolean filter(TableName table, long writeTime) {
157      boolean b = writeTime <= BOUNDARY;
158      if (b) {
159        FILTERED.incrementAndGet();
160      }
161      return b;
162    }
163  }
164
165  public static class DevNullConnectionRegistry extends DoNothingConnectionRegistry {
166
167    public DevNullConnectionRegistry(Configuration conf, User user) {
168      super(conf, user);
169    }
170
171    @Override
172    public CompletableFuture<String> getClusterId() {
173      return CompletableFuture.completedFuture("test");
174    }
175  }
176
177  public static class DevNullAsyncClusterConnection extends DummyAsyncClusterConnection {
178
179    private final Configuration conf;
180
181    public DevNullAsyncClusterConnection(Configuration conf, Object registry, String clusterId,
182      SocketAddress localAddress, User user) {
183      this.conf = conf;
184    }
185
186    @Override
187    public AsyncTable<AdvancedScanResultConsumer> getTable(TableName tableName) {
188      return new DummyAsyncTable<AdvancedScanResultConsumer>() {
189
190        @Override
191        public <T> CompletableFuture<List<T>> batchAll(List<? extends Row> actions) {
192          List<T> list = new ArrayList<>(actions.size());
193          for (Row action : actions) {
194            // Row is the index of the loop above where we make WALEntry and Cells.
195            int row = Bytes.toInt(action.getRow());
196            assertTrue(row > BOUNDARY, "" + row);
197            UNFILTERED.incrementAndGet();
198            list.add(null);
199          }
200          return CompletableFuture.completedFuture(list);
201        }
202      };
203    }
204
205    @Override
206    public Configuration getConfiguration() {
207      return conf;
208    }
209  }
210}