001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertTrue; 022 023import java.io.IOException; 024import java.net.SocketAddress; 025import java.util.ArrayList; 026import java.util.List; 027import java.util.concurrent.CompletableFuture; 028import java.util.concurrent.atomic.AtomicInteger; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.Cell; 031import org.apache.hadoop.hbase.CellBuilderType; 032import org.apache.hadoop.hbase.ExtendedCell; 033import org.apache.hadoop.hbase.ExtendedCellBuilder; 034import org.apache.hadoop.hbase.ExtendedCellBuilderFactory; 035import org.apache.hadoop.hbase.ExtendedCellScanner; 036import org.apache.hadoop.hbase.HBaseConfiguration; 037import org.apache.hadoop.hbase.HConstants; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer; 040import org.apache.hadoop.hbase.client.AsyncClusterConnection; 041import org.apache.hadoop.hbase.client.AsyncConnection; 042import org.apache.hadoop.hbase.client.AsyncTable; 043import org.apache.hadoop.hbase.client.ClusterConnectionFactory; 044import org.apache.hadoop.hbase.client.ConnectionRegistry; 045import org.apache.hadoop.hbase.client.DoNothingConnectionRegistry; 046import org.apache.hadoop.hbase.client.DummyAsyncClusterConnection; 047import org.apache.hadoop.hbase.client.DummyAsyncTable; 048import org.apache.hadoop.hbase.client.Row; 049import org.apache.hadoop.hbase.security.User; 050import org.apache.hadoop.hbase.testclassification.ReplicationTests; 051import org.apache.hadoop.hbase.testclassification.SmallTests; 052import org.apache.hadoop.hbase.util.Bytes; 053import org.junit.jupiter.api.Tag; 054import org.junit.jupiter.api.Test; 055import org.junit.jupiter.api.TestInfo; 056 057import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 058 059import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 060 061/** 062 * Simple test of sink-side wal entry filter facility. 063 */ 064@Tag(ReplicationTests.TAG) 065@Tag(SmallTests.TAG) 066public class TestWALEntrySinkFilter { 067 068 static final int BOUNDARY = 5; 069 static final AtomicInteger UNFILTERED = new AtomicInteger(); 070 static final AtomicInteger FILTERED = new AtomicInteger(); 071 072 /** 073 * Test filter. Filter will filter out any write time that is <= 5 (BOUNDARY). We count how many 074 * items we filter out and we count how many cells make it through for distribution way down below 075 * in the Table#batch implementation. Puts in place a custom DevNullConnection so we can insert 076 * our counting Table. 077 */ 078 @Test 079 public void testWALEntryFilter(TestInfo testInfo) throws IOException { 080 String testName = testInfo.getTestMethod().get().getName(); 081 Configuration conf = HBaseConfiguration.create(); 082 // Make it so our filter is instantiated on construction of ReplicationSink. 083 conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, 084 DevNullConnectionRegistry.class, ConnectionRegistry.class); 085 conf.setClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY, 086 IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl.class, WALEntrySinkFilter.class); 087 conf.setClass(ClusterConnectionFactory.HBASE_SERVER_CLUSTER_CONNECTION_IMPL, 088 DevNullAsyncClusterConnection.class, AsyncClusterConnection.class); 089 ReplicationSink sink = new ReplicationSink(conf, null); 090 // Create some dumb walentries. 091 List<AdminProtos.WALEntry> entries = new ArrayList<>(); 092 AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder(); 093 // Need a tablename. 094 ByteString tableName = ByteString.copyFromUtf8(TableName.valueOf(testName).toString()); 095 // Add WALEdit Cells to Cells List. The way edits arrive at the sink is with protos 096 // describing the edit with all Cells from all edits aggregated in a single CellScanner. 097 final List<ExtendedCell> cells = new ArrayList<>(); 098 int count = BOUNDARY * 2; 099 for (int i = 0; i < count; i++) { 100 byte[] bytes = Bytes.toBytes(i); 101 // Create a wal entry. Everything is set to the current index as bytes or int/long. 102 entryBuilder.clear(); 103 entryBuilder.setKey(entryBuilder.getKeyBuilder().setLogSequenceNumber(i) 104 .setEncodedRegionName(ByteString.copyFrom(bytes)).setWriteTime(i).setTableName(tableName) 105 .build()); 106 // Lets have one Cell associated with each WALEdit. 107 entryBuilder.setAssociatedCellCount(1); 108 entries.add(entryBuilder.build()); 109 // We need to add a Cell per WALEdit to the cells array. 110 ExtendedCellBuilder cellBuilder = 111 ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY); 112 // Make cells whose row, family, cell, value, and ts are == 'i'. 113 ExtendedCell cell = cellBuilder.setRow(bytes).setFamily(bytes).setQualifier(bytes) 114 .setType(Cell.Type.Put).setTimestamp(i).setValue(bytes).build(); 115 cells.add(cell); 116 } 117 // Now wrap our cells array in a CellScanner that we can pass in to replicateEntries. It has 118 // all Cells from all the WALEntries made above. 119 ExtendedCellScanner cellScanner = new ExtendedCellScanner() { 120 // Set to -1 because advance gets called before current. 121 int index = -1; 122 123 @Override 124 public ExtendedCell current() { 125 return cells.get(index); 126 } 127 128 @Override 129 public boolean advance() throws IOException { 130 index++; 131 return index < cells.size(); 132 } 133 }; 134 // Call our sink. 135 sink.replicateEntries(entries, cellScanner, null, null, null); 136 // Check what made it through and what was filtered. 137 assertTrue(FILTERED.get() > 0); 138 assertTrue(UNFILTERED.get() > 0); 139 assertEquals(count, FILTERED.get() + UNFILTERED.get()); 140 } 141 142 /** 143 * Simple filter that will filter out any entry wholse writeTime is <= 5. 144 */ 145 public static class IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl 146 implements WALEntrySinkFilter { 147 public IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl() { 148 } 149 150 @Override 151 public void init(AsyncConnection conn) { 152 // Do nothing. 153 } 154 155 @Override 156 public boolean filter(TableName table, long writeTime) { 157 boolean b = writeTime <= BOUNDARY; 158 if (b) { 159 FILTERED.incrementAndGet(); 160 } 161 return b; 162 } 163 } 164 165 public static class DevNullConnectionRegistry extends DoNothingConnectionRegistry { 166 167 public DevNullConnectionRegistry(Configuration conf, User user) { 168 super(conf, user); 169 } 170 171 @Override 172 public CompletableFuture<String> getClusterId() { 173 return CompletableFuture.completedFuture("test"); 174 } 175 } 176 177 public static class DevNullAsyncClusterConnection extends DummyAsyncClusterConnection { 178 179 private final Configuration conf; 180 181 public DevNullAsyncClusterConnection(Configuration conf, Object registry, String clusterId, 182 SocketAddress localAddress, User user) { 183 this.conf = conf; 184 } 185 186 @Override 187 public AsyncTable<AdvancedScanResultConsumer> getTable(TableName tableName) { 188 return new DummyAsyncTable<AdvancedScanResultConsumer>() { 189 190 @Override 191 public <T> CompletableFuture<List<T>> batchAll(List<? extends Row> actions) { 192 List<T> list = new ArrayList<>(actions.size()); 193 for (Row action : actions) { 194 // Row is the index of the loop above where we make WALEntry and Cells. 195 int row = Bytes.toInt(action.getRow()); 196 assertTrue(row > BOUNDARY, "" + row); 197 UNFILTERED.incrementAndGet(); 198 list.add(null); 199 } 200 return CompletableFuture.completedFuture(list); 201 } 202 }; 203 } 204 205 @Override 206 public Configuration getConfiguration() { 207 return conf; 208 } 209 } 210}