001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.List; 025import java.util.Map; 026import java.util.concurrent.ExecutorService; 027import java.util.concurrent.TimeUnit; 028import java.util.concurrent.atomic.AtomicBoolean; 029import java.util.concurrent.atomic.AtomicInteger; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.Cell; 032import org.apache.hadoop.hbase.CellBuilder; 033import org.apache.hadoop.hbase.CellBuilderFactory; 034import org.apache.hadoop.hbase.CellBuilderType; 035import org.apache.hadoop.hbase.CellScanner; 036import org.apache.hadoop.hbase.CompareOperator; 037import org.apache.hadoop.hbase.HBaseClassTestRule; 038import org.apache.hadoop.hbase.HBaseConfiguration; 039import org.apache.hadoop.hbase.HTableDescriptor; 040import org.apache.hadoop.hbase.Stoppable; 041import org.apache.hadoop.hbase.TableName; 042import org.apache.hadoop.hbase.client.Admin; 043import org.apache.hadoop.hbase.client.Append; 044import org.apache.hadoop.hbase.client.BufferedMutator; 045import org.apache.hadoop.hbase.client.BufferedMutatorParams; 046import org.apache.hadoop.hbase.client.Connection; 047import org.apache.hadoop.hbase.client.Delete; 048import org.apache.hadoop.hbase.client.Durability; 049import org.apache.hadoop.hbase.client.Get; 050import org.apache.hadoop.hbase.client.Increment; 051import org.apache.hadoop.hbase.client.Put; 052import org.apache.hadoop.hbase.client.RegionLocator; 053import org.apache.hadoop.hbase.client.Result; 054import org.apache.hadoop.hbase.client.ResultScanner; 055import org.apache.hadoop.hbase.client.Row; 056import org.apache.hadoop.hbase.client.RowMutations; 057import org.apache.hadoop.hbase.client.Scan; 058import org.apache.hadoop.hbase.client.Table; 059import org.apache.hadoop.hbase.client.TableBuilder; 060import org.apache.hadoop.hbase.client.TableDescriptor; 061import org.apache.hadoop.hbase.client.coprocessor.Batch; 062import org.apache.hadoop.hbase.filter.CompareFilter; 063import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; 064import org.apache.hadoop.hbase.security.User; 065import org.apache.hadoop.hbase.testclassification.ReplicationTests; 066import org.apache.hadoop.hbase.testclassification.SmallTests; 067import org.apache.hadoop.hbase.util.Bytes; 068import org.junit.ClassRule; 069import org.junit.Rule; 070import org.junit.Test; 071import org.junit.experimental.categories.Category; 072import org.junit.rules.TestName; 073import org.slf4j.Logger; 074import org.slf4j.LoggerFactory; 075import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 076import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 077 078/** 079 * Simple test of sink-side wal entry filter facility. 080 */ 081@Category({ReplicationTests.class, SmallTests.class}) 082public class TestWALEntrySinkFilter { 083 084 @ClassRule 085 public static final HBaseClassTestRule CLASS_RULE = 086 HBaseClassTestRule.forClass(TestWALEntrySinkFilter.class); 087 088 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSink.class); 089 @Rule public TestName name = new TestName(); 090 static final int BOUNDARY = 5; 091 static final AtomicInteger UNFILTERED = new AtomicInteger(); 092 static final AtomicInteger FILTERED = new AtomicInteger(); 093 094 /** 095 * Implemetentation of Stoppable to pass into ReplicationSink. 096 */ 097 private static Stoppable STOPPABLE = new Stoppable() { 098 private final AtomicBoolean stop = new AtomicBoolean(false); 099 100 @Override 101 public boolean isStopped() { 102 return this.stop.get(); 103 } 104 105 @Override 106 public void stop(String why) { 107 LOG.info("STOPPING BECAUSE: " + why); 108 this.stop.set(true); 109 } 110 }; 111 112 /** 113 * Test filter. 114 * Filter will filter out any write time that is <= 5 (BOUNDARY). We count how many items we 115 * filter out and we count how many cells make it through for distribution way down below in the 116 * Table#batch implementation. Puts in place a custom DevNullConnection so we can insert our 117 * counting Table. 118 * @throws IOException 119 */ 120 @Test 121 public void testWALEntryFilter() throws IOException { 122 Configuration conf = HBaseConfiguration.create(); 123 // Make it so our filter is instantiated on construction of ReplicationSink. 124 conf.setClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY, 125 IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl.class, WALEntrySinkFilter.class); 126 conf.setClass("hbase.client.connection.impl", DevNullConnection.class, 127 Connection.class); 128 ReplicationSink sink = new ReplicationSink(conf, STOPPABLE); 129 // Create some dumb walentries. 130 List< org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry > entries = 131 new ArrayList<>(); 132 AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder(); 133 // Need a tablename. 134 ByteString tableName = 135 ByteString.copyFromUtf8(TableName.valueOf(this.name.getMethodName()).toString()); 136 // Add WALEdit Cells to Cells List. The way edits arrive at the sink is with protos 137 // describing the edit with all Cells from all edits aggregated in a single CellScanner. 138 final List<Cell> cells = new ArrayList<>(); 139 int count = BOUNDARY * 2; 140 for(int i = 0; i < count; i++) { 141 byte [] bytes = Bytes.toBytes(i); 142 // Create a wal entry. Everything is set to the current index as bytes or int/long. 143 entryBuilder.clear(); 144 entryBuilder.setKey(entryBuilder.getKeyBuilder(). 145 setLogSequenceNumber(i). 146 setEncodedRegionName(ByteString.copyFrom(bytes)). 147 setWriteTime(i). 148 setTableName(tableName).build()); 149 // Lets have one Cell associated with each WALEdit. 150 entryBuilder.setAssociatedCellCount(1); 151 entries.add(entryBuilder.build()); 152 // We need to add a Cell per WALEdit to the cells array. 153 CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY); 154 // Make cells whose row, family, cell, value, and ts are == 'i'. 155 Cell cell = cellBuilder. 156 setRow(bytes). 157 setFamily(bytes). 158 setQualifier(bytes). 159 setType(Cell.Type.Put). 160 setTimestamp(i). 161 setValue(bytes).build(); 162 cells.add(cell); 163 } 164 // Now wrap our cells array in a CellScanner that we can pass in to replicateEntries. It has 165 // all Cells from all the WALEntries made above. 166 CellScanner cellScanner = new CellScanner() { 167 // Set to -1 because advance gets called before current. 168 int index = -1; 169 170 @Override 171 public Cell current() { 172 return cells.get(index); 173 } 174 175 @Override 176 public boolean advance() throws IOException { 177 index++; 178 return index < cells.size(); 179 } 180 }; 181 // Call our sink. 182 sink.replicateEntries(entries, cellScanner, null, null, null); 183 // Check what made it through and what was filtered. 184 assertTrue(FILTERED.get() > 0); 185 assertTrue(UNFILTERED.get() > 0); 186 assertEquals(count, FILTERED.get() + UNFILTERED.get()); 187 } 188 189 /** 190 * Simple filter that will filter out any entry wholse writeTime is <= 5. 191 */ 192 public static class IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl implements WALEntrySinkFilter { 193 public IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl() {} 194 195 @Override 196 public void init(Connection connection) { 197 // Do nothing. 198 } 199 200 @Override 201 public boolean filter(TableName table, long writeTime) { 202 boolean b = writeTime <= BOUNDARY; 203 if (b) { 204 FILTERED.incrementAndGet(); 205 } 206 return b; 207 } 208 } 209 210 /** 211 * A DevNull Connection whose only purpose is checking what edits made it through. See down in 212 * {@link Table#batch(List, Object[])}. 213 */ 214 public static class DevNullConnection implements Connection { 215 private final Configuration configuration; 216 217 DevNullConnection(Configuration configuration, ExecutorService es, User user) { 218 this.configuration = configuration; 219 } 220 221 @Override 222 public void abort(String why, Throwable e) { 223 224 } 225 226 @Override 227 public boolean isAborted() { 228 return false; 229 } 230 231 @Override 232 public Configuration getConfiguration() { 233 return this.configuration; 234 } 235 236 @Override 237 public BufferedMutator getBufferedMutator(TableName tableName) throws IOException { 238 return null; 239 } 240 241 @Override 242 public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException { 243 return null; 244 } 245 246 @Override 247 public RegionLocator getRegionLocator(TableName tableName) throws IOException { 248 return null; 249 } 250 251 @Override 252 public Admin getAdmin() throws IOException { 253 return null; 254 } 255 256 @Override 257 public void close() throws IOException { 258 259 } 260 261 @Override 262 public boolean isClosed() { 263 return false; 264 } 265 266 @Override 267 public TableBuilder getTableBuilder(final TableName tableName, ExecutorService pool) { 268 return new TableBuilder() { 269 @Override 270 public TableBuilder setOperationTimeout(int timeout) { 271 return this; 272 } 273 274 @Override 275 public TableBuilder setRpcTimeout(int timeout) { 276 return this; 277 } 278 279 @Override 280 public TableBuilder setReadRpcTimeout(int timeout) { 281 return this; 282 } 283 284 @Override 285 public TableBuilder setWriteRpcTimeout(int timeout) { 286 return this; 287 } 288 289 @Override 290 public Table build() { 291 return new Table() { 292 @Override 293 public TableName getName() { 294 return tableName; 295 } 296 297 @Override 298 public Configuration getConfiguration() { 299 return configuration; 300 } 301 302 @Override 303 public HTableDescriptor getTableDescriptor() throws IOException { 304 return null; 305 } 306 307 @Override 308 public TableDescriptor getDescriptor() throws IOException { 309 return null; 310 } 311 312 @Override 313 public boolean exists(Get get) throws IOException { 314 return false; 315 } 316 317 @Override 318 public boolean[] exists(List<Get> gets) throws IOException { 319 return new boolean[0]; 320 } 321 322 @Override 323 public void batch(List<? extends Row> actions, Object[] results) throws IOException, InterruptedException { 324 for (Row action: actions) { 325 // Row is the index of the loop above where we make WALEntry and Cells. 326 int row = Bytes.toInt(action.getRow()); 327 assertTrue("" + row, row> BOUNDARY); 328 UNFILTERED.incrementAndGet(); 329 } 330 } 331 332 @Override 333 public <R> void batchCallback(List<? extends Row> actions, Object[] results, Batch.Callback<R> callback) throws IOException, InterruptedException { 334 335 } 336 337 @Override 338 public Result get(Get get) throws IOException { 339 return null; 340 } 341 342 @Override 343 public Result[] get(List<Get> gets) throws IOException { 344 return new Result[0]; 345 } 346 347 @Override 348 public ResultScanner getScanner(Scan scan) throws IOException { 349 return null; 350 } 351 352 @Override 353 public ResultScanner getScanner(byte[] family) throws IOException { 354 return null; 355 } 356 357 @Override 358 public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException { 359 return null; 360 } 361 362 @Override 363 public void put(Put put) throws IOException { 364 365 } 366 367 @Override 368 public void put(List<Put> puts) throws IOException { 369 370 } 371 372 @Override 373 public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put) throws IOException { 374 return false; 375 } 376 377 @Override 378 public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Put put) throws IOException { 379 return false; 380 } 381 382 @Override 383 public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, byte[] value, Put put) throws IOException { 384 return false; 385 } 386 387 @Override 388 public void delete(Delete delete) throws IOException { 389 390 } 391 392 @Override 393 public void delete(List<Delete> deletes) throws IOException { 394 395 } 396 397 @Override 398 public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[] value, Delete delete) throws IOException { 399 return false; 400 } 401 402 @Override 403 public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Delete delete) throws IOException { 404 return false; 405 } 406 407 @Override 408 public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, byte[] value, Delete delete) throws IOException { 409 return false; 410 } 411 412 @Override 413 public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) { 414 return null; 415 } 416 417 @Override 418 public void mutateRow(RowMutations rm) throws IOException { 419 420 } 421 422 @Override 423 public Result append(Append append) throws IOException { 424 return null; 425 } 426 427 @Override 428 public Result increment(Increment increment) throws IOException { 429 return null; 430 } 431 432 @Override 433 public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) throws IOException { 434 return 0; 435 } 436 437 @Override 438 public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability) throws IOException { 439 return 0; 440 } 441 442 @Override 443 public void close() throws IOException { 444 445 } 446 447 @Override 448 public CoprocessorRpcChannel coprocessorService(byte[] row) { 449 return null; 450 } 451 452 @Override 453 public <T extends com.google.protobuf.Service, R> Map<byte[], R> coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable) throws com.google.protobuf.ServiceException, Throwable { 454 return null; 455 } 456 457 @Override 458 public <T extends com.google.protobuf.Service, R> void coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Batch.Callback<R> callback) throws com.google.protobuf.ServiceException, Throwable { 459 460 } 461 462 @Override 463 public <R extends com.google.protobuf.Message> Map<byte[], R> batchCoprocessorService(com.google.protobuf.Descriptors.MethodDescriptor methodDescriptor, com.google.protobuf.Message request, byte[] startKey, byte[] endKey, R responsePrototype) throws com.google.protobuf.ServiceException, Throwable { 464 return null; 465 } 466 467 @Override 468 public <R extends com.google.protobuf.Message> void batchCoprocessorService(com.google.protobuf.Descriptors.MethodDescriptor methodDescriptor, com.google.protobuf.Message request, byte[] startKey, byte[] endKey, R responsePrototype, Batch.Callback<R> callback) throws com.google.protobuf.ServiceException, Throwable { 469 470 } 471 472 @Override 473 public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, RowMutations mutation) throws IOException { 474 return false; 475 } 476 477 @Override 478 public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, byte[] value, RowMutations mutation) throws IOException { 479 return false; 480 } 481 482 @Override 483 public long getRpcTimeout(TimeUnit unit) { 484 return 0; 485 } 486 487 @Override 488 public int getRpcTimeout() { 489 return 0; 490 } 491 492 @Override 493 public void setRpcTimeout(int rpcTimeout) { 494 495 } 496 497 @Override 498 public long getReadRpcTimeout(TimeUnit unit) { 499 return 0; 500 } 501 502 @Override 503 public int getReadRpcTimeout() { 504 return 0; 505 } 506 507 @Override 508 public void setReadRpcTimeout(int readRpcTimeout) { 509 510 } 511 512 @Override 513 public long getWriteRpcTimeout(TimeUnit unit) { 514 return 0; 515 } 516 517 @Override 518 public int getWriteRpcTimeout() { 519 return 0; 520 } 521 522 @Override 523 public void setWriteRpcTimeout(int writeRpcTimeout) { 524 525 } 526 527 @Override 528 public long getOperationTimeout(TimeUnit unit) { 529 return 0; 530 } 531 532 @Override 533 public int getOperationTimeout() { 534 return 0; 535 } 536 537 @Override 538 public void setOperationTimeout(int operationTimeout) { 539 } 540 541 @Override 542 public RegionLocator getRegionLocator() throws IOException { 543 return null; 544 } 545 }; 546 } 547 }; 548 } 549 550 @Override 551 public void clearRegionLocationCache() { 552 } 553 } 554} 555 556