001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.util.ArrayList; 025import java.util.List; 026import java.util.Map; 027import java.util.concurrent.ExecutorService; 028import java.util.concurrent.TimeUnit; 029import java.util.concurrent.atomic.AtomicBoolean; 030import java.util.concurrent.atomic.AtomicInteger; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.hbase.Cell; 033import org.apache.hadoop.hbase.CellBuilder; 034import org.apache.hadoop.hbase.CellBuilderFactory; 035import org.apache.hadoop.hbase.CellBuilderType; 036import org.apache.hadoop.hbase.CellScanner; 037import org.apache.hadoop.hbase.CompareOperator; 038import org.apache.hadoop.hbase.HBaseClassTestRule; 039import org.apache.hadoop.hbase.HBaseConfiguration; 040import org.apache.hadoop.hbase.HTableDescriptor; 041import org.apache.hadoop.hbase.Stoppable; 042import org.apache.hadoop.hbase.TableName; 043import org.apache.hadoop.hbase.client.Admin; 044import org.apache.hadoop.hbase.client.Append; 045import org.apache.hadoop.hbase.client.BufferedMutator; 046import org.apache.hadoop.hbase.client.BufferedMutatorParams; 047import org.apache.hadoop.hbase.client.Connection; 048import org.apache.hadoop.hbase.client.Delete; 049import org.apache.hadoop.hbase.client.Durability; 050import org.apache.hadoop.hbase.client.Get; 051import org.apache.hadoop.hbase.client.Increment; 052import org.apache.hadoop.hbase.client.Put; 053import org.apache.hadoop.hbase.client.RegionLocator; 054import org.apache.hadoop.hbase.client.Result; 055import org.apache.hadoop.hbase.client.ResultScanner; 056import org.apache.hadoop.hbase.client.Row; 057import org.apache.hadoop.hbase.client.RowMutations; 058import org.apache.hadoop.hbase.client.Scan; 059import org.apache.hadoop.hbase.client.Table; 060import org.apache.hadoop.hbase.client.TableBuilder; 061import org.apache.hadoop.hbase.client.TableDescriptor; 062import org.apache.hadoop.hbase.client.coprocessor.Batch; 063import org.apache.hadoop.hbase.filter.CompareFilter; 064import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; 065import org.apache.hadoop.hbase.security.User; 066import org.apache.hadoop.hbase.testclassification.ReplicationTests; 067import org.apache.hadoop.hbase.testclassification.SmallTests; 068import org.apache.hadoop.hbase.util.Bytes; 069import org.junit.ClassRule; 070import org.junit.Rule; 071import org.junit.Test; 072import org.junit.experimental.categories.Category; 073import org.junit.rules.TestName; 074import org.slf4j.Logger; 075import org.slf4j.LoggerFactory; 076 077import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 078 079import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 080 081/** 082 * Simple test of sink-side wal entry filter facility. 083 */ 084@Category({ReplicationTests.class, SmallTests.class}) 085public class TestWALEntrySinkFilter { 086 087 @ClassRule 088 public static final HBaseClassTestRule CLASS_RULE = 089 HBaseClassTestRule.forClass(TestWALEntrySinkFilter.class); 090 091 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSink.class); 092 @Rule public TestName name = new TestName(); 093 static final int BOUNDARY = 5; 094 static final AtomicInteger UNFILTERED = new AtomicInteger(); 095 static final AtomicInteger FILTERED = new AtomicInteger(); 096 097 /** 098 * Implemetentation of Stoppable to pass into ReplicationSink. 099 */ 100 private static Stoppable STOPPABLE = new Stoppable() { 101 private final AtomicBoolean stop = new AtomicBoolean(false); 102 103 @Override 104 public boolean isStopped() { 105 return this.stop.get(); 106 } 107 108 @Override 109 public void stop(String why) { 110 LOG.info("STOPPING BECAUSE: " + why); 111 this.stop.set(true); 112 } 113 }; 114 115 /** 116 * Test filter. 117 * Filter will filter out any write time that is <= 5 (BOUNDARY). We count how many items we 118 * filter out and we count how many cells make it through for distribution way down below in the 119 * Table#batch implementation. Puts in place a custom DevNullConnection so we can insert our 120 * counting Table. 121 * @throws IOException 122 */ 123 @Test 124 public void testWALEntryFilter() throws IOException { 125 Configuration conf = HBaseConfiguration.create(); 126 // Make it so our filter is instantiated on construction of ReplicationSink. 127 conf.setClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY, 128 IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl.class, WALEntrySinkFilter.class); 129 conf.setClass("hbase.client.connection.impl", DevNullConnection.class, 130 Connection.class); 131 ReplicationSink sink = new ReplicationSink(conf, STOPPABLE); 132 // Create some dumb walentries. 133 List< org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry > entries = 134 new ArrayList<>(); 135 AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder(); 136 // Need a tablename. 137 ByteString tableName = 138 ByteString.copyFromUtf8(TableName.valueOf(this.name.getMethodName()).toString()); 139 // Add WALEdit Cells to Cells List. The way edits arrive at the sink is with protos 140 // describing the edit with all Cells from all edits aggregated in a single CellScanner. 141 final List<Cell> cells = new ArrayList<>(); 142 int count = BOUNDARY * 2; 143 for(int i = 0; i < count; i++) { 144 byte [] bytes = Bytes.toBytes(i); 145 // Create a wal entry. Everything is set to the current index as bytes or int/long. 146 entryBuilder.clear(); 147 entryBuilder.setKey(entryBuilder.getKeyBuilder(). 148 setLogSequenceNumber(i). 149 setEncodedRegionName(ByteString.copyFrom(bytes)). 150 setWriteTime(i). 151 setTableName(tableName).build()); 152 // Lets have one Cell associated with each WALEdit. 153 entryBuilder.setAssociatedCellCount(1); 154 entries.add(entryBuilder.build()); 155 // We need to add a Cell per WALEdit to the cells array. 156 CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY); 157 // Make cells whose row, family, cell, value, and ts are == 'i'. 158 Cell cell = cellBuilder. 159 setRow(bytes). 160 setFamily(bytes). 161 setQualifier(bytes). 162 setType(Cell.Type.Put). 163 setTimestamp(i). 164 setValue(bytes).build(); 165 cells.add(cell); 166 } 167 // Now wrap our cells array in a CellScanner that we can pass in to replicateEntries. It has 168 // all Cells from all the WALEntries made above. 169 CellScanner cellScanner = new CellScanner() { 170 // Set to -1 because advance gets called before current. 171 int index = -1; 172 173 @Override 174 public Cell current() { 175 return cells.get(index); 176 } 177 178 @Override 179 public boolean advance() throws IOException { 180 index++; 181 return index < cells.size(); 182 } 183 }; 184 // Call our sink. 185 sink.replicateEntries(entries, cellScanner, null, null, null); 186 // Check what made it through and what was filtered. 187 assertTrue(FILTERED.get() > 0); 188 assertTrue(UNFILTERED.get() > 0); 189 assertEquals(count, FILTERED.get() + UNFILTERED.get()); 190 } 191 192 /** 193 * Simple filter that will filter out any entry wholse writeTime is <= 5. 194 */ 195 public static class IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl implements WALEntrySinkFilter { 196 public IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl() {} 197 198 @Override 199 public void init(Connection connection) { 200 // Do nothing. 201 } 202 203 @Override 204 public boolean filter(TableName table, long writeTime) { 205 boolean b = writeTime <= BOUNDARY; 206 if (b) { 207 FILTERED.incrementAndGet(); 208 } 209 return b; 210 } 211 } 212 213 /** 214 * A DevNull Connection whose only purpose is checking what edits made it through. See down in 215 * {@link Table#batch(List, Object[])}. 216 */ 217 public static class DevNullConnection implements Connection { 218 private final Configuration configuration; 219 220 DevNullConnection(Configuration configuration, ExecutorService es, User user) { 221 this.configuration = configuration; 222 } 223 224 @Override 225 public void abort(String why, Throwable e) { 226 227 } 228 229 @Override 230 public boolean isAborted() { 231 return false; 232 } 233 234 @Override 235 public Configuration getConfiguration() { 236 return this.configuration; 237 } 238 239 @Override 240 public BufferedMutator getBufferedMutator(TableName tableName) throws IOException { 241 return null; 242 } 243 244 @Override 245 public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException { 246 return null; 247 } 248 249 @Override 250 public RegionLocator getRegionLocator(TableName tableName) throws IOException { 251 return null; 252 } 253 254 @Override 255 public Admin getAdmin() throws IOException { 256 return null; 257 } 258 259 @Override 260 public void close() throws IOException { 261 262 } 263 264 @Override 265 public boolean isClosed() { 266 return false; 267 } 268 269 @Override 270 public TableBuilder getTableBuilder(final TableName tableName, ExecutorService pool) { 271 return new TableBuilder() { 272 @Override 273 public TableBuilder setOperationTimeout(int timeout) { 274 return this; 275 } 276 277 @Override 278 public TableBuilder setRpcTimeout(int timeout) { 279 return this; 280 } 281 282 @Override 283 public TableBuilder setReadRpcTimeout(int timeout) { 284 return this; 285 } 286 287 @Override 288 public TableBuilder setWriteRpcTimeout(int timeout) { 289 return this; 290 } 291 292 @Override 293 public Table build() { 294 return new Table() { 295 @Override 296 public TableName getName() { 297 return tableName; 298 } 299 300 @Override 301 public Configuration getConfiguration() { 302 return configuration; 303 } 304 305 @Override 306 public HTableDescriptor getTableDescriptor() throws IOException { 307 return null; 308 } 309 310 @Override 311 public TableDescriptor getDescriptor() throws IOException { 312 return null; 313 } 314 315 @Override 316 public boolean exists(Get get) throws IOException { 317 return false; 318 } 319 320 @Override 321 public boolean[] exists(List<Get> gets) throws IOException { 322 return new boolean[0]; 323 } 324 325 @Override 326 public void batch(List<? extends Row> actions, Object[] results) throws IOException, InterruptedException { 327 for (Row action: actions) { 328 // Row is the index of the loop above where we make WALEntry and Cells. 329 int row = Bytes.toInt(action.getRow()); 330 assertTrue("" + row, row> BOUNDARY); 331 UNFILTERED.incrementAndGet(); 332 } 333 } 334 335 @Override 336 public <R> void batchCallback(List<? extends Row> actions, Object[] results, Batch.Callback<R> callback) throws IOException, InterruptedException { 337 338 } 339 340 @Override 341 public Result get(Get get) throws IOException { 342 return null; 343 } 344 345 @Override 346 public Result[] get(List<Get> gets) throws IOException { 347 return new Result[0]; 348 } 349 350 @Override 351 public ResultScanner getScanner(Scan scan) throws IOException { 352 return null; 353 } 354 355 @Override 356 public ResultScanner getScanner(byte[] family) throws IOException { 357 return null; 358 } 359 360 @Override 361 public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException { 362 return null; 363 } 364 365 @Override 366 public void put(Put put) throws IOException { 367 368 } 369 370 @Override 371 public void put(List<Put> puts) throws IOException { 372 373 } 374 375 @Override 376 public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put) throws IOException { 377 return false; 378 } 379 380 @Override 381 public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Put put) throws IOException { 382 return false; 383 } 384 385 @Override 386 public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, byte[] value, Put put) throws IOException { 387 return false; 388 } 389 390 @Override 391 public void delete(Delete delete) throws IOException { 392 393 } 394 395 @Override 396 public void delete(List<Delete> deletes) throws IOException { 397 398 } 399 400 @Override 401 public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[] value, Delete delete) throws IOException { 402 return false; 403 } 404 405 @Override 406 public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Delete delete) throws IOException { 407 return false; 408 } 409 410 @Override 411 public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, byte[] value, Delete delete) throws IOException { 412 return false; 413 } 414 415 @Override 416 public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) { 417 return null; 418 } 419 420 @Override 421 public void mutateRow(RowMutations rm) throws IOException { 422 423 } 424 425 @Override 426 public Result append(Append append) throws IOException { 427 return null; 428 } 429 430 @Override 431 public Result increment(Increment increment) throws IOException { 432 return null; 433 } 434 435 @Override 436 public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) throws IOException { 437 return 0; 438 } 439 440 @Override 441 public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability) throws IOException { 442 return 0; 443 } 444 445 @Override 446 public void close() throws IOException { 447 448 } 449 450 @Override 451 public CoprocessorRpcChannel coprocessorService(byte[] row) { 452 return null; 453 } 454 455 @Override 456 public <T extends com.google.protobuf.Service, R> Map<byte[], R> coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable) throws com.google.protobuf.ServiceException, Throwable { 457 return null; 458 } 459 460 @Override 461 public <T extends com.google.protobuf.Service, R> void coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Batch.Callback<R> callback) throws com.google.protobuf.ServiceException, Throwable { 462 463 } 464 465 @Override 466 public <R extends com.google.protobuf.Message> Map<byte[], R> batchCoprocessorService(com.google.protobuf.Descriptors.MethodDescriptor methodDescriptor, com.google.protobuf.Message request, byte[] startKey, byte[] endKey, R responsePrototype) throws com.google.protobuf.ServiceException, Throwable { 467 return null; 468 } 469 470 @Override 471 public <R extends com.google.protobuf.Message> void batchCoprocessorService(com.google.protobuf.Descriptors.MethodDescriptor methodDescriptor, com.google.protobuf.Message request, byte[] startKey, byte[] endKey, R responsePrototype, Batch.Callback<R> callback) throws com.google.protobuf.ServiceException, Throwable { 472 473 } 474 475 @Override 476 public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, RowMutations mutation) throws IOException { 477 return false; 478 } 479 480 @Override 481 public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, byte[] value, RowMutations mutation) throws IOException { 482 return false; 483 } 484 485 @Override 486 public long getRpcTimeout(TimeUnit unit) { 487 return 0; 488 } 489 490 @Override 491 public int getRpcTimeout() { 492 return 0; 493 } 494 495 @Override 496 public void setRpcTimeout(int rpcTimeout) { 497 498 } 499 500 @Override 501 public long getReadRpcTimeout(TimeUnit unit) { 502 return 0; 503 } 504 505 @Override 506 public int getReadRpcTimeout() { 507 return 0; 508 } 509 510 @Override 511 public void setReadRpcTimeout(int readRpcTimeout) { 512 513 } 514 515 @Override 516 public long getWriteRpcTimeout(TimeUnit unit) { 517 return 0; 518 } 519 520 @Override 521 public int getWriteRpcTimeout() { 522 return 0; 523 } 524 525 @Override 526 public void setWriteRpcTimeout(int writeRpcTimeout) { 527 528 } 529 530 @Override 531 public long getOperationTimeout(TimeUnit unit) { 532 return 0; 533 } 534 535 @Override 536 public int getOperationTimeout() { 537 return 0; 538 } 539 540 @Override 541 public void setOperationTimeout(int operationTimeout) { 542 543 } 544 }; 545 } 546 }; 547 } 548 } 549} 550 551