001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.HBaseTestingUtility.COLUMNS; 021import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1; 022import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2; 023import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3; 024import static org.junit.Assert.assertArrayEquals; 025import static org.junit.Assert.assertEquals; 026import static org.junit.Assert.assertFalse; 027import static org.junit.Assert.assertNotNull; 028import static org.junit.Assert.assertNull; 029import static org.junit.Assert.assertTrue; 030import static org.junit.Assert.fail; 031import static org.mockito.ArgumentMatchers.any; 032import static org.mockito.ArgumentMatchers.anyLong; 033import static org.mockito.Mockito.doThrow; 034import static org.mockito.Mockito.mock; 035import static org.mockito.Mockito.never; 036import static org.mockito.Mockito.spy; 037import static org.mockito.Mockito.times; 038import static org.mockito.Mockito.verify; 039import static org.mockito.Mockito.when; 040 041import java.io.IOException; 042import java.io.InterruptedIOException; 043import java.math.BigDecimal; 044import java.nio.charset.StandardCharsets; 045import java.security.PrivilegedExceptionAction; 046import java.util.ArrayList; 047import java.util.Arrays; 048import java.util.Collection; 049import java.util.List; 050import java.util.Map; 051import java.util.NavigableMap; 052import java.util.Objects; 053import java.util.TreeMap; 054import java.util.concurrent.Callable; 055import java.util.concurrent.CountDownLatch; 056import java.util.concurrent.ExecutorService; 057import java.util.concurrent.Executors; 058import java.util.concurrent.Future; 059import java.util.concurrent.TimeUnit; 060import java.util.concurrent.atomic.AtomicBoolean; 061import java.util.concurrent.atomic.AtomicInteger; 062import java.util.concurrent.atomic.AtomicReference; 063import org.apache.commons.lang3.RandomStringUtils; 064import org.apache.hadoop.conf.Configuration; 065import org.apache.hadoop.fs.FSDataOutputStream; 066import org.apache.hadoop.fs.FileStatus; 067import org.apache.hadoop.fs.FileSystem; 068import org.apache.hadoop.fs.Path; 069import org.apache.hadoop.hbase.ArrayBackedTag; 070import org.apache.hadoop.hbase.Cell; 071import org.apache.hadoop.hbase.Cell.Type; 072import org.apache.hadoop.hbase.CellBuilderFactory; 073import org.apache.hadoop.hbase.CellBuilderType; 074import org.apache.hadoop.hbase.CellUtil; 075import org.apache.hadoop.hbase.CompareOperator; 076import org.apache.hadoop.hbase.CompatibilitySingletonFactory; 077import org.apache.hadoop.hbase.DroppedSnapshotException; 078import org.apache.hadoop.hbase.HBaseClassTestRule; 079import org.apache.hadoop.hbase.HBaseConfiguration; 080import org.apache.hadoop.hbase.HBaseTestingUtility; 081import org.apache.hadoop.hbase.HColumnDescriptor; 082import org.apache.hadoop.hbase.HConstants; 083import org.apache.hadoop.hbase.HConstants.OperationStatusCode; 084import org.apache.hadoop.hbase.HDFSBlocksDistribution; 085import org.apache.hadoop.hbase.HRegionInfo; 086import org.apache.hadoop.hbase.HTableDescriptor; 087import org.apache.hadoop.hbase.KeyValue; 088import org.apache.hadoop.hbase.KeyValueUtil; 089import org.apache.hadoop.hbase.MiniHBaseCluster; 090import org.apache.hadoop.hbase.MultithreadedTestUtil; 091import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread; 092import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread; 093import org.apache.hadoop.hbase.NotServingRegionException; 094import org.apache.hadoop.hbase.PrivateCellUtil; 095import org.apache.hadoop.hbase.RegionTooBusyException; 096import org.apache.hadoop.hbase.ServerName; 097import org.apache.hadoop.hbase.TableName; 098import org.apache.hadoop.hbase.TagType; 099import org.apache.hadoop.hbase.Waiter; 100import org.apache.hadoop.hbase.client.Append; 101import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 102import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 103import org.apache.hadoop.hbase.client.Delete; 104import org.apache.hadoop.hbase.client.Durability; 105import org.apache.hadoop.hbase.client.Get; 106import org.apache.hadoop.hbase.client.Increment; 107import org.apache.hadoop.hbase.client.Mutation; 108import org.apache.hadoop.hbase.client.Put; 109import org.apache.hadoop.hbase.client.RegionInfo; 110import org.apache.hadoop.hbase.client.RegionInfoBuilder; 111import org.apache.hadoop.hbase.client.Result; 112import org.apache.hadoop.hbase.client.RowMutations; 113import org.apache.hadoop.hbase.client.Scan; 114import org.apache.hadoop.hbase.client.Table; 115import org.apache.hadoop.hbase.client.TableDescriptor; 116import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 117import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 118import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; 119import org.apache.hadoop.hbase.filter.BigDecimalComparator; 120import org.apache.hadoop.hbase.filter.BinaryComparator; 121import org.apache.hadoop.hbase.filter.ColumnCountGetFilter; 122import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; 123import org.apache.hadoop.hbase.filter.Filter; 124import org.apache.hadoop.hbase.filter.FilterBase; 125import org.apache.hadoop.hbase.filter.FilterList; 126import org.apache.hadoop.hbase.filter.NullComparator; 127import org.apache.hadoop.hbase.filter.PrefixFilter; 128import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter; 129import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 130import org.apache.hadoop.hbase.filter.SubstringComparator; 131import org.apache.hadoop.hbase.filter.ValueFilter; 132import org.apache.hadoop.hbase.io.hfile.HFile; 133import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; 134import org.apache.hadoop.hbase.monitoring.MonitoredTask; 135import org.apache.hadoop.hbase.monitoring.TaskMonitor; 136import org.apache.hadoop.hbase.regionserver.HRegion.MutationBatchOperation; 137import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; 138import org.apache.hadoop.hbase.regionserver.Region.RowLock; 139import org.apache.hadoop.hbase.regionserver.TestHStore.FaultyFileSystem; 140import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; 141import org.apache.hadoop.hbase.regionserver.wal.FSHLog; 142import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource; 143import org.apache.hadoop.hbase.regionserver.wal.WALUtil; 144import org.apache.hadoop.hbase.replication.regionserver.ReplicationObserver; 145import org.apache.hadoop.hbase.security.User; 146import org.apache.hadoop.hbase.test.MetricsAssertHelper; 147import org.apache.hadoop.hbase.testclassification.LargeTests; 148import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests; 149import org.apache.hadoop.hbase.util.Bytes; 150import org.apache.hadoop.hbase.util.CommonFSUtils; 151import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 152import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; 153import org.apache.hadoop.hbase.util.FSUtils; 154import org.apache.hadoop.hbase.util.HFileArchiveUtil; 155import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; 156import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 157import org.apache.hadoop.hbase.util.Threads; 158import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 159import org.apache.hadoop.hbase.wal.FaultyFSLog; 160import org.apache.hadoop.hbase.wal.NettyAsyncFSWALConfigHelper; 161import org.apache.hadoop.hbase.wal.WAL; 162import org.apache.hadoop.hbase.wal.WALEdit; 163import org.apache.hadoop.hbase.wal.WALFactory; 164import org.apache.hadoop.hbase.wal.WALKeyImpl; 165import org.apache.hadoop.hbase.wal.WALProvider; 166import org.apache.hadoop.hbase.wal.WALProvider.Writer; 167import org.apache.hadoop.hbase.wal.WALSplitter; 168import org.junit.After; 169import org.junit.Assert; 170import org.junit.Before; 171import org.junit.ClassRule; 172import org.junit.Rule; 173import org.junit.Test; 174import org.junit.experimental.categories.Category; 175import org.junit.rules.ExpectedException; 176import org.junit.rules.TestName; 177import org.mockito.ArgumentCaptor; 178import org.mockito.ArgumentMatcher; 179import org.mockito.Mockito; 180import org.mockito.invocation.InvocationOnMock; 181import org.mockito.stubbing.Answer; 182import org.slf4j.Logger; 183import org.slf4j.LoggerFactory; 184 185import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 186import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 187import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 188import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; 189import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; 190 191import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 192import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; 193import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; 194import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction; 195import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor; 196import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor; 197import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; 198 199/** 200 * Basic stand-alone testing of HRegion. No clusters! 201 * 202 * A lot of the meta information for an HRegion now lives inside other HRegions 203 * or in the HBaseMaster, so only basic testing is possible. 204 */ 205@Category({VerySlowRegionServerTests.class, LargeTests.class}) 206@SuppressWarnings("deprecation") 207public class TestHRegion { 208 209 @ClassRule 210 public static final HBaseClassTestRule CLASS_RULE = 211 HBaseClassTestRule.forClass(TestHRegion.class); 212 213 // Do not spin up clusters in here. If you need to spin up a cluster, do it 214 // over in TestHRegionOnCluster. 215 private static final Logger LOG = LoggerFactory.getLogger(TestHRegion.class); 216 @Rule 217 public TestName name = new TestName(); 218 @Rule public final ExpectedException thrown = ExpectedException.none(); 219 220 private static final String COLUMN_FAMILY = "MyCF"; 221 private static final byte [] COLUMN_FAMILY_BYTES = Bytes.toBytes(COLUMN_FAMILY); 222 private static final EventLoopGroup GROUP = new NioEventLoopGroup(); 223 224 HRegion region = null; 225 // Do not run unit tests in parallel (? Why not? It don't work? Why not? St.Ack) 226 protected static HBaseTestingUtility TEST_UTIL; 227 public static Configuration CONF ; 228 private String dir; 229 private static FileSystem FILESYSTEM; 230 private final int MAX_VERSIONS = 2; 231 232 // Test names 233 protected TableName tableName; 234 protected String method; 235 protected final byte[] qual = Bytes.toBytes("qual"); 236 protected final byte[] qual1 = Bytes.toBytes("qual1"); 237 protected final byte[] qual2 = Bytes.toBytes("qual2"); 238 protected final byte[] qual3 = Bytes.toBytes("qual3"); 239 protected final byte[] value = Bytes.toBytes("value"); 240 protected final byte[] value1 = Bytes.toBytes("value1"); 241 protected final byte[] value2 = Bytes.toBytes("value2"); 242 protected final byte[] row = Bytes.toBytes("rowA"); 243 protected final byte[] row2 = Bytes.toBytes("rowB"); 244 245 protected final MetricsAssertHelper metricsAssertHelper = CompatibilitySingletonFactory 246 .getInstance(MetricsAssertHelper.class); 247 248 @Before 249 public void setup() throws IOException { 250 TEST_UTIL = HBaseTestingUtility.createLocalHTU(); 251 FILESYSTEM = TEST_UTIL.getTestFileSystem(); 252 CONF = TEST_UTIL.getConfiguration(); 253 NettyAsyncFSWALConfigHelper.setEventLoopConfig(CONF, GROUP, NioSocketChannel.class); 254 dir = TEST_UTIL.getDataTestDir("TestHRegion").toString(); 255 method = name.getMethodName(); 256 tableName = TableName.valueOf(method); 257 CONF.set(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, String.valueOf(0.09)); 258 } 259 260 @After 261 public void tearDown() throws Exception { 262 EnvironmentEdgeManagerTestHelper.reset(); 263 LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir()); 264 TEST_UTIL.cleanupTestDir(); 265 } 266 267 /** 268 * Test that I can use the max flushed sequence id after the close. 269 * @throws IOException 270 */ 271 @Test 272 public void testSequenceId() throws IOException { 273 HRegion region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES); 274 assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId()); 275 // Weird. This returns 0 if no store files or no edits. Afraid to change it. 276 assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES)); 277 region.close(); 278 assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId()); 279 assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES)); 280 // Open region again. 281 region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES); 282 byte [] value = Bytes.toBytes(method); 283 // Make a random put against our cf. 284 Put put = new Put(value); 285 put.addColumn(COLUMN_FAMILY_BYTES, null, value); 286 region.put(put); 287 // No flush yet so init numbers should still be in place. 288 assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId()); 289 assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES)); 290 region.flush(true); 291 long max = region.getMaxFlushedSeqId(); 292 region.close(); 293 assertEquals(max, region.getMaxFlushedSeqId()); 294 } 295 296 /** 297 * Test for Bug 2 of HBASE-10466. 298 * "Bug 2: Conditions for the first flush of region close (so-called pre-flush) If memstoreSize 299 * is smaller than a certain value, or when region close starts a flush is ongoing, the first 300 * flush is skipped and only the second flush takes place. However, two flushes are required in 301 * case previous flush fails and leaves some data in snapshot. The bug could cause loss of data 302 * in current memstore. The fix is removing all conditions except abort check so we ensure 2 303 * flushes for region close." 304 * @throws IOException 305 */ 306 @Test 307 public void testCloseCarryingSnapshot() throws IOException { 308 HRegion region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES); 309 HStore store = region.getStore(COLUMN_FAMILY_BYTES); 310 // Get some random bytes. 311 byte [] value = Bytes.toBytes(method); 312 // Make a random put against our cf. 313 Put put = new Put(value); 314 put.addColumn(COLUMN_FAMILY_BYTES, null, value); 315 // First put something in current memstore, which will be in snapshot after flusher.prepare() 316 region.put(put); 317 StoreFlushContext storeFlushCtx = store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY); 318 storeFlushCtx.prepare(); 319 // Second put something in current memstore 320 put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value); 321 region.put(put); 322 // Close with something in memstore and something in the snapshot. Make sure all is cleared. 323 region.close(); 324 assertEquals(0, region.getMemStoreDataSize()); 325 HBaseTestingUtility.closeRegionAndWAL(region); 326 } 327 328 /* 329 * This test is for verifying memstore snapshot size is correctly updated in case of rollback 330 * See HBASE-10845 331 */ 332 @Test 333 public void testMemstoreSnapshotSize() throws IOException { 334 class MyFaultyFSLog extends FaultyFSLog { 335 StoreFlushContext storeFlushCtx; 336 public MyFaultyFSLog(FileSystem fs, Path rootDir, String logName, Configuration conf) 337 throws IOException { 338 super(fs, rootDir, logName, conf); 339 } 340 341 void setStoreFlushCtx(StoreFlushContext storeFlushCtx) { 342 this.storeFlushCtx = storeFlushCtx; 343 } 344 345 @Override 346 public void sync(long txid) throws IOException { 347 storeFlushCtx.prepare(); 348 super.sync(txid); 349 } 350 } 351 352 FileSystem fs = FileSystem.get(CONF); 353 Path rootDir = new Path(dir + "testMemstoreSnapshotSize"); 354 MyFaultyFSLog faultyLog = new MyFaultyFSLog(fs, rootDir, "testMemstoreSnapshotSize", CONF); 355 HRegion region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, faultyLog, 356 COLUMN_FAMILY_BYTES); 357 358 HStore store = region.getStore(COLUMN_FAMILY_BYTES); 359 // Get some random bytes. 360 byte [] value = Bytes.toBytes(method); 361 faultyLog.setStoreFlushCtx(store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY)); 362 363 Put put = new Put(value); 364 put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value); 365 faultyLog.setFailureType(FaultyFSLog.FailureType.SYNC); 366 367 boolean threwIOE = false; 368 try { 369 region.put(put); 370 } catch (IOException ioe) { 371 threwIOE = true; 372 } finally { 373 assertTrue("The regionserver should have thrown an exception", threwIOE); 374 } 375 MemStoreSize mss = store.getFlushableSize(); 376 assertTrue("flushable size should be zero, but it is " + mss, 377 mss.getDataSize() == 0); 378 HBaseTestingUtility.closeRegionAndWAL(region); 379 } 380 381 /** 382 * Create a WAL outside of the usual helper in 383 * {@link HBaseTestingUtility#createWal(Configuration, Path, RegionInfo)} because that method 384 * doesn't play nicely with FaultyFileSystem. Call this method before overriding 385 * {@code fs.file.impl}. 386 * @param callingMethod a unique component for the path, probably the name of the test method. 387 */ 388 private static WAL createWALCompatibleWithFaultyFileSystem(String callingMethod, 389 Configuration conf, TableName tableName) throws IOException { 390 final Path logDir = TEST_UTIL.getDataTestDirOnTestFS(callingMethod + ".log"); 391 final Configuration walConf = new Configuration(conf); 392 FSUtils.setRootDir(walConf, logDir); 393 return new WALFactory(walConf, callingMethod) 394 .getWAL(RegionInfoBuilder.newBuilder(tableName).build()); 395 } 396 397 @Test 398 public void testMemstoreSizeAccountingWithFailedPostBatchMutate() throws IOException { 399 String testName = "testMemstoreSizeAccountingWithFailedPostBatchMutate"; 400 FileSystem fs = FileSystem.get(CONF); 401 Path rootDir = new Path(dir + testName); 402 FSHLog hLog = new FSHLog(fs, rootDir, testName, CONF); 403 hLog.init(); 404 HRegion region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog, 405 COLUMN_FAMILY_BYTES); 406 HStore store = region.getStore(COLUMN_FAMILY_BYTES); 407 assertEquals(0, region.getMemStoreDataSize()); 408 409 // Put one value 410 byte [] value = Bytes.toBytes(method); 411 Put put = new Put(value); 412 put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value); 413 region.put(put); 414 long onePutSize = region.getMemStoreDataSize(); 415 assertTrue(onePutSize > 0); 416 417 RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class); 418 doThrow(new IOException()) 419 .when(mockedCPHost).postBatchMutate(Mockito.<MiniBatchOperationInProgress<Mutation>>any()); 420 region.setCoprocessorHost(mockedCPHost); 421 422 put = new Put(value); 423 put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("dfg"), value); 424 try { 425 region.put(put); 426 fail("Should have failed with IOException"); 427 } catch (IOException expected) { 428 } 429 long expectedSize = onePutSize * 2; 430 assertEquals("memstoreSize should be incremented", 431 expectedSize, region.getMemStoreDataSize()); 432 assertEquals("flushable size should be incremented", 433 expectedSize, store.getFlushableSize().getDataSize()); 434 435 region.setCoprocessorHost(null); 436 HBaseTestingUtility.closeRegionAndWAL(region); 437 } 438 439 /** 440 * A test case of HBASE-21041 441 * @throws Exception Exception 442 */ 443 @Test 444 public void testFlushAndMemstoreSizeCounting() throws Exception { 445 byte[] family = Bytes.toBytes("family"); 446 this.region = initHRegion(tableName, method, CONF, family); 447 final WALFactory wals = new WALFactory(CONF, method); 448 try { 449 for (byte[] row : HBaseTestingUtility.ROWS) { 450 Put put = new Put(row); 451 put.addColumn(family, family, row); 452 region.put(put); 453 } 454 region.flush(true); 455 // After flush, data size should be zero 456 assertEquals(0, region.getMemStoreDataSize()); 457 // After flush, a new active mutable segment is created, so the heap size 458 // should equal to MutableSegment.DEEP_OVERHEAD 459 assertEquals(MutableSegment.DEEP_OVERHEAD, region.getMemStoreHeapSize()); 460 // After flush, offheap should be zero 461 assertEquals(0, region.getMemStoreOffHeapSize()); 462 } finally { 463 HBaseTestingUtility.closeRegionAndWAL(this.region); 464 this.region = null; 465 wals.close(); 466 } 467 } 468 469 /** 470 * Test we do not lose data if we fail a flush and then close. 471 * Part of HBase-10466. Tests the following from the issue description: 472 * "Bug 1: Wrong calculation of HRegion.memstoreSize: When a flush fails, data to be flushed is 473 * kept in each MemStore's snapshot and wait for next flush attempt to continue on it. But when 474 * the next flush succeeds, the counter of total memstore size in HRegion is always deduced by 475 * the sum of current memstore sizes instead of snapshots left from previous failed flush. This 476 * calculation is problematic that almost every time there is failed flush, HRegion.memstoreSize 477 * gets reduced by a wrong value. If region flush could not proceed for a couple cycles, the size 478 * in current memstore could be much larger than the snapshot. It's likely to drift memstoreSize 479 * much smaller than expected. In extreme case, if the error accumulates to even bigger than 480 * HRegion's memstore size limit, any further flush is skipped because flush does not do anything 481 * if memstoreSize is not larger than 0." 482 * @throws Exception 483 */ 484 @Test 485 public void testFlushSizeAccounting() throws Exception { 486 final Configuration conf = HBaseConfiguration.create(CONF); 487 final WAL wal = createWALCompatibleWithFaultyFileSystem(method, conf, tableName); 488 // Only retry once. 489 conf.setInt("hbase.hstore.flush.retries.number", 1); 490 final User user = 491 User.createUserForTesting(conf, method, new String[]{"foo"}); 492 // Inject our faulty LocalFileSystem 493 conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class); 494 user.runAs(new PrivilegedExceptionAction<Object>() { 495 @Override 496 public Object run() throws Exception { 497 // Make sure it worked (above is sensitive to caching details in hadoop core) 498 FileSystem fs = FileSystem.get(conf); 499 Assert.assertEquals(FaultyFileSystem.class, fs.getClass()); 500 FaultyFileSystem ffs = (FaultyFileSystem)fs; 501 HRegion region = null; 502 try { 503 // Initialize region 504 region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, wal, 505 COLUMN_FAMILY_BYTES); 506 long size = region.getMemStoreDataSize(); 507 Assert.assertEquals(0, size); 508 // Put one item into memstore. Measure the size of one item in memstore. 509 Put p1 = new Put(row); 510 p1.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual1, 1, (byte[]) null)); 511 region.put(p1); 512 final long sizeOfOnePut = region.getMemStoreDataSize(); 513 // Fail a flush which means the current memstore will hang out as memstore 'snapshot'. 514 try { 515 LOG.info("Flushing"); 516 region.flush(true); 517 Assert.fail("Didn't bubble up IOE!"); 518 } catch (DroppedSnapshotException dse) { 519 // What we are expecting 520 region.closing.set(false); // this is needed for the rest of the test to work 521 } 522 // Make it so all writes succeed from here on out 523 ffs.fault.set(false); 524 // Check sizes. Should still be the one entry. 525 Assert.assertEquals(sizeOfOnePut, region.getMemStoreDataSize()); 526 // Now add two entries so that on this next flush that fails, we can see if we 527 // subtract the right amount, the snapshot size only. 528 Put p2 = new Put(row); 529 p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual2, 2, (byte[])null)); 530 p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual3, 3, (byte[])null)); 531 region.put(p2); 532 long expectedSize = sizeOfOnePut * 3; 533 Assert.assertEquals(expectedSize, region.getMemStoreDataSize()); 534 // Do a successful flush. It will clear the snapshot only. Thats how flushes work. 535 // If already a snapshot, we clear it else we move the memstore to be snapshot and flush 536 // it 537 region.flush(true); 538 // Make sure our memory accounting is right. 539 Assert.assertEquals(sizeOfOnePut * 2, region.getMemStoreDataSize()); 540 } finally { 541 HBaseTestingUtility.closeRegionAndWAL(region); 542 } 543 return null; 544 } 545 }); 546 FileSystem.closeAllForUGI(user.getUGI()); 547 } 548 549 @Test 550 public void testCloseWithFailingFlush() throws Exception { 551 final Configuration conf = HBaseConfiguration.create(CONF); 552 final WAL wal = createWALCompatibleWithFaultyFileSystem(method, conf, tableName); 553 // Only retry once. 554 conf.setInt("hbase.hstore.flush.retries.number", 1); 555 final User user = 556 User.createUserForTesting(conf, this.method, new String[]{"foo"}); 557 // Inject our faulty LocalFileSystem 558 conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class); 559 user.runAs(new PrivilegedExceptionAction<Object>() { 560 @Override 561 public Object run() throws Exception { 562 // Make sure it worked (above is sensitive to caching details in hadoop core) 563 FileSystem fs = FileSystem.get(conf); 564 Assert.assertEquals(FaultyFileSystem.class, fs.getClass()); 565 FaultyFileSystem ffs = (FaultyFileSystem)fs; 566 HRegion region = null; 567 try { 568 // Initialize region 569 region = initHRegion(tableName, null, null, false, 570 Durability.SYNC_WAL, wal, COLUMN_FAMILY_BYTES); 571 long size = region.getMemStoreDataSize(); 572 Assert.assertEquals(0, size); 573 // Put one item into memstore. Measure the size of one item in memstore. 574 Put p1 = new Put(row); 575 p1.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual1, 1, (byte[])null)); 576 region.put(p1); 577 // Manufacture an outstanding snapshot -- fake a failed flush by doing prepare step only. 578 HStore store = region.getStore(COLUMN_FAMILY_BYTES); 579 StoreFlushContext storeFlushCtx = 580 store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY); 581 storeFlushCtx.prepare(); 582 // Now add two entries to the foreground memstore. 583 Put p2 = new Put(row); 584 p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual2, 2, (byte[])null)); 585 p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual3, 3, (byte[])null)); 586 region.put(p2); 587 // Now try close on top of a failing flush. 588 region.close(); 589 fail(); 590 } catch (DroppedSnapshotException dse) { 591 // Expected 592 LOG.info("Expected DroppedSnapshotException"); 593 } finally { 594 // Make it so all writes succeed from here on out so can close clean 595 ffs.fault.set(false); 596 HBaseTestingUtility.closeRegionAndWAL(region); 597 } 598 return null; 599 } 600 }); 601 FileSystem.closeAllForUGI(user.getUGI()); 602 } 603 604 @Test 605 public void testCompactionAffectedByScanners() throws Exception { 606 byte[] family = Bytes.toBytes("family"); 607 this.region = initHRegion(tableName, method, CONF, family); 608 609 Put put = new Put(Bytes.toBytes("r1")); 610 put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1")); 611 region.put(put); 612 region.flush(true); 613 614 Scan scan = new Scan(); 615 scan.setMaxVersions(3); 616 // open the first scanner 617 RegionScanner scanner1 = region.getScanner(scan); 618 619 Delete delete = new Delete(Bytes.toBytes("r1")); 620 region.delete(delete); 621 region.flush(true); 622 623 // open the second scanner 624 RegionScanner scanner2 = region.getScanner(scan); 625 626 List<Cell> results = new ArrayList<>(); 627 628 System.out.println("Smallest read point:" + region.getSmallestReadPoint()); 629 630 // make a major compaction 631 region.compact(true); 632 633 // open the third scanner 634 RegionScanner scanner3 = region.getScanner(scan); 635 636 // get data from scanner 1, 2, 3 after major compaction 637 scanner1.next(results); 638 System.out.println(results); 639 assertEquals(1, results.size()); 640 641 results.clear(); 642 scanner2.next(results); 643 System.out.println(results); 644 assertEquals(0, results.size()); 645 646 results.clear(); 647 scanner3.next(results); 648 System.out.println(results); 649 assertEquals(0, results.size()); 650 } 651 652 @Test 653 public void testToShowNPEOnRegionScannerReseek() throws Exception { 654 byte[] family = Bytes.toBytes("family"); 655 this.region = initHRegion(tableName, method, CONF, family); 656 657 Put put = new Put(Bytes.toBytes("r1")); 658 put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1")); 659 region.put(put); 660 put = new Put(Bytes.toBytes("r2")); 661 put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1")); 662 region.put(put); 663 region.flush(true); 664 665 Scan scan = new Scan(); 666 scan.setMaxVersions(3); 667 // open the first scanner 668 RegionScanner scanner1 = region.getScanner(scan); 669 670 System.out.println("Smallest read point:" + region.getSmallestReadPoint()); 671 672 region.compact(true); 673 674 scanner1.reseek(Bytes.toBytes("r2")); 675 List<Cell> results = new ArrayList<>(); 676 scanner1.next(results); 677 Cell keyValue = results.get(0); 678 Assert.assertTrue(Bytes.compareTo(CellUtil.cloneRow(keyValue), Bytes.toBytes("r2")) == 0); 679 scanner1.close(); 680 } 681 682 @Test 683 public void testArchiveRecoveredEditsReplay() throws Exception { 684 byte[] family = Bytes.toBytes("family"); 685 this.region = initHRegion(tableName, method, CONF, family); 686 final WALFactory wals = new WALFactory(CONF, method); 687 try { 688 Path regiondir = region.getRegionFileSystem().getRegionDir(); 689 FileSystem fs = region.getRegionFileSystem().getFileSystem(); 690 byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes(); 691 692 Path recoveredEditsDir = new Path(regiondir, HConstants.RECOVERED_EDITS_DIR); 693 694 long maxSeqId = 1050; 695 long minSeqId = 1000; 696 697 for (long i = minSeqId; i <= maxSeqId; i += 10) { 698 Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i)); 699 fs.create(recoveredEdits); 700 WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits); 701 702 long time = System.nanoTime(); 703 WALEdit edit = new WALEdit(); 704 edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes 705 .toBytes(i))); 706 writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time, 707 HConstants.DEFAULT_CLUSTER_ID), edit)); 708 709 writer.close(); 710 } 711 MonitoredTask status = TaskMonitor.get().createStatus(method); 712 Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR); 713 for (HStore store : region.getStores()) { 714 maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId - 1); 715 } 716 CONF.set("hbase.region.archive.recovered.edits", "true"); 717 CONF.set(CommonFSUtils.HBASE_WAL_DIR, "/custom_wal_dir"); 718 long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status); 719 assertEquals(maxSeqId, seqId); 720 region.getMVCC().advanceTo(seqId); 721 String fakeFamilyName = recoveredEditsDir.getName(); 722 Path rootDir = new Path(CONF.get(HConstants.HBASE_DIR)); 723 Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePathForRootDir(rootDir, 724 region.getRegionInfo(), Bytes.toBytes(fakeFamilyName)); 725 FileStatus[] list = TEST_UTIL.getTestFileSystem().listStatus(storeArchiveDir); 726 assertEquals(6, list.length); 727 } finally { 728 CONF.set("hbase.region.archive.recovered.edits", "false"); 729 CONF.set(CommonFSUtils.HBASE_WAL_DIR, ""); 730 HBaseTestingUtility.closeRegionAndWAL(this.region); 731 this.region = null; 732 wals.close(); 733 } 734 } 735 736 @Test 737 public void testSkipRecoveredEditsReplay() throws Exception { 738 byte[] family = Bytes.toBytes("family"); 739 this.region = initHRegion(tableName, method, CONF, family); 740 final WALFactory wals = new WALFactory(CONF, method); 741 try { 742 Path regiondir = region.getRegionFileSystem().getRegionDir(); 743 FileSystem fs = region.getRegionFileSystem().getFileSystem(); 744 byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes(); 745 746 Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir); 747 748 long maxSeqId = 1050; 749 long minSeqId = 1000; 750 751 for (long i = minSeqId; i <= maxSeqId; i += 10) { 752 Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i)); 753 fs.create(recoveredEdits); 754 WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits); 755 756 long time = System.nanoTime(); 757 WALEdit edit = new WALEdit(); 758 edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes 759 .toBytes(i))); 760 writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time, 761 HConstants.DEFAULT_CLUSTER_ID), edit)); 762 763 writer.close(); 764 } 765 MonitoredTask status = TaskMonitor.get().createStatus(method); 766 Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR); 767 for (HStore store : region.getStores()) { 768 maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId - 1); 769 } 770 long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status); 771 assertEquals(maxSeqId, seqId); 772 region.getMVCC().advanceTo(seqId); 773 Get get = new Get(row); 774 Result result = region.get(get); 775 for (long i = minSeqId; i <= maxSeqId; i += 10) { 776 List<Cell> kvs = result.getColumnCells(family, Bytes.toBytes(i)); 777 assertEquals(1, kvs.size()); 778 assertArrayEquals(Bytes.toBytes(i), CellUtil.cloneValue(kvs.get(0))); 779 } 780 } finally { 781 HBaseTestingUtility.closeRegionAndWAL(this.region); 782 this.region = null; 783 wals.close(); 784 } 785 } 786 787 @Test 788 public void testSkipRecoveredEditsReplaySomeIgnored() throws Exception { 789 byte[] family = Bytes.toBytes("family"); 790 this.region = initHRegion(tableName, method, CONF, family); 791 final WALFactory wals = new WALFactory(CONF, method); 792 try { 793 Path regiondir = region.getRegionFileSystem().getRegionDir(); 794 FileSystem fs = region.getRegionFileSystem().getFileSystem(); 795 byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes(); 796 797 Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir); 798 799 long maxSeqId = 1050; 800 long minSeqId = 1000; 801 802 for (long i = minSeqId; i <= maxSeqId; i += 10) { 803 Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i)); 804 fs.create(recoveredEdits); 805 WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits); 806 807 long time = System.nanoTime(); 808 WALEdit edit = new WALEdit(); 809 edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes 810 .toBytes(i))); 811 writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time, 812 HConstants.DEFAULT_CLUSTER_ID), edit)); 813 814 writer.close(); 815 } 816 long recoverSeqId = 1030; 817 MonitoredTask status = TaskMonitor.get().createStatus(method); 818 Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR); 819 for (HStore store : region.getStores()) { 820 maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), recoverSeqId - 1); 821 } 822 long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status); 823 assertEquals(maxSeqId, seqId); 824 region.getMVCC().advanceTo(seqId); 825 Get get = new Get(row); 826 Result result = region.get(get); 827 for (long i = minSeqId; i <= maxSeqId; i += 10) { 828 List<Cell> kvs = result.getColumnCells(family, Bytes.toBytes(i)); 829 if (i < recoverSeqId) { 830 assertEquals(0, kvs.size()); 831 } else { 832 assertEquals(1, kvs.size()); 833 assertArrayEquals(Bytes.toBytes(i), CellUtil.cloneValue(kvs.get(0))); 834 } 835 } 836 } finally { 837 HBaseTestingUtility.closeRegionAndWAL(this.region); 838 this.region = null; 839 wals.close(); 840 } 841 } 842 843 @Test 844 public void testSkipRecoveredEditsReplayAllIgnored() throws Exception { 845 byte[] family = Bytes.toBytes("family"); 846 this.region = initHRegion(tableName, method, CONF, family); 847 try { 848 Path regiondir = region.getRegionFileSystem().getRegionDir(); 849 FileSystem fs = region.getRegionFileSystem().getFileSystem(); 850 851 Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir); 852 for (int i = 1000; i < 1050; i += 10) { 853 Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i)); 854 FSDataOutputStream dos = fs.create(recoveredEdits); 855 dos.writeInt(i); 856 dos.close(); 857 } 858 long minSeqId = 2000; 859 Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", minSeqId - 1)); 860 FSDataOutputStream dos = fs.create(recoveredEdits); 861 dos.close(); 862 863 Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR); 864 for (HStore store : region.getStores()) { 865 maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId); 866 } 867 long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, null); 868 assertEquals(minSeqId, seqId); 869 } finally { 870 HBaseTestingUtility.closeRegionAndWAL(this.region); 871 this.region = null; 872 } 873 } 874 875 @Test 876 public void testSkipRecoveredEditsReplayTheLastFileIgnored() throws Exception { 877 byte[] family = Bytes.toBytes("family"); 878 this.region = initHRegion(tableName, method, CONF, family); 879 final WALFactory wals = new WALFactory(CONF, method); 880 try { 881 Path regiondir = region.getRegionFileSystem().getRegionDir(); 882 FileSystem fs = region.getRegionFileSystem().getFileSystem(); 883 byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes(); 884 byte[][] columns = region.getTableDescriptor().getColumnFamilyNames().toArray(new byte[0][]); 885 886 assertEquals(0, region.getStoreFileList(columns).size()); 887 888 Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir); 889 890 long maxSeqId = 1050; 891 long minSeqId = 1000; 892 893 for (long i = minSeqId; i <= maxSeqId; i += 10) { 894 Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i)); 895 fs.create(recoveredEdits); 896 WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits); 897 898 long time = System.nanoTime(); 899 WALEdit edit = null; 900 if (i == maxSeqId) { 901 edit = WALEdit.createCompaction(region.getRegionInfo(), 902 CompactionDescriptor.newBuilder() 903 .setTableName(ByteString.copyFrom(tableName.getName())) 904 .setFamilyName(ByteString.copyFrom(regionName)) 905 .setEncodedRegionName(ByteString.copyFrom(regionName)) 906 .setStoreHomeDirBytes(ByteString.copyFrom(Bytes.toBytes(regiondir.toString()))) 907 .setRegionName(ByteString.copyFrom(region.getRegionInfo().getRegionName())) 908 .build()); 909 } else { 910 edit = new WALEdit(); 911 edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes 912 .toBytes(i))); 913 } 914 writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time, 915 HConstants.DEFAULT_CLUSTER_ID), edit)); 916 writer.close(); 917 } 918 919 long recoverSeqId = 1030; 920 Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR); 921 MonitoredTask status = TaskMonitor.get().createStatus(method); 922 for (HStore store : region.getStores()) { 923 maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), recoverSeqId - 1); 924 } 925 long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status); 926 assertEquals(maxSeqId, seqId); 927 928 // assert that the files are flushed 929 assertEquals(1, region.getStoreFileList(columns).size()); 930 931 } finally { 932 HBaseTestingUtility.closeRegionAndWAL(this.region); 933 this.region = null; 934 wals.close(); 935 } 936 } 937 938 @Test 939 public void testRecoveredEditsReplayCompaction() throws Exception { 940 testRecoveredEditsReplayCompaction(false); 941 testRecoveredEditsReplayCompaction(true); 942 } 943 944 public void testRecoveredEditsReplayCompaction(boolean mismatchedRegionName) throws Exception { 945 CONF.setClass(HConstants.REGION_IMPL, HRegionForTesting.class, Region.class); 946 byte[] family = Bytes.toBytes("family"); 947 this.region = initHRegion(tableName, method, CONF, family); 948 final WALFactory wals = new WALFactory(CONF, method); 949 try { 950 Path regiondir = region.getRegionFileSystem().getRegionDir(); 951 FileSystem fs = region.getRegionFileSystem().getFileSystem(); 952 byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes(); 953 954 long maxSeqId = 3; 955 long minSeqId = 0; 956 957 for (long i = minSeqId; i < maxSeqId; i++) { 958 Put put = new Put(Bytes.toBytes(i)); 959 put.addColumn(family, Bytes.toBytes(i), Bytes.toBytes(i)); 960 region.put(put); 961 region.flush(true); 962 } 963 964 // this will create a region with 3 files 965 assertEquals(3, region.getStore(family).getStorefilesCount()); 966 List<Path> storeFiles = new ArrayList<>(3); 967 for (HStoreFile sf : region.getStore(family).getStorefiles()) { 968 storeFiles.add(sf.getPath()); 969 } 970 971 // disable compaction completion 972 CONF.setBoolean("hbase.hstore.compaction.complete", false); 973 region.compactStores(); 974 975 // ensure that nothing changed 976 assertEquals(3, region.getStore(family).getStorefilesCount()); 977 978 // now find the compacted file, and manually add it to the recovered edits 979 Path tmpDir = new Path(region.getRegionFileSystem().getTempDir(), Bytes.toString(family)); 980 FileStatus[] files = FSUtils.listStatus(fs, tmpDir); 981 String errorMsg = "Expected to find 1 file in the region temp directory " 982 + "from the compaction, could not find any"; 983 assertNotNull(errorMsg, files); 984 assertEquals(errorMsg, 1, files.length); 985 // move the file inside region dir 986 Path newFile = region.getRegionFileSystem().commitStoreFile(Bytes.toString(family), 987 files[0].getPath()); 988 989 byte[] encodedNameAsBytes = this.region.getRegionInfo().getEncodedNameAsBytes(); 990 byte[] fakeEncodedNameAsBytes = new byte [encodedNameAsBytes.length]; 991 for (int i=0; i < encodedNameAsBytes.length; i++) { 992 // Mix the byte array to have a new encodedName 993 fakeEncodedNameAsBytes[i] = (byte) (encodedNameAsBytes[i] + 1); 994 } 995 996 CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(this.region 997 .getRegionInfo(), mismatchedRegionName ? fakeEncodedNameAsBytes : null, family, 998 storeFiles, Lists.newArrayList(newFile), 999 region.getRegionFileSystem().getStoreDir(Bytes.toString(family))); 1000 1001 WALUtil.writeCompactionMarker(region.getWAL(), this.region.getReplicationScope(), 1002 this.region.getRegionInfo(), compactionDescriptor, region.getMVCC()); 1003 1004 Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir); 1005 1006 Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000)); 1007 fs.create(recoveredEdits); 1008 WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits); 1009 1010 long time = System.nanoTime(); 1011 1012 writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, 10, time, 1013 HConstants.DEFAULT_CLUSTER_ID), WALEdit.createCompaction(region.getRegionInfo(), 1014 compactionDescriptor))); 1015 writer.close(); 1016 1017 // close the region now, and reopen again 1018 region.getTableDescriptor(); 1019 region.getRegionInfo(); 1020 region.close(); 1021 try { 1022 region = HRegion.openHRegion(region, null); 1023 } catch (WrongRegionException wre) { 1024 fail("Matching encoded region name should not have produced WrongRegionException"); 1025 } 1026 1027 // now check whether we have only one store file, the compacted one 1028 Collection<HStoreFile> sfs = region.getStore(family).getStorefiles(); 1029 for (HStoreFile sf : sfs) { 1030 LOG.info(Objects.toString(sf.getPath())); 1031 } 1032 if (!mismatchedRegionName) { 1033 assertEquals(1, region.getStore(family).getStorefilesCount()); 1034 } 1035 files = FSUtils.listStatus(fs, tmpDir); 1036 assertTrue("Expected to find 0 files inside " + tmpDir, files == null || files.length == 0); 1037 1038 for (long i = minSeqId; i < maxSeqId; i++) { 1039 Get get = new Get(Bytes.toBytes(i)); 1040 Result result = region.get(get); 1041 byte[] value = result.getValue(family, Bytes.toBytes(i)); 1042 assertArrayEquals(Bytes.toBytes(i), value); 1043 } 1044 } finally { 1045 HBaseTestingUtility.closeRegionAndWAL(this.region); 1046 this.region = null; 1047 wals.close(); 1048 CONF.setClass(HConstants.REGION_IMPL, HRegion.class, Region.class); 1049 } 1050 } 1051 1052 @Test 1053 public void testFlushMarkers() throws Exception { 1054 // tests that flush markers are written to WAL and handled at recovered edits 1055 byte[] family = Bytes.toBytes("family"); 1056 Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + ".log"); 1057 final Configuration walConf = new Configuration(TEST_UTIL.getConfiguration()); 1058 FSUtils.setRootDir(walConf, logDir); 1059 final WALFactory wals = new WALFactory(walConf, method); 1060 final WAL wal = wals.getWAL(RegionInfoBuilder.newBuilder(tableName).build()); 1061 1062 this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW, 1063 HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family); 1064 try { 1065 Path regiondir = region.getRegionFileSystem().getRegionDir(); 1066 FileSystem fs = region.getRegionFileSystem().getFileSystem(); 1067 byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes(); 1068 1069 long maxSeqId = 3; 1070 long minSeqId = 0; 1071 1072 for (long i = minSeqId; i < maxSeqId; i++) { 1073 Put put = new Put(Bytes.toBytes(i)); 1074 put.addColumn(family, Bytes.toBytes(i), Bytes.toBytes(i)); 1075 region.put(put); 1076 region.flush(true); 1077 } 1078 1079 // this will create a region with 3 files from flush 1080 assertEquals(3, region.getStore(family).getStorefilesCount()); 1081 List<String> storeFiles = new ArrayList<>(3); 1082 for (HStoreFile sf : region.getStore(family).getStorefiles()) { 1083 storeFiles.add(sf.getPath().getName()); 1084 } 1085 1086 // now verify that the flush markers are written 1087 wal.shutdown(); 1088 WAL.Reader reader = WALFactory.createReader(fs, AbstractFSWALProvider.getCurrentFileName(wal), 1089 TEST_UTIL.getConfiguration()); 1090 try { 1091 List<WAL.Entry> flushDescriptors = new ArrayList<>(); 1092 long lastFlushSeqId = -1; 1093 while (true) { 1094 WAL.Entry entry = reader.next(); 1095 if (entry == null) { 1096 break; 1097 } 1098 Cell cell = entry.getEdit().getCells().get(0); 1099 if (WALEdit.isMetaEditFamily(cell)) { 1100 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(cell); 1101 assertNotNull(flushDesc); 1102 assertArrayEquals(tableName.getName(), flushDesc.getTableName().toByteArray()); 1103 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 1104 assertTrue(flushDesc.getFlushSequenceNumber() > lastFlushSeqId); 1105 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 1106 assertTrue(flushDesc.getFlushSequenceNumber() == lastFlushSeqId); 1107 } 1108 lastFlushSeqId = flushDesc.getFlushSequenceNumber(); 1109 assertArrayEquals(regionName, flushDesc.getEncodedRegionName().toByteArray()); 1110 assertEquals(1, flushDesc.getStoreFlushesCount()); //only one store 1111 StoreFlushDescriptor storeFlushDesc = flushDesc.getStoreFlushes(0); 1112 assertArrayEquals(family, storeFlushDesc.getFamilyName().toByteArray()); 1113 assertEquals("family", storeFlushDesc.getStoreHomeDir()); 1114 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 1115 assertEquals(0, storeFlushDesc.getFlushOutputCount()); 1116 } else { 1117 assertEquals(1, storeFlushDesc.getFlushOutputCount()); //only one file from flush 1118 assertTrue(storeFiles.contains(storeFlushDesc.getFlushOutput(0))); 1119 } 1120 1121 flushDescriptors.add(entry); 1122 } 1123 } 1124 1125 assertEquals(3 * 2, flushDescriptors.size()); // START_FLUSH and COMMIT_FLUSH per flush 1126 1127 // now write those markers to the recovered edits again. 1128 1129 Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir); 1130 1131 Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000)); 1132 fs.create(recoveredEdits); 1133 WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits); 1134 1135 for (WAL.Entry entry : flushDescriptors) { 1136 writer.append(entry); 1137 } 1138 writer.close(); 1139 } finally { 1140 if (null != reader) { 1141 try { 1142 reader.close(); 1143 } catch (IOException exception) { 1144 LOG.warn("Problem closing wal: " + exception.getMessage()); 1145 LOG.debug("exception details", exception); 1146 } 1147 } 1148 } 1149 1150 1151 // close the region now, and reopen again 1152 region.close(); 1153 region = HRegion.openHRegion(region, null); 1154 1155 // now check whether we have can read back the data from region 1156 for (long i = minSeqId; i < maxSeqId; i++) { 1157 Get get = new Get(Bytes.toBytes(i)); 1158 Result result = region.get(get); 1159 byte[] value = result.getValue(family, Bytes.toBytes(i)); 1160 assertArrayEquals(Bytes.toBytes(i), value); 1161 } 1162 } finally { 1163 HBaseTestingUtility.closeRegionAndWAL(this.region); 1164 this.region = null; 1165 wals.close(); 1166 } 1167 } 1168 1169 static class IsFlushWALMarker implements ArgumentMatcher<WALEdit> { 1170 volatile FlushAction[] actions; 1171 public IsFlushWALMarker(FlushAction... actions) { 1172 this.actions = actions; 1173 } 1174 @Override 1175 public boolean matches(WALEdit edit) { 1176 List<Cell> cells = edit.getCells(); 1177 if (cells.isEmpty()) { 1178 return false; 1179 } 1180 if (WALEdit.isMetaEditFamily(cells.get(0))) { 1181 FlushDescriptor desc; 1182 try { 1183 desc = WALEdit.getFlushDescriptor(cells.get(0)); 1184 } catch (IOException e) { 1185 LOG.warn(e.toString(), e); 1186 return false; 1187 } 1188 if (desc != null) { 1189 for (FlushAction action : actions) { 1190 if (desc.getAction() == action) { 1191 return true; 1192 } 1193 } 1194 } 1195 } 1196 return false; 1197 } 1198 public IsFlushWALMarker set(FlushAction... actions) { 1199 this.actions = actions; 1200 return this; 1201 } 1202 } 1203 1204 @Test 1205 public void testFlushMarkersWALFail() throws Exception { 1206 // test the cases where the WAL append for flush markers fail. 1207 byte[] family = Bytes.toBytes("family"); 1208 1209 // spy an actual WAL implementation to throw exception (was not able to mock) 1210 Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + "log"); 1211 1212 final Configuration walConf = new Configuration(TEST_UTIL.getConfiguration()); 1213 FSUtils.setRootDir(walConf, logDir); 1214 // Make up a WAL that we can manipulate at append time. 1215 class FailAppendFlushMarkerWAL extends FSHLog { 1216 volatile FlushAction [] flushActions = null; 1217 1218 public FailAppendFlushMarkerWAL(FileSystem fs, Path root, String logDir, Configuration conf) 1219 throws IOException { 1220 super(fs, root, logDir, conf); 1221 } 1222 1223 @Override 1224 protected Writer createWriterInstance(Path path) throws IOException { 1225 final Writer w = super.createWriterInstance(path); 1226 return new Writer() { 1227 @Override 1228 public void close() throws IOException { 1229 w.close(); 1230 } 1231 1232 @Override 1233 public void sync(boolean forceSync) throws IOException { 1234 w.sync(forceSync); 1235 } 1236 1237 @Override 1238 public void append(Entry entry) throws IOException { 1239 List<Cell> cells = entry.getEdit().getCells(); 1240 if (WALEdit.isMetaEditFamily(cells.get(0))) { 1241 FlushDescriptor desc = WALEdit.getFlushDescriptor(cells.get(0)); 1242 if (desc != null) { 1243 for (FlushAction flushAction: flushActions) { 1244 if (desc.getAction().equals(flushAction)) { 1245 throw new IOException("Failed to append flush marker! " + flushAction); 1246 } 1247 } 1248 } 1249 } 1250 w.append(entry); 1251 } 1252 1253 @Override 1254 public long getLength() { 1255 return w.getLength(); 1256 } 1257 }; 1258 } 1259 } 1260 FailAppendFlushMarkerWAL wal = 1261 new FailAppendFlushMarkerWAL(FileSystem.get(walConf), FSUtils.getRootDir(walConf), 1262 method, walConf); 1263 wal.init(); 1264 this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW, 1265 HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family); 1266 try { 1267 int i = 0; 1268 Put put = new Put(Bytes.toBytes(i)); 1269 put.setDurability(Durability.SKIP_WAL); // have to skip mocked wal 1270 put.addColumn(family, Bytes.toBytes(i), Bytes.toBytes(i)); 1271 region.put(put); 1272 1273 // 1. Test case where START_FLUSH throws exception 1274 wal.flushActions = new FlushAction [] {FlushAction.START_FLUSH}; 1275 1276 // start cache flush will throw exception 1277 try { 1278 region.flush(true); 1279 fail("This should have thrown exception"); 1280 } catch (DroppedSnapshotException unexpected) { 1281 // this should not be a dropped snapshot exception. Meaning that RS will not abort 1282 throw unexpected; 1283 } catch (IOException expected) { 1284 // expected 1285 } 1286 // The WAL is hosed now. It has two edits appended. We cannot roll the log without it 1287 // throwing a DroppedSnapshotException to force an abort. Just clean up the mess. 1288 region.close(true); 1289 wal.close(); 1290 1291 // 2. Test case where START_FLUSH succeeds but COMMIT_FLUSH will throw exception 1292 wal.flushActions = new FlushAction [] {FlushAction.COMMIT_FLUSH}; 1293 wal = new FailAppendFlushMarkerWAL(FileSystem.get(walConf), FSUtils.getRootDir(walConf), 1294 method, walConf); 1295 wal.init(); 1296 this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW, 1297 HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family); 1298 region.put(put); 1299 1300 // 3. Test case where ABORT_FLUSH will throw exception. 1301 // Even if ABORT_FLUSH throws exception, we should not fail with IOE, but continue with 1302 // DroppedSnapshotException. Below COMMMIT_FLUSH will cause flush to abort 1303 wal.flushActions = new FlushAction [] {FlushAction.COMMIT_FLUSH, FlushAction.ABORT_FLUSH}; 1304 1305 try { 1306 region.flush(true); 1307 fail("This should have thrown exception"); 1308 } catch (DroppedSnapshotException expected) { 1309 // we expect this exception, since we were able to write the snapshot, but failed to 1310 // write the flush marker to WAL 1311 } catch (IOException unexpected) { 1312 throw unexpected; 1313 } 1314 } finally { 1315 HBaseTestingUtility.closeRegionAndWAL(this.region); 1316 this.region = null; 1317 } 1318 } 1319 1320 @Test 1321 public void testGetWhileRegionClose() throws IOException { 1322 Configuration hc = initSplit(); 1323 int numRows = 100; 1324 byte[][] families = { fam1, fam2, fam3 }; 1325 1326 // Setting up region 1327 this.region = initHRegion(tableName, method, hc, families); 1328 try { 1329 // Put data in region 1330 final int startRow = 100; 1331 putData(startRow, numRows, qual1, families); 1332 putData(startRow, numRows, qual2, families); 1333 putData(startRow, numRows, qual3, families); 1334 final AtomicBoolean done = new AtomicBoolean(false); 1335 final AtomicInteger gets = new AtomicInteger(0); 1336 GetTillDoneOrException[] threads = new GetTillDoneOrException[10]; 1337 try { 1338 // Set ten threads running concurrently getting from the region. 1339 for (int i = 0; i < threads.length / 2; i++) { 1340 threads[i] = new GetTillDoneOrException(i, Bytes.toBytes("" + startRow), done, gets); 1341 threads[i].setDaemon(true); 1342 threads[i].start(); 1343 } 1344 // Artificially make the condition by setting closing flag explicitly. 1345 // I can't make the issue happen with a call to region.close(). 1346 this.region.closing.set(true); 1347 for (int i = threads.length / 2; i < threads.length; i++) { 1348 threads[i] = new GetTillDoneOrException(i, Bytes.toBytes("" + startRow), done, gets); 1349 threads[i].setDaemon(true); 1350 threads[i].start(); 1351 } 1352 } finally { 1353 if (this.region != null) { 1354 HBaseTestingUtility.closeRegionAndWAL(this.region); 1355 } 1356 } 1357 done.set(true); 1358 for (GetTillDoneOrException t : threads) { 1359 try { 1360 t.join(); 1361 } catch (InterruptedException e) { 1362 e.printStackTrace(); 1363 } 1364 if (t.e != null) { 1365 LOG.info("Exception=" + t.e); 1366 assertFalse("Found a NPE in " + t.getName(), t.e instanceof NullPointerException); 1367 } 1368 } 1369 } finally { 1370 HBaseTestingUtility.closeRegionAndWAL(this.region); 1371 this.region = null; 1372 } 1373 } 1374 1375 /* 1376 * Thread that does get on single row until 'done' flag is flipped. If an 1377 * exception causes us to fail, it records it. 1378 */ 1379 class GetTillDoneOrException extends Thread { 1380 private final Get g; 1381 private final AtomicBoolean done; 1382 private final AtomicInteger count; 1383 private Exception e; 1384 1385 GetTillDoneOrException(final int i, final byte[] r, final AtomicBoolean d, 1386 final AtomicInteger c) { 1387 super("getter." + i); 1388 this.g = new Get(r); 1389 this.done = d; 1390 this.count = c; 1391 } 1392 1393 @Override 1394 public void run() { 1395 while (!this.done.get()) { 1396 try { 1397 assertTrue(region.get(g).size() > 0); 1398 this.count.incrementAndGet(); 1399 } catch (Exception e) { 1400 this.e = e; 1401 break; 1402 } 1403 } 1404 } 1405 } 1406 1407 /* 1408 * An involved filter test. Has multiple column families and deletes in mix. 1409 */ 1410 @Test 1411 public void testWeirdCacheBehaviour() throws Exception { 1412 final TableName tableName = TableName.valueOf(name.getMethodName()); 1413 byte[][] FAMILIES = new byte[][] { Bytes.toBytes("trans-blob"), Bytes.toBytes("trans-type"), 1414 Bytes.toBytes("trans-date"), Bytes.toBytes("trans-tags"), Bytes.toBytes("trans-group") }; 1415 this.region = initHRegion(tableName, method, CONF, FAMILIES); 1416 try { 1417 String value = "this is the value"; 1418 String value2 = "this is some other value"; 1419 String keyPrefix1 = "prefix1"; 1420 String keyPrefix2 = "prefix2"; 1421 String keyPrefix3 = "prefix3"; 1422 putRows(this.region, 3, value, keyPrefix1); 1423 putRows(this.region, 3, value, keyPrefix2); 1424 putRows(this.region, 3, value, keyPrefix3); 1425 putRows(this.region, 3, value2, keyPrefix1); 1426 putRows(this.region, 3, value2, keyPrefix2); 1427 putRows(this.region, 3, value2, keyPrefix3); 1428 System.out.println("Checking values for key: " + keyPrefix1); 1429 assertEquals("Got back incorrect number of rows from scan", 3, 1430 getNumberOfRows(keyPrefix1, value2, this.region)); 1431 System.out.println("Checking values for key: " + keyPrefix2); 1432 assertEquals("Got back incorrect number of rows from scan", 3, 1433 getNumberOfRows(keyPrefix2, value2, this.region)); 1434 System.out.println("Checking values for key: " + keyPrefix3); 1435 assertEquals("Got back incorrect number of rows from scan", 3, 1436 getNumberOfRows(keyPrefix3, value2, this.region)); 1437 deleteColumns(this.region, value2, keyPrefix1); 1438 deleteColumns(this.region, value2, keyPrefix2); 1439 deleteColumns(this.region, value2, keyPrefix3); 1440 System.out.println("Starting important checks....."); 1441 assertEquals("Got back incorrect number of rows from scan: " + keyPrefix1, 0, 1442 getNumberOfRows(keyPrefix1, value2, this.region)); 1443 assertEquals("Got back incorrect number of rows from scan: " + keyPrefix2, 0, 1444 getNumberOfRows(keyPrefix2, value2, this.region)); 1445 assertEquals("Got back incorrect number of rows from scan: " + keyPrefix3, 0, 1446 getNumberOfRows(keyPrefix3, value2, this.region)); 1447 } finally { 1448 HBaseTestingUtility.closeRegionAndWAL(this.region); 1449 this.region = null; 1450 } 1451 } 1452 1453 @Test 1454 public void testAppendWithReadOnlyTable() throws Exception { 1455 final TableName tableName = TableName.valueOf(name.getMethodName()); 1456 this.region = initHRegion(tableName, method, CONF, true, Bytes.toBytes("somefamily")); 1457 boolean exceptionCaught = false; 1458 Append append = new Append(Bytes.toBytes("somerow")); 1459 append.setDurability(Durability.SKIP_WAL); 1460 append.addColumn(Bytes.toBytes("somefamily"), Bytes.toBytes("somequalifier"), 1461 Bytes.toBytes("somevalue")); 1462 try { 1463 region.append(append); 1464 } catch (IOException e) { 1465 exceptionCaught = true; 1466 } finally { 1467 HBaseTestingUtility.closeRegionAndWAL(this.region); 1468 this.region = null; 1469 } 1470 assertTrue(exceptionCaught == true); 1471 } 1472 1473 @Test 1474 public void testIncrWithReadOnlyTable() throws Exception { 1475 final TableName tableName = TableName.valueOf(name.getMethodName()); 1476 this.region = initHRegion(tableName, method, CONF, true, Bytes.toBytes("somefamily")); 1477 boolean exceptionCaught = false; 1478 Increment inc = new Increment(Bytes.toBytes("somerow")); 1479 inc.setDurability(Durability.SKIP_WAL); 1480 inc.addColumn(Bytes.toBytes("somefamily"), Bytes.toBytes("somequalifier"), 1L); 1481 try { 1482 region.increment(inc); 1483 } catch (IOException e) { 1484 exceptionCaught = true; 1485 } finally { 1486 HBaseTestingUtility.closeRegionAndWAL(this.region); 1487 this.region = null; 1488 } 1489 assertTrue(exceptionCaught == true); 1490 } 1491 1492 private void deleteColumns(HRegion r, String value, String keyPrefix) throws IOException { 1493 InternalScanner scanner = buildScanner(keyPrefix, value, r); 1494 int count = 0; 1495 boolean more = false; 1496 List<Cell> results = new ArrayList<>(); 1497 do { 1498 more = scanner.next(results); 1499 if (results != null && !results.isEmpty()) 1500 count++; 1501 else 1502 break; 1503 Delete delete = new Delete(CellUtil.cloneRow(results.get(0))); 1504 delete.addColumn(Bytes.toBytes("trans-tags"), Bytes.toBytes("qual2")); 1505 r.delete(delete); 1506 results.clear(); 1507 } while (more); 1508 assertEquals("Did not perform correct number of deletes", 3, count); 1509 } 1510 1511 private int getNumberOfRows(String keyPrefix, String value, HRegion r) throws Exception { 1512 InternalScanner resultScanner = buildScanner(keyPrefix, value, r); 1513 int numberOfResults = 0; 1514 List<Cell> results = new ArrayList<>(); 1515 boolean more = false; 1516 do { 1517 more = resultScanner.next(results); 1518 if (results != null && !results.isEmpty()) 1519 numberOfResults++; 1520 else 1521 break; 1522 for (Cell kv : results) { 1523 System.out.println("kv=" + kv.toString() + ", " + Bytes.toString(CellUtil.cloneValue(kv))); 1524 } 1525 results.clear(); 1526 } while (more); 1527 return numberOfResults; 1528 } 1529 1530 private InternalScanner buildScanner(String keyPrefix, String value, HRegion r) 1531 throws IOException { 1532 // Defaults FilterList.Operator.MUST_PASS_ALL. 1533 FilterList allFilters = new FilterList(); 1534 allFilters.addFilter(new PrefixFilter(Bytes.toBytes(keyPrefix))); 1535 // Only return rows where this column value exists in the row. 1536 SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("trans-tags"), 1537 Bytes.toBytes("qual2"), CompareOp.EQUAL, Bytes.toBytes(value)); 1538 filter.setFilterIfMissing(true); 1539 allFilters.addFilter(filter); 1540 Scan scan = new Scan(); 1541 scan.addFamily(Bytes.toBytes("trans-blob")); 1542 scan.addFamily(Bytes.toBytes("trans-type")); 1543 scan.addFamily(Bytes.toBytes("trans-date")); 1544 scan.addFamily(Bytes.toBytes("trans-tags")); 1545 scan.addFamily(Bytes.toBytes("trans-group")); 1546 scan.setFilter(allFilters); 1547 return r.getScanner(scan); 1548 } 1549 1550 private void putRows(HRegion r, int numRows, String value, String key) throws IOException { 1551 for (int i = 0; i < numRows; i++) { 1552 String row = key + "_" + i/* UUID.randomUUID().toString() */; 1553 System.out.println(String.format("Saving row: %s, with value %s", row, value)); 1554 Put put = new Put(Bytes.toBytes(row)); 1555 put.setDurability(Durability.SKIP_WAL); 1556 put.addColumn(Bytes.toBytes("trans-blob"), null, Bytes.toBytes("value for blob")); 1557 put.addColumn(Bytes.toBytes("trans-type"), null, Bytes.toBytes("statement")); 1558 put.addColumn(Bytes.toBytes("trans-date"), null, Bytes.toBytes("20090921010101999")); 1559 put.addColumn(Bytes.toBytes("trans-tags"), Bytes.toBytes("qual2"), Bytes.toBytes(value)); 1560 put.addColumn(Bytes.toBytes("trans-group"), null, Bytes.toBytes("adhocTransactionGroupId")); 1561 r.put(put); 1562 } 1563 } 1564 1565 @Test 1566 public void testFamilyWithAndWithoutColon() throws Exception { 1567 byte[] cf = Bytes.toBytes(COLUMN_FAMILY); 1568 this.region = initHRegion(tableName, method, CONF, cf); 1569 try { 1570 Put p = new Put(tableName.toBytes()); 1571 byte[] cfwithcolon = Bytes.toBytes(COLUMN_FAMILY + ":"); 1572 p.addColumn(cfwithcolon, cfwithcolon, cfwithcolon); 1573 boolean exception = false; 1574 try { 1575 this.region.put(p); 1576 } catch (NoSuchColumnFamilyException e) { 1577 exception = true; 1578 } 1579 assertTrue(exception); 1580 } finally { 1581 HBaseTestingUtility.closeRegionAndWAL(this.region); 1582 this.region = null; 1583 } 1584 } 1585 1586 @Test 1587 public void testBatchPut_whileNoRowLocksHeld() throws IOException { 1588 final Put[] puts = new Put[10]; 1589 MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class); 1590 try { 1591 long syncs = prepareRegionForBachPut(puts, source, false); 1592 1593 OperationStatus[] codes = this.region.batchMutate(puts); 1594 assertEquals(10, codes.length); 1595 for (int i = 0; i < 10; i++) { 1596 assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode()); 1597 } 1598 metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source); 1599 1600 LOG.info("Next a batch put with one invalid family"); 1601 puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value); 1602 codes = this.region.batchMutate(puts); 1603 assertEquals(10, codes.length); 1604 for (int i = 0; i < 10; i++) { 1605 assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS, 1606 codes[i].getOperationStatusCode()); 1607 } 1608 1609 metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 2, source); 1610 } finally { 1611 HBaseTestingUtility.closeRegionAndWAL(this.region); 1612 this.region = null; 1613 } 1614 } 1615 1616 @Test 1617 public void testBatchPut_whileMultipleRowLocksHeld() throws Exception { 1618 final Put[] puts = new Put[10]; 1619 MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class); 1620 try { 1621 long syncs = prepareRegionForBachPut(puts, source, false); 1622 1623 puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value); 1624 1625 LOG.info("batchPut will have to break into four batches to avoid row locks"); 1626 RowLock rowLock1 = region.getRowLock(Bytes.toBytes("row_2")); 1627 RowLock rowLock2 = region.getRowLock(Bytes.toBytes("row_1")); 1628 RowLock rowLock3 = region.getRowLock(Bytes.toBytes("row_3")); 1629 RowLock rowLock4 = region.getRowLock(Bytes.toBytes("row_3"), true); 1630 1631 MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF); 1632 final AtomicReference<OperationStatus[]> retFromThread = new AtomicReference<>(); 1633 final CountDownLatch startingPuts = new CountDownLatch(1); 1634 final CountDownLatch startingClose = new CountDownLatch(1); 1635 TestThread putter = new TestThread(ctx) { 1636 @Override 1637 public void doWork() throws IOException { 1638 startingPuts.countDown(); 1639 retFromThread.set(region.batchMutate(puts)); 1640 } 1641 }; 1642 LOG.info("...starting put thread while holding locks"); 1643 ctx.addThread(putter); 1644 ctx.startThreads(); 1645 1646 // Now attempt to close the region from another thread. Prior to HBASE-12565 1647 // this would cause the in-progress batchMutate operation to to fail with 1648 // exception because it use to release and re-acquire the close-guard lock 1649 // between batches. Caller then didn't get status indicating which writes succeeded. 1650 // We now expect this thread to block until the batchMutate call finishes. 1651 Thread regionCloseThread = new TestThread(ctx) { 1652 @Override 1653 public void doWork() { 1654 try { 1655 startingPuts.await(); 1656 // Give some time for the batch mutate to get in. 1657 // We don't want to race with the mutate 1658 Thread.sleep(10); 1659 startingClose.countDown(); 1660 HBaseTestingUtility.closeRegionAndWAL(region); 1661 } catch (IOException e) { 1662 throw new RuntimeException(e); 1663 } catch (InterruptedException e) { 1664 throw new RuntimeException(e); 1665 } 1666 } 1667 }; 1668 regionCloseThread.start(); 1669 1670 startingClose.await(); 1671 startingPuts.await(); 1672 Thread.sleep(100); 1673 LOG.info("...releasing row lock 1, which should let put thread continue"); 1674 rowLock1.release(); 1675 rowLock2.release(); 1676 rowLock3.release(); 1677 waitForCounter(source, "syncTimeNumOps", syncs + 1); 1678 1679 LOG.info("...joining on put thread"); 1680 ctx.stop(); 1681 regionCloseThread.join(); 1682 1683 OperationStatus[] codes = retFromThread.get(); 1684 for (int i = 0; i < codes.length; i++) { 1685 assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS, 1686 codes[i].getOperationStatusCode()); 1687 } 1688 rowLock4.release(); 1689 } finally { 1690 HBaseTestingUtility.closeRegionAndWAL(this.region); 1691 this.region = null; 1692 } 1693 } 1694 1695 private void waitForCounter(MetricsWALSource source, String metricName, long expectedCount) 1696 throws InterruptedException { 1697 long startWait = System.currentTimeMillis(); 1698 long currentCount; 1699 while ((currentCount = metricsAssertHelper.getCounter(metricName, source)) < expectedCount) { 1700 Thread.sleep(100); 1701 if (System.currentTimeMillis() - startWait > 10000) { 1702 fail(String.format("Timed out waiting for '%s' >= '%s', currentCount=%s", metricName, 1703 expectedCount, currentCount)); 1704 } 1705 } 1706 } 1707 1708 @Test 1709 public void testAtomicBatchPut() throws IOException { 1710 final Put[] puts = new Put[10]; 1711 MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class); 1712 try { 1713 long syncs = prepareRegionForBachPut(puts, source, false); 1714 1715 // 1. Straight forward case, should succeed 1716 MutationBatchOperation batchOp = new MutationBatchOperation(region, puts, true, 1717 HConstants.NO_NONCE, HConstants.NO_NONCE); 1718 OperationStatus[] codes = this.region.batchMutate(batchOp); 1719 assertEquals(10, codes.length); 1720 for (int i = 0; i < 10; i++) { 1721 assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode()); 1722 } 1723 metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source); 1724 1725 // 2. Failed to get lock 1726 RowLock lock = region.getRowLock(Bytes.toBytes("row_" + 3)); 1727 // Method {@link HRegion#getRowLock(byte[])} is reentrant. As 'row_3' is locked in this 1728 // thread, need to run {@link HRegion#batchMutate(HRegion.BatchOperation)} in different thread 1729 MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF); 1730 final AtomicReference<IOException> retFromThread = new AtomicReference<>(); 1731 final CountDownLatch finishedPuts = new CountDownLatch(1); 1732 final MutationBatchOperation finalBatchOp = new MutationBatchOperation(region, puts, true, 1733 HConstants 1734 .NO_NONCE, 1735 HConstants.NO_NONCE); 1736 TestThread putter = new TestThread(ctx) { 1737 @Override 1738 public void doWork() throws IOException { 1739 try { 1740 region.batchMutate(finalBatchOp); 1741 } catch (IOException ioe) { 1742 LOG.error("test failed!", ioe); 1743 retFromThread.set(ioe); 1744 } 1745 finishedPuts.countDown(); 1746 } 1747 }; 1748 LOG.info("...starting put thread while holding locks"); 1749 ctx.addThread(putter); 1750 ctx.startThreads(); 1751 LOG.info("...waiting for batch puts while holding locks"); 1752 try { 1753 finishedPuts.await(); 1754 } catch (InterruptedException e) { 1755 LOG.error("Interrupted!", e); 1756 } finally { 1757 if (lock != null) { 1758 lock.release(); 1759 } 1760 } 1761 assertNotNull(retFromThread.get()); 1762 metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source); 1763 1764 // 3. Exception thrown in validation 1765 LOG.info("Next a batch put with one invalid family"); 1766 puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value); 1767 batchOp = new MutationBatchOperation(region, puts, true, HConstants.NO_NONCE, 1768 HConstants.NO_NONCE); 1769 thrown.expect(NoSuchColumnFamilyException.class); 1770 this.region.batchMutate(batchOp); 1771 } finally { 1772 HBaseTestingUtility.closeRegionAndWAL(this.region); 1773 this.region = null; 1774 } 1775 } 1776 1777 @Test 1778 public void testBatchPutWithTsSlop() throws Exception { 1779 // add data with a timestamp that is too recent for range. Ensure assert 1780 CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000); 1781 final Put[] puts = new Put[10]; 1782 MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class); 1783 1784 try { 1785 long syncs = prepareRegionForBachPut(puts, source, true); 1786 1787 OperationStatus[] codes = this.region.batchMutate(puts); 1788 assertEquals(10, codes.length); 1789 for (int i = 0; i < 10; i++) { 1790 assertEquals(OperationStatusCode.SANITY_CHECK_FAILURE, codes[i].getOperationStatusCode()); 1791 } 1792 metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source); 1793 } finally { 1794 HBaseTestingUtility.closeRegionAndWAL(this.region); 1795 this.region = null; 1796 } 1797 } 1798 1799 /** 1800 * @return syncs initial syncTimeNumOps 1801 */ 1802 private long prepareRegionForBachPut(final Put[] puts, final MetricsWALSource source, 1803 boolean slop) throws IOException { 1804 this.region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES); 1805 1806 LOG.info("First a batch put with all valid puts"); 1807 for (int i = 0; i < puts.length; i++) { 1808 puts[i] = slop ? new Put(Bytes.toBytes("row_" + i), Long.MAX_VALUE - 100) : 1809 new Put(Bytes.toBytes("row_" + i)); 1810 puts[i].addColumn(COLUMN_FAMILY_BYTES, qual, value); 1811 } 1812 1813 long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source); 1814 metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source); 1815 return syncs; 1816 } 1817 1818 // //////////////////////////////////////////////////////////////////////////// 1819 // checkAndMutate tests 1820 // //////////////////////////////////////////////////////////////////////////// 1821 @Test 1822 public void testCheckAndMutate_WithEmptyRowValue() throws IOException { 1823 byte[] row1 = Bytes.toBytes("row1"); 1824 byte[] fam1 = Bytes.toBytes("fam1"); 1825 byte[] qf1 = Bytes.toBytes("qualifier"); 1826 byte[] emptyVal = new byte[] {}; 1827 byte[] val1 = Bytes.toBytes("value1"); 1828 byte[] val2 = Bytes.toBytes("value2"); 1829 1830 // Setting up region 1831 this.region = initHRegion(tableName, method, CONF, fam1); 1832 try { 1833 // Putting empty data in key 1834 Put put = new Put(row1); 1835 put.addColumn(fam1, qf1, emptyVal); 1836 1837 // checkAndPut with empty value 1838 boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator( 1839 emptyVal), put); 1840 assertTrue(res); 1841 1842 // Putting data in key 1843 put = new Put(row1); 1844 put.addColumn(fam1, qf1, val1); 1845 1846 // checkAndPut with correct value 1847 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(emptyVal), 1848 put); 1849 assertTrue(res); 1850 1851 // not empty anymore 1852 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(emptyVal), 1853 put); 1854 assertFalse(res); 1855 1856 Delete delete = new Delete(row1); 1857 delete.addColumn(fam1, qf1); 1858 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(emptyVal), 1859 delete); 1860 assertFalse(res); 1861 1862 put = new Put(row1); 1863 put.addColumn(fam1, qf1, val2); 1864 // checkAndPut with correct value 1865 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val1), 1866 put); 1867 assertTrue(res); 1868 1869 // checkAndDelete with correct value 1870 delete = new Delete(row1); 1871 delete.addColumn(fam1, qf1); 1872 delete.addColumn(fam1, qf1); 1873 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val2), 1874 delete); 1875 assertTrue(res); 1876 1877 delete = new Delete(row1); 1878 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(emptyVal), 1879 delete); 1880 assertTrue(res); 1881 1882 // checkAndPut looking for a null value 1883 put = new Put(row1); 1884 put.addColumn(fam1, qf1, val1); 1885 1886 res = region 1887 .checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new NullComparator(), put); 1888 assertTrue(res); 1889 } finally { 1890 HBaseTestingUtility.closeRegionAndWAL(this.region); 1891 this.region = null; 1892 } 1893 } 1894 1895 @Test 1896 public void testCheckAndMutate_WithWrongValue() throws IOException { 1897 byte[] row1 = Bytes.toBytes("row1"); 1898 byte[] fam1 = Bytes.toBytes("fam1"); 1899 byte[] qf1 = Bytes.toBytes("qualifier"); 1900 byte[] val1 = Bytes.toBytes("value1"); 1901 byte[] val2 = Bytes.toBytes("value2"); 1902 BigDecimal bd1 = new BigDecimal(Double.MAX_VALUE); 1903 BigDecimal bd2 = new BigDecimal(Double.MIN_VALUE); 1904 1905 // Setting up region 1906 this.region = initHRegion(tableName, method, CONF, fam1); 1907 try { 1908 // Putting data in key 1909 Put put = new Put(row1); 1910 put.addColumn(fam1, qf1, val1); 1911 region.put(put); 1912 1913 // checkAndPut with wrong value 1914 boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator( 1915 val2), put); 1916 assertEquals(false, res); 1917 1918 // checkAndDelete with wrong value 1919 Delete delete = new Delete(row1); 1920 delete.addFamily(fam1); 1921 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val2), 1922 put); 1923 assertEquals(false, res); 1924 1925 // Putting data in key 1926 put = new Put(row1); 1927 put.addColumn(fam1, qf1, Bytes.toBytes(bd1)); 1928 region.put(put); 1929 1930 // checkAndPut with wrong value 1931 res = 1932 region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator( 1933 bd2), put); 1934 assertEquals(false, res); 1935 1936 // checkAndDelete with wrong value 1937 delete = new Delete(row1); 1938 delete.addFamily(fam1); 1939 res = 1940 region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator( 1941 bd2), put); 1942 assertEquals(false, res); 1943 } finally { 1944 HBaseTestingUtility.closeRegionAndWAL(this.region); 1945 this.region = null; 1946 } 1947 } 1948 1949 @Test 1950 public void testCheckAndMutate_WithCorrectValue() throws IOException { 1951 byte[] row1 = Bytes.toBytes("row1"); 1952 byte[] fam1 = Bytes.toBytes("fam1"); 1953 byte[] qf1 = Bytes.toBytes("qualifier"); 1954 byte[] val1 = Bytes.toBytes("value1"); 1955 BigDecimal bd1 = new BigDecimal(Double.MIN_VALUE); 1956 1957 // Setting up region 1958 this.region = initHRegion(tableName, method, CONF, fam1); 1959 try { 1960 // Putting data in key 1961 Put put = new Put(row1); 1962 put.addColumn(fam1, qf1, val1); 1963 region.put(put); 1964 1965 // checkAndPut with correct value 1966 boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator( 1967 val1), put); 1968 assertEquals(true, res); 1969 1970 // checkAndDelete with correct value 1971 Delete delete = new Delete(row1); 1972 delete.addColumn(fam1, qf1); 1973 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val1), 1974 delete); 1975 assertEquals(true, res); 1976 1977 // Putting data in key 1978 put = new Put(row1); 1979 put.addColumn(fam1, qf1, Bytes.toBytes(bd1)); 1980 region.put(put); 1981 1982 // checkAndPut with correct value 1983 res = 1984 region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator( 1985 bd1), put); 1986 assertEquals(true, res); 1987 1988 // checkAndDelete with correct value 1989 delete = new Delete(row1); 1990 delete.addColumn(fam1, qf1); 1991 res = 1992 region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator( 1993 bd1), delete); 1994 assertEquals(true, res); 1995 } finally { 1996 HBaseTestingUtility.closeRegionAndWAL(this.region); 1997 this.region = null; 1998 } 1999 } 2000 2001 @Test 2002 public void testCheckAndMutate_WithNonEqualCompareOp() throws IOException { 2003 byte[] row1 = Bytes.toBytes("row1"); 2004 byte[] fam1 = Bytes.toBytes("fam1"); 2005 byte[] qf1 = Bytes.toBytes("qualifier"); 2006 byte[] val1 = Bytes.toBytes("value1"); 2007 byte[] val2 = Bytes.toBytes("value2"); 2008 byte[] val3 = Bytes.toBytes("value3"); 2009 byte[] val4 = Bytes.toBytes("value4"); 2010 2011 // Setting up region 2012 this.region = initHRegion(tableName, method, CONF, fam1); 2013 try { 2014 // Putting val3 in key 2015 Put put = new Put(row1); 2016 put.addColumn(fam1, qf1, val3); 2017 region.put(put); 2018 2019 // Test CompareOp.LESS: original = val3, compare with val3, fail 2020 boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS, 2021 new BinaryComparator(val3), put); 2022 assertEquals(false, res); 2023 2024 // Test CompareOp.LESS: original = val3, compare with val4, fail 2025 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS, 2026 new BinaryComparator(val4), put); 2027 assertEquals(false, res); 2028 2029 // Test CompareOp.LESS: original = val3, compare with val2, 2030 // succeed (now value = val2) 2031 put = new Put(row1); 2032 put.addColumn(fam1, qf1, val2); 2033 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS, 2034 new BinaryComparator(val2), put); 2035 assertEquals(true, res); 2036 2037 // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val3, fail 2038 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL, 2039 new BinaryComparator(val3), put); 2040 assertEquals(false, res); 2041 2042 // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val2, 2043 // succeed (value still = val2) 2044 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL, 2045 new BinaryComparator(val2), put); 2046 assertEquals(true, res); 2047 2048 // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val1, 2049 // succeed (now value = val3) 2050 put = new Put(row1); 2051 put.addColumn(fam1, qf1, val3); 2052 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL, 2053 new BinaryComparator(val1), put); 2054 assertEquals(true, res); 2055 2056 // Test CompareOp.GREATER: original = val3, compare with val3, fail 2057 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER, 2058 new BinaryComparator(val3), put); 2059 assertEquals(false, res); 2060 2061 // Test CompareOp.GREATER: original = val3, compare with val2, fail 2062 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER, 2063 new BinaryComparator(val2), put); 2064 assertEquals(false, res); 2065 2066 // Test CompareOp.GREATER: original = val3, compare with val4, 2067 // succeed (now value = val2) 2068 put = new Put(row1); 2069 put.addColumn(fam1, qf1, val2); 2070 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER, 2071 new BinaryComparator(val4), put); 2072 assertEquals(true, res); 2073 2074 // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val1, fail 2075 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL, 2076 new BinaryComparator(val1), put); 2077 assertEquals(false, res); 2078 2079 // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val2, 2080 // succeed (value still = val2) 2081 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL, 2082 new BinaryComparator(val2), put); 2083 assertEquals(true, res); 2084 2085 // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val3, succeed 2086 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL, 2087 new BinaryComparator(val3), put); 2088 assertEquals(true, res); 2089 } finally { 2090 HBaseTestingUtility.closeRegionAndWAL(this.region); 2091 this.region = null; 2092 } 2093 } 2094 2095 @Test 2096 public void testCheckAndPut_ThatPutWasWritten() throws IOException { 2097 byte[] row1 = Bytes.toBytes("row1"); 2098 byte[] fam1 = Bytes.toBytes("fam1"); 2099 byte[] fam2 = Bytes.toBytes("fam2"); 2100 byte[] qf1 = Bytes.toBytes("qualifier"); 2101 byte[] val1 = Bytes.toBytes("value1"); 2102 byte[] val2 = Bytes.toBytes("value2"); 2103 2104 byte[][] families = { fam1, fam2 }; 2105 2106 // Setting up region 2107 this.region = initHRegion(tableName, method, CONF, families); 2108 try { 2109 // Putting data in the key to check 2110 Put put = new Put(row1); 2111 put.addColumn(fam1, qf1, val1); 2112 region.put(put); 2113 2114 // Creating put to add 2115 long ts = System.currentTimeMillis(); 2116 KeyValue kv = new KeyValue(row1, fam2, qf1, ts, KeyValue.Type.Put, val2); 2117 put = new Put(row1); 2118 put.add(kv); 2119 2120 // checkAndPut with wrong value 2121 boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator( 2122 val1), put); 2123 assertEquals(true, res); 2124 2125 Get get = new Get(row1); 2126 get.addColumn(fam2, qf1); 2127 Cell[] actual = region.get(get).rawCells(); 2128 2129 Cell[] expected = { kv }; 2130 2131 assertEquals(expected.length, actual.length); 2132 for (int i = 0; i < actual.length; i++) { 2133 assertEquals(expected[i], actual[i]); 2134 } 2135 } finally { 2136 HBaseTestingUtility.closeRegionAndWAL(this.region); 2137 this.region = null; 2138 } 2139 } 2140 2141 @Test 2142 public void testCheckAndPut_wrongRowInPut() throws IOException { 2143 this.region = initHRegion(tableName, method, CONF, COLUMNS); 2144 try { 2145 Put put = new Put(row2); 2146 put.addColumn(fam1, qual1, value1); 2147 try { 2148 region.checkAndMutate(row, fam1, qual1, CompareOperator.EQUAL, 2149 new BinaryComparator(value2), put); 2150 fail(); 2151 } catch (org.apache.hadoop.hbase.DoNotRetryIOException expected) { 2152 // expected exception. 2153 } 2154 } finally { 2155 HBaseTestingUtility.closeRegionAndWAL(this.region); 2156 this.region = null; 2157 } 2158 } 2159 2160 @Test 2161 public void testCheckAndDelete_ThatDeleteWasWritten() throws IOException { 2162 byte[] row1 = Bytes.toBytes("row1"); 2163 byte[] fam1 = Bytes.toBytes("fam1"); 2164 byte[] fam2 = Bytes.toBytes("fam2"); 2165 byte[] qf1 = Bytes.toBytes("qualifier1"); 2166 byte[] qf2 = Bytes.toBytes("qualifier2"); 2167 byte[] qf3 = Bytes.toBytes("qualifier3"); 2168 byte[] val1 = Bytes.toBytes("value1"); 2169 byte[] val2 = Bytes.toBytes("value2"); 2170 byte[] val3 = Bytes.toBytes("value3"); 2171 byte[] emptyVal = new byte[] {}; 2172 2173 byte[][] families = { fam1, fam2 }; 2174 2175 // Setting up region 2176 this.region = initHRegion(tableName, method, CONF, families); 2177 try { 2178 // Put content 2179 Put put = new Put(row1); 2180 put.addColumn(fam1, qf1, val1); 2181 region.put(put); 2182 Threads.sleep(2); 2183 2184 put = new Put(row1); 2185 put.addColumn(fam1, qf1, val2); 2186 put.addColumn(fam2, qf1, val3); 2187 put.addColumn(fam2, qf2, val2); 2188 put.addColumn(fam2, qf3, val1); 2189 put.addColumn(fam1, qf3, val1); 2190 region.put(put); 2191 2192 // Multi-column delete 2193 Delete delete = new Delete(row1); 2194 delete.addColumn(fam1, qf1); 2195 delete.addColumn(fam2, qf1); 2196 delete.addColumn(fam1, qf3); 2197 boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator( 2198 val2), delete); 2199 assertEquals(true, res); 2200 2201 Get get = new Get(row1); 2202 get.addColumn(fam1, qf1); 2203 get.addColumn(fam1, qf3); 2204 get.addColumn(fam2, qf2); 2205 Result r = region.get(get); 2206 assertEquals(2, r.size()); 2207 assertArrayEquals(val1, r.getValue(fam1, qf1)); 2208 assertArrayEquals(val2, r.getValue(fam2, qf2)); 2209 2210 // Family delete 2211 delete = new Delete(row1); 2212 delete.addFamily(fam2); 2213 res = region.checkAndMutate(row1, fam2, qf1, CompareOperator.EQUAL, new BinaryComparator(emptyVal), 2214 delete); 2215 assertEquals(true, res); 2216 2217 get = new Get(row1); 2218 r = region.get(get); 2219 assertEquals(1, r.size()); 2220 assertArrayEquals(val1, r.getValue(fam1, qf1)); 2221 2222 // Row delete 2223 delete = new Delete(row1); 2224 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val1), 2225 delete); 2226 assertEquals(true, res); 2227 get = new Get(row1); 2228 r = region.get(get); 2229 assertEquals(0, r.size()); 2230 } finally { 2231 HBaseTestingUtility.closeRegionAndWAL(this.region); 2232 this.region = null; 2233 } 2234 } 2235 2236 // //////////////////////////////////////////////////////////////////////////// 2237 // Delete tests 2238 // //////////////////////////////////////////////////////////////////////////// 2239 @Test 2240 public void testDelete_multiDeleteColumn() throws IOException { 2241 byte[] row1 = Bytes.toBytes("row1"); 2242 byte[] fam1 = Bytes.toBytes("fam1"); 2243 byte[] qual = Bytes.toBytes("qualifier"); 2244 byte[] value = Bytes.toBytes("value"); 2245 2246 Put put = new Put(row1); 2247 put.addColumn(fam1, qual, 1, value); 2248 put.addColumn(fam1, qual, 2, value); 2249 2250 this.region = initHRegion(tableName, method, CONF, fam1); 2251 try { 2252 region.put(put); 2253 2254 // We do support deleting more than 1 'latest' version 2255 Delete delete = new Delete(row1); 2256 delete.addColumn(fam1, qual); 2257 delete.addColumn(fam1, qual); 2258 region.delete(delete); 2259 2260 Get get = new Get(row1); 2261 get.addFamily(fam1); 2262 Result r = region.get(get); 2263 assertEquals(0, r.size()); 2264 } finally { 2265 HBaseTestingUtility.closeRegionAndWAL(this.region); 2266 this.region = null; 2267 } 2268 } 2269 2270 @Test 2271 public void testDelete_CheckFamily() throws IOException { 2272 byte[] row1 = Bytes.toBytes("row1"); 2273 byte[] fam1 = Bytes.toBytes("fam1"); 2274 byte[] fam2 = Bytes.toBytes("fam2"); 2275 byte[] fam3 = Bytes.toBytes("fam3"); 2276 byte[] fam4 = Bytes.toBytes("fam4"); 2277 2278 // Setting up region 2279 this.region = initHRegion(tableName, method, CONF, fam1, fam2, fam3); 2280 try { 2281 List<Cell> kvs = new ArrayList<>(); 2282 kvs.add(new KeyValue(row1, fam4, null, null)); 2283 2284 // testing existing family 2285 byte[] family = fam2; 2286 try { 2287 NavigableMap<byte[], List<Cell>> deleteMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); 2288 deleteMap.put(family, kvs); 2289 region.delete(deleteMap, Durability.SYNC_WAL); 2290 } catch (Exception e) { 2291 fail("Family " + new String(family, StandardCharsets.UTF_8) + " does not exist"); 2292 } 2293 2294 // testing non existing family 2295 boolean ok = false; 2296 family = fam4; 2297 try { 2298 NavigableMap<byte[], List<Cell>> deleteMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); 2299 deleteMap.put(family, kvs); 2300 region.delete(deleteMap, Durability.SYNC_WAL); 2301 } catch (Exception e) { 2302 ok = true; 2303 } 2304 assertTrue("Family " + new String(family, StandardCharsets.UTF_8) + " does exist", ok); 2305 } finally { 2306 HBaseTestingUtility.closeRegionAndWAL(this.region); 2307 this.region = null; 2308 } 2309 } 2310 2311 @Test 2312 public void testDelete_mixed() throws IOException, InterruptedException { 2313 byte[] fam = Bytes.toBytes("info"); 2314 byte[][] families = { fam }; 2315 this.region = initHRegion(tableName, method, CONF, families); 2316 try { 2317 EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge()); 2318 2319 byte[] row = Bytes.toBytes("table_name"); 2320 // column names 2321 byte[] serverinfo = Bytes.toBytes("serverinfo"); 2322 byte[] splitA = Bytes.toBytes("splitA"); 2323 byte[] splitB = Bytes.toBytes("splitB"); 2324 2325 // add some data: 2326 Put put = new Put(row); 2327 put.addColumn(fam, splitA, Bytes.toBytes("reference_A")); 2328 region.put(put); 2329 2330 put = new Put(row); 2331 put.addColumn(fam, splitB, Bytes.toBytes("reference_B")); 2332 region.put(put); 2333 2334 put = new Put(row); 2335 put.addColumn(fam, serverinfo, Bytes.toBytes("ip_address")); 2336 region.put(put); 2337 2338 // ok now delete a split: 2339 Delete delete = new Delete(row); 2340 delete.addColumns(fam, splitA); 2341 region.delete(delete); 2342 2343 // assert some things: 2344 Get get = new Get(row).addColumn(fam, serverinfo); 2345 Result result = region.get(get); 2346 assertEquals(1, result.size()); 2347 2348 get = new Get(row).addColumn(fam, splitA); 2349 result = region.get(get); 2350 assertEquals(0, result.size()); 2351 2352 get = new Get(row).addColumn(fam, splitB); 2353 result = region.get(get); 2354 assertEquals(1, result.size()); 2355 2356 // Assert that after a delete, I can put. 2357 put = new Put(row); 2358 put.addColumn(fam, splitA, Bytes.toBytes("reference_A")); 2359 region.put(put); 2360 get = new Get(row); 2361 result = region.get(get); 2362 assertEquals(3, result.size()); 2363 2364 // Now delete all... then test I can add stuff back 2365 delete = new Delete(row); 2366 region.delete(delete); 2367 assertEquals(0, region.get(get).size()); 2368 2369 region.put(new Put(row).addColumn(fam, splitA, Bytes.toBytes("reference_A"))); 2370 result = region.get(get); 2371 assertEquals(1, result.size()); 2372 } finally { 2373 HBaseTestingUtility.closeRegionAndWAL(this.region); 2374 this.region = null; 2375 } 2376 } 2377 2378 @Test 2379 public void testDeleteRowWithFutureTs() throws IOException { 2380 byte[] fam = Bytes.toBytes("info"); 2381 byte[][] families = { fam }; 2382 this.region = initHRegion(tableName, method, CONF, families); 2383 try { 2384 byte[] row = Bytes.toBytes("table_name"); 2385 // column names 2386 byte[] serverinfo = Bytes.toBytes("serverinfo"); 2387 2388 // add data in the far future 2389 Put put = new Put(row); 2390 put.addColumn(fam, serverinfo, HConstants.LATEST_TIMESTAMP - 5, Bytes.toBytes("value")); 2391 region.put(put); 2392 2393 // now delete something in the present 2394 Delete delete = new Delete(row); 2395 region.delete(delete); 2396 2397 // make sure we still see our data 2398 Get get = new Get(row).addColumn(fam, serverinfo); 2399 Result result = region.get(get); 2400 assertEquals(1, result.size()); 2401 2402 // delete the future row 2403 delete = new Delete(row, HConstants.LATEST_TIMESTAMP - 3); 2404 region.delete(delete); 2405 2406 // make sure it is gone 2407 get = new Get(row).addColumn(fam, serverinfo); 2408 result = region.get(get); 2409 assertEquals(0, result.size()); 2410 } finally { 2411 HBaseTestingUtility.closeRegionAndWAL(this.region); 2412 this.region = null; 2413 } 2414 } 2415 2416 /** 2417 * Tests that the special LATEST_TIMESTAMP option for puts gets replaced by 2418 * the actual timestamp 2419 */ 2420 @Test 2421 public void testPutWithLatestTS() throws IOException { 2422 byte[] fam = Bytes.toBytes("info"); 2423 byte[][] families = { fam }; 2424 this.region = initHRegion(tableName, method, CONF, families); 2425 try { 2426 byte[] row = Bytes.toBytes("row1"); 2427 // column names 2428 byte[] qual = Bytes.toBytes("qual"); 2429 2430 // add data with LATEST_TIMESTAMP, put without WAL 2431 Put put = new Put(row); 2432 put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, Bytes.toBytes("value")); 2433 region.put(put); 2434 2435 // Make sure it shows up with an actual timestamp 2436 Get get = new Get(row).addColumn(fam, qual); 2437 Result result = region.get(get); 2438 assertEquals(1, result.size()); 2439 Cell kv = result.rawCells()[0]; 2440 LOG.info("Got: " + kv); 2441 assertTrue("LATEST_TIMESTAMP was not replaced with real timestamp", 2442 kv.getTimestamp() != HConstants.LATEST_TIMESTAMP); 2443 2444 // Check same with WAL enabled (historically these took different 2445 // code paths, so check both) 2446 row = Bytes.toBytes("row2"); 2447 put = new Put(row); 2448 put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, Bytes.toBytes("value")); 2449 region.put(put); 2450 2451 // Make sure it shows up with an actual timestamp 2452 get = new Get(row).addColumn(fam, qual); 2453 result = region.get(get); 2454 assertEquals(1, result.size()); 2455 kv = result.rawCells()[0]; 2456 LOG.info("Got: " + kv); 2457 assertTrue("LATEST_TIMESTAMP was not replaced with real timestamp", 2458 kv.getTimestamp() != HConstants.LATEST_TIMESTAMP); 2459 } finally { 2460 HBaseTestingUtility.closeRegionAndWAL(this.region); 2461 this.region = null; 2462 } 2463 2464 } 2465 2466 /** 2467 * Tests that there is server-side filtering for invalid timestamp upper 2468 * bound. Note that the timestamp lower bound is automatically handled for us 2469 * by the TTL field. 2470 */ 2471 @Test 2472 public void testPutWithTsSlop() throws IOException { 2473 byte[] fam = Bytes.toBytes("info"); 2474 byte[][] families = { fam }; 2475 2476 // add data with a timestamp that is too recent for range. Ensure assert 2477 CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000); 2478 this.region = initHRegion(tableName, method, CONF, families); 2479 boolean caughtExcep = false; 2480 try { 2481 try { 2482 // no TS specified == use latest. should not error 2483 region.put(new Put(row).addColumn(fam, Bytes.toBytes("qual"), Bytes.toBytes("value"))); 2484 // TS out of range. should error 2485 region.put(new Put(row).addColumn(fam, Bytes.toBytes("qual"), 2486 System.currentTimeMillis() + 2000, Bytes.toBytes("value"))); 2487 fail("Expected IOE for TS out of configured timerange"); 2488 } catch (FailedSanityCheckException ioe) { 2489 LOG.debug("Received expected exception", ioe); 2490 caughtExcep = true; 2491 } 2492 assertTrue("Should catch FailedSanityCheckException", caughtExcep); 2493 } finally { 2494 HBaseTestingUtility.closeRegionAndWAL(this.region); 2495 this.region = null; 2496 } 2497 } 2498 2499 @Test 2500 public void testScanner_DeleteOneFamilyNotAnother() throws IOException { 2501 byte[] fam1 = Bytes.toBytes("columnA"); 2502 byte[] fam2 = Bytes.toBytes("columnB"); 2503 this.region = initHRegion(tableName, method, CONF, fam1, fam2); 2504 try { 2505 byte[] rowA = Bytes.toBytes("rowA"); 2506 byte[] rowB = Bytes.toBytes("rowB"); 2507 2508 byte[] value = Bytes.toBytes("value"); 2509 2510 Delete delete = new Delete(rowA); 2511 delete.addFamily(fam1); 2512 2513 region.delete(delete); 2514 2515 // now create data. 2516 Put put = new Put(rowA); 2517 put.addColumn(fam2, null, value); 2518 region.put(put); 2519 2520 put = new Put(rowB); 2521 put.addColumn(fam1, null, value); 2522 put.addColumn(fam2, null, value); 2523 region.put(put); 2524 2525 Scan scan = new Scan(); 2526 scan.addFamily(fam1).addFamily(fam2); 2527 InternalScanner s = region.getScanner(scan); 2528 List<Cell> results = new ArrayList<>(); 2529 s.next(results); 2530 assertTrue(CellUtil.matchingRows(results.get(0), rowA)); 2531 2532 results.clear(); 2533 s.next(results); 2534 assertTrue(CellUtil.matchingRows(results.get(0), rowB)); 2535 } finally { 2536 HBaseTestingUtility.closeRegionAndWAL(this.region); 2537 this.region = null; 2538 } 2539 } 2540 2541 @Test 2542 public void testDataInMemoryWithoutWAL() throws IOException { 2543 FileSystem fs = FileSystem.get(CONF); 2544 Path rootDir = new Path(dir + "testDataInMemoryWithoutWAL"); 2545 FSHLog hLog = new FSHLog(fs, rootDir, "testDataInMemoryWithoutWAL", CONF); 2546 hLog.init(); 2547 // This chunk creation is done throughout the code base. Do we want to move it into core? 2548 // It is missing from this test. W/o it we NPE. 2549 region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog, 2550 COLUMN_FAMILY_BYTES); 2551 2552 Cell originalCell = CellUtil.createCell(row, COLUMN_FAMILY_BYTES, qual1, 2553 System.currentTimeMillis(), KeyValue.Type.Put.getCode(), value1); 2554 final long originalSize = KeyValueUtil.length(originalCell); 2555 2556 Cell addCell = CellUtil.createCell(row, COLUMN_FAMILY_BYTES, qual1, 2557 System.currentTimeMillis(), KeyValue.Type.Put.getCode(), Bytes.toBytes("xxxxxxxxxx")); 2558 final long addSize = KeyValueUtil.length(addCell); 2559 2560 LOG.info("originalSize:" + originalSize 2561 + ", addSize:" + addSize); 2562 // start test. We expect that the addPut's durability will be replaced 2563 // by originalPut's durability. 2564 2565 // case 1: 2566 testDataInMemoryWithoutWAL(region, 2567 new Put(row).add(originalCell).setDurability(Durability.SKIP_WAL), 2568 new Put(row).add(addCell).setDurability(Durability.SKIP_WAL), 2569 originalSize + addSize); 2570 2571 // case 2: 2572 testDataInMemoryWithoutWAL(region, 2573 new Put(row).add(originalCell).setDurability(Durability.SKIP_WAL), 2574 new Put(row).add(addCell).setDurability(Durability.SYNC_WAL), 2575 originalSize + addSize); 2576 2577 // case 3: 2578 testDataInMemoryWithoutWAL(region, 2579 new Put(row).add(originalCell).setDurability(Durability.SYNC_WAL), 2580 new Put(row).add(addCell).setDurability(Durability.SKIP_WAL), 2581 0); 2582 2583 // case 4: 2584 testDataInMemoryWithoutWAL(region, 2585 new Put(row).add(originalCell).setDurability(Durability.SYNC_WAL), 2586 new Put(row).add(addCell).setDurability(Durability.SYNC_WAL), 2587 0); 2588 } 2589 2590 private static void testDataInMemoryWithoutWAL(HRegion region, Put originalPut, 2591 final Put addPut, long delta) throws IOException { 2592 final long initSize = region.getDataInMemoryWithoutWAL(); 2593 // save normalCPHost and replaced by mockedCPHost 2594 RegionCoprocessorHost normalCPHost = region.getCoprocessorHost(); 2595 RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class); 2596 // Because the preBatchMutate returns void, we can't do usual Mockito when...then form. Must 2597 // do below format (from Mockito doc). 2598 Mockito.doAnswer(new Answer() { 2599 @Override 2600 public Object answer(InvocationOnMock invocation) throws Throwable { 2601 MiniBatchOperationInProgress<Mutation> mb = invocation.getArgument(0); 2602 mb.addOperationsFromCP(0, new Mutation[]{addPut}); 2603 return null; 2604 } 2605 }).when(mockedCPHost).preBatchMutate(Mockito.isA(MiniBatchOperationInProgress.class)); 2606 region.setCoprocessorHost(mockedCPHost); 2607 region.put(originalPut); 2608 region.setCoprocessorHost(normalCPHost); 2609 final long finalSize = region.getDataInMemoryWithoutWAL(); 2610 assertEquals("finalSize:" + finalSize + ", initSize:" 2611 + initSize + ", delta:" + delta,finalSize, initSize + delta); 2612 } 2613 2614 @Test 2615 public void testDeleteColumns_PostInsert() throws IOException, InterruptedException { 2616 Delete delete = new Delete(row); 2617 delete.addColumns(fam1, qual1); 2618 doTestDelete_AndPostInsert(delete); 2619 } 2620 2621 @Test 2622 public void testaddFamily_PostInsert() throws IOException, InterruptedException { 2623 Delete delete = new Delete(row); 2624 delete.addFamily(fam1); 2625 doTestDelete_AndPostInsert(delete); 2626 } 2627 2628 public void doTestDelete_AndPostInsert(Delete delete) throws IOException, InterruptedException { 2629 this.region = initHRegion(tableName, method, CONF, fam1); 2630 try { 2631 EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge()); 2632 Put put = new Put(row); 2633 put.addColumn(fam1, qual1, value1); 2634 region.put(put); 2635 2636 // now delete the value: 2637 region.delete(delete); 2638 2639 // ok put data: 2640 put = new Put(row); 2641 put.addColumn(fam1, qual1, value2); 2642 region.put(put); 2643 2644 // ok get: 2645 Get get = new Get(row); 2646 get.addColumn(fam1, qual1); 2647 2648 Result r = region.get(get); 2649 assertEquals(1, r.size()); 2650 assertArrayEquals(value2, r.getValue(fam1, qual1)); 2651 2652 // next: 2653 Scan scan = new Scan(row); 2654 scan.addColumn(fam1, qual1); 2655 InternalScanner s = region.getScanner(scan); 2656 2657 List<Cell> results = new ArrayList<>(); 2658 assertEquals(false, s.next(results)); 2659 assertEquals(1, results.size()); 2660 Cell kv = results.get(0); 2661 2662 assertArrayEquals(value2, CellUtil.cloneValue(kv)); 2663 assertArrayEquals(fam1, CellUtil.cloneFamily(kv)); 2664 assertArrayEquals(qual1, CellUtil.cloneQualifier(kv)); 2665 assertArrayEquals(row, CellUtil.cloneRow(kv)); 2666 } finally { 2667 HBaseTestingUtility.closeRegionAndWAL(this.region); 2668 this.region = null; 2669 } 2670 } 2671 2672 @Test 2673 public void testDelete_CheckTimestampUpdated() throws IOException { 2674 byte[] row1 = Bytes.toBytes("row1"); 2675 byte[] col1 = Bytes.toBytes("col1"); 2676 byte[] col2 = Bytes.toBytes("col2"); 2677 byte[] col3 = Bytes.toBytes("col3"); 2678 2679 // Setting up region 2680 this.region = initHRegion(tableName, method, CONF, fam1); 2681 try { 2682 // Building checkerList 2683 List<Cell> kvs = new ArrayList<>(); 2684 kvs.add(new KeyValue(row1, fam1, col1, null)); 2685 kvs.add(new KeyValue(row1, fam1, col2, null)); 2686 kvs.add(new KeyValue(row1, fam1, col3, null)); 2687 2688 NavigableMap<byte[], List<Cell>> deleteMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); 2689 deleteMap.put(fam1, kvs); 2690 region.delete(deleteMap, Durability.SYNC_WAL); 2691 2692 // extract the key values out the memstore: 2693 // This is kinda hacky, but better than nothing... 2694 long now = System.currentTimeMillis(); 2695 AbstractMemStore memstore = (AbstractMemStore)region.getStore(fam1).memstore; 2696 Cell firstCell = memstore.getActive().first(); 2697 assertTrue(firstCell.getTimestamp() <= now); 2698 now = firstCell.getTimestamp(); 2699 for (Cell cell : memstore.getActive().getCellSet()) { 2700 assertTrue(cell.getTimestamp() <= now); 2701 now = cell.getTimestamp(); 2702 } 2703 } finally { 2704 HBaseTestingUtility.closeRegionAndWAL(this.region); 2705 this.region = null; 2706 } 2707 } 2708 2709 // //////////////////////////////////////////////////////////////////////////// 2710 // Get tests 2711 // //////////////////////////////////////////////////////////////////////////// 2712 @Test 2713 public void testGet_FamilyChecker() throws IOException { 2714 byte[] row1 = Bytes.toBytes("row1"); 2715 byte[] fam1 = Bytes.toBytes("fam1"); 2716 byte[] fam2 = Bytes.toBytes("False"); 2717 byte[] col1 = Bytes.toBytes("col1"); 2718 2719 // Setting up region 2720 this.region = initHRegion(tableName, method, CONF, fam1); 2721 try { 2722 Get get = new Get(row1); 2723 get.addColumn(fam2, col1); 2724 2725 // Test 2726 try { 2727 region.get(get); 2728 } catch (org.apache.hadoop.hbase.DoNotRetryIOException e) { 2729 assertFalse(false); 2730 return; 2731 } 2732 assertFalse(true); 2733 } finally { 2734 HBaseTestingUtility.closeRegionAndWAL(this.region); 2735 this.region = null; 2736 } 2737 } 2738 2739 @Test 2740 public void testGet_Basic() throws IOException { 2741 byte[] row1 = Bytes.toBytes("row1"); 2742 byte[] fam1 = Bytes.toBytes("fam1"); 2743 byte[] col1 = Bytes.toBytes("col1"); 2744 byte[] col2 = Bytes.toBytes("col2"); 2745 byte[] col3 = Bytes.toBytes("col3"); 2746 byte[] col4 = Bytes.toBytes("col4"); 2747 byte[] col5 = Bytes.toBytes("col5"); 2748 2749 // Setting up region 2750 this.region = initHRegion(tableName, method, CONF, fam1); 2751 try { 2752 // Add to memstore 2753 Put put = new Put(row1); 2754 put.addColumn(fam1, col1, null); 2755 put.addColumn(fam1, col2, null); 2756 put.addColumn(fam1, col3, null); 2757 put.addColumn(fam1, col4, null); 2758 put.addColumn(fam1, col5, null); 2759 region.put(put); 2760 2761 Get get = new Get(row1); 2762 get.addColumn(fam1, col2); 2763 get.addColumn(fam1, col4); 2764 // Expected result 2765 KeyValue kv1 = new KeyValue(row1, fam1, col2); 2766 KeyValue kv2 = new KeyValue(row1, fam1, col4); 2767 KeyValue[] expected = { kv1, kv2 }; 2768 2769 // Test 2770 Result res = region.get(get); 2771 assertEquals(expected.length, res.size()); 2772 for (int i = 0; i < res.size(); i++) { 2773 assertTrue(CellUtil.matchingRows(expected[i], res.rawCells()[i])); 2774 assertTrue(CellUtil.matchingFamily(expected[i], res.rawCells()[i])); 2775 assertTrue(CellUtil.matchingQualifier(expected[i], res.rawCells()[i])); 2776 } 2777 2778 // Test using a filter on a Get 2779 Get g = new Get(row1); 2780 final int count = 2; 2781 g.setFilter(new ColumnCountGetFilter(count)); 2782 res = region.get(g); 2783 assertEquals(count, res.size()); 2784 } finally { 2785 HBaseTestingUtility.closeRegionAndWAL(this.region); 2786 this.region = null; 2787 } 2788 } 2789 2790 @Test 2791 public void testGet_Empty() throws IOException { 2792 byte[] row = Bytes.toBytes("row"); 2793 byte[] fam = Bytes.toBytes("fam"); 2794 2795 this.region = initHRegion(tableName, method, CONF, fam); 2796 try { 2797 Get get = new Get(row); 2798 get.addFamily(fam); 2799 Result r = region.get(get); 2800 2801 assertTrue(r.isEmpty()); 2802 } finally { 2803 HBaseTestingUtility.closeRegionAndWAL(this.region); 2804 this.region = null; 2805 } 2806 } 2807 2808 @Test 2809 public void testGetWithFilter() throws IOException, InterruptedException { 2810 byte[] row1 = Bytes.toBytes("row1"); 2811 byte[] fam1 = Bytes.toBytes("fam1"); 2812 byte[] col1 = Bytes.toBytes("col1"); 2813 byte[] value1 = Bytes.toBytes("value1"); 2814 byte[] value2 = Bytes.toBytes("value2"); 2815 2816 final int maxVersions = 3; 2817 HColumnDescriptor hcd = new HColumnDescriptor(fam1); 2818 hcd.setMaxVersions(maxVersions); 2819 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testFilterAndColumnTracker")); 2820 htd.addFamily(hcd); 2821 ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); 2822 HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); 2823 Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + ".log"); 2824 final WAL wal = HBaseTestingUtility.createWal(TEST_UTIL.getConfiguration(), logDir, info); 2825 this.region = TEST_UTIL.createLocalHRegion(info, htd, wal); 2826 2827 try { 2828 // Put 4 version to memstore 2829 long ts = 0; 2830 Put put = new Put(row1, ts); 2831 put.addColumn(fam1, col1, value1); 2832 region.put(put); 2833 put = new Put(row1, ts + 1); 2834 put.addColumn(fam1, col1, Bytes.toBytes("filter1")); 2835 region.put(put); 2836 put = new Put(row1, ts + 2); 2837 put.addColumn(fam1, col1, Bytes.toBytes("filter2")); 2838 region.put(put); 2839 put = new Put(row1, ts + 3); 2840 put.addColumn(fam1, col1, value2); 2841 region.put(put); 2842 2843 Get get = new Get(row1); 2844 get.setMaxVersions(); 2845 Result res = region.get(get); 2846 // Get 3 versions, the oldest version has gone from user view 2847 assertEquals(maxVersions, res.size()); 2848 2849 get.setFilter(new ValueFilter(CompareOp.EQUAL, new SubstringComparator("value"))); 2850 res = region.get(get); 2851 // When use value filter, the oldest version should still gone from user view and it 2852 // should only return one key vaule 2853 assertEquals(1, res.size()); 2854 assertTrue(CellUtil.matchingValue(new KeyValue(row1, fam1, col1, value2), res.rawCells()[0])); 2855 assertEquals(ts + 3, res.rawCells()[0].getTimestamp()); 2856 2857 region.flush(true); 2858 region.compact(true); 2859 Thread.sleep(1000); 2860 res = region.get(get); 2861 // After flush and compact, the result should be consistent with previous result 2862 assertEquals(1, res.size()); 2863 assertTrue(CellUtil.matchingValue(new KeyValue(row1, fam1, col1, value2), res.rawCells()[0])); 2864 } finally { 2865 HBaseTestingUtility.closeRegionAndWAL(this.region); 2866 this.region = null; 2867 } 2868 } 2869 2870 // //////////////////////////////////////////////////////////////////////////// 2871 // Scanner tests 2872 // //////////////////////////////////////////////////////////////////////////// 2873 @Test 2874 public void testGetScanner_WithOkFamilies() throws IOException { 2875 byte[] fam1 = Bytes.toBytes("fam1"); 2876 byte[] fam2 = Bytes.toBytes("fam2"); 2877 2878 byte[][] families = { fam1, fam2 }; 2879 2880 // Setting up region 2881 this.region = initHRegion(tableName, method, CONF, families); 2882 try { 2883 Scan scan = new Scan(); 2884 scan.addFamily(fam1); 2885 scan.addFamily(fam2); 2886 try { 2887 region.getScanner(scan); 2888 } catch (Exception e) { 2889 assertTrue("Families could not be found in Region", false); 2890 } 2891 } finally { 2892 HBaseTestingUtility.closeRegionAndWAL(this.region); 2893 this.region = null; 2894 } 2895 } 2896 2897 @Test 2898 public void testGetScanner_WithNotOkFamilies() throws IOException { 2899 byte[] fam1 = Bytes.toBytes("fam1"); 2900 byte[] fam2 = Bytes.toBytes("fam2"); 2901 2902 byte[][] families = { fam1 }; 2903 2904 // Setting up region 2905 this.region = initHRegion(tableName, method, CONF, families); 2906 try { 2907 Scan scan = new Scan(); 2908 scan.addFamily(fam2); 2909 boolean ok = false; 2910 try { 2911 region.getScanner(scan); 2912 } catch (Exception e) { 2913 ok = true; 2914 } 2915 assertTrue("Families could not be found in Region", ok); 2916 } finally { 2917 HBaseTestingUtility.closeRegionAndWAL(this.region); 2918 this.region = null; 2919 } 2920 } 2921 2922 @Test 2923 public void testGetScanner_WithNoFamilies() throws IOException { 2924 byte[] row1 = Bytes.toBytes("row1"); 2925 byte[] fam1 = Bytes.toBytes("fam1"); 2926 byte[] fam2 = Bytes.toBytes("fam2"); 2927 byte[] fam3 = Bytes.toBytes("fam3"); 2928 byte[] fam4 = Bytes.toBytes("fam4"); 2929 2930 byte[][] families = { fam1, fam2, fam3, fam4 }; 2931 2932 // Setting up region 2933 this.region = initHRegion(tableName, method, CONF, families); 2934 try { 2935 2936 // Putting data in Region 2937 Put put = new Put(row1); 2938 put.addColumn(fam1, null, null); 2939 put.addColumn(fam2, null, null); 2940 put.addColumn(fam3, null, null); 2941 put.addColumn(fam4, null, null); 2942 region.put(put); 2943 2944 Scan scan = null; 2945 HRegion.RegionScannerImpl is = null; 2946 2947 // Testing to see how many scanners that is produced by getScanner, 2948 // starting 2949 // with known number, 2 - current = 1 2950 scan = new Scan(); 2951 scan.addFamily(fam2); 2952 scan.addFamily(fam4); 2953 is = region.getScanner(scan); 2954 assertEquals(1, is.storeHeap.getHeap().size()); 2955 2956 scan = new Scan(); 2957 is = region.getScanner(scan); 2958 assertEquals(families.length - 1, is.storeHeap.getHeap().size()); 2959 } finally { 2960 HBaseTestingUtility.closeRegionAndWAL(this.region); 2961 this.region = null; 2962 } 2963 } 2964 2965 /** 2966 * This method tests https://issues.apache.org/jira/browse/HBASE-2516. 2967 * 2968 * @throws IOException 2969 */ 2970 @Test 2971 public void testGetScanner_WithRegionClosed() throws IOException { 2972 byte[] fam1 = Bytes.toBytes("fam1"); 2973 byte[] fam2 = Bytes.toBytes("fam2"); 2974 2975 byte[][] families = { fam1, fam2 }; 2976 2977 // Setting up region 2978 try { 2979 this.region = initHRegion(tableName, method, CONF, families); 2980 } catch (IOException e) { 2981 e.printStackTrace(); 2982 fail("Got IOException during initHRegion, " + e.getMessage()); 2983 } 2984 try { 2985 region.closed.set(true); 2986 try { 2987 region.getScanner(null); 2988 fail("Expected to get an exception during getScanner on a region that is closed"); 2989 } catch (NotServingRegionException e) { 2990 // this is the correct exception that is expected 2991 } catch (IOException e) { 2992 fail("Got wrong type of exception - should be a NotServingRegionException, " + 2993 "but was an IOException: " 2994 + e.getMessage()); 2995 } 2996 } finally { 2997 HBaseTestingUtility.closeRegionAndWAL(this.region); 2998 this.region = null; 2999 } 3000 } 3001 3002 @Test 3003 public void testRegionScanner_Next() throws IOException { 3004 byte[] row1 = Bytes.toBytes("row1"); 3005 byte[] row2 = Bytes.toBytes("row2"); 3006 byte[] fam1 = Bytes.toBytes("fam1"); 3007 byte[] fam2 = Bytes.toBytes("fam2"); 3008 byte[] fam3 = Bytes.toBytes("fam3"); 3009 byte[] fam4 = Bytes.toBytes("fam4"); 3010 3011 byte[][] families = { fam1, fam2, fam3, fam4 }; 3012 long ts = System.currentTimeMillis(); 3013 3014 // Setting up region 3015 this.region = initHRegion(tableName, method, CONF, families); 3016 try { 3017 // Putting data in Region 3018 Put put = null; 3019 put = new Put(row1); 3020 put.addColumn(fam1, (byte[]) null, ts, null); 3021 put.addColumn(fam2, (byte[]) null, ts, null); 3022 put.addColumn(fam3, (byte[]) null, ts, null); 3023 put.addColumn(fam4, (byte[]) null, ts, null); 3024 region.put(put); 3025 3026 put = new Put(row2); 3027 put.addColumn(fam1, (byte[]) null, ts, null); 3028 put.addColumn(fam2, (byte[]) null, ts, null); 3029 put.addColumn(fam3, (byte[]) null, ts, null); 3030 put.addColumn(fam4, (byte[]) null, ts, null); 3031 region.put(put); 3032 3033 Scan scan = new Scan(); 3034 scan.addFamily(fam2); 3035 scan.addFamily(fam4); 3036 InternalScanner is = region.getScanner(scan); 3037 3038 List<Cell> res = null; 3039 3040 // Result 1 3041 List<Cell> expected1 = new ArrayList<>(); 3042 expected1.add(new KeyValue(row1, fam2, null, ts, KeyValue.Type.Put, null)); 3043 expected1.add(new KeyValue(row1, fam4, null, ts, KeyValue.Type.Put, null)); 3044 3045 res = new ArrayList<>(); 3046 is.next(res); 3047 for (int i = 0; i < res.size(); i++) { 3048 assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected1.get(i), res.get(i))); 3049 } 3050 3051 // Result 2 3052 List<Cell> expected2 = new ArrayList<>(); 3053 expected2.add(new KeyValue(row2, fam2, null, ts, KeyValue.Type.Put, null)); 3054 expected2.add(new KeyValue(row2, fam4, null, ts, KeyValue.Type.Put, null)); 3055 3056 res = new ArrayList<>(); 3057 is.next(res); 3058 for (int i = 0; i < res.size(); i++) { 3059 assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected2.get(i), res.get(i))); 3060 } 3061 } finally { 3062 HBaseTestingUtility.closeRegionAndWAL(this.region); 3063 this.region = null; 3064 } 3065 } 3066 3067 @Test 3068 public void testScanner_ExplicitColumns_FromMemStore_EnforceVersions() throws IOException { 3069 byte[] row1 = Bytes.toBytes("row1"); 3070 byte[] qf1 = Bytes.toBytes("qualifier1"); 3071 byte[] qf2 = Bytes.toBytes("qualifier2"); 3072 byte[] fam1 = Bytes.toBytes("fam1"); 3073 byte[][] families = { fam1 }; 3074 3075 long ts1 = System.currentTimeMillis(); 3076 long ts2 = ts1 + 1; 3077 long ts3 = ts1 + 2; 3078 3079 // Setting up region 3080 this.region = initHRegion(tableName, method, CONF, families); 3081 try { 3082 // Putting data in Region 3083 Put put = null; 3084 KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null); 3085 KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null); 3086 KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null); 3087 3088 KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null); 3089 KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null); 3090 KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null); 3091 3092 put = new Put(row1); 3093 put.add(kv13); 3094 put.add(kv12); 3095 put.add(kv11); 3096 put.add(kv23); 3097 put.add(kv22); 3098 put.add(kv21); 3099 region.put(put); 3100 3101 // Expected 3102 List<Cell> expected = new ArrayList<>(); 3103 expected.add(kv13); 3104 expected.add(kv12); 3105 3106 Scan scan = new Scan(row1); 3107 scan.addColumn(fam1, qf1); 3108 scan.setMaxVersions(MAX_VERSIONS); 3109 List<Cell> actual = new ArrayList<>(); 3110 InternalScanner scanner = region.getScanner(scan); 3111 3112 boolean hasNext = scanner.next(actual); 3113 assertEquals(false, hasNext); 3114 3115 // Verify result 3116 for (int i = 0; i < expected.size(); i++) { 3117 assertEquals(expected.get(i), actual.get(i)); 3118 } 3119 } finally { 3120 HBaseTestingUtility.closeRegionAndWAL(this.region); 3121 this.region = null; 3122 } 3123 } 3124 3125 @Test 3126 public void testScanner_ExplicitColumns_FromFilesOnly_EnforceVersions() throws IOException { 3127 byte[] row1 = Bytes.toBytes("row1"); 3128 byte[] qf1 = Bytes.toBytes("qualifier1"); 3129 byte[] qf2 = Bytes.toBytes("qualifier2"); 3130 byte[] fam1 = Bytes.toBytes("fam1"); 3131 byte[][] families = { fam1 }; 3132 3133 long ts1 = 1; // System.currentTimeMillis(); 3134 long ts2 = ts1 + 1; 3135 long ts3 = ts1 + 2; 3136 3137 // Setting up region 3138 this.region = initHRegion(tableName, method, CONF, families); 3139 try { 3140 // Putting data in Region 3141 Put put = null; 3142 KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null); 3143 KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null); 3144 KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null); 3145 3146 KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null); 3147 KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null); 3148 KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null); 3149 3150 put = new Put(row1); 3151 put.add(kv13); 3152 put.add(kv12); 3153 put.add(kv11); 3154 put.add(kv23); 3155 put.add(kv22); 3156 put.add(kv21); 3157 region.put(put); 3158 region.flush(true); 3159 3160 // Expected 3161 List<Cell> expected = new ArrayList<>(); 3162 expected.add(kv13); 3163 expected.add(kv12); 3164 expected.add(kv23); 3165 expected.add(kv22); 3166 3167 Scan scan = new Scan(row1); 3168 scan.addColumn(fam1, qf1); 3169 scan.addColumn(fam1, qf2); 3170 scan.setMaxVersions(MAX_VERSIONS); 3171 List<Cell> actual = new ArrayList<>(); 3172 InternalScanner scanner = region.getScanner(scan); 3173 3174 boolean hasNext = scanner.next(actual); 3175 assertEquals(false, hasNext); 3176 3177 // Verify result 3178 for (int i = 0; i < expected.size(); i++) { 3179 assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i))); 3180 } 3181 } finally { 3182 HBaseTestingUtility.closeRegionAndWAL(this.region); 3183 this.region = null; 3184 } 3185 } 3186 3187 @Test 3188 public void testScanner_ExplicitColumns_FromMemStoreAndFiles_EnforceVersions() throws 3189 IOException { 3190 byte[] row1 = Bytes.toBytes("row1"); 3191 byte[] fam1 = Bytes.toBytes("fam1"); 3192 byte[][] families = { fam1 }; 3193 byte[] qf1 = Bytes.toBytes("qualifier1"); 3194 byte[] qf2 = Bytes.toBytes("qualifier2"); 3195 3196 long ts1 = 1; 3197 long ts2 = ts1 + 1; 3198 long ts3 = ts1 + 2; 3199 long ts4 = ts1 + 3; 3200 3201 // Setting up region 3202 this.region = initHRegion(tableName, method, CONF, families); 3203 try { 3204 // Putting data in Region 3205 KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null); 3206 KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null); 3207 KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null); 3208 KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null); 3209 3210 KeyValue kv24 = new KeyValue(row1, fam1, qf2, ts4, KeyValue.Type.Put, null); 3211 KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null); 3212 KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null); 3213 KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null); 3214 3215 Put put = null; 3216 put = new Put(row1); 3217 put.add(kv14); 3218 put.add(kv24); 3219 region.put(put); 3220 region.flush(true); 3221 3222 put = new Put(row1); 3223 put.add(kv23); 3224 put.add(kv13); 3225 region.put(put); 3226 region.flush(true); 3227 3228 put = new Put(row1); 3229 put.add(kv22); 3230 put.add(kv12); 3231 region.put(put); 3232 region.flush(true); 3233 3234 put = new Put(row1); 3235 put.add(kv21); 3236 put.add(kv11); 3237 region.put(put); 3238 3239 // Expected 3240 List<Cell> expected = new ArrayList<>(); 3241 expected.add(kv14); 3242 expected.add(kv13); 3243 expected.add(kv12); 3244 expected.add(kv24); 3245 expected.add(kv23); 3246 expected.add(kv22); 3247 3248 Scan scan = new Scan(row1); 3249 scan.addColumn(fam1, qf1); 3250 scan.addColumn(fam1, qf2); 3251 int versions = 3; 3252 scan.setMaxVersions(versions); 3253 List<Cell> actual = new ArrayList<>(); 3254 InternalScanner scanner = region.getScanner(scan); 3255 3256 boolean hasNext = scanner.next(actual); 3257 assertEquals(false, hasNext); 3258 3259 // Verify result 3260 for (int i = 0; i < expected.size(); i++) { 3261 assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i))); 3262 } 3263 } finally { 3264 HBaseTestingUtility.closeRegionAndWAL(this.region); 3265 this.region = null; 3266 } 3267 } 3268 3269 @Test 3270 public void testScanner_Wildcard_FromMemStore_EnforceVersions() throws IOException { 3271 byte[] row1 = Bytes.toBytes("row1"); 3272 byte[] qf1 = Bytes.toBytes("qualifier1"); 3273 byte[] qf2 = Bytes.toBytes("qualifier2"); 3274 byte[] fam1 = Bytes.toBytes("fam1"); 3275 byte[][] families = { fam1 }; 3276 3277 long ts1 = System.currentTimeMillis(); 3278 long ts2 = ts1 + 1; 3279 long ts3 = ts1 + 2; 3280 3281 // Setting up region 3282 this.region = initHRegion(tableName, method, CONF, families); 3283 try { 3284 // Putting data in Region 3285 Put put = null; 3286 KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null); 3287 KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null); 3288 KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null); 3289 3290 KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null); 3291 KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null); 3292 KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null); 3293 3294 put = new Put(row1); 3295 put.add(kv13); 3296 put.add(kv12); 3297 put.add(kv11); 3298 put.add(kv23); 3299 put.add(kv22); 3300 put.add(kv21); 3301 region.put(put); 3302 3303 // Expected 3304 List<Cell> expected = new ArrayList<>(); 3305 expected.add(kv13); 3306 expected.add(kv12); 3307 expected.add(kv23); 3308 expected.add(kv22); 3309 3310 Scan scan = new Scan(row1); 3311 scan.addFamily(fam1); 3312 scan.setMaxVersions(MAX_VERSIONS); 3313 List<Cell> actual = new ArrayList<>(); 3314 InternalScanner scanner = region.getScanner(scan); 3315 3316 boolean hasNext = scanner.next(actual); 3317 assertEquals(false, hasNext); 3318 3319 // Verify result 3320 for (int i = 0; i < expected.size(); i++) { 3321 assertEquals(expected.get(i), actual.get(i)); 3322 } 3323 } finally { 3324 HBaseTestingUtility.closeRegionAndWAL(this.region); 3325 this.region = null; 3326 } 3327 } 3328 3329 @Test 3330 public void testScanner_Wildcard_FromFilesOnly_EnforceVersions() throws IOException { 3331 byte[] row1 = Bytes.toBytes("row1"); 3332 byte[] qf1 = Bytes.toBytes("qualifier1"); 3333 byte[] qf2 = Bytes.toBytes("qualifier2"); 3334 byte[] fam1 = Bytes.toBytes("fam1"); 3335 3336 long ts1 = 1; // System.currentTimeMillis(); 3337 long ts2 = ts1 + 1; 3338 long ts3 = ts1 + 2; 3339 3340 // Setting up region 3341 this.region = initHRegion(tableName, method, CONF, fam1); 3342 try { 3343 // Putting data in Region 3344 Put put = null; 3345 KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null); 3346 KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null); 3347 KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null); 3348 3349 KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null); 3350 KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null); 3351 KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null); 3352 3353 put = new Put(row1); 3354 put.add(kv13); 3355 put.add(kv12); 3356 put.add(kv11); 3357 put.add(kv23); 3358 put.add(kv22); 3359 put.add(kv21); 3360 region.put(put); 3361 region.flush(true); 3362 3363 // Expected 3364 List<Cell> expected = new ArrayList<>(); 3365 expected.add(kv13); 3366 expected.add(kv12); 3367 expected.add(kv23); 3368 expected.add(kv22); 3369 3370 Scan scan = new Scan(row1); 3371 scan.addFamily(fam1); 3372 scan.setMaxVersions(MAX_VERSIONS); 3373 List<Cell> actual = new ArrayList<>(); 3374 InternalScanner scanner = region.getScanner(scan); 3375 3376 boolean hasNext = scanner.next(actual); 3377 assertEquals(false, hasNext); 3378 3379 // Verify result 3380 for (int i = 0; i < expected.size(); i++) { 3381 assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i))); 3382 } 3383 } finally { 3384 HBaseTestingUtility.closeRegionAndWAL(this.region); 3385 this.region = null; 3386 } 3387 } 3388 3389 @Test 3390 public void testScanner_StopRow1542() throws IOException { 3391 byte[] family = Bytes.toBytes("testFamily"); 3392 this.region = initHRegion(tableName, method, CONF, family); 3393 try { 3394 byte[] row1 = Bytes.toBytes("row111"); 3395 byte[] row2 = Bytes.toBytes("row222"); 3396 byte[] row3 = Bytes.toBytes("row333"); 3397 byte[] row4 = Bytes.toBytes("row444"); 3398 byte[] row5 = Bytes.toBytes("row555"); 3399 3400 byte[] col1 = Bytes.toBytes("Pub111"); 3401 byte[] col2 = Bytes.toBytes("Pub222"); 3402 3403 Put put = new Put(row1); 3404 put.addColumn(family, col1, Bytes.toBytes(10L)); 3405 region.put(put); 3406 3407 put = new Put(row2); 3408 put.addColumn(family, col1, Bytes.toBytes(15L)); 3409 region.put(put); 3410 3411 put = new Put(row3); 3412 put.addColumn(family, col2, Bytes.toBytes(20L)); 3413 region.put(put); 3414 3415 put = new Put(row4); 3416 put.addColumn(family, col2, Bytes.toBytes(30L)); 3417 region.put(put); 3418 3419 put = new Put(row5); 3420 put.addColumn(family, col1, Bytes.toBytes(40L)); 3421 region.put(put); 3422 3423 Scan scan = new Scan(row3, row4); 3424 scan.setMaxVersions(); 3425 scan.addColumn(family, col1); 3426 InternalScanner s = region.getScanner(scan); 3427 3428 List<Cell> results = new ArrayList<>(); 3429 assertEquals(false, s.next(results)); 3430 assertEquals(0, results.size()); 3431 } finally { 3432 HBaseTestingUtility.closeRegionAndWAL(this.region); 3433 this.region = null; 3434 } 3435 } 3436 3437 @Test 3438 public void testScanner_Wildcard_FromMemStoreAndFiles_EnforceVersions() throws IOException { 3439 byte[] row1 = Bytes.toBytes("row1"); 3440 byte[] fam1 = Bytes.toBytes("fam1"); 3441 byte[] qf1 = Bytes.toBytes("qualifier1"); 3442 byte[] qf2 = Bytes.toBytes("quateslifier2"); 3443 3444 long ts1 = 1; 3445 long ts2 = ts1 + 1; 3446 long ts3 = ts1 + 2; 3447 long ts4 = ts1 + 3; 3448 3449 // Setting up region 3450 this.region = initHRegion(tableName, method, CONF, fam1); 3451 try { 3452 // Putting data in Region 3453 KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null); 3454 KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null); 3455 KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null); 3456 KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null); 3457 3458 KeyValue kv24 = new KeyValue(row1, fam1, qf2, ts4, KeyValue.Type.Put, null); 3459 KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null); 3460 KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null); 3461 KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null); 3462 3463 Put put = null; 3464 put = new Put(row1); 3465 put.add(kv14); 3466 put.add(kv24); 3467 region.put(put); 3468 region.flush(true); 3469 3470 put = new Put(row1); 3471 put.add(kv23); 3472 put.add(kv13); 3473 region.put(put); 3474 region.flush(true); 3475 3476 put = new Put(row1); 3477 put.add(kv22); 3478 put.add(kv12); 3479 region.put(put); 3480 region.flush(true); 3481 3482 put = new Put(row1); 3483 put.add(kv21); 3484 put.add(kv11); 3485 region.put(put); 3486 3487 // Expected 3488 List<KeyValue> expected = new ArrayList<>(); 3489 expected.add(kv14); 3490 expected.add(kv13); 3491 expected.add(kv12); 3492 expected.add(kv24); 3493 expected.add(kv23); 3494 expected.add(kv22); 3495 3496 Scan scan = new Scan(row1); 3497 int versions = 3; 3498 scan.setMaxVersions(versions); 3499 List<Cell> actual = new ArrayList<>(); 3500 InternalScanner scanner = region.getScanner(scan); 3501 3502 boolean hasNext = scanner.next(actual); 3503 assertEquals(false, hasNext); 3504 3505 // Verify result 3506 for (int i = 0; i < expected.size(); i++) { 3507 assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i))); 3508 } 3509 } finally { 3510 HBaseTestingUtility.closeRegionAndWAL(this.region); 3511 this.region = null; 3512 } 3513 } 3514 3515 /** 3516 * Added for HBASE-5416 3517 * 3518 * Here we test scan optimization when only subset of CFs are used in filter 3519 * conditions. 3520 */ 3521 @Test 3522 public void testScanner_JoinedScanners() throws IOException { 3523 byte[] cf_essential = Bytes.toBytes("essential"); 3524 byte[] cf_joined = Bytes.toBytes("joined"); 3525 byte[] cf_alpha = Bytes.toBytes("alpha"); 3526 this.region = initHRegion(tableName, method, CONF, cf_essential, cf_joined, cf_alpha); 3527 try { 3528 byte[] row1 = Bytes.toBytes("row1"); 3529 byte[] row2 = Bytes.toBytes("row2"); 3530 byte[] row3 = Bytes.toBytes("row3"); 3531 3532 byte[] col_normal = Bytes.toBytes("d"); 3533 byte[] col_alpha = Bytes.toBytes("a"); 3534 3535 byte[] filtered_val = Bytes.toBytes(3); 3536 3537 Put put = new Put(row1); 3538 put.addColumn(cf_essential, col_normal, Bytes.toBytes(1)); 3539 put.addColumn(cf_joined, col_alpha, Bytes.toBytes(1)); 3540 region.put(put); 3541 3542 put = new Put(row2); 3543 put.addColumn(cf_essential, col_alpha, Bytes.toBytes(2)); 3544 put.addColumn(cf_joined, col_normal, Bytes.toBytes(2)); 3545 put.addColumn(cf_alpha, col_alpha, Bytes.toBytes(2)); 3546 region.put(put); 3547 3548 put = new Put(row3); 3549 put.addColumn(cf_essential, col_normal, filtered_val); 3550 put.addColumn(cf_joined, col_normal, filtered_val); 3551 region.put(put); 3552 3553 // Check two things: 3554 // 1. result list contains expected values 3555 // 2. result list is sorted properly 3556 3557 Scan scan = new Scan(); 3558 Filter filter = new SingleColumnValueExcludeFilter(cf_essential, col_normal, 3559 CompareOp.NOT_EQUAL, filtered_val); 3560 scan.setFilter(filter); 3561 scan.setLoadColumnFamiliesOnDemand(true); 3562 InternalScanner s = region.getScanner(scan); 3563 3564 List<Cell> results = new ArrayList<>(); 3565 assertTrue(s.next(results)); 3566 assertEquals(1, results.size()); 3567 results.clear(); 3568 3569 assertTrue(s.next(results)); 3570 assertEquals(3, results.size()); 3571 assertTrue("orderCheck", CellUtil.matchingFamily(results.get(0), cf_alpha)); 3572 assertTrue("orderCheck", CellUtil.matchingFamily(results.get(1), cf_essential)); 3573 assertTrue("orderCheck", CellUtil.matchingFamily(results.get(2), cf_joined)); 3574 results.clear(); 3575 3576 assertFalse(s.next(results)); 3577 assertEquals(0, results.size()); 3578 } finally { 3579 HBaseTestingUtility.closeRegionAndWAL(this.region); 3580 this.region = null; 3581 } 3582 } 3583 3584 /** 3585 * HBASE-5416 3586 * 3587 * Test case when scan limits amount of KVs returned on each next() call. 3588 */ 3589 @Test 3590 public void testScanner_JoinedScannersWithLimits() throws IOException { 3591 final byte[] cf_first = Bytes.toBytes("first"); 3592 final byte[] cf_second = Bytes.toBytes("second"); 3593 3594 this.region = initHRegion(tableName, method, CONF, cf_first, cf_second); 3595 try { 3596 final byte[] col_a = Bytes.toBytes("a"); 3597 final byte[] col_b = Bytes.toBytes("b"); 3598 3599 Put put; 3600 3601 for (int i = 0; i < 10; i++) { 3602 put = new Put(Bytes.toBytes("r" + Integer.toString(i))); 3603 put.addColumn(cf_first, col_a, Bytes.toBytes(i)); 3604 if (i < 5) { 3605 put.addColumn(cf_first, col_b, Bytes.toBytes(i)); 3606 put.addColumn(cf_second, col_a, Bytes.toBytes(i)); 3607 put.addColumn(cf_second, col_b, Bytes.toBytes(i)); 3608 } 3609 region.put(put); 3610 } 3611 3612 Scan scan = new Scan(); 3613 scan.setLoadColumnFamiliesOnDemand(true); 3614 Filter bogusFilter = new FilterBase() { 3615 @Override 3616 public ReturnCode filterCell(final Cell ignored) throws IOException { 3617 return ReturnCode.INCLUDE; 3618 } 3619 @Override 3620 public boolean isFamilyEssential(byte[] name) { 3621 return Bytes.equals(name, cf_first); 3622 } 3623 }; 3624 3625 scan.setFilter(bogusFilter); 3626 InternalScanner s = region.getScanner(scan); 3627 3628 // Our data looks like this: 3629 // r0: first:a, first:b, second:a, second:b 3630 // r1: first:a, first:b, second:a, second:b 3631 // r2: first:a, first:b, second:a, second:b 3632 // r3: first:a, first:b, second:a, second:b 3633 // r4: first:a, first:b, second:a, second:b 3634 // r5: first:a 3635 // r6: first:a 3636 // r7: first:a 3637 // r8: first:a 3638 // r9: first:a 3639 3640 // But due to next's limit set to 3, we should get this: 3641 // r0: first:a, first:b, second:a 3642 // r0: second:b 3643 // r1: first:a, first:b, second:a 3644 // r1: second:b 3645 // r2: first:a, first:b, second:a 3646 // r2: second:b 3647 // r3: first:a, first:b, second:a 3648 // r3: second:b 3649 // r4: first:a, first:b, second:a 3650 // r4: second:b 3651 // r5: first:a 3652 // r6: first:a 3653 // r7: first:a 3654 // r8: first:a 3655 // r9: first:a 3656 3657 List<Cell> results = new ArrayList<>(); 3658 int index = 0; 3659 ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(3).build(); 3660 while (true) { 3661 boolean more = s.next(results, scannerContext); 3662 if ((index >> 1) < 5) { 3663 if (index % 2 == 0) { 3664 assertEquals(3, results.size()); 3665 } else { 3666 assertEquals(1, results.size()); 3667 } 3668 } else { 3669 assertEquals(1, results.size()); 3670 } 3671 results.clear(); 3672 index++; 3673 if (!more) { 3674 break; 3675 } 3676 } 3677 } finally { 3678 HBaseTestingUtility.closeRegionAndWAL(this.region); 3679 this.region = null; 3680 } 3681 } 3682 3683 /** 3684 * Write an HFile block full with Cells whose qualifier that are identical between 3685 * 0 and Short.MAX_VALUE. See HBASE-13329. 3686 * @throws Exception 3687 */ 3688 @Test 3689 public void testLongQualifier() throws Exception { 3690 byte[] family = Bytes.toBytes("family"); 3691 this.region = initHRegion(tableName, method, CONF, family); 3692 byte[] q = new byte[Short.MAX_VALUE+2]; 3693 Arrays.fill(q, 0, q.length-1, (byte)42); 3694 for (byte i=0; i<10; i++) { 3695 Put p = new Put(Bytes.toBytes("row")); 3696 // qualifiers that differ past Short.MAX_VALUE 3697 q[q.length-1]=i; 3698 p.addColumn(family, q, q); 3699 region.put(p); 3700 } 3701 region.flush(false); 3702 HBaseTestingUtility.closeRegionAndWAL(this.region); 3703 this.region = null; 3704 } 3705 3706 /** 3707 * Flushes the cache in a thread while scanning. The tests verify that the 3708 * scan is coherent - e.g. the returned results are always of the same or 3709 * later update as the previous results. 3710 * 3711 * @throws IOException 3712 * scan / compact 3713 * @throws InterruptedException 3714 * thread join 3715 */ 3716 @Test 3717 public void testFlushCacheWhileScanning() throws IOException, InterruptedException { 3718 byte[] family = Bytes.toBytes("family"); 3719 int numRows = 1000; 3720 int flushAndScanInterval = 10; 3721 int compactInterval = 10 * flushAndScanInterval; 3722 3723 this.region = initHRegion(tableName, method, CONF, family); 3724 FlushThread flushThread = new FlushThread(); 3725 try { 3726 flushThread.start(); 3727 3728 Scan scan = new Scan(); 3729 scan.addFamily(family); 3730 scan.setFilter(new SingleColumnValueFilter(family, qual1, CompareOp.EQUAL, 3731 new BinaryComparator(Bytes.toBytes(5L)))); 3732 3733 int expectedCount = 0; 3734 List<Cell> res = new ArrayList<>(); 3735 3736 boolean toggle = true; 3737 for (long i = 0; i < numRows; i++) { 3738 Put put = new Put(Bytes.toBytes(i)); 3739 put.setDurability(Durability.SKIP_WAL); 3740 put.addColumn(family, qual1, Bytes.toBytes(i % 10)); 3741 region.put(put); 3742 3743 if (i != 0 && i % compactInterval == 0) { 3744 LOG.debug("iteration = " + i+ " ts="+System.currentTimeMillis()); 3745 region.compact(true); 3746 } 3747 3748 if (i % 10 == 5L) { 3749 expectedCount++; 3750 } 3751 3752 if (i != 0 && i % flushAndScanInterval == 0) { 3753 res.clear(); 3754 InternalScanner scanner = region.getScanner(scan); 3755 if (toggle) { 3756 flushThread.flush(); 3757 } 3758 while (scanner.next(res)) 3759 ; 3760 if (!toggle) { 3761 flushThread.flush(); 3762 } 3763 assertEquals("toggle="+toggle+"i=" + i + " ts="+System.currentTimeMillis(), 3764 expectedCount, res.size()); 3765 toggle = !toggle; 3766 } 3767 } 3768 3769 } finally { 3770 try { 3771 flushThread.done(); 3772 flushThread.join(); 3773 flushThread.checkNoError(); 3774 } catch (InterruptedException ie) { 3775 LOG.warn("Caught exception when joining with flushThread", ie); 3776 } 3777 HBaseTestingUtility.closeRegionAndWAL(this.region); 3778 this.region = null; 3779 } 3780 } 3781 3782 protected class FlushThread extends Thread { 3783 private volatile boolean done; 3784 private Throwable error = null; 3785 3786 FlushThread() { 3787 super("FlushThread"); 3788 } 3789 3790 public void done() { 3791 done = true; 3792 synchronized (this) { 3793 interrupt(); 3794 } 3795 } 3796 3797 public void checkNoError() { 3798 if (error != null) { 3799 assertNull(error); 3800 } 3801 } 3802 3803 @Override 3804 public void run() { 3805 done = false; 3806 while (!done) { 3807 synchronized (this) { 3808 try { 3809 wait(); 3810 } catch (InterruptedException ignored) { 3811 if (done) { 3812 break; 3813 } 3814 } 3815 } 3816 try { 3817 region.flush(true); 3818 } catch (IOException e) { 3819 if (!done) { 3820 LOG.error("Error while flushing cache", e); 3821 error = e; 3822 } 3823 break; 3824 } catch (Throwable t) { 3825 LOG.error("Uncaught exception", t); 3826 throw t; 3827 } 3828 } 3829 } 3830 3831 public void flush() { 3832 synchronized (this) { 3833 notify(); 3834 } 3835 } 3836 } 3837 3838 /** 3839 * Writes very wide records and scans for the latest every time.. Flushes and 3840 * compacts the region every now and then to keep things realistic. 3841 * 3842 * @throws IOException 3843 * by flush / scan / compaction 3844 * @throws InterruptedException 3845 * when joining threads 3846 */ 3847 @Test 3848 public void testWritesWhileScanning() throws IOException, InterruptedException { 3849 int testCount = 100; 3850 int numRows = 1; 3851 int numFamilies = 10; 3852 int numQualifiers = 100; 3853 int flushInterval = 7; 3854 int compactInterval = 5 * flushInterval; 3855 byte[][] families = new byte[numFamilies][]; 3856 for (int i = 0; i < numFamilies; i++) { 3857 families[i] = Bytes.toBytes("family" + i); 3858 } 3859 byte[][] qualifiers = new byte[numQualifiers][]; 3860 for (int i = 0; i < numQualifiers; i++) { 3861 qualifiers[i] = Bytes.toBytes("qual" + i); 3862 } 3863 3864 this.region = initHRegion(tableName, method, CONF, families); 3865 FlushThread flushThread = new FlushThread(); 3866 PutThread putThread = new PutThread(numRows, families, qualifiers); 3867 try { 3868 putThread.start(); 3869 putThread.waitForFirstPut(); 3870 3871 flushThread.start(); 3872 3873 Scan scan = new Scan(Bytes.toBytes("row0"), Bytes.toBytes("row1")); 3874 3875 int expectedCount = numFamilies * numQualifiers; 3876 List<Cell> res = new ArrayList<>(); 3877 3878 long prevTimestamp = 0L; 3879 for (int i = 0; i < testCount; i++) { 3880 3881 if (i != 0 && i % compactInterval == 0) { 3882 region.compact(true); 3883 for (HStore store : region.getStores()) { 3884 store.closeAndArchiveCompactedFiles(); 3885 } 3886 } 3887 3888 if (i != 0 && i % flushInterval == 0) { 3889 flushThread.flush(); 3890 } 3891 3892 boolean previousEmpty = res.isEmpty(); 3893 res.clear(); 3894 InternalScanner scanner = region.getScanner(scan); 3895 while (scanner.next(res)) 3896 ; 3897 if (!res.isEmpty() || !previousEmpty || i > compactInterval) { 3898 assertEquals("i=" + i, expectedCount, res.size()); 3899 long timestamp = res.get(0).getTimestamp(); 3900 assertTrue("Timestamps were broke: " + timestamp + " prev: " + prevTimestamp, 3901 timestamp >= prevTimestamp); 3902 prevTimestamp = timestamp; 3903 } 3904 } 3905 3906 putThread.done(); 3907 3908 region.flush(true); 3909 3910 } finally { 3911 try { 3912 flushThread.done(); 3913 flushThread.join(); 3914 flushThread.checkNoError(); 3915 3916 putThread.join(); 3917 putThread.checkNoError(); 3918 } catch (InterruptedException ie) { 3919 LOG.warn("Caught exception when joining with flushThread", ie); 3920 } 3921 3922 try { 3923 HBaseTestingUtility.closeRegionAndWAL(this.region); 3924 } catch (DroppedSnapshotException dse) { 3925 // We could get this on way out because we interrupt the background flusher and it could 3926 // fail anywhere causing a DSE over in the background flusher... only it is not properly 3927 // dealt with so could still be memory hanging out when we get to here -- memory we can't 3928 // flush because the accounting is 'off' since original DSE. 3929 } 3930 this.region = null; 3931 } 3932 } 3933 3934 protected class PutThread extends Thread { 3935 private volatile boolean done; 3936 private volatile int numPutsFinished = 0; 3937 3938 private Throwable error = null; 3939 private int numRows; 3940 private byte[][] families; 3941 private byte[][] qualifiers; 3942 3943 private PutThread(int numRows, byte[][] families, byte[][] qualifiers) { 3944 super("PutThread"); 3945 this.numRows = numRows; 3946 this.families = families; 3947 this.qualifiers = qualifiers; 3948 } 3949 3950 /** 3951 * Block calling thread until this instance of PutThread has put at least one row. 3952 */ 3953 public void waitForFirstPut() throws InterruptedException { 3954 // wait until put thread actually puts some data 3955 while (isAlive() && numPutsFinished == 0) { 3956 checkNoError(); 3957 Thread.sleep(50); 3958 } 3959 } 3960 3961 public void done() { 3962 done = true; 3963 synchronized (this) { 3964 interrupt(); 3965 } 3966 } 3967 3968 public void checkNoError() { 3969 if (error != null) { 3970 assertNull(error); 3971 } 3972 } 3973 3974 @Override 3975 public void run() { 3976 done = false; 3977 while (!done) { 3978 try { 3979 for (int r = 0; r < numRows; r++) { 3980 byte[] row = Bytes.toBytes("row" + r); 3981 Put put = new Put(row); 3982 put.setDurability(Durability.SKIP_WAL); 3983 byte[] value = Bytes.toBytes(String.valueOf(numPutsFinished)); 3984 for (byte[] family : families) { 3985 for (byte[] qualifier : qualifiers) { 3986 put.addColumn(family, qualifier, numPutsFinished, value); 3987 } 3988 } 3989 region.put(put); 3990 numPutsFinished++; 3991 if (numPutsFinished > 0 && numPutsFinished % 47 == 0) { 3992 System.out.println("put iteration = " + numPutsFinished); 3993 Delete delete = new Delete(row, (long) numPutsFinished - 30); 3994 region.delete(delete); 3995 } 3996 numPutsFinished++; 3997 } 3998 } catch (InterruptedIOException e) { 3999 // This is fine. It means we are done, or didn't get the lock on time 4000 LOG.info("Interrupted", e); 4001 } catch (IOException e) { 4002 LOG.error("Error while putting records", e); 4003 error = e; 4004 break; 4005 } 4006 } 4007 4008 } 4009 4010 } 4011 4012 /** 4013 * Writes very wide records and gets the latest row every time.. Flushes and 4014 * compacts the region aggressivly to catch issues. 4015 * 4016 * @throws IOException 4017 * by flush / scan / compaction 4018 * @throws InterruptedException 4019 * when joining threads 4020 */ 4021 @Test 4022 public void testWritesWhileGetting() throws Exception { 4023 int testCount = 50; 4024 int numRows = 1; 4025 int numFamilies = 10; 4026 int numQualifiers = 100; 4027 int compactInterval = 100; 4028 byte[][] families = new byte[numFamilies][]; 4029 for (int i = 0; i < numFamilies; i++) { 4030 families[i] = Bytes.toBytes("family" + i); 4031 } 4032 byte[][] qualifiers = new byte[numQualifiers][]; 4033 for (int i = 0; i < numQualifiers; i++) { 4034 qualifiers[i] = Bytes.toBytes("qual" + i); 4035 } 4036 4037 4038 // This test flushes constantly and can cause many files to be created, 4039 // possibly 4040 // extending over the ulimit. Make sure compactions are aggressive in 4041 // reducing 4042 // the number of HFiles created. 4043 Configuration conf = HBaseConfiguration.create(CONF); 4044 conf.setInt("hbase.hstore.compaction.min", 1); 4045 conf.setInt("hbase.hstore.compaction.max", 1000); 4046 this.region = initHRegion(tableName, method, conf, families); 4047 PutThread putThread = null; 4048 MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf); 4049 try { 4050 putThread = new PutThread(numRows, families, qualifiers); 4051 putThread.start(); 4052 putThread.waitForFirstPut(); 4053 4054 // Add a thread that flushes as fast as possible 4055 ctx.addThread(new RepeatingTestThread(ctx) { 4056 4057 @Override 4058 public void doAnAction() throws Exception { 4059 region.flush(true); 4060 // Compact regularly to avoid creating too many files and exceeding 4061 // the ulimit. 4062 region.compact(false); 4063 for (HStore store : region.getStores()) { 4064 store.closeAndArchiveCompactedFiles(); 4065 } 4066 } 4067 }); 4068 ctx.startThreads(); 4069 4070 Get get = new Get(Bytes.toBytes("row0")); 4071 Result result = null; 4072 4073 int expectedCount = numFamilies * numQualifiers; 4074 4075 long prevTimestamp = 0L; 4076 for (int i = 0; i < testCount; i++) { 4077 LOG.info("testWritesWhileGetting verify turn " + i); 4078 boolean previousEmpty = result == null || result.isEmpty(); 4079 result = region.get(get); 4080 if (!result.isEmpty() || !previousEmpty || i > compactInterval) { 4081 assertEquals("i=" + i, expectedCount, result.size()); 4082 // TODO this was removed, now what dangit?! 4083 // search looking for the qualifier in question? 4084 long timestamp = 0; 4085 for (Cell kv : result.rawCells()) { 4086 if (CellUtil.matchingFamily(kv, families[0]) 4087 && CellUtil.matchingQualifier(kv, qualifiers[0])) { 4088 timestamp = kv.getTimestamp(); 4089 } 4090 } 4091 assertTrue(timestamp >= prevTimestamp); 4092 prevTimestamp = timestamp; 4093 Cell previousKV = null; 4094 4095 for (Cell kv : result.rawCells()) { 4096 byte[] thisValue = CellUtil.cloneValue(kv); 4097 if (previousKV != null) { 4098 if (Bytes.compareTo(CellUtil.cloneValue(previousKV), thisValue) != 0) { 4099 LOG.warn("These two KV should have the same value." + " Previous KV:" + previousKV 4100 + "(memStoreTS:" + previousKV.getSequenceId() + ")" + ", New KV: " + kv 4101 + "(memStoreTS:" + kv.getSequenceId() + ")"); 4102 assertEquals(0, Bytes.compareTo(CellUtil.cloneValue(previousKV), thisValue)); 4103 } 4104 } 4105 previousKV = kv; 4106 } 4107 } 4108 } 4109 } finally { 4110 if (putThread != null) 4111 putThread.done(); 4112 4113 region.flush(true); 4114 4115 if (putThread != null) { 4116 putThread.join(); 4117 putThread.checkNoError(); 4118 } 4119 4120 ctx.stop(); 4121 HBaseTestingUtility.closeRegionAndWAL(this.region); 4122 this.region = null; 4123 } 4124 } 4125 4126 @Test 4127 public void testHolesInMeta() throws Exception { 4128 byte[] family = Bytes.toBytes("family"); 4129 this.region = initHRegion(tableName, Bytes.toBytes("x"), Bytes.toBytes("z"), method, CONF, 4130 false, family); 4131 try { 4132 byte[] rowNotServed = Bytes.toBytes("a"); 4133 Get g = new Get(rowNotServed); 4134 try { 4135 region.get(g); 4136 fail(); 4137 } catch (WrongRegionException x) { 4138 // OK 4139 } 4140 byte[] row = Bytes.toBytes("y"); 4141 g = new Get(row); 4142 region.get(g); 4143 } finally { 4144 HBaseTestingUtility.closeRegionAndWAL(this.region); 4145 this.region = null; 4146 } 4147 } 4148 4149 @Test 4150 public void testIndexesScanWithOneDeletedRow() throws IOException { 4151 byte[] family = Bytes.toBytes("family"); 4152 4153 // Setting up region 4154 this.region = initHRegion(tableName, method, CONF, family); 4155 try { 4156 Put put = new Put(Bytes.toBytes(1L)); 4157 put.addColumn(family, qual1, 1L, Bytes.toBytes(1L)); 4158 region.put(put); 4159 4160 region.flush(true); 4161 4162 Delete delete = new Delete(Bytes.toBytes(1L), 1L); 4163 region.delete(delete); 4164 4165 put = new Put(Bytes.toBytes(2L)); 4166 put.addColumn(family, qual1, 2L, Bytes.toBytes(2L)); 4167 region.put(put); 4168 4169 Scan idxScan = new Scan(); 4170 idxScan.addFamily(family); 4171 idxScan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL, Arrays.<Filter> asList( 4172 new SingleColumnValueFilter(family, qual1, CompareOp.GREATER_OR_EQUAL, 4173 new BinaryComparator(Bytes.toBytes(0L))), new SingleColumnValueFilter(family, qual1, 4174 CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes(3L)))))); 4175 InternalScanner scanner = region.getScanner(idxScan); 4176 List<Cell> res = new ArrayList<>(); 4177 4178 while (scanner.next(res)) 4179 ; 4180 assertEquals(1L, res.size()); 4181 } finally { 4182 HBaseTestingUtility.closeRegionAndWAL(this.region); 4183 this.region = null; 4184 } 4185 } 4186 4187 // //////////////////////////////////////////////////////////////////////////// 4188 // Bloom filter test 4189 // //////////////////////////////////////////////////////////////////////////// 4190 @Test 4191 public void testBloomFilterSize() throws IOException { 4192 byte[] fam1 = Bytes.toBytes("fam1"); 4193 byte[] qf1 = Bytes.toBytes("col"); 4194 byte[] val1 = Bytes.toBytes("value1"); 4195 // Create Table 4196 HColumnDescriptor hcd = new HColumnDescriptor(fam1).setMaxVersions(Integer.MAX_VALUE) 4197 .setBloomFilterType(BloomType.ROWCOL); 4198 4199 HTableDescriptor htd = new HTableDescriptor(tableName); 4200 htd.addFamily(hcd); 4201 HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); 4202 this.region = TEST_UTIL.createLocalHRegion(info, htd); 4203 try { 4204 int num_unique_rows = 10; 4205 int duplicate_multiplier = 2; 4206 int num_storefiles = 4; 4207 4208 int version = 0; 4209 for (int f = 0; f < num_storefiles; f++) { 4210 for (int i = 0; i < duplicate_multiplier; i++) { 4211 for (int j = 0; j < num_unique_rows; j++) { 4212 Put put = new Put(Bytes.toBytes("row" + j)); 4213 put.setDurability(Durability.SKIP_WAL); 4214 long ts = version++; 4215 put.addColumn(fam1, qf1, ts, val1); 4216 region.put(put); 4217 } 4218 } 4219 region.flush(true); 4220 } 4221 // before compaction 4222 HStore store = region.getStore(fam1); 4223 Collection<HStoreFile> storeFiles = store.getStorefiles(); 4224 for (HStoreFile storefile : storeFiles) { 4225 StoreFileReader reader = storefile.getReader(); 4226 reader.loadFileInfo(); 4227 reader.loadBloomfilter(); 4228 assertEquals(num_unique_rows * duplicate_multiplier, reader.getEntries()); 4229 assertEquals(num_unique_rows, reader.getFilterEntries()); 4230 } 4231 4232 region.compact(true); 4233 4234 // after compaction 4235 storeFiles = store.getStorefiles(); 4236 for (HStoreFile storefile : storeFiles) { 4237 StoreFileReader reader = storefile.getReader(); 4238 reader.loadFileInfo(); 4239 reader.loadBloomfilter(); 4240 assertEquals(num_unique_rows * duplicate_multiplier * num_storefiles, reader.getEntries()); 4241 assertEquals(num_unique_rows, reader.getFilterEntries()); 4242 } 4243 } finally { 4244 HBaseTestingUtility.closeRegionAndWAL(this.region); 4245 this.region = null; 4246 } 4247 } 4248 4249 @Test 4250 public void testAllColumnsWithBloomFilter() throws IOException { 4251 byte[] TABLE = Bytes.toBytes(name.getMethodName()); 4252 byte[] FAMILY = Bytes.toBytes("family"); 4253 4254 // Create table 4255 HColumnDescriptor hcd = new HColumnDescriptor(FAMILY).setMaxVersions(Integer.MAX_VALUE) 4256 .setBloomFilterType(BloomType.ROWCOL); 4257 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE)); 4258 htd.addFamily(hcd); 4259 HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); 4260 this.region = TEST_UTIL.createLocalHRegion(info, htd); 4261 try { 4262 // For row:0, col:0: insert versions 1 through 5. 4263 byte row[] = Bytes.toBytes("row:" + 0); 4264 byte column[] = Bytes.toBytes("column:" + 0); 4265 Put put = new Put(row); 4266 put.setDurability(Durability.SKIP_WAL); 4267 for (long idx = 1; idx <= 4; idx++) { 4268 put.addColumn(FAMILY, column, idx, Bytes.toBytes("value-version-" + idx)); 4269 } 4270 region.put(put); 4271 4272 // Flush 4273 region.flush(true); 4274 4275 // Get rows 4276 Get get = new Get(row); 4277 get.setMaxVersions(); 4278 Cell[] kvs = region.get(get).rawCells(); 4279 4280 // Check if rows are correct 4281 assertEquals(4, kvs.length); 4282 checkOneCell(kvs[0], FAMILY, 0, 0, 4); 4283 checkOneCell(kvs[1], FAMILY, 0, 0, 3); 4284 checkOneCell(kvs[2], FAMILY, 0, 0, 2); 4285 checkOneCell(kvs[3], FAMILY, 0, 0, 1); 4286 } finally { 4287 HBaseTestingUtility.closeRegionAndWAL(this.region); 4288 this.region = null; 4289 } 4290 } 4291 4292 /** 4293 * Testcase to cover bug-fix for HBASE-2823 Ensures correct delete when 4294 * issuing delete row on columns with bloom filter set to row+col 4295 * (BloomType.ROWCOL) 4296 */ 4297 @Test 4298 public void testDeleteRowWithBloomFilter() throws IOException { 4299 byte[] familyName = Bytes.toBytes("familyName"); 4300 4301 // Create Table 4302 HColumnDescriptor hcd = new HColumnDescriptor(familyName).setMaxVersions(Integer.MAX_VALUE) 4303 .setBloomFilterType(BloomType.ROWCOL); 4304 4305 HTableDescriptor htd = new HTableDescriptor(tableName); 4306 htd.addFamily(hcd); 4307 HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); 4308 this.region = TEST_UTIL.createLocalHRegion(info, htd); 4309 try { 4310 // Insert some data 4311 byte row[] = Bytes.toBytes("row1"); 4312 byte col[] = Bytes.toBytes("col1"); 4313 4314 Put put = new Put(row); 4315 put.addColumn(familyName, col, 1, Bytes.toBytes("SomeRandomValue")); 4316 region.put(put); 4317 region.flush(true); 4318 4319 Delete del = new Delete(row); 4320 region.delete(del); 4321 region.flush(true); 4322 4323 // Get remaining rows (should have none) 4324 Get get = new Get(row); 4325 get.addColumn(familyName, col); 4326 4327 Cell[] keyValues = region.get(get).rawCells(); 4328 assertTrue(keyValues.length == 0); 4329 } finally { 4330 HBaseTestingUtility.closeRegionAndWAL(this.region); 4331 this.region = null; 4332 } 4333 } 4334 4335 @Test 4336 public void testgetHDFSBlocksDistribution() throws Exception { 4337 HBaseTestingUtility htu = new HBaseTestingUtility(); 4338 // Why do we set the block size in this test? If we set it smaller than the kvs, then we'll 4339 // break up the file in to more pieces that can be distributed across the three nodes and we 4340 // won't be able to have the condition this test asserts; that at least one node has 4341 // a copy of all replicas -- if small block size, then blocks are spread evenly across the 4342 // the three nodes. hfilev3 with tags seems to put us over the block size. St.Ack. 4343 // final int DEFAULT_BLOCK_SIZE = 1024; 4344 // htu.getConfiguration().setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE); 4345 htu.getConfiguration().setInt("dfs.replication", 2); 4346 4347 // set up a cluster with 3 nodes 4348 MiniHBaseCluster cluster = null; 4349 String dataNodeHosts[] = new String[] { "host1", "host2", "host3" }; 4350 int regionServersCount = 3; 4351 4352 try { 4353 cluster = htu.startMiniCluster(1, regionServersCount, dataNodeHosts); 4354 byte[][] families = { fam1, fam2 }; 4355 Table ht = htu.createTable(tableName, families); 4356 4357 // Setting up region 4358 byte row[] = Bytes.toBytes("row1"); 4359 byte col[] = Bytes.toBytes("col1"); 4360 4361 Put put = new Put(row); 4362 put.addColumn(fam1, col, 1, Bytes.toBytes("test1")); 4363 put.addColumn(fam2, col, 1, Bytes.toBytes("test2")); 4364 ht.put(put); 4365 4366 HRegion firstRegion = htu.getHBaseCluster().getRegions(tableName).get(0); 4367 firstRegion.flush(true); 4368 HDFSBlocksDistribution blocksDistribution1 = firstRegion.getHDFSBlocksDistribution(); 4369 4370 // Given the default replication factor is 2 and we have 2 HFiles, 4371 // we will have total of 4 replica of blocks on 3 datanodes; thus there 4372 // must be at least one host that have replica for 2 HFiles. That host's 4373 // weight will be equal to the unique block weight. 4374 long uniqueBlocksWeight1 = blocksDistribution1.getUniqueBlocksTotalWeight(); 4375 StringBuilder sb = new StringBuilder(); 4376 for (String host: blocksDistribution1.getTopHosts()) { 4377 if (sb.length() > 0) sb.append(", "); 4378 sb.append(host); 4379 sb.append("="); 4380 sb.append(blocksDistribution1.getWeight(host)); 4381 } 4382 4383 String topHost = blocksDistribution1.getTopHosts().get(0); 4384 long topHostWeight = blocksDistribution1.getWeight(topHost); 4385 String msg = "uniqueBlocksWeight=" + uniqueBlocksWeight1 + ", topHostWeight=" + 4386 topHostWeight + ", topHost=" + topHost + "; " + sb.toString(); 4387 LOG.info(msg); 4388 assertTrue(msg, uniqueBlocksWeight1 == topHostWeight); 4389 4390 // use the static method to compute the value, it should be the same. 4391 // static method is used by load balancer or other components 4392 HDFSBlocksDistribution blocksDistribution2 = HRegion.computeHDFSBlocksDistribution( 4393 htu.getConfiguration(), firstRegion.getTableDescriptor(), firstRegion.getRegionInfo()); 4394 long uniqueBlocksWeight2 = blocksDistribution2.getUniqueBlocksTotalWeight(); 4395 4396 assertTrue(uniqueBlocksWeight1 == uniqueBlocksWeight2); 4397 4398 ht.close(); 4399 } finally { 4400 if (cluster != null) { 4401 htu.shutdownMiniCluster(); 4402 } 4403 } 4404 } 4405 4406 /** 4407 * Testcase to check state of region initialization task set to ABORTED or not 4408 * if any exceptions during initialization 4409 * 4410 * @throws Exception 4411 */ 4412 @Test 4413 public void testStatusSettingToAbortIfAnyExceptionDuringRegionInitilization() throws Exception { 4414 HRegionInfo info; 4415 try { 4416 FileSystem fs = Mockito.mock(FileSystem.class); 4417 Mockito.when(fs.exists((Path) Mockito.anyObject())).thenThrow(new IOException()); 4418 HTableDescriptor htd = new HTableDescriptor(tableName); 4419 htd.addFamily(new HColumnDescriptor("cf")); 4420 info = new HRegionInfo(htd.getTableName(), HConstants.EMPTY_BYTE_ARRAY, 4421 HConstants.EMPTY_BYTE_ARRAY, false); 4422 Path path = new Path(dir + "testStatusSettingToAbortIfAnyExceptionDuringRegionInitilization"); 4423 region = HRegion.newHRegion(path, null, fs, CONF, info, htd, null); 4424 // region initialization throws IOException and set task state to ABORTED. 4425 region.initialize(); 4426 fail("Region initialization should fail due to IOException"); 4427 } catch (IOException io) { 4428 List<MonitoredTask> tasks = TaskMonitor.get().getTasks(); 4429 for (MonitoredTask monitoredTask : tasks) { 4430 if (!(monitoredTask instanceof MonitoredRPCHandler) 4431 && monitoredTask.getDescription().contains(region.toString())) { 4432 assertTrue("Region state should be ABORTED.", 4433 monitoredTask.getState().equals(MonitoredTask.State.ABORTED)); 4434 break; 4435 } 4436 } 4437 } finally { 4438 HBaseTestingUtility.closeRegionAndWAL(region); 4439 } 4440 } 4441 4442 /** 4443 * Verifies that the .regioninfo file is written on region creation and that 4444 * is recreated if missing during region opening. 4445 */ 4446 @Test 4447 public void testRegionInfoFileCreation() throws IOException { 4448 Path rootDir = new Path(dir + "testRegionInfoFileCreation"); 4449 4450 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName())); 4451 htd.addFamily(new HColumnDescriptor("cf")); 4452 4453 HRegionInfo hri = new HRegionInfo(htd.getTableName()); 4454 4455 // Create a region and skip the initialization (like CreateTableHandler) 4456 HRegion region = HBaseTestingUtility.createRegionAndWAL(hri, rootDir, CONF, htd, false); 4457 Path regionDir = region.getRegionFileSystem().getRegionDir(); 4458 FileSystem fs = region.getRegionFileSystem().getFileSystem(); 4459 HBaseTestingUtility.closeRegionAndWAL(region); 4460 4461 Path regionInfoFile = new Path(regionDir, HRegionFileSystem.REGION_INFO_FILE); 4462 4463 // Verify that the .regioninfo file is present 4464 assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir", 4465 fs.exists(regionInfoFile)); 4466 4467 // Try to open the region 4468 region = HRegion.openHRegion(rootDir, hri, htd, null, CONF); 4469 assertEquals(regionDir, region.getRegionFileSystem().getRegionDir()); 4470 HBaseTestingUtility.closeRegionAndWAL(region); 4471 4472 // Verify that the .regioninfo file is still there 4473 assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir", 4474 fs.exists(regionInfoFile)); 4475 4476 // Remove the .regioninfo file and verify is recreated on region open 4477 fs.delete(regionInfoFile, true); 4478 assertFalse(HRegionFileSystem.REGION_INFO_FILE + " should be removed from the region dir", 4479 fs.exists(regionInfoFile)); 4480 4481 region = HRegion.openHRegion(rootDir, hri, htd, null, CONF); 4482// region = TEST_UTIL.openHRegion(hri, htd); 4483 assertEquals(regionDir, region.getRegionFileSystem().getRegionDir()); 4484 HBaseTestingUtility.closeRegionAndWAL(region); 4485 4486 // Verify that the .regioninfo file is still there 4487 assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir", 4488 fs.exists(new Path(regionDir, HRegionFileSystem.REGION_INFO_FILE))); 4489 } 4490 4491 /** 4492 * TestCase for increment 4493 */ 4494 private static class Incrementer implements Runnable { 4495 private HRegion region; 4496 private final static byte[] incRow = Bytes.toBytes("incRow"); 4497 private final static byte[] family = Bytes.toBytes("family"); 4498 private final static byte[] qualifier = Bytes.toBytes("qualifier"); 4499 private final static long ONE = 1L; 4500 private int incCounter; 4501 4502 public Incrementer(HRegion region, int incCounter) { 4503 this.region = region; 4504 this.incCounter = incCounter; 4505 } 4506 4507 @Override 4508 public void run() { 4509 int count = 0; 4510 while (count < incCounter) { 4511 Increment inc = new Increment(incRow); 4512 inc.addColumn(family, qualifier, ONE); 4513 count++; 4514 try { 4515 region.increment(inc); 4516 } catch (IOException e) { 4517 LOG.info("Count=" + count + ", " + e); 4518 break; 4519 } 4520 } 4521 } 4522 } 4523 4524 /** 4525 * Test case to check increment function with memstore flushing 4526 * @throws Exception 4527 */ 4528 @Test 4529 public void testParallelIncrementWithMemStoreFlush() throws Exception { 4530 byte[] family = Incrementer.family; 4531 this.region = initHRegion(tableName, method, CONF, family); 4532 final HRegion region = this.region; 4533 final AtomicBoolean incrementDone = new AtomicBoolean(false); 4534 Runnable flusher = new Runnable() { 4535 @Override 4536 public void run() { 4537 while (!incrementDone.get()) { 4538 try { 4539 region.flush(true); 4540 } catch (Exception e) { 4541 e.printStackTrace(); 4542 } 4543 } 4544 } 4545 }; 4546 4547 // after all increment finished, the row will increment to 20*100 = 2000 4548 int threadNum = 20; 4549 int incCounter = 100; 4550 long expected = (long) threadNum * incCounter; 4551 Thread[] incrementers = new Thread[threadNum]; 4552 Thread flushThread = new Thread(flusher); 4553 for (int i = 0; i < threadNum; i++) { 4554 incrementers[i] = new Thread(new Incrementer(this.region, incCounter)); 4555 incrementers[i].start(); 4556 } 4557 flushThread.start(); 4558 for (int i = 0; i < threadNum; i++) { 4559 incrementers[i].join(); 4560 } 4561 4562 incrementDone.set(true); 4563 flushThread.join(); 4564 4565 Get get = new Get(Incrementer.incRow); 4566 get.addColumn(Incrementer.family, Incrementer.qualifier); 4567 get.setMaxVersions(1); 4568 Result res = this.region.get(get); 4569 List<Cell> kvs = res.getColumnCells(Incrementer.family, Incrementer.qualifier); 4570 4571 // we just got the latest version 4572 assertEquals(1, kvs.size()); 4573 Cell kv = kvs.get(0); 4574 assertEquals(expected, Bytes.toLong(kv.getValueArray(), kv.getValueOffset())); 4575 this.region = null; 4576 } 4577 4578 /** 4579 * TestCase for append 4580 */ 4581 private static class Appender implements Runnable { 4582 private HRegion region; 4583 private final static byte[] appendRow = Bytes.toBytes("appendRow"); 4584 private final static byte[] family = Bytes.toBytes("family"); 4585 private final static byte[] qualifier = Bytes.toBytes("qualifier"); 4586 private final static byte[] CHAR = Bytes.toBytes("a"); 4587 private int appendCounter; 4588 4589 public Appender(HRegion region, int appendCounter) { 4590 this.region = region; 4591 this.appendCounter = appendCounter; 4592 } 4593 4594 @Override 4595 public void run() { 4596 int count = 0; 4597 while (count < appendCounter) { 4598 Append app = new Append(appendRow); 4599 app.addColumn(family, qualifier, CHAR); 4600 count++; 4601 try { 4602 region.append(app); 4603 } catch (IOException e) { 4604 LOG.info("Count=" + count + ", max=" + appendCounter + ", " + e); 4605 break; 4606 } 4607 } 4608 } 4609 } 4610 4611 /** 4612 * Test case to check append function with memstore flushing 4613 * @throws Exception 4614 */ 4615 @Test 4616 public void testParallelAppendWithMemStoreFlush() throws Exception { 4617 byte[] family = Appender.family; 4618 this.region = initHRegion(tableName, method, CONF, family); 4619 final HRegion region = this.region; 4620 final AtomicBoolean appendDone = new AtomicBoolean(false); 4621 Runnable flusher = new Runnable() { 4622 @Override 4623 public void run() { 4624 while (!appendDone.get()) { 4625 try { 4626 region.flush(true); 4627 } catch (Exception e) { 4628 e.printStackTrace(); 4629 } 4630 } 4631 } 4632 }; 4633 4634 // After all append finished, the value will append to threadNum * 4635 // appendCounter Appender.CHAR 4636 int threadNum = 20; 4637 int appendCounter = 100; 4638 byte[] expected = new byte[threadNum * appendCounter]; 4639 for (int i = 0; i < threadNum * appendCounter; i++) { 4640 System.arraycopy(Appender.CHAR, 0, expected, i, 1); 4641 } 4642 Thread[] appenders = new Thread[threadNum]; 4643 Thread flushThread = new Thread(flusher); 4644 for (int i = 0; i < threadNum; i++) { 4645 appenders[i] = new Thread(new Appender(this.region, appendCounter)); 4646 appenders[i].start(); 4647 } 4648 flushThread.start(); 4649 for (int i = 0; i < threadNum; i++) { 4650 appenders[i].join(); 4651 } 4652 4653 appendDone.set(true); 4654 flushThread.join(); 4655 4656 Get get = new Get(Appender.appendRow); 4657 get.addColumn(Appender.family, Appender.qualifier); 4658 get.setMaxVersions(1); 4659 Result res = this.region.get(get); 4660 List<Cell> kvs = res.getColumnCells(Appender.family, Appender.qualifier); 4661 4662 // we just got the latest version 4663 assertEquals(1, kvs.size()); 4664 Cell kv = kvs.get(0); 4665 byte[] appendResult = new byte[kv.getValueLength()]; 4666 System.arraycopy(kv.getValueArray(), kv.getValueOffset(), appendResult, 0, kv.getValueLength()); 4667 assertArrayEquals(expected, appendResult); 4668 this.region = null; 4669 } 4670 4671 /** 4672 * Test case to check put function with memstore flushing for same row, same ts 4673 * @throws Exception 4674 */ 4675 @Test 4676 public void testPutWithMemStoreFlush() throws Exception { 4677 byte[] family = Bytes.toBytes("family"); 4678 byte[] qualifier = Bytes.toBytes("qualifier"); 4679 byte[] row = Bytes.toBytes("putRow"); 4680 byte[] value = null; 4681 this.region = initHRegion(tableName, method, CONF, family); 4682 Put put = null; 4683 Get get = null; 4684 List<Cell> kvs = null; 4685 Result res = null; 4686 4687 put = new Put(row); 4688 value = Bytes.toBytes("value0"); 4689 put.addColumn(family, qualifier, 1234567L, value); 4690 region.put(put); 4691 get = new Get(row); 4692 get.addColumn(family, qualifier); 4693 get.setMaxVersions(); 4694 res = this.region.get(get); 4695 kvs = res.getColumnCells(family, qualifier); 4696 assertEquals(1, kvs.size()); 4697 assertArrayEquals(Bytes.toBytes("value0"), CellUtil.cloneValue(kvs.get(0))); 4698 4699 region.flush(true); 4700 get = new Get(row); 4701 get.addColumn(family, qualifier); 4702 get.setMaxVersions(); 4703 res = this.region.get(get); 4704 kvs = res.getColumnCells(family, qualifier); 4705 assertEquals(1, kvs.size()); 4706 assertArrayEquals(Bytes.toBytes("value0"), CellUtil.cloneValue(kvs.get(0))); 4707 4708 put = new Put(row); 4709 value = Bytes.toBytes("value1"); 4710 put.addColumn(family, qualifier, 1234567L, value); 4711 region.put(put); 4712 get = new Get(row); 4713 get.addColumn(family, qualifier); 4714 get.setMaxVersions(); 4715 res = this.region.get(get); 4716 kvs = res.getColumnCells(family, qualifier); 4717 assertEquals(1, kvs.size()); 4718 assertArrayEquals(Bytes.toBytes("value1"), CellUtil.cloneValue(kvs.get(0))); 4719 4720 region.flush(true); 4721 get = new Get(row); 4722 get.addColumn(family, qualifier); 4723 get.setMaxVersions(); 4724 res = this.region.get(get); 4725 kvs = res.getColumnCells(family, qualifier); 4726 assertEquals(1, kvs.size()); 4727 assertArrayEquals(Bytes.toBytes("value1"), CellUtil.cloneValue(kvs.get(0))); 4728 } 4729 4730 @Test 4731 public void testDurability() throws Exception { 4732 // there are 5 x 5 cases: 4733 // table durability(SYNC,FSYNC,ASYC,SKIP,USE_DEFAULT) x mutation 4734 // durability(SYNC,FSYNC,ASYC,SKIP,USE_DEFAULT) 4735 4736 // expected cases for append and sync wal 4737 durabilityTest(method, Durability.SYNC_WAL, Durability.SYNC_WAL, 0, true, true, false); 4738 durabilityTest(method, Durability.SYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false); 4739 durabilityTest(method, Durability.SYNC_WAL, Durability.USE_DEFAULT, 0, true, true, false); 4740 4741 durabilityTest(method, Durability.FSYNC_WAL, Durability.SYNC_WAL, 0, true, true, false); 4742 durabilityTest(method, Durability.FSYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false); 4743 durabilityTest(method, Durability.FSYNC_WAL, Durability.USE_DEFAULT, 0, true, true, false); 4744 4745 durabilityTest(method, Durability.ASYNC_WAL, Durability.SYNC_WAL, 0, true, true, false); 4746 durabilityTest(method, Durability.ASYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false); 4747 4748 durabilityTest(method, Durability.SKIP_WAL, Durability.SYNC_WAL, 0, true, true, false); 4749 durabilityTest(method, Durability.SKIP_WAL, Durability.FSYNC_WAL, 0, true, true, false); 4750 4751 durabilityTest(method, Durability.USE_DEFAULT, Durability.SYNC_WAL, 0, true, true, false); 4752 durabilityTest(method, Durability.USE_DEFAULT, Durability.FSYNC_WAL, 0, true, true, false); 4753 durabilityTest(method, Durability.USE_DEFAULT, Durability.USE_DEFAULT, 0, true, true, false); 4754 4755 // expected cases for async wal 4756 durabilityTest(method, Durability.SYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false); 4757 durabilityTest(method, Durability.FSYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false); 4758 durabilityTest(method, Durability.ASYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false); 4759 durabilityTest(method, Durability.SKIP_WAL, Durability.ASYNC_WAL, 0, true, false, false); 4760 durabilityTest(method, Durability.USE_DEFAULT, Durability.ASYNC_WAL, 0, true, false, false); 4761 durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 0, true, false, false); 4762 4763 durabilityTest(method, Durability.SYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true); 4764 durabilityTest(method, Durability.FSYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true); 4765 durabilityTest(method, Durability.ASYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true); 4766 durabilityTest(method, Durability.SKIP_WAL, Durability.ASYNC_WAL, 5000, true, false, true); 4767 durabilityTest(method, Durability.USE_DEFAULT, Durability.ASYNC_WAL, 5000, true, false, true); 4768 durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 5000, true, false, true); 4769 4770 // expect skip wal cases 4771 durabilityTest(method, Durability.SYNC_WAL, Durability.SKIP_WAL, 0, false, false, false); 4772 durabilityTest(method, Durability.FSYNC_WAL, Durability.SKIP_WAL, 0, false, false, false); 4773 durabilityTest(method, Durability.ASYNC_WAL, Durability.SKIP_WAL, 0, false, false, false); 4774 durabilityTest(method, Durability.SKIP_WAL, Durability.SKIP_WAL, 0, false, false, false); 4775 durabilityTest(method, Durability.USE_DEFAULT, Durability.SKIP_WAL, 0, false, false, false); 4776 durabilityTest(method, Durability.SKIP_WAL, Durability.USE_DEFAULT, 0, false, false, false); 4777 4778 } 4779 4780 private void durabilityTest(String method, Durability tableDurability, 4781 Durability mutationDurability, long timeout, boolean expectAppend, final boolean expectSync, 4782 final boolean expectSyncFromLogSyncer) throws Exception { 4783 Configuration conf = HBaseConfiguration.create(CONF); 4784 method = method + "_" + tableDurability.name() + "_" + mutationDurability.name(); 4785 byte[] family = Bytes.toBytes("family"); 4786 Path logDir = new Path(new Path(dir + method), "log"); 4787 final Configuration walConf = new Configuration(conf); 4788 FSUtils.setRootDir(walConf, logDir); 4789 // XXX: The spied AsyncFSWAL can not work properly because of a Mockito defect that can not 4790 // deal with classes which have a field of an inner class. See discussions in HBASE-15536. 4791 walConf.set(WALFactory.WAL_PROVIDER, "filesystem"); 4792 final WALFactory wals = new WALFactory(walConf, TEST_UTIL.getRandomUUID().toString()); 4793 final WAL wal = spy(wals.getWAL(RegionInfoBuilder.newBuilder(tableName).build())); 4794 this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW, 4795 HConstants.EMPTY_END_ROW, false, tableDurability, wal, 4796 new byte[][] { family }); 4797 4798 Put put = new Put(Bytes.toBytes("r1")); 4799 put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1")); 4800 put.setDurability(mutationDurability); 4801 region.put(put); 4802 4803 // verify append called or not 4804 verify(wal, expectAppend ? times(1) : never()).appendData((HRegionInfo) any(), 4805 (WALKeyImpl) any(), (WALEdit) any()); 4806 4807 // verify sync called or not 4808 if (expectSync || expectSyncFromLogSyncer) { 4809 TEST_UTIL.waitFor(timeout, new Waiter.Predicate<Exception>() { 4810 @Override 4811 public boolean evaluate() throws Exception { 4812 try { 4813 if (expectSync) { 4814 verify(wal, times(1)).sync(anyLong()); // Hregion calls this one 4815 } else if (expectSyncFromLogSyncer) { 4816 verify(wal, times(1)).sync(); // wal syncer calls this one 4817 } 4818 } catch (Throwable ignore) { 4819 } 4820 return true; 4821 } 4822 }); 4823 } else { 4824 //verify(wal, never()).sync(anyLong()); 4825 verify(wal, never()).sync(); 4826 } 4827 4828 HBaseTestingUtility.closeRegionAndWAL(this.region); 4829 wals.close(); 4830 this.region = null; 4831 } 4832 4833 @Test 4834 public void testRegionReplicaSecondary() throws IOException { 4835 // create a primary region, load some data and flush 4836 // create a secondary region, and do a get against that 4837 Path rootDir = new Path(dir + name.getMethodName()); 4838 FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir); 4839 4840 byte[][] families = new byte[][] { 4841 Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3") 4842 }; 4843 byte[] cq = Bytes.toBytes("cq"); 4844 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName())); 4845 for (byte[] family : families) { 4846 htd.addFamily(new HColumnDescriptor(family)); 4847 } 4848 4849 long time = System.currentTimeMillis(); 4850 HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(), 4851 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, 4852 false, time, 0); 4853 HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(), 4854 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, 4855 false, time, 1); 4856 4857 HRegion primaryRegion = null, secondaryRegion = null; 4858 4859 try { 4860 primaryRegion = HBaseTestingUtility.createRegionAndWAL(primaryHri, 4861 rootDir, TEST_UTIL.getConfiguration(), htd); 4862 4863 // load some data 4864 putData(primaryRegion, 0, 1000, cq, families); 4865 4866 // flush region 4867 primaryRegion.flush(true); 4868 4869 // open secondary region 4870 secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF); 4871 4872 verifyData(secondaryRegion, 0, 1000, cq, families); 4873 } finally { 4874 if (primaryRegion != null) { 4875 HBaseTestingUtility.closeRegionAndWAL(primaryRegion); 4876 } 4877 if (secondaryRegion != null) { 4878 HBaseTestingUtility.closeRegionAndWAL(secondaryRegion); 4879 } 4880 } 4881 } 4882 4883 @Test 4884 public void testRegionReplicaSecondaryIsReadOnly() throws IOException { 4885 // create a primary region, load some data and flush 4886 // create a secondary region, and do a put against that 4887 Path rootDir = new Path(dir + name.getMethodName()); 4888 FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir); 4889 4890 byte[][] families = new byte[][] { 4891 Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3") 4892 }; 4893 byte[] cq = Bytes.toBytes("cq"); 4894 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName())); 4895 for (byte[] family : families) { 4896 htd.addFamily(new HColumnDescriptor(family)); 4897 } 4898 4899 long time = System.currentTimeMillis(); 4900 HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(), 4901 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, 4902 false, time, 0); 4903 HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(), 4904 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, 4905 false, time, 1); 4906 4907 HRegion primaryRegion = null, secondaryRegion = null; 4908 4909 try { 4910 primaryRegion = HBaseTestingUtility.createRegionAndWAL(primaryHri, 4911 rootDir, TEST_UTIL.getConfiguration(), htd); 4912 4913 // load some data 4914 putData(primaryRegion, 0, 1000, cq, families); 4915 4916 // flush region 4917 primaryRegion.flush(true); 4918 4919 // open secondary region 4920 secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF); 4921 4922 try { 4923 putData(secondaryRegion, 0, 1000, cq, families); 4924 fail("Should have thrown exception"); 4925 } catch (IOException ex) { 4926 // expected 4927 } 4928 } finally { 4929 if (primaryRegion != null) { 4930 HBaseTestingUtility.closeRegionAndWAL(primaryRegion); 4931 } 4932 if (secondaryRegion != null) { 4933 HBaseTestingUtility.closeRegionAndWAL(secondaryRegion); 4934 } 4935 } 4936 } 4937 4938 static WALFactory createWALFactory(Configuration conf, Path rootDir) throws IOException { 4939 Configuration confForWAL = new Configuration(conf); 4940 confForWAL.set(HConstants.HBASE_DIR, rootDir.toString()); 4941 return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8)); 4942 } 4943 4944 @Test 4945 public void testCompactionFromPrimary() throws IOException { 4946 Path rootDir = new Path(dir + name.getMethodName()); 4947 FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir); 4948 4949 byte[][] families = new byte[][] { 4950 Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3") 4951 }; 4952 byte[] cq = Bytes.toBytes("cq"); 4953 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName())); 4954 for (byte[] family : families) { 4955 htd.addFamily(new HColumnDescriptor(family)); 4956 } 4957 4958 long time = System.currentTimeMillis(); 4959 HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(), 4960 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, 4961 false, time, 0); 4962 HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(), 4963 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, 4964 false, time, 1); 4965 4966 HRegion primaryRegion = null, secondaryRegion = null; 4967 4968 try { 4969 primaryRegion = HBaseTestingUtility.createRegionAndWAL(primaryHri, 4970 rootDir, TEST_UTIL.getConfiguration(), htd); 4971 4972 // load some data 4973 putData(primaryRegion, 0, 1000, cq, families); 4974 4975 // flush region 4976 primaryRegion.flush(true); 4977 4978 // open secondary region 4979 secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF); 4980 4981 // move the file of the primary region to the archive, simulating a compaction 4982 Collection<HStoreFile> storeFiles = primaryRegion.getStore(families[0]).getStorefiles(); 4983 primaryRegion.getRegionFileSystem().removeStoreFiles(Bytes.toString(families[0]), storeFiles); 4984 Collection<StoreFileInfo> storeFileInfos = primaryRegion.getRegionFileSystem() 4985 .getStoreFiles(families[0]); 4986 Assert.assertTrue(storeFileInfos == null || storeFileInfos.isEmpty()); 4987 4988 verifyData(secondaryRegion, 0, 1000, cq, families); 4989 } finally { 4990 if (primaryRegion != null) { 4991 HBaseTestingUtility.closeRegionAndWAL(primaryRegion); 4992 } 4993 if (secondaryRegion != null) { 4994 HBaseTestingUtility.closeRegionAndWAL(secondaryRegion); 4995 } 4996 } 4997 } 4998 4999 private void putData(int startRow, int numRows, byte[] qf, byte[]... families) throws 5000 IOException { 5001 putData(this.region, startRow, numRows, qf, families); 5002 } 5003 5004 private void putData(HRegion region, 5005 int startRow, int numRows, byte[] qf, byte[]... families) throws IOException { 5006 putData(region, Durability.SKIP_WAL, startRow, numRows, qf, families); 5007 } 5008 5009 static void putData(HRegion region, Durability durability, 5010 int startRow, int numRows, byte[] qf, byte[]... families) throws IOException { 5011 for (int i = startRow; i < startRow + numRows; i++) { 5012 Put put = new Put(Bytes.toBytes("" + i)); 5013 put.setDurability(durability); 5014 for (byte[] family : families) { 5015 put.addColumn(family, qf, null); 5016 } 5017 region.put(put); 5018 LOG.info(put.toString()); 5019 } 5020 } 5021 5022 static void verifyData(HRegion newReg, int startRow, int numRows, byte[] qf, byte[]... families) 5023 throws IOException { 5024 for (int i = startRow; i < startRow + numRows; i++) { 5025 byte[] row = Bytes.toBytes("" + i); 5026 Get get = new Get(row); 5027 for (byte[] family : families) { 5028 get.addColumn(family, qf); 5029 } 5030 Result result = newReg.get(get); 5031 Cell[] raw = result.rawCells(); 5032 assertEquals(families.length, result.size()); 5033 for (int j = 0; j < families.length; j++) { 5034 assertTrue(CellUtil.matchingRows(raw[j], row)); 5035 assertTrue(CellUtil.matchingFamily(raw[j], families[j])); 5036 assertTrue(CellUtil.matchingQualifier(raw[j], qf)); 5037 } 5038 } 5039 } 5040 5041 static void assertGet(final HRegion r, final byte[] family, final byte[] k) throws IOException { 5042 // Now I have k, get values out and assert they are as expected. 5043 Get get = new Get(k).addFamily(family).setMaxVersions(); 5044 Cell[] results = r.get(get).rawCells(); 5045 for (int j = 0; j < results.length; j++) { 5046 byte[] tmp = CellUtil.cloneValue(results[j]); 5047 // Row should be equal to value every time. 5048 assertTrue(Bytes.equals(k, tmp)); 5049 } 5050 } 5051 5052 /* 5053 * Assert first value in the passed region is <code>firstValue</code>. 5054 * 5055 * @param r 5056 * 5057 * @param fs 5058 * 5059 * @param firstValue 5060 * 5061 * @throws IOException 5062 */ 5063 protected void assertScan(final HRegion r, final byte[] fs, final byte[] firstValue) 5064 throws IOException { 5065 byte[][] families = { fs }; 5066 Scan scan = new Scan(); 5067 for (int i = 0; i < families.length; i++) 5068 scan.addFamily(families[i]); 5069 InternalScanner s = r.getScanner(scan); 5070 try { 5071 List<Cell> curVals = new ArrayList<>(); 5072 boolean first = true; 5073 OUTER_LOOP: while (s.next(curVals)) { 5074 for (Cell kv : curVals) { 5075 byte[] val = CellUtil.cloneValue(kv); 5076 byte[] curval = val; 5077 if (first) { 5078 first = false; 5079 assertTrue(Bytes.compareTo(curval, firstValue) == 0); 5080 } else { 5081 // Not asserting anything. Might as well break. 5082 break OUTER_LOOP; 5083 } 5084 } 5085 } 5086 } finally { 5087 s.close(); 5088 } 5089 } 5090 5091 /** 5092 * Test that we get the expected flush results back 5093 */ 5094 @Test 5095 public void testFlushResult() throws IOException { 5096 byte[] family = Bytes.toBytes("family"); 5097 5098 this.region = initHRegion(tableName, method, family); 5099 5100 // empty memstore, flush doesn't run 5101 HRegion.FlushResult fr = region.flush(true); 5102 assertFalse(fr.isFlushSucceeded()); 5103 assertFalse(fr.isCompactionNeeded()); 5104 5105 // Flush enough files to get up to the threshold, doesn't need compactions 5106 for (int i = 0; i < 2; i++) { 5107 Put put = new Put(tableName.toBytes()).addColumn(family, family, tableName.toBytes()); 5108 region.put(put); 5109 fr = region.flush(true); 5110 assertTrue(fr.isFlushSucceeded()); 5111 assertFalse(fr.isCompactionNeeded()); 5112 } 5113 5114 // Two flushes after the threshold, compactions are needed 5115 for (int i = 0; i < 2; i++) { 5116 Put put = new Put(tableName.toBytes()).addColumn(family, family, tableName.toBytes()); 5117 region.put(put); 5118 fr = region.flush(true); 5119 assertTrue(fr.isFlushSucceeded()); 5120 assertTrue(fr.isCompactionNeeded()); 5121 } 5122 } 5123 5124 protected Configuration initSplit() { 5125 // Always compact if there is more than one store file. 5126 CONF.setInt("hbase.hstore.compactionThreshold", 2); 5127 5128 CONF.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 10 * 1000); 5129 5130 // Increase the amount of time between client retries 5131 CONF.setLong("hbase.client.pause", 15 * 1000); 5132 5133 // This size should make it so we always split using the addContent 5134 // below. After adding all data, the first region is 1.3M 5135 CONF.setLong(HConstants.HREGION_MAX_FILESIZE, 1024 * 128); 5136 return CONF; 5137 } 5138 5139 /** 5140 * @return A region on which you must call 5141 * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done. 5142 */ 5143 protected HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf, 5144 byte[]... families) throws IOException { 5145 return initHRegion(tableName, callingMethod, conf, false, families); 5146 } 5147 5148 /** 5149 * @return A region on which you must call 5150 * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done. 5151 */ 5152 protected HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf, 5153 boolean isReadOnly, byte[]... families) throws IOException { 5154 return initHRegion(tableName, null, null, callingMethod, conf, isReadOnly, families); 5155 } 5156 5157 protected HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, 5158 String callingMethod, Configuration conf, boolean isReadOnly, byte[]... families) 5159 throws IOException { 5160 Path logDir = TEST_UTIL.getDataTestDirOnTestFS(callingMethod + ".log"); 5161 HRegionInfo hri = new HRegionInfo(tableName, startKey, stopKey); 5162 final WAL wal = HBaseTestingUtility.createWal(conf, logDir, hri); 5163 return initHRegion(tableName, startKey, stopKey, isReadOnly, 5164 Durability.SYNC_WAL, wal, families); 5165 } 5166 5167 /** 5168 * @return A region on which you must call 5169 * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done. 5170 */ 5171 public HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, 5172 boolean isReadOnly, Durability durability, WAL wal, byte[]... families) throws IOException { 5173 ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); 5174 return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, 5175 isReadOnly, durability, wal, families); 5176 } 5177 5178 /** 5179 * Assert that the passed in Cell has expected contents for the specified row, 5180 * column & timestamp. 5181 */ 5182 private void checkOneCell(Cell kv, byte[] cf, int rowIdx, int colIdx, long ts) { 5183 String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts; 5184 assertEquals("Row mismatch which checking: " + ctx, "row:" + rowIdx, 5185 Bytes.toString(CellUtil.cloneRow(kv))); 5186 assertEquals("ColumnFamily mismatch while checking: " + ctx, Bytes.toString(cf), 5187 Bytes.toString(CellUtil.cloneFamily(kv))); 5188 assertEquals("Column qualifier mismatch while checking: " + ctx, "column:" + colIdx, 5189 Bytes.toString(CellUtil.cloneQualifier(kv))); 5190 assertEquals("Timestamp mismatch while checking: " + ctx, ts, kv.getTimestamp()); 5191 assertEquals("Value mismatch while checking: " + ctx, "value-version-" + ts, 5192 Bytes.toString(CellUtil.cloneValue(kv))); 5193 } 5194 5195 @Test 5196 public void testReverseScanner_FromMemStore_SingleCF_Normal() 5197 throws IOException { 5198 byte[] rowC = Bytes.toBytes("rowC"); 5199 byte[] rowA = Bytes.toBytes("rowA"); 5200 byte[] rowB = Bytes.toBytes("rowB"); 5201 byte[] cf = Bytes.toBytes("CF"); 5202 byte[][] families = { cf }; 5203 byte[] col = Bytes.toBytes("C"); 5204 long ts = 1; 5205 this.region = initHRegion(tableName, method, families); 5206 try { 5207 KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null); 5208 KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put, 5209 null); 5210 KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null); 5211 KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null); 5212 Put put = null; 5213 put = new Put(rowC); 5214 put.add(kv1); 5215 put.add(kv11); 5216 region.put(put); 5217 put = new Put(rowA); 5218 put.add(kv2); 5219 region.put(put); 5220 put = new Put(rowB); 5221 put.add(kv3); 5222 region.put(put); 5223 5224 Scan scan = new Scan(rowC); 5225 scan.setMaxVersions(5); 5226 scan.setReversed(true); 5227 InternalScanner scanner = region.getScanner(scan); 5228 List<Cell> currRow = new ArrayList<>(); 5229 boolean hasNext = scanner.next(currRow); 5230 assertEquals(2, currRow.size()); 5231 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5232 .get(0).getRowLength(), rowC, 0, rowC.length)); 5233 assertTrue(hasNext); 5234 currRow.clear(); 5235 hasNext = scanner.next(currRow); 5236 assertEquals(1, currRow.size()); 5237 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5238 .get(0).getRowLength(), rowB, 0, rowB.length)); 5239 assertTrue(hasNext); 5240 currRow.clear(); 5241 hasNext = scanner.next(currRow); 5242 assertEquals(1, currRow.size()); 5243 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5244 .get(0).getRowLength(), rowA, 0, rowA.length)); 5245 assertFalse(hasNext); 5246 scanner.close(); 5247 } finally { 5248 HBaseTestingUtility.closeRegionAndWAL(this.region); 5249 this.region = null; 5250 } 5251 } 5252 5253 @Test 5254 public void testReverseScanner_FromMemStore_SingleCF_LargerKey() 5255 throws IOException { 5256 byte[] rowC = Bytes.toBytes("rowC"); 5257 byte[] rowA = Bytes.toBytes("rowA"); 5258 byte[] rowB = Bytes.toBytes("rowB"); 5259 byte[] rowD = Bytes.toBytes("rowD"); 5260 byte[] cf = Bytes.toBytes("CF"); 5261 byte[][] families = { cf }; 5262 byte[] col = Bytes.toBytes("C"); 5263 long ts = 1; 5264 this.region = initHRegion(tableName, method, families); 5265 try { 5266 KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null); 5267 KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put, 5268 null); 5269 KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null); 5270 KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null); 5271 Put put = null; 5272 put = new Put(rowC); 5273 put.add(kv1); 5274 put.add(kv11); 5275 region.put(put); 5276 put = new Put(rowA); 5277 put.add(kv2); 5278 region.put(put); 5279 put = new Put(rowB); 5280 put.add(kv3); 5281 region.put(put); 5282 5283 Scan scan = new Scan(rowD); 5284 List<Cell> currRow = new ArrayList<>(); 5285 scan.setReversed(true); 5286 scan.setMaxVersions(5); 5287 InternalScanner scanner = region.getScanner(scan); 5288 boolean hasNext = scanner.next(currRow); 5289 assertEquals(2, currRow.size()); 5290 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5291 .get(0).getRowLength(), rowC, 0, rowC.length)); 5292 assertTrue(hasNext); 5293 currRow.clear(); 5294 hasNext = scanner.next(currRow); 5295 assertEquals(1, currRow.size()); 5296 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5297 .get(0).getRowLength(), rowB, 0, rowB.length)); 5298 assertTrue(hasNext); 5299 currRow.clear(); 5300 hasNext = scanner.next(currRow); 5301 assertEquals(1, currRow.size()); 5302 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5303 .get(0).getRowLength(), rowA, 0, rowA.length)); 5304 assertFalse(hasNext); 5305 scanner.close(); 5306 } finally { 5307 HBaseTestingUtility.closeRegionAndWAL(this.region); 5308 this.region = null; 5309 } 5310 } 5311 5312 @Test 5313 public void testReverseScanner_FromMemStore_SingleCF_FullScan() 5314 throws IOException { 5315 byte[] rowC = Bytes.toBytes("rowC"); 5316 byte[] rowA = Bytes.toBytes("rowA"); 5317 byte[] rowB = Bytes.toBytes("rowB"); 5318 byte[] cf = Bytes.toBytes("CF"); 5319 byte[][] families = { cf }; 5320 byte[] col = Bytes.toBytes("C"); 5321 long ts = 1; 5322 this.region = initHRegion(tableName, method, families); 5323 try { 5324 KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null); 5325 KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put, 5326 null); 5327 KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null); 5328 KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null); 5329 Put put = null; 5330 put = new Put(rowC); 5331 put.add(kv1); 5332 put.add(kv11); 5333 region.put(put); 5334 put = new Put(rowA); 5335 put.add(kv2); 5336 region.put(put); 5337 put = new Put(rowB); 5338 put.add(kv3); 5339 region.put(put); 5340 Scan scan = new Scan(); 5341 List<Cell> currRow = new ArrayList<>(); 5342 scan.setReversed(true); 5343 InternalScanner scanner = region.getScanner(scan); 5344 boolean hasNext = scanner.next(currRow); 5345 assertEquals(1, currRow.size()); 5346 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5347 .get(0).getRowLength(), rowC, 0, rowC.length)); 5348 assertTrue(hasNext); 5349 currRow.clear(); 5350 hasNext = scanner.next(currRow); 5351 assertEquals(1, currRow.size()); 5352 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5353 .get(0).getRowLength(), rowB, 0, rowB.length)); 5354 assertTrue(hasNext); 5355 currRow.clear(); 5356 hasNext = scanner.next(currRow); 5357 assertEquals(1, currRow.size()); 5358 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5359 .get(0).getRowLength(), rowA, 0, rowA.length)); 5360 assertFalse(hasNext); 5361 scanner.close(); 5362 } finally { 5363 HBaseTestingUtility.closeRegionAndWAL(this.region); 5364 this.region = null; 5365 } 5366 } 5367 5368 @Test 5369 public void testReverseScanner_moreRowsMayExistAfter() throws IOException { 5370 // case for "INCLUDE_AND_SEEK_NEXT_ROW & SEEK_NEXT_ROW" endless loop 5371 byte[] rowA = Bytes.toBytes("rowA"); 5372 byte[] rowB = Bytes.toBytes("rowB"); 5373 byte[] rowC = Bytes.toBytes("rowC"); 5374 byte[] rowD = Bytes.toBytes("rowD"); 5375 byte[] rowE = Bytes.toBytes("rowE"); 5376 byte[] cf = Bytes.toBytes("CF"); 5377 byte[][] families = { cf }; 5378 byte[] col1 = Bytes.toBytes("col1"); 5379 byte[] col2 = Bytes.toBytes("col2"); 5380 long ts = 1; 5381 this.region = initHRegion(tableName, method, families); 5382 try { 5383 KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null); 5384 KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null); 5385 KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null); 5386 KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null); 5387 KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null); 5388 KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null); 5389 Put put = null; 5390 put = new Put(rowA); 5391 put.add(kv1); 5392 region.put(put); 5393 put = new Put(rowB); 5394 put.add(kv2); 5395 region.put(put); 5396 put = new Put(rowC); 5397 put.add(kv3); 5398 region.put(put); 5399 put = new Put(rowD); 5400 put.add(kv4_1); 5401 region.put(put); 5402 put = new Put(rowD); 5403 put.add(kv4_2); 5404 region.put(put); 5405 put = new Put(rowE); 5406 put.add(kv5); 5407 region.put(put); 5408 region.flush(true); 5409 Scan scan = new Scan(rowD, rowA); 5410 scan.addColumn(families[0], col1); 5411 scan.setReversed(true); 5412 List<Cell> currRow = new ArrayList<>(); 5413 InternalScanner scanner = region.getScanner(scan); 5414 boolean hasNext = scanner.next(currRow); 5415 assertEquals(1, currRow.size()); 5416 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5417 .get(0).getRowLength(), rowD, 0, rowD.length)); 5418 assertTrue(hasNext); 5419 currRow.clear(); 5420 hasNext = scanner.next(currRow); 5421 assertEquals(1, currRow.size()); 5422 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5423 .get(0).getRowLength(), rowC, 0, rowC.length)); 5424 assertTrue(hasNext); 5425 currRow.clear(); 5426 hasNext = scanner.next(currRow); 5427 assertEquals(1, currRow.size()); 5428 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5429 .get(0).getRowLength(), rowB, 0, rowB.length)); 5430 assertFalse(hasNext); 5431 scanner.close(); 5432 5433 scan = new Scan(rowD, rowA); 5434 scan.addColumn(families[0], col2); 5435 scan.setReversed(true); 5436 currRow.clear(); 5437 scanner = region.getScanner(scan); 5438 hasNext = scanner.next(currRow); 5439 assertEquals(1, currRow.size()); 5440 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5441 .get(0).getRowLength(), rowD, 0, rowD.length)); 5442 scanner.close(); 5443 } finally { 5444 HBaseTestingUtility.closeRegionAndWAL(this.region); 5445 this.region = null; 5446 } 5447 } 5448 5449 @Test 5450 public void testReverseScanner_smaller_blocksize() throws IOException { 5451 // case to ensure no conflict with HFile index optimization 5452 byte[] rowA = Bytes.toBytes("rowA"); 5453 byte[] rowB = Bytes.toBytes("rowB"); 5454 byte[] rowC = Bytes.toBytes("rowC"); 5455 byte[] rowD = Bytes.toBytes("rowD"); 5456 byte[] rowE = Bytes.toBytes("rowE"); 5457 byte[] cf = Bytes.toBytes("CF"); 5458 byte[][] families = { cf }; 5459 byte[] col1 = Bytes.toBytes("col1"); 5460 byte[] col2 = Bytes.toBytes("col2"); 5461 long ts = 1; 5462 HBaseConfiguration config = new HBaseConfiguration(); 5463 config.setInt("test.block.size", 1); 5464 this.region = initHRegion(tableName, method, config, families); 5465 try { 5466 KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null); 5467 KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null); 5468 KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null); 5469 KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null); 5470 KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null); 5471 KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null); 5472 Put put = null; 5473 put = new Put(rowA); 5474 put.add(kv1); 5475 region.put(put); 5476 put = new Put(rowB); 5477 put.add(kv2); 5478 region.put(put); 5479 put = new Put(rowC); 5480 put.add(kv3); 5481 region.put(put); 5482 put = new Put(rowD); 5483 put.add(kv4_1); 5484 region.put(put); 5485 put = new Put(rowD); 5486 put.add(kv4_2); 5487 region.put(put); 5488 put = new Put(rowE); 5489 put.add(kv5); 5490 region.put(put); 5491 region.flush(true); 5492 Scan scan = new Scan(rowD, rowA); 5493 scan.addColumn(families[0], col1); 5494 scan.setReversed(true); 5495 List<Cell> currRow = new ArrayList<>(); 5496 InternalScanner scanner = region.getScanner(scan); 5497 boolean hasNext = scanner.next(currRow); 5498 assertEquals(1, currRow.size()); 5499 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5500 .get(0).getRowLength(), rowD, 0, rowD.length)); 5501 assertTrue(hasNext); 5502 currRow.clear(); 5503 hasNext = scanner.next(currRow); 5504 assertEquals(1, currRow.size()); 5505 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5506 .get(0).getRowLength(), rowC, 0, rowC.length)); 5507 assertTrue(hasNext); 5508 currRow.clear(); 5509 hasNext = scanner.next(currRow); 5510 assertEquals(1, currRow.size()); 5511 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5512 .get(0).getRowLength(), rowB, 0, rowB.length)); 5513 assertFalse(hasNext); 5514 scanner.close(); 5515 5516 scan = new Scan(rowD, rowA); 5517 scan.addColumn(families[0], col2); 5518 scan.setReversed(true); 5519 currRow.clear(); 5520 scanner = region.getScanner(scan); 5521 hasNext = scanner.next(currRow); 5522 assertEquals(1, currRow.size()); 5523 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5524 .get(0).getRowLength(), rowD, 0, rowD.length)); 5525 scanner.close(); 5526 } finally { 5527 HBaseTestingUtility.closeRegionAndWAL(this.region); 5528 this.region = null; 5529 } 5530 } 5531 5532 @Test 5533 public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs1() 5534 throws IOException { 5535 byte[] row0 = Bytes.toBytes("row0"); // 1 kv 5536 byte[] row1 = Bytes.toBytes("row1"); // 2 kv 5537 byte[] row2 = Bytes.toBytes("row2"); // 4 kv 5538 byte[] row3 = Bytes.toBytes("row3"); // 2 kv 5539 byte[] row4 = Bytes.toBytes("row4"); // 5 kv 5540 byte[] row5 = Bytes.toBytes("row5"); // 2 kv 5541 byte[] cf1 = Bytes.toBytes("CF1"); 5542 byte[] cf2 = Bytes.toBytes("CF2"); 5543 byte[] cf3 = Bytes.toBytes("CF3"); 5544 byte[][] families = { cf1, cf2, cf3 }; 5545 byte[] col = Bytes.toBytes("C"); 5546 long ts = 1; 5547 HBaseConfiguration conf = new HBaseConfiguration(); 5548 // disable compactions in this test. 5549 conf.setInt("hbase.hstore.compactionThreshold", 10000); 5550 this.region = initHRegion(tableName, method, conf, families); 5551 try { 5552 // kv naming style: kv(row number) totalKvCountInThisRow seq no 5553 KeyValue kv0_1_1 = new KeyValue(row0, cf1, col, ts, KeyValue.Type.Put, 5554 null); 5555 KeyValue kv1_2_1 = new KeyValue(row1, cf2, col, ts, KeyValue.Type.Put, 5556 null); 5557 KeyValue kv1_2_2 = new KeyValue(row1, cf1, col, ts + 1, 5558 KeyValue.Type.Put, null); 5559 KeyValue kv2_4_1 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put, 5560 null); 5561 KeyValue kv2_4_2 = new KeyValue(row2, cf1, col, ts, KeyValue.Type.Put, 5562 null); 5563 KeyValue kv2_4_3 = new KeyValue(row2, cf3, col, ts, KeyValue.Type.Put, 5564 null); 5565 KeyValue kv2_4_4 = new KeyValue(row2, cf1, col, ts + 4, 5566 KeyValue.Type.Put, null); 5567 KeyValue kv3_2_1 = new KeyValue(row3, cf2, col, ts, KeyValue.Type.Put, 5568 null); 5569 KeyValue kv3_2_2 = new KeyValue(row3, cf1, col, ts + 4, 5570 KeyValue.Type.Put, null); 5571 KeyValue kv4_5_1 = new KeyValue(row4, cf1, col, ts, KeyValue.Type.Put, 5572 null); 5573 KeyValue kv4_5_2 = new KeyValue(row4, cf3, col, ts, KeyValue.Type.Put, 5574 null); 5575 KeyValue kv4_5_3 = new KeyValue(row4, cf3, col, ts + 5, 5576 KeyValue.Type.Put, null); 5577 KeyValue kv4_5_4 = new KeyValue(row4, cf2, col, ts, KeyValue.Type.Put, 5578 null); 5579 KeyValue kv4_5_5 = new KeyValue(row4, cf1, col, ts + 3, 5580 KeyValue.Type.Put, null); 5581 KeyValue kv5_2_1 = new KeyValue(row5, cf2, col, ts, KeyValue.Type.Put, 5582 null); 5583 KeyValue kv5_2_2 = new KeyValue(row5, cf3, col, ts, KeyValue.Type.Put, 5584 null); 5585 // hfiles(cf1/cf2) :"row1"(1 kv) / "row2"(1 kv) / "row4"(2 kv) 5586 Put put = null; 5587 put = new Put(row1); 5588 put.add(kv1_2_1); 5589 region.put(put); 5590 put = new Put(row2); 5591 put.add(kv2_4_1); 5592 region.put(put); 5593 put = new Put(row4); 5594 put.add(kv4_5_4); 5595 put.add(kv4_5_5); 5596 region.put(put); 5597 region.flush(true); 5598 // hfiles(cf1/cf3) : "row1" (1 kvs) / "row2" (1 kv) / "row4" (2 kv) 5599 put = new Put(row4); 5600 put.add(kv4_5_1); 5601 put.add(kv4_5_3); 5602 region.put(put); 5603 put = new Put(row1); 5604 put.add(kv1_2_2); 5605 region.put(put); 5606 put = new Put(row2); 5607 put.add(kv2_4_4); 5608 region.put(put); 5609 region.flush(true); 5610 // hfiles(cf1/cf3) : "row2"(2 kv) / "row3"(1 kvs) / "row4" (1 kv) 5611 put = new Put(row4); 5612 put.add(kv4_5_2); 5613 region.put(put); 5614 put = new Put(row2); 5615 put.add(kv2_4_2); 5616 put.add(kv2_4_3); 5617 region.put(put); 5618 put = new Put(row3); 5619 put.add(kv3_2_2); 5620 region.put(put); 5621 region.flush(true); 5622 // memstore(cf1/cf2/cf3) : "row0" (1 kvs) / "row3" ( 1 kv) / "row5" (max) 5623 // ( 2 kv) 5624 put = new Put(row0); 5625 put.add(kv0_1_1); 5626 region.put(put); 5627 put = new Put(row3); 5628 put.add(kv3_2_1); 5629 region.put(put); 5630 put = new Put(row5); 5631 put.add(kv5_2_1); 5632 put.add(kv5_2_2); 5633 region.put(put); 5634 // scan range = ["row4", min), skip the max "row5" 5635 Scan scan = new Scan(row4); 5636 scan.setMaxVersions(5); 5637 scan.setBatch(3); 5638 scan.setReversed(true); 5639 InternalScanner scanner = region.getScanner(scan); 5640 List<Cell> currRow = new ArrayList<>(); 5641 boolean hasNext = false; 5642 // 1. scan out "row4" (5 kvs), "row5" can't be scanned out since not 5643 // included in scan range 5644 // "row4" takes 2 next() calls since batch=3 5645 hasNext = scanner.next(currRow); 5646 assertEquals(3, currRow.size()); 5647 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5648 .get(0).getRowLength(), row4, 0, row4.length)); 5649 assertTrue(hasNext); 5650 currRow.clear(); 5651 hasNext = scanner.next(currRow); 5652 assertEquals(2, currRow.size()); 5653 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), 5654 currRow.get(0).getRowLength(), row4, 0, 5655 row4.length)); 5656 assertTrue(hasNext); 5657 // 2. scan out "row3" (2 kv) 5658 currRow.clear(); 5659 hasNext = scanner.next(currRow); 5660 assertEquals(2, currRow.size()); 5661 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5662 .get(0).getRowLength(), row3, 0, row3.length)); 5663 assertTrue(hasNext); 5664 // 3. scan out "row2" (4 kvs) 5665 // "row2" takes 2 next() calls since batch=3 5666 currRow.clear(); 5667 hasNext = scanner.next(currRow); 5668 assertEquals(3, currRow.size()); 5669 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5670 .get(0).getRowLength(), row2, 0, row2.length)); 5671 assertTrue(hasNext); 5672 currRow.clear(); 5673 hasNext = scanner.next(currRow); 5674 assertEquals(1, currRow.size()); 5675 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5676 .get(0).getRowLength(), row2, 0, row2.length)); 5677 assertTrue(hasNext); 5678 // 4. scan out "row1" (2 kv) 5679 currRow.clear(); 5680 hasNext = scanner.next(currRow); 5681 assertEquals(2, currRow.size()); 5682 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5683 .get(0).getRowLength(), row1, 0, row1.length)); 5684 assertTrue(hasNext); 5685 // 5. scan out "row0" (1 kv) 5686 currRow.clear(); 5687 hasNext = scanner.next(currRow); 5688 assertEquals(1, currRow.size()); 5689 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5690 .get(0).getRowLength(), row0, 0, row0.length)); 5691 assertFalse(hasNext); 5692 5693 scanner.close(); 5694 } finally { 5695 HBaseTestingUtility.closeRegionAndWAL(this.region); 5696 this.region = null; 5697 } 5698 } 5699 5700 @Test 5701 public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs2() 5702 throws IOException { 5703 byte[] row1 = Bytes.toBytes("row1"); 5704 byte[] row2 = Bytes.toBytes("row2"); 5705 byte[] row3 = Bytes.toBytes("row3"); 5706 byte[] row4 = Bytes.toBytes("row4"); 5707 byte[] cf1 = Bytes.toBytes("CF1"); 5708 byte[] cf2 = Bytes.toBytes("CF2"); 5709 byte[] cf3 = Bytes.toBytes("CF3"); 5710 byte[] cf4 = Bytes.toBytes("CF4"); 5711 byte[][] families = { cf1, cf2, cf3, cf4 }; 5712 byte[] col = Bytes.toBytes("C"); 5713 long ts = 1; 5714 HBaseConfiguration conf = new HBaseConfiguration(); 5715 // disable compactions in this test. 5716 conf.setInt("hbase.hstore.compactionThreshold", 10000); 5717 this.region = initHRegion(tableName, method, conf, families); 5718 try { 5719 KeyValue kv1 = new KeyValue(row1, cf1, col, ts, KeyValue.Type.Put, null); 5720 KeyValue kv2 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put, null); 5721 KeyValue kv3 = new KeyValue(row3, cf3, col, ts, KeyValue.Type.Put, null); 5722 KeyValue kv4 = new KeyValue(row4, cf4, col, ts, KeyValue.Type.Put, null); 5723 // storefile1 5724 Put put = new Put(row1); 5725 put.add(kv1); 5726 region.put(put); 5727 region.flush(true); 5728 // storefile2 5729 put = new Put(row2); 5730 put.add(kv2); 5731 region.put(put); 5732 region.flush(true); 5733 // storefile3 5734 put = new Put(row3); 5735 put.add(kv3); 5736 region.put(put); 5737 region.flush(true); 5738 // memstore 5739 put = new Put(row4); 5740 put.add(kv4); 5741 region.put(put); 5742 // scan range = ["row4", min) 5743 Scan scan = new Scan(row4); 5744 scan.setReversed(true); 5745 scan.setBatch(10); 5746 InternalScanner scanner = region.getScanner(scan); 5747 List<Cell> currRow = new ArrayList<>(); 5748 boolean hasNext = scanner.next(currRow); 5749 assertEquals(1, currRow.size()); 5750 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5751 .get(0).getRowLength(), row4, 0, row4.length)); 5752 assertTrue(hasNext); 5753 currRow.clear(); 5754 hasNext = scanner.next(currRow); 5755 assertEquals(1, currRow.size()); 5756 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5757 .get(0).getRowLength(), row3, 0, row3.length)); 5758 assertTrue(hasNext); 5759 currRow.clear(); 5760 hasNext = scanner.next(currRow); 5761 assertEquals(1, currRow.size()); 5762 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5763 .get(0).getRowLength(), row2, 0, row2.length)); 5764 assertTrue(hasNext); 5765 currRow.clear(); 5766 hasNext = scanner.next(currRow); 5767 assertEquals(1, currRow.size()); 5768 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5769 .get(0).getRowLength(), row1, 0, row1.length)); 5770 assertFalse(hasNext); 5771 } finally { 5772 HBaseTestingUtility.closeRegionAndWAL(this.region); 5773 this.region = null; 5774 } 5775 } 5776 5777 /** 5778 * Test for HBASE-14497: Reverse Scan threw StackOverflow caused by readPt checking 5779 */ 5780 @Test 5781 public void testReverseScanner_StackOverflow() throws IOException { 5782 byte[] cf1 = Bytes.toBytes("CF1"); 5783 byte[][] families = {cf1}; 5784 byte[] col = Bytes.toBytes("C"); 5785 HBaseConfiguration conf = new HBaseConfiguration(); 5786 this.region = initHRegion(tableName, method, conf, families); 5787 try { 5788 // setup with one storefile and one memstore, to create scanner and get an earlier readPt 5789 Put put = new Put(Bytes.toBytes("19998")); 5790 put.addColumn(cf1, col, Bytes.toBytes("val")); 5791 region.put(put); 5792 region.flushcache(true, true, FlushLifeCycleTracker.DUMMY); 5793 Put put2 = new Put(Bytes.toBytes("19997")); 5794 put2.addColumn(cf1, col, Bytes.toBytes("val")); 5795 region.put(put2); 5796 5797 Scan scan = new Scan(Bytes.toBytes("19998")); 5798 scan.setReversed(true); 5799 InternalScanner scanner = region.getScanner(scan); 5800 5801 // create one storefile contains many rows will be skipped 5802 // to check StoreFileScanner.seekToPreviousRow 5803 for (int i = 10000; i < 20000; i++) { 5804 Put p = new Put(Bytes.toBytes(""+i)); 5805 p.addColumn(cf1, col, Bytes.toBytes("" + i)); 5806 region.put(p); 5807 } 5808 region.flushcache(true, true, FlushLifeCycleTracker.DUMMY); 5809 5810 // create one memstore contains many rows will be skipped 5811 // to check MemStoreScanner.seekToPreviousRow 5812 for (int i = 10000; i < 20000; i++) { 5813 Put p = new Put(Bytes.toBytes(""+i)); 5814 p.addColumn(cf1, col, Bytes.toBytes("" + i)); 5815 region.put(p); 5816 } 5817 5818 List<Cell> currRow = new ArrayList<>(); 5819 boolean hasNext; 5820 do { 5821 hasNext = scanner.next(currRow); 5822 } while (hasNext); 5823 assertEquals(2, currRow.size()); 5824 assertEquals("19998", Bytes.toString(currRow.get(0).getRowArray(), 5825 currRow.get(0).getRowOffset(), currRow.get(0).getRowLength())); 5826 assertEquals("19997", Bytes.toString(currRow.get(1).getRowArray(), 5827 currRow.get(1).getRowOffset(), currRow.get(1).getRowLength())); 5828 } finally { 5829 HBaseTestingUtility.closeRegionAndWAL(this.region); 5830 this.region = null; 5831 } 5832 } 5833 5834 @Test 5835 public void testReverseScanShouldNotScanMemstoreIfReadPtLesser() throws Exception { 5836 byte[] cf1 = Bytes.toBytes("CF1"); 5837 byte[][] families = { cf1 }; 5838 byte[] col = Bytes.toBytes("C"); 5839 HBaseConfiguration conf = new HBaseConfiguration(); 5840 this.region = initHRegion(tableName, method, conf, families); 5841 try { 5842 // setup with one storefile and one memstore, to create scanner and get an earlier readPt 5843 Put put = new Put(Bytes.toBytes("19996")); 5844 put.addColumn(cf1, col, Bytes.toBytes("val")); 5845 region.put(put); 5846 Put put2 = new Put(Bytes.toBytes("19995")); 5847 put2.addColumn(cf1, col, Bytes.toBytes("val")); 5848 region.put(put2); 5849 // create a reverse scan 5850 Scan scan = new Scan(Bytes.toBytes("19996")); 5851 scan.setReversed(true); 5852 RegionScannerImpl scanner = region.getScanner(scan); 5853 5854 // flush the cache. This will reset the store scanner 5855 region.flushcache(true, true, FlushLifeCycleTracker.DUMMY); 5856 5857 // create one memstore contains many rows will be skipped 5858 // to check MemStoreScanner.seekToPreviousRow 5859 for (int i = 10000; i < 20000; i++) { 5860 Put p = new Put(Bytes.toBytes("" + i)); 5861 p.addColumn(cf1, col, Bytes.toBytes("" + i)); 5862 region.put(p); 5863 } 5864 List<Cell> currRow = new ArrayList<>(); 5865 boolean hasNext; 5866 boolean assertDone = false; 5867 do { 5868 hasNext = scanner.next(currRow); 5869 // With HBASE-15871, after the scanner is reset the memstore scanner should not be 5870 // added here 5871 if (!assertDone) { 5872 StoreScanner current = 5873 (StoreScanner) (scanner.storeHeap).getCurrentForTesting(); 5874 List<KeyValueScanner> scanners = current.getAllScannersForTesting(); 5875 assertEquals("There should be only one scanner the store file scanner", 1, 5876 scanners.size()); 5877 assertDone = true; 5878 } 5879 } while (hasNext); 5880 assertEquals(2, currRow.size()); 5881 assertEquals("19996", Bytes.toString(currRow.get(0).getRowArray(), 5882 currRow.get(0).getRowOffset(), currRow.get(0).getRowLength())); 5883 assertEquals("19995", Bytes.toString(currRow.get(1).getRowArray(), 5884 currRow.get(1).getRowOffset(), currRow.get(1).getRowLength())); 5885 } finally { 5886 HBaseTestingUtility.closeRegionAndWAL(this.region); 5887 this.region = null; 5888 } 5889 } 5890 5891 @Test 5892 public void testReverseScanWhenPutCellsAfterOpenReverseScan() throws Exception { 5893 byte[] cf1 = Bytes.toBytes("CF1"); 5894 byte[][] families = { cf1 }; 5895 byte[] col = Bytes.toBytes("C"); 5896 5897 HBaseConfiguration conf = new HBaseConfiguration(); 5898 this.region = initHRegion(tableName, method, conf, families); 5899 5900 Put put = new Put(Bytes.toBytes("199996")); 5901 put.addColumn(cf1, col, Bytes.toBytes("val")); 5902 region.put(put); 5903 Put put2 = new Put(Bytes.toBytes("199995")); 5904 put2.addColumn(cf1, col, Bytes.toBytes("val")); 5905 region.put(put2); 5906 5907 // Create a reverse scan 5908 Scan scan = new Scan(Bytes.toBytes("199996")); 5909 scan.setReversed(true); 5910 RegionScannerImpl scanner = region.getScanner(scan); 5911 5912 // Put a lot of cells that have sequenceIDs grater than the readPt of the reverse scan 5913 for (int i = 100000; i < 200000; i++) { 5914 Put p = new Put(Bytes.toBytes("" + i)); 5915 p.addColumn(cf1, col, Bytes.toBytes("" + i)); 5916 region.put(p); 5917 } 5918 List<Cell> currRow = new ArrayList<>(); 5919 boolean hasNext; 5920 do { 5921 hasNext = scanner.next(currRow); 5922 } while (hasNext); 5923 5924 assertEquals(2, currRow.size()); 5925 assertEquals("199996", Bytes.toString(currRow.get(0).getRowArray(), 5926 currRow.get(0).getRowOffset(), currRow.get(0).getRowLength())); 5927 assertEquals("199995", Bytes.toString(currRow.get(1).getRowArray(), 5928 currRow.get(1).getRowOffset(), currRow.get(1).getRowLength())); 5929 } 5930 5931 @Test 5932 public void testWriteRequestsCounter() throws IOException { 5933 byte[] fam = Bytes.toBytes("info"); 5934 byte[][] families = { fam }; 5935 this.region = initHRegion(tableName, method, CONF, families); 5936 5937 Assert.assertEquals(0L, region.getWriteRequestsCount()); 5938 5939 Put put = new Put(row); 5940 put.addColumn(fam, fam, fam); 5941 5942 Assert.assertEquals(0L, region.getWriteRequestsCount()); 5943 region.put(put); 5944 Assert.assertEquals(1L, region.getWriteRequestsCount()); 5945 region.put(put); 5946 Assert.assertEquals(2L, region.getWriteRequestsCount()); 5947 region.put(put); 5948 Assert.assertEquals(3L, region.getWriteRequestsCount()); 5949 5950 region.delete(new Delete(row)); 5951 Assert.assertEquals(4L, region.getWriteRequestsCount()); 5952 5953 HBaseTestingUtility.closeRegionAndWAL(this.region); 5954 this.region = null; 5955 } 5956 5957 @Test 5958 public void testOpenRegionWrittenToWAL() throws Exception { 5959 final ServerName serverName = ServerName.valueOf(name.getMethodName(), 100, 42); 5960 final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName)); 5961 5962 TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName())) 5963 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam1)) 5964 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam2)).build(); 5965 RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); 5966 5967 // open the region w/o rss and wal and flush some files 5968 HRegion region = 5969 HBaseTestingUtility.createRegionAndWAL(hri, TEST_UTIL.getDataTestDir(), TEST_UTIL 5970 .getConfiguration(), htd); 5971 assertNotNull(region); 5972 5973 // create a file in fam1 for the region before opening in OpenRegionHandler 5974 region.put(new Put(Bytes.toBytes("a")).addColumn(fam1, fam1, fam1)); 5975 region.flush(true); 5976 HBaseTestingUtility.closeRegionAndWAL(region); 5977 5978 ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class); 5979 5980 // capture append() calls 5981 WAL wal = mockWAL(); 5982 when(rss.getWAL(any(RegionInfo.class))).thenReturn(wal); 5983 5984 try { 5985 region = HRegion.openHRegion(hri, htd, rss.getWAL(hri), 5986 TEST_UTIL.getConfiguration(), rss, null); 5987 5988 verify(wal, times(1)).appendMarker((HRegionInfo)any(), (WALKeyImpl)any() 5989 , editCaptor.capture()); 5990 5991 WALEdit edit = editCaptor.getValue(); 5992 assertNotNull(edit); 5993 assertNotNull(edit.getCells()); 5994 assertEquals(1, edit.getCells().size()); 5995 RegionEventDescriptor desc = WALEdit.getRegionEventDescriptor(edit.getCells().get(0)); 5996 assertNotNull(desc); 5997 5998 LOG.info("RegionEventDescriptor from WAL: " + desc); 5999 6000 assertEquals(RegionEventDescriptor.EventType.REGION_OPEN, desc.getEventType()); 6001 assertTrue(Bytes.equals(desc.getTableName().toByteArray(), htd.getTableName().toBytes())); 6002 assertTrue(Bytes.equals(desc.getEncodedRegionName().toByteArray(), 6003 hri.getEncodedNameAsBytes())); 6004 assertTrue(desc.getLogSequenceNumber() > 0); 6005 assertEquals(serverName, ProtobufUtil.toServerName(desc.getServer())); 6006 assertEquals(2, desc.getStoresCount()); 6007 6008 StoreDescriptor store = desc.getStores(0); 6009 assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam1)); 6010 assertEquals(store.getStoreHomeDir(), Bytes.toString(fam1)); 6011 assertEquals(1, store.getStoreFileCount()); // 1store file 6012 assertFalse(store.getStoreFile(0).contains("/")); // ensure path is relative 6013 6014 store = desc.getStores(1); 6015 assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam2)); 6016 assertEquals(store.getStoreHomeDir(), Bytes.toString(fam2)); 6017 assertEquals(0, store.getStoreFileCount()); // no store files 6018 } finally { 6019 HBaseTestingUtility.closeRegionAndWAL(region); 6020 } 6021 } 6022 6023 // Helper for test testOpenRegionWrittenToWALForLogReplay 6024 static class HRegionWithSeqId extends HRegion { 6025 public HRegionWithSeqId(final Path tableDir, final WAL wal, final FileSystem fs, 6026 final Configuration confParam, final RegionInfo regionInfo, 6027 final TableDescriptor htd, final RegionServerServices rsServices) { 6028 super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices); 6029 } 6030 @Override 6031 protected long getNextSequenceId(WAL wal) throws IOException { 6032 return 42; 6033 } 6034 } 6035 6036 @Test 6037 public void testFlushedFileWithNoTags() throws Exception { 6038 final TableName tableName = TableName.valueOf(name.getMethodName()); 6039 HTableDescriptor htd = new HTableDescriptor(tableName); 6040 htd.addFamily(new HColumnDescriptor(fam1)); 6041 HRegionInfo info = new HRegionInfo(tableName, null, null, false); 6042 Path path = TEST_UTIL.getDataTestDir(getClass().getSimpleName()); 6043 region = HBaseTestingUtility.createRegionAndWAL(info, path, TEST_UTIL.getConfiguration(), htd); 6044 Put put = new Put(Bytes.toBytes("a-b-0-0")); 6045 put.addColumn(fam1, qual1, Bytes.toBytes("c1-value")); 6046 region.put(put); 6047 region.flush(true); 6048 HStore store = region.getStore(fam1); 6049 Collection<HStoreFile> storefiles = store.getStorefiles(); 6050 for (HStoreFile sf : storefiles) { 6051 assertFalse("Tags should not be present " 6052 ,sf.getReader().getHFileReader().getFileContext().isIncludesTags()); 6053 } 6054 } 6055 6056 /** 6057 * Utility method to setup a WAL mock. 6058 * <p/> 6059 * Needs to do the bit where we close latch on the WALKeyImpl on append else test hangs. 6060 * @return a mock WAL 6061 */ 6062 private WAL mockWAL() throws IOException { 6063 WAL wal = mock(WAL.class); 6064 when(wal.appendData(any(RegionInfo.class), any(WALKeyImpl.class), any(WALEdit.class))) 6065 .thenAnswer(new Answer<Long>() { 6066 @Override 6067 public Long answer(InvocationOnMock invocation) throws Throwable { 6068 WALKeyImpl key = invocation.getArgument(1); 6069 MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(); 6070 key.setWriteEntry(we); 6071 return 1L; 6072 } 6073 }); 6074 when(wal.appendMarker(any(RegionInfo.class), any(WALKeyImpl.class), any(WALEdit.class))). 6075 thenAnswer(new Answer<Long>() { 6076 @Override 6077 public Long answer(InvocationOnMock invocation) throws Throwable { 6078 WALKeyImpl key = invocation.getArgument(1); 6079 MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(); 6080 key.setWriteEntry(we); 6081 return 1L; 6082 } 6083 }); 6084 return wal; 6085 } 6086 6087 @Test 6088 public void testCloseRegionWrittenToWAL() throws Exception { 6089 Path rootDir = new Path(dir + name.getMethodName()); 6090 FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir); 6091 6092 final ServerName serverName = ServerName.valueOf("testCloseRegionWrittenToWAL", 100, 42); 6093 final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName)); 6094 6095 TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName())) 6096 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam1)) 6097 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam2)).build(); 6098 RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); 6099 6100 ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class); 6101 6102 // capture append() calls 6103 WAL wal = mockWAL(); 6104 when(rss.getWAL(any(RegionInfo.class))).thenReturn(wal); 6105 6106 6107 // create and then open a region first so that it can be closed later 6108 region = HRegion.createHRegion(hri, rootDir, TEST_UTIL.getConfiguration(), htd, rss.getWAL(hri)); 6109 region = HRegion.openHRegion(hri, htd, rss.getWAL(hri), 6110 TEST_UTIL.getConfiguration(), rss, null); 6111 6112 // close the region 6113 region.close(false); 6114 6115 // 2 times, one for region open, the other close region 6116 verify(wal, times(2)).appendMarker(any(RegionInfo.class), 6117 (WALKeyImpl) any(WALKeyImpl.class), editCaptor.capture()); 6118 6119 WALEdit edit = editCaptor.getAllValues().get(1); 6120 assertNotNull(edit); 6121 assertNotNull(edit.getCells()); 6122 assertEquals(1, edit.getCells().size()); 6123 RegionEventDescriptor desc = WALEdit.getRegionEventDescriptor(edit.getCells().get(0)); 6124 assertNotNull(desc); 6125 6126 LOG.info("RegionEventDescriptor from WAL: " + desc); 6127 6128 assertEquals(RegionEventDescriptor.EventType.REGION_CLOSE, desc.getEventType()); 6129 assertTrue(Bytes.equals(desc.getTableName().toByteArray(), htd.getTableName().toBytes())); 6130 assertTrue(Bytes.equals(desc.getEncodedRegionName().toByteArray(), 6131 hri.getEncodedNameAsBytes())); 6132 assertTrue(desc.getLogSequenceNumber() > 0); 6133 assertEquals(serverName, ProtobufUtil.toServerName(desc.getServer())); 6134 assertEquals(2, desc.getStoresCount()); 6135 6136 StoreDescriptor store = desc.getStores(0); 6137 assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam1)); 6138 assertEquals(store.getStoreHomeDir(), Bytes.toString(fam1)); 6139 assertEquals(0, store.getStoreFileCount()); // no store files 6140 6141 store = desc.getStores(1); 6142 assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam2)); 6143 assertEquals(store.getStoreHomeDir(), Bytes.toString(fam2)); 6144 assertEquals(0, store.getStoreFileCount()); // no store files 6145 } 6146 6147 /** 6148 * Test RegionTooBusyException thrown when region is busy 6149 */ 6150 @Test 6151 public void testRegionTooBusy() throws IOException { 6152 byte[] family = Bytes.toBytes("family"); 6153 long defaultBusyWaitDuration = CONF.getLong("hbase.busy.wait.duration", 6154 HRegion.DEFAULT_BUSY_WAIT_DURATION); 6155 CONF.setLong("hbase.busy.wait.duration", 1000); 6156 region = initHRegion(tableName, method, CONF, family); 6157 final AtomicBoolean stopped = new AtomicBoolean(true); 6158 Thread t = new Thread(new Runnable() { 6159 @Override 6160 public void run() { 6161 try { 6162 region.lock.writeLock().lock(); 6163 stopped.set(false); 6164 while (!stopped.get()) { 6165 Thread.sleep(100); 6166 } 6167 } catch (InterruptedException ie) { 6168 } finally { 6169 region.lock.writeLock().unlock(); 6170 } 6171 } 6172 }); 6173 t.start(); 6174 Get get = new Get(row); 6175 try { 6176 while (stopped.get()) { 6177 Thread.sleep(100); 6178 } 6179 region.get(get); 6180 fail("Should throw RegionTooBusyException"); 6181 } catch (InterruptedException ie) { 6182 fail("test interrupted"); 6183 } catch (RegionTooBusyException e) { 6184 // Good, expected 6185 } finally { 6186 stopped.set(true); 6187 try { 6188 t.join(); 6189 } catch (Throwable e) { 6190 } 6191 6192 HBaseTestingUtility.closeRegionAndWAL(region); 6193 region = null; 6194 CONF.setLong("hbase.busy.wait.duration", defaultBusyWaitDuration); 6195 } 6196 } 6197 6198 @Test 6199 public void testCellTTLs() throws IOException { 6200 IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge(); 6201 EnvironmentEdgeManager.injectEdge(edge); 6202 6203 final byte[] row = Bytes.toBytes("testRow"); 6204 final byte[] q1 = Bytes.toBytes("q1"); 6205 final byte[] q2 = Bytes.toBytes("q2"); 6206 final byte[] q3 = Bytes.toBytes("q3"); 6207 final byte[] q4 = Bytes.toBytes("q4"); 6208 6209 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName())); 6210 HColumnDescriptor hcd = new HColumnDescriptor(fam1); 6211 hcd.setTimeToLive(10); // 10 seconds 6212 htd.addFamily(hcd); 6213 6214 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 6215 conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS); 6216 6217 HRegion region = HBaseTestingUtility.createRegionAndWAL(new HRegionInfo(htd.getTableName(), 6218 HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY), 6219 TEST_UTIL.getDataTestDir(), conf, htd); 6220 assertNotNull(region); 6221 try { 6222 long now = EnvironmentEdgeManager.currentTime(); 6223 // Add a cell that will expire in 5 seconds via cell TTL 6224 region.put(new Put(row).add(new KeyValue(row, fam1, q1, now, 6225 HConstants.EMPTY_BYTE_ARRAY, new ArrayBackedTag[] { 6226 // TTL tags specify ts in milliseconds 6227 new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) } ))); 6228 // Add a cell that will expire after 10 seconds via family setting 6229 region.put(new Put(row).addColumn(fam1, q2, now, HConstants.EMPTY_BYTE_ARRAY)); 6230 // Add a cell that will expire in 15 seconds via cell TTL 6231 region.put(new Put(row).add(new KeyValue(row, fam1, q3, now + 10000 - 1, 6232 HConstants.EMPTY_BYTE_ARRAY, new ArrayBackedTag[] { 6233 // TTL tags specify ts in milliseconds 6234 new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) } ))); 6235 // Add a cell that will expire in 20 seconds via family setting 6236 region.put(new Put(row).addColumn(fam1, q4, now + 10000 - 1, HConstants.EMPTY_BYTE_ARRAY)); 6237 6238 // Flush so we are sure store scanning gets this right 6239 region.flush(true); 6240 6241 // A query at time T+0 should return all cells 6242 Result r = region.get(new Get(row)); 6243 assertNotNull(r.getValue(fam1, q1)); 6244 assertNotNull(r.getValue(fam1, q2)); 6245 assertNotNull(r.getValue(fam1, q3)); 6246 assertNotNull(r.getValue(fam1, q4)); 6247 6248 // Increment time to T+5 seconds 6249 edge.incrementTime(5000); 6250 6251 r = region.get(new Get(row)); 6252 assertNull(r.getValue(fam1, q1)); 6253 assertNotNull(r.getValue(fam1, q2)); 6254 assertNotNull(r.getValue(fam1, q3)); 6255 assertNotNull(r.getValue(fam1, q4)); 6256 6257 // Increment time to T+10 seconds 6258 edge.incrementTime(5000); 6259 6260 r = region.get(new Get(row)); 6261 assertNull(r.getValue(fam1, q1)); 6262 assertNull(r.getValue(fam1, q2)); 6263 assertNotNull(r.getValue(fam1, q3)); 6264 assertNotNull(r.getValue(fam1, q4)); 6265 6266 // Increment time to T+15 seconds 6267 edge.incrementTime(5000); 6268 6269 r = region.get(new Get(row)); 6270 assertNull(r.getValue(fam1, q1)); 6271 assertNull(r.getValue(fam1, q2)); 6272 assertNull(r.getValue(fam1, q3)); 6273 assertNotNull(r.getValue(fam1, q4)); 6274 6275 // Increment time to T+20 seconds 6276 edge.incrementTime(10000); 6277 6278 r = region.get(new Get(row)); 6279 assertNull(r.getValue(fam1, q1)); 6280 assertNull(r.getValue(fam1, q2)); 6281 assertNull(r.getValue(fam1, q3)); 6282 assertNull(r.getValue(fam1, q4)); 6283 6284 // Fun with disappearing increments 6285 6286 // Start at 1 6287 region.put(new Put(row).addColumn(fam1, q1, Bytes.toBytes(1L))); 6288 r = region.get(new Get(row)); 6289 byte[] val = r.getValue(fam1, q1); 6290 assertNotNull(val); 6291 assertEquals(1L, Bytes.toLong(val)); 6292 6293 // Increment with a TTL of 5 seconds 6294 Increment incr = new Increment(row).addColumn(fam1, q1, 1L); 6295 incr.setTTL(5000); 6296 region.increment(incr); // 2 6297 6298 // New value should be 2 6299 r = region.get(new Get(row)); 6300 val = r.getValue(fam1, q1); 6301 assertNotNull(val); 6302 assertEquals(2L, Bytes.toLong(val)); 6303 6304 // Increment time to T+25 seconds 6305 edge.incrementTime(5000); 6306 6307 // Value should be back to 1 6308 r = region.get(new Get(row)); 6309 val = r.getValue(fam1, q1); 6310 assertNotNull(val); 6311 assertEquals(1L, Bytes.toLong(val)); 6312 6313 // Increment time to T+30 seconds 6314 edge.incrementTime(5000); 6315 6316 // Original value written at T+20 should be gone now via family TTL 6317 r = region.get(new Get(row)); 6318 assertNull(r.getValue(fam1, q1)); 6319 6320 } finally { 6321 HBaseTestingUtility.closeRegionAndWAL(region); 6322 } 6323 } 6324 6325 @Test 6326 public void testIncrementTimestampsAreMonotonic() throws IOException { 6327 HRegion region = initHRegion(tableName, method, CONF, fam1); 6328 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); 6329 EnvironmentEdgeManager.injectEdge(edge); 6330 6331 edge.setValue(10); 6332 Increment inc = new Increment(row); 6333 inc.setDurability(Durability.SKIP_WAL); 6334 inc.addColumn(fam1, qual1, 1L); 6335 region.increment(inc); 6336 6337 Result result = region.get(new Get(row)); 6338 Cell c = result.getColumnLatestCell(fam1, qual1); 6339 assertNotNull(c); 6340 assertEquals(10L, c.getTimestamp()); 6341 6342 edge.setValue(1); // clock goes back 6343 region.increment(inc); 6344 result = region.get(new Get(row)); 6345 c = result.getColumnLatestCell(fam1, qual1); 6346 assertEquals(11L, c.getTimestamp()); 6347 assertEquals(2L, Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength())); 6348 } 6349 6350 @Test 6351 public void testAppendTimestampsAreMonotonic() throws IOException { 6352 HRegion region = initHRegion(tableName, method, CONF, fam1); 6353 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); 6354 EnvironmentEdgeManager.injectEdge(edge); 6355 6356 edge.setValue(10); 6357 Append a = new Append(row); 6358 a.setDurability(Durability.SKIP_WAL); 6359 a.addColumn(fam1, qual1, qual1); 6360 region.append(a); 6361 6362 Result result = region.get(new Get(row)); 6363 Cell c = result.getColumnLatestCell(fam1, qual1); 6364 assertNotNull(c); 6365 assertEquals(10L, c.getTimestamp()); 6366 6367 edge.setValue(1); // clock goes back 6368 region.append(a); 6369 result = region.get(new Get(row)); 6370 c = result.getColumnLatestCell(fam1, qual1); 6371 assertEquals(11L, c.getTimestamp()); 6372 6373 byte[] expected = new byte[qual1.length*2]; 6374 System.arraycopy(qual1, 0, expected, 0, qual1.length); 6375 System.arraycopy(qual1, 0, expected, qual1.length, qual1.length); 6376 6377 assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(), 6378 expected, 0, expected.length)); 6379 } 6380 6381 @Test 6382 public void testCheckAndMutateTimestampsAreMonotonic() throws IOException { 6383 HRegion region = initHRegion(tableName, method, CONF, fam1); 6384 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); 6385 EnvironmentEdgeManager.injectEdge(edge); 6386 6387 edge.setValue(10); 6388 Put p = new Put(row); 6389 p.setDurability(Durability.SKIP_WAL); 6390 p.addColumn(fam1, qual1, qual1); 6391 region.put(p); 6392 6393 Result result = region.get(new Get(row)); 6394 Cell c = result.getColumnLatestCell(fam1, qual1); 6395 assertNotNull(c); 6396 assertEquals(10L, c.getTimestamp()); 6397 6398 edge.setValue(1); // clock goes back 6399 p = new Put(row); 6400 p.setDurability(Durability.SKIP_WAL); 6401 p.addColumn(fam1, qual1, qual2); 6402 region.checkAndMutate(row, fam1, qual1, CompareOperator.EQUAL, new BinaryComparator(qual1), p); 6403 result = region.get(new Get(row)); 6404 c = result.getColumnLatestCell(fam1, qual1); 6405 assertEquals(10L, c.getTimestamp()); 6406 6407 assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(), 6408 qual2, 0, qual2.length)); 6409 } 6410 6411 @Test 6412 public void testBatchMutateWithWrongRegionException() throws Exception { 6413 final byte[] a = Bytes.toBytes("a"); 6414 final byte[] b = Bytes.toBytes("b"); 6415 final byte[] c = Bytes.toBytes("c"); // exclusive 6416 6417 int prevLockTimeout = CONF.getInt("hbase.rowlock.wait.duration", 30000); 6418 CONF.setInt("hbase.rowlock.wait.duration", 1000); 6419 final HRegion region = initHRegion(tableName, a, c, method, CONF, false, fam1); 6420 6421 Mutation[] mutations = new Mutation[] { 6422 new Put(a) 6423 .add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY) 6424 .setRow(a) 6425 .setFamily(fam1) 6426 .setTimestamp(HConstants.LATEST_TIMESTAMP) 6427 .setType(Cell.Type.Put) 6428 .build()), 6429 // this is outside the region boundary 6430 new Put(c).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY) 6431 .setRow(c) 6432 .setFamily(fam1) 6433 .setTimestamp(HConstants.LATEST_TIMESTAMP) 6434 .setType(Type.Put) 6435 .build()), 6436 new Put(b).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY) 6437 .setRow(b) 6438 .setFamily(fam1) 6439 .setTimestamp(HConstants.LATEST_TIMESTAMP) 6440 .setType(Cell.Type.Put) 6441 .build()) 6442 }; 6443 6444 OperationStatus[] status = region.batchMutate(mutations); 6445 assertEquals(OperationStatusCode.SUCCESS, status[0].getOperationStatusCode()); 6446 assertEquals(OperationStatusCode.SANITY_CHECK_FAILURE, status[1].getOperationStatusCode()); 6447 assertEquals(OperationStatusCode.SUCCESS, status[2].getOperationStatusCode()); 6448 6449 6450 // test with a row lock held for a long time 6451 final CountDownLatch obtainedRowLock = new CountDownLatch(1); 6452 ExecutorService exec = Executors.newFixedThreadPool(2); 6453 Future<Void> f1 = exec.submit(new Callable<Void>() { 6454 @Override 6455 public Void call() throws Exception { 6456 LOG.info("Acquiring row lock"); 6457 RowLock rl = region.getRowLock(b); 6458 obtainedRowLock.countDown(); 6459 LOG.info("Waiting for 5 seconds before releasing lock"); 6460 Threads.sleep(5000); 6461 LOG.info("Releasing row lock"); 6462 rl.release(); 6463 return null; 6464 } 6465 }); 6466 obtainedRowLock.await(30, TimeUnit.SECONDS); 6467 6468 Future<Void> f2 = exec.submit(new Callable<Void>() { 6469 @Override 6470 public Void call() throws Exception { 6471 Mutation[] mutations = new Mutation[] { 6472 new Put(a).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY) 6473 .setRow(a) 6474 .setFamily(fam1) 6475 .setTimestamp(HConstants.LATEST_TIMESTAMP) 6476 .setType(Cell.Type.Put) 6477 .build()), 6478 new Put(b).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY) 6479 .setRow(b) 6480 .setFamily(fam1) 6481 .setTimestamp(HConstants.LATEST_TIMESTAMP) 6482 .setType(Cell.Type.Put) 6483 .build()), 6484 }; 6485 6486 // this will wait for the row lock, and it will eventually succeed 6487 OperationStatus[] status = region.batchMutate(mutations); 6488 assertEquals(OperationStatusCode.SUCCESS, status[0].getOperationStatusCode()); 6489 assertEquals(OperationStatusCode.SUCCESS, status[1].getOperationStatusCode()); 6490 return null; 6491 } 6492 }); 6493 6494 f1.get(); 6495 f2.get(); 6496 6497 CONF.setInt("hbase.rowlock.wait.duration", prevLockTimeout); 6498 } 6499 6500 @Test 6501 public void testCheckAndRowMutateTimestampsAreMonotonic() throws IOException { 6502 HRegion region = initHRegion(tableName, method, CONF, fam1); 6503 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); 6504 EnvironmentEdgeManager.injectEdge(edge); 6505 6506 edge.setValue(10); 6507 Put p = new Put(row); 6508 p.setDurability(Durability.SKIP_WAL); 6509 p.addColumn(fam1, qual1, qual1); 6510 region.put(p); 6511 6512 Result result = region.get(new Get(row)); 6513 Cell c = result.getColumnLatestCell(fam1, qual1); 6514 assertNotNull(c); 6515 assertEquals(10L, c.getTimestamp()); 6516 6517 edge.setValue(1); // clock goes back 6518 p = new Put(row); 6519 p.setDurability(Durability.SKIP_WAL); 6520 p.addColumn(fam1, qual1, qual2); 6521 RowMutations rm = new RowMutations(row); 6522 rm.add(p); 6523 assertTrue(region.checkAndRowMutate(row, fam1, qual1, CompareOperator.EQUAL, 6524 new BinaryComparator(qual1), rm)); 6525 result = region.get(new Get(row)); 6526 c = result.getColumnLatestCell(fam1, qual1); 6527 assertEquals(10L, c.getTimestamp()); 6528 LOG.info("c value " + 6529 Bytes.toStringBinary(c.getValueArray(), c.getValueOffset(), c.getValueLength())); 6530 6531 assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(), 6532 qual2, 0, qual2.length)); 6533 } 6534 6535 HRegion initHRegion(TableName tableName, String callingMethod, 6536 byte[]... families) throws IOException { 6537 return initHRegion(tableName, callingMethod, HBaseConfiguration.create(), 6538 families); 6539 } 6540 6541 /** 6542 * HBASE-16429 Make sure no stuck if roll writer when ring buffer is filled with appends 6543 * @throws IOException if IO error occurred during test 6544 */ 6545 @Test 6546 public void testWritesWhileRollWriter() throws IOException { 6547 int testCount = 10; 6548 int numRows = 1024; 6549 int numFamilies = 2; 6550 int numQualifiers = 2; 6551 final byte[][] families = new byte[numFamilies][]; 6552 for (int i = 0; i < numFamilies; i++) { 6553 families[i] = Bytes.toBytes("family" + i); 6554 } 6555 final byte[][] qualifiers = new byte[numQualifiers][]; 6556 for (int i = 0; i < numQualifiers; i++) { 6557 qualifiers[i] = Bytes.toBytes("qual" + i); 6558 } 6559 6560 CONF.setInt("hbase.regionserver.wal.disruptor.event.count", 2); 6561 this.region = initHRegion(tableName, method, CONF, families); 6562 try { 6563 List<Thread> threads = new ArrayList<>(); 6564 for (int i = 0; i < numRows; i++) { 6565 final int count = i; 6566 Thread t = new Thread(new Runnable() { 6567 6568 @Override 6569 public void run() { 6570 byte[] row = Bytes.toBytes("row" + count); 6571 Put put = new Put(row); 6572 put.setDurability(Durability.SYNC_WAL); 6573 byte[] value = Bytes.toBytes(String.valueOf(count)); 6574 for (byte[] family : families) { 6575 for (byte[] qualifier : qualifiers) { 6576 put.addColumn(family, qualifier, count, value); 6577 } 6578 } 6579 try { 6580 region.put(put); 6581 } catch (IOException e) { 6582 throw new RuntimeException(e); 6583 } 6584 } 6585 }); 6586 threads.add(t); 6587 } 6588 for (Thread t : threads) { 6589 t.start(); 6590 } 6591 6592 for (int i = 0; i < testCount; i++) { 6593 region.getWAL().rollWriter(); 6594 Thread.yield(); 6595 } 6596 } finally { 6597 try { 6598 HBaseTestingUtility.closeRegionAndWAL(this.region); 6599 CONF.setInt("hbase.regionserver.wal.disruptor.event.count", 16 * 1024); 6600 } catch (DroppedSnapshotException dse) { 6601 // We could get this on way out because we interrupt the background flusher and it could 6602 // fail anywhere causing a DSE over in the background flusher... only it is not properly 6603 // dealt with so could still be memory hanging out when we get to here -- memory we can't 6604 // flush because the accounting is 'off' since original DSE. 6605 } 6606 this.region = null; 6607 } 6608 } 6609 6610 @Test 6611 public void testMutateRow_WriteRequestCount() throws Exception { 6612 byte[] row1 = Bytes.toBytes("row1"); 6613 byte[] fam1 = Bytes.toBytes("fam1"); 6614 byte[] qf1 = Bytes.toBytes("qualifier"); 6615 byte[] val1 = Bytes.toBytes("value1"); 6616 6617 RowMutations rm = new RowMutations(row1); 6618 Put put = new Put(row1); 6619 put.addColumn(fam1, qf1, val1); 6620 rm.add(put); 6621 6622 this.region = initHRegion(tableName, method, CONF, fam1); 6623 try { 6624 long wrcBeforeMutate = this.region.writeRequestsCount.longValue(); 6625 this.region.mutateRow(rm); 6626 long wrcAfterMutate = this.region.writeRequestsCount.longValue(); 6627 Assert.assertEquals(wrcBeforeMutate + rm.getMutations().size(), wrcAfterMutate); 6628 } finally { 6629 HBaseTestingUtility.closeRegionAndWAL(this.region); 6630 this.region = null; 6631 } 6632 } 6633 6634 @Test 6635 public void testBulkLoadReplicationEnabled() throws IOException { 6636 TEST_UTIL.getConfiguration().setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); 6637 final ServerName serverName = ServerName.valueOf(name.getMethodName(), 100, 42); 6638 final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName)); 6639 6640 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName())); 6641 htd.addFamily(new HColumnDescriptor(fam1)); 6642 HRegionInfo hri = new HRegionInfo(htd.getTableName(), 6643 HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY); 6644 region = HRegion.openHRegion(hri, htd, rss.getWAL(hri), TEST_UTIL.getConfiguration(), 6645 rss, null); 6646 6647 assertTrue(region.conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, false)); 6648 String plugins = region.conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ""); 6649 String replicationCoprocessorClass = ReplicationObserver.class.getCanonicalName(); 6650 assertTrue(plugins.contains(replicationCoprocessorClass)); 6651 assertTrue(region.getCoprocessorHost(). 6652 getCoprocessors().contains(ReplicationObserver.class.getSimpleName())); 6653 6654 region.close(); 6655 } 6656 6657 /** 6658 * The same as HRegion class, the only difference is that instantiateHStore will 6659 * create a different HStore - HStoreForTesting. [HBASE-8518] 6660 */ 6661 public static class HRegionForTesting extends HRegion { 6662 6663 public HRegionForTesting(final Path tableDir, final WAL wal, final FileSystem fs, 6664 final Configuration confParam, final RegionInfo regionInfo, 6665 final TableDescriptor htd, final RegionServerServices rsServices) { 6666 this(new HRegionFileSystem(confParam, fs, tableDir, regionInfo), 6667 wal, confParam, htd, rsServices); 6668 } 6669 6670 public HRegionForTesting(HRegionFileSystem fs, WAL wal, 6671 Configuration confParam, TableDescriptor htd, 6672 RegionServerServices rsServices) { 6673 super(fs, wal, confParam, htd, rsServices); 6674 } 6675 6676 /** 6677 * Create HStore instance. 6678 * @return If Mob is enabled, return HMobStore, otherwise return HStoreForTesting. 6679 */ 6680 @Override 6681 protected HStore instantiateHStore(final ColumnFamilyDescriptor family) throws IOException { 6682 if (family.isMobEnabled()) { 6683 if (HFile.getFormatVersion(this.conf) < HFile.MIN_FORMAT_VERSION_WITH_TAGS) { 6684 throw new IOException("A minimum HFile version of " 6685 + HFile.MIN_FORMAT_VERSION_WITH_TAGS 6686 + " is required for MOB feature. Consider setting " + HFile.FORMAT_VERSION_KEY 6687 + " accordingly."); 6688 } 6689 return new HMobStore(this, family, this.conf); 6690 } 6691 return new HStoreForTesting(this, family, this.conf); 6692 } 6693 } 6694 6695 /** 6696 * HStoreForTesting is merely the same as HStore, the difference is in the doCompaction method 6697 * of HStoreForTesting there is a checkpoint "hbase.hstore.compaction.complete" which 6698 * doesn't let hstore compaction complete. In the former edition, this config is set in 6699 * HStore class inside compact method, though this is just for testing, otherwise it 6700 * doesn't do any help. In HBASE-8518, we try to get rid of all "hbase.hstore.compaction.complete" 6701 * config (except for testing code). 6702 */ 6703 public static class HStoreForTesting extends HStore { 6704 6705 protected HStoreForTesting(final HRegion region, 6706 final ColumnFamilyDescriptor family, 6707 final Configuration confParam) throws IOException { 6708 super(region, family, confParam); 6709 } 6710 6711 @Override 6712 protected List<HStoreFile> doCompaction(CompactionRequestImpl cr, 6713 Collection<HStoreFile> filesToCompact, User user, long compactionStartTime, 6714 List<Path> newFiles) throws IOException { 6715 // let compaction incomplete. 6716 if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) { 6717 LOG.warn("hbase.hstore.compaction.complete is set to false"); 6718 List<HStoreFile> sfs = new ArrayList<>(newFiles.size()); 6719 final boolean evictOnClose = 6720 cacheConf != null? cacheConf.shouldEvictOnClose(): true; 6721 for (Path newFile : newFiles) { 6722 // Create storefile around what we wrote with a reader on it. 6723 HStoreFile sf = createStoreFileAndReader(newFile); 6724 sf.closeStoreFile(evictOnClose); 6725 sfs.add(sf); 6726 } 6727 return sfs; 6728 } 6729 return super.doCompaction(cr, filesToCompact, user, compactionStartTime, newFiles); 6730 } 6731 } 6732}