001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.Map;
026import java.util.concurrent.ExecutorService;
027import java.util.concurrent.TimeUnit;
028import java.util.concurrent.atomic.AtomicBoolean;
029import java.util.concurrent.atomic.AtomicInteger;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.CellBuilder;
033import org.apache.hadoop.hbase.CellBuilderFactory;
034import org.apache.hadoop.hbase.CellBuilderType;
035import org.apache.hadoop.hbase.CellScanner;
036import org.apache.hadoop.hbase.CompareOperator;
037import org.apache.hadoop.hbase.HBaseClassTestRule;
038import org.apache.hadoop.hbase.HBaseConfiguration;
039import org.apache.hadoop.hbase.HTableDescriptor;
040import org.apache.hadoop.hbase.Stoppable;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.client.Admin;
043import org.apache.hadoop.hbase.client.Append;
044import org.apache.hadoop.hbase.client.BufferedMutator;
045import org.apache.hadoop.hbase.client.BufferedMutatorParams;
046import org.apache.hadoop.hbase.client.Connection;
047import org.apache.hadoop.hbase.client.Delete;
048import org.apache.hadoop.hbase.client.Durability;
049import org.apache.hadoop.hbase.client.Get;
050import org.apache.hadoop.hbase.client.Increment;
051import org.apache.hadoop.hbase.client.Put;
052import org.apache.hadoop.hbase.client.RegionLocator;
053import org.apache.hadoop.hbase.client.Result;
054import org.apache.hadoop.hbase.client.ResultScanner;
055import org.apache.hadoop.hbase.client.Row;
056import org.apache.hadoop.hbase.client.RowMutations;
057import org.apache.hadoop.hbase.client.Scan;
058import org.apache.hadoop.hbase.client.Table;
059import org.apache.hadoop.hbase.client.TableBuilder;
060import org.apache.hadoop.hbase.client.TableDescriptor;
061import org.apache.hadoop.hbase.client.coprocessor.Batch;
062import org.apache.hadoop.hbase.filter.CompareFilter;
063import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
064import org.apache.hadoop.hbase.security.User;
065import org.apache.hadoop.hbase.testclassification.ReplicationTests;
066import org.apache.hadoop.hbase.testclassification.SmallTests;
067import org.apache.hadoop.hbase.util.Bytes;
068import org.junit.ClassRule;
069import org.junit.Rule;
070import org.junit.Test;
071import org.junit.experimental.categories.Category;
072import org.junit.rules.TestName;
073import org.slf4j.Logger;
074import org.slf4j.LoggerFactory;
075import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
076import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
077
078/**
079 * Simple test of sink-side wal entry filter facility.
080 */
081@Category({ReplicationTests.class, SmallTests.class})
082public class TestWALEntrySinkFilter {
083
084  @ClassRule
085  public static final HBaseClassTestRule CLASS_RULE =
086      HBaseClassTestRule.forClass(TestWALEntrySinkFilter.class);
087
088  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSink.class);
089  @Rule public TestName name = new TestName();
090  static final int BOUNDARY = 5;
091  static final AtomicInteger UNFILTERED = new AtomicInteger();
092  static final AtomicInteger FILTERED = new AtomicInteger();
093
094  /**
095   * Implemetentation of Stoppable to pass into ReplicationSink.
096   */
097  private static Stoppable STOPPABLE = new Stoppable() {
098    private final AtomicBoolean stop = new AtomicBoolean(false);
099
100    @Override
101    public boolean isStopped() {
102      return this.stop.get();
103    }
104
105    @Override
106    public void stop(String why) {
107      LOG.info("STOPPING BECAUSE: " + why);
108      this.stop.set(true);
109    }
110  };
111
112  /**
113   * Test filter.
114   * Filter will filter out any write time that is <= 5 (BOUNDARY). We count how many items we
115   * filter out and we count how many cells make it through for distribution way down below in the
116   * Table#batch implementation. Puts in place a custom DevNullConnection so we can insert our
117   * counting Table.
118   * @throws IOException
119   */
120  @Test
121  public void testWALEntryFilter() throws IOException {
122    Configuration conf = HBaseConfiguration.create();
123    // Make it so our filter is instantiated on construction of ReplicationSink.
124    conf.setClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY,
125        IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl.class, WALEntrySinkFilter.class);
126    conf.setClass("hbase.client.connection.impl", DevNullConnection.class,
127        Connection.class);
128    ReplicationSink sink = new ReplicationSink(conf, STOPPABLE);
129    // Create some dumb walentries.
130    List< org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry > entries =
131        new ArrayList<>();
132    AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder();
133    // Need a tablename.
134    ByteString tableName =
135        ByteString.copyFromUtf8(TableName.valueOf(this.name.getMethodName()).toString());
136    // Add WALEdit Cells to Cells List. The way edits arrive at the sink is with protos
137    // describing the edit with all Cells from all edits aggregated in a single CellScanner.
138    final List<Cell> cells = new ArrayList<>();
139    int count = BOUNDARY * 2;
140    for(int i = 0; i < count; i++) {
141      byte [] bytes = Bytes.toBytes(i);
142      // Create a wal entry. Everything is set to the current index as bytes or int/long.
143      entryBuilder.clear();
144      entryBuilder.setKey(entryBuilder.getKeyBuilder().
145          setLogSequenceNumber(i).
146          setEncodedRegionName(ByteString.copyFrom(bytes)).
147          setWriteTime(i).
148          setTableName(tableName).build());
149      // Lets have one Cell associated with each WALEdit.
150      entryBuilder.setAssociatedCellCount(1);
151      entries.add(entryBuilder.build());
152      // We need to add a Cell per WALEdit to the cells array.
153      CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY);
154      // Make cells whose row, family, cell, value, and ts are == 'i'.
155      Cell cell = cellBuilder.
156          setRow(bytes).
157          setFamily(bytes).
158          setQualifier(bytes).
159          setType(Cell.Type.Put).
160          setTimestamp(i).
161          setValue(bytes).build();
162      cells.add(cell);
163    }
164    // Now wrap our cells array in a CellScanner that we can pass in to replicateEntries. It has
165    // all Cells from all the WALEntries made above.
166    CellScanner cellScanner = new CellScanner() {
167      // Set to -1 because advance gets called before current.
168      int index = -1;
169
170      @Override
171      public Cell current() {
172        return cells.get(index);
173      }
174
175      @Override
176      public boolean advance() throws IOException {
177        index++;
178        return index < cells.size();
179      }
180    };
181    // Call our sink.
182    sink.replicateEntries(entries, cellScanner, null, null, null);
183    // Check what made it through and what was filtered.
184    assertTrue(FILTERED.get() > 0);
185    assertTrue(UNFILTERED.get() > 0);
186    assertEquals(count, FILTERED.get() + UNFILTERED.get());
187  }
188
189  /**
190   * Simple filter that will filter out any entry wholse writeTime is <= 5.
191   */
192  public static class IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl implements WALEntrySinkFilter {
193    public IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl() {}
194
195    @Override
196    public void init(Connection connection) {
197      // Do nothing.
198    }
199
200    @Override
201    public boolean filter(TableName table, long writeTime) {
202      boolean b = writeTime <= BOUNDARY;
203      if (b) {
204        FILTERED.incrementAndGet();
205      }
206      return b;
207    }
208  }
209
210  /**
211   * A DevNull Connection whose only purpose is checking what edits made it through. See down in
212   * {@link Table#batch(List, Object[])}.
213   */
214  public static class DevNullConnection implements Connection {
215    private final Configuration configuration;
216
217    DevNullConnection(Configuration configuration, ExecutorService es, User user) {
218      this.configuration = configuration;
219    }
220
221    @Override
222    public void abort(String why, Throwable e) {
223
224    }
225
226    @Override
227    public boolean isAborted() {
228      return false;
229    }
230
231    @Override
232    public Configuration getConfiguration() {
233      return this.configuration;
234    }
235
236    @Override
237    public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
238      return null;
239    }
240
241    @Override
242    public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
243      return null;
244    }
245
246    @Override
247    public RegionLocator getRegionLocator(TableName tableName) throws IOException {
248      return null;
249    }
250
251    @Override
252    public Admin getAdmin() throws IOException {
253      return null;
254    }
255
256    @Override
257    public void close() throws IOException {
258
259    }
260
261    @Override
262    public boolean isClosed() {
263      return false;
264    }
265
266    @Override
267    public TableBuilder getTableBuilder(final TableName tableName, ExecutorService pool) {
268      return new TableBuilder() {
269        @Override
270        public TableBuilder setOperationTimeout(int timeout) {
271          return this;
272        }
273
274        @Override
275        public TableBuilder setRpcTimeout(int timeout) {
276          return this;
277        }
278
279        @Override
280        public TableBuilder setReadRpcTimeout(int timeout) {
281          return this;
282        }
283
284        @Override
285        public TableBuilder setWriteRpcTimeout(int timeout) {
286          return this;
287        }
288
289        @Override
290        public Table build() {
291          return new Table() {
292            @Override
293            public TableName getName() {
294              return tableName;
295            }
296
297            @Override
298            public Configuration getConfiguration() {
299              return configuration;
300            }
301
302            @Override
303            public HTableDescriptor getTableDescriptor() throws IOException {
304              return null;
305            }
306
307            @Override
308            public TableDescriptor getDescriptor() throws IOException {
309              return null;
310            }
311
312            @Override
313            public boolean exists(Get get) throws IOException {
314              return false;
315            }
316
317            @Override
318            public boolean[] exists(List<Get> gets) throws IOException {
319              return new boolean[0];
320            }
321
322            @Override
323            public void batch(List<? extends Row> actions, Object[] results) throws IOException, InterruptedException {
324              for (Row action: actions) {
325                // Row is the index of the loop above where we make WALEntry and Cells.
326                int row = Bytes.toInt(action.getRow());
327                assertTrue("" + row, row> BOUNDARY);
328                UNFILTERED.incrementAndGet();
329              }
330            }
331
332            @Override
333            public <R> void batchCallback(List<? extends Row> actions, Object[] results, Batch.Callback<R> callback) throws IOException, InterruptedException {
334
335            }
336
337            @Override
338            public Result get(Get get) throws IOException {
339              return null;
340            }
341
342            @Override
343            public Result[] get(List<Get> gets) throws IOException {
344              return new Result[0];
345            }
346
347            @Override
348            public ResultScanner getScanner(Scan scan) throws IOException {
349              return null;
350            }
351
352            @Override
353            public ResultScanner getScanner(byte[] family) throws IOException {
354              return null;
355            }
356
357            @Override
358            public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
359              return null;
360            }
361
362            @Override
363            public void put(Put put) throws IOException {
364
365            }
366
367            @Override
368            public void put(List<Put> puts) throws IOException {
369
370            }
371
372            @Override
373            public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put) throws IOException {
374              return false;
375            }
376
377            @Override
378            public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Put put) throws IOException {
379              return false;
380            }
381
382            @Override
383            public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, byte[] value, Put put) throws IOException {
384              return false;
385            }
386
387            @Override
388            public void delete(Delete delete) throws IOException {
389
390            }
391
392            @Override
393            public void delete(List<Delete> deletes) throws IOException {
394
395            }
396
397            @Override
398            public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[] value, Delete delete) throws IOException {
399              return false;
400            }
401
402            @Override
403            public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Delete delete) throws IOException {
404              return false;
405            }
406
407            @Override
408            public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, byte[] value, Delete delete) throws IOException {
409              return false;
410            }
411
412            @Override
413            public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
414              return null;
415            }
416
417            @Override
418            public void mutateRow(RowMutations rm) throws IOException {
419
420            }
421
422            @Override
423            public Result append(Append append) throws IOException {
424              return null;
425            }
426
427            @Override
428            public Result increment(Increment increment) throws IOException {
429              return null;
430            }
431
432            @Override
433            public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) throws IOException {
434              return 0;
435            }
436
437            @Override
438            public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability) throws IOException {
439              return 0;
440            }
441
442            @Override
443            public void close() throws IOException {
444
445            }
446
447            @Override
448            public CoprocessorRpcChannel coprocessorService(byte[] row) {
449              return null;
450            }
451
452            @Override
453            public <T extends com.google.protobuf.Service, R> Map<byte[], R> coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable) throws com.google.protobuf.ServiceException, Throwable {
454              return null;
455            }
456
457            @Override
458            public <T extends com.google.protobuf.Service, R> void coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Batch.Callback<R> callback) throws com.google.protobuf.ServiceException, Throwable {
459
460            }
461
462            @Override
463            public <R extends com.google.protobuf.Message> Map<byte[], R> batchCoprocessorService(com.google.protobuf.Descriptors.MethodDescriptor methodDescriptor, com.google.protobuf.Message request, byte[] startKey, byte[] endKey, R responsePrototype) throws com.google.protobuf.ServiceException, Throwable {
464              return null;
465            }
466
467            @Override
468            public <R extends com.google.protobuf.Message> void batchCoprocessorService(com.google.protobuf.Descriptors.MethodDescriptor methodDescriptor, com.google.protobuf.Message request, byte[] startKey, byte[] endKey, R responsePrototype, Batch.Callback<R> callback) throws com.google.protobuf.ServiceException, Throwable {
469
470            }
471
472            @Override
473            public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, RowMutations mutation) throws IOException {
474              return false;
475            }
476
477            @Override
478            public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, byte[] value, RowMutations mutation) throws IOException {
479              return false;
480            }
481
482            @Override
483            public long getRpcTimeout(TimeUnit unit) {
484              return 0;
485            }
486
487            @Override
488            public int getRpcTimeout() {
489              return 0;
490            }
491
492            @Override
493            public void setRpcTimeout(int rpcTimeout) {
494
495            }
496
497            @Override
498            public long getReadRpcTimeout(TimeUnit unit) {
499              return 0;
500            }
501
502            @Override
503            public int getReadRpcTimeout() {
504              return 0;
505            }
506
507            @Override
508            public void setReadRpcTimeout(int readRpcTimeout) {
509
510            }
511
512            @Override
513            public long getWriteRpcTimeout(TimeUnit unit) {
514              return 0;
515            }
516
517            @Override
518            public int getWriteRpcTimeout() {
519              return 0;
520            }
521
522            @Override
523            public void setWriteRpcTimeout(int writeRpcTimeout) {
524
525            }
526
527            @Override
528            public long getOperationTimeout(TimeUnit unit) {
529              return 0;
530            }
531
532            @Override
533            public int getOperationTimeout() {
534              return 0;
535            }
536
537            @Override
538            public void setOperationTimeout(int operationTimeout) {
539            }
540
541            @Override
542            public RegionLocator getRegionLocator() throws IOException {
543              return null;
544            }
545          };
546        }
547      };
548    }
549
550    @Override
551    public void clearRegionLocationCache() {
552    }
553  }
554}
555
556