001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.HBaseTestingUtility.COLUMNS; 021import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1; 022import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2; 023import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3; 024import static org.junit.Assert.assertArrayEquals; 025import static org.junit.Assert.assertEquals; 026import static org.junit.Assert.assertFalse; 027import static org.junit.Assert.assertNotNull; 028import static org.junit.Assert.assertNull; 029import static org.junit.Assert.assertTrue; 030import static org.junit.Assert.fail; 031import static org.mockito.ArgumentMatchers.any; 032import static org.mockito.ArgumentMatchers.anyLong; 033import static org.mockito.Mockito.doThrow; 034import static org.mockito.Mockito.mock; 035import static org.mockito.Mockito.never; 036import static org.mockito.Mockito.spy; 037import static org.mockito.Mockito.times; 038import static org.mockito.Mockito.verify; 039import static org.mockito.Mockito.when; 040 041import java.io.IOException; 042import java.io.InterruptedIOException; 
043import java.math.BigDecimal; 044import java.nio.charset.StandardCharsets; 045import java.security.PrivilegedExceptionAction; 046import java.util.ArrayList; 047import java.util.Arrays; 048import java.util.Collection; 049import java.util.List; 050import java.util.Map; 051import java.util.NavigableMap; 052import java.util.Objects; 053import java.util.TreeMap; 054import java.util.concurrent.Callable; 055import java.util.concurrent.CountDownLatch; 056import java.util.concurrent.ExecutorService; 057import java.util.concurrent.Executors; 058import java.util.concurrent.Future; 059import java.util.concurrent.TimeUnit; 060import java.util.concurrent.atomic.AtomicBoolean; 061import java.util.concurrent.atomic.AtomicInteger; 062import java.util.concurrent.atomic.AtomicReference; 063import org.apache.commons.lang3.RandomStringUtils; 064import org.apache.hadoop.conf.Configuration; 065import org.apache.hadoop.fs.FSDataOutputStream; 066import org.apache.hadoop.fs.FileStatus; 067import org.apache.hadoop.fs.FileSystem; 068import org.apache.hadoop.fs.Path; 069import org.apache.hadoop.hbase.ArrayBackedTag; 070import org.apache.hadoop.hbase.Cell; 071import org.apache.hadoop.hbase.Cell.Type; 072import org.apache.hadoop.hbase.CellBuilderFactory; 073import org.apache.hadoop.hbase.CellBuilderType; 074import org.apache.hadoop.hbase.CellUtil; 075import org.apache.hadoop.hbase.CompareOperator; 076import org.apache.hadoop.hbase.CompatibilitySingletonFactory; 077import org.apache.hadoop.hbase.DroppedSnapshotException; 078import org.apache.hadoop.hbase.HBaseClassTestRule; 079import org.apache.hadoop.hbase.HBaseConfiguration; 080import org.apache.hadoop.hbase.HBaseTestingUtility; 081import org.apache.hadoop.hbase.HColumnDescriptor; 082import org.apache.hadoop.hbase.HConstants; 083import org.apache.hadoop.hbase.HConstants.OperationStatusCode; 084import org.apache.hadoop.hbase.HDFSBlocksDistribution; 085import org.apache.hadoop.hbase.HRegionInfo; 086import 
org.apache.hadoop.hbase.HTableDescriptor; 087import org.apache.hadoop.hbase.KeyValue; 088import org.apache.hadoop.hbase.MiniHBaseCluster; 089import org.apache.hadoop.hbase.MultithreadedTestUtil; 090import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread; 091import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread; 092import org.apache.hadoop.hbase.NotServingRegionException; 093import org.apache.hadoop.hbase.PrivateCellUtil; 094import org.apache.hadoop.hbase.RegionTooBusyException; 095import org.apache.hadoop.hbase.ServerName; 096import org.apache.hadoop.hbase.StartMiniClusterOption; 097import org.apache.hadoop.hbase.TableName; 098import org.apache.hadoop.hbase.TagType; 099import org.apache.hadoop.hbase.Waiter; 100import org.apache.hadoop.hbase.client.Append; 101import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 102import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 103import org.apache.hadoop.hbase.client.Delete; 104import org.apache.hadoop.hbase.client.Durability; 105import org.apache.hadoop.hbase.client.Get; 106import org.apache.hadoop.hbase.client.Increment; 107import org.apache.hadoop.hbase.client.Mutation; 108import org.apache.hadoop.hbase.client.Put; 109import org.apache.hadoop.hbase.client.RegionInfo; 110import org.apache.hadoop.hbase.client.RegionInfoBuilder; 111import org.apache.hadoop.hbase.client.Result; 112import org.apache.hadoop.hbase.client.RowMutations; 113import org.apache.hadoop.hbase.client.Scan; 114import org.apache.hadoop.hbase.client.Table; 115import org.apache.hadoop.hbase.client.TableDescriptor; 116import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 117import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 118import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; 119import org.apache.hadoop.hbase.filter.BigDecimalComparator; 120import org.apache.hadoop.hbase.filter.BinaryComparator; 121import org.apache.hadoop.hbase.filter.ColumnCountGetFilter; 122import 
org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; 123import org.apache.hadoop.hbase.filter.Filter; 124import org.apache.hadoop.hbase.filter.FilterBase; 125import org.apache.hadoop.hbase.filter.FilterList; 126import org.apache.hadoop.hbase.filter.NullComparator; 127import org.apache.hadoop.hbase.filter.PrefixFilter; 128import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter; 129import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 130import org.apache.hadoop.hbase.filter.SubstringComparator; 131import org.apache.hadoop.hbase.filter.ValueFilter; 132import org.apache.hadoop.hbase.io.hfile.HFile; 133import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; 134import org.apache.hadoop.hbase.monitoring.MonitoredTask; 135import org.apache.hadoop.hbase.monitoring.TaskMonitor; 136import org.apache.hadoop.hbase.regionserver.HRegion.MutationBatchOperation; 137import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; 138import org.apache.hadoop.hbase.regionserver.Region.RowLock; 139import org.apache.hadoop.hbase.regionserver.TestHStore.FaultyFileSystem; 140import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; 141import org.apache.hadoop.hbase.regionserver.wal.FSHLog; 142import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource; 143import org.apache.hadoop.hbase.regionserver.wal.WALUtil; 144import org.apache.hadoop.hbase.replication.regionserver.ReplicationObserver; 145import org.apache.hadoop.hbase.security.User; 146import org.apache.hadoop.hbase.test.MetricsAssertHelper; 147import org.apache.hadoop.hbase.testclassification.LargeTests; 148import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests; 149import org.apache.hadoop.hbase.util.Bytes; 150import org.apache.hadoop.hbase.util.CommonFSUtils; 151import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 152import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; 153import org.apache.hadoop.hbase.util.FSUtils; 
154import org.apache.hadoop.hbase.util.HFileArchiveUtil; 155import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; 156import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 157import org.apache.hadoop.hbase.util.Threads; 158import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 159import org.apache.hadoop.hbase.wal.FaultyFSLog; 160import org.apache.hadoop.hbase.wal.NettyAsyncFSWALConfigHelper; 161import org.apache.hadoop.hbase.wal.WAL; 162import org.apache.hadoop.hbase.wal.WALEdit; 163import org.apache.hadoop.hbase.wal.WALFactory; 164import org.apache.hadoop.hbase.wal.WALKeyImpl; 165import org.apache.hadoop.hbase.wal.WALProvider; 166import org.apache.hadoop.hbase.wal.WALProvider.Writer; 167import org.apache.hadoop.hbase.wal.WALSplitUtil; 168import org.junit.After; 169import org.junit.Assert; 170import org.junit.Before; 171import org.junit.ClassRule; 172import org.junit.Rule; 173import org.junit.Test; 174import org.junit.experimental.categories.Category; 175import org.junit.rules.ExpectedException; 176import org.junit.rules.TestName; 177import org.mockito.ArgumentCaptor; 178import org.mockito.ArgumentMatcher; 179import org.mockito.Mockito; 180import org.mockito.invocation.InvocationOnMock; 181import org.mockito.stubbing.Answer; 182import org.slf4j.Logger; 183import org.slf4j.LoggerFactory; 184 185import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 186import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 187import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 188import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; 189import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; 190 191import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 192import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; 193import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; 194import 
org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction; 195import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor; 196import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor; 197import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; 198 199/** 200 * Basic stand-alone testing of HRegion. No clusters! 201 * 202 * A lot of the meta information for an HRegion now lives inside other HRegions 203 * or in the HBaseMaster, so only basic testing is possible. 204 */ 205@Category({VerySlowRegionServerTests.class, LargeTests.class}) 206@SuppressWarnings("deprecation") 207public class TestHRegion { 208 209 @ClassRule 210 public static final HBaseClassTestRule CLASS_RULE = 211 HBaseClassTestRule.forClass(TestHRegion.class); 212 213 // Do not spin up clusters in here. If you need to spin up a cluster, do it 214 // over in TestHRegionOnCluster. 215 private static final Logger LOG = LoggerFactory.getLogger(TestHRegion.class); 216 @Rule 217 public TestName name = new TestName(); 218 @Rule public final ExpectedException thrown = ExpectedException.none(); 219 220 private static final String COLUMN_FAMILY = "MyCF"; 221 private static final byte [] COLUMN_FAMILY_BYTES = Bytes.toBytes(COLUMN_FAMILY); 222 private static final EventLoopGroup GROUP = new NioEventLoopGroup(); 223 224 HRegion region = null; 225 // Do not run unit tests in parallel (? Why not? It don't work? Why not? 
St.Ack) 226 protected static HBaseTestingUtility TEST_UTIL; 227 public static Configuration CONF ; 228 private String dir; 229 private static FileSystem FILESYSTEM; 230 private final int MAX_VERSIONS = 2; 231 232 // Test names 233 protected TableName tableName; 234 protected String method; 235 protected final byte[] qual = Bytes.toBytes("qual"); 236 protected final byte[] qual1 = Bytes.toBytes("qual1"); 237 protected final byte[] qual2 = Bytes.toBytes("qual2"); 238 protected final byte[] qual3 = Bytes.toBytes("qual3"); 239 protected final byte[] value = Bytes.toBytes("value"); 240 protected final byte[] value1 = Bytes.toBytes("value1"); 241 protected final byte[] value2 = Bytes.toBytes("value2"); 242 protected final byte[] row = Bytes.toBytes("rowA"); 243 protected final byte[] row2 = Bytes.toBytes("rowB"); 244 245 protected final MetricsAssertHelper metricsAssertHelper = CompatibilitySingletonFactory 246 .getInstance(MetricsAssertHelper.class); 247 248 @Before 249 public void setup() throws IOException { 250 TEST_UTIL = HBaseTestingUtility.createLocalHTU(); 251 FILESYSTEM = TEST_UTIL.getTestFileSystem(); 252 CONF = TEST_UTIL.getConfiguration(); 253 NettyAsyncFSWALConfigHelper.setEventLoopConfig(CONF, GROUP, NioSocketChannel.class); 254 dir = TEST_UTIL.getDataTestDir("TestHRegion").toString(); 255 method = name.getMethodName(); 256 tableName = TableName.valueOf(method); 257 CONF.set(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, String.valueOf(0.09)); 258 } 259 260 @After 261 public void tearDown() throws IOException { 262 // Region may have been closed, but it is still no harm if we close it again here using HTU. 263 HBaseTestingUtility.closeRegionAndWAL(region); 264 EnvironmentEdgeManagerTestHelper.reset(); 265 LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir()); 266 TEST_UTIL.cleanupTestDir(); 267 } 268 269 /** 270 * Test that I can use the max flushed sequence id after the close. 
271 * @throws IOException 272 */ 273 @Test 274 public void testSequenceId() throws IOException { 275 region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES); 276 assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId()); 277 // Weird. This returns 0 if no store files or no edits. Afraid to change it. 278 assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES)); 279 HBaseTestingUtility.closeRegionAndWAL(this.region); 280 assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId()); 281 assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES)); 282 // Open region again. 283 region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES); 284 byte [] value = Bytes.toBytes(method); 285 // Make a random put against our cf. 286 Put put = new Put(value); 287 put.addColumn(COLUMN_FAMILY_BYTES, null, value); 288 region.put(put); 289 // No flush yet so init numbers should still be in place. 290 assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId()); 291 assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES)); 292 region.flush(true); 293 long max = region.getMaxFlushedSeqId(); 294 HBaseTestingUtility.closeRegionAndWAL(this.region); 295 assertEquals(max, region.getMaxFlushedSeqId()); 296 this.region = null; 297 } 298 299 /** 300 * Test for Bug 2 of HBASE-10466. 301 * "Bug 2: Conditions for the first flush of region close (so-called pre-flush) If memstoreSize 302 * is smaller than a certain value, or when region close starts a flush is ongoing, the first 303 * flush is skipped and only the second flush takes place. However, two flushes are required in 304 * case previous flush fails and leaves some data in snapshot. The bug could cause loss of data 305 * in current memstore. The fix is removing all conditions except abort check so we ensure 2 306 * flushes for region close." 
307 * @throws IOException 308 */ 309 @Test 310 public void testCloseCarryingSnapshot() throws IOException { 311 region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES); 312 HStore store = region.getStore(COLUMN_FAMILY_BYTES); 313 // Get some random bytes. 314 byte [] value = Bytes.toBytes(method); 315 // Make a random put against our cf. 316 Put put = new Put(value); 317 put.addColumn(COLUMN_FAMILY_BYTES, null, value); 318 // First put something in current memstore, which will be in snapshot after flusher.prepare() 319 region.put(put); 320 StoreFlushContext storeFlushCtx = store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY); 321 storeFlushCtx.prepare(); 322 // Second put something in current memstore 323 put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value); 324 region.put(put); 325 // Close with something in memstore and something in the snapshot. Make sure all is cleared. 326 HBaseTestingUtility.closeRegionAndWAL(region); 327 assertEquals(0, region.getMemStoreDataSize()); 328 region = null; 329 } 330 331 /* 332 * This test is for verifying memstore snapshot size is correctly updated in case of rollback 333 * See HBASE-10845 334 */ 335 @Test 336 public void testMemstoreSnapshotSize() throws IOException { 337 class MyFaultyFSLog extends FaultyFSLog { 338 StoreFlushContext storeFlushCtx; 339 public MyFaultyFSLog(FileSystem fs, Path rootDir, String logName, Configuration conf) 340 throws IOException { 341 super(fs, rootDir, logName, conf); 342 } 343 344 void setStoreFlushCtx(StoreFlushContext storeFlushCtx) { 345 this.storeFlushCtx = storeFlushCtx; 346 } 347 348 @Override 349 public void sync(long txid) throws IOException { 350 storeFlushCtx.prepare(); 351 super.sync(txid); 352 } 353 } 354 355 FileSystem fs = FileSystem.get(CONF); 356 Path rootDir = new Path(dir + "testMemstoreSnapshotSize"); 357 MyFaultyFSLog faultyLog = new MyFaultyFSLog(fs, rootDir, "testMemstoreSnapshotSize", CONF); 358 region = initHRegion(tableName, null, null, 
false, Durability.SYNC_WAL, faultyLog, 359 COLUMN_FAMILY_BYTES); 360 361 HStore store = region.getStore(COLUMN_FAMILY_BYTES); 362 // Get some random bytes. 363 byte [] value = Bytes.toBytes(method); 364 faultyLog.setStoreFlushCtx(store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY)); 365 366 Put put = new Put(value); 367 put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value); 368 faultyLog.setFailureType(FaultyFSLog.FailureType.SYNC); 369 370 boolean threwIOE = false; 371 try { 372 region.put(put); 373 } catch (IOException ioe) { 374 threwIOE = true; 375 } finally { 376 assertTrue("The regionserver should have thrown an exception", threwIOE); 377 } 378 MemStoreSize mss = store.getFlushableSize(); 379 assertTrue("flushable size should be zero, but it is " + mss, 380 mss.getDataSize() == 0); 381 } 382 383 /** 384 * Create a WAL outside of the usual helper in 385 * {@link HBaseTestingUtility#createWal(Configuration, Path, RegionInfo)} because that method 386 * doesn't play nicely with FaultyFileSystem. Call this method before overriding 387 * {@code fs.file.impl}. 388 * @param callingMethod a unique component for the path, probably the name of the test method. 
389 */ 390 private static WAL createWALCompatibleWithFaultyFileSystem(String callingMethod, 391 Configuration conf, TableName tableName) throws IOException { 392 final Path logDir = TEST_UTIL.getDataTestDirOnTestFS(callingMethod + ".log"); 393 final Configuration walConf = new Configuration(conf); 394 FSUtils.setRootDir(walConf, logDir); 395 return new WALFactory(walConf, callingMethod) 396 .getWAL(RegionInfoBuilder.newBuilder(tableName).build()); 397 } 398 399 @Test 400 public void testMemstoreSizeAccountingWithFailedPostBatchMutate() throws IOException { 401 String testName = "testMemstoreSizeAccountingWithFailedPostBatchMutate"; 402 FileSystem fs = FileSystem.get(CONF); 403 Path rootDir = new Path(dir + testName); 404 FSHLog hLog = new FSHLog(fs, rootDir, testName, CONF); 405 hLog.init(); 406 HRegion region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog, 407 COLUMN_FAMILY_BYTES); 408 HStore store = region.getStore(COLUMN_FAMILY_BYTES); 409 assertEquals(0, region.getMemStoreDataSize()); 410 411 // Put one value 412 byte [] value = Bytes.toBytes(method); 413 Put put = new Put(value); 414 put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value); 415 region.put(put); 416 long onePutSize = region.getMemStoreDataSize(); 417 assertTrue(onePutSize > 0); 418 419 RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class); 420 doThrow(new IOException()) 421 .when(mockedCPHost).postBatchMutate(Mockito.<MiniBatchOperationInProgress<Mutation>>any()); 422 region.setCoprocessorHost(mockedCPHost); 423 424 put = new Put(value); 425 put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("dfg"), value); 426 try { 427 region.put(put); 428 fail("Should have failed with IOException"); 429 } catch (IOException expected) { 430 } 431 long expectedSize = onePutSize * 2; 432 assertEquals("memstoreSize should be incremented", 433 expectedSize, region.getMemStoreDataSize()); 434 assertEquals("flushable size should be incremented", 435 expectedSize, 
store.getFlushableSize().getDataSize()); 436 437 region.setCoprocessorHost(null); 438 } 439 440 /** 441 * A test case of HBASE-21041 442 * @throws Exception Exception 443 */ 444 @Test 445 public void testFlushAndMemstoreSizeCounting() throws Exception { 446 byte[] family = Bytes.toBytes("family"); 447 this.region = initHRegion(tableName, method, CONF, family); 448 final WALFactory wals = new WALFactory(CONF, method); 449 try { 450 for (byte[] row : HBaseTestingUtility.ROWS) { 451 Put put = new Put(row); 452 put.addColumn(family, family, row); 453 region.put(put); 454 } 455 region.flush(true); 456 // After flush, data size should be zero 457 assertEquals(0, region.getMemStoreDataSize()); 458 // After flush, a new active mutable segment is created, so the heap size 459 // should equal to MutableSegment.DEEP_OVERHEAD 460 assertEquals(MutableSegment.DEEP_OVERHEAD, region.getMemStoreHeapSize()); 461 // After flush, offheap should be zero 462 assertEquals(0, region.getMemStoreOffHeapSize()); 463 } finally { 464 HBaseTestingUtility.closeRegionAndWAL(this.region); 465 this.region = null; 466 wals.close(); 467 } 468 } 469 470 /** 471 * Test we do not lose data if we fail a flush and then close. 472 * Part of HBase-10466. Tests the following from the issue description: 473 * "Bug 1: Wrong calculation of HRegion.memstoreSize: When a flush fails, data to be flushed is 474 * kept in each MemStore's snapshot and wait for next flush attempt to continue on it. But when 475 * the next flush succeeds, the counter of total memstore size in HRegion is always deduced by 476 * the sum of current memstore sizes instead of snapshots left from previous failed flush. This 477 * calculation is problematic that almost every time there is failed flush, HRegion.memstoreSize 478 * gets reduced by a wrong value. If region flush could not proceed for a couple cycles, the size 479 * in current memstore could be much larger than the snapshot. 
It's likely to drift memstoreSize 480 * much smaller than expected. In extreme case, if the error accumulates to even bigger than 481 * HRegion's memstore size limit, any further flush is skipped because flush does not do anything 482 * if memstoreSize is not larger than 0." 483 * @throws Exception 484 */ 485 @Test 486 public void testFlushSizeAccounting() throws Exception { 487 final Configuration conf = HBaseConfiguration.create(CONF); 488 final WAL wal = createWALCompatibleWithFaultyFileSystem(method, conf, tableName); 489 // Only retry once. 490 conf.setInt("hbase.hstore.flush.retries.number", 1); 491 final User user = 492 User.createUserForTesting(conf, method, new String[]{"foo"}); 493 // Inject our faulty LocalFileSystem 494 conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class); 495 user.runAs(new PrivilegedExceptionAction<Object>() { 496 @Override 497 public Object run() throws Exception { 498 // Make sure it worked (above is sensitive to caching details in hadoop core) 499 FileSystem fs = FileSystem.get(conf); 500 Assert.assertEquals(FaultyFileSystem.class, fs.getClass()); 501 FaultyFileSystem ffs = (FaultyFileSystem)fs; 502 HRegion region = null; 503 try { 504 // Initialize region 505 region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, wal, 506 COLUMN_FAMILY_BYTES); 507 long size = region.getMemStoreDataSize(); 508 Assert.assertEquals(0, size); 509 // Put one item into memstore. Measure the size of one item in memstore. 510 Put p1 = new Put(row); 511 p1.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual1, 1, (byte[]) null)); 512 region.put(p1); 513 final long sizeOfOnePut = region.getMemStoreDataSize(); 514 // Fail a flush which means the current memstore will hang out as memstore 'snapshot'. 
515 try { 516 LOG.info("Flushing"); 517 region.flush(true); 518 Assert.fail("Didn't bubble up IOE!"); 519 } catch (DroppedSnapshotException dse) { 520 // What we are expecting 521 region.closing.set(false); // this is needed for the rest of the test to work 522 } 523 // Make it so all writes succeed from here on out 524 ffs.fault.set(false); 525 // Check sizes. Should still be the one entry. 526 Assert.assertEquals(sizeOfOnePut, region.getMemStoreDataSize()); 527 // Now add two entries so that on this next flush that fails, we can see if we 528 // subtract the right amount, the snapshot size only. 529 Put p2 = new Put(row); 530 p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual2, 2, (byte[])null)); 531 p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual3, 3, (byte[])null)); 532 region.put(p2); 533 long expectedSize = sizeOfOnePut * 3; 534 Assert.assertEquals(expectedSize, region.getMemStoreDataSize()); 535 // Do a successful flush. It will clear the snapshot only. Thats how flushes work. 536 // If already a snapshot, we clear it else we move the memstore to be snapshot and flush 537 // it 538 region.flush(true); 539 // Make sure our memory accounting is right. 540 Assert.assertEquals(sizeOfOnePut * 2, region.getMemStoreDataSize()); 541 } finally { 542 HBaseTestingUtility.closeRegionAndWAL(region); 543 } 544 return null; 545 } 546 }); 547 FileSystem.closeAllForUGI(user.getUGI()); 548 } 549 550 @Test 551 public void testCloseWithFailingFlush() throws Exception { 552 final Configuration conf = HBaseConfiguration.create(CONF); 553 final WAL wal = createWALCompatibleWithFaultyFileSystem(method, conf, tableName); 554 // Only retry once. 
555 conf.setInt("hbase.hstore.flush.retries.number", 1); 556 final User user = 557 User.createUserForTesting(conf, this.method, new String[]{"foo"}); 558 // Inject our faulty LocalFileSystem 559 conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class); 560 user.runAs(new PrivilegedExceptionAction<Object>() { 561 @Override 562 public Object run() throws Exception { 563 // Make sure it worked (above is sensitive to caching details in hadoop core) 564 FileSystem fs = FileSystem.get(conf); 565 Assert.assertEquals(FaultyFileSystem.class, fs.getClass()); 566 FaultyFileSystem ffs = (FaultyFileSystem)fs; 567 HRegion region = null; 568 try { 569 // Initialize region 570 region = initHRegion(tableName, null, null, false, 571 Durability.SYNC_WAL, wal, COLUMN_FAMILY_BYTES); 572 long size = region.getMemStoreDataSize(); 573 Assert.assertEquals(0, size); 574 // Put one item into memstore. Measure the size of one item in memstore. 575 Put p1 = new Put(row); 576 p1.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual1, 1, (byte[])null)); 577 region.put(p1); 578 // Manufacture an outstanding snapshot -- fake a failed flush by doing prepare step only. 579 HStore store = region.getStore(COLUMN_FAMILY_BYTES); 580 StoreFlushContext storeFlushCtx = 581 store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY); 582 storeFlushCtx.prepare(); 583 // Now add two entries to the foreground memstore. 584 Put p2 = new Put(row); 585 p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual2, 2, (byte[])null)); 586 p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual3, 3, (byte[])null)); 587 region.put(p2); 588 // Now try close on top of a failing flush. 
589 HBaseTestingUtility.closeRegionAndWAL(region); 590 region = null; 591 fail(); 592 } catch (DroppedSnapshotException dse) { 593 // Expected 594 LOG.info("Expected DroppedSnapshotException"); 595 } finally { 596 // Make it so all writes succeed from here on out so can close clean 597 ffs.fault.set(false); 598 HBaseTestingUtility.closeRegionAndWAL(region); 599 } 600 return null; 601 } 602 }); 603 FileSystem.closeAllForUGI(user.getUGI()); 604 } 605 606 @Test 607 public void testCompactionAffectedByScanners() throws Exception { 608 byte[] family = Bytes.toBytes("family"); 609 this.region = initHRegion(tableName, method, CONF, family); 610 611 Put put = new Put(Bytes.toBytes("r1")); 612 put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1")); 613 region.put(put); 614 region.flush(true); 615 616 Scan scan = new Scan(); 617 scan.setMaxVersions(3); 618 // open the first scanner 619 RegionScanner scanner1 = region.getScanner(scan); 620 621 Delete delete = new Delete(Bytes.toBytes("r1")); 622 region.delete(delete); 623 region.flush(true); 624 625 // open the second scanner 626 RegionScanner scanner2 = region.getScanner(scan); 627 628 List<Cell> results = new ArrayList<>(); 629 630 System.out.println("Smallest read point:" + region.getSmallestReadPoint()); 631 632 // make a major compaction 633 region.compact(true); 634 635 // open the third scanner 636 RegionScanner scanner3 = region.getScanner(scan); 637 638 // get data from scanner 1, 2, 3 after major compaction 639 scanner1.next(results); 640 System.out.println(results); 641 assertEquals(1, results.size()); 642 643 results.clear(); 644 scanner2.next(results); 645 System.out.println(results); 646 assertEquals(0, results.size()); 647 648 results.clear(); 649 scanner3.next(results); 650 System.out.println(results); 651 assertEquals(0, results.size()); 652 } 653 654 @Test 655 public void testToShowNPEOnRegionScannerReseek() throws Exception { 656 byte[] family = Bytes.toBytes("family"); 657 this.region = 
initHRegion(tableName, method, CONF, family); 658 659 Put put = new Put(Bytes.toBytes("r1")); 660 put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1")); 661 region.put(put); 662 put = new Put(Bytes.toBytes("r2")); 663 put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1")); 664 region.put(put); 665 region.flush(true); 666 667 Scan scan = new Scan(); 668 scan.setMaxVersions(3); 669 // open the first scanner 670 RegionScanner scanner1 = region.getScanner(scan); 671 672 System.out.println("Smallest read point:" + region.getSmallestReadPoint()); 673 674 region.compact(true); 675 676 scanner1.reseek(Bytes.toBytes("r2")); 677 List<Cell> results = new ArrayList<>(); 678 scanner1.next(results); 679 Cell keyValue = results.get(0); 680 Assert.assertTrue(Bytes.compareTo(CellUtil.cloneRow(keyValue), Bytes.toBytes("r2")) == 0); 681 scanner1.close(); 682 } 683 684 @Test 685 public void testArchiveRecoveredEditsReplay() throws Exception { 686 byte[] family = Bytes.toBytes("family"); 687 this.region = initHRegion(tableName, method, CONF, family); 688 final WALFactory wals = new WALFactory(CONF, method); 689 try { 690 Path regiondir = region.getRegionFileSystem().getRegionDir(); 691 FileSystem fs = region.getRegionFileSystem().getFileSystem(); 692 byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes(); 693 694 Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir); 695 696 long maxSeqId = 1050; 697 long minSeqId = 1000; 698 699 for (long i = minSeqId; i <= maxSeqId; i += 10) { 700 Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i)); 701 fs.create(recoveredEdits); 702 WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits); 703 704 long time = System.nanoTime(); 705 WALEdit edit = new WALEdit(); 706 edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes 707 .toBytes(i))); 708 writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time, 709 
HConstants.DEFAULT_CLUSTER_ID), edit)); 710 711 writer.close(); 712 } 713 MonitoredTask status = TaskMonitor.get().createStatus(method); 714 Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR); 715 for (HStore store : region.getStores()) { 716 maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId - 1); 717 } 718 CONF.set("hbase.region.archive.recovered.edits", "true"); 719 CONF.set(CommonFSUtils.HBASE_WAL_DIR, "/custom_wal_dir"); 720 long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status); 721 assertEquals(maxSeqId, seqId); 722 region.getMVCC().advanceTo(seqId); 723 String fakeFamilyName = recoveredEditsDir.getName(); 724 Path rootDir = new Path(CONF.get(HConstants.HBASE_DIR)); 725 Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePathForRootDir(rootDir, 726 region.getRegionInfo(), Bytes.toBytes(fakeFamilyName)); 727 FileStatus[] list = TEST_UTIL.getTestFileSystem().listStatus(storeArchiveDir); 728 assertEquals(6, list.length); 729 } finally { 730 CONF.set("hbase.region.archive.recovered.edits", "false"); 731 CONF.set(CommonFSUtils.HBASE_WAL_DIR, ""); 732 HBaseTestingUtility.closeRegionAndWAL(this.region); 733 this.region = null; 734 wals.close(); 735 } 736 } 737 738 @Test 739 public void testSkipRecoveredEditsReplay() throws Exception { 740 byte[] family = Bytes.toBytes("family"); 741 this.region = initHRegion(tableName, method, CONF, family); 742 final WALFactory wals = new WALFactory(CONF, method); 743 try { 744 Path regiondir = region.getRegionFileSystem().getRegionDir(); 745 FileSystem fs = region.getRegionFileSystem().getFileSystem(); 746 byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes(); 747 748 Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir); 749 750 long maxSeqId = 1050; 751 long minSeqId = 1000; 752 753 for (long i = minSeqId; i <= maxSeqId; i += 10) { 754 Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i)); 755 
fs.create(recoveredEdits); 756 WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits); 757 758 long time = System.nanoTime(); 759 WALEdit edit = new WALEdit(); 760 edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes 761 .toBytes(i))); 762 writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time, 763 HConstants.DEFAULT_CLUSTER_ID), edit)); 764 765 writer.close(); 766 } 767 MonitoredTask status = TaskMonitor.get().createStatus(method); 768 Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR); 769 for (HStore store : region.getStores()) { 770 maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId - 1); 771 } 772 long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status); 773 assertEquals(maxSeqId, seqId); 774 region.getMVCC().advanceTo(seqId); 775 Get get = new Get(row); 776 Result result = region.get(get); 777 for (long i = minSeqId; i <= maxSeqId; i += 10) { 778 List<Cell> kvs = result.getColumnCells(family, Bytes.toBytes(i)); 779 assertEquals(1, kvs.size()); 780 assertArrayEquals(Bytes.toBytes(i), CellUtil.cloneValue(kvs.get(0))); 781 } 782 } finally { 783 HBaseTestingUtility.closeRegionAndWAL(this.region); 784 this.region = null; 785 wals.close(); 786 } 787 } 788 789 @Test 790 public void testSkipRecoveredEditsReplaySomeIgnored() throws Exception { 791 byte[] family = Bytes.toBytes("family"); 792 this.region = initHRegion(tableName, method, CONF, family); 793 final WALFactory wals = new WALFactory(CONF, method); 794 try { 795 Path regiondir = region.getRegionFileSystem().getRegionDir(); 796 FileSystem fs = region.getRegionFileSystem().getFileSystem(); 797 byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes(); 798 799 Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir); 800 801 long maxSeqId = 1050; 802 long minSeqId = 1000; 803 804 for (long i = minSeqId; i <= maxSeqId; i += 10) { 805 Path 
recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i)); 806 fs.create(recoveredEdits); 807 WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits); 808 809 long time = System.nanoTime(); 810 WALEdit edit = new WALEdit(); 811 edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes 812 .toBytes(i))); 813 writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time, 814 HConstants.DEFAULT_CLUSTER_ID), edit)); 815 816 writer.close(); 817 } 818 long recoverSeqId = 1030; 819 MonitoredTask status = TaskMonitor.get().createStatus(method); 820 Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR); 821 for (HStore store : region.getStores()) { 822 maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), recoverSeqId - 1); 823 } 824 long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status); 825 assertEquals(maxSeqId, seqId); 826 region.getMVCC().advanceTo(seqId); 827 Get get = new Get(row); 828 Result result = region.get(get); 829 for (long i = minSeqId; i <= maxSeqId; i += 10) { 830 List<Cell> kvs = result.getColumnCells(family, Bytes.toBytes(i)); 831 if (i < recoverSeqId) { 832 assertEquals(0, kvs.size()); 833 } else { 834 assertEquals(1, kvs.size()); 835 assertArrayEquals(Bytes.toBytes(i), CellUtil.cloneValue(kvs.get(0))); 836 } 837 } 838 } finally { 839 HBaseTestingUtility.closeRegionAndWAL(this.region); 840 this.region = null; 841 wals.close(); 842 } 843 } 844 845 @Test 846 public void testSkipRecoveredEditsReplayAllIgnored() throws Exception { 847 byte[] family = Bytes.toBytes("family"); 848 this.region = initHRegion(tableName, method, CONF, family); 849 Path regiondir = region.getRegionFileSystem().getRegionDir(); 850 FileSystem fs = region.getRegionFileSystem().getFileSystem(); 851 852 Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir); 853 for (int i = 1000; i < 1050; i += 10) { 854 Path recoveredEdits = new 
Path(recoveredEditsDir, String.format("%019d", i)); 855 FSDataOutputStream dos = fs.create(recoveredEdits); 856 dos.writeInt(i); 857 dos.close(); 858 } 859 long minSeqId = 2000; 860 Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", minSeqId - 1)); 861 FSDataOutputStream dos = fs.create(recoveredEdits); 862 dos.close(); 863 864 Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR); 865 for (HStore store : region.getStores()) { 866 maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId); 867 } 868 long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, null); 869 assertEquals(minSeqId, seqId); 870 } 871 872 @Test 873 public void testSkipRecoveredEditsReplayTheLastFileIgnored() throws Exception { 874 byte[] family = Bytes.toBytes("family"); 875 this.region = initHRegion(tableName, method, CONF, family); 876 final WALFactory wals = new WALFactory(CONF, method); 877 try { 878 Path regiondir = region.getRegionFileSystem().getRegionDir(); 879 FileSystem fs = region.getRegionFileSystem().getFileSystem(); 880 byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes(); 881 byte[][] columns = region.getTableDescriptor().getColumnFamilyNames().toArray(new byte[0][]); 882 883 assertEquals(0, region.getStoreFileList(columns).size()); 884 885 Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir); 886 887 long maxSeqId = 1050; 888 long minSeqId = 1000; 889 890 for (long i = minSeqId; i <= maxSeqId; i += 10) { 891 Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i)); 892 fs.create(recoveredEdits); 893 WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits); 894 895 long time = System.nanoTime(); 896 WALEdit edit = null; 897 if (i == maxSeqId) { 898 edit = WALEdit.createCompaction(region.getRegionInfo(), 899 CompactionDescriptor.newBuilder() 900 .setTableName(ByteString.copyFrom(tableName.getName())) 901 
.setFamilyName(ByteString.copyFrom(regionName)) 902 .setEncodedRegionName(ByteString.copyFrom(regionName)) 903 .setStoreHomeDirBytes(ByteString.copyFrom(Bytes.toBytes(regiondir.toString()))) 904 .setRegionName(ByteString.copyFrom(region.getRegionInfo().getRegionName())) 905 .build()); 906 } else { 907 edit = new WALEdit(); 908 edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes 909 .toBytes(i))); 910 } 911 writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time, 912 HConstants.DEFAULT_CLUSTER_ID), edit)); 913 writer.close(); 914 } 915 916 long recoverSeqId = 1030; 917 Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR); 918 MonitoredTask status = TaskMonitor.get().createStatus(method); 919 for (HStore store : region.getStores()) { 920 maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), recoverSeqId - 1); 921 } 922 long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status); 923 assertEquals(maxSeqId, seqId); 924 925 // assert that the files are flushed 926 assertEquals(1, region.getStoreFileList(columns).size()); 927 928 } finally { 929 HBaseTestingUtility.closeRegionAndWAL(this.region); 930 this.region = null; 931 wals.close(); 932 } 933 } 934 935 @Test 936 public void testRecoveredEditsReplayCompaction() throws Exception { 937 testRecoveredEditsReplayCompaction(false); 938 testRecoveredEditsReplayCompaction(true); 939 } 940 941 public void testRecoveredEditsReplayCompaction(boolean mismatchedRegionName) throws Exception { 942 CONF.setClass(HConstants.REGION_IMPL, HRegionForTesting.class, Region.class); 943 byte[] family = Bytes.toBytes("family"); 944 this.region = initHRegion(tableName, method, CONF, family); 945 final WALFactory wals = new WALFactory(CONF, method); 946 try { 947 Path regiondir = region.getRegionFileSystem().getRegionDir(); 948 FileSystem fs = region.getRegionFileSystem().getFileSystem(); 949 byte[] regionName = 
region.getRegionInfo().getEncodedNameAsBytes(); 950 951 long maxSeqId = 3; 952 long minSeqId = 0; 953 954 for (long i = minSeqId; i < maxSeqId; i++) { 955 Put put = new Put(Bytes.toBytes(i)); 956 put.addColumn(family, Bytes.toBytes(i), Bytes.toBytes(i)); 957 region.put(put); 958 region.flush(true); 959 } 960 961 // this will create a region with 3 files 962 assertEquals(3, region.getStore(family).getStorefilesCount()); 963 List<Path> storeFiles = new ArrayList<>(3); 964 for (HStoreFile sf : region.getStore(family).getStorefiles()) { 965 storeFiles.add(sf.getPath()); 966 } 967 968 // disable compaction completion 969 CONF.setBoolean("hbase.hstore.compaction.complete", false); 970 region.compactStores(); 971 972 // ensure that nothing changed 973 assertEquals(3, region.getStore(family).getStorefilesCount()); 974 975 // now find the compacted file, and manually add it to the recovered edits 976 Path tmpDir = new Path(region.getRegionFileSystem().getTempDir(), Bytes.toString(family)); 977 FileStatus[] files = FSUtils.listStatus(fs, tmpDir); 978 String errorMsg = "Expected to find 1 file in the region temp directory " 979 + "from the compaction, could not find any"; 980 assertNotNull(errorMsg, files); 981 assertEquals(errorMsg, 1, files.length); 982 // move the file inside region dir 983 Path newFile = region.getRegionFileSystem().commitStoreFile(Bytes.toString(family), 984 files[0].getPath()); 985 986 byte[] encodedNameAsBytes = this.region.getRegionInfo().getEncodedNameAsBytes(); 987 byte[] fakeEncodedNameAsBytes = new byte [encodedNameAsBytes.length]; 988 for (int i=0; i < encodedNameAsBytes.length; i++) { 989 // Mix the byte array to have a new encodedName 990 fakeEncodedNameAsBytes[i] = (byte) (encodedNameAsBytes[i] + 1); 991 } 992 993 CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(this.region 994 .getRegionInfo(), mismatchedRegionName ? 
fakeEncodedNameAsBytes : null, family, 995 storeFiles, Lists.newArrayList(newFile), 996 region.getRegionFileSystem().getStoreDir(Bytes.toString(family))); 997 998 WALUtil.writeCompactionMarker(region.getWAL(), this.region.getReplicationScope(), 999 this.region.getRegionInfo(), compactionDescriptor, region.getMVCC()); 1000 1001 Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir); 1002 1003 Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000)); 1004 fs.create(recoveredEdits); 1005 WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits); 1006 1007 long time = System.nanoTime(); 1008 1009 writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, 10, time, 1010 HConstants.DEFAULT_CLUSTER_ID), WALEdit.createCompaction(region.getRegionInfo(), 1011 compactionDescriptor))); 1012 writer.close(); 1013 1014 // close the region now, and reopen again 1015 region.getTableDescriptor(); 1016 region.getRegionInfo(); 1017 HBaseTestingUtility.closeRegionAndWAL(this.region); 1018 try { 1019 region = HRegion.openHRegion(region, null); 1020 } catch (WrongRegionException wre) { 1021 fail("Matching encoded region name should not have produced WrongRegionException"); 1022 } 1023 1024 // now check whether we have only one store file, the compacted one 1025 Collection<HStoreFile> sfs = region.getStore(family).getStorefiles(); 1026 for (HStoreFile sf : sfs) { 1027 LOG.info(Objects.toString(sf.getPath())); 1028 } 1029 if (!mismatchedRegionName) { 1030 assertEquals(1, region.getStore(family).getStorefilesCount()); 1031 } 1032 files = FSUtils.listStatus(fs, tmpDir); 1033 assertTrue("Expected to find 0 files inside " + tmpDir, files == null || files.length == 0); 1034 1035 for (long i = minSeqId; i < maxSeqId; i++) { 1036 Get get = new Get(Bytes.toBytes(i)); 1037 Result result = region.get(get); 1038 byte[] value = result.getValue(family, Bytes.toBytes(i)); 1039 assertArrayEquals(Bytes.toBytes(i), value); 1040 } 
1041 } finally { 1042 HBaseTestingUtility.closeRegionAndWAL(this.region); 1043 this.region = null; 1044 wals.close(); 1045 CONF.setClass(HConstants.REGION_IMPL, HRegion.class, Region.class); 1046 } 1047 } 1048 1049 @Test 1050 public void testFlushMarkers() throws Exception { 1051 // tests that flush markers are written to WAL and handled at recovered edits 1052 byte[] family = Bytes.toBytes("family"); 1053 Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + ".log"); 1054 final Configuration walConf = new Configuration(TEST_UTIL.getConfiguration()); 1055 FSUtils.setRootDir(walConf, logDir); 1056 final WALFactory wals = new WALFactory(walConf, method); 1057 final WAL wal = wals.getWAL(RegionInfoBuilder.newBuilder(tableName).build()); 1058 1059 this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW, 1060 HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family); 1061 try { 1062 Path regiondir = region.getRegionFileSystem().getRegionDir(); 1063 FileSystem fs = region.getRegionFileSystem().getFileSystem(); 1064 byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes(); 1065 1066 long maxSeqId = 3; 1067 long minSeqId = 0; 1068 1069 for (long i = minSeqId; i < maxSeqId; i++) { 1070 Put put = new Put(Bytes.toBytes(i)); 1071 put.addColumn(family, Bytes.toBytes(i), Bytes.toBytes(i)); 1072 region.put(put); 1073 region.flush(true); 1074 } 1075 1076 // this will create a region with 3 files from flush 1077 assertEquals(3, region.getStore(family).getStorefilesCount()); 1078 List<String> storeFiles = new ArrayList<>(3); 1079 for (HStoreFile sf : region.getStore(family).getStorefiles()) { 1080 storeFiles.add(sf.getPath().getName()); 1081 } 1082 1083 // now verify that the flush markers are written 1084 wal.shutdown(); 1085 WAL.Reader reader = WALFactory.createReader(fs, AbstractFSWALProvider.getCurrentFileName(wal), 1086 TEST_UTIL.getConfiguration()); 1087 try { 1088 List<WAL.Entry> flushDescriptors = new ArrayList<>(); 1089 long lastFlushSeqId = 
-1; 1090 while (true) { 1091 WAL.Entry entry = reader.next(); 1092 if (entry == null) { 1093 break; 1094 } 1095 Cell cell = entry.getEdit().getCells().get(0); 1096 if (WALEdit.isMetaEditFamily(cell)) { 1097 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(cell); 1098 assertNotNull(flushDesc); 1099 assertArrayEquals(tableName.getName(), flushDesc.getTableName().toByteArray()); 1100 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 1101 assertTrue(flushDesc.getFlushSequenceNumber() > lastFlushSeqId); 1102 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 1103 assertTrue(flushDesc.getFlushSequenceNumber() == lastFlushSeqId); 1104 } 1105 lastFlushSeqId = flushDesc.getFlushSequenceNumber(); 1106 assertArrayEquals(regionName, flushDesc.getEncodedRegionName().toByteArray()); 1107 assertEquals(1, flushDesc.getStoreFlushesCount()); //only one store 1108 StoreFlushDescriptor storeFlushDesc = flushDesc.getStoreFlushes(0); 1109 assertArrayEquals(family, storeFlushDesc.getFamilyName().toByteArray()); 1110 assertEquals("family", storeFlushDesc.getStoreHomeDir()); 1111 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 1112 assertEquals(0, storeFlushDesc.getFlushOutputCount()); 1113 } else { 1114 assertEquals(1, storeFlushDesc.getFlushOutputCount()); //only one file from flush 1115 assertTrue(storeFiles.contains(storeFlushDesc.getFlushOutput(0))); 1116 } 1117 1118 flushDescriptors.add(entry); 1119 } 1120 } 1121 1122 assertEquals(3 * 2, flushDescriptors.size()); // START_FLUSH and COMMIT_FLUSH per flush 1123 1124 // now write those markers to the recovered edits again. 
1125 1126 Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir); 1127 1128 Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000)); 1129 fs.create(recoveredEdits); 1130 WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits); 1131 1132 for (WAL.Entry entry : flushDescriptors) { 1133 writer.append(entry); 1134 } 1135 writer.close(); 1136 } finally { 1137 if (null != reader) { 1138 try { 1139 reader.close(); 1140 } catch (IOException exception) { 1141 LOG.warn("Problem closing wal: " + exception.getMessage()); 1142 LOG.debug("exception details", exception); 1143 } 1144 } 1145 } 1146 1147 // close the region now, and reopen again 1148 HBaseTestingUtility.closeRegionAndWAL(this.region); 1149 region = HRegion.openHRegion(region, null); 1150 1151 // now check whether we have can read back the data from region 1152 for (long i = minSeqId; i < maxSeqId; i++) { 1153 Get get = new Get(Bytes.toBytes(i)); 1154 Result result = region.get(get); 1155 byte[] value = result.getValue(family, Bytes.toBytes(i)); 1156 assertArrayEquals(Bytes.toBytes(i), value); 1157 } 1158 } finally { 1159 HBaseTestingUtility.closeRegionAndWAL(this.region); 1160 this.region = null; 1161 wals.close(); 1162 } 1163 } 1164 1165 static class IsFlushWALMarker implements ArgumentMatcher<WALEdit> { 1166 volatile FlushAction[] actions; 1167 public IsFlushWALMarker(FlushAction... 
actions) { 1168 this.actions = actions; 1169 } 1170 @Override 1171 public boolean matches(WALEdit edit) { 1172 List<Cell> cells = edit.getCells(); 1173 if (cells.isEmpty()) { 1174 return false; 1175 } 1176 if (WALEdit.isMetaEditFamily(cells.get(0))) { 1177 FlushDescriptor desc; 1178 try { 1179 desc = WALEdit.getFlushDescriptor(cells.get(0)); 1180 } catch (IOException e) { 1181 LOG.warn(e.toString(), e); 1182 return false; 1183 } 1184 if (desc != null) { 1185 for (FlushAction action : actions) { 1186 if (desc.getAction() == action) { 1187 return true; 1188 } 1189 } 1190 } 1191 } 1192 return false; 1193 } 1194 public IsFlushWALMarker set(FlushAction... actions) { 1195 this.actions = actions; 1196 return this; 1197 } 1198 } 1199 1200 @Test 1201 public void testFlushMarkersWALFail() throws Exception { 1202 // test the cases where the WAL append for flush markers fail. 1203 byte[] family = Bytes.toBytes("family"); 1204 1205 // spy an actual WAL implementation to throw exception (was not able to mock) 1206 Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + "log"); 1207 1208 final Configuration walConf = new Configuration(TEST_UTIL.getConfiguration()); 1209 FSUtils.setRootDir(walConf, logDir); 1210 // Make up a WAL that we can manipulate at append time. 
1211 class FailAppendFlushMarkerWAL extends FSHLog { 1212 volatile FlushAction [] flushActions = null; 1213 1214 public FailAppendFlushMarkerWAL(FileSystem fs, Path root, String logDir, Configuration conf) 1215 throws IOException { 1216 super(fs, root, logDir, conf); 1217 } 1218 1219 @Override 1220 protected Writer createWriterInstance(Path path) throws IOException { 1221 final Writer w = super.createWriterInstance(path); 1222 return new Writer() { 1223 @Override 1224 public void close() throws IOException { 1225 w.close(); 1226 } 1227 1228 @Override 1229 public void sync(boolean forceSync) throws IOException { 1230 w.sync(forceSync); 1231 } 1232 1233 @Override 1234 public void append(Entry entry) throws IOException { 1235 List<Cell> cells = entry.getEdit().getCells(); 1236 if (WALEdit.isMetaEditFamily(cells.get(0))) { 1237 FlushDescriptor desc = WALEdit.getFlushDescriptor(cells.get(0)); 1238 if (desc != null) { 1239 for (FlushAction flushAction: flushActions) { 1240 if (desc.getAction().equals(flushAction)) { 1241 throw new IOException("Failed to append flush marker! " + flushAction); 1242 } 1243 } 1244 } 1245 } 1246 w.append(entry); 1247 } 1248 1249 @Override 1250 public long getLength() { 1251 return w.getLength(); 1252 } 1253 }; 1254 } 1255 } 1256 FailAppendFlushMarkerWAL wal = 1257 new FailAppendFlushMarkerWAL(FileSystem.get(walConf), FSUtils.getRootDir(walConf), 1258 method, walConf); 1259 wal.init(); 1260 this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW, 1261 HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family); 1262 int i = 0; 1263 Put put = new Put(Bytes.toBytes(i)); 1264 put.setDurability(Durability.SKIP_WAL); // have to skip mocked wal 1265 put.addColumn(family, Bytes.toBytes(i), Bytes.toBytes(i)); 1266 region.put(put); 1267 1268 // 1. 
// Test case where START_FLUSH throws exception
    wal.flushActions = new FlushAction [] {FlushAction.START_FLUSH};

    // start cache flush will throw exception
    try {
      region.flush(true);
      fail("This should have thrown exception");
    } catch (DroppedSnapshotException unexpected) {
      // this should not be a dropped snapshot exception. Meaning that RS will not abort
      throw unexpected;
    } catch (IOException expected) {
      // expected
    }
    // The WAL is hosed now. It has two edits appended. We cannot roll the log without it
    // throwing a DroppedSnapshotException to force an abort. Just clean up the mess.
    region.close(true);
    wal.close();

    // 2. Test case where START_FLUSH succeeds but COMMIT_FLUSH will throw exception
    // Create the replacement WAL first and only then configure the failing action: assigning
    // flushActions before the reassignment would configure the WAL that was just closed.
    wal = new FailAppendFlushMarkerWAL(FileSystem.get(walConf), FSUtils.getRootDir(walConf),
          method, walConf);
    wal.flushActions = new FlushAction [] {FlushAction.COMMIT_FLUSH};
    wal.init();
    this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
      HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family);
    region.put(put);
    // 3. Test case where ABORT_FLUSH will throw exception.
    // Even if ABORT_FLUSH throws exception, we should not fail with IOE, but continue with
    // DroppedSnapshotException.
Below COMMIT_FLUSH will cause flush to abort 1297 wal.flushActions = new FlushAction [] {FlushAction.COMMIT_FLUSH, FlushAction.ABORT_FLUSH}; 1298 1299 try { 1300 region.flush(true); 1301 fail("This should have thrown exception"); 1302 } catch (DroppedSnapshotException expected) { 1303 // we expect this exception, since we were able to write the snapshot, but failed to 1304 // write the flush marker to WAL 1305 } catch (IOException unexpected) { 1306 throw unexpected; 1307 } 1308 } 1309 1310 @Test 1311 public void testGetWhileRegionClose() throws IOException { 1312 Configuration hc = initSplit(); 1313 int numRows = 100; 1314 byte[][] families = { fam1, fam2, fam3 }; 1315 1316 // Setting up region 1317 this.region = initHRegion(tableName, method, hc, families); 1318 // Put data in region 1319 final int startRow = 100; 1320 putData(startRow, numRows, qual1, families); 1321 putData(startRow, numRows, qual2, families); 1322 putData(startRow, numRows, qual3, families); 1323 final AtomicBoolean done = new AtomicBoolean(false); 1324 final AtomicInteger gets = new AtomicInteger(0); 1325 GetTillDoneOrException[] threads = new GetTillDoneOrException[10]; 1326 try { 1327 // Set ten threads running concurrently getting from the region. 1328 for (int i = 0; i < threads.length / 2; i++) { 1329 threads[i] = new GetTillDoneOrException(i, Bytes.toBytes("" + startRow), done, gets); 1330 threads[i].setDaemon(true); 1331 threads[i].start(); 1332 } 1333 // Artificially make the condition by setting closing flag explicitly. 1334 // I can't make the issue happen with a call to region.close(). 
1335 this.region.closing.set(true); 1336 for (int i = threads.length / 2; i < threads.length; i++) { 1337 threads[i] = new GetTillDoneOrException(i, Bytes.toBytes("" + startRow), done, gets); 1338 threads[i].setDaemon(true); 1339 threads[i].start(); 1340 } 1341 } finally { 1342 if (this.region != null) { 1343 HBaseTestingUtility.closeRegionAndWAL(this.region); 1344 this.region = null; 1345 } 1346 } 1347 done.set(true); 1348 for (GetTillDoneOrException t : threads) { 1349 try { 1350 t.join(); 1351 } catch (InterruptedException e) { 1352 e.printStackTrace(); 1353 } 1354 if (t.e != null) { 1355 LOG.info("Exception=" + t.e); 1356 assertFalse("Found a NPE in " + t.getName(), t.e instanceof NullPointerException); 1357 } 1358 } 1359 } 1360 1361 /* 1362 * Thread that does get on single row until 'done' flag is flipped. If an 1363 * exception causes us to fail, it records it. 1364 */ 1365 class GetTillDoneOrException extends Thread { 1366 private final Get g; 1367 private final AtomicBoolean done; 1368 private final AtomicInteger count; 1369 private Exception e; 1370 1371 GetTillDoneOrException(final int i, final byte[] r, final AtomicBoolean d, 1372 final AtomicInteger c) { 1373 super("getter." + i); 1374 this.g = new Get(r); 1375 this.done = d; 1376 this.count = c; 1377 } 1378 1379 @Override 1380 public void run() { 1381 while (!this.done.get()) { 1382 try { 1383 assertTrue(region.get(g).size() > 0); 1384 this.count.incrementAndGet(); 1385 } catch (Exception e) { 1386 this.e = e; 1387 break; 1388 } 1389 } 1390 } 1391 } 1392 1393 /* 1394 * An involved filter test. Has multiple column families and deletes in mix. 
1395 */ 1396 @Test 1397 public void testWeirdCacheBehaviour() throws Exception { 1398 final TableName tableName = TableName.valueOf(name.getMethodName()); 1399 byte[][] FAMILIES = new byte[][] { Bytes.toBytes("trans-blob"), Bytes.toBytes("trans-type"), 1400 Bytes.toBytes("trans-date"), Bytes.toBytes("trans-tags"), Bytes.toBytes("trans-group") }; 1401 this.region = initHRegion(tableName, method, CONF, FAMILIES); 1402 String value = "this is the value"; 1403 String value2 = "this is some other value"; 1404 String keyPrefix1 = "prefix1"; 1405 String keyPrefix2 = "prefix2"; 1406 String keyPrefix3 = "prefix3"; 1407 putRows(this.region, 3, value, keyPrefix1); 1408 putRows(this.region, 3, value, keyPrefix2); 1409 putRows(this.region, 3, value, keyPrefix3); 1410 putRows(this.region, 3, value2, keyPrefix1); 1411 putRows(this.region, 3, value2, keyPrefix2); 1412 putRows(this.region, 3, value2, keyPrefix3); 1413 System.out.println("Checking values for key: " + keyPrefix1); 1414 assertEquals("Got back incorrect number of rows from scan", 3, 1415 getNumberOfRows(keyPrefix1, value2, this.region)); 1416 System.out.println("Checking values for key: " + keyPrefix2); 1417 assertEquals("Got back incorrect number of rows from scan", 3, 1418 getNumberOfRows(keyPrefix2, value2, this.region)); 1419 System.out.println("Checking values for key: " + keyPrefix3); 1420 assertEquals("Got back incorrect number of rows from scan", 3, 1421 getNumberOfRows(keyPrefix3, value2, this.region)); 1422 deleteColumns(this.region, value2, keyPrefix1); 1423 deleteColumns(this.region, value2, keyPrefix2); 1424 deleteColumns(this.region, value2, keyPrefix3); 1425 System.out.println("Starting important checks....."); 1426 assertEquals("Got back incorrect number of rows from scan: " + keyPrefix1, 0, 1427 getNumberOfRows(keyPrefix1, value2, this.region)); 1428 assertEquals("Got back incorrect number of rows from scan: " + keyPrefix2, 0, 1429 getNumberOfRows(keyPrefix2, value2, this.region)); 1430 
assertEquals("Got back incorrect number of rows from scan: " + keyPrefix3, 0, 1431 getNumberOfRows(keyPrefix3, value2, this.region)); 1432 } 1433 1434 @Test 1435 public void testAppendWithReadOnlyTable() throws Exception { 1436 final TableName tableName = TableName.valueOf(name.getMethodName()); 1437 this.region = initHRegion(tableName, method, CONF, true, Bytes.toBytes("somefamily")); 1438 boolean exceptionCaught = false; 1439 Append append = new Append(Bytes.toBytes("somerow")); 1440 append.setDurability(Durability.SKIP_WAL); 1441 append.addColumn(Bytes.toBytes("somefamily"), Bytes.toBytes("somequalifier"), 1442 Bytes.toBytes("somevalue")); 1443 try { 1444 region.append(append); 1445 } catch (IOException e) { 1446 exceptionCaught = true; 1447 } 1448 assertTrue(exceptionCaught == true); 1449 } 1450 1451 @Test 1452 public void testIncrWithReadOnlyTable() throws Exception { 1453 final TableName tableName = TableName.valueOf(name.getMethodName()); 1454 this.region = initHRegion(tableName, method, CONF, true, Bytes.toBytes("somefamily")); 1455 boolean exceptionCaught = false; 1456 Increment inc = new Increment(Bytes.toBytes("somerow")); 1457 inc.setDurability(Durability.SKIP_WAL); 1458 inc.addColumn(Bytes.toBytes("somefamily"), Bytes.toBytes("somequalifier"), 1L); 1459 try { 1460 region.increment(inc); 1461 } catch (IOException e) { 1462 exceptionCaught = true; 1463 } 1464 assertTrue(exceptionCaught == true); 1465 } 1466 1467 private void deleteColumns(HRegion r, String value, String keyPrefix) throws IOException { 1468 InternalScanner scanner = buildScanner(keyPrefix, value, r); 1469 int count = 0; 1470 boolean more = false; 1471 List<Cell> results = new ArrayList<>(); 1472 do { 1473 more = scanner.next(results); 1474 if (results != null && !results.isEmpty()) 1475 count++; 1476 else 1477 break; 1478 Delete delete = new Delete(CellUtil.cloneRow(results.get(0))); 1479 delete.addColumn(Bytes.toBytes("trans-tags"), Bytes.toBytes("qual2")); 1480 r.delete(delete); 1481 
results.clear(); 1482 } while (more); 1483 assertEquals("Did not perform correct number of deletes", 3, count); 1484 } 1485 1486 private int getNumberOfRows(String keyPrefix, String value, HRegion r) throws Exception { 1487 InternalScanner resultScanner = buildScanner(keyPrefix, value, r); 1488 int numberOfResults = 0; 1489 List<Cell> results = new ArrayList<>(); 1490 boolean more = false; 1491 do { 1492 more = resultScanner.next(results); 1493 if (results != null && !results.isEmpty()) 1494 numberOfResults++; 1495 else 1496 break; 1497 for (Cell kv : results) { 1498 System.out.println("kv=" + kv.toString() + ", " + Bytes.toString(CellUtil.cloneValue(kv))); 1499 } 1500 results.clear(); 1501 } while (more); 1502 return numberOfResults; 1503 } 1504 1505 private InternalScanner buildScanner(String keyPrefix, String value, HRegion r) 1506 throws IOException { 1507 // Defaults FilterList.Operator.MUST_PASS_ALL. 1508 FilterList allFilters = new FilterList(); 1509 allFilters.addFilter(new PrefixFilter(Bytes.toBytes(keyPrefix))); 1510 // Only return rows where this column value exists in the row. 
1511 SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("trans-tags"), 1512 Bytes.toBytes("qual2"), CompareOp.EQUAL, Bytes.toBytes(value)); 1513 filter.setFilterIfMissing(true); 1514 allFilters.addFilter(filter); 1515 Scan scan = new Scan(); 1516 scan.addFamily(Bytes.toBytes("trans-blob")); 1517 scan.addFamily(Bytes.toBytes("trans-type")); 1518 scan.addFamily(Bytes.toBytes("trans-date")); 1519 scan.addFamily(Bytes.toBytes("trans-tags")); 1520 scan.addFamily(Bytes.toBytes("trans-group")); 1521 scan.setFilter(allFilters); 1522 return r.getScanner(scan); 1523 } 1524 1525 private void putRows(HRegion r, int numRows, String value, String key) throws IOException { 1526 for (int i = 0; i < numRows; i++) { 1527 String row = key + "_" + i/* UUID.randomUUID().toString() */; 1528 System.out.println(String.format("Saving row: %s, with value %s", row, value)); 1529 Put put = new Put(Bytes.toBytes(row)); 1530 put.setDurability(Durability.SKIP_WAL); 1531 put.addColumn(Bytes.toBytes("trans-blob"), null, Bytes.toBytes("value for blob")); 1532 put.addColumn(Bytes.toBytes("trans-type"), null, Bytes.toBytes("statement")); 1533 put.addColumn(Bytes.toBytes("trans-date"), null, Bytes.toBytes("20090921010101999")); 1534 put.addColumn(Bytes.toBytes("trans-tags"), Bytes.toBytes("qual2"), Bytes.toBytes(value)); 1535 put.addColumn(Bytes.toBytes("trans-group"), null, Bytes.toBytes("adhocTransactionGroupId")); 1536 r.put(put); 1537 } 1538 } 1539 1540 @Test 1541 public void testFamilyWithAndWithoutColon() throws Exception { 1542 byte[] cf = Bytes.toBytes(COLUMN_FAMILY); 1543 this.region = initHRegion(tableName, method, CONF, cf); 1544 Put p = new Put(tableName.toBytes()); 1545 byte[] cfwithcolon = Bytes.toBytes(COLUMN_FAMILY + ":"); 1546 p.addColumn(cfwithcolon, cfwithcolon, cfwithcolon); 1547 boolean exception = false; 1548 try { 1549 this.region.put(p); 1550 } catch (NoSuchColumnFamilyException e) { 1551 exception = true; 1552 } 1553 assertTrue(exception); 1554 } 1555 
1556 @Test 1557 public void testBatchPut_whileNoRowLocksHeld() throws IOException { 1558 final Put[] puts = new Put[10]; 1559 MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class); 1560 long syncs = prepareRegionForBachPut(puts, source, false); 1561 1562 OperationStatus[] codes = this.region.batchMutate(puts); 1563 assertEquals(10, codes.length); 1564 for (int i = 0; i < 10; i++) { 1565 assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode()); 1566 } 1567 metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source); 1568 1569 LOG.info("Next a batch put with one invalid family"); 1570 puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value); 1571 codes = this.region.batchMutate(puts); 1572 assertEquals(10, codes.length); 1573 for (int i = 0; i < 10; i++) { 1574 assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS, 1575 codes[i].getOperationStatusCode()); 1576 } 1577 1578 metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 2, source); 1579 } 1580 1581 @Test 1582 public void testBatchPut_whileMultipleRowLocksHeld() throws Exception { 1583 final Put[] puts = new Put[10]; 1584 MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class); 1585 long syncs = prepareRegionForBachPut(puts, source, false); 1586 1587 puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value); 1588 1589 LOG.info("batchPut will have to break into four batches to avoid row locks"); 1590 RowLock rowLock1 = region.getRowLock(Bytes.toBytes("row_2")); 1591 RowLock rowLock2 = region.getRowLock(Bytes.toBytes("row_1")); 1592 RowLock rowLock3 = region.getRowLock(Bytes.toBytes("row_3")); 1593 RowLock rowLock4 = region.getRowLock(Bytes.toBytes("row_3"), true); 1594 1595 MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF); 1596 final AtomicReference<OperationStatus[]> retFromThread = new AtomicReference<>(); 1597 final CountDownLatch 
startingPuts = new CountDownLatch(1); 1598 final CountDownLatch startingClose = new CountDownLatch(1); 1599 TestThread putter = new TestThread(ctx) { 1600 @Override 1601 public void doWork() throws IOException { 1602 startingPuts.countDown(); 1603 retFromThread.set(region.batchMutate(puts)); 1604 } 1605 }; 1606 LOG.info("...starting put thread while holding locks"); 1607 ctx.addThread(putter); 1608 ctx.startThreads(); 1609 1610 // Now attempt to close the region from another thread. Prior to HBASE-12565 1611 // this would cause the in-progress batchMutate operation to to fail with 1612 // exception because it use to release and re-acquire the close-guard lock 1613 // between batches. Caller then didn't get status indicating which writes succeeded. 1614 // We now expect this thread to block until the batchMutate call finishes. 1615 Thread regionCloseThread = new TestThread(ctx) { 1616 @Override 1617 public void doWork() { 1618 try { 1619 startingPuts.await(); 1620 // Give some time for the batch mutate to get in. 1621 // We don't want to race with the mutate 1622 Thread.sleep(10); 1623 startingClose.countDown(); 1624 HBaseTestingUtility.closeRegionAndWAL(region); 1625 region = null; 1626 } catch (IOException e) { 1627 throw new RuntimeException(e); 1628 } catch (InterruptedException e) { 1629 throw new RuntimeException(e); 1630 } 1631 } 1632 }; 1633 regionCloseThread.start(); 1634 1635 startingClose.await(); 1636 startingPuts.await(); 1637 Thread.sleep(100); 1638 LOG.info("...releasing row lock 1, which should let put thread continue"); 1639 rowLock1.release(); 1640 rowLock2.release(); 1641 rowLock3.release(); 1642 waitForCounter(source, "syncTimeNumOps", syncs + 1); 1643 1644 LOG.info("...joining on put thread"); 1645 ctx.stop(); 1646 regionCloseThread.join(); 1647 1648 OperationStatus[] codes = retFromThread.get(); 1649 for (int i = 0; i < codes.length; i++) { 1650 assertEquals((i == 5) ? 
OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS, 1651 codes[i].getOperationStatusCode()); 1652 } 1653 rowLock4.release(); 1654 } 1655 1656 private void waitForCounter(MetricsWALSource source, String metricName, long expectedCount) 1657 throws InterruptedException { 1658 long startWait = System.currentTimeMillis(); 1659 long currentCount; 1660 while ((currentCount = metricsAssertHelper.getCounter(metricName, source)) < expectedCount) { 1661 Thread.sleep(100); 1662 if (System.currentTimeMillis() - startWait > 10000) { 1663 fail(String.format("Timed out waiting for '%s' >= '%s', currentCount=%s", metricName, 1664 expectedCount, currentCount)); 1665 } 1666 } 1667 } 1668 1669 @Test 1670 public void testAtomicBatchPut() throws IOException { 1671 final Put[] puts = new Put[10]; 1672 MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class); 1673 long syncs = prepareRegionForBachPut(puts, source, false); 1674 1675 // 1. Straight forward case, should succeed 1676 MutationBatchOperation batchOp = new MutationBatchOperation(region, puts, true, 1677 HConstants.NO_NONCE, HConstants.NO_NONCE); 1678 OperationStatus[] codes = this.region.batchMutate(batchOp); 1679 assertEquals(10, codes.length); 1680 for (int i = 0; i < 10; i++) { 1681 assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode()); 1682 } 1683 metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source); 1684 1685 // 2. Failed to get lock 1686 RowLock lock = region.getRowLock(Bytes.toBytes("row_" + 3)); 1687 // Method {@link HRegion#getRowLock(byte[])} is reentrant. 
As 'row_3' is locked in this 1688 // thread, need to run {@link HRegion#batchMutate(HRegion.BatchOperation)} in different thread 1689 MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF); 1690 final AtomicReference<IOException> retFromThread = new AtomicReference<>(); 1691 final CountDownLatch finishedPuts = new CountDownLatch(1); 1692 final MutationBatchOperation finalBatchOp = new MutationBatchOperation(region, puts, true, 1693 HConstants 1694 .NO_NONCE, 1695 HConstants.NO_NONCE); 1696 TestThread putter = new TestThread(ctx) { 1697 @Override 1698 public void doWork() throws IOException { 1699 try { 1700 region.batchMutate(finalBatchOp); 1701 } catch (IOException ioe) { 1702 LOG.error("test failed!", ioe); 1703 retFromThread.set(ioe); 1704 } 1705 finishedPuts.countDown(); 1706 } 1707 }; 1708 LOG.info("...starting put thread while holding locks"); 1709 ctx.addThread(putter); 1710 ctx.startThreads(); 1711 LOG.info("...waiting for batch puts while holding locks"); 1712 try { 1713 finishedPuts.await(); 1714 } catch (InterruptedException e) { 1715 LOG.error("Interrupted!", e); 1716 } finally { 1717 if (lock != null) { 1718 lock.release(); 1719 } 1720 } 1721 assertNotNull(retFromThread.get()); 1722 metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source); 1723 1724 // 3. Exception thrown in validation 1725 LOG.info("Next a batch put with one invalid family"); 1726 puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value); 1727 batchOp = new MutationBatchOperation(region, puts, true, HConstants.NO_NONCE, 1728 HConstants.NO_NONCE); 1729 thrown.expect(NoSuchColumnFamilyException.class); 1730 this.region.batchMutate(batchOp); 1731 } 1732 1733 @Test 1734 public void testBatchPutWithTsSlop() throws Exception { 1735 // add data with a timestamp that is too recent for range. 
Ensure assert 1736 CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000); 1737 final Put[] puts = new Put[10]; 1738 MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class); 1739 1740 long syncs = prepareRegionForBachPut(puts, source, true); 1741 1742 OperationStatus[] codes = this.region.batchMutate(puts); 1743 assertEquals(10, codes.length); 1744 for (int i = 0; i < 10; i++) { 1745 assertEquals(OperationStatusCode.SANITY_CHECK_FAILURE, codes[i].getOperationStatusCode()); 1746 } 1747 metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source); 1748 } 1749 1750 /** 1751 * @return syncs initial syncTimeNumOps 1752 */ 1753 private long prepareRegionForBachPut(final Put[] puts, final MetricsWALSource source, 1754 boolean slop) throws IOException { 1755 this.region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES); 1756 1757 LOG.info("First a batch put with all valid puts"); 1758 for (int i = 0; i < puts.length; i++) { 1759 puts[i] = slop ? 
new Put(Bytes.toBytes("row_" + i), Long.MAX_VALUE - 100) : 1760 new Put(Bytes.toBytes("row_" + i)); 1761 puts[i].addColumn(COLUMN_FAMILY_BYTES, qual, value); 1762 } 1763 1764 long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source); 1765 metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source); 1766 return syncs; 1767 } 1768 1769 // //////////////////////////////////////////////////////////////////////////// 1770 // checkAndMutate tests 1771 // //////////////////////////////////////////////////////////////////////////// 1772 @Test 1773 public void testCheckAndMutate_WithEmptyRowValue() throws IOException { 1774 byte[] row1 = Bytes.toBytes("row1"); 1775 byte[] fam1 = Bytes.toBytes("fam1"); 1776 byte[] qf1 = Bytes.toBytes("qualifier"); 1777 byte[] emptyVal = new byte[] {}; 1778 byte[] val1 = Bytes.toBytes("value1"); 1779 byte[] val2 = Bytes.toBytes("value2"); 1780 1781 // Setting up region 1782 this.region = initHRegion(tableName, method, CONF, fam1); 1783 // Putting empty data in key 1784 Put put = new Put(row1); 1785 put.addColumn(fam1, qf1, emptyVal); 1786 1787 // checkAndPut with empty value 1788 boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, 1789 new BinaryComparator(emptyVal), put); 1790 assertTrue(res); 1791 1792 // Putting data in key 1793 put = new Put(row1); 1794 put.addColumn(fam1, qf1, val1); 1795 1796 // checkAndPut with correct value 1797 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, 1798 new BinaryComparator(emptyVal), put); 1799 assertTrue(res); 1800 1801 // not empty anymore 1802 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, 1803 new BinaryComparator(emptyVal), put); 1804 assertFalse(res); 1805 1806 Delete delete = new Delete(row1); 1807 delete.addColumn(fam1, qf1); 1808 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, 1809 new BinaryComparator(emptyVal), delete); 1810 assertFalse(res); 1811 1812 put = new Put(row1); 1813 
put.addColumn(fam1, qf1, val2); 1814 // checkAndPut with correct value 1815 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, 1816 new BinaryComparator(val1), put); 1817 assertTrue(res); 1818 1819 // checkAndDelete with correct value 1820 delete = new Delete(row1); 1821 delete.addColumn(fam1, qf1); 1822 delete.addColumn(fam1, qf1); 1823 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, 1824 new BinaryComparator(val2), delete); 1825 assertTrue(res); 1826 1827 delete = new Delete(row1); 1828 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, 1829 new BinaryComparator(emptyVal), delete); 1830 assertTrue(res); 1831 1832 // checkAndPut looking for a null value 1833 put = new Put(row1); 1834 put.addColumn(fam1, qf1, val1); 1835 1836 res = region 1837 .checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new NullComparator(), put); 1838 assertTrue(res); 1839 } 1840 1841 @Test 1842 public void testCheckAndMutate_WithWrongValue() throws IOException { 1843 byte[] row1 = Bytes.toBytes("row1"); 1844 byte[] fam1 = Bytes.toBytes("fam1"); 1845 byte[] qf1 = Bytes.toBytes("qualifier"); 1846 byte[] val1 = Bytes.toBytes("value1"); 1847 byte[] val2 = Bytes.toBytes("value2"); 1848 BigDecimal bd1 = new BigDecimal(Double.MAX_VALUE); 1849 BigDecimal bd2 = new BigDecimal(Double.MIN_VALUE); 1850 1851 // Setting up region 1852 this.region = initHRegion(tableName, method, CONF, fam1); 1853 // Putting data in key 1854 Put put = new Put(row1); 1855 put.addColumn(fam1, qf1, val1); 1856 region.put(put); 1857 1858 // checkAndPut with wrong value 1859 boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, 1860 new BinaryComparator(val2), put); 1861 assertEquals(false, res); 1862 1863 // checkAndDelete with wrong value 1864 Delete delete = new Delete(row1); 1865 delete.addFamily(fam1); 1866 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, 1867 new BinaryComparator(val2), put); 1868 assertEquals(false, 
res); 1869 1870 // Putting data in key 1871 put = new Put(row1); 1872 put.addColumn(fam1, qf1, Bytes.toBytes(bd1)); 1873 region.put(put); 1874 1875 // checkAndPut with wrong value 1876 res = 1877 region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, 1878 new BigDecimalComparator(bd2), put); 1879 assertEquals(false, res); 1880 1881 // checkAndDelete with wrong value 1882 delete = new Delete(row1); 1883 delete.addFamily(fam1); 1884 res = 1885 region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, 1886 new BigDecimalComparator(bd2), put); 1887 assertEquals(false, res); 1888 } 1889 1890 @Test 1891 public void testCheckAndMutate_WithCorrectValue() throws IOException { 1892 byte[] row1 = Bytes.toBytes("row1"); 1893 byte[] fam1 = Bytes.toBytes("fam1"); 1894 byte[] qf1 = Bytes.toBytes("qualifier"); 1895 byte[] val1 = Bytes.toBytes("value1"); 1896 BigDecimal bd1 = new BigDecimal(Double.MIN_VALUE); 1897 1898 // Setting up region 1899 this.region = initHRegion(tableName, method, CONF, fam1); 1900 // Putting data in key 1901 Put put = new Put(row1); 1902 put.addColumn(fam1, qf1, val1); 1903 region.put(put); 1904 1905 // checkAndPut with correct value 1906 boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, 1907 new BinaryComparator(val1), put); 1908 assertEquals(true, res); 1909 1910 // checkAndDelete with correct value 1911 Delete delete = new Delete(row1); 1912 delete.addColumn(fam1, qf1); 1913 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val1), 1914 delete); 1915 assertEquals(true, res); 1916 1917 // Putting data in key 1918 put = new Put(row1); 1919 put.addColumn(fam1, qf1, Bytes.toBytes(bd1)); 1920 region.put(put); 1921 1922 // checkAndPut with correct value 1923 res = 1924 region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator( 1925 bd1), put); 1926 assertEquals(true, res); 1927 1928 // checkAndDelete with correct value 1929 delete = new Delete(row1); 1930 
delete.addColumn(fam1, qf1); 1931 res = 1932 region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator( 1933 bd1), delete); 1934 assertEquals(true, res); 1935 } 1936 1937 @Test 1938 public void testCheckAndMutate_WithNonEqualCompareOp() throws IOException { 1939 byte[] row1 = Bytes.toBytes("row1"); 1940 byte[] fam1 = Bytes.toBytes("fam1"); 1941 byte[] qf1 = Bytes.toBytes("qualifier"); 1942 byte[] val1 = Bytes.toBytes("value1"); 1943 byte[] val2 = Bytes.toBytes("value2"); 1944 byte[] val3 = Bytes.toBytes("value3"); 1945 byte[] val4 = Bytes.toBytes("value4"); 1946 1947 // Setting up region 1948 this.region = initHRegion(tableName, method, CONF, fam1); 1949 // Putting val3 in key 1950 Put put = new Put(row1); 1951 put.addColumn(fam1, qf1, val3); 1952 region.put(put); 1953 1954 // Test CompareOp.LESS: original = val3, compare with val3, fail 1955 boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS, 1956 new BinaryComparator(val3), put); 1957 assertEquals(false, res); 1958 1959 // Test CompareOp.LESS: original = val3, compare with val4, fail 1960 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS, 1961 new BinaryComparator(val4), put); 1962 assertEquals(false, res); 1963 1964 // Test CompareOp.LESS: original = val3, compare with val2, 1965 // succeed (now value = val2) 1966 put = new Put(row1); 1967 put.addColumn(fam1, qf1, val2); 1968 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS, 1969 new BinaryComparator(val2), put); 1970 assertEquals(true, res); 1971 1972 // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val3, fail 1973 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL, 1974 new BinaryComparator(val3), put); 1975 assertEquals(false, res); 1976 1977 // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val2, 1978 // succeed (value still = val2) 1979 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL, 1980 new 
BinaryComparator(val2), put); 1981 assertEquals(true, res); 1982 1983 // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val1, 1984 // succeed (now value = val3) 1985 put = new Put(row1); 1986 put.addColumn(fam1, qf1, val3); 1987 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL, 1988 new BinaryComparator(val1), put); 1989 assertEquals(true, res); 1990 1991 // Test CompareOp.GREATER: original = val3, compare with val3, fail 1992 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER, 1993 new BinaryComparator(val3), put); 1994 assertEquals(false, res); 1995 1996 // Test CompareOp.GREATER: original = val3, compare with val2, fail 1997 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER, 1998 new BinaryComparator(val2), put); 1999 assertEquals(false, res); 2000 2001 // Test CompareOp.GREATER: original = val3, compare with val4, 2002 // succeed (now value = val2) 2003 put = new Put(row1); 2004 put.addColumn(fam1, qf1, val2); 2005 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER, 2006 new BinaryComparator(val4), put); 2007 assertEquals(true, res); 2008 2009 // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val1, fail 2010 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL, 2011 new BinaryComparator(val1), put); 2012 assertEquals(false, res); 2013 2014 // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val2, 2015 // succeed (value still = val2) 2016 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL, 2017 new BinaryComparator(val2), put); 2018 assertEquals(true, res); 2019 2020 // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val3, succeed 2021 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL, 2022 new BinaryComparator(val3), put); 2023 assertEquals(true, res); 2024 } 2025 2026 @Test 2027 public void testCheckAndPut_ThatPutWasWritten() throws IOException 
{ 2028 byte[] row1 = Bytes.toBytes("row1"); 2029 byte[] fam1 = Bytes.toBytes("fam1"); 2030 byte[] fam2 = Bytes.toBytes("fam2"); 2031 byte[] qf1 = Bytes.toBytes("qualifier"); 2032 byte[] val1 = Bytes.toBytes("value1"); 2033 byte[] val2 = Bytes.toBytes("value2"); 2034 2035 byte[][] families = { fam1, fam2 }; 2036 2037 // Setting up region 2038 this.region = initHRegion(tableName, method, CONF, families); 2039 // Putting data in the key to check 2040 Put put = new Put(row1); 2041 put.addColumn(fam1, qf1, val1); 2042 region.put(put); 2043 2044 // Creating put to add 2045 long ts = System.currentTimeMillis(); 2046 KeyValue kv = new KeyValue(row1, fam2, qf1, ts, KeyValue.Type.Put, val2); 2047 put = new Put(row1); 2048 put.add(kv); 2049 2050 // checkAndPut with wrong value 2051 boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, 2052 new BinaryComparator(val1), put); 2053 assertEquals(true, res); 2054 2055 Get get = new Get(row1); 2056 get.addColumn(fam2, qf1); 2057 Cell[] actual = region.get(get).rawCells(); 2058 2059 Cell[] expected = { kv }; 2060 2061 assertEquals(expected.length, actual.length); 2062 for (int i = 0; i < actual.length; i++) { 2063 assertEquals(expected[i], actual[i]); 2064 } 2065 } 2066 2067 @Test 2068 public void testCheckAndPut_wrongRowInPut() throws IOException { 2069 this.region = initHRegion(tableName, method, CONF, COLUMNS); 2070 Put put = new Put(row2); 2071 put.addColumn(fam1, qual1, value1); 2072 try { 2073 region.checkAndMutate(row, fam1, qual1, CompareOperator.EQUAL, 2074 new BinaryComparator(value2), put); 2075 fail(); 2076 } catch (org.apache.hadoop.hbase.DoNotRetryIOException expected) { 2077 // expected exception. 
2078 } 2079 } 2080 2081 @Test 2082 public void testCheckAndDelete_ThatDeleteWasWritten() throws IOException { 2083 byte[] row1 = Bytes.toBytes("row1"); 2084 byte[] fam1 = Bytes.toBytes("fam1"); 2085 byte[] fam2 = Bytes.toBytes("fam2"); 2086 byte[] qf1 = Bytes.toBytes("qualifier1"); 2087 byte[] qf2 = Bytes.toBytes("qualifier2"); 2088 byte[] qf3 = Bytes.toBytes("qualifier3"); 2089 byte[] val1 = Bytes.toBytes("value1"); 2090 byte[] val2 = Bytes.toBytes("value2"); 2091 byte[] val3 = Bytes.toBytes("value3"); 2092 byte[] emptyVal = new byte[] {}; 2093 2094 byte[][] families = { fam1, fam2 }; 2095 2096 // Setting up region 2097 this.region = initHRegion(tableName, method, CONF, families); 2098 // Put content 2099 Put put = new Put(row1); 2100 put.addColumn(fam1, qf1, val1); 2101 region.put(put); 2102 Threads.sleep(2); 2103 2104 put = new Put(row1); 2105 put.addColumn(fam1, qf1, val2); 2106 put.addColumn(fam2, qf1, val3); 2107 put.addColumn(fam2, qf2, val2); 2108 put.addColumn(fam2, qf3, val1); 2109 put.addColumn(fam1, qf3, val1); 2110 region.put(put); 2111 2112 // Multi-column delete 2113 Delete delete = new Delete(row1); 2114 delete.addColumn(fam1, qf1); 2115 delete.addColumn(fam2, qf1); 2116 delete.addColumn(fam1, qf3); 2117 boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, 2118 new BinaryComparator(val2), delete); 2119 assertEquals(true, res); 2120 2121 Get get = new Get(row1); 2122 get.addColumn(fam1, qf1); 2123 get.addColumn(fam1, qf3); 2124 get.addColumn(fam2, qf2); 2125 Result r = region.get(get); 2126 assertEquals(2, r.size()); 2127 assertArrayEquals(val1, r.getValue(fam1, qf1)); 2128 assertArrayEquals(val2, r.getValue(fam2, qf2)); 2129 2130 // Family delete 2131 delete = new Delete(row1); 2132 delete.addFamily(fam2); 2133 res = region.checkAndMutate(row1, fam2, qf1, CompareOperator.EQUAL, 2134 new BinaryComparator(emptyVal), delete); 2135 assertEquals(true, res); 2136 2137 get = new Get(row1); 2138 r = region.get(get); 2139 
assertEquals(1, r.size()); 2140 assertArrayEquals(val1, r.getValue(fam1, qf1)); 2141 2142 // Row delete 2143 delete = new Delete(row1); 2144 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val1), 2145 delete); 2146 assertEquals(true, res); 2147 get = new Get(row1); 2148 r = region.get(get); 2149 assertEquals(0, r.size()); 2150 } 2151 2152 // //////////////////////////////////////////////////////////////////////////// 2153 // Delete tests 2154 // //////////////////////////////////////////////////////////////////////////// 2155 @Test 2156 public void testDelete_multiDeleteColumn() throws IOException { 2157 byte[] row1 = Bytes.toBytes("row1"); 2158 byte[] fam1 = Bytes.toBytes("fam1"); 2159 byte[] qual = Bytes.toBytes("qualifier"); 2160 byte[] value = Bytes.toBytes("value"); 2161 2162 Put put = new Put(row1); 2163 put.addColumn(fam1, qual, 1, value); 2164 put.addColumn(fam1, qual, 2, value); 2165 2166 this.region = initHRegion(tableName, method, CONF, fam1); 2167 region.put(put); 2168 2169 // We do support deleting more than 1 'latest' version 2170 Delete delete = new Delete(row1); 2171 delete.addColumn(fam1, qual); 2172 delete.addColumn(fam1, qual); 2173 region.delete(delete); 2174 2175 Get get = new Get(row1); 2176 get.addFamily(fam1); 2177 Result r = region.get(get); 2178 assertEquals(0, r.size()); 2179 } 2180 2181 @Test 2182 public void testDelete_CheckFamily() throws IOException { 2183 byte[] row1 = Bytes.toBytes("row1"); 2184 byte[] fam1 = Bytes.toBytes("fam1"); 2185 byte[] fam2 = Bytes.toBytes("fam2"); 2186 byte[] fam3 = Bytes.toBytes("fam3"); 2187 byte[] fam4 = Bytes.toBytes("fam4"); 2188 2189 // Setting up region 2190 this.region = initHRegion(tableName, method, CONF, fam1, fam2, fam3); 2191 List<Cell> kvs = new ArrayList<>(); 2192 kvs.add(new KeyValue(row1, fam4, null, null)); 2193 2194 // testing existing family 2195 byte[] family = fam2; 2196 NavigableMap<byte[], List<Cell>> deleteMap = new 
TreeMap<>(Bytes.BYTES_COMPARATOR); 2197 deleteMap.put(family, kvs); 2198 region.delete(deleteMap, Durability.SYNC_WAL); 2199 2200 // testing non existing family 2201 boolean ok = false; 2202 family = fam4; 2203 try { 2204 deleteMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); 2205 deleteMap.put(family, kvs); 2206 region.delete(deleteMap, Durability.SYNC_WAL); 2207 } catch (Exception e) { 2208 ok = true; 2209 } 2210 assertTrue("Family " + new String(family, StandardCharsets.UTF_8) + " does exist", ok); 2211 } 2212 2213 @Test 2214 public void testDelete_mixed() throws IOException, InterruptedException { 2215 byte[] fam = Bytes.toBytes("info"); 2216 byte[][] families = { fam }; 2217 this.region = initHRegion(tableName, method, CONF, families); 2218 EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge()); 2219 2220 byte[] row = Bytes.toBytes("table_name"); 2221 // column names 2222 byte[] serverinfo = Bytes.toBytes("serverinfo"); 2223 byte[] splitA = Bytes.toBytes("splitA"); 2224 byte[] splitB = Bytes.toBytes("splitB"); 2225 2226 // add some data: 2227 Put put = new Put(row); 2228 put.addColumn(fam, splitA, Bytes.toBytes("reference_A")); 2229 region.put(put); 2230 2231 put = new Put(row); 2232 put.addColumn(fam, splitB, Bytes.toBytes("reference_B")); 2233 region.put(put); 2234 2235 put = new Put(row); 2236 put.addColumn(fam, serverinfo, Bytes.toBytes("ip_address")); 2237 region.put(put); 2238 2239 // ok now delete a split: 2240 Delete delete = new Delete(row); 2241 delete.addColumns(fam, splitA); 2242 region.delete(delete); 2243 2244 // assert some things: 2245 Get get = new Get(row).addColumn(fam, serverinfo); 2246 Result result = region.get(get); 2247 assertEquals(1, result.size()); 2248 2249 get = new Get(row).addColumn(fam, splitA); 2250 result = region.get(get); 2251 assertEquals(0, result.size()); 2252 2253 get = new Get(row).addColumn(fam, splitB); 2254 result = region.get(get); 2255 assertEquals(1, result.size()); 2256 2257 // Assert that 
after a delete, I can put. 2258 put = new Put(row); 2259 put.addColumn(fam, splitA, Bytes.toBytes("reference_A")); 2260 region.put(put); 2261 get = new Get(row); 2262 result = region.get(get); 2263 assertEquals(3, result.size()); 2264 2265 // Now delete all... then test I can add stuff back 2266 delete = new Delete(row); 2267 region.delete(delete); 2268 assertEquals(0, region.get(get).size()); 2269 2270 region.put(new Put(row).addColumn(fam, splitA, Bytes.toBytes("reference_A"))); 2271 result = region.get(get); 2272 assertEquals(1, result.size()); 2273 } 2274 2275 @Test 2276 public void testDeleteRowWithFutureTs() throws IOException { 2277 byte[] fam = Bytes.toBytes("info"); 2278 byte[][] families = { fam }; 2279 this.region = initHRegion(tableName, method, CONF, families); 2280 byte[] row = Bytes.toBytes("table_name"); 2281 // column names 2282 byte[] serverinfo = Bytes.toBytes("serverinfo"); 2283 2284 // add data in the far future 2285 Put put = new Put(row); 2286 put.addColumn(fam, serverinfo, HConstants.LATEST_TIMESTAMP - 5, Bytes.toBytes("value")); 2287 region.put(put); 2288 2289 // now delete something in the present 2290 Delete delete = new Delete(row); 2291 region.delete(delete); 2292 2293 // make sure we still see our data 2294 Get get = new Get(row).addColumn(fam, serverinfo); 2295 Result result = region.get(get); 2296 assertEquals(1, result.size()); 2297 2298 // delete the future row 2299 delete = new Delete(row, HConstants.LATEST_TIMESTAMP - 3); 2300 region.delete(delete); 2301 2302 // make sure it is gone 2303 get = new Get(row).addColumn(fam, serverinfo); 2304 result = region.get(get); 2305 assertEquals(0, result.size()); 2306 } 2307 2308 /** 2309 * Tests that the special LATEST_TIMESTAMP option for puts gets replaced by 2310 * the actual timestamp 2311 */ 2312 @Test 2313 public void testPutWithLatestTS() throws IOException { 2314 byte[] fam = Bytes.toBytes("info"); 2315 byte[][] families = { fam }; 2316 this.region = initHRegion(tableName, method, 
CONF, families); 2317 byte[] row = Bytes.toBytes("row1"); 2318 // column names 2319 byte[] qual = Bytes.toBytes("qual"); 2320 2321 // add data with LATEST_TIMESTAMP, put without WAL 2322 Put put = new Put(row); 2323 put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, Bytes.toBytes("value")); 2324 region.put(put); 2325 2326 // Make sure it shows up with an actual timestamp 2327 Get get = new Get(row).addColumn(fam, qual); 2328 Result result = region.get(get); 2329 assertEquals(1, result.size()); 2330 Cell kv = result.rawCells()[0]; 2331 LOG.info("Got: " + kv); 2332 assertTrue("LATEST_TIMESTAMP was not replaced with real timestamp", 2333 kv.getTimestamp() != HConstants.LATEST_TIMESTAMP); 2334 2335 // Check same with WAL enabled (historically these took different 2336 // code paths, so check both) 2337 row = Bytes.toBytes("row2"); 2338 put = new Put(row); 2339 put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, Bytes.toBytes("value")); 2340 region.put(put); 2341 2342 // Make sure it shows up with an actual timestamp 2343 get = new Get(row).addColumn(fam, qual); 2344 result = region.get(get); 2345 assertEquals(1, result.size()); 2346 kv = result.rawCells()[0]; 2347 LOG.info("Got: " + kv); 2348 assertTrue("LATEST_TIMESTAMP was not replaced with real timestamp", 2349 kv.getTimestamp() != HConstants.LATEST_TIMESTAMP); 2350 } 2351 2352 /** 2353 * Tests that there is server-side filtering for invalid timestamp upper 2354 * bound. Note that the timestamp lower bound is automatically handled for us 2355 * by the TTL field. 2356 */ 2357 @Test 2358 public void testPutWithTsSlop() throws IOException { 2359 byte[] fam = Bytes.toBytes("info"); 2360 byte[][] families = { fam }; 2361 2362 // add data with a timestamp that is too recent for range. 
Ensure assert 2363 CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000); 2364 this.region = initHRegion(tableName, method, CONF, families); 2365 boolean caughtExcep = false; 2366 try { 2367 // no TS specified == use latest. should not error 2368 region.put(new Put(row).addColumn(fam, Bytes.toBytes("qual"), Bytes.toBytes("value"))); 2369 // TS out of range. should error 2370 region.put(new Put(row).addColumn(fam, Bytes.toBytes("qual"), 2371 System.currentTimeMillis() + 2000, Bytes.toBytes("value"))); 2372 fail("Expected IOE for TS out of configured timerange"); 2373 } catch (FailedSanityCheckException ioe) { 2374 LOG.debug("Received expected exception", ioe); 2375 caughtExcep = true; 2376 } 2377 assertTrue("Should catch FailedSanityCheckException", caughtExcep); 2378 } 2379 2380 @Test 2381 public void testScanner_DeleteOneFamilyNotAnother() throws IOException { 2382 byte[] fam1 = Bytes.toBytes("columnA"); 2383 byte[] fam2 = Bytes.toBytes("columnB"); 2384 this.region = initHRegion(tableName, method, CONF, fam1, fam2); 2385 byte[] rowA = Bytes.toBytes("rowA"); 2386 byte[] rowB = Bytes.toBytes("rowB"); 2387 2388 byte[] value = Bytes.toBytes("value"); 2389 2390 Delete delete = new Delete(rowA); 2391 delete.addFamily(fam1); 2392 2393 region.delete(delete); 2394 2395 // now create data. 
2396 Put put = new Put(rowA); 2397 put.addColumn(fam2, null, value); 2398 region.put(put); 2399 2400 put = new Put(rowB); 2401 put.addColumn(fam1, null, value); 2402 put.addColumn(fam2, null, value); 2403 region.put(put); 2404 2405 Scan scan = new Scan(); 2406 scan.addFamily(fam1).addFamily(fam2); 2407 InternalScanner s = region.getScanner(scan); 2408 List<Cell> results = new ArrayList<>(); 2409 s.next(results); 2410 assertTrue(CellUtil.matchingRows(results.get(0), rowA)); 2411 2412 results.clear(); 2413 s.next(results); 2414 assertTrue(CellUtil.matchingRows(results.get(0), rowB)); 2415 } 2416 2417 @Test 2418 public void testDataInMemoryWithoutWAL() throws IOException { 2419 FileSystem fs = FileSystem.get(CONF); 2420 Path rootDir = new Path(dir + "testDataInMemoryWithoutWAL"); 2421 FSHLog hLog = new FSHLog(fs, rootDir, "testDataInMemoryWithoutWAL", CONF); 2422 hLog.init(); 2423 // This chunk creation is done throughout the code base. Do we want to move it into core? 2424 // It is missing from this test. W/o it we NPE. 2425 region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog, 2426 COLUMN_FAMILY_BYTES); 2427 2428 Cell originalCell = CellUtil.createCell(row, COLUMN_FAMILY_BYTES, qual1, 2429 System.currentTimeMillis(), KeyValue.Type.Put.getCode(), value1); 2430 final long originalSize = originalCell.getSerializedSize(); 2431 2432 Cell addCell = CellUtil.createCell(row, COLUMN_FAMILY_BYTES, qual1, 2433 System.currentTimeMillis(), KeyValue.Type.Put.getCode(), Bytes.toBytes("xxxxxxxxxx")); 2434 final long addSize = addCell.getSerializedSize(); 2435 2436 LOG.info("originalSize:" + originalSize 2437 + ", addSize:" + addSize); 2438 // start test. We expect that the addPut's durability will be replaced 2439 // by originalPut's durability. 
2440 2441 // case 1: 2442 testDataInMemoryWithoutWAL(region, 2443 new Put(row).add(originalCell).setDurability(Durability.SKIP_WAL), 2444 new Put(row).add(addCell).setDurability(Durability.SKIP_WAL), 2445 originalSize + addSize); 2446 2447 // case 2: 2448 testDataInMemoryWithoutWAL(region, 2449 new Put(row).add(originalCell).setDurability(Durability.SKIP_WAL), 2450 new Put(row).add(addCell).setDurability(Durability.SYNC_WAL), 2451 originalSize + addSize); 2452 2453 // case 3: 2454 testDataInMemoryWithoutWAL(region, 2455 new Put(row).add(originalCell).setDurability(Durability.SYNC_WAL), 2456 new Put(row).add(addCell).setDurability(Durability.SKIP_WAL), 2457 0); 2458 2459 // case 4: 2460 testDataInMemoryWithoutWAL(region, 2461 new Put(row).add(originalCell).setDurability(Durability.SYNC_WAL), 2462 new Put(row).add(addCell).setDurability(Durability.SYNC_WAL), 2463 0); 2464 } 2465 2466 private static void testDataInMemoryWithoutWAL(HRegion region, Put originalPut, 2467 final Put addPut, long delta) throws IOException { 2468 final long initSize = region.getDataInMemoryWithoutWAL(); 2469 // save normalCPHost and replaced by mockedCPHost 2470 RegionCoprocessorHost normalCPHost = region.getCoprocessorHost(); 2471 RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class); 2472 // Because the preBatchMutate returns void, we can't do usual Mockito when...then form. Must 2473 // do below format (from Mockito doc). 
2474 Mockito.doAnswer(new Answer() { 2475 @Override 2476 public Object answer(InvocationOnMock invocation) throws Throwable { 2477 MiniBatchOperationInProgress<Mutation> mb = invocation.getArgument(0); 2478 mb.addOperationsFromCP(0, new Mutation[]{addPut}); 2479 return null; 2480 } 2481 }).when(mockedCPHost).preBatchMutate(Mockito.isA(MiniBatchOperationInProgress.class)); 2482 region.setCoprocessorHost(mockedCPHost); 2483 region.put(originalPut); 2484 region.setCoprocessorHost(normalCPHost); 2485 final long finalSize = region.getDataInMemoryWithoutWAL(); 2486 assertEquals("finalSize:" + finalSize + ", initSize:" 2487 + initSize + ", delta:" + delta,finalSize, initSize + delta); 2488 } 2489 2490 @Test 2491 public void testDeleteColumns_PostInsert() throws IOException, InterruptedException { 2492 Delete delete = new Delete(row); 2493 delete.addColumns(fam1, qual1); 2494 doTestDelete_AndPostInsert(delete); 2495 } 2496 2497 @Test 2498 public void testaddFamily_PostInsert() throws IOException, InterruptedException { 2499 Delete delete = new Delete(row); 2500 delete.addFamily(fam1); 2501 doTestDelete_AndPostInsert(delete); 2502 } 2503 2504 public void doTestDelete_AndPostInsert(Delete delete) throws IOException, InterruptedException { 2505 this.region = initHRegion(tableName, method, CONF, fam1); 2506 EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge()); 2507 Put put = new Put(row); 2508 put.addColumn(fam1, qual1, value1); 2509 region.put(put); 2510 2511 // now delete the value: 2512 region.delete(delete); 2513 2514 // ok put data: 2515 put = new Put(row); 2516 put.addColumn(fam1, qual1, value2); 2517 region.put(put); 2518 2519 // ok get: 2520 Get get = new Get(row); 2521 get.addColumn(fam1, qual1); 2522 2523 Result r = region.get(get); 2524 assertEquals(1, r.size()); 2525 assertArrayEquals(value2, r.getValue(fam1, qual1)); 2526 2527 // next: 2528 Scan scan = new Scan(row); 2529 scan.addColumn(fam1, qual1); 2530 InternalScanner s = 
region.getScanner(scan); 2531 2532 List<Cell> results = new ArrayList<>(); 2533 assertEquals(false, s.next(results)); 2534 assertEquals(1, results.size()); 2535 Cell kv = results.get(0); 2536 2537 assertArrayEquals(value2, CellUtil.cloneValue(kv)); 2538 assertArrayEquals(fam1, CellUtil.cloneFamily(kv)); 2539 assertArrayEquals(qual1, CellUtil.cloneQualifier(kv)); 2540 assertArrayEquals(row, CellUtil.cloneRow(kv)); 2541 } 2542 2543 @Test 2544 public void testDelete_CheckTimestampUpdated() throws IOException { 2545 byte[] row1 = Bytes.toBytes("row1"); 2546 byte[] col1 = Bytes.toBytes("col1"); 2547 byte[] col2 = Bytes.toBytes("col2"); 2548 byte[] col3 = Bytes.toBytes("col3"); 2549 2550 // Setting up region 2551 this.region = initHRegion(tableName, method, CONF, fam1); 2552 // Building checkerList 2553 List<Cell> kvs = new ArrayList<>(); 2554 kvs.add(new KeyValue(row1, fam1, col1, null)); 2555 kvs.add(new KeyValue(row1, fam1, col2, null)); 2556 kvs.add(new KeyValue(row1, fam1, col3, null)); 2557 2558 NavigableMap<byte[], List<Cell>> deleteMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); 2559 deleteMap.put(fam1, kvs); 2560 region.delete(deleteMap, Durability.SYNC_WAL); 2561 2562 // extract the key values out the memstore: 2563 // This is kinda hacky, but better than nothing... 
2564 long now = System.currentTimeMillis(); 2565 AbstractMemStore memstore = (AbstractMemStore)region.getStore(fam1).memstore; 2566 Cell firstCell = memstore.getActive().first(); 2567 assertTrue(firstCell.getTimestamp() <= now); 2568 now = firstCell.getTimestamp(); 2569 for (Cell cell : memstore.getActive().getCellSet()) { 2570 assertTrue(cell.getTimestamp() <= now); 2571 now = cell.getTimestamp(); 2572 } 2573 } 2574 2575 // //////////////////////////////////////////////////////////////////////////// 2576 // Get tests 2577 // //////////////////////////////////////////////////////////////////////////// 2578 @Test 2579 public void testGet_FamilyChecker() throws IOException { 2580 byte[] row1 = Bytes.toBytes("row1"); 2581 byte[] fam1 = Bytes.toBytes("fam1"); 2582 byte[] fam2 = Bytes.toBytes("False"); 2583 byte[] col1 = Bytes.toBytes("col1"); 2584 2585 // Setting up region 2586 this.region = initHRegion(tableName, method, CONF, fam1); 2587 Get get = new Get(row1); 2588 get.addColumn(fam2, col1); 2589 2590 // Test 2591 try { 2592 region.get(get); 2593 fail("Expecting DoNotRetryIOException in get but did not get any"); 2594 } catch (org.apache.hadoop.hbase.DoNotRetryIOException e) { 2595 LOG.info("Got expected DoNotRetryIOException successfully"); 2596 } 2597 } 2598 2599 @Test 2600 public void testGet_Basic() throws IOException { 2601 byte[] row1 = Bytes.toBytes("row1"); 2602 byte[] fam1 = Bytes.toBytes("fam1"); 2603 byte[] col1 = Bytes.toBytes("col1"); 2604 byte[] col2 = Bytes.toBytes("col2"); 2605 byte[] col3 = Bytes.toBytes("col3"); 2606 byte[] col4 = Bytes.toBytes("col4"); 2607 byte[] col5 = Bytes.toBytes("col5"); 2608 2609 // Setting up region 2610 this.region = initHRegion(tableName, method, CONF, fam1); 2611 // Add to memstore 2612 Put put = new Put(row1); 2613 put.addColumn(fam1, col1, null); 2614 put.addColumn(fam1, col2, null); 2615 put.addColumn(fam1, col3, null); 2616 put.addColumn(fam1, col4, null); 2617 put.addColumn(fam1, col5, null); 2618 region.put(put); 
2619 2620 Get get = new Get(row1); 2621 get.addColumn(fam1, col2); 2622 get.addColumn(fam1, col4); 2623 // Expected result 2624 KeyValue kv1 = new KeyValue(row1, fam1, col2); 2625 KeyValue kv2 = new KeyValue(row1, fam1, col4); 2626 KeyValue[] expected = { kv1, kv2 }; 2627 2628 // Test 2629 Result res = region.get(get); 2630 assertEquals(expected.length, res.size()); 2631 for (int i = 0; i < res.size(); i++) { 2632 assertTrue(CellUtil.matchingRows(expected[i], res.rawCells()[i])); 2633 assertTrue(CellUtil.matchingFamily(expected[i], res.rawCells()[i])); 2634 assertTrue(CellUtil.matchingQualifier(expected[i], res.rawCells()[i])); 2635 } 2636 2637 // Test using a filter on a Get 2638 Get g = new Get(row1); 2639 final int count = 2; 2640 g.setFilter(new ColumnCountGetFilter(count)); 2641 res = region.get(g); 2642 assertEquals(count, res.size()); 2643 } 2644 2645 @Test 2646 public void testGet_Empty() throws IOException { 2647 byte[] row = Bytes.toBytes("row"); 2648 byte[] fam = Bytes.toBytes("fam"); 2649 2650 this.region = initHRegion(tableName, method, CONF, fam); 2651 Get get = new Get(row); 2652 get.addFamily(fam); 2653 Result r = region.get(get); 2654 2655 assertTrue(r.isEmpty()); 2656 } 2657 2658 @Test 2659 public void testGetWithFilter() throws IOException, InterruptedException { 2660 byte[] row1 = Bytes.toBytes("row1"); 2661 byte[] fam1 = Bytes.toBytes("fam1"); 2662 byte[] col1 = Bytes.toBytes("col1"); 2663 byte[] value1 = Bytes.toBytes("value1"); 2664 byte[] value2 = Bytes.toBytes("value2"); 2665 2666 final int maxVersions = 3; 2667 HColumnDescriptor hcd = new HColumnDescriptor(fam1); 2668 hcd.setMaxVersions(maxVersions); 2669 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testFilterAndColumnTracker")); 2670 htd.addFamily(hcd); 2671 ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); 2672 HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); 2673 Path logDir = 
TEST_UTIL.getDataTestDirOnTestFS(method + ".log"); 2674 final WAL wal = HBaseTestingUtility.createWal(TEST_UTIL.getConfiguration(), logDir, info); 2675 this.region = TEST_UTIL.createLocalHRegion(info, htd, wal); 2676 2677 // Put 4 version to memstore 2678 long ts = 0; 2679 Put put = new Put(row1, ts); 2680 put.addColumn(fam1, col1, value1); 2681 region.put(put); 2682 put = new Put(row1, ts + 1); 2683 put.addColumn(fam1, col1, Bytes.toBytes("filter1")); 2684 region.put(put); 2685 put = new Put(row1, ts + 2); 2686 put.addColumn(fam1, col1, Bytes.toBytes("filter2")); 2687 region.put(put); 2688 put = new Put(row1, ts + 3); 2689 put.addColumn(fam1, col1, value2); 2690 region.put(put); 2691 2692 Get get = new Get(row1); 2693 get.setMaxVersions(); 2694 Result res = region.get(get); 2695 // Get 3 versions, the oldest version has gone from user view 2696 assertEquals(maxVersions, res.size()); 2697 2698 get.setFilter(new ValueFilter(CompareOp.EQUAL, new SubstringComparator("value"))); 2699 res = region.get(get); 2700 // When use value filter, the oldest version should still gone from user view and it 2701 // should only return one key vaule 2702 assertEquals(1, res.size()); 2703 assertTrue(CellUtil.matchingValue(new KeyValue(row1, fam1, col1, value2), res.rawCells()[0])); 2704 assertEquals(ts + 3, res.rawCells()[0].getTimestamp()); 2705 2706 region.flush(true); 2707 region.compact(true); 2708 Thread.sleep(1000); 2709 res = region.get(get); 2710 // After flush and compact, the result should be consistent with previous result 2711 assertEquals(1, res.size()); 2712 assertTrue(CellUtil.matchingValue(new KeyValue(row1, fam1, col1, value2), res.rawCells()[0])); 2713 } 2714 2715 // //////////////////////////////////////////////////////////////////////////// 2716 // Scanner tests 2717 // //////////////////////////////////////////////////////////////////////////// 2718 @Test 2719 public void testGetScanner_WithOkFamilies() throws IOException { 2720 byte[] fam1 = 
Bytes.toBytes("fam1"); 2721 byte[] fam2 = Bytes.toBytes("fam2"); 2722 2723 byte[][] families = { fam1, fam2 }; 2724 2725 // Setting up region 2726 this.region = initHRegion(tableName, method, CONF, families); 2727 Scan scan = new Scan(); 2728 scan.addFamily(fam1); 2729 scan.addFamily(fam2); 2730 try { 2731 region.getScanner(scan); 2732 } catch (Exception e) { 2733 assertTrue("Families could not be found in Region", false); 2734 } 2735 } 2736 2737 @Test 2738 public void testGetScanner_WithNotOkFamilies() throws IOException { 2739 byte[] fam1 = Bytes.toBytes("fam1"); 2740 byte[] fam2 = Bytes.toBytes("fam2"); 2741 2742 byte[][] families = { fam1 }; 2743 2744 // Setting up region 2745 this.region = initHRegion(tableName, method, CONF, families); 2746 Scan scan = new Scan(); 2747 scan.addFamily(fam2); 2748 boolean ok = false; 2749 try { 2750 region.getScanner(scan); 2751 } catch (Exception e) { 2752 ok = true; 2753 } 2754 assertTrue("Families could not be found in Region", ok); 2755 } 2756 2757 @Test 2758 public void testGetScanner_WithNoFamilies() throws IOException { 2759 byte[] row1 = Bytes.toBytes("row1"); 2760 byte[] fam1 = Bytes.toBytes("fam1"); 2761 byte[] fam2 = Bytes.toBytes("fam2"); 2762 byte[] fam3 = Bytes.toBytes("fam3"); 2763 byte[] fam4 = Bytes.toBytes("fam4"); 2764 2765 byte[][] families = { fam1, fam2, fam3, fam4 }; 2766 2767 // Setting up region 2768 this.region = initHRegion(tableName, method, CONF, families); 2769 // Putting data in Region 2770 Put put = new Put(row1); 2771 put.addColumn(fam1, null, null); 2772 put.addColumn(fam2, null, null); 2773 put.addColumn(fam3, null, null); 2774 put.addColumn(fam4, null, null); 2775 region.put(put); 2776 2777 Scan scan = null; 2778 HRegion.RegionScannerImpl is = null; 2779 2780 // Testing to see how many scanners that is produced by getScanner, 2781 // starting 2782 // with known number, 2 - current = 1 2783 scan = new Scan(); 2784 scan.addFamily(fam2); 2785 scan.addFamily(fam4); 2786 is = 
region.getScanner(scan); 2787 assertEquals(1, is.storeHeap.getHeap().size()); 2788 2789 scan = new Scan(); 2790 is = region.getScanner(scan); 2791 assertEquals(families.length - 1, is.storeHeap.getHeap().size()); 2792 } 2793 2794 /** 2795 * This method tests https://issues.apache.org/jira/browse/HBASE-2516. 2796 * 2797 * @throws IOException 2798 */ 2799 @Test 2800 public void testGetScanner_WithRegionClosed() throws IOException { 2801 byte[] fam1 = Bytes.toBytes("fam1"); 2802 byte[] fam2 = Bytes.toBytes("fam2"); 2803 2804 byte[][] families = { fam1, fam2 }; 2805 2806 // Setting up region 2807 try { 2808 this.region = initHRegion(tableName, method, CONF, families); 2809 } catch (IOException e) { 2810 e.printStackTrace(); 2811 fail("Got IOException during initHRegion, " + e.getMessage()); 2812 } 2813 region.closed.set(true); 2814 try { 2815 region.getScanner(null); 2816 fail("Expected to get an exception during getScanner on a region that is closed"); 2817 } catch (NotServingRegionException e) { 2818 // this is the correct exception that is expected 2819 } catch (IOException e) { 2820 fail("Got wrong type of exception - should be a NotServingRegionException, " + 2821 "but was an IOException: " 2822 + e.getMessage()); 2823 } 2824 } 2825 2826 @Test 2827 public void testRegionScanner_Next() throws IOException { 2828 byte[] row1 = Bytes.toBytes("row1"); 2829 byte[] row2 = Bytes.toBytes("row2"); 2830 byte[] fam1 = Bytes.toBytes("fam1"); 2831 byte[] fam2 = Bytes.toBytes("fam2"); 2832 byte[] fam3 = Bytes.toBytes("fam3"); 2833 byte[] fam4 = Bytes.toBytes("fam4"); 2834 2835 byte[][] families = { fam1, fam2, fam3, fam4 }; 2836 long ts = System.currentTimeMillis(); 2837 2838 // Setting up region 2839 this.region = initHRegion(tableName, method, CONF, families); 2840 // Putting data in Region 2841 Put put = null; 2842 put = new Put(row1); 2843 put.addColumn(fam1, (byte[]) null, ts, null); 2844 put.addColumn(fam2, (byte[]) null, ts, null); 2845 put.addColumn(fam3, (byte[]) null, 
ts, null); 2846 put.addColumn(fam4, (byte[]) null, ts, null); 2847 region.put(put); 2848 2849 put = new Put(row2); 2850 put.addColumn(fam1, (byte[]) null, ts, null); 2851 put.addColumn(fam2, (byte[]) null, ts, null); 2852 put.addColumn(fam3, (byte[]) null, ts, null); 2853 put.addColumn(fam4, (byte[]) null, ts, null); 2854 region.put(put); 2855 2856 Scan scan = new Scan(); 2857 scan.addFamily(fam2); 2858 scan.addFamily(fam4); 2859 InternalScanner is = region.getScanner(scan); 2860 2861 List<Cell> res = null; 2862 2863 // Result 1 2864 List<Cell> expected1 = new ArrayList<>(); 2865 expected1.add(new KeyValue(row1, fam2, null, ts, KeyValue.Type.Put, null)); 2866 expected1.add(new KeyValue(row1, fam4, null, ts, KeyValue.Type.Put, null)); 2867 2868 res = new ArrayList<>(); 2869 is.next(res); 2870 for (int i = 0; i < res.size(); i++) { 2871 assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected1.get(i), res.get(i))); 2872 } 2873 2874 // Result 2 2875 List<Cell> expected2 = new ArrayList<>(); 2876 expected2.add(new KeyValue(row2, fam2, null, ts, KeyValue.Type.Put, null)); 2877 expected2.add(new KeyValue(row2, fam4, null, ts, KeyValue.Type.Put, null)); 2878 2879 res = new ArrayList<>(); 2880 is.next(res); 2881 for (int i = 0; i < res.size(); i++) { 2882 assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected2.get(i), res.get(i))); 2883 } 2884 } 2885 2886 @Test 2887 public void testScanner_ExplicitColumns_FromMemStore_EnforceVersions() throws IOException { 2888 byte[] row1 = Bytes.toBytes("row1"); 2889 byte[] qf1 = Bytes.toBytes("qualifier1"); 2890 byte[] qf2 = Bytes.toBytes("qualifier2"); 2891 byte[] fam1 = Bytes.toBytes("fam1"); 2892 byte[][] families = { fam1 }; 2893 2894 long ts1 = System.currentTimeMillis(); 2895 long ts2 = ts1 + 1; 2896 long ts3 = ts1 + 2; 2897 2898 // Setting up region 2899 this.region = initHRegion(tableName, method, CONF, families); 2900 // Putting data in Region 2901 Put put = null; 2902 KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, 
KeyValue.Type.Put, null); 2903 KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null); 2904 KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null); 2905 2906 KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null); 2907 KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null); 2908 KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null); 2909 2910 put = new Put(row1); 2911 put.add(kv13); 2912 put.add(kv12); 2913 put.add(kv11); 2914 put.add(kv23); 2915 put.add(kv22); 2916 put.add(kv21); 2917 region.put(put); 2918 2919 // Expected 2920 List<Cell> expected = new ArrayList<>(); 2921 expected.add(kv13); 2922 expected.add(kv12); 2923 2924 Scan scan = new Scan(row1); 2925 scan.addColumn(fam1, qf1); 2926 scan.setMaxVersions(MAX_VERSIONS); 2927 List<Cell> actual = new ArrayList<>(); 2928 InternalScanner scanner = region.getScanner(scan); 2929 2930 boolean hasNext = scanner.next(actual); 2931 assertEquals(false, hasNext); 2932 2933 // Verify result 2934 for (int i = 0; i < expected.size(); i++) { 2935 assertEquals(expected.get(i), actual.get(i)); 2936 } 2937 } 2938 2939 @Test 2940 public void testScanner_ExplicitColumns_FromFilesOnly_EnforceVersions() throws IOException { 2941 byte[] row1 = Bytes.toBytes("row1"); 2942 byte[] qf1 = Bytes.toBytes("qualifier1"); 2943 byte[] qf2 = Bytes.toBytes("qualifier2"); 2944 byte[] fam1 = Bytes.toBytes("fam1"); 2945 byte[][] families = { fam1 }; 2946 2947 long ts1 = 1; // System.currentTimeMillis(); 2948 long ts2 = ts1 + 1; 2949 long ts3 = ts1 + 2; 2950 2951 // Setting up region 2952 this.region = initHRegion(tableName, method, CONF, families); 2953 // Putting data in Region 2954 Put put = null; 2955 KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null); 2956 KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null); 2957 KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null); 
2958 2959 KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null); 2960 KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null); 2961 KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null); 2962 2963 put = new Put(row1); 2964 put.add(kv13); 2965 put.add(kv12); 2966 put.add(kv11); 2967 put.add(kv23); 2968 put.add(kv22); 2969 put.add(kv21); 2970 region.put(put); 2971 region.flush(true); 2972 2973 // Expected 2974 List<Cell> expected = new ArrayList<>(); 2975 expected.add(kv13); 2976 expected.add(kv12); 2977 expected.add(kv23); 2978 expected.add(kv22); 2979 2980 Scan scan = new Scan(row1); 2981 scan.addColumn(fam1, qf1); 2982 scan.addColumn(fam1, qf2); 2983 scan.setMaxVersions(MAX_VERSIONS); 2984 List<Cell> actual = new ArrayList<>(); 2985 InternalScanner scanner = region.getScanner(scan); 2986 2987 boolean hasNext = scanner.next(actual); 2988 assertEquals(false, hasNext); 2989 2990 // Verify result 2991 for (int i = 0; i < expected.size(); i++) { 2992 assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i))); 2993 } 2994 } 2995 2996 @Test 2997 public void testScanner_ExplicitColumns_FromMemStoreAndFiles_EnforceVersions() throws 2998 IOException { 2999 byte[] row1 = Bytes.toBytes("row1"); 3000 byte[] fam1 = Bytes.toBytes("fam1"); 3001 byte[][] families = { fam1 }; 3002 byte[] qf1 = Bytes.toBytes("qualifier1"); 3003 byte[] qf2 = Bytes.toBytes("qualifier2"); 3004 3005 long ts1 = 1; 3006 long ts2 = ts1 + 1; 3007 long ts3 = ts1 + 2; 3008 long ts4 = ts1 + 3; 3009 3010 // Setting up region 3011 this.region = initHRegion(tableName, method, CONF, families); 3012 // Putting data in Region 3013 KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null); 3014 KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null); 3015 KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null); 3016 KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, 
KeyValue.Type.Put, null); 3017 3018 KeyValue kv24 = new KeyValue(row1, fam1, qf2, ts4, KeyValue.Type.Put, null); 3019 KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null); 3020 KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null); 3021 KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null); 3022 3023 Put put = null; 3024 put = new Put(row1); 3025 put.add(kv14); 3026 put.add(kv24); 3027 region.put(put); 3028 region.flush(true); 3029 3030 put = new Put(row1); 3031 put.add(kv23); 3032 put.add(kv13); 3033 region.put(put); 3034 region.flush(true); 3035 3036 put = new Put(row1); 3037 put.add(kv22); 3038 put.add(kv12); 3039 region.put(put); 3040 region.flush(true); 3041 3042 put = new Put(row1); 3043 put.add(kv21); 3044 put.add(kv11); 3045 region.put(put); 3046 3047 // Expected 3048 List<Cell> expected = new ArrayList<>(); 3049 expected.add(kv14); 3050 expected.add(kv13); 3051 expected.add(kv12); 3052 expected.add(kv24); 3053 expected.add(kv23); 3054 expected.add(kv22); 3055 3056 Scan scan = new Scan(row1); 3057 scan.addColumn(fam1, qf1); 3058 scan.addColumn(fam1, qf2); 3059 int versions = 3; 3060 scan.setMaxVersions(versions); 3061 List<Cell> actual = new ArrayList<>(); 3062 InternalScanner scanner = region.getScanner(scan); 3063 3064 boolean hasNext = scanner.next(actual); 3065 assertEquals(false, hasNext); 3066 3067 // Verify result 3068 for (int i = 0; i < expected.size(); i++) { 3069 assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i))); 3070 } 3071 } 3072 3073 @Test 3074 public void testScanner_Wildcard_FromMemStore_EnforceVersions() throws IOException { 3075 byte[] row1 = Bytes.toBytes("row1"); 3076 byte[] qf1 = Bytes.toBytes("qualifier1"); 3077 byte[] qf2 = Bytes.toBytes("qualifier2"); 3078 byte[] fam1 = Bytes.toBytes("fam1"); 3079 byte[][] families = { fam1 }; 3080 3081 long ts1 = System.currentTimeMillis(); 3082 long ts2 = ts1 + 1; 3083 long ts3 = ts1 + 2; 3084 3085 
// Setting up region 3086 this.region = initHRegion(tableName, method, CONF, families); 3087 // Putting data in Region 3088 Put put = null; 3089 KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null); 3090 KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null); 3091 KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null); 3092 3093 KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null); 3094 KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null); 3095 KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null); 3096 3097 put = new Put(row1); 3098 put.add(kv13); 3099 put.add(kv12); 3100 put.add(kv11); 3101 put.add(kv23); 3102 put.add(kv22); 3103 put.add(kv21); 3104 region.put(put); 3105 3106 // Expected 3107 List<Cell> expected = new ArrayList<>(); 3108 expected.add(kv13); 3109 expected.add(kv12); 3110 expected.add(kv23); 3111 expected.add(kv22); 3112 3113 Scan scan = new Scan(row1); 3114 scan.addFamily(fam1); 3115 scan.setMaxVersions(MAX_VERSIONS); 3116 List<Cell> actual = new ArrayList<>(); 3117 InternalScanner scanner = region.getScanner(scan); 3118 3119 boolean hasNext = scanner.next(actual); 3120 assertEquals(false, hasNext); 3121 3122 // Verify result 3123 for (int i = 0; i < expected.size(); i++) { 3124 assertEquals(expected.get(i), actual.get(i)); 3125 } 3126 } 3127 3128 @Test 3129 public void testScanner_Wildcard_FromFilesOnly_EnforceVersions() throws IOException { 3130 byte[] row1 = Bytes.toBytes("row1"); 3131 byte[] qf1 = Bytes.toBytes("qualifier1"); 3132 byte[] qf2 = Bytes.toBytes("qualifier2"); 3133 byte[] fam1 = Bytes.toBytes("fam1"); 3134 3135 long ts1 = 1; // System.currentTimeMillis(); 3136 long ts2 = ts1 + 1; 3137 long ts3 = ts1 + 2; 3138 3139 // Setting up region 3140 this.region = initHRegion(tableName, method, CONF, fam1); 3141 // Putting data in Region 3142 Put put = null; 3143 KeyValue kv13 = new KeyValue(row1, fam1, qf1, 
ts3, KeyValue.Type.Put, null); 3144 KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null); 3145 KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null); 3146 3147 KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null); 3148 KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null); 3149 KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null); 3150 3151 put = new Put(row1); 3152 put.add(kv13); 3153 put.add(kv12); 3154 put.add(kv11); 3155 put.add(kv23); 3156 put.add(kv22); 3157 put.add(kv21); 3158 region.put(put); 3159 region.flush(true); 3160 3161 // Expected 3162 List<Cell> expected = new ArrayList<>(); 3163 expected.add(kv13); 3164 expected.add(kv12); 3165 expected.add(kv23); 3166 expected.add(kv22); 3167 3168 Scan scan = new Scan(row1); 3169 scan.addFamily(fam1); 3170 scan.setMaxVersions(MAX_VERSIONS); 3171 List<Cell> actual = new ArrayList<>(); 3172 InternalScanner scanner = region.getScanner(scan); 3173 3174 boolean hasNext = scanner.next(actual); 3175 assertEquals(false, hasNext); 3176 3177 // Verify result 3178 for (int i = 0; i < expected.size(); i++) { 3179 assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i))); 3180 } 3181 } 3182 3183 @Test 3184 public void testScanner_StopRow1542() throws IOException { 3185 byte[] family = Bytes.toBytes("testFamily"); 3186 this.region = initHRegion(tableName, method, CONF, family); 3187 byte[] row1 = Bytes.toBytes("row111"); 3188 byte[] row2 = Bytes.toBytes("row222"); 3189 byte[] row3 = Bytes.toBytes("row333"); 3190 byte[] row4 = Bytes.toBytes("row444"); 3191 byte[] row5 = Bytes.toBytes("row555"); 3192 3193 byte[] col1 = Bytes.toBytes("Pub111"); 3194 byte[] col2 = Bytes.toBytes("Pub222"); 3195 3196 Put put = new Put(row1); 3197 put.addColumn(family, col1, Bytes.toBytes(10L)); 3198 region.put(put); 3199 3200 put = new Put(row2); 3201 put.addColumn(family, col1, Bytes.toBytes(15L)); 3202 
region.put(put); 3203 3204 put = new Put(row3); 3205 put.addColumn(family, col2, Bytes.toBytes(20L)); 3206 region.put(put); 3207 3208 put = new Put(row4); 3209 put.addColumn(family, col2, Bytes.toBytes(30L)); 3210 region.put(put); 3211 3212 put = new Put(row5); 3213 put.addColumn(family, col1, Bytes.toBytes(40L)); 3214 region.put(put); 3215 3216 Scan scan = new Scan(row3, row4); 3217 scan.setMaxVersions(); 3218 scan.addColumn(family, col1); 3219 InternalScanner s = region.getScanner(scan); 3220 3221 List<Cell> results = new ArrayList<>(); 3222 assertEquals(false, s.next(results)); 3223 assertEquals(0, results.size()); 3224 } 3225 3226 @Test 3227 public void testScanner_Wildcard_FromMemStoreAndFiles_EnforceVersions() throws IOException { 3228 byte[] row1 = Bytes.toBytes("row1"); 3229 byte[] fam1 = Bytes.toBytes("fam1"); 3230 byte[] qf1 = Bytes.toBytes("qualifier1"); 3231 byte[] qf2 = Bytes.toBytes("quateslifier2"); 3232 3233 long ts1 = 1; 3234 long ts2 = ts1 + 1; 3235 long ts3 = ts1 + 2; 3236 long ts4 = ts1 + 3; 3237 3238 // Setting up region 3239 this.region = initHRegion(tableName, method, CONF, fam1); 3240 // Putting data in Region 3241 KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null); 3242 KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null); 3243 KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null); 3244 KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null); 3245 3246 KeyValue kv24 = new KeyValue(row1, fam1, qf2, ts4, KeyValue.Type.Put, null); 3247 KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null); 3248 KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null); 3249 KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null); 3250 3251 Put put = null; 3252 put = new Put(row1); 3253 put.add(kv14); 3254 put.add(kv24); 3255 region.put(put); 3256 region.flush(true); 3257 3258 put = new Put(row1); 3259 
put.add(kv23); 3260 put.add(kv13); 3261 region.put(put); 3262 region.flush(true); 3263 3264 put = new Put(row1); 3265 put.add(kv22); 3266 put.add(kv12); 3267 region.put(put); 3268 region.flush(true); 3269 3270 put = new Put(row1); 3271 put.add(kv21); 3272 put.add(kv11); 3273 region.put(put); 3274 3275 // Expected 3276 List<KeyValue> expected = new ArrayList<>(); 3277 expected.add(kv14); 3278 expected.add(kv13); 3279 expected.add(kv12); 3280 expected.add(kv24); 3281 expected.add(kv23); 3282 expected.add(kv22); 3283 3284 Scan scan = new Scan(row1); 3285 int versions = 3; 3286 scan.setMaxVersions(versions); 3287 List<Cell> actual = new ArrayList<>(); 3288 InternalScanner scanner = region.getScanner(scan); 3289 3290 boolean hasNext = scanner.next(actual); 3291 assertEquals(false, hasNext); 3292 3293 // Verify result 3294 for (int i = 0; i < expected.size(); i++) { 3295 assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i))); 3296 } 3297 } 3298 3299 /** 3300 * Added for HBASE-5416 3301 * 3302 * Here we test scan optimization when only subset of CFs are used in filter 3303 * conditions. 
3304 */ 3305 @Test 3306 public void testScanner_JoinedScanners() throws IOException { 3307 byte[] cf_essential = Bytes.toBytes("essential"); 3308 byte[] cf_joined = Bytes.toBytes("joined"); 3309 byte[] cf_alpha = Bytes.toBytes("alpha"); 3310 this.region = initHRegion(tableName, method, CONF, cf_essential, cf_joined, cf_alpha); 3311 byte[] row1 = Bytes.toBytes("row1"); 3312 byte[] row2 = Bytes.toBytes("row2"); 3313 byte[] row3 = Bytes.toBytes("row3"); 3314 3315 byte[] col_normal = Bytes.toBytes("d"); 3316 byte[] col_alpha = Bytes.toBytes("a"); 3317 3318 byte[] filtered_val = Bytes.toBytes(3); 3319 3320 Put put = new Put(row1); 3321 put.addColumn(cf_essential, col_normal, Bytes.toBytes(1)); 3322 put.addColumn(cf_joined, col_alpha, Bytes.toBytes(1)); 3323 region.put(put); 3324 3325 put = new Put(row2); 3326 put.addColumn(cf_essential, col_alpha, Bytes.toBytes(2)); 3327 put.addColumn(cf_joined, col_normal, Bytes.toBytes(2)); 3328 put.addColumn(cf_alpha, col_alpha, Bytes.toBytes(2)); 3329 region.put(put); 3330 3331 put = new Put(row3); 3332 put.addColumn(cf_essential, col_normal, filtered_val); 3333 put.addColumn(cf_joined, col_normal, filtered_val); 3334 region.put(put); 3335 3336 // Check two things: 3337 // 1. result list contains expected values 3338 // 2. 
result list is sorted properly 3339 3340 Scan scan = new Scan(); 3341 Filter filter = new SingleColumnValueExcludeFilter(cf_essential, col_normal, 3342 CompareOp.NOT_EQUAL, filtered_val); 3343 scan.setFilter(filter); 3344 scan.setLoadColumnFamiliesOnDemand(true); 3345 InternalScanner s = region.getScanner(scan); 3346 3347 List<Cell> results = new ArrayList<>(); 3348 assertTrue(s.next(results)); 3349 assertEquals(1, results.size()); 3350 results.clear(); 3351 3352 assertTrue(s.next(results)); 3353 assertEquals(3, results.size()); 3354 assertTrue("orderCheck", CellUtil.matchingFamily(results.get(0), cf_alpha)); 3355 assertTrue("orderCheck", CellUtil.matchingFamily(results.get(1), cf_essential)); 3356 assertTrue("orderCheck", CellUtil.matchingFamily(results.get(2), cf_joined)); 3357 results.clear(); 3358 3359 assertFalse(s.next(results)); 3360 assertEquals(0, results.size()); 3361 } 3362 3363 /** 3364 * HBASE-5416 3365 * 3366 * Test case when scan limits amount of KVs returned on each next() call. 
3367 */ 3368 @Test 3369 public void testScanner_JoinedScannersWithLimits() throws IOException { 3370 final byte[] cf_first = Bytes.toBytes("first"); 3371 final byte[] cf_second = Bytes.toBytes("second"); 3372 3373 this.region = initHRegion(tableName, method, CONF, cf_first, cf_second); 3374 final byte[] col_a = Bytes.toBytes("a"); 3375 final byte[] col_b = Bytes.toBytes("b"); 3376 3377 Put put; 3378 3379 for (int i = 0; i < 10; i++) { 3380 put = new Put(Bytes.toBytes("r" + Integer.toString(i))); 3381 put.addColumn(cf_first, col_a, Bytes.toBytes(i)); 3382 if (i < 5) { 3383 put.addColumn(cf_first, col_b, Bytes.toBytes(i)); 3384 put.addColumn(cf_second, col_a, Bytes.toBytes(i)); 3385 put.addColumn(cf_second, col_b, Bytes.toBytes(i)); 3386 } 3387 region.put(put); 3388 } 3389 3390 Scan scan = new Scan(); 3391 scan.setLoadColumnFamiliesOnDemand(true); 3392 Filter bogusFilter = new FilterBase() { 3393 @Override 3394 public ReturnCode filterCell(final Cell ignored) throws IOException { 3395 return ReturnCode.INCLUDE; 3396 } 3397 @Override 3398 public boolean isFamilyEssential(byte[] name) { 3399 return Bytes.equals(name, cf_first); 3400 } 3401 }; 3402 3403 scan.setFilter(bogusFilter); 3404 InternalScanner s = region.getScanner(scan); 3405 3406 // Our data looks like this: 3407 // r0: first:a, first:b, second:a, second:b 3408 // r1: first:a, first:b, second:a, second:b 3409 // r2: first:a, first:b, second:a, second:b 3410 // r3: first:a, first:b, second:a, second:b 3411 // r4: first:a, first:b, second:a, second:b 3412 // r5: first:a 3413 // r6: first:a 3414 // r7: first:a 3415 // r8: first:a 3416 // r9: first:a 3417 3418 // But due to next's limit set to 3, we should get this: 3419 // r0: first:a, first:b, second:a 3420 // r0: second:b 3421 // r1: first:a, first:b, second:a 3422 // r1: second:b 3423 // r2: first:a, first:b, second:a 3424 // r2: second:b 3425 // r3: first:a, first:b, second:a 3426 // r3: second:b 3427 // r4: first:a, first:b, second:a 3428 // r4: second:b 
3429 // r5: first:a 3430 // r6: first:a 3431 // r7: first:a 3432 // r8: first:a 3433 // r9: first:a 3434 3435 List<Cell> results = new ArrayList<>(); 3436 int index = 0; 3437 ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(3).build(); 3438 while (true) { 3439 boolean more = s.next(results, scannerContext); 3440 if ((index >> 1) < 5) { 3441 if (index % 2 == 0) { 3442 assertEquals(3, results.size()); 3443 } else { 3444 assertEquals(1, results.size()); 3445 } 3446 } else { 3447 assertEquals(1, results.size()); 3448 } 3449 results.clear(); 3450 index++; 3451 if (!more) { 3452 break; 3453 } 3454 } 3455 } 3456 3457 /** 3458 * Write an HFile block full with Cells whose qualifier that are identical between 3459 * 0 and Short.MAX_VALUE. See HBASE-13329. 3460 * @throws Exception 3461 */ 3462 @Test 3463 public void testLongQualifier() throws Exception { 3464 byte[] family = Bytes.toBytes("family"); 3465 this.region = initHRegion(tableName, method, CONF, family); 3466 byte[] q = new byte[Short.MAX_VALUE+2]; 3467 Arrays.fill(q, 0, q.length-1, (byte)42); 3468 for (byte i=0; i<10; i++) { 3469 Put p = new Put(Bytes.toBytes("row")); 3470 // qualifiers that differ past Short.MAX_VALUE 3471 q[q.length-1]=i; 3472 p.addColumn(family, q, q); 3473 region.put(p); 3474 } 3475 region.flush(false); 3476 } 3477 3478 /** 3479 * Flushes the cache in a thread while scanning. The tests verify that the 3480 * scan is coherent - e.g. the returned results are always of the same or 3481 * later update as the previous results. 
3482 * 3483 * @throws IOException 3484 * scan / compact 3485 * @throws InterruptedException 3486 * thread join 3487 */ 3488 @Test 3489 public void testFlushCacheWhileScanning() throws IOException, InterruptedException { 3490 byte[] family = Bytes.toBytes("family"); 3491 int numRows = 1000; 3492 int flushAndScanInterval = 10; 3493 int compactInterval = 10 * flushAndScanInterval; 3494 3495 this.region = initHRegion(tableName, method, CONF, family); 3496 FlushThread flushThread = new FlushThread(); 3497 try { 3498 flushThread.start(); 3499 3500 Scan scan = new Scan(); 3501 scan.addFamily(family); 3502 scan.setFilter(new SingleColumnValueFilter(family, qual1, CompareOp.EQUAL, 3503 new BinaryComparator(Bytes.toBytes(5L)))); 3504 3505 int expectedCount = 0; 3506 List<Cell> res = new ArrayList<>(); 3507 3508 boolean toggle = true; 3509 for (long i = 0; i < numRows; i++) { 3510 Put put = new Put(Bytes.toBytes(i)); 3511 put.setDurability(Durability.SKIP_WAL); 3512 put.addColumn(family, qual1, Bytes.toBytes(i % 10)); 3513 region.put(put); 3514 3515 if (i != 0 && i % compactInterval == 0) { 3516 LOG.debug("iteration = " + i+ " ts="+System.currentTimeMillis()); 3517 region.compact(true); 3518 } 3519 3520 if (i % 10 == 5L) { 3521 expectedCount++; 3522 } 3523 3524 if (i != 0 && i % flushAndScanInterval == 0) { 3525 res.clear(); 3526 InternalScanner scanner = region.getScanner(scan); 3527 if (toggle) { 3528 flushThread.flush(); 3529 } 3530 while (scanner.next(res)) 3531 ; 3532 if (!toggle) { 3533 flushThread.flush(); 3534 } 3535 assertEquals("toggle="+toggle+"i=" + i + " ts="+System.currentTimeMillis(), 3536 expectedCount, res.size()); 3537 toggle = !toggle; 3538 } 3539 } 3540 3541 } finally { 3542 try { 3543 flushThread.done(); 3544 flushThread.join(); 3545 flushThread.checkNoError(); 3546 } catch (InterruptedException ie) { 3547 LOG.warn("Caught exception when joining with flushThread", ie); 3548 } 3549 HBaseTestingUtility.closeRegionAndWAL(this.region); 3550 this.region = 
null; 3551 } 3552 } 3553 3554 protected class FlushThread extends Thread { 3555 private volatile boolean done; 3556 private Throwable error = null; 3557 3558 FlushThread() { 3559 super("FlushThread"); 3560 } 3561 3562 public void done() { 3563 done = true; 3564 synchronized (this) { 3565 interrupt(); 3566 } 3567 } 3568 3569 public void checkNoError() { 3570 if (error != null) { 3571 assertNull(error); 3572 } 3573 } 3574 3575 @Override 3576 public void run() { 3577 done = false; 3578 while (!done) { 3579 synchronized (this) { 3580 try { 3581 wait(); 3582 } catch (InterruptedException ignored) { 3583 if (done) { 3584 break; 3585 } 3586 } 3587 } 3588 try { 3589 region.flush(true); 3590 } catch (IOException e) { 3591 if (!done) { 3592 LOG.error("Error while flushing cache", e); 3593 error = e; 3594 } 3595 break; 3596 } catch (Throwable t) { 3597 LOG.error("Uncaught exception", t); 3598 throw t; 3599 } 3600 } 3601 } 3602 3603 public void flush() { 3604 synchronized (this) { 3605 notify(); 3606 } 3607 } 3608 } 3609 3610 /** 3611 * Writes very wide records and scans for the latest every time.. Flushes and 3612 * compacts the region every now and then to keep things realistic. 
3613 * 3614 * @throws IOException 3615 * by flush / scan / compaction 3616 * @throws InterruptedException 3617 * when joining threads 3618 */ 3619 @Test 3620 public void testWritesWhileScanning() throws IOException, InterruptedException { 3621 int testCount = 100; 3622 int numRows = 1; 3623 int numFamilies = 10; 3624 int numQualifiers = 100; 3625 int flushInterval = 7; 3626 int compactInterval = 5 * flushInterval; 3627 byte[][] families = new byte[numFamilies][]; 3628 for (int i = 0; i < numFamilies; i++) { 3629 families[i] = Bytes.toBytes("family" + i); 3630 } 3631 byte[][] qualifiers = new byte[numQualifiers][]; 3632 for (int i = 0; i < numQualifiers; i++) { 3633 qualifiers[i] = Bytes.toBytes("qual" + i); 3634 } 3635 3636 this.region = initHRegion(tableName, method, CONF, families); 3637 FlushThread flushThread = new FlushThread(); 3638 PutThread putThread = new PutThread(numRows, families, qualifiers); 3639 try { 3640 putThread.start(); 3641 putThread.waitForFirstPut(); 3642 3643 flushThread.start(); 3644 3645 Scan scan = new Scan(Bytes.toBytes("row0"), Bytes.toBytes("row1")); 3646 3647 int expectedCount = numFamilies * numQualifiers; 3648 List<Cell> res = new ArrayList<>(); 3649 3650 long prevTimestamp = 0L; 3651 for (int i = 0; i < testCount; i++) { 3652 3653 if (i != 0 && i % compactInterval == 0) { 3654 region.compact(true); 3655 for (HStore store : region.getStores()) { 3656 store.closeAndArchiveCompactedFiles(); 3657 } 3658 } 3659 3660 if (i != 0 && i % flushInterval == 0) { 3661 flushThread.flush(); 3662 } 3663 3664 boolean previousEmpty = res.isEmpty(); 3665 res.clear(); 3666 InternalScanner scanner = region.getScanner(scan); 3667 while (scanner.next(res)) 3668 ; 3669 if (!res.isEmpty() || !previousEmpty || i > compactInterval) { 3670 assertEquals("i=" + i, expectedCount, res.size()); 3671 long timestamp = res.get(0).getTimestamp(); 3672 assertTrue("Timestamps were broke: " + timestamp + " prev: " + prevTimestamp, 3673 timestamp >= prevTimestamp); 3674 
prevTimestamp = timestamp; 3675 } 3676 } 3677 3678 putThread.done(); 3679 3680 region.flush(true); 3681 3682 } finally { 3683 try { 3684 flushThread.done(); 3685 flushThread.join(); 3686 flushThread.checkNoError(); 3687 3688 putThread.join(); 3689 putThread.checkNoError(); 3690 } catch (InterruptedException ie) { 3691 LOG.warn("Caught exception when joining with flushThread", ie); 3692 } 3693 3694 try { 3695 HBaseTestingUtility.closeRegionAndWAL(this.region); 3696 } catch (DroppedSnapshotException dse) { 3697 // We could get this on way out because we interrupt the background flusher and it could 3698 // fail anywhere causing a DSE over in the background flusher... only it is not properly 3699 // dealt with so could still be memory hanging out when we get to here -- memory we can't 3700 // flush because the accounting is 'off' since original DSE. 3701 } 3702 this.region = null; 3703 } 3704 } 3705 3706 protected class PutThread extends Thread { 3707 private volatile boolean done; 3708 private volatile int numPutsFinished = 0; 3709 3710 private Throwable error = null; 3711 private int numRows; 3712 private byte[][] families; 3713 private byte[][] qualifiers; 3714 3715 private PutThread(int numRows, byte[][] families, byte[][] qualifiers) { 3716 super("PutThread"); 3717 this.numRows = numRows; 3718 this.families = families; 3719 this.qualifiers = qualifiers; 3720 } 3721 3722 /** 3723 * Block calling thread until this instance of PutThread has put at least one row. 
3724 */ 3725 public void waitForFirstPut() throws InterruptedException { 3726 // wait until put thread actually puts some data 3727 while (isAlive() && numPutsFinished == 0) { 3728 checkNoError(); 3729 Thread.sleep(50); 3730 } 3731 } 3732 3733 public void done() { 3734 done = true; 3735 synchronized (this) { 3736 interrupt(); 3737 } 3738 } 3739 3740 public void checkNoError() { 3741 if (error != null) { 3742 assertNull(error); 3743 } 3744 } 3745 3746 @Override 3747 public void run() { 3748 done = false; 3749 while (!done) { 3750 try { 3751 for (int r = 0; r < numRows; r++) { 3752 byte[] row = Bytes.toBytes("row" + r); 3753 Put put = new Put(row); 3754 put.setDurability(Durability.SKIP_WAL); 3755 byte[] value = Bytes.toBytes(String.valueOf(numPutsFinished)); 3756 for (byte[] family : families) { 3757 for (byte[] qualifier : qualifiers) { 3758 put.addColumn(family, qualifier, numPutsFinished, value); 3759 } 3760 } 3761 region.put(put); 3762 numPutsFinished++; 3763 if (numPutsFinished > 0 && numPutsFinished % 47 == 0) { 3764 System.out.println("put iteration = " + numPutsFinished); 3765 Delete delete = new Delete(row, (long) numPutsFinished - 30); 3766 region.delete(delete); 3767 } 3768 numPutsFinished++; 3769 } 3770 } catch (InterruptedIOException e) { 3771 // This is fine. It means we are done, or didn't get the lock on time 3772 LOG.info("Interrupted", e); 3773 } catch (IOException e) { 3774 LOG.error("Error while putting records", e); 3775 error = e; 3776 break; 3777 } 3778 } 3779 3780 } 3781 3782 } 3783 3784 /** 3785 * Writes very wide records and gets the latest row every time.. Flushes and 3786 * compacts the region aggressivly to catch issues. 
3787 * 3788 * @throws IOException 3789 * by flush / scan / compaction 3790 * @throws InterruptedException 3791 * when joining threads 3792 */ 3793 @Test 3794 public void testWritesWhileGetting() throws Exception { 3795 int testCount = 50; 3796 int numRows = 1; 3797 int numFamilies = 10; 3798 int numQualifiers = 100; 3799 int compactInterval = 100; 3800 byte[][] families = new byte[numFamilies][]; 3801 for (int i = 0; i < numFamilies; i++) { 3802 families[i] = Bytes.toBytes("family" + i); 3803 } 3804 byte[][] qualifiers = new byte[numQualifiers][]; 3805 for (int i = 0; i < numQualifiers; i++) { 3806 qualifiers[i] = Bytes.toBytes("qual" + i); 3807 } 3808 3809 3810 // This test flushes constantly and can cause many files to be created, 3811 // possibly 3812 // extending over the ulimit. Make sure compactions are aggressive in 3813 // reducing 3814 // the number of HFiles created. 3815 Configuration conf = HBaseConfiguration.create(CONF); 3816 conf.setInt("hbase.hstore.compaction.min", 1); 3817 conf.setInt("hbase.hstore.compaction.max", 1000); 3818 this.region = initHRegion(tableName, method, conf, families); 3819 PutThread putThread = null; 3820 MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf); 3821 try { 3822 putThread = new PutThread(numRows, families, qualifiers); 3823 putThread.start(); 3824 putThread.waitForFirstPut(); 3825 3826 // Add a thread that flushes as fast as possible 3827 ctx.addThread(new RepeatingTestThread(ctx) { 3828 3829 @Override 3830 public void doAnAction() throws Exception { 3831 region.flush(true); 3832 // Compact regularly to avoid creating too many files and exceeding 3833 // the ulimit. 
3834 region.compact(false); 3835 for (HStore store : region.getStores()) { 3836 store.closeAndArchiveCompactedFiles(); 3837 } 3838 } 3839 }); 3840 ctx.startThreads(); 3841 3842 Get get = new Get(Bytes.toBytes("row0")); 3843 Result result = null; 3844 3845 int expectedCount = numFamilies * numQualifiers; 3846 3847 long prevTimestamp = 0L; 3848 for (int i = 0; i < testCount; i++) { 3849 LOG.info("testWritesWhileGetting verify turn " + i); 3850 boolean previousEmpty = result == null || result.isEmpty(); 3851 result = region.get(get); 3852 if (!result.isEmpty() || !previousEmpty || i > compactInterval) { 3853 assertEquals("i=" + i, expectedCount, result.size()); 3854 // TODO this was removed, now what dangit?! 3855 // search looking for the qualifier in question? 3856 long timestamp = 0; 3857 for (Cell kv : result.rawCells()) { 3858 if (CellUtil.matchingFamily(kv, families[0]) 3859 && CellUtil.matchingQualifier(kv, qualifiers[0])) { 3860 timestamp = kv.getTimestamp(); 3861 } 3862 } 3863 assertTrue(timestamp >= prevTimestamp); 3864 prevTimestamp = timestamp; 3865 Cell previousKV = null; 3866 3867 for (Cell kv : result.rawCells()) { 3868 byte[] thisValue = CellUtil.cloneValue(kv); 3869 if (previousKV != null) { 3870 if (Bytes.compareTo(CellUtil.cloneValue(previousKV), thisValue) != 0) { 3871 LOG.warn("These two KV should have the same value." 
+ " Previous KV:" + previousKV 3872 + "(memStoreTS:" + previousKV.getSequenceId() + ")" + ", New KV: " + kv 3873 + "(memStoreTS:" + kv.getSequenceId() + ")"); 3874 assertEquals(0, Bytes.compareTo(CellUtil.cloneValue(previousKV), thisValue)); 3875 } 3876 } 3877 previousKV = kv; 3878 } 3879 } 3880 } 3881 } finally { 3882 if (putThread != null) 3883 putThread.done(); 3884 3885 region.flush(true); 3886 3887 if (putThread != null) { 3888 putThread.join(); 3889 putThread.checkNoError(); 3890 } 3891 3892 ctx.stop(); 3893 HBaseTestingUtility.closeRegionAndWAL(this.region); 3894 this.region = null; 3895 } 3896 } 3897 3898 @Test 3899 public void testHolesInMeta() throws Exception { 3900 byte[] family = Bytes.toBytes("family"); 3901 this.region = initHRegion(tableName, Bytes.toBytes("x"), Bytes.toBytes("z"), method, CONF, 3902 false, family); 3903 byte[] rowNotServed = Bytes.toBytes("a"); 3904 Get g = new Get(rowNotServed); 3905 try { 3906 region.get(g); 3907 fail(); 3908 } catch (WrongRegionException x) { 3909 // OK 3910 } 3911 byte[] row = Bytes.toBytes("y"); 3912 g = new Get(row); 3913 region.get(g); 3914 } 3915 3916 @Test 3917 public void testIndexesScanWithOneDeletedRow() throws IOException { 3918 byte[] family = Bytes.toBytes("family"); 3919 3920 // Setting up region 3921 this.region = initHRegion(tableName, method, CONF, family); 3922 Put put = new Put(Bytes.toBytes(1L)); 3923 put.addColumn(family, qual1, 1L, Bytes.toBytes(1L)); 3924 region.put(put); 3925 3926 region.flush(true); 3927 3928 Delete delete = new Delete(Bytes.toBytes(1L), 1L); 3929 region.delete(delete); 3930 3931 put = new Put(Bytes.toBytes(2L)); 3932 put.addColumn(family, qual1, 2L, Bytes.toBytes(2L)); 3933 region.put(put); 3934 3935 Scan idxScan = new Scan(); 3936 idxScan.addFamily(family); 3937 idxScan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL, Arrays.<Filter> asList( 3938 new SingleColumnValueFilter(family, qual1, CompareOp.GREATER_OR_EQUAL, 3939 new 
BinaryComparator(Bytes.toBytes(0L))), new SingleColumnValueFilter(family, qual1, 3940 CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes(3L)))))); 3941 InternalScanner scanner = region.getScanner(idxScan); 3942 List<Cell> res = new ArrayList<>(); 3943 3944 while (scanner.next(res)) { 3945 // Ignore res value. 3946 } 3947 assertEquals(1L, res.size()); 3948 } 3949 3950 // //////////////////////////////////////////////////////////////////////////// 3951 // Bloom filter test 3952 // //////////////////////////////////////////////////////////////////////////// 3953 @Test 3954 public void testBloomFilterSize() throws IOException { 3955 byte[] fam1 = Bytes.toBytes("fam1"); 3956 byte[] qf1 = Bytes.toBytes("col"); 3957 byte[] val1 = Bytes.toBytes("value1"); 3958 // Create Table 3959 HColumnDescriptor hcd = new HColumnDescriptor(fam1).setMaxVersions(Integer.MAX_VALUE) 3960 .setBloomFilterType(BloomType.ROWCOL); 3961 3962 HTableDescriptor htd = new HTableDescriptor(tableName); 3963 htd.addFamily(hcd); 3964 HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); 3965 this.region = TEST_UTIL.createLocalHRegion(info, htd); 3966 int num_unique_rows = 10; 3967 int duplicate_multiplier = 2; 3968 int num_storefiles = 4; 3969 3970 int version = 0; 3971 for (int f = 0; f < num_storefiles; f++) { 3972 for (int i = 0; i < duplicate_multiplier; i++) { 3973 for (int j = 0; j < num_unique_rows; j++) { 3974 Put put = new Put(Bytes.toBytes("row" + j)); 3975 put.setDurability(Durability.SKIP_WAL); 3976 long ts = version++; 3977 put.addColumn(fam1, qf1, ts, val1); 3978 region.put(put); 3979 } 3980 } 3981 region.flush(true); 3982 } 3983 // before compaction 3984 HStore store = region.getStore(fam1); 3985 Collection<HStoreFile> storeFiles = store.getStorefiles(); 3986 for (HStoreFile storefile : storeFiles) { 3987 StoreFileReader reader = storefile.getReader(); 3988 reader.loadFileInfo(); 3989 reader.loadBloomfilter(); 3990 assertEquals(num_unique_rows * 
duplicate_multiplier, reader.getEntries()); 3991 assertEquals(num_unique_rows, reader.getFilterEntries()); 3992 } 3993 3994 region.compact(true); 3995 3996 // after compaction 3997 storeFiles = store.getStorefiles(); 3998 for (HStoreFile storefile : storeFiles) { 3999 StoreFileReader reader = storefile.getReader(); 4000 reader.loadFileInfo(); 4001 reader.loadBloomfilter(); 4002 assertEquals(num_unique_rows * duplicate_multiplier * num_storefiles, reader.getEntries()); 4003 assertEquals(num_unique_rows, reader.getFilterEntries()); 4004 } 4005 } 4006 4007 @Test 4008 public void testAllColumnsWithBloomFilter() throws IOException { 4009 byte[] TABLE = Bytes.toBytes(name.getMethodName()); 4010 byte[] FAMILY = Bytes.toBytes("family"); 4011 4012 // Create table 4013 HColumnDescriptor hcd = new HColumnDescriptor(FAMILY).setMaxVersions(Integer.MAX_VALUE) 4014 .setBloomFilterType(BloomType.ROWCOL); 4015 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE)); 4016 htd.addFamily(hcd); 4017 HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); 4018 this.region = TEST_UTIL.createLocalHRegion(info, htd); 4019 // For row:0, col:0: insert versions 1 through 5. 
4020 byte[] row = Bytes.toBytes("row:" + 0); 4021 byte[] column = Bytes.toBytes("column:" + 0); 4022 Put put = new Put(row); 4023 put.setDurability(Durability.SKIP_WAL); 4024 for (long idx = 1; idx <= 4; idx++) { 4025 put.addColumn(FAMILY, column, idx, Bytes.toBytes("value-version-" + idx)); 4026 } 4027 region.put(put); 4028 4029 // Flush 4030 region.flush(true); 4031 4032 // Get rows 4033 Get get = new Get(row); 4034 get.setMaxVersions(); 4035 Cell[] kvs = region.get(get).rawCells(); 4036 4037 // Check if rows are correct 4038 assertEquals(4, kvs.length); 4039 checkOneCell(kvs[0], FAMILY, 0, 0, 4); 4040 checkOneCell(kvs[1], FAMILY, 0, 0, 3); 4041 checkOneCell(kvs[2], FAMILY, 0, 0, 2); 4042 checkOneCell(kvs[3], FAMILY, 0, 0, 1); 4043 } 4044 4045 /** 4046 * Testcase to cover bug-fix for HBASE-2823 Ensures correct delete when 4047 * issuing delete row on columns with bloom filter set to row+col 4048 * (BloomType.ROWCOL) 4049 */ 4050 @Test 4051 public void testDeleteRowWithBloomFilter() throws IOException { 4052 byte[] familyName = Bytes.toBytes("familyName"); 4053 4054 // Create Table 4055 HColumnDescriptor hcd = new HColumnDescriptor(familyName).setMaxVersions(Integer.MAX_VALUE) 4056 .setBloomFilterType(BloomType.ROWCOL); 4057 4058 HTableDescriptor htd = new HTableDescriptor(tableName); 4059 htd.addFamily(hcd); 4060 HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); 4061 this.region = TEST_UTIL.createLocalHRegion(info, htd); 4062 // Insert some data 4063 byte[] row = Bytes.toBytes("row1"); 4064 byte[] col = Bytes.toBytes("col1"); 4065 4066 Put put = new Put(row); 4067 put.addColumn(familyName, col, 1, Bytes.toBytes("SomeRandomValue")); 4068 region.put(put); 4069 region.flush(true); 4070 4071 Delete del = new Delete(row); 4072 region.delete(del); 4073 region.flush(true); 4074 4075 // Get remaining rows (should have none) 4076 Get get = new Get(row); 4077 get.addColumn(familyName, col); 4078 4079 Cell[] keyValues = region.get(get).rawCells(); 
4080 assertEquals(0, keyValues.length); 4081 } 4082 4083 @Test 4084 public void testgetHDFSBlocksDistribution() throws Exception { 4085 HBaseTestingUtility htu = new HBaseTestingUtility(); 4086 // Why do we set the block size in this test? If we set it smaller than the kvs, then we'll 4087 // break up the file in to more pieces that can be distributed across the three nodes and we 4088 // won't be able to have the condition this test asserts; that at least one node has 4089 // a copy of all replicas -- if small block size, then blocks are spread evenly across the 4090 // the three nodes. hfilev3 with tags seems to put us over the block size. St.Ack. 4091 // final int DEFAULT_BLOCK_SIZE = 1024; 4092 // htu.getConfiguration().setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE); 4093 htu.getConfiguration().setInt("dfs.replication", 2); 4094 4095 // set up a cluster with 3 nodes 4096 MiniHBaseCluster cluster = null; 4097 String dataNodeHosts[] = new String[] { "host1", "host2", "host3" }; 4098 int regionServersCount = 3; 4099 4100 try { 4101 StartMiniClusterOption option = StartMiniClusterOption.builder() 4102 .numRegionServers(regionServersCount).dataNodeHosts(dataNodeHosts).build(); 4103 cluster = htu.startMiniCluster(option); 4104 byte[][] families = { fam1, fam2 }; 4105 Table ht = htu.createTable(tableName, families); 4106 4107 // Setting up region 4108 byte row[] = Bytes.toBytes("row1"); 4109 byte col[] = Bytes.toBytes("col1"); 4110 4111 Put put = new Put(row); 4112 put.addColumn(fam1, col, 1, Bytes.toBytes("test1")); 4113 put.addColumn(fam2, col, 1, Bytes.toBytes("test2")); 4114 ht.put(put); 4115 4116 HRegion firstRegion = htu.getHBaseCluster().getRegions(tableName).get(0); 4117 firstRegion.flush(true); 4118 HDFSBlocksDistribution blocksDistribution1 = firstRegion.getHDFSBlocksDistribution(); 4119 4120 // Given the default replication factor is 2 and we have 2 HFiles, 4121 // we will have total of 4 replica of blocks on 3 datanodes; thus there 4122 // must be at least 
one host that have replica for 2 HFiles. That host's 4123 // weight will be equal to the unique block weight. 4124 long uniqueBlocksWeight1 = blocksDistribution1.getUniqueBlocksTotalWeight(); 4125 StringBuilder sb = new StringBuilder(); 4126 for (String host: blocksDistribution1.getTopHosts()) { 4127 if (sb.length() > 0) sb.append(", "); 4128 sb.append(host); 4129 sb.append("="); 4130 sb.append(blocksDistribution1.getWeight(host)); 4131 } 4132 4133 String topHost = blocksDistribution1.getTopHosts().get(0); 4134 long topHostWeight = blocksDistribution1.getWeight(topHost); 4135 String msg = "uniqueBlocksWeight=" + uniqueBlocksWeight1 + ", topHostWeight=" + 4136 topHostWeight + ", topHost=" + topHost + "; " + sb.toString(); 4137 LOG.info(msg); 4138 assertTrue(msg, uniqueBlocksWeight1 == topHostWeight); 4139 4140 // use the static method to compute the value, it should be the same. 4141 // static method is used by load balancer or other components 4142 HDFSBlocksDistribution blocksDistribution2 = HRegion.computeHDFSBlocksDistribution( 4143 htu.getConfiguration(), firstRegion.getTableDescriptor(), firstRegion.getRegionInfo()); 4144 long uniqueBlocksWeight2 = blocksDistribution2.getUniqueBlocksTotalWeight(); 4145 4146 assertTrue(uniqueBlocksWeight1 == uniqueBlocksWeight2); 4147 4148 ht.close(); 4149 } finally { 4150 if (cluster != null) { 4151 htu.shutdownMiniCluster(); 4152 } 4153 } 4154 } 4155 4156 /** 4157 * Testcase to check state of region initialization task set to ABORTED or not 4158 * if any exceptions during initialization 4159 * 4160 * @throws Exception 4161 */ 4162 @Test 4163 public void testStatusSettingToAbortIfAnyExceptionDuringRegionInitilization() throws Exception { 4164 HRegionInfo info; 4165 try { 4166 FileSystem fs = Mockito.mock(FileSystem.class); 4167 Mockito.when(fs.exists((Path) Mockito.anyObject())).thenThrow(new IOException()); 4168 HTableDescriptor htd = new HTableDescriptor(tableName); 4169 htd.addFamily(new HColumnDescriptor("cf")); 4170 info 
= new HRegionInfo(htd.getTableName(), HConstants.EMPTY_BYTE_ARRAY, 4171 HConstants.EMPTY_BYTE_ARRAY, false); 4172 Path path = new Path(dir + "testStatusSettingToAbortIfAnyExceptionDuringRegionInitilization"); 4173 region = HRegion.newHRegion(path, null, fs, CONF, info, htd, null); 4174 // region initialization throws IOException and set task state to ABORTED. 4175 region.initialize(); 4176 fail("Region initialization should fail due to IOException"); 4177 } catch (IOException io) { 4178 List<MonitoredTask> tasks = TaskMonitor.get().getTasks(); 4179 for (MonitoredTask monitoredTask : tasks) { 4180 if (!(monitoredTask instanceof MonitoredRPCHandler) 4181 && monitoredTask.getDescription().contains(region.toString())) { 4182 assertTrue("Region state should be ABORTED.", 4183 monitoredTask.getState().equals(MonitoredTask.State.ABORTED)); 4184 break; 4185 } 4186 } 4187 } 4188 } 4189 4190 /** 4191 * Verifies that the .regioninfo file is written on region creation and that 4192 * is recreated if missing during region opening. 
4193 */ 4194 @Test 4195 public void testRegionInfoFileCreation() throws IOException { 4196 Path rootDir = new Path(dir + "testRegionInfoFileCreation"); 4197 4198 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName())); 4199 htd.addFamily(new HColumnDescriptor("cf")); 4200 4201 HRegionInfo hri = new HRegionInfo(htd.getTableName()); 4202 4203 // Create a region and skip the initialization (like CreateTableHandler) 4204 region = HBaseTestingUtility.createRegionAndWAL(hri, rootDir, CONF, htd, false); 4205 Path regionDir = region.getRegionFileSystem().getRegionDir(); 4206 FileSystem fs = region.getRegionFileSystem().getFileSystem(); 4207 HBaseTestingUtility.closeRegionAndWAL(region); 4208 4209 Path regionInfoFile = new Path(regionDir, HRegionFileSystem.REGION_INFO_FILE); 4210 4211 // Verify that the .regioninfo file is present 4212 assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir", 4213 fs.exists(regionInfoFile)); 4214 4215 // Try to open the region 4216 region = HRegion.openHRegion(rootDir, hri, htd, null, CONF); 4217 assertEquals(regionDir, region.getRegionFileSystem().getRegionDir()); 4218 HBaseTestingUtility.closeRegionAndWAL(region); 4219 4220 // Verify that the .regioninfo file is still there 4221 assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir", 4222 fs.exists(regionInfoFile)); 4223 4224 // Remove the .regioninfo file and verify is recreated on region open 4225 fs.delete(regionInfoFile, true); 4226 assertFalse(HRegionFileSystem.REGION_INFO_FILE + " should be removed from the region dir", 4227 fs.exists(regionInfoFile)); 4228 4229 region = HRegion.openHRegion(rootDir, hri, htd, null, CONF); 4230// region = TEST_UTIL.openHRegion(hri, htd); 4231 assertEquals(regionDir, region.getRegionFileSystem().getRegionDir()); 4232 HBaseTestingUtility.closeRegionAndWAL(region); 4233 4234 // Verify that the .regioninfo file is still there 4235 
assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir", 4236 fs.exists(new Path(regionDir, HRegionFileSystem.REGION_INFO_FILE))); 4237 4238 region = null; 4239 } 4240 4241 /** 4242 * TestCase for increment 4243 */ 4244 private static class Incrementer implements Runnable { 4245 private HRegion region; 4246 private final static byte[] incRow = Bytes.toBytes("incRow"); 4247 private final static byte[] family = Bytes.toBytes("family"); 4248 private final static byte[] qualifier = Bytes.toBytes("qualifier"); 4249 private final static long ONE = 1L; 4250 private int incCounter; 4251 4252 public Incrementer(HRegion region, int incCounter) { 4253 this.region = region; 4254 this.incCounter = incCounter; 4255 } 4256 4257 @Override 4258 public void run() { 4259 int count = 0; 4260 while (count < incCounter) { 4261 Increment inc = new Increment(incRow); 4262 inc.addColumn(family, qualifier, ONE); 4263 count++; 4264 try { 4265 region.increment(inc); 4266 } catch (IOException e) { 4267 LOG.info("Count=" + count + ", " + e); 4268 break; 4269 } 4270 } 4271 } 4272 } 4273 4274 /** 4275 * Test case to check increment function with memstore flushing 4276 * @throws Exception 4277 */ 4278 @Test 4279 public void testParallelIncrementWithMemStoreFlush() throws Exception { 4280 byte[] family = Incrementer.family; 4281 this.region = initHRegion(tableName, method, CONF, family); 4282 final HRegion region = this.region; 4283 final AtomicBoolean incrementDone = new AtomicBoolean(false); 4284 Runnable flusher = new Runnable() { 4285 @Override 4286 public void run() { 4287 while (!incrementDone.get()) { 4288 try { 4289 region.flush(true); 4290 } catch (Exception e) { 4291 e.printStackTrace(); 4292 } 4293 } 4294 } 4295 }; 4296 4297 // after all increment finished, the row will increment to 20*100 = 2000 4298 int threadNum = 20; 4299 int incCounter = 100; 4300 long expected = (long) threadNum * incCounter; 4301 Thread[] incrementers = new Thread[threadNum]; 4302 
Thread flushThread = new Thread(flusher); 4303 for (int i = 0; i < threadNum; i++) { 4304 incrementers[i] = new Thread(new Incrementer(this.region, incCounter)); 4305 incrementers[i].start(); 4306 } 4307 flushThread.start(); 4308 for (int i = 0; i < threadNum; i++) { 4309 incrementers[i].join(); 4310 } 4311 4312 incrementDone.set(true); 4313 flushThread.join(); 4314 4315 Get get = new Get(Incrementer.incRow); 4316 get.addColumn(Incrementer.family, Incrementer.qualifier); 4317 get.setMaxVersions(1); 4318 Result res = this.region.get(get); 4319 List<Cell> kvs = res.getColumnCells(Incrementer.family, Incrementer.qualifier); 4320 4321 // we just got the latest version 4322 assertEquals(1, kvs.size()); 4323 Cell kv = kvs.get(0); 4324 assertEquals(expected, Bytes.toLong(kv.getValueArray(), kv.getValueOffset())); 4325 } 4326 4327 /** 4328 * TestCase for append 4329 */ 4330 private static class Appender implements Runnable { 4331 private HRegion region; 4332 private final static byte[] appendRow = Bytes.toBytes("appendRow"); 4333 private final static byte[] family = Bytes.toBytes("family"); 4334 private final static byte[] qualifier = Bytes.toBytes("qualifier"); 4335 private final static byte[] CHAR = Bytes.toBytes("a"); 4336 private int appendCounter; 4337 4338 public Appender(HRegion region, int appendCounter) { 4339 this.region = region; 4340 this.appendCounter = appendCounter; 4341 } 4342 4343 @Override 4344 public void run() { 4345 int count = 0; 4346 while (count < appendCounter) { 4347 Append app = new Append(appendRow); 4348 app.addColumn(family, qualifier, CHAR); 4349 count++; 4350 try { 4351 region.append(app); 4352 } catch (IOException e) { 4353 LOG.info("Count=" + count + ", max=" + appendCounter + ", " + e); 4354 break; 4355 } 4356 } 4357 } 4358 } 4359 4360 /** 4361 * Test case to check append function with memstore flushing 4362 * @throws Exception 4363 */ 4364 @Test 4365 public void testParallelAppendWithMemStoreFlush() throws Exception { 4366 byte[] family 
= Appender.family; 4367 this.region = initHRegion(tableName, method, CONF, family); 4368 final HRegion region = this.region; 4369 final AtomicBoolean appendDone = new AtomicBoolean(false); 4370 Runnable flusher = new Runnable() { 4371 @Override 4372 public void run() { 4373 while (!appendDone.get()) { 4374 try { 4375 region.flush(true); 4376 } catch (Exception e) { 4377 e.printStackTrace(); 4378 } 4379 } 4380 } 4381 }; 4382 4383 // After all append finished, the value will append to threadNum * 4384 // appendCounter Appender.CHAR 4385 int threadNum = 20; 4386 int appendCounter = 100; 4387 byte[] expected = new byte[threadNum * appendCounter]; 4388 for (int i = 0; i < threadNum * appendCounter; i++) { 4389 System.arraycopy(Appender.CHAR, 0, expected, i, 1); 4390 } 4391 Thread[] appenders = new Thread[threadNum]; 4392 Thread flushThread = new Thread(flusher); 4393 for (int i = 0; i < threadNum; i++) { 4394 appenders[i] = new Thread(new Appender(this.region, appendCounter)); 4395 appenders[i].start(); 4396 } 4397 flushThread.start(); 4398 for (int i = 0; i < threadNum; i++) { 4399 appenders[i].join(); 4400 } 4401 4402 appendDone.set(true); 4403 flushThread.join(); 4404 4405 Get get = new Get(Appender.appendRow); 4406 get.addColumn(Appender.family, Appender.qualifier); 4407 get.setMaxVersions(1); 4408 Result res = this.region.get(get); 4409 List<Cell> kvs = res.getColumnCells(Appender.family, Appender.qualifier); 4410 4411 // we just got the latest version 4412 assertEquals(1, kvs.size()); 4413 Cell kv = kvs.get(0); 4414 byte[] appendResult = new byte[kv.getValueLength()]; 4415 System.arraycopy(kv.getValueArray(), kv.getValueOffset(), appendResult, 0, kv.getValueLength()); 4416 assertArrayEquals(expected, appendResult); 4417 } 4418 4419 /** 4420 * Test case to check put function with memstore flushing for same row, same ts 4421 * @throws Exception 4422 */ 4423 @Test 4424 public void testPutWithMemStoreFlush() throws Exception { 4425 byte[] family = 
Bytes.toBytes("family"); 4426 byte[] qualifier = Bytes.toBytes("qualifier"); 4427 byte[] row = Bytes.toBytes("putRow"); 4428 byte[] value = null; 4429 this.region = initHRegion(tableName, method, CONF, family); 4430 Put put = null; 4431 Get get = null; 4432 List<Cell> kvs = null; 4433 Result res = null; 4434 4435 put = new Put(row); 4436 value = Bytes.toBytes("value0"); 4437 put.addColumn(family, qualifier, 1234567L, value); 4438 region.put(put); 4439 get = new Get(row); 4440 get.addColumn(family, qualifier); 4441 get.setMaxVersions(); 4442 res = this.region.get(get); 4443 kvs = res.getColumnCells(family, qualifier); 4444 assertEquals(1, kvs.size()); 4445 assertArrayEquals(Bytes.toBytes("value0"), CellUtil.cloneValue(kvs.get(0))); 4446 4447 region.flush(true); 4448 get = new Get(row); 4449 get.addColumn(family, qualifier); 4450 get.setMaxVersions(); 4451 res = this.region.get(get); 4452 kvs = res.getColumnCells(family, qualifier); 4453 assertEquals(1, kvs.size()); 4454 assertArrayEquals(Bytes.toBytes("value0"), CellUtil.cloneValue(kvs.get(0))); 4455 4456 put = new Put(row); 4457 value = Bytes.toBytes("value1"); 4458 put.addColumn(family, qualifier, 1234567L, value); 4459 region.put(put); 4460 get = new Get(row); 4461 get.addColumn(family, qualifier); 4462 get.setMaxVersions(); 4463 res = this.region.get(get); 4464 kvs = res.getColumnCells(family, qualifier); 4465 assertEquals(1, kvs.size()); 4466 assertArrayEquals(Bytes.toBytes("value1"), CellUtil.cloneValue(kvs.get(0))); 4467 4468 region.flush(true); 4469 get = new Get(row); 4470 get.addColumn(family, qualifier); 4471 get.setMaxVersions(); 4472 res = this.region.get(get); 4473 kvs = res.getColumnCells(family, qualifier); 4474 assertEquals(1, kvs.size()); 4475 assertArrayEquals(Bytes.toBytes("value1"), CellUtil.cloneValue(kvs.get(0))); 4476 } 4477 4478 @Test 4479 public void testDurability() throws Exception { 4480 // there are 5 x 5 cases: 4481 // table durability(SYNC,FSYNC,ASYC,SKIP,USE_DEFAULT) x mutation 4482 
// durability(SYNC,FSYNC,ASYC,SKIP,USE_DEFAULT) 4483 4484 // expected cases for append and sync wal 4485 durabilityTest(method, Durability.SYNC_WAL, Durability.SYNC_WAL, 0, true, true, false); 4486 durabilityTest(method, Durability.SYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false); 4487 durabilityTest(method, Durability.SYNC_WAL, Durability.USE_DEFAULT, 0, true, true, false); 4488 4489 durabilityTest(method, Durability.FSYNC_WAL, Durability.SYNC_WAL, 0, true, true, false); 4490 durabilityTest(method, Durability.FSYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false); 4491 durabilityTest(method, Durability.FSYNC_WAL, Durability.USE_DEFAULT, 0, true, true, false); 4492 4493 durabilityTest(method, Durability.ASYNC_WAL, Durability.SYNC_WAL, 0, true, true, false); 4494 durabilityTest(method, Durability.ASYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false); 4495 4496 durabilityTest(method, Durability.SKIP_WAL, Durability.SYNC_WAL, 0, true, true, false); 4497 durabilityTest(method, Durability.SKIP_WAL, Durability.FSYNC_WAL, 0, true, true, false); 4498 4499 durabilityTest(method, Durability.USE_DEFAULT, Durability.SYNC_WAL, 0, true, true, false); 4500 durabilityTest(method, Durability.USE_DEFAULT, Durability.FSYNC_WAL, 0, true, true, false); 4501 durabilityTest(method, Durability.USE_DEFAULT, Durability.USE_DEFAULT, 0, true, true, false); 4502 4503 // expected cases for async wal 4504 durabilityTest(method, Durability.SYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false); 4505 durabilityTest(method, Durability.FSYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false); 4506 durabilityTest(method, Durability.ASYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false); 4507 durabilityTest(method, Durability.SKIP_WAL, Durability.ASYNC_WAL, 0, true, false, false); 4508 durabilityTest(method, Durability.USE_DEFAULT, Durability.ASYNC_WAL, 0, true, false, false); 4509 durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 0, true, false, false); 4510 4511 
durabilityTest(method, Durability.SYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true); 4512 durabilityTest(method, Durability.FSYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true); 4513 durabilityTest(method, Durability.ASYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true); 4514 durabilityTest(method, Durability.SKIP_WAL, Durability.ASYNC_WAL, 5000, true, false, true); 4515 durabilityTest(method, Durability.USE_DEFAULT, Durability.ASYNC_WAL, 5000, true, false, true); 4516 durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 5000, true, false, true); 4517 4518 // expect skip wal cases 4519 durabilityTest(method, Durability.SYNC_WAL, Durability.SKIP_WAL, 0, false, false, false); 4520 durabilityTest(method, Durability.FSYNC_WAL, Durability.SKIP_WAL, 0, false, false, false); 4521 durabilityTest(method, Durability.ASYNC_WAL, Durability.SKIP_WAL, 0, false, false, false); 4522 durabilityTest(method, Durability.SKIP_WAL, Durability.SKIP_WAL, 0, false, false, false); 4523 durabilityTest(method, Durability.USE_DEFAULT, Durability.SKIP_WAL, 0, false, false, false); 4524 durabilityTest(method, Durability.SKIP_WAL, Durability.USE_DEFAULT, 0, false, false, false); 4525 4526 } 4527 4528 private void durabilityTest(String method, Durability tableDurability, 4529 Durability mutationDurability, long timeout, boolean expectAppend, final boolean expectSync, 4530 final boolean expectSyncFromLogSyncer) throws Exception { 4531 Configuration conf = HBaseConfiguration.create(CONF); 4532 method = method + "_" + tableDurability.name() + "_" + mutationDurability.name(); 4533 byte[] family = Bytes.toBytes("family"); 4534 Path logDir = new Path(new Path(dir + method), "log"); 4535 final Configuration walConf = new Configuration(conf); 4536 FSUtils.setRootDir(walConf, logDir); 4537 // XXX: The spied AsyncFSWAL can not work properly because of a Mockito defect that can not 4538 // deal with classes which have a field of an inner class. See discussions in HBASE-15536. 
4539 walConf.set(WALFactory.WAL_PROVIDER, "filesystem"); 4540 final WALFactory wals = new WALFactory(walConf, TEST_UTIL.getRandomUUID().toString()); 4541 final WAL wal = spy(wals.getWAL(RegionInfoBuilder.newBuilder(tableName).build())); 4542 this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW, 4543 HConstants.EMPTY_END_ROW, false, tableDurability, wal, 4544 new byte[][] { family }); 4545 4546 Put put = new Put(Bytes.toBytes("r1")); 4547 put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1")); 4548 put.setDurability(mutationDurability); 4549 region.put(put); 4550 4551 // verify append called or not 4552 verify(wal, expectAppend ? times(1) : never()).appendData((HRegionInfo) any(), 4553 (WALKeyImpl) any(), (WALEdit) any()); 4554 4555 // verify sync called or not 4556 if (expectSync || expectSyncFromLogSyncer) { 4557 TEST_UTIL.waitFor(timeout, new Waiter.Predicate<Exception>() { 4558 @Override 4559 public boolean evaluate() throws Exception { 4560 try { 4561 if (expectSync) { 4562 verify(wal, times(1)).sync(anyLong()); // Hregion calls this one 4563 } else if (expectSyncFromLogSyncer) { 4564 verify(wal, times(1)).sync(); // wal syncer calls this one 4565 } 4566 } catch (Throwable ignore) { 4567 } 4568 return true; 4569 } 4570 }); 4571 } else { 4572 //verify(wal, never()).sync(anyLong()); 4573 verify(wal, never()).sync(); 4574 } 4575 4576 HBaseTestingUtility.closeRegionAndWAL(this.region); 4577 wals.close(); 4578 this.region = null; 4579 } 4580 4581 @Test 4582 public void testRegionReplicaSecondary() throws IOException { 4583 // create a primary region, load some data and flush 4584 // create a secondary region, and do a get against that 4585 Path rootDir = new Path(dir + name.getMethodName()); 4586 FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir); 4587 4588 byte[][] families = new byte[][] { 4589 Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3") 4590 }; 4591 byte[] cq = Bytes.toBytes("cq"); 4592 HTableDescriptor htd = new 
HTableDescriptor(TableName.valueOf(name.getMethodName())); 4593 for (byte[] family : families) { 4594 htd.addFamily(new HColumnDescriptor(family)); 4595 } 4596 4597 long time = System.currentTimeMillis(); 4598 HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(), 4599 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, 4600 false, time, 0); 4601 HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(), 4602 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, 4603 false, time, 1); 4604 4605 HRegion primaryRegion = null, secondaryRegion = null; 4606 4607 try { 4608 primaryRegion = HBaseTestingUtility.createRegionAndWAL(primaryHri, 4609 rootDir, TEST_UTIL.getConfiguration(), htd); 4610 4611 // load some data 4612 putData(primaryRegion, 0, 1000, cq, families); 4613 4614 // flush region 4615 primaryRegion.flush(true); 4616 4617 // open secondary region 4618 secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF); 4619 4620 verifyData(secondaryRegion, 0, 1000, cq, families); 4621 } finally { 4622 if (primaryRegion != null) { 4623 HBaseTestingUtility.closeRegionAndWAL(primaryRegion); 4624 } 4625 if (secondaryRegion != null) { 4626 HBaseTestingUtility.closeRegionAndWAL(secondaryRegion); 4627 } 4628 } 4629 } 4630 4631 @Test 4632 public void testRegionReplicaSecondaryIsReadOnly() throws IOException { 4633 // create a primary region, load some data and flush 4634 // create a secondary region, and do a put against that 4635 Path rootDir = new Path(dir + name.getMethodName()); 4636 FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir); 4637 4638 byte[][] families = new byte[][] { 4639 Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3") 4640 }; 4641 byte[] cq = Bytes.toBytes("cq"); 4642 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName())); 4643 for (byte[] family : families) { 4644 htd.addFamily(new HColumnDescriptor(family)); 4645 } 4646 4647 long time = System.currentTimeMillis(); 4648 
HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(), 4649 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, 4650 false, time, 0); 4651 HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(), 4652 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, 4653 false, time, 1); 4654 4655 HRegion primaryRegion = null, secondaryRegion = null; 4656 4657 try { 4658 primaryRegion = HBaseTestingUtility.createRegionAndWAL(primaryHri, 4659 rootDir, TEST_UTIL.getConfiguration(), htd); 4660 4661 // load some data 4662 putData(primaryRegion, 0, 1000, cq, families); 4663 4664 // flush region 4665 primaryRegion.flush(true); 4666 4667 // open secondary region 4668 secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF); 4669 4670 try { 4671 putData(secondaryRegion, 0, 1000, cq, families); 4672 fail("Should have thrown exception"); 4673 } catch (IOException ex) { 4674 // expected 4675 } 4676 } finally { 4677 if (primaryRegion != null) { 4678 HBaseTestingUtility.closeRegionAndWAL(primaryRegion); 4679 } 4680 if (secondaryRegion != null) { 4681 HBaseTestingUtility.closeRegionAndWAL(secondaryRegion); 4682 } 4683 } 4684 } 4685 4686 static WALFactory createWALFactory(Configuration conf, Path rootDir) throws IOException { 4687 Configuration confForWAL = new Configuration(conf); 4688 confForWAL.set(HConstants.HBASE_DIR, rootDir.toString()); 4689 return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8)); 4690 } 4691 4692 @Test 4693 public void testCompactionFromPrimary() throws IOException { 4694 Path rootDir = new Path(dir + name.getMethodName()); 4695 FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir); 4696 4697 byte[][] families = new byte[][] { 4698 Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3") 4699 }; 4700 byte[] cq = Bytes.toBytes("cq"); 4701 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName())); 4702 for (byte[] family : families) { 4703 htd.addFamily(new 
HColumnDescriptor(family)); 4704 } 4705 4706 long time = System.currentTimeMillis(); 4707 HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(), 4708 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, 4709 false, time, 0); 4710 HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(), 4711 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, 4712 false, time, 1); 4713 4714 HRegion primaryRegion = null, secondaryRegion = null; 4715 4716 try { 4717 primaryRegion = HBaseTestingUtility.createRegionAndWAL(primaryHri, 4718 rootDir, TEST_UTIL.getConfiguration(), htd); 4719 4720 // load some data 4721 putData(primaryRegion, 0, 1000, cq, families); 4722 4723 // flush region 4724 primaryRegion.flush(true); 4725 4726 // open secondary region 4727 secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF); 4728 4729 // move the file of the primary region to the archive, simulating a compaction 4730 Collection<HStoreFile> storeFiles = primaryRegion.getStore(families[0]).getStorefiles(); 4731 primaryRegion.getRegionFileSystem().removeStoreFiles(Bytes.toString(families[0]), storeFiles); 4732 Collection<StoreFileInfo> storeFileInfos = primaryRegion.getRegionFileSystem() 4733 .getStoreFiles(families[0]); 4734 Assert.assertTrue(storeFileInfos == null || storeFileInfos.isEmpty()); 4735 4736 verifyData(secondaryRegion, 0, 1000, cq, families); 4737 } finally { 4738 if (primaryRegion != null) { 4739 HBaseTestingUtility.closeRegionAndWAL(primaryRegion); 4740 } 4741 if (secondaryRegion != null) { 4742 HBaseTestingUtility.closeRegionAndWAL(secondaryRegion); 4743 } 4744 } 4745 } 4746 4747 private void putData(int startRow, int numRows, byte[] qf, byte[]... families) throws 4748 IOException { 4749 putData(this.region, startRow, numRows, qf, families); 4750 } 4751 4752 private void putData(HRegion region, 4753 int startRow, int numRows, byte[] qf, byte[]... 
families) throws IOException { 4754 putData(region, Durability.SKIP_WAL, startRow, numRows, qf, families); 4755 } 4756 4757 static void putData(HRegion region, Durability durability, 4758 int startRow, int numRows, byte[] qf, byte[]... families) throws IOException { 4759 for (int i = startRow; i < startRow + numRows; i++) { 4760 Put put = new Put(Bytes.toBytes("" + i)); 4761 put.setDurability(durability); 4762 for (byte[] family : families) { 4763 put.addColumn(family, qf, null); 4764 } 4765 region.put(put); 4766 LOG.info(put.toString()); 4767 } 4768 } 4769 4770 static void verifyData(HRegion newReg, int startRow, int numRows, byte[] qf, byte[]... families) 4771 throws IOException { 4772 for (int i = startRow; i < startRow + numRows; i++) { 4773 byte[] row = Bytes.toBytes("" + i); 4774 Get get = new Get(row); 4775 for (byte[] family : families) { 4776 get.addColumn(family, qf); 4777 } 4778 Result result = newReg.get(get); 4779 Cell[] raw = result.rawCells(); 4780 assertEquals(families.length, result.size()); 4781 for (int j = 0; j < families.length; j++) { 4782 assertTrue(CellUtil.matchingRows(raw[j], row)); 4783 assertTrue(CellUtil.matchingFamily(raw[j], families[j])); 4784 assertTrue(CellUtil.matchingQualifier(raw[j], qf)); 4785 } 4786 } 4787 } 4788 4789 static void assertGet(final HRegion r, final byte[] family, final byte[] k) throws IOException { 4790 // Now I have k, get values out and assert they are as expected. 4791 Get get = new Get(k).addFamily(family).setMaxVersions(); 4792 Cell[] results = r.get(get).rawCells(); 4793 for (int j = 0; j < results.length; j++) { 4794 byte[] tmp = CellUtil.cloneValue(results[j]); 4795 // Row should be equal to value every time. 4796 assertTrue(Bytes.equals(k, tmp)); 4797 } 4798 } 4799 4800 /* 4801 * Assert first value in the passed region is <code>firstValue</code>. 
4802 * 4803 * @param r 4804 * 4805 * @param fs 4806 * 4807 * @param firstValue 4808 * 4809 * @throws IOException 4810 */ 4811 protected void assertScan(final HRegion r, final byte[] fs, final byte[] firstValue) 4812 throws IOException { 4813 byte[][] families = { fs }; 4814 Scan scan = new Scan(); 4815 for (int i = 0; i < families.length; i++) 4816 scan.addFamily(families[i]); 4817 InternalScanner s = r.getScanner(scan); 4818 try { 4819 List<Cell> curVals = new ArrayList<>(); 4820 boolean first = true; 4821 OUTER_LOOP: while (s.next(curVals)) { 4822 for (Cell kv : curVals) { 4823 byte[] val = CellUtil.cloneValue(kv); 4824 byte[] curval = val; 4825 if (first) { 4826 first = false; 4827 assertTrue(Bytes.compareTo(curval, firstValue) == 0); 4828 } else { 4829 // Not asserting anything. Might as well break. 4830 break OUTER_LOOP; 4831 } 4832 } 4833 } 4834 } finally { 4835 s.close(); 4836 } 4837 } 4838 4839 /** 4840 * Test that we get the expected flush results back 4841 */ 4842 @Test 4843 public void testFlushResult() throws IOException { 4844 byte[] family = Bytes.toBytes("family"); 4845 4846 this.region = initHRegion(tableName, method, family); 4847 4848 // empty memstore, flush doesn't run 4849 HRegion.FlushResult fr = region.flush(true); 4850 assertFalse(fr.isFlushSucceeded()); 4851 assertFalse(fr.isCompactionNeeded()); 4852 4853 // Flush enough files to get up to the threshold, doesn't need compactions 4854 for (int i = 0; i < 2; i++) { 4855 Put put = new Put(tableName.toBytes()).addColumn(family, family, tableName.toBytes()); 4856 region.put(put); 4857 fr = region.flush(true); 4858 assertTrue(fr.isFlushSucceeded()); 4859 assertFalse(fr.isCompactionNeeded()); 4860 } 4861 4862 // Two flushes after the threshold, compactions are needed 4863 for (int i = 0; i < 2; i++) { 4864 Put put = new Put(tableName.toBytes()).addColumn(family, family, tableName.toBytes()); 4865 region.put(put); 4866 fr = region.flush(true); 4867 assertTrue(fr.isFlushSucceeded()); 4868 
assertTrue(fr.isCompactionNeeded()); 4869 } 4870 } 4871 4872 protected Configuration initSplit() { 4873 // Always compact if there is more than one store file. 4874 CONF.setInt("hbase.hstore.compactionThreshold", 2); 4875 4876 CONF.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 10 * 1000); 4877 4878 // Increase the amount of time between client retries 4879 CONF.setLong("hbase.client.pause", 15 * 1000); 4880 4881 // This size should make it so we always split using the addContent 4882 // below. After adding all data, the first region is 1.3M 4883 CONF.setLong(HConstants.HREGION_MAX_FILESIZE, 1024 * 128); 4884 return CONF; 4885 } 4886 4887 /** 4888 * @return A region on which you must call 4889 * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done. 4890 */ 4891 protected HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf, 4892 byte[]... families) throws IOException { 4893 return initHRegion(tableName, callingMethod, conf, false, families); 4894 } 4895 4896 /** 4897 * @return A region on which you must call 4898 * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done. 4899 */ 4900 protected HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf, 4901 boolean isReadOnly, byte[]... families) throws IOException { 4902 return initHRegion(tableName, null, null, callingMethod, conf, isReadOnly, families); 4903 } 4904 4905 protected HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, 4906 String callingMethod, Configuration conf, boolean isReadOnly, byte[]... 
families) 4907 throws IOException { 4908 Path logDir = TEST_UTIL.getDataTestDirOnTestFS(callingMethod + ".log"); 4909 HRegionInfo hri = new HRegionInfo(tableName, startKey, stopKey); 4910 final WAL wal = HBaseTestingUtility.createWal(conf, logDir, hri); 4911 return initHRegion(tableName, startKey, stopKey, isReadOnly, 4912 Durability.SYNC_WAL, wal, families); 4913 } 4914 4915 /** 4916 * @return A region on which you must call 4917 * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done. 4918 */ 4919 public HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, 4920 boolean isReadOnly, Durability durability, WAL wal, byte[]... families) throws IOException { 4921 ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); 4922 return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, 4923 isReadOnly, durability, wal, families); 4924 } 4925 4926 /** 4927 * Assert that the passed in Cell has expected contents for the specified row, 4928 * column & timestamp. 
4929 */ 4930 private void checkOneCell(Cell kv, byte[] cf, int rowIdx, int colIdx, long ts) { 4931 String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts; 4932 assertEquals("Row mismatch which checking: " + ctx, "row:" + rowIdx, 4933 Bytes.toString(CellUtil.cloneRow(kv))); 4934 assertEquals("ColumnFamily mismatch while checking: " + ctx, Bytes.toString(cf), 4935 Bytes.toString(CellUtil.cloneFamily(kv))); 4936 assertEquals("Column qualifier mismatch while checking: " + ctx, "column:" + colIdx, 4937 Bytes.toString(CellUtil.cloneQualifier(kv))); 4938 assertEquals("Timestamp mismatch while checking: " + ctx, ts, kv.getTimestamp()); 4939 assertEquals("Value mismatch while checking: " + ctx, "value-version-" + ts, 4940 Bytes.toString(CellUtil.cloneValue(kv))); 4941 } 4942 4943 @Test 4944 public void testReverseScanner_FromMemStore_SingleCF_Normal() 4945 throws IOException { 4946 byte[] rowC = Bytes.toBytes("rowC"); 4947 byte[] rowA = Bytes.toBytes("rowA"); 4948 byte[] rowB = Bytes.toBytes("rowB"); 4949 byte[] cf = Bytes.toBytes("CF"); 4950 byte[][] families = { cf }; 4951 byte[] col = Bytes.toBytes("C"); 4952 long ts = 1; 4953 this.region = initHRegion(tableName, method, families); 4954 KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null); 4955 KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put, 4956 null); 4957 KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null); 4958 KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null); 4959 Put put = null; 4960 put = new Put(rowC); 4961 put.add(kv1); 4962 put.add(kv11); 4963 region.put(put); 4964 put = new Put(rowA); 4965 put.add(kv2); 4966 region.put(put); 4967 put = new Put(rowB); 4968 put.add(kv3); 4969 region.put(put); 4970 4971 Scan scan = new Scan(rowC); 4972 scan.setMaxVersions(5); 4973 scan.setReversed(true); 4974 InternalScanner scanner = region.getScanner(scan); 4975 List<Cell> currRow = new ArrayList<>(); 4976 boolean hasNext = 
scanner.next(currRow); 4977 assertEquals(2, currRow.size()); 4978 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 4979 .get(0).getRowLength(), rowC, 0, rowC.length)); 4980 assertTrue(hasNext); 4981 currRow.clear(); 4982 hasNext = scanner.next(currRow); 4983 assertEquals(1, currRow.size()); 4984 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 4985 .get(0).getRowLength(), rowB, 0, rowB.length)); 4986 assertTrue(hasNext); 4987 currRow.clear(); 4988 hasNext = scanner.next(currRow); 4989 assertEquals(1, currRow.size()); 4990 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 4991 .get(0).getRowLength(), rowA, 0, rowA.length)); 4992 assertFalse(hasNext); 4993 scanner.close(); 4994 } 4995 4996 @Test 4997 public void testReverseScanner_FromMemStore_SingleCF_LargerKey() 4998 throws IOException { 4999 byte[] rowC = Bytes.toBytes("rowC"); 5000 byte[] rowA = Bytes.toBytes("rowA"); 5001 byte[] rowB = Bytes.toBytes("rowB"); 5002 byte[] rowD = Bytes.toBytes("rowD"); 5003 byte[] cf = Bytes.toBytes("CF"); 5004 byte[][] families = { cf }; 5005 byte[] col = Bytes.toBytes("C"); 5006 long ts = 1; 5007 this.region = initHRegion(tableName, method, families); 5008 KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null); 5009 KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put, 5010 null); 5011 KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null); 5012 KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null); 5013 Put put = null; 5014 put = new Put(rowC); 5015 put.add(kv1); 5016 put.add(kv11); 5017 region.put(put); 5018 put = new Put(rowA); 5019 put.add(kv2); 5020 region.put(put); 5021 put = new Put(rowB); 5022 put.add(kv3); 5023 region.put(put); 5024 5025 Scan scan = new Scan(rowD); 5026 List<Cell> currRow = new ArrayList<>(); 5027 scan.setReversed(true); 5028 scan.setMaxVersions(5); 5029 
InternalScanner scanner = region.getScanner(scan); 5030 boolean hasNext = scanner.next(currRow); 5031 assertEquals(2, currRow.size()); 5032 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5033 .get(0).getRowLength(), rowC, 0, rowC.length)); 5034 assertTrue(hasNext); 5035 currRow.clear(); 5036 hasNext = scanner.next(currRow); 5037 assertEquals(1, currRow.size()); 5038 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5039 .get(0).getRowLength(), rowB, 0, rowB.length)); 5040 assertTrue(hasNext); 5041 currRow.clear(); 5042 hasNext = scanner.next(currRow); 5043 assertEquals(1, currRow.size()); 5044 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5045 .get(0).getRowLength(), rowA, 0, rowA.length)); 5046 assertFalse(hasNext); 5047 scanner.close(); 5048 } 5049 5050 @Test 5051 public void testReverseScanner_FromMemStore_SingleCF_FullScan() 5052 throws IOException { 5053 byte[] rowC = Bytes.toBytes("rowC"); 5054 byte[] rowA = Bytes.toBytes("rowA"); 5055 byte[] rowB = Bytes.toBytes("rowB"); 5056 byte[] cf = Bytes.toBytes("CF"); 5057 byte[][] families = { cf }; 5058 byte[] col = Bytes.toBytes("C"); 5059 long ts = 1; 5060 this.region = initHRegion(tableName, method, families); 5061 KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null); 5062 KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put, 5063 null); 5064 KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null); 5065 KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null); 5066 Put put = null; 5067 put = new Put(rowC); 5068 put.add(kv1); 5069 put.add(kv11); 5070 region.put(put); 5071 put = new Put(rowA); 5072 put.add(kv2); 5073 region.put(put); 5074 put = new Put(rowB); 5075 put.add(kv3); 5076 region.put(put); 5077 Scan scan = new Scan(); 5078 List<Cell> currRow = new ArrayList<>(); 5079 scan.setReversed(true); 5080 
InternalScanner scanner = region.getScanner(scan); 5081 boolean hasNext = scanner.next(currRow); 5082 assertEquals(1, currRow.size()); 5083 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5084 .get(0).getRowLength(), rowC, 0, rowC.length)); 5085 assertTrue(hasNext); 5086 currRow.clear(); 5087 hasNext = scanner.next(currRow); 5088 assertEquals(1, currRow.size()); 5089 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5090 .get(0).getRowLength(), rowB, 0, rowB.length)); 5091 assertTrue(hasNext); 5092 currRow.clear(); 5093 hasNext = scanner.next(currRow); 5094 assertEquals(1, currRow.size()); 5095 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5096 .get(0).getRowLength(), rowA, 0, rowA.length)); 5097 assertFalse(hasNext); 5098 scanner.close(); 5099 } 5100 5101 @Test 5102 public void testReverseScanner_moreRowsMayExistAfter() throws IOException { 5103 // case for "INCLUDE_AND_SEEK_NEXT_ROW & SEEK_NEXT_ROW" endless loop 5104 byte[] rowA = Bytes.toBytes("rowA"); 5105 byte[] rowB = Bytes.toBytes("rowB"); 5106 byte[] rowC = Bytes.toBytes("rowC"); 5107 byte[] rowD = Bytes.toBytes("rowD"); 5108 byte[] rowE = Bytes.toBytes("rowE"); 5109 byte[] cf = Bytes.toBytes("CF"); 5110 byte[][] families = { cf }; 5111 byte[] col1 = Bytes.toBytes("col1"); 5112 byte[] col2 = Bytes.toBytes("col2"); 5113 long ts = 1; 5114 this.region = initHRegion(tableName, method, families); 5115 KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null); 5116 KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null); 5117 KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null); 5118 KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null); 5119 KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null); 5120 KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null); 5121 Put put = 
null; 5122 put = new Put(rowA); 5123 put.add(kv1); 5124 region.put(put); 5125 put = new Put(rowB); 5126 put.add(kv2); 5127 region.put(put); 5128 put = new Put(rowC); 5129 put.add(kv3); 5130 region.put(put); 5131 put = new Put(rowD); 5132 put.add(kv4_1); 5133 region.put(put); 5134 put = new Put(rowD); 5135 put.add(kv4_2); 5136 region.put(put); 5137 put = new Put(rowE); 5138 put.add(kv5); 5139 region.put(put); 5140 region.flush(true); 5141 Scan scan = new Scan(rowD, rowA); 5142 scan.addColumn(families[0], col1); 5143 scan.setReversed(true); 5144 List<Cell> currRow = new ArrayList<>(); 5145 InternalScanner scanner = region.getScanner(scan); 5146 boolean hasNext = scanner.next(currRow); 5147 assertEquals(1, currRow.size()); 5148 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5149 .get(0).getRowLength(), rowD, 0, rowD.length)); 5150 assertTrue(hasNext); 5151 currRow.clear(); 5152 hasNext = scanner.next(currRow); 5153 assertEquals(1, currRow.size()); 5154 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5155 .get(0).getRowLength(), rowC, 0, rowC.length)); 5156 assertTrue(hasNext); 5157 currRow.clear(); 5158 hasNext = scanner.next(currRow); 5159 assertEquals(1, currRow.size()); 5160 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5161 .get(0).getRowLength(), rowB, 0, rowB.length)); 5162 assertFalse(hasNext); 5163 scanner.close(); 5164 5165 scan = new Scan(rowD, rowA); 5166 scan.addColumn(families[0], col2); 5167 scan.setReversed(true); 5168 currRow.clear(); 5169 scanner = region.getScanner(scan); 5170 hasNext = scanner.next(currRow); 5171 assertEquals(1, currRow.size()); 5172 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5173 .get(0).getRowLength(), rowD, 0, rowD.length)); 5174 scanner.close(); 5175 } 5176 5177 @Test 5178 public void testReverseScanner_smaller_blocksize() throws IOException { 
5179 // case to ensure no conflict with HFile index optimization 5180 byte[] rowA = Bytes.toBytes("rowA"); 5181 byte[] rowB = Bytes.toBytes("rowB"); 5182 byte[] rowC = Bytes.toBytes("rowC"); 5183 byte[] rowD = Bytes.toBytes("rowD"); 5184 byte[] rowE = Bytes.toBytes("rowE"); 5185 byte[] cf = Bytes.toBytes("CF"); 5186 byte[][] families = { cf }; 5187 byte[] col1 = Bytes.toBytes("col1"); 5188 byte[] col2 = Bytes.toBytes("col2"); 5189 long ts = 1; 5190 HBaseConfiguration config = new HBaseConfiguration(); 5191 config.setInt("test.block.size", 1); 5192 this.region = initHRegion(tableName, method, config, families); 5193 KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null); 5194 KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null); 5195 KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null); 5196 KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null); 5197 KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null); 5198 KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null); 5199 Put put = null; 5200 put = new Put(rowA); 5201 put.add(kv1); 5202 region.put(put); 5203 put = new Put(rowB); 5204 put.add(kv2); 5205 region.put(put); 5206 put = new Put(rowC); 5207 put.add(kv3); 5208 region.put(put); 5209 put = new Put(rowD); 5210 put.add(kv4_1); 5211 region.put(put); 5212 put = new Put(rowD); 5213 put.add(kv4_2); 5214 region.put(put); 5215 put = new Put(rowE); 5216 put.add(kv5); 5217 region.put(put); 5218 region.flush(true); 5219 Scan scan = new Scan(rowD, rowA); 5220 scan.addColumn(families[0], col1); 5221 scan.setReversed(true); 5222 List<Cell> currRow = new ArrayList<>(); 5223 InternalScanner scanner = region.getScanner(scan); 5224 boolean hasNext = scanner.next(currRow); 5225 assertEquals(1, currRow.size()); 5226 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5227 .get(0).getRowLength(), rowD, 0, rowD.length)); 5228 
assertTrue(hasNext); 5229 currRow.clear(); 5230 hasNext = scanner.next(currRow); 5231 assertEquals(1, currRow.size()); 5232 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5233 .get(0).getRowLength(), rowC, 0, rowC.length)); 5234 assertTrue(hasNext); 5235 currRow.clear(); 5236 hasNext = scanner.next(currRow); 5237 assertEquals(1, currRow.size()); 5238 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5239 .get(0).getRowLength(), rowB, 0, rowB.length)); 5240 assertFalse(hasNext); 5241 scanner.close(); 5242 5243 scan = new Scan(rowD, rowA); 5244 scan.addColumn(families[0], col2); 5245 scan.setReversed(true); 5246 currRow.clear(); 5247 scanner = region.getScanner(scan); 5248 hasNext = scanner.next(currRow); 5249 assertEquals(1, currRow.size()); 5250 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5251 .get(0).getRowLength(), rowD, 0, rowD.length)); 5252 scanner.close(); 5253 } 5254 5255 @Test 5256 public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs1() 5257 throws IOException { 5258 byte[] row0 = Bytes.toBytes("row0"); // 1 kv 5259 byte[] row1 = Bytes.toBytes("row1"); // 2 kv 5260 byte[] row2 = Bytes.toBytes("row2"); // 4 kv 5261 byte[] row3 = Bytes.toBytes("row3"); // 2 kv 5262 byte[] row4 = Bytes.toBytes("row4"); // 5 kv 5263 byte[] row5 = Bytes.toBytes("row5"); // 2 kv 5264 byte[] cf1 = Bytes.toBytes("CF1"); 5265 byte[] cf2 = Bytes.toBytes("CF2"); 5266 byte[] cf3 = Bytes.toBytes("CF3"); 5267 byte[][] families = { cf1, cf2, cf3 }; 5268 byte[] col = Bytes.toBytes("C"); 5269 long ts = 1; 5270 HBaseConfiguration conf = new HBaseConfiguration(); 5271 // disable compactions in this test. 
5272 conf.setInt("hbase.hstore.compactionThreshold", 10000); 5273 this.region = initHRegion(tableName, method, conf, families); 5274 // kv naming style: kv(row number) totalKvCountInThisRow seq no 5275 KeyValue kv0_1_1 = new KeyValue(row0, cf1, col, ts, KeyValue.Type.Put, 5276 null); 5277 KeyValue kv1_2_1 = new KeyValue(row1, cf2, col, ts, KeyValue.Type.Put, 5278 null); 5279 KeyValue kv1_2_2 = new KeyValue(row1, cf1, col, ts + 1, 5280 KeyValue.Type.Put, null); 5281 KeyValue kv2_4_1 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put, 5282 null); 5283 KeyValue kv2_4_2 = new KeyValue(row2, cf1, col, ts, KeyValue.Type.Put, 5284 null); 5285 KeyValue kv2_4_3 = new KeyValue(row2, cf3, col, ts, KeyValue.Type.Put, 5286 null); 5287 KeyValue kv2_4_4 = new KeyValue(row2, cf1, col, ts + 4, 5288 KeyValue.Type.Put, null); 5289 KeyValue kv3_2_1 = new KeyValue(row3, cf2, col, ts, KeyValue.Type.Put, 5290 null); 5291 KeyValue kv3_2_2 = new KeyValue(row3, cf1, col, ts + 4, 5292 KeyValue.Type.Put, null); 5293 KeyValue kv4_5_1 = new KeyValue(row4, cf1, col, ts, KeyValue.Type.Put, 5294 null); 5295 KeyValue kv4_5_2 = new KeyValue(row4, cf3, col, ts, KeyValue.Type.Put, 5296 null); 5297 KeyValue kv4_5_3 = new KeyValue(row4, cf3, col, ts + 5, 5298 KeyValue.Type.Put, null); 5299 KeyValue kv4_5_4 = new KeyValue(row4, cf2, col, ts, KeyValue.Type.Put, 5300 null); 5301 KeyValue kv4_5_5 = new KeyValue(row4, cf1, col, ts + 3, 5302 KeyValue.Type.Put, null); 5303 KeyValue kv5_2_1 = new KeyValue(row5, cf2, col, ts, KeyValue.Type.Put, 5304 null); 5305 KeyValue kv5_2_2 = new KeyValue(row5, cf3, col, ts, KeyValue.Type.Put, 5306 null); 5307 // hfiles(cf1/cf2) :"row1"(1 kv) / "row2"(1 kv) / "row4"(2 kv) 5308 Put put = null; 5309 put = new Put(row1); 5310 put.add(kv1_2_1); 5311 region.put(put); 5312 put = new Put(row2); 5313 put.add(kv2_4_1); 5314 region.put(put); 5315 put = new Put(row4); 5316 put.add(kv4_5_4); 5317 put.add(kv4_5_5); 5318 region.put(put); 5319 region.flush(true); 5320 // hfiles(cf1/cf3) 
: "row1" (1 kvs) / "row2" (1 kv) / "row4" (2 kv) 5321 put = new Put(row4); 5322 put.add(kv4_5_1); 5323 put.add(kv4_5_3); 5324 region.put(put); 5325 put = new Put(row1); 5326 put.add(kv1_2_2); 5327 region.put(put); 5328 put = new Put(row2); 5329 put.add(kv2_4_4); 5330 region.put(put); 5331 region.flush(true); 5332 // hfiles(cf1/cf3) : "row2"(2 kv) / "row3"(1 kvs) / "row4" (1 kv) 5333 put = new Put(row4); 5334 put.add(kv4_5_2); 5335 region.put(put); 5336 put = new Put(row2); 5337 put.add(kv2_4_2); 5338 put.add(kv2_4_3); 5339 region.put(put); 5340 put = new Put(row3); 5341 put.add(kv3_2_2); 5342 region.put(put); 5343 region.flush(true); 5344 // memstore(cf1/cf2/cf3) : "row0" (1 kvs) / "row3" ( 1 kv) / "row5" (max) 5345 // ( 2 kv) 5346 put = new Put(row0); 5347 put.add(kv0_1_1); 5348 region.put(put); 5349 put = new Put(row3); 5350 put.add(kv3_2_1); 5351 region.put(put); 5352 put = new Put(row5); 5353 put.add(kv5_2_1); 5354 put.add(kv5_2_2); 5355 region.put(put); 5356 // scan range = ["row4", min), skip the max "row5" 5357 Scan scan = new Scan(row4); 5358 scan.setMaxVersions(5); 5359 scan.setBatch(3); 5360 scan.setReversed(true); 5361 InternalScanner scanner = region.getScanner(scan); 5362 List<Cell> currRow = new ArrayList<>(); 5363 boolean hasNext = false; 5364 // 1. scan out "row4" (5 kvs), "row5" can't be scanned out since not 5365 // included in scan range 5366 // "row4" takes 2 next() calls since batch=3 5367 hasNext = scanner.next(currRow); 5368 assertEquals(3, currRow.size()); 5369 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5370 .get(0).getRowLength(), row4, 0, row4.length)); 5371 assertTrue(hasNext); 5372 currRow.clear(); 5373 hasNext = scanner.next(currRow); 5374 assertEquals(2, currRow.size()); 5375 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), 5376 currRow.get(0).getRowLength(), row4, 0, 5377 row4.length)); 5378 assertTrue(hasNext); 5379 // 2. 
scan out "row3" (2 kv) 5380 currRow.clear(); 5381 hasNext = scanner.next(currRow); 5382 assertEquals(2, currRow.size()); 5383 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5384 .get(0).getRowLength(), row3, 0, row3.length)); 5385 assertTrue(hasNext); 5386 // 3. scan out "row2" (4 kvs) 5387 // "row2" takes 2 next() calls since batch=3 5388 currRow.clear(); 5389 hasNext = scanner.next(currRow); 5390 assertEquals(3, currRow.size()); 5391 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5392 .get(0).getRowLength(), row2, 0, row2.length)); 5393 assertTrue(hasNext); 5394 currRow.clear(); 5395 hasNext = scanner.next(currRow); 5396 assertEquals(1, currRow.size()); 5397 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5398 .get(0).getRowLength(), row2, 0, row2.length)); 5399 assertTrue(hasNext); 5400 // 4. scan out "row1" (2 kv) 5401 currRow.clear(); 5402 hasNext = scanner.next(currRow); 5403 assertEquals(2, currRow.size()); 5404 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5405 .get(0).getRowLength(), row1, 0, row1.length)); 5406 assertTrue(hasNext); 5407 // 5. 
scan out "row0" (1 kv) 5408 currRow.clear(); 5409 hasNext = scanner.next(currRow); 5410 assertEquals(1, currRow.size()); 5411 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5412 .get(0).getRowLength(), row0, 0, row0.length)); 5413 assertFalse(hasNext); 5414 5415 scanner.close(); 5416 } 5417 5418 @Test 5419 public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs2() 5420 throws IOException { 5421 byte[] row1 = Bytes.toBytes("row1"); 5422 byte[] row2 = Bytes.toBytes("row2"); 5423 byte[] row3 = Bytes.toBytes("row3"); 5424 byte[] row4 = Bytes.toBytes("row4"); 5425 byte[] cf1 = Bytes.toBytes("CF1"); 5426 byte[] cf2 = Bytes.toBytes("CF2"); 5427 byte[] cf3 = Bytes.toBytes("CF3"); 5428 byte[] cf4 = Bytes.toBytes("CF4"); 5429 byte[][] families = { cf1, cf2, cf3, cf4 }; 5430 byte[] col = Bytes.toBytes("C"); 5431 long ts = 1; 5432 HBaseConfiguration conf = new HBaseConfiguration(); 5433 // disable compactions in this test. 5434 conf.setInt("hbase.hstore.compactionThreshold", 10000); 5435 this.region = initHRegion(tableName, method, conf, families); 5436 KeyValue kv1 = new KeyValue(row1, cf1, col, ts, KeyValue.Type.Put, null); 5437 KeyValue kv2 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put, null); 5438 KeyValue kv3 = new KeyValue(row3, cf3, col, ts, KeyValue.Type.Put, null); 5439 KeyValue kv4 = new KeyValue(row4, cf4, col, ts, KeyValue.Type.Put, null); 5440 // storefile1 5441 Put put = new Put(row1); 5442 put.add(kv1); 5443 region.put(put); 5444 region.flush(true); 5445 // storefile2 5446 put = new Put(row2); 5447 put.add(kv2); 5448 region.put(put); 5449 region.flush(true); 5450 // storefile3 5451 put = new Put(row3); 5452 put.add(kv3); 5453 region.put(put); 5454 region.flush(true); 5455 // memstore 5456 put = new Put(row4); 5457 put.add(kv4); 5458 region.put(put); 5459 // scan range = ["row4", min) 5460 Scan scan = new Scan(row4); 5461 scan.setReversed(true); 5462 scan.setBatch(10); 5463 InternalScanner scanner = 
region.getScanner(scan); 5464 List<Cell> currRow = new ArrayList<>(); 5465 boolean hasNext = scanner.next(currRow); 5466 assertEquals(1, currRow.size()); 5467 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5468 .get(0).getRowLength(), row4, 0, row4.length)); 5469 assertTrue(hasNext); 5470 currRow.clear(); 5471 hasNext = scanner.next(currRow); 5472 assertEquals(1, currRow.size()); 5473 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5474 .get(0).getRowLength(), row3, 0, row3.length)); 5475 assertTrue(hasNext); 5476 currRow.clear(); 5477 hasNext = scanner.next(currRow); 5478 assertEquals(1, currRow.size()); 5479 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5480 .get(0).getRowLength(), row2, 0, row2.length)); 5481 assertTrue(hasNext); 5482 currRow.clear(); 5483 hasNext = scanner.next(currRow); 5484 assertEquals(1, currRow.size()); 5485 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5486 .get(0).getRowLength(), row1, 0, row1.length)); 5487 assertFalse(hasNext); 5488 } 5489 5490 /** 5491 * Test for HBASE-14497: Reverse Scan threw StackOverflow caused by readPt checking 5492 */ 5493 @Test 5494 public void testReverseScanner_StackOverflow() throws IOException { 5495 byte[] cf1 = Bytes.toBytes("CF1"); 5496 byte[][] families = {cf1}; 5497 byte[] col = Bytes.toBytes("C"); 5498 HBaseConfiguration conf = new HBaseConfiguration(); 5499 this.region = initHRegion(tableName, method, conf, families); 5500 // setup with one storefile and one memstore, to create scanner and get an earlier readPt 5501 Put put = new Put(Bytes.toBytes("19998")); 5502 put.addColumn(cf1, col, Bytes.toBytes("val")); 5503 region.put(put); 5504 region.flushcache(true, true, FlushLifeCycleTracker.DUMMY); 5505 Put put2 = new Put(Bytes.toBytes("19997")); 5506 put2.addColumn(cf1, col, Bytes.toBytes("val")); 5507 region.put(put2); 
5508 5509 Scan scan = new Scan(Bytes.toBytes("19998")); 5510 scan.setReversed(true); 5511 InternalScanner scanner = region.getScanner(scan); 5512 5513 // create one storefile contains many rows will be skipped 5514 // to check StoreFileScanner.seekToPreviousRow 5515 for (int i = 10000; i < 20000; i++) { 5516 Put p = new Put(Bytes.toBytes(""+i)); 5517 p.addColumn(cf1, col, Bytes.toBytes("" + i)); 5518 region.put(p); 5519 } 5520 region.flushcache(true, true, FlushLifeCycleTracker.DUMMY); 5521 5522 // create one memstore contains many rows will be skipped 5523 // to check MemStoreScanner.seekToPreviousRow 5524 for (int i = 10000; i < 20000; i++) { 5525 Put p = new Put(Bytes.toBytes(""+i)); 5526 p.addColumn(cf1, col, Bytes.toBytes("" + i)); 5527 region.put(p); 5528 } 5529 5530 List<Cell> currRow = new ArrayList<>(); 5531 boolean hasNext; 5532 do { 5533 hasNext = scanner.next(currRow); 5534 } while (hasNext); 5535 assertEquals(2, currRow.size()); 5536 assertEquals("19998", Bytes.toString(currRow.get(0).getRowArray(), 5537 currRow.get(0).getRowOffset(), currRow.get(0).getRowLength())); 5538 assertEquals("19997", Bytes.toString(currRow.get(1).getRowArray(), 5539 currRow.get(1).getRowOffset(), currRow.get(1).getRowLength())); 5540 } 5541 5542 @Test 5543 public void testReverseScanShouldNotScanMemstoreIfReadPtLesser() throws Exception { 5544 byte[] cf1 = Bytes.toBytes("CF1"); 5545 byte[][] families = { cf1 }; 5546 byte[] col = Bytes.toBytes("C"); 5547 HBaseConfiguration conf = new HBaseConfiguration(); 5548 this.region = initHRegion(tableName, method, conf, families); 5549 // setup with one storefile and one memstore, to create scanner and get an earlier readPt 5550 Put put = new Put(Bytes.toBytes("19996")); 5551 put.addColumn(cf1, col, Bytes.toBytes("val")); 5552 region.put(put); 5553 Put put2 = new Put(Bytes.toBytes("19995")); 5554 put2.addColumn(cf1, col, Bytes.toBytes("val")); 5555 region.put(put2); 5556 // create a reverse scan 5557 Scan scan = new 
Scan(Bytes.toBytes("19996")); 5558 scan.setReversed(true); 5559 RegionScannerImpl scanner = region.getScanner(scan); 5560 5561 // flush the cache. This will reset the store scanner 5562 region.flushcache(true, true, FlushLifeCycleTracker.DUMMY); 5563 5564 // create one memstore contains many rows will be skipped 5565 // to check MemStoreScanner.seekToPreviousRow 5566 for (int i = 10000; i < 20000; i++) { 5567 Put p = new Put(Bytes.toBytes("" + i)); 5568 p.addColumn(cf1, col, Bytes.toBytes("" + i)); 5569 region.put(p); 5570 } 5571 List<Cell> currRow = new ArrayList<>(); 5572 boolean hasNext; 5573 boolean assertDone = false; 5574 do { 5575 hasNext = scanner.next(currRow); 5576 // With HBASE-15871, after the scanner is reset the memstore scanner should not be 5577 // added here 5578 if (!assertDone) { 5579 StoreScanner current = 5580 (StoreScanner) (scanner.storeHeap).getCurrentForTesting(); 5581 List<KeyValueScanner> scanners = current.getAllScannersForTesting(); 5582 assertEquals("There should be only one scanner the store file scanner", 1, 5583 scanners.size()); 5584 assertDone = true; 5585 } 5586 } while (hasNext); 5587 assertEquals(2, currRow.size()); 5588 assertEquals("19996", Bytes.toString(currRow.get(0).getRowArray(), 5589 currRow.get(0).getRowOffset(), currRow.get(0).getRowLength())); 5590 assertEquals("19995", Bytes.toString(currRow.get(1).getRowArray(), 5591 currRow.get(1).getRowOffset(), currRow.get(1).getRowLength())); 5592 } 5593 5594 @Test 5595 public void testReverseScanWhenPutCellsAfterOpenReverseScan() throws Exception { 5596 byte[] cf1 = Bytes.toBytes("CF1"); 5597 byte[][] families = { cf1 }; 5598 byte[] col = Bytes.toBytes("C"); 5599 5600 HBaseConfiguration conf = new HBaseConfiguration(); 5601 this.region = initHRegion(tableName, method, conf, families); 5602 5603 Put put = new Put(Bytes.toBytes("199996")); 5604 put.addColumn(cf1, col, Bytes.toBytes("val")); 5605 region.put(put); 5606 Put put2 = new Put(Bytes.toBytes("199995")); 5607 
put2.addColumn(cf1, col, Bytes.toBytes("val")); 5608 region.put(put2); 5609 5610 // Create a reverse scan 5611 Scan scan = new Scan(Bytes.toBytes("199996")); 5612 scan.setReversed(true); 5613 RegionScannerImpl scanner = region.getScanner(scan); 5614 5615 // Put a lot of cells that have sequenceIDs grater than the readPt of the reverse scan 5616 for (int i = 100000; i < 200000; i++) { 5617 Put p = new Put(Bytes.toBytes("" + i)); 5618 p.addColumn(cf1, col, Bytes.toBytes("" + i)); 5619 region.put(p); 5620 } 5621 List<Cell> currRow = new ArrayList<>(); 5622 boolean hasNext; 5623 do { 5624 hasNext = scanner.next(currRow); 5625 } while (hasNext); 5626 5627 assertEquals(2, currRow.size()); 5628 assertEquals("199996", Bytes.toString(currRow.get(0).getRowArray(), 5629 currRow.get(0).getRowOffset(), currRow.get(0).getRowLength())); 5630 assertEquals("199995", Bytes.toString(currRow.get(1).getRowArray(), 5631 currRow.get(1).getRowOffset(), currRow.get(1).getRowLength())); 5632 } 5633 5634 @Test 5635 public void testWriteRequestsCounter() throws IOException { 5636 byte[] fam = Bytes.toBytes("info"); 5637 byte[][] families = { fam }; 5638 this.region = initHRegion(tableName, method, CONF, families); 5639 5640 Assert.assertEquals(0L, region.getWriteRequestsCount()); 5641 5642 Put put = new Put(row); 5643 put.addColumn(fam, fam, fam); 5644 5645 Assert.assertEquals(0L, region.getWriteRequestsCount()); 5646 region.put(put); 5647 Assert.assertEquals(1L, region.getWriteRequestsCount()); 5648 region.put(put); 5649 Assert.assertEquals(2L, region.getWriteRequestsCount()); 5650 region.put(put); 5651 Assert.assertEquals(3L, region.getWriteRequestsCount()); 5652 5653 region.delete(new Delete(row)); 5654 Assert.assertEquals(4L, region.getWriteRequestsCount()); 5655 } 5656 5657 @Test 5658 public void testOpenRegionWrittenToWAL() throws Exception { 5659 final ServerName serverName = ServerName.valueOf(name.getMethodName(), 100, 42); 5660 final RegionServerServices rss = 
spy(TEST_UTIL.createMockRegionServerService(serverName)); 5661 5662 TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName())) 5663 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam1)) 5664 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam2)).build(); 5665 RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); 5666 5667 // open the region w/o rss and wal and flush some files 5668 region = 5669 HBaseTestingUtility.createRegionAndWAL(hri, TEST_UTIL.getDataTestDir(), TEST_UTIL 5670 .getConfiguration(), htd); 5671 assertNotNull(region); 5672 5673 // create a file in fam1 for the region before opening in OpenRegionHandler 5674 region.put(new Put(Bytes.toBytes("a")).addColumn(fam1, fam1, fam1)); 5675 region.flush(true); 5676 HBaseTestingUtility.closeRegionAndWAL(region); 5677 5678 ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class); 5679 5680 // capture append() calls 5681 WAL wal = mockWAL(); 5682 when(rss.getWAL(any(RegionInfo.class))).thenReturn(wal); 5683 5684 region = HRegion.openHRegion(hri, htd, rss.getWAL(hri), 5685 TEST_UTIL.getConfiguration(), rss, null); 5686 5687 verify(wal, times(1)).appendMarker(any(RegionInfo.class), any(WALKeyImpl.class), 5688 editCaptor.capture()); 5689 5690 WALEdit edit = editCaptor.getValue(); 5691 assertNotNull(edit); 5692 assertNotNull(edit.getCells()); 5693 assertEquals(1, edit.getCells().size()); 5694 RegionEventDescriptor desc = WALEdit.getRegionEventDescriptor(edit.getCells().get(0)); 5695 assertNotNull(desc); 5696 5697 LOG.info("RegionEventDescriptor from WAL: " + desc); 5698 5699 assertEquals(RegionEventDescriptor.EventType.REGION_OPEN, desc.getEventType()); 5700 assertTrue(Bytes.equals(desc.getTableName().toByteArray(), htd.getTableName().toBytes())); 5701 assertTrue(Bytes.equals(desc.getEncodedRegionName().toByteArray(), 5702 hri.getEncodedNameAsBytes())); 5703 assertTrue(desc.getLogSequenceNumber() > 0); 5704 assertEquals(serverName, 
ProtobufUtil.toServerName(desc.getServer())); 5705 assertEquals(2, desc.getStoresCount()); 5706 5707 StoreDescriptor store = desc.getStores(0); 5708 assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam1)); 5709 assertEquals(store.getStoreHomeDir(), Bytes.toString(fam1)); 5710 assertEquals(1, store.getStoreFileCount()); // 1store file 5711 assertFalse(store.getStoreFile(0).contains("/")); // ensure path is relative 5712 5713 store = desc.getStores(1); 5714 assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam2)); 5715 assertEquals(store.getStoreHomeDir(), Bytes.toString(fam2)); 5716 assertEquals(0, store.getStoreFileCount()); // no store files 5717 } 5718 5719 // Helper for test testOpenRegionWrittenToWALForLogReplay 5720 static class HRegionWithSeqId extends HRegion { 5721 public HRegionWithSeqId(final Path tableDir, final WAL wal, final FileSystem fs, 5722 final Configuration confParam, final RegionInfo regionInfo, 5723 final TableDescriptor htd, final RegionServerServices rsServices) { 5724 super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices); 5725 } 5726 @Override 5727 protected long getNextSequenceId(WAL wal) throws IOException { 5728 return 42; 5729 } 5730 } 5731 5732 @Test 5733 public void testFlushedFileWithNoTags() throws Exception { 5734 final TableName tableName = TableName.valueOf(name.getMethodName()); 5735 HTableDescriptor htd = new HTableDescriptor(tableName); 5736 htd.addFamily(new HColumnDescriptor(fam1)); 5737 HRegionInfo info = new HRegionInfo(tableName, null, null, false); 5738 Path path = TEST_UTIL.getDataTestDir(getClass().getSimpleName()); 5739 region = HBaseTestingUtility.createRegionAndWAL(info, path, TEST_UTIL.getConfiguration(), htd); 5740 Put put = new Put(Bytes.toBytes("a-b-0-0")); 5741 put.addColumn(fam1, qual1, Bytes.toBytes("c1-value")); 5742 region.put(put); 5743 region.flush(true); 5744 HStore store = region.getStore(fam1); 5745 Collection<HStoreFile> storefiles = store.getStorefiles(); 5746 for 
(HStoreFile sf : storefiles) { 5747 assertFalse("Tags should not be present " 5748 ,sf.getReader().getHFileReader().getFileContext().isIncludesTags()); 5749 } 5750 } 5751 5752 /** 5753 * Utility method to setup a WAL mock. 5754 * <p/> 5755 * Needs to do the bit where we close latch on the WALKeyImpl on append else test hangs. 5756 * @return a mock WAL 5757 */ 5758 private WAL mockWAL() throws IOException { 5759 WAL wal = mock(WAL.class); 5760 when(wal.appendData(any(RegionInfo.class), any(WALKeyImpl.class), any(WALEdit.class))) 5761 .thenAnswer(new Answer<Long>() { 5762 @Override 5763 public Long answer(InvocationOnMock invocation) throws Throwable { 5764 WALKeyImpl key = invocation.getArgument(1); 5765 MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(); 5766 key.setWriteEntry(we); 5767 return 1L; 5768 } 5769 }); 5770 when(wal.appendMarker(any(RegionInfo.class), any(WALKeyImpl.class), any(WALEdit.class))). 5771 thenAnswer(new Answer<Long>() { 5772 @Override 5773 public Long answer(InvocationOnMock invocation) throws Throwable { 5774 WALKeyImpl key = invocation.getArgument(1); 5775 MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(); 5776 key.setWriteEntry(we); 5777 return 1L; 5778 } 5779 }); 5780 return wal; 5781 } 5782 5783 @Test 5784 public void testCloseRegionWrittenToWAL() throws Exception { 5785 Path rootDir = new Path(dir + name.getMethodName()); 5786 FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir); 5787 5788 final ServerName serverName = ServerName.valueOf("testCloseRegionWrittenToWAL", 100, 42); 5789 final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName)); 5790 5791 TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName())) 5792 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam1)) 5793 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam2)).build(); 5794 RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); 5795 5796 
ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class); 5797 5798 // capture append() calls 5799 WAL wal = mockWAL(); 5800 when(rss.getWAL(any(RegionInfo.class))).thenReturn(wal); 5801 5802 5803 // create and then open a region first so that it can be closed later 5804 region = HRegion.createHRegion(hri, rootDir, TEST_UTIL.getConfiguration(), htd, rss.getWAL(hri)); 5805 region = HRegion.openHRegion(hri, htd, rss.getWAL(hri), 5806 TEST_UTIL.getConfiguration(), rss, null); 5807 5808 // close the region 5809 region.close(false); 5810 5811 // 2 times, one for region open, the other close region 5812 verify(wal, times(2)).appendMarker(any(RegionInfo.class), 5813 (WALKeyImpl) any(WALKeyImpl.class), editCaptor.capture()); 5814 5815 WALEdit edit = editCaptor.getAllValues().get(1); 5816 assertNotNull(edit); 5817 assertNotNull(edit.getCells()); 5818 assertEquals(1, edit.getCells().size()); 5819 RegionEventDescriptor desc = WALEdit.getRegionEventDescriptor(edit.getCells().get(0)); 5820 assertNotNull(desc); 5821 5822 LOG.info("RegionEventDescriptor from WAL: " + desc); 5823 5824 assertEquals(RegionEventDescriptor.EventType.REGION_CLOSE, desc.getEventType()); 5825 assertTrue(Bytes.equals(desc.getTableName().toByteArray(), htd.getTableName().toBytes())); 5826 assertTrue(Bytes.equals(desc.getEncodedRegionName().toByteArray(), 5827 hri.getEncodedNameAsBytes())); 5828 assertTrue(desc.getLogSequenceNumber() > 0); 5829 assertEquals(serverName, ProtobufUtil.toServerName(desc.getServer())); 5830 assertEquals(2, desc.getStoresCount()); 5831 5832 StoreDescriptor store = desc.getStores(0); 5833 assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam1)); 5834 assertEquals(store.getStoreHomeDir(), Bytes.toString(fam1)); 5835 assertEquals(0, store.getStoreFileCount()); // no store files 5836 5837 store = desc.getStores(1); 5838 assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam2)); 5839 assertEquals(store.getStoreHomeDir(), Bytes.toString(fam2)); 
5840 assertEquals(0, store.getStoreFileCount()); // no store files 5841 } 5842 5843 /** 5844 * Test RegionTooBusyException thrown when region is busy 5845 */ 5846 @Test 5847 public void testRegionTooBusy() throws IOException { 5848 byte[] family = Bytes.toBytes("family"); 5849 long defaultBusyWaitDuration = CONF.getLong("hbase.busy.wait.duration", 5850 HRegion.DEFAULT_BUSY_WAIT_DURATION); 5851 CONF.setLong("hbase.busy.wait.duration", 1000); 5852 region = initHRegion(tableName, method, CONF, family); 5853 final AtomicBoolean stopped = new AtomicBoolean(true); 5854 Thread t = new Thread(new Runnable() { 5855 @Override 5856 public void run() { 5857 try { 5858 region.lock.writeLock().lock(); 5859 stopped.set(false); 5860 while (!stopped.get()) { 5861 Thread.sleep(100); 5862 } 5863 } catch (InterruptedException ie) { 5864 } finally { 5865 region.lock.writeLock().unlock(); 5866 } 5867 } 5868 }); 5869 t.start(); 5870 Get get = new Get(row); 5871 try { 5872 while (stopped.get()) { 5873 Thread.sleep(100); 5874 } 5875 region.get(get); 5876 fail("Should throw RegionTooBusyException"); 5877 } catch (InterruptedException ie) { 5878 fail("test interrupted"); 5879 } catch (RegionTooBusyException e) { 5880 // Good, expected 5881 } finally { 5882 stopped.set(true); 5883 try { 5884 t.join(); 5885 } catch (Throwable e) { 5886 } 5887 5888 HBaseTestingUtility.closeRegionAndWAL(region); 5889 region = null; 5890 CONF.setLong("hbase.busy.wait.duration", defaultBusyWaitDuration); 5891 } 5892 } 5893 5894 @Test 5895 public void testCellTTLs() throws IOException { 5896 IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge(); 5897 EnvironmentEdgeManager.injectEdge(edge); 5898 5899 final byte[] row = Bytes.toBytes("testRow"); 5900 final byte[] q1 = Bytes.toBytes("q1"); 5901 final byte[] q2 = Bytes.toBytes("q2"); 5902 final byte[] q3 = Bytes.toBytes("q3"); 5903 final byte[] q4 = Bytes.toBytes("q4"); 5904 5905 HTableDescriptor htd = new 
HTableDescriptor(TableName.valueOf(name.getMethodName())); 5906 HColumnDescriptor hcd = new HColumnDescriptor(fam1); 5907 hcd.setTimeToLive(10); // 10 seconds 5908 htd.addFamily(hcd); 5909 5910 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 5911 conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS); 5912 5913 region = HBaseTestingUtility.createRegionAndWAL(new HRegionInfo(htd.getTableName(), 5914 HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY), 5915 TEST_UTIL.getDataTestDir(), conf, htd); 5916 assertNotNull(region); 5917 long now = EnvironmentEdgeManager.currentTime(); 5918 // Add a cell that will expire in 5 seconds via cell TTL 5919 region.put(new Put(row).add(new KeyValue(row, fam1, q1, now, 5920 HConstants.EMPTY_BYTE_ARRAY, new ArrayBackedTag[] { 5921 // TTL tags specify ts in milliseconds 5922 new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) }))); 5923 // Add a cell that will expire after 10 seconds via family setting 5924 region.put(new Put(row).addColumn(fam1, q2, now, HConstants.EMPTY_BYTE_ARRAY)); 5925 // Add a cell that will expire in 15 seconds via cell TTL 5926 region.put(new Put(row).add(new KeyValue(row, fam1, q3, now + 10000 - 1, 5927 HConstants.EMPTY_BYTE_ARRAY, new ArrayBackedTag[] { 5928 // TTL tags specify ts in milliseconds 5929 new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) }))); 5930 // Add a cell that will expire in 20 seconds via family setting 5931 region.put(new Put(row).addColumn(fam1, q4, now + 10000 - 1, HConstants.EMPTY_BYTE_ARRAY)); 5932 5933 // Flush so we are sure store scanning gets this right 5934 region.flush(true); 5935 5936 // A query at time T+0 should return all cells 5937 Result r = region.get(new Get(row)); 5938 assertNotNull(r.getValue(fam1, q1)); 5939 assertNotNull(r.getValue(fam1, q2)); 5940 assertNotNull(r.getValue(fam1, q3)); 5941 assertNotNull(r.getValue(fam1, q4)); 5942 5943 // Increment time to T+5 seconds 5944 
edge.incrementTime(5000); 5945 5946 r = region.get(new Get(row)); 5947 assertNull(r.getValue(fam1, q1)); 5948 assertNotNull(r.getValue(fam1, q2)); 5949 assertNotNull(r.getValue(fam1, q3)); 5950 assertNotNull(r.getValue(fam1, q4)); 5951 5952 // Increment time to T+10 seconds 5953 edge.incrementTime(5000); 5954 5955 r = region.get(new Get(row)); 5956 assertNull(r.getValue(fam1, q1)); 5957 assertNull(r.getValue(fam1, q2)); 5958 assertNotNull(r.getValue(fam1, q3)); 5959 assertNotNull(r.getValue(fam1, q4)); 5960 5961 // Increment time to T+15 seconds 5962 edge.incrementTime(5000); 5963 5964 r = region.get(new Get(row)); 5965 assertNull(r.getValue(fam1, q1)); 5966 assertNull(r.getValue(fam1, q2)); 5967 assertNull(r.getValue(fam1, q3)); 5968 assertNotNull(r.getValue(fam1, q4)); 5969 5970 // Increment time to T+20 seconds 5971 edge.incrementTime(10000); 5972 5973 r = region.get(new Get(row)); 5974 assertNull(r.getValue(fam1, q1)); 5975 assertNull(r.getValue(fam1, q2)); 5976 assertNull(r.getValue(fam1, q3)); 5977 assertNull(r.getValue(fam1, q4)); 5978 5979 // Fun with disappearing increments 5980 5981 // Start at 1 5982 region.put(new Put(row).addColumn(fam1, q1, Bytes.toBytes(1L))); 5983 r = region.get(new Get(row)); 5984 byte[] val = r.getValue(fam1, q1); 5985 assertNotNull(val); 5986 assertEquals(1L, Bytes.toLong(val)); 5987 5988 // Increment with a TTL of 5 seconds 5989 Increment incr = new Increment(row).addColumn(fam1, q1, 1L); 5990 incr.setTTL(5000); 5991 region.increment(incr); // 2 5992 5993 // New value should be 2 5994 r = region.get(new Get(row)); 5995 val = r.getValue(fam1, q1); 5996 assertNotNull(val); 5997 assertEquals(2L, Bytes.toLong(val)); 5998 5999 // Increment time to T+25 seconds 6000 edge.incrementTime(5000); 6001 6002 // Value should be back to 1 6003 r = region.get(new Get(row)); 6004 val = r.getValue(fam1, q1); 6005 assertNotNull(val); 6006 assertEquals(1L, Bytes.toLong(val)); 6007 6008 // Increment time to T+30 seconds 6009 
edge.incrementTime(5000); 6010 6011 // Original value written at T+20 should be gone now via family TTL 6012 r = region.get(new Get(row)); 6013 assertNull(r.getValue(fam1, q1)); 6014 } 6015 6016 @Test 6017 public void testIncrementTimestampsAreMonotonic() throws IOException { 6018 region = initHRegion(tableName, method, CONF, fam1); 6019 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); 6020 EnvironmentEdgeManager.injectEdge(edge); 6021 6022 edge.setValue(10); 6023 Increment inc = new Increment(row); 6024 inc.setDurability(Durability.SKIP_WAL); 6025 inc.addColumn(fam1, qual1, 1L); 6026 region.increment(inc); 6027 6028 Result result = region.get(new Get(row)); 6029 Cell c = result.getColumnLatestCell(fam1, qual1); 6030 assertNotNull(c); 6031 assertEquals(10L, c.getTimestamp()); 6032 6033 edge.setValue(1); // clock goes back 6034 region.increment(inc); 6035 result = region.get(new Get(row)); 6036 c = result.getColumnLatestCell(fam1, qual1); 6037 assertEquals(11L, c.getTimestamp()); 6038 assertEquals(2L, Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength())); 6039 } 6040 6041 @Test 6042 public void testAppendTimestampsAreMonotonic() throws IOException { 6043 region = initHRegion(tableName, method, CONF, fam1); 6044 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); 6045 EnvironmentEdgeManager.injectEdge(edge); 6046 6047 edge.setValue(10); 6048 Append a = new Append(row); 6049 a.setDurability(Durability.SKIP_WAL); 6050 a.addColumn(fam1, qual1, qual1); 6051 region.append(a); 6052 6053 Result result = region.get(new Get(row)); 6054 Cell c = result.getColumnLatestCell(fam1, qual1); 6055 assertNotNull(c); 6056 assertEquals(10L, c.getTimestamp()); 6057 6058 edge.setValue(1); // clock goes back 6059 region.append(a); 6060 result = region.get(new Get(row)); 6061 c = result.getColumnLatestCell(fam1, qual1); 6062 assertEquals(11L, c.getTimestamp()); 6063 6064 byte[] expected = new byte[qual1.length*2]; 6065 System.arraycopy(qual1, 0, expected, 
0, qual1.length); 6066 System.arraycopy(qual1, 0, expected, qual1.length, qual1.length); 6067 6068 assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(), 6069 expected, 0, expected.length)); 6070 } 6071 6072 @Test 6073 public void testCheckAndMutateTimestampsAreMonotonic() throws IOException { 6074 region = initHRegion(tableName, method, CONF, fam1); 6075 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); 6076 EnvironmentEdgeManager.injectEdge(edge); 6077 6078 edge.setValue(10); 6079 Put p = new Put(row); 6080 p.setDurability(Durability.SKIP_WAL); 6081 p.addColumn(fam1, qual1, qual1); 6082 region.put(p); 6083 6084 Result result = region.get(new Get(row)); 6085 Cell c = result.getColumnLatestCell(fam1, qual1); 6086 assertNotNull(c); 6087 assertEquals(10L, c.getTimestamp()); 6088 6089 edge.setValue(1); // clock goes back 6090 p = new Put(row); 6091 p.setDurability(Durability.SKIP_WAL); 6092 p.addColumn(fam1, qual1, qual2); 6093 region.checkAndMutate(row, fam1, qual1, CompareOperator.EQUAL, new BinaryComparator(qual1), p); 6094 result = region.get(new Get(row)); 6095 c = result.getColumnLatestCell(fam1, qual1); 6096 assertEquals(10L, c.getTimestamp()); 6097 6098 assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(), 6099 qual2, 0, qual2.length)); 6100 } 6101 6102 @Test 6103 public void testBatchMutateWithWrongRegionException() throws Exception { 6104 final byte[] a = Bytes.toBytes("a"); 6105 final byte[] b = Bytes.toBytes("b"); 6106 final byte[] c = Bytes.toBytes("c"); // exclusive 6107 6108 int prevLockTimeout = CONF.getInt("hbase.rowlock.wait.duration", 30000); 6109 CONF.setInt("hbase.rowlock.wait.duration", 1000); 6110 region = initHRegion(tableName, a, c, method, CONF, false, fam1); 6111 6112 Mutation[] mutations = new Mutation[] { 6113 new Put(a) 6114 .add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY) 6115 .setRow(a) 6116 .setFamily(fam1) 6117 .setTimestamp(HConstants.LATEST_TIMESTAMP) 6118 
.setType(Cell.Type.Put) 6119 .build()), 6120 // this is outside the region boundary 6121 new Put(c).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY) 6122 .setRow(c) 6123 .setFamily(fam1) 6124 .setTimestamp(HConstants.LATEST_TIMESTAMP) 6125 .setType(Type.Put) 6126 .build()), 6127 new Put(b).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY) 6128 .setRow(b) 6129 .setFamily(fam1) 6130 .setTimestamp(HConstants.LATEST_TIMESTAMP) 6131 .setType(Cell.Type.Put) 6132 .build()) 6133 }; 6134 6135 OperationStatus[] status = region.batchMutate(mutations); 6136 assertEquals(OperationStatusCode.SUCCESS, status[0].getOperationStatusCode()); 6137 assertEquals(OperationStatusCode.SANITY_CHECK_FAILURE, status[1].getOperationStatusCode()); 6138 assertEquals(OperationStatusCode.SUCCESS, status[2].getOperationStatusCode()); 6139 6140 6141 // test with a row lock held for a long time 6142 final CountDownLatch obtainedRowLock = new CountDownLatch(1); 6143 ExecutorService exec = Executors.newFixedThreadPool(2); 6144 Future<Void> f1 = exec.submit(new Callable<Void>() { 6145 @Override 6146 public Void call() throws Exception { 6147 LOG.info("Acquiring row lock"); 6148 RowLock rl = region.getRowLock(b); 6149 obtainedRowLock.countDown(); 6150 LOG.info("Waiting for 5 seconds before releasing lock"); 6151 Threads.sleep(5000); 6152 LOG.info("Releasing row lock"); 6153 rl.release(); 6154 return null; 6155 } 6156 }); 6157 obtainedRowLock.await(30, TimeUnit.SECONDS); 6158 6159 Future<Void> f2 = exec.submit(new Callable<Void>() { 6160 @Override 6161 public Void call() throws Exception { 6162 Mutation[] mutations = new Mutation[] { 6163 new Put(a).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY) 6164 .setRow(a) 6165 .setFamily(fam1) 6166 .setTimestamp(HConstants.LATEST_TIMESTAMP) 6167 .setType(Cell.Type.Put) 6168 .build()), 6169 new Put(b).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY) 6170 .setRow(b) 6171 .setFamily(fam1) 6172 
.setTimestamp(HConstants.LATEST_TIMESTAMP) 6173 .setType(Cell.Type.Put) 6174 .build()), 6175 }; 6176 6177 // this will wait for the row lock, and it will eventually succeed 6178 OperationStatus[] status = region.batchMutate(mutations); 6179 assertEquals(OperationStatusCode.SUCCESS, status[0].getOperationStatusCode()); 6180 assertEquals(OperationStatusCode.SUCCESS, status[1].getOperationStatusCode()); 6181 return null; 6182 } 6183 }); 6184 6185 f1.get(); 6186 f2.get(); 6187 6188 CONF.setInt("hbase.rowlock.wait.duration", prevLockTimeout); 6189 } 6190 6191 @Test 6192 public void testCheckAndRowMutateTimestampsAreMonotonic() throws IOException { 6193 region = initHRegion(tableName, method, CONF, fam1); 6194 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); 6195 EnvironmentEdgeManager.injectEdge(edge); 6196 6197 edge.setValue(10); 6198 Put p = new Put(row); 6199 p.setDurability(Durability.SKIP_WAL); 6200 p.addColumn(fam1, qual1, qual1); 6201 region.put(p); 6202 6203 Result result = region.get(new Get(row)); 6204 Cell c = result.getColumnLatestCell(fam1, qual1); 6205 assertNotNull(c); 6206 assertEquals(10L, c.getTimestamp()); 6207 6208 edge.setValue(1); // clock goes back 6209 p = new Put(row); 6210 p.setDurability(Durability.SKIP_WAL); 6211 p.addColumn(fam1, qual1, qual2); 6212 RowMutations rm = new RowMutations(row); 6213 rm.add(p); 6214 assertTrue(region.checkAndRowMutate(row, fam1, qual1, CompareOperator.EQUAL, 6215 new BinaryComparator(qual1), rm)); 6216 result = region.get(new Get(row)); 6217 c = result.getColumnLatestCell(fam1, qual1); 6218 assertEquals(10L, c.getTimestamp()); 6219 LOG.info("c value " + 6220 Bytes.toStringBinary(c.getValueArray(), c.getValueOffset(), c.getValueLength())); 6221 6222 assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(), 6223 qual2, 0, qual2.length)); 6224 } 6225 6226 HRegion initHRegion(TableName tableName, String callingMethod, 6227 byte[]... 
families) throws IOException { 6228 return initHRegion(tableName, callingMethod, HBaseConfiguration.create(), 6229 families); 6230 } 6231 6232 /** 6233 * HBASE-16429 Make sure no stuck if roll writer when ring buffer is filled with appends 6234 * @throws IOException if IO error occurred during test 6235 */ 6236 @Test 6237 public void testWritesWhileRollWriter() throws IOException { 6238 int testCount = 10; 6239 int numRows = 1024; 6240 int numFamilies = 2; 6241 int numQualifiers = 2; 6242 final byte[][] families = new byte[numFamilies][]; 6243 for (int i = 0; i < numFamilies; i++) { 6244 families[i] = Bytes.toBytes("family" + i); 6245 } 6246 final byte[][] qualifiers = new byte[numQualifiers][]; 6247 for (int i = 0; i < numQualifiers; i++) { 6248 qualifiers[i] = Bytes.toBytes("qual" + i); 6249 } 6250 6251 CONF.setInt("hbase.regionserver.wal.disruptor.event.count", 2); 6252 this.region = initHRegion(tableName, method, CONF, families); 6253 try { 6254 List<Thread> threads = new ArrayList<>(); 6255 for (int i = 0; i < numRows; i++) { 6256 final int count = i; 6257 Thread t = new Thread(new Runnable() { 6258 6259 @Override 6260 public void run() { 6261 byte[] row = Bytes.toBytes("row" + count); 6262 Put put = new Put(row); 6263 put.setDurability(Durability.SYNC_WAL); 6264 byte[] value = Bytes.toBytes(String.valueOf(count)); 6265 for (byte[] family : families) { 6266 for (byte[] qualifier : qualifiers) { 6267 put.addColumn(family, qualifier, count, value); 6268 } 6269 } 6270 try { 6271 region.put(put); 6272 } catch (IOException e) { 6273 throw new RuntimeException(e); 6274 } 6275 } 6276 }); 6277 threads.add(t); 6278 } 6279 for (Thread t : threads) { 6280 t.start(); 6281 } 6282 6283 for (int i = 0; i < testCount; i++) { 6284 region.getWAL().rollWriter(); 6285 Thread.yield(); 6286 } 6287 } finally { 6288 try { 6289 HBaseTestingUtility.closeRegionAndWAL(this.region); 6290 CONF.setInt("hbase.regionserver.wal.disruptor.event.count", 16 * 1024); 6291 } catch 
(DroppedSnapshotException dse) { 6292 // We could get this on way out because we interrupt the background flusher and it could 6293 // fail anywhere causing a DSE over in the background flusher... only it is not properly 6294 // dealt with so could still be memory hanging out when we get to here -- memory we can't 6295 // flush because the accounting is 'off' since original DSE. 6296 } 6297 this.region = null; 6298 } 6299 } 6300 6301 @Test 6302 public void testMutateRow_WriteRequestCount() throws Exception { 6303 byte[] row1 = Bytes.toBytes("row1"); 6304 byte[] fam1 = Bytes.toBytes("fam1"); 6305 byte[] qf1 = Bytes.toBytes("qualifier"); 6306 byte[] val1 = Bytes.toBytes("value1"); 6307 6308 RowMutations rm = new RowMutations(row1); 6309 Put put = new Put(row1); 6310 put.addColumn(fam1, qf1, val1); 6311 rm.add(put); 6312 6313 this.region = initHRegion(tableName, method, CONF, fam1); 6314 long wrcBeforeMutate = this.region.writeRequestsCount.longValue(); 6315 this.region.mutateRow(rm); 6316 long wrcAfterMutate = this.region.writeRequestsCount.longValue(); 6317 Assert.assertEquals(wrcBeforeMutate + rm.getMutations().size(), wrcAfterMutate); 6318 } 6319 6320 @Test 6321 public void testBulkLoadReplicationEnabled() throws IOException { 6322 TEST_UTIL.getConfiguration().setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); 6323 final ServerName serverName = ServerName.valueOf(name.getMethodName(), 100, 42); 6324 final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName)); 6325 6326 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName())); 6327 htd.addFamily(new HColumnDescriptor(fam1)); 6328 HRegionInfo hri = new HRegionInfo(htd.getTableName(), 6329 HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY); 6330 region = HRegion.openHRegion(hri, htd, rss.getWAL(hri), TEST_UTIL.getConfiguration(), 6331 rss, null); 6332 6333 assertTrue(region.conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, false)); 
6334 String plugins = region.conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ""); 6335 String replicationCoprocessorClass = ReplicationObserver.class.getCanonicalName(); 6336 assertTrue(plugins.contains(replicationCoprocessorClass)); 6337 assertTrue(region.getCoprocessorHost(). 6338 getCoprocessors().contains(ReplicationObserver.class.getSimpleName())); 6339 } 6340 6341 /** 6342 * The same as HRegion class, the only difference is that instantiateHStore will 6343 * create a different HStore - HStoreForTesting. [HBASE-8518] 6344 */ 6345 public static class HRegionForTesting extends HRegion { 6346 6347 public HRegionForTesting(final Path tableDir, final WAL wal, final FileSystem fs, 6348 final Configuration confParam, final RegionInfo regionInfo, 6349 final TableDescriptor htd, final RegionServerServices rsServices) { 6350 this(new HRegionFileSystem(confParam, fs, tableDir, regionInfo), 6351 wal, confParam, htd, rsServices); 6352 } 6353 6354 public HRegionForTesting(HRegionFileSystem fs, WAL wal, 6355 Configuration confParam, TableDescriptor htd, 6356 RegionServerServices rsServices) { 6357 super(fs, wal, confParam, htd, rsServices); 6358 } 6359 6360 /** 6361 * Create HStore instance. 6362 * @return If Mob is enabled, return HMobStore, otherwise return HStoreForTesting. 6363 */ 6364 @Override 6365 protected HStore instantiateHStore(final ColumnFamilyDescriptor family, boolean warmup) 6366 throws IOException { 6367 if (family.isMobEnabled()) { 6368 if (HFile.getFormatVersion(this.conf) < HFile.MIN_FORMAT_VERSION_WITH_TAGS) { 6369 throw new IOException("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS + 6370 " is required for MOB feature. 
Consider setting " + HFile.FORMAT_VERSION_KEY + 6371 " accordingly."); 6372 } 6373 return new HMobStore(this, family, this.conf, warmup); 6374 } 6375 return new HStoreForTesting(this, family, this.conf, warmup); 6376 } 6377 } 6378 6379 /** 6380 * HStoreForTesting is merely the same as HStore, the difference is in the doCompaction method 6381 * of HStoreForTesting there is a checkpoint "hbase.hstore.compaction.complete" which 6382 * doesn't let hstore compaction complete. In the former edition, this config is set in 6383 * HStore class inside compact method, though this is just for testing, otherwise it 6384 * doesn't do any help. In HBASE-8518, we try to get rid of all "hbase.hstore.compaction.complete" 6385 * config (except for testing code). 6386 */ 6387 public static class HStoreForTesting extends HStore { 6388 6389 protected HStoreForTesting(final HRegion region, 6390 final ColumnFamilyDescriptor family, 6391 final Configuration confParam, boolean warmup) throws IOException { 6392 super(region, family, confParam, warmup); 6393 } 6394 6395 @Override 6396 protected List<HStoreFile> doCompaction(CompactionRequestImpl cr, 6397 Collection<HStoreFile> filesToCompact, User user, long compactionStartTime, 6398 List<Path> newFiles) throws IOException { 6399 // let compaction incomplete. 6400 if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) { 6401 LOG.warn("hbase.hstore.compaction.complete is set to false"); 6402 List<HStoreFile> sfs = new ArrayList<>(newFiles.size()); 6403 final boolean evictOnClose = 6404 cacheConf != null? cacheConf.shouldEvictOnClose(): true; 6405 for (Path newFile : newFiles) { 6406 // Create storefile around what we wrote with a reader on it. 6407 HStoreFile sf = createStoreFileAndReader(newFile); 6408 sf.closeStoreFile(evictOnClose); 6409 sfs.add(sf); 6410 } 6411 return sfs; 6412 } 6413 return super.doCompaction(cr, filesToCompact, user, compactionStartTime, newFiles); 6414 } 6415 } 6416}