001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.List;
026import java.util.Map;
027import java.util.concurrent.ExecutorService;
028import java.util.concurrent.TimeUnit;
029import java.util.concurrent.atomic.AtomicBoolean;
030import java.util.concurrent.atomic.AtomicInteger;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.CellBuilder;
034import org.apache.hadoop.hbase.CellBuilderFactory;
035import org.apache.hadoop.hbase.CellBuilderType;
036import org.apache.hadoop.hbase.CellScanner;
037import org.apache.hadoop.hbase.CompareOperator;
038import org.apache.hadoop.hbase.HBaseClassTestRule;
039import org.apache.hadoop.hbase.HBaseConfiguration;
040import org.apache.hadoop.hbase.HTableDescriptor;
041import org.apache.hadoop.hbase.Stoppable;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.client.Admin;
044import org.apache.hadoop.hbase.client.Append;
045import org.apache.hadoop.hbase.client.BufferedMutator;
046import org.apache.hadoop.hbase.client.BufferedMutatorParams;
047import org.apache.hadoop.hbase.client.Connection;
048import org.apache.hadoop.hbase.client.Delete;
049import org.apache.hadoop.hbase.client.Durability;
050import org.apache.hadoop.hbase.client.Get;
051import org.apache.hadoop.hbase.client.Increment;
052import org.apache.hadoop.hbase.client.Put;
053import org.apache.hadoop.hbase.client.RegionLocator;
054import org.apache.hadoop.hbase.client.Result;
055import org.apache.hadoop.hbase.client.ResultScanner;
056import org.apache.hadoop.hbase.client.Row;
057import org.apache.hadoop.hbase.client.RowMutations;
058import org.apache.hadoop.hbase.client.Scan;
059import org.apache.hadoop.hbase.client.Table;
060import org.apache.hadoop.hbase.client.TableBuilder;
061import org.apache.hadoop.hbase.client.TableDescriptor;
062import org.apache.hadoop.hbase.client.coprocessor.Batch;
063import org.apache.hadoop.hbase.filter.CompareFilter;
064import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
065import org.apache.hadoop.hbase.security.User;
066import org.apache.hadoop.hbase.testclassification.ReplicationTests;
067import org.apache.hadoop.hbase.testclassification.SmallTests;
068import org.apache.hadoop.hbase.util.Bytes;
069import org.junit.ClassRule;
070import org.junit.Rule;
071import org.junit.Test;
072import org.junit.experimental.categories.Category;
073import org.junit.rules.TestName;
074import org.slf4j.Logger;
075import org.slf4j.LoggerFactory;
076
077import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
078
079import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
080
081/**
082 * Simple test of sink-side wal entry filter facility.
083 */
084@Category({ReplicationTests.class, SmallTests.class})
085public class TestWALEntrySinkFilter {
086
087  @ClassRule
088  public static final HBaseClassTestRule CLASS_RULE =
089      HBaseClassTestRule.forClass(TestWALEntrySinkFilter.class);
090
091  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSink.class);
092  @Rule public TestName name = new TestName();
093  static final int BOUNDARY = 5;
094  static final AtomicInteger UNFILTERED = new AtomicInteger();
095  static final AtomicInteger FILTERED = new AtomicInteger();
096
097  /**
098   * Implemetentation of Stoppable to pass into ReplicationSink.
099   */
100  private static Stoppable STOPPABLE = new Stoppable() {
101    private final AtomicBoolean stop = new AtomicBoolean(false);
102
103    @Override
104    public boolean isStopped() {
105      return this.stop.get();
106    }
107
108    @Override
109    public void stop(String why) {
110      LOG.info("STOPPING BECAUSE: " + why);
111      this.stop.set(true);
112    }
113  };
114
115  /**
116   * Test filter.
117   * Filter will filter out any write time that is <= 5 (BOUNDARY). We count how many items we
118   * filter out and we count how many cells make it through for distribution way down below in the
119   * Table#batch implementation. Puts in place a custom DevNullConnection so we can insert our
120   * counting Table.
121   * @throws IOException
122   */
123  @Test
124  public void testWALEntryFilter() throws IOException {
125    Configuration conf = HBaseConfiguration.create();
126    // Make it so our filter is instantiated on construction of ReplicationSink.
127    conf.setClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY,
128        IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl.class, WALEntrySinkFilter.class);
129    conf.setClass("hbase.client.connection.impl", DevNullConnection.class,
130        Connection.class);
131    ReplicationSink sink = new ReplicationSink(conf, STOPPABLE);
132    // Create some dumb walentries.
133    List< org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry > entries =
134        new ArrayList<>();
135    AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder();
136    // Need a tablename.
137    ByteString tableName =
138        ByteString.copyFromUtf8(TableName.valueOf(this.name.getMethodName()).toString());
139    // Add WALEdit Cells to Cells List. The way edits arrive at the sink is with protos
140    // describing the edit with all Cells from all edits aggregated in a single CellScanner.
141    final List<Cell> cells = new ArrayList<>();
142    int count = BOUNDARY * 2;
143    for(int i = 0; i < count; i++) {
144      byte [] bytes = Bytes.toBytes(i);
145      // Create a wal entry. Everything is set to the current index as bytes or int/long.
146      entryBuilder.clear();
147      entryBuilder.setKey(entryBuilder.getKeyBuilder().
148          setLogSequenceNumber(i).
149          setEncodedRegionName(ByteString.copyFrom(bytes)).
150          setWriteTime(i).
151          setTableName(tableName).build());
152      // Lets have one Cell associated with each WALEdit.
153      entryBuilder.setAssociatedCellCount(1);
154      entries.add(entryBuilder.build());
155      // We need to add a Cell per WALEdit to the cells array.
156      CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY);
157      // Make cells whose row, family, cell, value, and ts are == 'i'.
158      Cell cell = cellBuilder.
159          setRow(bytes).
160          setFamily(bytes).
161          setQualifier(bytes).
162          setType(Cell.Type.Put).
163          setTimestamp(i).
164          setValue(bytes).build();
165      cells.add(cell);
166    }
167    // Now wrap our cells array in a CellScanner that we can pass in to replicateEntries. It has
168    // all Cells from all the WALEntries made above.
169    CellScanner cellScanner = new CellScanner() {
170      // Set to -1 because advance gets called before current.
171      int index = -1;
172
173      @Override
174      public Cell current() {
175        return cells.get(index);
176      }
177
178      @Override
179      public boolean advance() throws IOException {
180        index++;
181        return index < cells.size();
182      }
183    };
184    // Call our sink.
185    sink.replicateEntries(entries, cellScanner, null, null, null);
186    // Check what made it through and what was filtered.
187    assertTrue(FILTERED.get() > 0);
188    assertTrue(UNFILTERED.get() > 0);
189    assertEquals(count, FILTERED.get() + UNFILTERED.get());
190  }
191
192  /**
193   * Simple filter that will filter out any entry wholse writeTime is <= 5.
194   */
195  public static class IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl implements WALEntrySinkFilter {
196    public IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl() {}
197
198    @Override
199    public void init(Connection connection) {
200      // Do nothing.
201    }
202
203    @Override
204    public boolean filter(TableName table, long writeTime) {
205      boolean b = writeTime <= BOUNDARY;
206      if (b) {
207        FILTERED.incrementAndGet();
208      }
209      return b;
210    }
211  }
212
213  /**
214   * A DevNull Connection whose only purpose is checking what edits made it through. See down in
215   * {@link Table#batch(List, Object[])}.
216   */
217  public static class DevNullConnection implements Connection {
218    private final Configuration configuration;
219
220    DevNullConnection(Configuration configuration, ExecutorService es, User user) {
221      this.configuration = configuration;
222    }
223
224    @Override
225    public void abort(String why, Throwable e) {
226
227    }
228
229    @Override
230    public boolean isAborted() {
231      return false;
232    }
233
234    @Override
235    public Configuration getConfiguration() {
236      return this.configuration;
237    }
238
239    @Override
240    public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
241      return null;
242    }
243
244    @Override
245    public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
246      return null;
247    }
248
249    @Override
250    public RegionLocator getRegionLocator(TableName tableName) throws IOException {
251      return null;
252    }
253
254    @Override
255    public Admin getAdmin() throws IOException {
256      return null;
257    }
258
259    @Override
260    public void close() throws IOException {
261
262    }
263
264    @Override
265    public boolean isClosed() {
266      return false;
267    }
268
269    @Override
270    public TableBuilder getTableBuilder(final TableName tableName, ExecutorService pool) {
271      return new TableBuilder() {
272        @Override
273        public TableBuilder setOperationTimeout(int timeout) {
274          return this;
275        }
276
277        @Override
278        public TableBuilder setRpcTimeout(int timeout) {
279          return this;
280        }
281
282        @Override
283        public TableBuilder setReadRpcTimeout(int timeout) {
284          return this;
285        }
286
287        @Override
288        public TableBuilder setWriteRpcTimeout(int timeout) {
289          return this;
290        }
291
292        @Override
293        public Table build() {
294          return new Table() {
295            @Override
296            public TableName getName() {
297              return tableName;
298            }
299
300            @Override
301            public Configuration getConfiguration() {
302              return configuration;
303            }
304
305            @Override
306            public HTableDescriptor getTableDescriptor() throws IOException {
307              return null;
308            }
309
310            @Override
311            public TableDescriptor getDescriptor() throws IOException {
312              return null;
313            }
314
315            @Override
316            public boolean exists(Get get) throws IOException {
317              return false;
318            }
319
320            @Override
321            public boolean[] exists(List<Get> gets) throws IOException {
322              return new boolean[0];
323            }
324
325            @Override
326            public void batch(List<? extends Row> actions, Object[] results) throws IOException, InterruptedException {
327              for (Row action: actions) {
328                // Row is the index of the loop above where we make WALEntry and Cells.
329                int row = Bytes.toInt(action.getRow());
330                assertTrue("" + row, row> BOUNDARY);
331                UNFILTERED.incrementAndGet();
332              }
333            }
334
335            @Override
336            public <R> void batchCallback(List<? extends Row> actions, Object[] results, Batch.Callback<R> callback) throws IOException, InterruptedException {
337
338            }
339
340            @Override
341            public Result get(Get get) throws IOException {
342              return null;
343            }
344
345            @Override
346            public Result[] get(List<Get> gets) throws IOException {
347              return new Result[0];
348            }
349
350            @Override
351            public ResultScanner getScanner(Scan scan) throws IOException {
352              return null;
353            }
354
355            @Override
356            public ResultScanner getScanner(byte[] family) throws IOException {
357              return null;
358            }
359
360            @Override
361            public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
362              return null;
363            }
364
365            @Override
366            public void put(Put put) throws IOException {
367
368            }
369
370            @Override
371            public void put(List<Put> puts) throws IOException {
372
373            }
374
375            @Override
376            public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put) throws IOException {
377              return false;
378            }
379
380            @Override
381            public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Put put) throws IOException {
382              return false;
383            }
384
385            @Override
386            public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, byte[] value, Put put) throws IOException {
387              return false;
388            }
389
390            @Override
391            public void delete(Delete delete) throws IOException {
392
393            }
394
395            @Override
396            public void delete(List<Delete> deletes) throws IOException {
397
398            }
399
400            @Override
401            public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[] value, Delete delete) throws IOException {
402              return false;
403            }
404
405            @Override
406            public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Delete delete) throws IOException {
407              return false;
408            }
409
410            @Override
411            public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, byte[] value, Delete delete) throws IOException {
412              return false;
413            }
414
415            @Override
416            public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
417              return null;
418            }
419
420            @Override
421            public void mutateRow(RowMutations rm) throws IOException {
422
423            }
424
425            @Override
426            public Result append(Append append) throws IOException {
427              return null;
428            }
429
430            @Override
431            public Result increment(Increment increment) throws IOException {
432              return null;
433            }
434
435            @Override
436            public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) throws IOException {
437              return 0;
438            }
439
440            @Override
441            public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability) throws IOException {
442              return 0;
443            }
444
445            @Override
446            public void close() throws IOException {
447
448            }
449
450            @Override
451            public CoprocessorRpcChannel coprocessorService(byte[] row) {
452              return null;
453            }
454
455            @Override
456            public <T extends com.google.protobuf.Service, R> Map<byte[], R> coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable) throws com.google.protobuf.ServiceException, Throwable {
457              return null;
458            }
459
460            @Override
461            public <T extends com.google.protobuf.Service, R> void coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Batch.Callback<R> callback) throws com.google.protobuf.ServiceException, Throwable {
462
463            }
464
465            @Override
466            public <R extends com.google.protobuf.Message> Map<byte[], R> batchCoprocessorService(com.google.protobuf.Descriptors.MethodDescriptor methodDescriptor, com.google.protobuf.Message request, byte[] startKey, byte[] endKey, R responsePrototype) throws com.google.protobuf.ServiceException, Throwable {
467              return null;
468            }
469
470            @Override
471            public <R extends com.google.protobuf.Message> void batchCoprocessorService(com.google.protobuf.Descriptors.MethodDescriptor methodDescriptor, com.google.protobuf.Message request, byte[] startKey, byte[] endKey, R responsePrototype, Batch.Callback<R> callback) throws com.google.protobuf.ServiceException, Throwable {
472
473            }
474
475            @Override
476            public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, RowMutations mutation) throws IOException {
477              return false;
478            }
479
480            @Override
481            public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, byte[] value, RowMutations mutation) throws IOException {
482              return false;
483            }
484
485            @Override
486            public long getRpcTimeout(TimeUnit unit) {
487              return 0;
488            }
489
490            @Override
491            public int getRpcTimeout() {
492              return 0;
493            }
494
495            @Override
496            public void setRpcTimeout(int rpcTimeout) {
497
498            }
499
500            @Override
501            public long getReadRpcTimeout(TimeUnit unit) {
502              return 0;
503            }
504
505            @Override
506            public int getReadRpcTimeout() {
507              return 0;
508            }
509
510            @Override
511            public void setReadRpcTimeout(int readRpcTimeout) {
512
513            }
514
515            @Override
516            public long getWriteRpcTimeout(TimeUnit unit) {
517              return 0;
518            }
519
520            @Override
521            public int getWriteRpcTimeout() {
522              return 0;
523            }
524
525            @Override
526            public void setWriteRpcTimeout(int writeRpcTimeout) {
527
528            }
529
530            @Override
531            public long getOperationTimeout(TimeUnit unit) {
532              return 0;
533            }
534
535            @Override
536            public int getOperationTimeout() {
537              return 0;
538            }
539
540            @Override
541            public void setOperationTimeout(int operationTimeout) {
542
543            }
544          };
545        }
546      };
547    }
548  }
549}
550
551