001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.List;
026import java.util.concurrent.CompletableFuture;
027import java.util.concurrent.ExecutorService;
028import java.util.concurrent.atomic.AtomicBoolean;
029import java.util.concurrent.atomic.AtomicInteger;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.CellBuilder;
033import org.apache.hadoop.hbase.CellBuilderFactory;
034import org.apache.hadoop.hbase.CellBuilderType;
035import org.apache.hadoop.hbase.CellScanner;
036import org.apache.hadoop.hbase.HBaseClassTestRule;
037import org.apache.hadoop.hbase.HBaseConfiguration;
038import org.apache.hadoop.hbase.ServerName;
039import org.apache.hadoop.hbase.Stoppable;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
042import org.apache.hadoop.hbase.client.AsyncAdminBuilder;
043import org.apache.hadoop.hbase.client.AsyncBufferedMutatorBuilder;
044import org.apache.hadoop.hbase.client.AsyncConnection;
045import org.apache.hadoop.hbase.client.AsyncTable;
046import org.apache.hadoop.hbase.client.AsyncTableBuilder;
047import org.apache.hadoop.hbase.client.AsyncTableRegionLocator;
048import org.apache.hadoop.hbase.client.Connection;
049import org.apache.hadoop.hbase.client.ConnectionFactory;
050import org.apache.hadoop.hbase.client.ConnectionRegistry;
051import org.apache.hadoop.hbase.client.DummyAsyncTable;
052import org.apache.hadoop.hbase.client.DummyConnectionRegistry;
053import org.apache.hadoop.hbase.client.Hbck;
054import org.apache.hadoop.hbase.client.Row;
055import org.apache.hadoop.hbase.client.ScanResultConsumer;
056import org.apache.hadoop.hbase.security.User;
057import org.apache.hadoop.hbase.testclassification.ReplicationTests;
058import org.apache.hadoop.hbase.testclassification.SmallTests;
059import org.apache.hadoop.hbase.util.Bytes;
060import org.junit.ClassRule;
061import org.junit.Rule;
062import org.junit.Test;
063import org.junit.experimental.categories.Category;
064import org.junit.rules.TestName;
065import org.slf4j.Logger;
066import org.slf4j.LoggerFactory;
067
068import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
069
070import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
071
072/**
073 * Simple test of sink-side wal entry filter facility.
074 */
075@Category({ ReplicationTests.class, SmallTests.class })
076public class TestWALEntrySinkFilter {
077
078  @ClassRule
079  public static final HBaseClassTestRule CLASS_RULE =
080    HBaseClassTestRule.forClass(TestWALEntrySinkFilter.class);
081
082  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSink.class);
083  @Rule
084  public TestName name = new TestName();
085  static final int BOUNDARY = 5;
086  static final AtomicInteger UNFILTERED = new AtomicInteger();
087  static final AtomicInteger FILTERED = new AtomicInteger();
088
089  /**
090   * Implemetentation of Stoppable to pass into ReplicationSink.
091   */
092  private static Stoppable STOPPABLE = new Stoppable() {
093    private final AtomicBoolean stop = new AtomicBoolean(false);
094
095    @Override
096    public boolean isStopped() {
097      return this.stop.get();
098    }
099
100    @Override
101    public void stop(String why) {
102      LOG.info("STOPPING BECAUSE: " + why);
103      this.stop.set(true);
104    }
105  };
106
107  /**
108   * Test filter. Filter will filter out any write time that is <= 5 (BOUNDARY). We count how many
109   * items we filter out and we count how many cells make it through for distribution way down below
110   * in the Table#batch implementation. Puts in place a custom DevNullConnection so we can insert
111   * our counting Table. n
112   */
113  @Test
114  public void testWALEntryFilter() throws IOException {
115    Configuration conf = HBaseConfiguration.create();
116    // Make it so our filter is instantiated on construction of ReplicationSink.
117    conf.setClass(DummyConnectionRegistry.REGISTRY_IMPL_CONF_KEY, DevNullConnectionRegistry.class,
118      DummyConnectionRegistry.class);
119    conf.setClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY,
120      IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl.class, WALEntrySinkFilter.class);
121    conf.setClass(ConnectionFactory.HBASE_CLIENT_ASYNC_CONNECTION_IMPL,
122      DevNullAsyncConnection.class, AsyncConnection.class);
123    ReplicationSink sink = new ReplicationSink(conf);
124    // Create some dumb walentries.
125    List<org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry> entries =
126      new ArrayList<>();
127    AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder();
128    // Need a tablename.
129    ByteString tableName =
130      ByteString.copyFromUtf8(TableName.valueOf(this.name.getMethodName()).toString());
131    // Add WALEdit Cells to Cells List. The way edits arrive at the sink is with protos
132    // describing the edit with all Cells from all edits aggregated in a single CellScanner.
133    final List<Cell> cells = new ArrayList<>();
134    int count = BOUNDARY * 2;
135    for (int i = 0; i < count; i++) {
136      byte[] bytes = Bytes.toBytes(i);
137      // Create a wal entry. Everything is set to the current index as bytes or int/long.
138      entryBuilder.clear();
139      entryBuilder.setKey(entryBuilder.getKeyBuilder().setLogSequenceNumber(i)
140        .setEncodedRegionName(ByteString.copyFrom(bytes)).setWriteTime(i).setTableName(tableName)
141        .build());
142      // Lets have one Cell associated with each WALEdit.
143      entryBuilder.setAssociatedCellCount(1);
144      entries.add(entryBuilder.build());
145      // We need to add a Cell per WALEdit to the cells array.
146      CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY);
147      // Make cells whose row, family, cell, value, and ts are == 'i'.
148      Cell cell = cellBuilder.setRow(bytes).setFamily(bytes).setQualifier(bytes)
149        .setType(Cell.Type.Put).setTimestamp(i).setValue(bytes).build();
150      cells.add(cell);
151    }
152    // Now wrap our cells array in a CellScanner that we can pass in to replicateEntries. It has
153    // all Cells from all the WALEntries made above.
154    CellScanner cellScanner = new CellScanner() {
155      // Set to -1 because advance gets called before current.
156      int index = -1;
157
158      @Override
159      public Cell current() {
160        return cells.get(index);
161      }
162
163      @Override
164      public boolean advance() throws IOException {
165        index++;
166        return index < cells.size();
167      }
168    };
169    // Call our sink.
170    sink.replicateEntries(entries, cellScanner, null, null, null);
171    // Check what made it through and what was filtered.
172    assertTrue(FILTERED.get() > 0);
173    assertTrue(UNFILTERED.get() > 0);
174    assertEquals(count, FILTERED.get() + UNFILTERED.get());
175  }
176
177  /**
178   * Simple filter that will filter out any entry wholse writeTime is <= 5.
179   */
180  public static class IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl
181    implements WALEntrySinkFilter {
182    public IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl() {
183    }
184
185    @Override
186    public void init(Connection connection) {
187      // Do nothing.
188    }
189
190    @Override
191    public boolean filter(TableName table, long writeTime) {
192      boolean b = writeTime <= BOUNDARY;
193      if (b) {
194        FILTERED.incrementAndGet();
195      }
196      return b;
197    }
198  }
199
200  public static class DevNullConnectionRegistry extends DummyConnectionRegistry {
201
202    public DevNullConnectionRegistry(Configuration conf) {
203    }
204
205    @Override
206    public CompletableFuture<String> getClusterId() {
207      return CompletableFuture.completedFuture("test");
208    }
209  }
210
211  /**
212   * A DevNull AsyncConnection whose only purpose is checking what edits made it through. See down
213   * in {@link AsyncTable#batchAll}.
214   */
215  public static class DevNullAsyncConnection implements AsyncConnection {
216
217    private final Configuration conf;
218
219    public DevNullAsyncConnection(Configuration conf, ConnectionRegistry registry, String clusterId,
220      User user) {
221      this.conf = conf;
222    }
223
224    @Override
225    public AsyncTableRegionLocator getRegionLocator(TableName tableName) {
226      return null;
227    }
228
229    @Override
230    public void clearRegionLocationCache() {
231    }
232
233    @Override
234    public AsyncTableBuilder<AdvancedScanResultConsumer> getTableBuilder(TableName tableName) {
235      return null;
236    }
237
238    @Override
239    public AsyncTableBuilder<ScanResultConsumer> getTableBuilder(TableName tableName,
240      ExecutorService pool) {
241      return null;
242    }
243
244    @Override
245    public AsyncAdminBuilder getAdminBuilder() {
246      return null;
247    }
248
249    @Override
250    public AsyncAdminBuilder getAdminBuilder(ExecutorService pool) {
251      return null;
252    }
253
254    @Override
255    public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName) {
256      return null;
257    }
258
259    @Override
260    public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName,
261      ExecutorService pool) {
262      return null;
263    }
264
265    @Override
266    public CompletableFuture<Hbck> getHbck() {
267      return null;
268    }
269
270    @Override
271    public Hbck getHbck(ServerName masterServer) throws IOException {
272      return null;
273    }
274
275    @Override
276    public boolean isClosed() {
277      return false;
278    }
279
280    @Override
281    public void close() throws IOException {
282    }
283
284    @Override
285    public AsyncTable<AdvancedScanResultConsumer> getTable(TableName tableName) {
286      return new DummyAsyncTable<AdvancedScanResultConsumer>() {
287
288        @Override
289        public <T> CompletableFuture<List<T>> batchAll(List<? extends Row> actions) {
290          List<T> list = new ArrayList<>(actions.size());
291          for (Row action : actions) {
292            // Row is the index of the loop above where we make WALEntry and Cells.
293            int row = Bytes.toInt(action.getRow());
294            assertTrue("" + row, row > BOUNDARY);
295            UNFILTERED.incrementAndGet();
296            list.add(null);
297          }
298          return CompletableFuture.completedFuture(list);
299        }
300      };
301    }
302
303    @Override
304    public Configuration getConfiguration() {
305      return conf;
306    }
307  }
308}