001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotEquals; 023import static org.junit.Assert.assertNotNull; 024import static org.junit.Assert.assertTrue; 025import static org.junit.Assert.fail; 026import static org.mockito.Mockito.when; 027 028import java.io.IOException; 029import java.util.NavigableMap; 030import java.util.NoSuchElementException; 031import java.util.OptionalLong; 032import java.util.TreeMap; 033import java.util.concurrent.PriorityBlockingQueue; 034import java.util.concurrent.atomic.AtomicLong; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.fs.FileSystem; 037import org.apache.hadoop.fs.Path; 038import org.apache.hadoop.hbase.Cell; 039import org.apache.hadoop.hbase.HBaseClassTestRule; 040import org.apache.hadoop.hbase.HBaseTestingUtility; 041import org.apache.hadoop.hbase.HConstants; 042import org.apache.hadoop.hbase.KeyValue; 043import org.apache.hadoop.hbase.TableName; 044import org.apache.hadoop.hbase.client.RegionInfo; 045import org.apache.hadoop.hbase.client.RegionInfoBuilder; 046import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 047import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 048import org.apache.hadoop.hbase.replication.WALEntryFilter; 049import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch; 050import org.apache.hadoop.hbase.testclassification.LargeTests; 051import org.apache.hadoop.hbase.testclassification.ReplicationTests; 052import org.apache.hadoop.hbase.util.Bytes; 053import org.apache.hadoop.hbase.wal.WAL; 054import org.apache.hadoop.hbase.wal.WAL.Entry; 055import org.apache.hadoop.hbase.wal.WALEdit; 056import org.apache.hadoop.hbase.wal.WALFactory; 057import org.apache.hadoop.hbase.wal.WALKeyImpl; 058import org.apache.hadoop.hdfs.MiniDFSCluster; 059import org.junit.After; 060import org.junit.AfterClass; 061import org.junit.Before; 062import org.junit.BeforeClass; 063import org.junit.ClassRule; 064import org.junit.Rule; 065import org.junit.Test; 066import org.junit.experimental.categories.Category; 067import org.junit.rules.TestName; 068import org.mockito.Mockito; 069 070@Category({ ReplicationTests.class, LargeTests.class }) 071public class TestWALEntryStream { 072 073 @ClassRule 074 public static final HBaseClassTestRule CLASS_RULE = 075 HBaseClassTestRule.forClass(TestWALEntryStream.class); 076 077 private static HBaseTestingUtility TEST_UTIL; 078 private static Configuration conf; 079 private static FileSystem fs; 080 private static MiniDFSCluster cluster; 081 private static final TableName tableName = TableName.valueOf("tablename"); 082 private static final byte[] family = Bytes.toBytes("column"); 083 private static final byte[] qualifier = Bytes.toBytes("qualifier"); 084 private static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName) 085 .setStartKey(HConstants.EMPTY_START_ROW).setEndKey(HConstants.LAST_ROW).build(); 086 private static final NavigableMap<byte[], Integer> scopes = getScopes(); 087 088 private static NavigableMap<byte[], Integer> getScopes() { 089 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 090 scopes.put(family, 1); 091 return scopes; 092 } 093 094 private WAL log; 095 PriorityBlockingQueue<Path> walQueue; 096 private PathWatcher pathWatcher; 097 098 @Rule 099 public TestName tn = new TestName(); 100 private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 101 102 @BeforeClass 103 public static void setUpBeforeClass() throws Exception { 104 TEST_UTIL = new HBaseTestingUtility(); 105 conf = TEST_UTIL.getConfiguration(); 106 TEST_UTIL.startMiniDFSCluster(3); 107 108 cluster = TEST_UTIL.getDFSCluster(); 109 fs = cluster.getFileSystem(); 110 } 111 112 @AfterClass 113 public static void tearDownAfterClass() throws Exception { 114 TEST_UTIL.shutdownMiniCluster(); 115 } 116 117 @Before 118 public void setUp() throws Exception { 119 walQueue = new PriorityBlockingQueue<>(); 120 pathWatcher = new PathWatcher(); 121 final WALFactory wals = new WALFactory(conf, tn.getMethodName()); 122 wals.getWALProvider().addWALActionsListener(pathWatcher); 123 log = wals.getWAL(info); 124 } 125 126 @After 127 public void tearDown() throws Exception { 128 log.close(); 129 } 130 131 // Try out different combinations of row count and KeyValue count 132 @Test 133 public void testDifferentCounts() throws Exception { 134 int[] NB_ROWS = { 1500, 60000 }; 135 int[] NB_KVS = { 1, 100 }; 136 // whether compression is used 137 Boolean[] BOOL_VALS = { false, true }; 138 // long lastPosition = 0; 139 for (int nbRows : NB_ROWS) { 140 for (int walEditKVs : NB_KVS) { 141 for (boolean isCompressionEnabled : BOOL_VALS) { 142 TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION, 143 isCompressionEnabled); 144 mvcc.advanceTo(1); 145 146 for (int i = 0; i < nbRows; i++) { 147 appendToLogPlus(walEditKVs); 148 } 149 150 log.rollWriter(); 151 152 try (WALEntryStream entryStream = 153 new WALEntryStream(walQueue, conf, 0, log, null, new MetricsSource("1"))) { 154 int i = 0; 155 while (entryStream.hasNext()) { 156 assertNotNull(entryStream.next()); 157 i++; 158 } 159 assertEquals(nbRows, i); 160 161 // should've read all entries 162 assertFalse(entryStream.hasNext()); 163 } 164 // reset everything for next loop 165 log.close(); 166 setUp(); 167 } 168 } 169 } 170 } 171 172 /** 173 * Tests basic reading of log appends 174 */ 175 @Test 176 public void testAppendsWithRolls() throws Exception { 177 appendToLog(); 178 long oldPos; 179 try (WALEntryStream entryStream = 180 new WALEntryStream(walQueue, conf, 0, log, null, new MetricsSource("1"))) { 181 // There's one edit in the log, read it. Reading past it needs to throw exception 182 assertTrue(entryStream.hasNext()); 183 WAL.Entry entry = entryStream.next(); 184 assertNotNull(entry); 185 assertFalse(entryStream.hasNext()); 186 try { 187 entry = entryStream.next(); 188 fail(); 189 } catch (NoSuchElementException e) { 190 // expected 191 } 192 oldPos = entryStream.getPosition(); 193 } 194 195 appendToLog(); 196 197 try (WALEntryStream entryStream = new WALEntryStream(walQueue, conf, oldPos, 198 log, null, new MetricsSource("1"))) { 199 // Read the newly added entry, make sure we made progress 200 WAL.Entry entry = entryStream.next(); 201 assertNotEquals(oldPos, entryStream.getPosition()); 202 assertNotNull(entry); 203 oldPos = entryStream.getPosition(); 204 } 205 206 // We rolled but we still should see the end of the first log and get that item 207 appendToLog(); 208 log.rollWriter(); 209 appendToLog(); 210 211 try (WALEntryStream entryStream = new WALEntryStream(walQueue, conf, oldPos, 212 log, null, new MetricsSource("1"))) { 213 WAL.Entry entry = entryStream.next(); 214 assertNotEquals(oldPos, entryStream.getPosition()); 215 assertNotNull(entry); 216 217 // next item should come from the new log 218 entry = entryStream.next(); 219 assertNotEquals(oldPos, entryStream.getPosition()); 220 assertNotNull(entry); 221 222 // no more entries to read 223 assertFalse(entryStream.hasNext()); 224 oldPos = entryStream.getPosition(); 225 } 226 } 227 228 /** 229 * Tests that if after a stream is opened, more entries come in and then the log is rolled, we 230 * don't mistakenly dequeue the current log thinking we're done with it 231 */ 232 @Test 233 public void testLogrollWhileStreaming() throws Exception { 234 appendToLog("1"); 235 appendToLog("2");// 2 236 try (WALEntryStream entryStream = 237 new WALEntryStream(walQueue, conf, 0, log, null, new MetricsSource("1"))) { 238 assertEquals("1", getRow(entryStream.next())); 239 240 appendToLog("3"); // 3 - comes in after reader opened 241 log.rollWriter(); // log roll happening while we're reading 242 appendToLog("4"); // 4 - this append is in the rolled log 243 244 assertEquals("2", getRow(entryStream.next())); 245 assertEquals(2, walQueue.size()); // we should not have dequeued yet since there's still an 246 // entry in first log 247 assertEquals("3", getRow(entryStream.next())); // if implemented improperly, this would be 4 248 // and 3 would be skipped 249 assertEquals("4", getRow(entryStream.next())); // 4 250 assertEquals(1, walQueue.size()); // now we've dequeued and moved on to next log properly 251 assertFalse(entryStream.hasNext()); 252 } 253 } 254 255 /** 256 * Tests that if writes come in while we have a stream open, we shouldn't miss them 257 */ 258 @Test 259 public void testNewEntriesWhileStreaming() throws Exception { 260 appendToLog("1"); 261 try (WALEntryStream entryStream = 262 new WALEntryStream(walQueue, conf, 0, log, null, new MetricsSource("1"))) { 263 entryStream.next(); // we've hit the end of the stream at this point 264 265 // some new entries come in while we're streaming 266 appendToLog("2"); 267 appendToLog("3"); 268 269 // don't see them 270 assertFalse(entryStream.hasNext()); 271 272 // But we do if we reset 273 entryStream.reset(); 274 assertEquals("2", getRow(entryStream.next())); 275 assertEquals("3", getRow(entryStream.next())); 276 assertFalse(entryStream.hasNext()); 277 } 278 } 279 280 @Test 281 public void testResumeStreamingFromPosition() throws Exception { 282 long lastPosition = 0; 283 appendToLog("1"); 284 try (WALEntryStream entryStream = 285 new WALEntryStream(walQueue, conf, 0, log, null, new MetricsSource("1"))) { 286 entryStream.next(); // we've hit the end of the stream at this point 287 appendToLog("2"); 288 appendToLog("3"); 289 lastPosition = entryStream.getPosition(); 290 } 291 // next stream should picks up where we left off 292 try (WALEntryStream entryStream = 293 new WALEntryStream(walQueue, conf, lastPosition, log, null, new MetricsSource("1"))) { 294 assertEquals("2", getRow(entryStream.next())); 295 assertEquals("3", getRow(entryStream.next())); 296 assertFalse(entryStream.hasNext()); // done 297 assertEquals(1, walQueue.size()); 298 } 299 } 300 301 /** 302 * Tests that if we stop before hitting the end of a stream, we can continue where we left off 303 * using the last position 304 */ 305 @Test 306 public void testPosition() throws Exception { 307 long lastPosition = 0; 308 appendEntriesToLog(3); 309 // read only one element 310 try (WALEntryStream entryStream = new WALEntryStream(walQueue, conf, lastPosition, 311 log, null, new MetricsSource("1"))) { 312 entryStream.next(); 313 lastPosition = entryStream.getPosition(); 314 } 315 // there should still be two more entries from where we left off 316 try (WALEntryStream entryStream = 317 new WALEntryStream(walQueue, conf, lastPosition, log, null, new MetricsSource("1"))) { 318 assertNotNull(entryStream.next()); 319 assertNotNull(entryStream.next()); 320 assertFalse(entryStream.hasNext()); 321 } 322 } 323 324 325 @Test 326 public void testEmptyStream() throws Exception { 327 try (WALEntryStream entryStream = 328 new WALEntryStream(walQueue, conf, 0, log, null, new MetricsSource("1"))) { 329 assertFalse(entryStream.hasNext()); 330 } 331 } 332 333 @Test 334 public void testReplicationSourceWALReaderThread() throws Exception { 335 appendEntriesToLog(3); 336 // get ending position 337 long position; 338 try (WALEntryStream entryStream = 339 new WALEntryStream(walQueue, conf, 0, log, null, new MetricsSource("1"))) { 340 entryStream.next(); 341 entryStream.next(); 342 entryStream.next(); 343 position = entryStream.getPosition(); 344 } 345 346 // start up a batcher 347 ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); 348 when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); 349 ReplicationSource source = Mockito.mock(ReplicationSource.class); 350 when(source.getSourceManager()).thenReturn(mockSourceManager); 351 when(source.getSourceMetrics()).thenReturn(new MetricsSource("1")); 352 when(source.getWALFileLengthProvider()).thenReturn(log); 353 ReplicationSourceWALReader batcher = new ReplicationSourceWALReader(fs, conf, 354 walQueue, 0, getDummyFilter(), source); 355 Path walPath = walQueue.peek(); 356 batcher.start(); 357 WALEntryBatch entryBatch = batcher.take(); 358 359 // should've batched up our entries 360 assertNotNull(entryBatch); 361 assertEquals(3, entryBatch.getWalEntries().size()); 362 assertEquals(position, entryBatch.getLastWalPosition()); 363 assertEquals(walPath, entryBatch.getLastWalPath()); 364 assertEquals(3, entryBatch.getNbRowKeys()); 365 366 appendToLog("foo"); 367 entryBatch = batcher.take(); 368 assertEquals(1, entryBatch.getNbEntries()); 369 assertEquals("foo", getRow(entryBatch.getWalEntries().get(0))); 370 } 371 372 private String getRow(WAL.Entry entry) { 373 Cell cell = entry.getEdit().getCells().get(0); 374 return Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); 375 } 376 377 private void appendToLog(String key) throws IOException { 378 final long txid = log.append(info, 379 new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), 380 mvcc, scopes), getWALEdit(key), true); 381 log.sync(txid); 382 } 383 384 private void appendEntriesToLog(int count) throws IOException { 385 for (int i = 0; i < count; i++) { 386 appendToLog(); 387 } 388 } 389 390 private void appendToLog() throws IOException { 391 appendToLogPlus(1); 392 } 393 394 private void appendToLogPlus(int count) throws IOException { 395 final long txid = log.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 396 System.currentTimeMillis(), mvcc, scopes), getWALEdits(count), true); 397 log.sync(txid); 398 } 399 400 private WALEdit getWALEdits(int count) { 401 WALEdit edit = new WALEdit(); 402 for (int i = 0; i < count; i++) { 403 edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier, 404 System.currentTimeMillis(), qualifier)); 405 } 406 return edit; 407 } 408 409 private WALEdit getWALEdit(String row) { 410 WALEdit edit = new WALEdit(); 411 edit.add( 412 new KeyValue(Bytes.toBytes(row), family, qualifier, System.currentTimeMillis(), qualifier)); 413 return edit; 414 } 415 416 private WALEntryFilter getDummyFilter() { 417 return new WALEntryFilter() { 418 419 @Override 420 public Entry filter(Entry entry) { 421 return entry; 422 } 423 }; 424 } 425 426 class PathWatcher implements WALActionsListener { 427 428 Path currentPath; 429 430 @Override 431 public void preLogRoll(Path oldPath, Path newPath) throws IOException { 432 walQueue.add(newPath); 433 currentPath = newPath; 434 } 435 } 436 437 @Test 438 public void testReadBeyondCommittedLength() throws IOException, InterruptedException { 439 appendToLog("1"); 440 appendToLog("2"); 441 long size = log.getLogFileSizeIfBeingWritten(walQueue.peek()).getAsLong(); 442 AtomicLong fileLength = new AtomicLong(size - 1); 443 try (WALEntryStream entryStream = new WALEntryStream(walQueue, conf, 0, 444 p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"))) { 445 assertTrue(entryStream.hasNext()); 446 assertNotNull(entryStream.next()); 447 // can not get log 2 448 assertFalse(entryStream.hasNext()); 449 Thread.sleep(1000); 450 entryStream.reset(); 451 // still can not get log 2 452 assertFalse(entryStream.hasNext()); 453 454 // can get log 2 now 455 fileLength.set(size); 456 entryStream.reset(); 457 assertTrue(entryStream.hasNext()); 458 assertNotNull(entryStream.next()); 459 460 assertFalse(entryStream.hasNext()); 461 } 462 } 463}