001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.HBaseTestingUtility.COLUMNS; 021import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1; 022import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2; 023import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3; 024import static org.junit.Assert.assertArrayEquals; 025import static org.junit.Assert.assertEquals; 026import static org.junit.Assert.assertFalse; 027import static org.junit.Assert.assertNotNull; 028import static org.junit.Assert.assertNull; 029import static org.junit.Assert.assertTrue; 030import static org.junit.Assert.fail; 031import static org.mockito.ArgumentMatchers.any; 032import static org.mockito.ArgumentMatchers.anyLong; 033import static org.mockito.Mockito.doThrow; 034import static org.mockito.Mockito.mock; 035import static org.mockito.Mockito.never; 036import static org.mockito.Mockito.spy; 037import static org.mockito.Mockito.times; 038import static org.mockito.Mockito.verify; 039import static org.mockito.Mockito.when; 040 041import java.io.IOException; 042import java.io.InterruptedIOException; 043import java.math.BigDecimal; 044import java.nio.charset.StandardCharsets; 045import java.security.PrivilegedExceptionAction; 046import java.util.ArrayList; 047import java.util.Arrays; 048import java.util.Collection; 049import java.util.List; 050import java.util.Map; 051import java.util.NavigableMap; 052import java.util.Objects; 053import java.util.TreeMap; 054import java.util.concurrent.Callable; 055import java.util.concurrent.CountDownLatch; 056import java.util.concurrent.ExecutorService; 057import java.util.concurrent.Executors; 058import java.util.concurrent.Future; 059import java.util.concurrent.TimeUnit; 060import java.util.concurrent.atomic.AtomicBoolean; 061import java.util.concurrent.atomic.AtomicInteger; 062import java.util.concurrent.atomic.AtomicReference; 063import org.apache.commons.lang3.RandomStringUtils; 064import org.apache.hadoop.conf.Configuration; 065import org.apache.hadoop.fs.FSDataOutputStream; 066import org.apache.hadoop.fs.FileStatus; 067import org.apache.hadoop.fs.FileSystem; 068import org.apache.hadoop.fs.Path; 069import org.apache.hadoop.hbase.ArrayBackedTag; 070import org.apache.hadoop.hbase.Cell; 071import org.apache.hadoop.hbase.Cell.Type; 072import org.apache.hadoop.hbase.CellBuilderFactory; 073import org.apache.hadoop.hbase.CellBuilderType; 074import org.apache.hadoop.hbase.CellUtil; 075import org.apache.hadoop.hbase.CompareOperator; 076import org.apache.hadoop.hbase.CompatibilitySingletonFactory; 077import org.apache.hadoop.hbase.DoNotRetryIOException; 078import org.apache.hadoop.hbase.DroppedSnapshotException; 079import org.apache.hadoop.hbase.HBaseClassTestRule; 080import org.apache.hadoop.hbase.HBaseConfiguration; 081import org.apache.hadoop.hbase.HBaseTestingUtility; 082import org.apache.hadoop.hbase.HColumnDescriptor; 083import org.apache.hadoop.hbase.HConstants; 084import org.apache.hadoop.hbase.HConstants.OperationStatusCode; 085import org.apache.hadoop.hbase.HDFSBlocksDistribution; 086import org.apache.hadoop.hbase.HRegionInfo; 087import org.apache.hadoop.hbase.HTableDescriptor; 088import org.apache.hadoop.hbase.KeyValue; 089import org.apache.hadoop.hbase.MiniHBaseCluster; 090import org.apache.hadoop.hbase.MultithreadedTestUtil; 091import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread; 092import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread; 093import org.apache.hadoop.hbase.NotServingRegionException; 094import org.apache.hadoop.hbase.PrivateCellUtil; 095import org.apache.hadoop.hbase.RegionTooBusyException; 096import org.apache.hadoop.hbase.ServerName; 097import org.apache.hadoop.hbase.StartMiniClusterOption; 098import org.apache.hadoop.hbase.TableName; 099import org.apache.hadoop.hbase.TagType; 100import org.apache.hadoop.hbase.Waiter; 101import org.apache.hadoop.hbase.client.Append; 102import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 103import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 104import org.apache.hadoop.hbase.client.Delete; 105import org.apache.hadoop.hbase.client.Durability; 106import org.apache.hadoop.hbase.client.Get; 107import org.apache.hadoop.hbase.client.Increment; 108import org.apache.hadoop.hbase.client.Mutation; 109import org.apache.hadoop.hbase.client.Put; 110import org.apache.hadoop.hbase.client.RegionInfo; 111import org.apache.hadoop.hbase.client.RegionInfoBuilder; 112import org.apache.hadoop.hbase.client.Result; 113import org.apache.hadoop.hbase.client.RowMutations; 114import org.apache.hadoop.hbase.client.Scan; 115import org.apache.hadoop.hbase.client.Table; 116import org.apache.hadoop.hbase.client.TableDescriptor; 117import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 118import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 119import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; 120import org.apache.hadoop.hbase.filter.BigDecimalComparator; 121import org.apache.hadoop.hbase.filter.BinaryComparator; 122import org.apache.hadoop.hbase.filter.ColumnCountGetFilter; 123import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; 124import org.apache.hadoop.hbase.filter.Filter; 125import org.apache.hadoop.hbase.filter.FilterBase; 126import org.apache.hadoop.hbase.filter.FilterList; 127import org.apache.hadoop.hbase.filter.NullComparator; 128import org.apache.hadoop.hbase.filter.PrefixFilter; 129import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter; 130import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 131import org.apache.hadoop.hbase.filter.SubstringComparator; 132import org.apache.hadoop.hbase.filter.ValueFilter; 133import org.apache.hadoop.hbase.io.TimeRange; 134import org.apache.hadoop.hbase.io.hfile.HFile; 135import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; 136import org.apache.hadoop.hbase.monitoring.MonitoredTask; 137import org.apache.hadoop.hbase.monitoring.TaskMonitor; 138import org.apache.hadoop.hbase.regionserver.HRegion.MutationBatchOperation; 139import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; 140import org.apache.hadoop.hbase.regionserver.Region.RowLock; 141import org.apache.hadoop.hbase.regionserver.TestHStore.FaultyFileSystem; 142import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; 143import org.apache.hadoop.hbase.regionserver.wal.FSHLog; 144import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource; 145import org.apache.hadoop.hbase.regionserver.wal.WALUtil; 146import org.apache.hadoop.hbase.replication.regionserver.ReplicationObserver; 147import org.apache.hadoop.hbase.security.User; 148import org.apache.hadoop.hbase.test.MetricsAssertHelper; 149import org.apache.hadoop.hbase.testclassification.LargeTests; 150import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests; 151import org.apache.hadoop.hbase.util.Bytes; 152import org.apache.hadoop.hbase.util.CommonFSUtils; 153import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 154import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; 155import org.apache.hadoop.hbase.util.HFileArchiveUtil; 156import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; 157import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 158import org.apache.hadoop.hbase.util.Threads; 159import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 160import org.apache.hadoop.hbase.wal.FaultyFSLog; 161import org.apache.hadoop.hbase.wal.NettyAsyncFSWALConfigHelper; 162import org.apache.hadoop.hbase.wal.WAL; 163import org.apache.hadoop.hbase.wal.WALEdit; 164import org.apache.hadoop.hbase.wal.WALFactory; 165import org.apache.hadoop.hbase.wal.WALKeyImpl; 166import org.apache.hadoop.hbase.wal.WALProvider; 167import org.apache.hadoop.hbase.wal.WALProvider.Writer; 168import org.apache.hadoop.hbase.wal.WALSplitUtil; 169import org.junit.After; 170import org.junit.Assert; 171import org.junit.Before; 172import org.junit.ClassRule; 173import org.junit.Rule; 174import org.junit.Test; 175import org.junit.experimental.categories.Category; 176import org.junit.rules.ExpectedException; 177import org.junit.rules.TestName; 178import org.mockito.ArgumentCaptor; 179import org.mockito.ArgumentMatcher; 180import org.mockito.Mockito; 181import org.mockito.invocation.InvocationOnMock; 182import org.mockito.stubbing.Answer; 183import org.slf4j.Logger; 184import org.slf4j.LoggerFactory; 185 186import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 187import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 188import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 189import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; 190import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; 191 192import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 193import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; 194import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; 195import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction; 196import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor; 197import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor; 198import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; 199 200/** 201 * Basic stand-alone testing of HRegion. No clusters! 202 * 203 * A lot of the meta information for an HRegion now lives inside other HRegions 204 * or in the HBaseMaster, so only basic testing is possible. 205 */ 206@Category({VerySlowRegionServerTests.class, LargeTests.class}) 207@SuppressWarnings("deprecation") 208public class TestHRegion { 209 210 @ClassRule 211 public static final HBaseClassTestRule CLASS_RULE = 212 HBaseClassTestRule.forClass(TestHRegion.class); 213 214 // Do not spin up clusters in here. If you need to spin up a cluster, do it 215 // over in TestHRegionOnCluster. 216 private static final Logger LOG = LoggerFactory.getLogger(TestHRegion.class); 217 @Rule 218 public TestName name = new TestName(); 219 @Rule public final ExpectedException thrown = ExpectedException.none(); 220 221 private static final String COLUMN_FAMILY = "MyCF"; 222 private static final byte [] COLUMN_FAMILY_BYTES = Bytes.toBytes(COLUMN_FAMILY); 223 private static final EventLoopGroup GROUP = new NioEventLoopGroup(); 224 225 HRegion region = null; 226 // Do not run unit tests in parallel (? Why not? It don't work? Why not? St.Ack) 227 protected static HBaseTestingUtility TEST_UTIL; 228 public static Configuration CONF ; 229 private String dir; 230 private static FileSystem FILESYSTEM; 231 private final int MAX_VERSIONS = 2; 232 233 // Test names 234 protected TableName tableName; 235 protected String method; 236 protected final byte[] qual = Bytes.toBytes("qual"); 237 protected final byte[] qual1 = Bytes.toBytes("qual1"); 238 protected final byte[] qual2 = Bytes.toBytes("qual2"); 239 protected final byte[] qual3 = Bytes.toBytes("qual3"); 240 protected final byte[] value = Bytes.toBytes("value"); 241 protected final byte[] value1 = Bytes.toBytes("value1"); 242 protected final byte[] value2 = Bytes.toBytes("value2"); 243 protected final byte[] row = Bytes.toBytes("rowA"); 244 protected final byte[] row2 = Bytes.toBytes("rowB"); 245 246 protected final MetricsAssertHelper metricsAssertHelper = CompatibilitySingletonFactory 247 .getInstance(MetricsAssertHelper.class); 248 249 @Before 250 public void setup() throws IOException { 251 TEST_UTIL = HBaseTestingUtility.createLocalHTU(); 252 FILESYSTEM = TEST_UTIL.getTestFileSystem(); 253 CONF = TEST_UTIL.getConfiguration(); 254 NettyAsyncFSWALConfigHelper.setEventLoopConfig(CONF, GROUP, NioSocketChannel.class); 255 dir = TEST_UTIL.getDataTestDir("TestHRegion").toString(); 256 method = name.getMethodName(); 257 tableName = TableName.valueOf(method); 258 CONF.set(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, String.valueOf(0.09)); 259 } 260 261 @After 262 public void tearDown() throws IOException { 263 // Region may have been closed, but it is still no harm if we close it again here using HTU. 264 HBaseTestingUtility.closeRegionAndWAL(region); 265 EnvironmentEdgeManagerTestHelper.reset(); 266 LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir()); 267 TEST_UTIL.cleanupTestDir(); 268 } 269 270 /** 271 * Test that I can use the max flushed sequence id after the close. 272 * @throws IOException 273 */ 274 @Test 275 public void testSequenceId() throws IOException { 276 region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES); 277 assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId()); 278 // Weird. This returns 0 if no store files or no edits. Afraid to change it. 279 assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES)); 280 HBaseTestingUtility.closeRegionAndWAL(this.region); 281 assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId()); 282 assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES)); 283 // Open region again. 284 region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES); 285 byte [] value = Bytes.toBytes(method); 286 // Make a random put against our cf. 287 Put put = new Put(value); 288 put.addColumn(COLUMN_FAMILY_BYTES, null, value); 289 region.put(put); 290 // No flush yet so init numbers should still be in place. 291 assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId()); 292 assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES)); 293 region.flush(true); 294 long max = region.getMaxFlushedSeqId(); 295 HBaseTestingUtility.closeRegionAndWAL(this.region); 296 assertEquals(max, region.getMaxFlushedSeqId()); 297 this.region = null; 298 } 299 300 /** 301 * Test for Bug 2 of HBASE-10466. 302 * "Bug 2: Conditions for the first flush of region close (so-called pre-flush) If memstoreSize 303 * is smaller than a certain value, or when region close starts a flush is ongoing, the first 304 * flush is skipped and only the second flush takes place. However, two flushes are required in 305 * case previous flush fails and leaves some data in snapshot. The bug could cause loss of data 306 * in current memstore. The fix is removing all conditions except abort check so we ensure 2 307 * flushes for region close." 308 * @throws IOException 309 */ 310 @Test 311 public void testCloseCarryingSnapshot() throws IOException { 312 region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES); 313 HStore store = region.getStore(COLUMN_FAMILY_BYTES); 314 // Get some random bytes. 315 byte [] value = Bytes.toBytes(method); 316 // Make a random put against our cf. 317 Put put = new Put(value); 318 put.addColumn(COLUMN_FAMILY_BYTES, null, value); 319 // First put something in current memstore, which will be in snapshot after flusher.prepare() 320 region.put(put); 321 StoreFlushContext storeFlushCtx = store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY); 322 storeFlushCtx.prepare(); 323 // Second put something in current memstore 324 put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value); 325 region.put(put); 326 // Close with something in memstore and something in the snapshot. Make sure all is cleared. 327 HBaseTestingUtility.closeRegionAndWAL(region); 328 assertEquals(0, region.getMemStoreDataSize()); 329 region = null; 330 } 331 332 /* 333 * This test is for verifying memstore snapshot size is correctly updated in case of rollback 334 * See HBASE-10845 335 */ 336 @Test 337 public void testMemstoreSnapshotSize() throws IOException { 338 class MyFaultyFSLog extends FaultyFSLog { 339 StoreFlushContext storeFlushCtx; 340 public MyFaultyFSLog(FileSystem fs, Path rootDir, String logName, Configuration conf) 341 throws IOException { 342 super(fs, rootDir, logName, conf); 343 } 344 345 void setStoreFlushCtx(StoreFlushContext storeFlushCtx) { 346 this.storeFlushCtx = storeFlushCtx; 347 } 348 349 @Override 350 public void sync(long txid) throws IOException { 351 storeFlushCtx.prepare(); 352 super.sync(txid); 353 } 354 } 355 356 FileSystem fs = FileSystem.get(CONF); 357 Path rootDir = new Path(dir + "testMemstoreSnapshotSize"); 358 MyFaultyFSLog faultyLog = new MyFaultyFSLog(fs, rootDir, "testMemstoreSnapshotSize", CONF); 359 region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, faultyLog, 360 COLUMN_FAMILY_BYTES); 361 362 HStore store = region.getStore(COLUMN_FAMILY_BYTES); 363 // Get some random bytes. 364 byte [] value = Bytes.toBytes(method); 365 faultyLog.setStoreFlushCtx(store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY)); 366 367 Put put = new Put(value); 368 put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value); 369 faultyLog.setFailureType(FaultyFSLog.FailureType.SYNC); 370 371 boolean threwIOE = false; 372 try { 373 region.put(put); 374 } catch (IOException ioe) { 375 threwIOE = true; 376 } finally { 377 assertTrue("The regionserver should have thrown an exception", threwIOE); 378 } 379 MemStoreSize mss = store.getFlushableSize(); 380 assertTrue("flushable size should be zero, but it is " + mss, 381 mss.getDataSize() == 0); 382 } 383 384 /** 385 * Create a WAL outside of the usual helper in 386 * {@link HBaseTestingUtility#createWal(Configuration, Path, RegionInfo)} because that method 387 * doesn't play nicely with FaultyFileSystem. Call this method before overriding 388 * {@code fs.file.impl}. 389 * @param callingMethod a unique component for the path, probably the name of the test method. 390 */ 391 private static WAL createWALCompatibleWithFaultyFileSystem(String callingMethod, 392 Configuration conf, TableName tableName) throws IOException { 393 final Path logDir = TEST_UTIL.getDataTestDirOnTestFS(callingMethod + ".log"); 394 final Configuration walConf = new Configuration(conf); 395 CommonFSUtils.setRootDir(walConf, logDir); 396 return new WALFactory(walConf, callingMethod) 397 .getWAL(RegionInfoBuilder.newBuilder(tableName).build()); 398 } 399 400 @Test 401 public void testMemstoreSizeAccountingWithFailedPostBatchMutate() throws IOException { 402 String testName = "testMemstoreSizeAccountingWithFailedPostBatchMutate"; 403 FileSystem fs = FileSystem.get(CONF); 404 Path rootDir = new Path(dir + testName); 405 FSHLog hLog = new FSHLog(fs, rootDir, testName, CONF); 406 hLog.init(); 407 HRegion region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog, 408 COLUMN_FAMILY_BYTES); 409 HStore store = region.getStore(COLUMN_FAMILY_BYTES); 410 assertEquals(0, region.getMemStoreDataSize()); 411 412 // Put one value 413 byte [] value = Bytes.toBytes(method); 414 Put put = new Put(value); 415 put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value); 416 region.put(put); 417 long onePutSize = region.getMemStoreDataSize(); 418 assertTrue(onePutSize > 0); 419 420 RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class); 421 doThrow(new IOException()) 422 .when(mockedCPHost).postBatchMutate(Mockito.<MiniBatchOperationInProgress<Mutation>>any()); 423 region.setCoprocessorHost(mockedCPHost); 424 425 put = new Put(value); 426 put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("dfg"), value); 427 try { 428 region.put(put); 429 fail("Should have failed with IOException"); 430 } catch (IOException expected) { 431 } 432 long expectedSize = onePutSize * 2; 433 assertEquals("memstoreSize should be incremented", 434 expectedSize, region.getMemStoreDataSize()); 435 assertEquals("flushable size should be incremented", 436 expectedSize, store.getFlushableSize().getDataSize()); 437 438 region.setCoprocessorHost(null); 439 } 440 441 /** 442 * A test case of HBASE-21041 443 * @throws Exception Exception 444 */ 445 @Test 446 public void testFlushAndMemstoreSizeCounting() throws Exception { 447 byte[] family = Bytes.toBytes("family"); 448 this.region = initHRegion(tableName, method, CONF, family); 449 final WALFactory wals = new WALFactory(CONF, method); 450 try { 451 for (byte[] row : HBaseTestingUtility.ROWS) { 452 Put put = new Put(row); 453 put.addColumn(family, family, row); 454 region.put(put); 455 } 456 region.flush(true); 457 // After flush, data size should be zero 458 assertEquals(0, region.getMemStoreDataSize()); 459 // After flush, a new active mutable segment is created, so the heap size 460 // should equal to MutableSegment.DEEP_OVERHEAD 461 assertEquals(MutableSegment.DEEP_OVERHEAD, region.getMemStoreHeapSize()); 462 // After flush, offheap should be zero 463 assertEquals(0, region.getMemStoreOffHeapSize()); 464 } finally { 465 HBaseTestingUtility.closeRegionAndWAL(this.region); 466 this.region = null; 467 wals.close(); 468 } 469 } 470 471 /** 472 * Test we do not lose data if we fail a flush and then close. 473 * Part of HBase-10466. Tests the following from the issue description: 474 * "Bug 1: Wrong calculation of HRegion.memstoreSize: When a flush fails, data to be flushed is 475 * kept in each MemStore's snapshot and wait for next flush attempt to continue on it. But when 476 * the next flush succeeds, the counter of total memstore size in HRegion is always deduced by 477 * the sum of current memstore sizes instead of snapshots left from previous failed flush. This 478 * calculation is problematic that almost every time there is failed flush, HRegion.memstoreSize 479 * gets reduced by a wrong value. If region flush could not proceed for a couple cycles, the size 480 * in current memstore could be much larger than the snapshot. It's likely to drift memstoreSize 481 * much smaller than expected. In extreme case, if the error accumulates to even bigger than 482 * HRegion's memstore size limit, any further flush is skipped because flush does not do anything 483 * if memstoreSize is not larger than 0." 484 * @throws Exception 485 */ 486 @Test 487 public void testFlushSizeAccounting() throws Exception { 488 final Configuration conf = HBaseConfiguration.create(CONF); 489 final WAL wal = createWALCompatibleWithFaultyFileSystem(method, conf, tableName); 490 // Only retry once. 491 conf.setInt("hbase.hstore.flush.retries.number", 1); 492 final User user = 493 User.createUserForTesting(conf, method, new String[]{"foo"}); 494 // Inject our faulty LocalFileSystem 495 conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class); 496 user.runAs(new PrivilegedExceptionAction<Object>() { 497 @Override 498 public Object run() throws Exception { 499 // Make sure it worked (above is sensitive to caching details in hadoop core) 500 FileSystem fs = FileSystem.get(conf); 501 Assert.assertEquals(FaultyFileSystem.class, fs.getClass()); 502 FaultyFileSystem ffs = (FaultyFileSystem)fs; 503 HRegion region = null; 504 try { 505 // Initialize region 506 region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, wal, 507 COLUMN_FAMILY_BYTES); 508 long size = region.getMemStoreDataSize(); 509 Assert.assertEquals(0, size); 510 // Put one item into memstore. Measure the size of one item in memstore. 511 Put p1 = new Put(row); 512 p1.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual1, 1, (byte[]) null)); 513 region.put(p1); 514 final long sizeOfOnePut = region.getMemStoreDataSize(); 515 // Fail a flush which means the current memstore will hang out as memstore 'snapshot'. 516 try { 517 LOG.info("Flushing"); 518 region.flush(true); 519 Assert.fail("Didn't bubble up IOE!"); 520 } catch (DroppedSnapshotException dse) { 521 // What we are expecting 522 region.closing.set(false); // this is needed for the rest of the test to work 523 } 524 // Make it so all writes succeed from here on out 525 ffs.fault.set(false); 526 // Check sizes. Should still be the one entry. 527 Assert.assertEquals(sizeOfOnePut, region.getMemStoreDataSize()); 528 // Now add two entries so that on this next flush that fails, we can see if we 529 // subtract the right amount, the snapshot size only. 530 Put p2 = new Put(row); 531 p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual2, 2, (byte[])null)); 532 p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual3, 3, (byte[])null)); 533 region.put(p2); 534 long expectedSize = sizeOfOnePut * 3; 535 Assert.assertEquals(expectedSize, region.getMemStoreDataSize()); 536 // Do a successful flush. It will clear the snapshot only. Thats how flushes work. 537 // If already a snapshot, we clear it else we move the memstore to be snapshot and flush 538 // it 539 region.flush(true); 540 // Make sure our memory accounting is right. 541 Assert.assertEquals(sizeOfOnePut * 2, region.getMemStoreDataSize()); 542 } finally { 543 HBaseTestingUtility.closeRegionAndWAL(region); 544 } 545 return null; 546 } 547 }); 548 FileSystem.closeAllForUGI(user.getUGI()); 549 } 550 551 @Test 552 public void testCloseWithFailingFlush() throws Exception { 553 final Configuration conf = HBaseConfiguration.create(CONF); 554 final WAL wal = createWALCompatibleWithFaultyFileSystem(method, conf, tableName); 555 // Only retry once. 556 conf.setInt("hbase.hstore.flush.retries.number", 1); 557 final User user = 558 User.createUserForTesting(conf, this.method, new String[]{"foo"}); 559 // Inject our faulty LocalFileSystem 560 conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class); 561 user.runAs(new PrivilegedExceptionAction<Object>() { 562 @Override 563 public Object run() throws Exception { 564 // Make sure it worked (above is sensitive to caching details in hadoop core) 565 FileSystem fs = FileSystem.get(conf); 566 Assert.assertEquals(FaultyFileSystem.class, fs.getClass()); 567 FaultyFileSystem ffs = (FaultyFileSystem)fs; 568 HRegion region = null; 569 try { 570 // Initialize region 571 region = initHRegion(tableName, null, null, false, 572 Durability.SYNC_WAL, wal, COLUMN_FAMILY_BYTES); 573 long size = region.getMemStoreDataSize(); 574 Assert.assertEquals(0, size); 575 // Put one item into memstore. Measure the size of one item in memstore. 576 Put p1 = new Put(row); 577 p1.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual1, 1, (byte[])null)); 578 region.put(p1); 579 // Manufacture an outstanding snapshot -- fake a failed flush by doing prepare step only. 580 HStore store = region.getStore(COLUMN_FAMILY_BYTES); 581 StoreFlushContext storeFlushCtx = 582 store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY); 583 storeFlushCtx.prepare(); 584 // Now add two entries to the foreground memstore. 585 Put p2 = new Put(row); 586 p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual2, 2, (byte[])null)); 587 p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual3, 3, (byte[])null)); 588 region.put(p2); 589 // Now try close on top of a failing flush. 590 HBaseTestingUtility.closeRegionAndWAL(region); 591 region = null; 592 fail(); 593 } catch (DroppedSnapshotException dse) { 594 // Expected 595 LOG.info("Expected DroppedSnapshotException"); 596 } finally { 597 // Make it so all writes succeed from here on out so can close clean 598 ffs.fault.set(false); 599 HBaseTestingUtility.closeRegionAndWAL(region); 600 } 601 return null; 602 } 603 }); 604 FileSystem.closeAllForUGI(user.getUGI()); 605 } 606 607 @Test 608 public void testCompactionAffectedByScanners() throws Exception { 609 byte[] family = Bytes.toBytes("family"); 610 this.region = initHRegion(tableName, method, CONF, family); 611 612 Put put = new Put(Bytes.toBytes("r1")); 613 put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1")); 614 region.put(put); 615 region.flush(true); 616 617 Scan scan = new Scan(); 618 scan.setMaxVersions(3); 619 // open the first scanner 620 RegionScanner scanner1 = region.getScanner(scan); 621 622 Delete delete = new Delete(Bytes.toBytes("r1")); 623 region.delete(delete); 624 region.flush(true); 625 626 // open the second scanner 627 RegionScanner scanner2 = region.getScanner(scan); 628 629 List<Cell> results = new ArrayList<>(); 630 631 System.out.println("Smallest read point:" + region.getSmallestReadPoint()); 632 633 // make a major compaction 634 region.compact(true); 635 636 // open the third scanner 637 RegionScanner scanner3 = region.getScanner(scan); 638 639 // get data from scanner 1, 2, 3 after major compaction 640 scanner1.next(results); 641 System.out.println(results); 642 assertEquals(1, results.size()); 643 644 results.clear(); 645 scanner2.next(results); 646 System.out.println(results); 647 assertEquals(0, results.size()); 648 649 results.clear(); 650 scanner3.next(results); 651 System.out.println(results); 652 assertEquals(0, results.size()); 653 } 654 655 @Test 656 public void testToShowNPEOnRegionScannerReseek() throws Exception { 657 byte[] family = Bytes.toBytes("family"); 658 this.region = initHRegion(tableName, method, CONF, family); 659 660 Put put = new Put(Bytes.toBytes("r1")); 661 put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1")); 662 region.put(put); 663 put = new Put(Bytes.toBytes("r2")); 664 put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1")); 665 region.put(put); 666 region.flush(true); 667 668 Scan scan = new Scan(); 669 scan.setMaxVersions(3); 670 // open the first scanner 671 RegionScanner scanner1 = region.getScanner(scan); 672 673 System.out.println("Smallest read point:" + region.getSmallestReadPoint()); 674 675 region.compact(true); 676 677 scanner1.reseek(Bytes.toBytes("r2")); 678 List<Cell> results = new ArrayList<>(); 679 scanner1.next(results); 680 Cell keyValue = results.get(0); 681 Assert.assertTrue(Bytes.compareTo(CellUtil.cloneRow(keyValue), Bytes.toBytes("r2")) == 0); 682 scanner1.close(); 683 } 684 685 @Test 686 public void testArchiveRecoveredEditsReplay() throws Exception { 687 byte[] family = Bytes.toBytes("family"); 688 this.region = initHRegion(tableName, method, CONF, family); 689 final WALFactory wals = new WALFactory(CONF, method); 690 try { 691 Path regiondir = region.getRegionFileSystem().getRegionDir(); 692 FileSystem fs = region.getRegionFileSystem().getFileSystem(); 693 byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes(); 694 695 Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir); 696 697 long maxSeqId = 1050; 698 long minSeqId = 1000; 699 700 for (long i = minSeqId; i <= maxSeqId; i += 10) { 701 Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i)); 702 fs.create(recoveredEdits); 703 WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits); 704 705 long time = System.nanoTime(); 706 WALEdit edit = new WALEdit(); 707 edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes 708 .toBytes(i))); 709 writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time, 710 HConstants.DEFAULT_CLUSTER_ID), edit)); 711 712 writer.close(); 713 } 714 MonitoredTask status = TaskMonitor.get().createStatus(method); 715 Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR); 716 for (HStore store : region.getStores()) { 717 maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId - 1); 718 } 719 CONF.set("hbase.region.archive.recovered.edits", "true"); 720 CONF.set(CommonFSUtils.HBASE_WAL_DIR, "/custom_wal_dir"); 721 long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status); 722 assertEquals(maxSeqId, seqId); 723 region.getMVCC().advanceTo(seqId); 724 String fakeFamilyName = recoveredEditsDir.getName(); 725 Path rootDir = new Path(CONF.get(HConstants.HBASE_DIR)); 726 Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePathForRootDir(rootDir, 727 region.getRegionInfo(), Bytes.toBytes(fakeFamilyName)); 728 FileStatus[] list = TEST_UTIL.getTestFileSystem().listStatus(storeArchiveDir); 729 assertEquals(6, list.length); 730 } finally { 731 CONF.set("hbase.region.archive.recovered.edits", "false"); 732 CONF.set(CommonFSUtils.HBASE_WAL_DIR, ""); 733 HBaseTestingUtility.closeRegionAndWAL(this.region); 734 this.region = null; 735 wals.close(); 736 } 737 } 738 739 @Test 740 public void testSkipRecoveredEditsReplay() throws Exception { 741 byte[] family = Bytes.toBytes("family"); 742 this.region = initHRegion(tableName, method, CONF, family); 743 final WALFactory wals = new WALFactory(CONF, method); 744 try { 745 Path regiondir = region.getRegionFileSystem().getRegionDir(); 746 FileSystem fs = region.getRegionFileSystem().getFileSystem(); 747 byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes(); 748 749 Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir); 750 751 long maxSeqId = 1050; 752 long minSeqId = 1000; 753 754 for (long i = minSeqId; i <= maxSeqId; i += 10) { 755 Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i)); 756 fs.create(recoveredEdits); 757 WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits); 758 759 long time = System.nanoTime(); 760 WALEdit edit = new WALEdit(); 761 edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes 762 .toBytes(i))); 763 writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time, 764 HConstants.DEFAULT_CLUSTER_ID), edit)); 765 766 writer.close(); 767 } 768 MonitoredTask status = TaskMonitor.get().createStatus(method); 769 Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR); 770 for (HStore store : region.getStores()) { 771 maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId - 1); 772 } 773 long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status); 774 assertEquals(maxSeqId, seqId); 775 region.getMVCC().advanceTo(seqId); 776 Get get = new Get(row); 777 Result result = region.get(get); 778 for (long i = minSeqId; i <= maxSeqId; i += 10) { 779 List<Cell> kvs = result.getColumnCells(family, Bytes.toBytes(i)); 780 assertEquals(1, kvs.size()); 781 assertArrayEquals(Bytes.toBytes(i), CellUtil.cloneValue(kvs.get(0))); 782 } 783 } finally { 784 HBaseTestingUtility.closeRegionAndWAL(this.region); 785 this.region = null; 786 wals.close(); 787 } 788 } 789 790 @Test 791 public void testSkipRecoveredEditsReplaySomeIgnored() throws Exception { 792 byte[] family = Bytes.toBytes("family"); 793 this.region = initHRegion(tableName, method, CONF, family); 794 final WALFactory wals = new WALFactory(CONF, method); 795 try { 796 Path regiondir = region.getRegionFileSystem().getRegionDir(); 797 FileSystem fs = region.getRegionFileSystem().getFileSystem(); 798 byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes(); 799 800 Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir); 801 802 long maxSeqId = 1050; 803 long minSeqId = 1000; 804 805 for (long i = minSeqId; i <= maxSeqId; i += 10) { 806 Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i)); 807 fs.create(recoveredEdits); 808 WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits); 809 810 long time = System.nanoTime(); 811 WALEdit edit = new WALEdit(); 812 edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes 813 .toBytes(i))); 814 writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time, 815 HConstants.DEFAULT_CLUSTER_ID), edit)); 816 817 writer.close(); 818 } 819 long recoverSeqId = 1030; 820 MonitoredTask status = TaskMonitor.get().createStatus(method); 821 Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR); 822 for (HStore store : region.getStores()) { 823 maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), recoverSeqId - 1); 824 } 825 long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status); 826 assertEquals(maxSeqId, seqId); 827 region.getMVCC().advanceTo(seqId); 828 Get get = new Get(row); 829 Result result = region.get(get); 830 for (long i = minSeqId; i <= maxSeqId; i += 10) { 831 List<Cell> kvs = result.getColumnCells(family, Bytes.toBytes(i)); 832 if (i < recoverSeqId) { 833 assertEquals(0, kvs.size()); 834 } else { 835 assertEquals(1, kvs.size()); 836 assertArrayEquals(Bytes.toBytes(i), CellUtil.cloneValue(kvs.get(0))); 837 } 838 } 839 } finally { 840 HBaseTestingUtility.closeRegionAndWAL(this.region); 841 this.region = null; 842 wals.close(); 843 } 844 } 845 846 @Test 847 public void testSkipRecoveredEditsReplayAllIgnored() throws Exception { 848 byte[] family = Bytes.toBytes("family"); 849 this.region = initHRegion(tableName, method, CONF, family); 850 Path regiondir = region.getRegionFileSystem().getRegionDir(); 851 FileSystem fs = region.getRegionFileSystem().getFileSystem(); 852 853 Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir); 854 for (int i = 1000; i < 1050; i += 10) { 855 Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i)); 856 FSDataOutputStream dos = fs.create(recoveredEdits); 857 dos.writeInt(i); 858 dos.close(); 859 } 860 long minSeqId = 2000; 861 Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", minSeqId - 1)); 862 FSDataOutputStream dos = fs.create(recoveredEdits); 863 dos.close(); 864 865 Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR); 866 for (HStore store : region.getStores()) { 867 maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId); 868 } 869 long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, null); 870 assertEquals(minSeqId, seqId); 871 } 872 873 @Test 874 public void testSkipRecoveredEditsReplayTheLastFileIgnored() throws Exception { 875 byte[] family = Bytes.toBytes("family"); 876 this.region = initHRegion(tableName, method, CONF, family); 877 final WALFactory wals = new WALFactory(CONF, method); 878 try { 879 Path regiondir = region.getRegionFileSystem().getRegionDir(); 880 FileSystem fs = region.getRegionFileSystem().getFileSystem(); 881 byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes(); 882 byte[][] columns = region.getTableDescriptor().getColumnFamilyNames().toArray(new byte[0][]); 883 884 assertEquals(0, region.getStoreFileList(columns).size()); 885 886 Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir); 887 888 long maxSeqId = 1050; 889 long minSeqId = 1000; 890 891 for (long i = minSeqId; i <= maxSeqId; i += 10) { 892 Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i)); 893 fs.create(recoveredEdits); 894 WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits); 895 896 long time = System.nanoTime(); 897 WALEdit edit = null; 898 if (i == maxSeqId) { 899 edit = WALEdit.createCompaction(region.getRegionInfo(), 900 CompactionDescriptor.newBuilder() 901 .setTableName(ByteString.copyFrom(tableName.getName())) 902 .setFamilyName(ByteString.copyFrom(regionName)) 903 .setEncodedRegionName(ByteString.copyFrom(regionName)) 904 .setStoreHomeDirBytes(ByteString.copyFrom(Bytes.toBytes(regiondir.toString()))) 905 .setRegionName(ByteString.copyFrom(region.getRegionInfo().getRegionName())) 906 .build()); 907 } else { 908 edit = new WALEdit(); 909 edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes 910 .toBytes(i))); 911 } 912 writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time, 913 HConstants.DEFAULT_CLUSTER_ID), edit)); 914 writer.close(); 915 } 916 917 long recoverSeqId = 1030; 918 Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR); 919 MonitoredTask status = TaskMonitor.get().createStatus(method); 920 for (HStore store : region.getStores()) { 921 maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), recoverSeqId - 1); 922 } 923 long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status); 924 assertEquals(maxSeqId, seqId); 925 926 // assert that the files are flushed 927 assertEquals(1, region.getStoreFileList(columns).size()); 928 929 } finally { 930 HBaseTestingUtility.closeRegionAndWAL(this.region); 931 this.region = null; 932 wals.close(); 933 } 934 } 935 936 @Test 937 public void testRecoveredEditsReplayCompaction() throws Exception { 938 testRecoveredEditsReplayCompaction(false); 939 testRecoveredEditsReplayCompaction(true); 940 } 941 942 public void testRecoveredEditsReplayCompaction(boolean mismatchedRegionName) throws Exception { 943 CONF.setClass(HConstants.REGION_IMPL, HRegionForTesting.class, Region.class); 944 byte[] family = Bytes.toBytes("family"); 945 this.region = initHRegion(tableName, method, CONF, family); 946 final WALFactory wals = new WALFactory(CONF, method); 947 try { 948 Path regiondir = region.getRegionFileSystem().getRegionDir(); 949 FileSystem fs = region.getRegionFileSystem().getFileSystem(); 950 byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes(); 951 952 long maxSeqId = 3; 953 long minSeqId = 0; 954 955 for (long i = minSeqId; i < maxSeqId; i++) { 956 Put put = new Put(Bytes.toBytes(i)); 957 put.addColumn(family, Bytes.toBytes(i), Bytes.toBytes(i)); 958 region.put(put); 959 region.flush(true); 960 } 961 962 // this will create a region with 3 files 963 assertEquals(3, region.getStore(family).getStorefilesCount()); 964 List<Path> storeFiles = new ArrayList<>(3); 965 for (HStoreFile sf : region.getStore(family).getStorefiles()) { 966 storeFiles.add(sf.getPath()); 967 } 968 969 // disable compaction completion 970 CONF.setBoolean("hbase.hstore.compaction.complete", false); 971 region.compactStores(); 972 973 // ensure that nothing changed 974 assertEquals(3, region.getStore(family).getStorefilesCount()); 975 976 // now find the compacted file, and manually add it to the recovered edits 977 Path tmpDir = new Path(region.getRegionFileSystem().getTempDir(), Bytes.toString(family)); 978 FileStatus[] files = CommonFSUtils.listStatus(fs, tmpDir); 979 String errorMsg = "Expected to find 1 file in the region temp directory " 980 + "from the compaction, could not find any"; 981 assertNotNull(errorMsg, files); 982 assertEquals(errorMsg, 1, files.length); 983 // move the file inside region dir 984 Path newFile = region.getRegionFileSystem().commitStoreFile(Bytes.toString(family), 985 files[0].getPath()); 986 987 byte[] encodedNameAsBytes = this.region.getRegionInfo().getEncodedNameAsBytes(); 988 byte[] fakeEncodedNameAsBytes = new byte [encodedNameAsBytes.length]; 989 for (int i=0; i < encodedNameAsBytes.length; i++) { 990 // Mix the byte array to have a new encodedName 991 fakeEncodedNameAsBytes[i] = (byte) (encodedNameAsBytes[i] + 1); 992 } 993 994 CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(this.region 995 .getRegionInfo(), mismatchedRegionName ? fakeEncodedNameAsBytes : null, family, 996 storeFiles, Lists.newArrayList(newFile), 997 region.getRegionFileSystem().getStoreDir(Bytes.toString(family))); 998 999 WALUtil.writeCompactionMarker(region.getWAL(), this.region.getReplicationScope(), 1000 this.region.getRegionInfo(), compactionDescriptor, region.getMVCC()); 1001 1002 Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir); 1003 1004 Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000)); 1005 fs.create(recoveredEdits); 1006 WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits); 1007 1008 long time = System.nanoTime(); 1009 1010 writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, 10, time, 1011 HConstants.DEFAULT_CLUSTER_ID), WALEdit.createCompaction(region.getRegionInfo(), 1012 compactionDescriptor))); 1013 writer.close(); 1014 1015 // close the region now, and reopen again 1016 region.getTableDescriptor(); 1017 region.getRegionInfo(); 1018 HBaseTestingUtility.closeRegionAndWAL(this.region); 1019 try { 1020 region = HRegion.openHRegion(region, null); 1021 } catch (WrongRegionException wre) { 1022 fail("Matching encoded region name should not have produced WrongRegionException"); 1023 } 1024 1025 // now check whether we have only one store file, the compacted one 1026 Collection<HStoreFile> sfs = region.getStore(family).getStorefiles(); 1027 for (HStoreFile sf : sfs) { 1028 LOG.info(Objects.toString(sf.getPath())); 1029 } 1030 if (!mismatchedRegionName) { 1031 assertEquals(1, region.getStore(family).getStorefilesCount()); 1032 } 1033 files = CommonFSUtils.listStatus(fs, tmpDir); 1034 assertTrue("Expected to find 0 files inside " + tmpDir, files == null || files.length == 0); 1035 1036 for (long i = minSeqId; i < maxSeqId; i++) { 1037 Get get = new Get(Bytes.toBytes(i)); 1038 Result result = region.get(get); 1039 byte[] value = result.getValue(family, Bytes.toBytes(i)); 1040 assertArrayEquals(Bytes.toBytes(i), value); 1041 } 1042 } finally { 1043 HBaseTestingUtility.closeRegionAndWAL(this.region); 1044 this.region = null; 1045 wals.close(); 1046 CONF.setClass(HConstants.REGION_IMPL, HRegion.class, Region.class); 1047 } 1048 } 1049 1050 @Test 1051 public void testFlushMarkers() throws Exception { 1052 // tests that flush markers are written to WAL and handled at recovered edits 1053 byte[] family = Bytes.toBytes("family"); 1054 Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + ".log"); 1055 final Configuration walConf = new Configuration(TEST_UTIL.getConfiguration()); 1056 CommonFSUtils.setRootDir(walConf, logDir); 1057 final WALFactory wals = new WALFactory(walConf, method); 1058 final WAL wal = wals.getWAL(RegionInfoBuilder.newBuilder(tableName).build()); 1059 1060 this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW, 1061 HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family); 1062 try { 1063 Path regiondir = region.getRegionFileSystem().getRegionDir(); 1064 FileSystem fs = region.getRegionFileSystem().getFileSystem(); 1065 byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes(); 1066 1067 long maxSeqId = 3; 1068 long minSeqId = 0; 1069 1070 for (long i = minSeqId; i < maxSeqId; i++) { 1071 Put put = new Put(Bytes.toBytes(i)); 1072 put.addColumn(family, Bytes.toBytes(i), Bytes.toBytes(i)); 1073 region.put(put); 1074 region.flush(true); 1075 } 1076 1077 // this will create a region with 3 files from flush 1078 assertEquals(3, region.getStore(family).getStorefilesCount()); 1079 List<String> storeFiles = new ArrayList<>(3); 1080 for (HStoreFile sf : region.getStore(family).getStorefiles()) { 1081 storeFiles.add(sf.getPath().getName()); 1082 } 1083 1084 // now verify that the flush markers are written 1085 wal.shutdown(); 1086 WAL.Reader reader = WALFactory.createReader(fs, AbstractFSWALProvider.getCurrentFileName(wal), 1087 TEST_UTIL.getConfiguration()); 1088 try { 1089 List<WAL.Entry> flushDescriptors = new ArrayList<>(); 1090 long lastFlushSeqId = -1; 1091 while (true) { 1092 WAL.Entry entry = reader.next(); 1093 if (entry == null) { 1094 break; 1095 } 1096 Cell cell = entry.getEdit().getCells().get(0); 1097 if (WALEdit.isMetaEditFamily(cell)) { 1098 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(cell); 1099 assertNotNull(flushDesc); 1100 assertArrayEquals(tableName.getName(), flushDesc.getTableName().toByteArray()); 1101 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 1102 assertTrue(flushDesc.getFlushSequenceNumber() > lastFlushSeqId); 1103 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 1104 assertTrue(flushDesc.getFlushSequenceNumber() == lastFlushSeqId); 1105 } 1106 lastFlushSeqId = flushDesc.getFlushSequenceNumber(); 1107 assertArrayEquals(regionName, flushDesc.getEncodedRegionName().toByteArray()); 1108 assertEquals(1, flushDesc.getStoreFlushesCount()); //only one store 1109 StoreFlushDescriptor storeFlushDesc = flushDesc.getStoreFlushes(0); 1110 assertArrayEquals(family, storeFlushDesc.getFamilyName().toByteArray()); 1111 assertEquals("family", storeFlushDesc.getStoreHomeDir()); 1112 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 1113 assertEquals(0, storeFlushDesc.getFlushOutputCount()); 1114 } else { 1115 assertEquals(1, storeFlushDesc.getFlushOutputCount()); //only one file from flush 1116 assertTrue(storeFiles.contains(storeFlushDesc.getFlushOutput(0))); 1117 } 1118 1119 flushDescriptors.add(entry); 1120 } 1121 } 1122 1123 assertEquals(3 * 2, flushDescriptors.size()); // START_FLUSH and COMMIT_FLUSH per flush 1124 1125 // now write those markers to the recovered edits again. 1126 1127 Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir); 1128 1129 Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000)); 1130 fs.create(recoveredEdits); 1131 WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits); 1132 1133 for (WAL.Entry entry : flushDescriptors) { 1134 writer.append(entry); 1135 } 1136 writer.close(); 1137 } finally { 1138 if (null != reader) { 1139 try { 1140 reader.close(); 1141 } catch (IOException exception) { 1142 LOG.warn("Problem closing wal: " + exception.getMessage()); 1143 LOG.debug("exception details", exception); 1144 } 1145 } 1146 } 1147 1148 // close the region now, and reopen again 1149 HBaseTestingUtility.closeRegionAndWAL(this.region); 1150 region = HRegion.openHRegion(region, null); 1151 1152 // now check whether we have can read back the data from region 1153 for (long i = minSeqId; i < maxSeqId; i++) { 1154 Get get = new Get(Bytes.toBytes(i)); 1155 Result result = region.get(get); 1156 byte[] value = result.getValue(family, Bytes.toBytes(i)); 1157 assertArrayEquals(Bytes.toBytes(i), value); 1158 } 1159 } finally { 1160 HBaseTestingUtility.closeRegionAndWAL(this.region); 1161 this.region = null; 1162 wals.close(); 1163 } 1164 } 1165 1166 static class IsFlushWALMarker implements ArgumentMatcher<WALEdit> { 1167 volatile FlushAction[] actions; 1168 public IsFlushWALMarker(FlushAction... actions) { 1169 this.actions = actions; 1170 } 1171 @Override 1172 public boolean matches(WALEdit edit) { 1173 List<Cell> cells = edit.getCells(); 1174 if (cells.isEmpty()) { 1175 return false; 1176 } 1177 if (WALEdit.isMetaEditFamily(cells.get(0))) { 1178 FlushDescriptor desc; 1179 try { 1180 desc = WALEdit.getFlushDescriptor(cells.get(0)); 1181 } catch (IOException e) { 1182 LOG.warn(e.toString(), e); 1183 return false; 1184 } 1185 if (desc != null) { 1186 for (FlushAction action : actions) { 1187 if (desc.getAction() == action) { 1188 return true; 1189 } 1190 } 1191 } 1192 } 1193 return false; 1194 } 1195 public IsFlushWALMarker set(FlushAction... actions) { 1196 this.actions = actions; 1197 return this; 1198 } 1199 } 1200 1201 @Test 1202 public void testFlushMarkersWALFail() throws Exception { 1203 // test the cases where the WAL append for flush markers fail. 1204 byte[] family = Bytes.toBytes("family"); 1205 1206 // spy an actual WAL implementation to throw exception (was not able to mock) 1207 Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + "log"); 1208 1209 final Configuration walConf = new Configuration(TEST_UTIL.getConfiguration()); 1210 CommonFSUtils.setRootDir(walConf, logDir); 1211 // Make up a WAL that we can manipulate at append time. 1212 class FailAppendFlushMarkerWAL extends FSHLog { 1213 volatile FlushAction [] flushActions = null; 1214 1215 public FailAppendFlushMarkerWAL(FileSystem fs, Path root, String logDir, Configuration conf) 1216 throws IOException { 1217 super(fs, root, logDir, conf); 1218 } 1219 1220 @Override 1221 protected Writer createWriterInstance(Path path) throws IOException { 1222 final Writer w = super.createWriterInstance(path); 1223 return new Writer() { 1224 @Override 1225 public void close() throws IOException { 1226 w.close(); 1227 } 1228 1229 @Override 1230 public void sync(boolean forceSync) throws IOException { 1231 w.sync(forceSync); 1232 } 1233 1234 @Override 1235 public void append(Entry entry) throws IOException { 1236 List<Cell> cells = entry.getEdit().getCells(); 1237 if (WALEdit.isMetaEditFamily(cells.get(0))) { 1238 FlushDescriptor desc = WALEdit.getFlushDescriptor(cells.get(0)); 1239 if (desc != null) { 1240 for (FlushAction flushAction: flushActions) { 1241 if (desc.getAction().equals(flushAction)) { 1242 throw new IOException("Failed to append flush marker! " + flushAction); 1243 } 1244 } 1245 } 1246 } 1247 w.append(entry); 1248 } 1249 1250 @Override 1251 public long getLength() { 1252 return w.getLength(); 1253 } 1254 }; 1255 } 1256 } 1257 FailAppendFlushMarkerWAL wal = new FailAppendFlushMarkerWAL(FileSystem.get(walConf), 1258 CommonFSUtils.getRootDir(walConf), method, walConf); 1259 wal.init(); 1260 this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW, 1261 HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family); 1262 int i = 0; 1263 Put put = new Put(Bytes.toBytes(i)); 1264 put.setDurability(Durability.SKIP_WAL); // have to skip mocked wal 1265 put.addColumn(family, Bytes.toBytes(i), Bytes.toBytes(i)); 1266 region.put(put); 1267 1268 // 1. Test case where START_FLUSH throws exception 1269 wal.flushActions = new FlushAction [] {FlushAction.START_FLUSH}; 1270 1271 // start cache flush will throw exception 1272 try { 1273 region.flush(true); 1274 fail("This should have thrown exception"); 1275 } catch (DroppedSnapshotException unexpected) { 1276 // this should not be a dropped snapshot exception. Meaning that RS will not abort 1277 throw unexpected; 1278 } catch (IOException expected) { 1279 // expected 1280 } 1281 // The WAL is hosed now. It has two edits appended. We cannot roll the log without it 1282 // throwing a DroppedSnapshotException to force an abort. Just clean up the mess. 1283 region.close(true); 1284 wal.close(); 1285 1286 // 2. Test case where START_FLUSH succeeds but COMMIT_FLUSH will throw exception 1287 wal.flushActions = new FlushAction[] { FlushAction.COMMIT_FLUSH }; 1288 wal = new FailAppendFlushMarkerWAL(FileSystem.get(walConf), CommonFSUtils.getRootDir(walConf), 1289 method, walConf); 1290 wal.init(); 1291 this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW, 1292 HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family); 1293 region.put(put); 1294 // 3. Test case where ABORT_FLUSH will throw exception. 1295 // Even if ABORT_FLUSH throws exception, we should not fail with IOE, but continue with 1296 // DroppedSnapshotException. Below COMMIT_FLUSH will cause flush to abort 1297 wal.flushActions = new FlushAction [] {FlushAction.COMMIT_FLUSH, FlushAction.ABORT_FLUSH}; 1298 1299 try { 1300 region.flush(true); 1301 fail("This should have thrown exception"); 1302 } catch (DroppedSnapshotException expected) { 1303 // we expect this exception, since we were able to write the snapshot, but failed to 1304 // write the flush marker to WAL 1305 } catch (IOException unexpected) { 1306 throw unexpected; 1307 } 1308 } 1309 1310 @Test 1311 public void testGetWhileRegionClose() throws IOException { 1312 Configuration hc = initSplit(); 1313 int numRows = 100; 1314 byte[][] families = { fam1, fam2, fam3 }; 1315 1316 // Setting up region 1317 this.region = initHRegion(tableName, method, hc, families); 1318 // Put data in region 1319 final int startRow = 100; 1320 putData(startRow, numRows, qual1, families); 1321 putData(startRow, numRows, qual2, families); 1322 putData(startRow, numRows, qual3, families); 1323 final AtomicBoolean done = new AtomicBoolean(false); 1324 final AtomicInteger gets = new AtomicInteger(0); 1325 GetTillDoneOrException[] threads = new GetTillDoneOrException[10]; 1326 try { 1327 // Set ten threads running concurrently getting from the region. 1328 for (int i = 0; i < threads.length / 2; i++) { 1329 threads[i] = new GetTillDoneOrException(i, Bytes.toBytes("" + startRow), done, gets); 1330 threads[i].setDaemon(true); 1331 threads[i].start(); 1332 } 1333 // Artificially make the condition by setting closing flag explicitly. 1334 // I can't make the issue happen with a call to region.close(). 1335 this.region.closing.set(true); 1336 for (int i = threads.length / 2; i < threads.length; i++) { 1337 threads[i] = new GetTillDoneOrException(i, Bytes.toBytes("" + startRow), done, gets); 1338 threads[i].setDaemon(true); 1339 threads[i].start(); 1340 } 1341 } finally { 1342 if (this.region != null) { 1343 HBaseTestingUtility.closeRegionAndWAL(this.region); 1344 this.region = null; 1345 } 1346 } 1347 done.set(true); 1348 for (GetTillDoneOrException t : threads) { 1349 try { 1350 t.join(); 1351 } catch (InterruptedException e) { 1352 e.printStackTrace(); 1353 } 1354 if (t.e != null) { 1355 LOG.info("Exception=" + t.e); 1356 assertFalse("Found a NPE in " + t.getName(), t.e instanceof NullPointerException); 1357 } 1358 } 1359 } 1360 1361 /* 1362 * Thread that does get on single row until 'done' flag is flipped. If an 1363 * exception causes us to fail, it records it. 1364 */ 1365 class GetTillDoneOrException extends Thread { 1366 private final Get g; 1367 private final AtomicBoolean done; 1368 private final AtomicInteger count; 1369 private Exception e; 1370 1371 GetTillDoneOrException(final int i, final byte[] r, final AtomicBoolean d, 1372 final AtomicInteger c) { 1373 super("getter." + i); 1374 this.g = new Get(r); 1375 this.done = d; 1376 this.count = c; 1377 } 1378 1379 @Override 1380 public void run() { 1381 while (!this.done.get()) { 1382 try { 1383 assertTrue(region.get(g).size() > 0); 1384 this.count.incrementAndGet(); 1385 } catch (Exception e) { 1386 this.e = e; 1387 break; 1388 } 1389 } 1390 } 1391 } 1392 1393 /* 1394 * An involved filter test. Has multiple column families and deletes in mix. 1395 */ 1396 @Test 1397 public void testWeirdCacheBehaviour() throws Exception { 1398 final TableName tableName = TableName.valueOf(name.getMethodName()); 1399 byte[][] FAMILIES = new byte[][] { Bytes.toBytes("trans-blob"), Bytes.toBytes("trans-type"), 1400 Bytes.toBytes("trans-date"), Bytes.toBytes("trans-tags"), Bytes.toBytes("trans-group") }; 1401 this.region = initHRegion(tableName, method, CONF, FAMILIES); 1402 String value = "this is the value"; 1403 String value2 = "this is some other value"; 1404 String keyPrefix1 = "prefix1"; 1405 String keyPrefix2 = "prefix2"; 1406 String keyPrefix3 = "prefix3"; 1407 putRows(this.region, 3, value, keyPrefix1); 1408 putRows(this.region, 3, value, keyPrefix2); 1409 putRows(this.region, 3, value, keyPrefix3); 1410 putRows(this.region, 3, value2, keyPrefix1); 1411 putRows(this.region, 3, value2, keyPrefix2); 1412 putRows(this.region, 3, value2, keyPrefix3); 1413 System.out.println("Checking values for key: " + keyPrefix1); 1414 assertEquals("Got back incorrect number of rows from scan", 3, 1415 getNumberOfRows(keyPrefix1, value2, this.region)); 1416 System.out.println("Checking values for key: " + keyPrefix2); 1417 assertEquals("Got back incorrect number of rows from scan", 3, 1418 getNumberOfRows(keyPrefix2, value2, this.region)); 1419 System.out.println("Checking values for key: " + keyPrefix3); 1420 assertEquals("Got back incorrect number of rows from scan", 3, 1421 getNumberOfRows(keyPrefix3, value2, this.region)); 1422 deleteColumns(this.region, value2, keyPrefix1); 1423 deleteColumns(this.region, value2, keyPrefix2); 1424 deleteColumns(this.region, value2, keyPrefix3); 1425 System.out.println("Starting important checks....."); 1426 assertEquals("Got back incorrect number of rows from scan: " + keyPrefix1, 0, 1427 getNumberOfRows(keyPrefix1, value2, this.region)); 1428 assertEquals("Got back incorrect number of rows from scan: " + keyPrefix2, 0, 1429 getNumberOfRows(keyPrefix2, value2, this.region)); 1430 assertEquals("Got back incorrect number of rows from scan: " + keyPrefix3, 0, 1431 getNumberOfRows(keyPrefix3, value2, this.region)); 1432 } 1433 1434 @Test 1435 public void testAppendWithReadOnlyTable() throws Exception { 1436 final TableName tableName = TableName.valueOf(name.getMethodName()); 1437 this.region = initHRegion(tableName, method, CONF, true, Bytes.toBytes("somefamily")); 1438 boolean exceptionCaught = false; 1439 Append append = new Append(Bytes.toBytes("somerow")); 1440 append.setDurability(Durability.SKIP_WAL); 1441 append.addColumn(Bytes.toBytes("somefamily"), Bytes.toBytes("somequalifier"), 1442 Bytes.toBytes("somevalue")); 1443 try { 1444 region.append(append); 1445 } catch (IOException e) { 1446 exceptionCaught = true; 1447 } 1448 assertTrue(exceptionCaught == true); 1449 } 1450 1451 @Test 1452 public void testIncrWithReadOnlyTable() throws Exception { 1453 final TableName tableName = TableName.valueOf(name.getMethodName()); 1454 this.region = initHRegion(tableName, method, CONF, true, Bytes.toBytes("somefamily")); 1455 boolean exceptionCaught = false; 1456 Increment inc = new Increment(Bytes.toBytes("somerow")); 1457 inc.setDurability(Durability.SKIP_WAL); 1458 inc.addColumn(Bytes.toBytes("somefamily"), Bytes.toBytes("somequalifier"), 1L); 1459 try { 1460 region.increment(inc); 1461 } catch (IOException e) { 1462 exceptionCaught = true; 1463 } 1464 assertTrue(exceptionCaught == true); 1465 } 1466 1467 private void deleteColumns(HRegion r, String value, String keyPrefix) throws IOException { 1468 InternalScanner scanner = buildScanner(keyPrefix, value, r); 1469 int count = 0; 1470 boolean more = false; 1471 List<Cell> results = new ArrayList<>(); 1472 do { 1473 more = scanner.next(results); 1474 if (results != null && !results.isEmpty()) 1475 count++; 1476 else 1477 break; 1478 Delete delete = new Delete(CellUtil.cloneRow(results.get(0))); 1479 delete.addColumn(Bytes.toBytes("trans-tags"), Bytes.toBytes("qual2")); 1480 r.delete(delete); 1481 results.clear(); 1482 } while (more); 1483 assertEquals("Did not perform correct number of deletes", 3, count); 1484 } 1485 1486 private int getNumberOfRows(String keyPrefix, String value, HRegion r) throws Exception { 1487 InternalScanner resultScanner = buildScanner(keyPrefix, value, r); 1488 int numberOfResults = 0; 1489 List<Cell> results = new ArrayList<>(); 1490 boolean more = false; 1491 do { 1492 more = resultScanner.next(results); 1493 if (results != null && !results.isEmpty()) 1494 numberOfResults++; 1495 else 1496 break; 1497 for (Cell kv : results) { 1498 System.out.println("kv=" + kv.toString() + ", " + Bytes.toString(CellUtil.cloneValue(kv))); 1499 } 1500 results.clear(); 1501 } while (more); 1502 return numberOfResults; 1503 } 1504 1505 private InternalScanner buildScanner(String keyPrefix, String value, HRegion r) 1506 throws IOException { 1507 // Defaults FilterList.Operator.MUST_PASS_ALL. 1508 FilterList allFilters = new FilterList(); 1509 allFilters.addFilter(new PrefixFilter(Bytes.toBytes(keyPrefix))); 1510 // Only return rows where this column value exists in the row. 1511 SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("trans-tags"), 1512 Bytes.toBytes("qual2"), CompareOp.EQUAL, Bytes.toBytes(value)); 1513 filter.setFilterIfMissing(true); 1514 allFilters.addFilter(filter); 1515 Scan scan = new Scan(); 1516 scan.addFamily(Bytes.toBytes("trans-blob")); 1517 scan.addFamily(Bytes.toBytes("trans-type")); 1518 scan.addFamily(Bytes.toBytes("trans-date")); 1519 scan.addFamily(Bytes.toBytes("trans-tags")); 1520 scan.addFamily(Bytes.toBytes("trans-group")); 1521 scan.setFilter(allFilters); 1522 return r.getScanner(scan); 1523 } 1524 1525 private void putRows(HRegion r, int numRows, String value, String key) throws IOException { 1526 for (int i = 0; i < numRows; i++) { 1527 String row = key + "_" + i/* UUID.randomUUID().toString() */; 1528 System.out.println(String.format("Saving row: %s, with value %s", row, value)); 1529 Put put = new Put(Bytes.toBytes(row)); 1530 put.setDurability(Durability.SKIP_WAL); 1531 put.addColumn(Bytes.toBytes("trans-blob"), null, Bytes.toBytes("value for blob")); 1532 put.addColumn(Bytes.toBytes("trans-type"), null, Bytes.toBytes("statement")); 1533 put.addColumn(Bytes.toBytes("trans-date"), null, Bytes.toBytes("20090921010101999")); 1534 put.addColumn(Bytes.toBytes("trans-tags"), Bytes.toBytes("qual2"), Bytes.toBytes(value)); 1535 put.addColumn(Bytes.toBytes("trans-group"), null, Bytes.toBytes("adhocTransactionGroupId")); 1536 r.put(put); 1537 } 1538 } 1539 1540 @Test 1541 public void testFamilyWithAndWithoutColon() throws Exception { 1542 byte[] cf = Bytes.toBytes(COLUMN_FAMILY); 1543 this.region = initHRegion(tableName, method, CONF, cf); 1544 Put p = new Put(tableName.toBytes()); 1545 byte[] cfwithcolon = Bytes.toBytes(COLUMN_FAMILY + ":"); 1546 p.addColumn(cfwithcolon, cfwithcolon, cfwithcolon); 1547 boolean exception = false; 1548 try { 1549 this.region.put(p); 1550 } catch (NoSuchColumnFamilyException e) { 1551 exception = true; 1552 } 1553 assertTrue(exception); 1554 } 1555 1556 @Test 1557 public void testBatchPut_whileNoRowLocksHeld() throws IOException { 1558 final Put[] puts = new Put[10]; 1559 MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class); 1560 long syncs = prepareRegionForBachPut(puts, source, false); 1561 1562 OperationStatus[] codes = this.region.batchMutate(puts); 1563 assertEquals(10, codes.length); 1564 for (int i = 0; i < 10; i++) { 1565 assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode()); 1566 } 1567 metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source); 1568 1569 LOG.info("Next a batch put with one invalid family"); 1570 puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value); 1571 codes = this.region.batchMutate(puts); 1572 assertEquals(10, codes.length); 1573 for (int i = 0; i < 10; i++) { 1574 assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS, 1575 codes[i].getOperationStatusCode()); 1576 } 1577 1578 metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 2, source); 1579 } 1580 1581 @Test 1582 public void testBatchPut_whileMultipleRowLocksHeld() throws Exception { 1583 final Put[] puts = new Put[10]; 1584 MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class); 1585 long syncs = prepareRegionForBachPut(puts, source, false); 1586 1587 puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value); 1588 1589 LOG.info("batchPut will have to break into four batches to avoid row locks"); 1590 RowLock rowLock1 = region.getRowLock(Bytes.toBytes("row_2")); 1591 RowLock rowLock2 = region.getRowLock(Bytes.toBytes("row_1")); 1592 RowLock rowLock3 = region.getRowLock(Bytes.toBytes("row_3")); 1593 RowLock rowLock4 = region.getRowLock(Bytes.toBytes("row_3"), true); 1594 1595 MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF); 1596 final AtomicReference<OperationStatus[]> retFromThread = new AtomicReference<>(); 1597 final CountDownLatch startingPuts = new CountDownLatch(1); 1598 final CountDownLatch startingClose = new CountDownLatch(1); 1599 TestThread putter = new TestThread(ctx) { 1600 @Override 1601 public void doWork() throws IOException { 1602 startingPuts.countDown(); 1603 retFromThread.set(region.batchMutate(puts)); 1604 } 1605 }; 1606 LOG.info("...starting put thread while holding locks"); 1607 ctx.addThread(putter); 1608 ctx.startThreads(); 1609 1610 // Now attempt to close the region from another thread. Prior to HBASE-12565 1611 // this would cause the in-progress batchMutate operation to to fail with 1612 // exception because it use to release and re-acquire the close-guard lock 1613 // between batches. Caller then didn't get status indicating which writes succeeded. 1614 // We now expect this thread to block until the batchMutate call finishes. 1615 Thread regionCloseThread = new TestThread(ctx) { 1616 @Override 1617 public void doWork() { 1618 try { 1619 startingPuts.await(); 1620 // Give some time for the batch mutate to get in. 1621 // We don't want to race with the mutate 1622 Thread.sleep(10); 1623 startingClose.countDown(); 1624 HBaseTestingUtility.closeRegionAndWAL(region); 1625 region = null; 1626 } catch (IOException e) { 1627 throw new RuntimeException(e); 1628 } catch (InterruptedException e) { 1629 throw new RuntimeException(e); 1630 } 1631 } 1632 }; 1633 regionCloseThread.start(); 1634 1635 startingClose.await(); 1636 startingPuts.await(); 1637 Thread.sleep(100); 1638 LOG.info("...releasing row lock 1, which should let put thread continue"); 1639 rowLock1.release(); 1640 rowLock2.release(); 1641 rowLock3.release(); 1642 waitForCounter(source, "syncTimeNumOps", syncs + 1); 1643 1644 LOG.info("...joining on put thread"); 1645 ctx.stop(); 1646 regionCloseThread.join(); 1647 1648 OperationStatus[] codes = retFromThread.get(); 1649 for (int i = 0; i < codes.length; i++) { 1650 assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS, 1651 codes[i].getOperationStatusCode()); 1652 } 1653 rowLock4.release(); 1654 } 1655 1656 private void waitForCounter(MetricsWALSource source, String metricName, long expectedCount) 1657 throws InterruptedException { 1658 long startWait = System.currentTimeMillis(); 1659 long currentCount; 1660 while ((currentCount = metricsAssertHelper.getCounter(metricName, source)) < expectedCount) { 1661 Thread.sleep(100); 1662 if (System.currentTimeMillis() - startWait > 10000) { 1663 fail(String.format("Timed out waiting for '%s' >= '%s', currentCount=%s", metricName, 1664 expectedCount, currentCount)); 1665 } 1666 } 1667 } 1668 1669 @Test 1670 public void testAtomicBatchPut() throws IOException { 1671 final Put[] puts = new Put[10]; 1672 MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class); 1673 long syncs = prepareRegionForBachPut(puts, source, false); 1674 1675 // 1. Straight forward case, should succeed 1676 MutationBatchOperation batchOp = new MutationBatchOperation(region, puts, true, 1677 HConstants.NO_NONCE, HConstants.NO_NONCE); 1678 OperationStatus[] codes = this.region.batchMutate(batchOp); 1679 assertEquals(10, codes.length); 1680 for (int i = 0; i < 10; i++) { 1681 assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode()); 1682 } 1683 metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source); 1684 1685 // 2. Failed to get lock 1686 RowLock lock = region.getRowLock(Bytes.toBytes("row_" + 3)); 1687 // Method {@link HRegion#getRowLock(byte[])} is reentrant. As 'row_3' is locked in this 1688 // thread, need to run {@link HRegion#batchMutate(HRegion.BatchOperation)} in different thread 1689 MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF); 1690 final AtomicReference<IOException> retFromThread = new AtomicReference<>(); 1691 final CountDownLatch finishedPuts = new CountDownLatch(1); 1692 final MutationBatchOperation finalBatchOp = new MutationBatchOperation(region, puts, true, 1693 HConstants 1694 .NO_NONCE, 1695 HConstants.NO_NONCE); 1696 TestThread putter = new TestThread(ctx) { 1697 @Override 1698 public void doWork() throws IOException { 1699 try { 1700 region.batchMutate(finalBatchOp); 1701 } catch (IOException ioe) { 1702 LOG.error("test failed!", ioe); 1703 retFromThread.set(ioe); 1704 } 1705 finishedPuts.countDown(); 1706 } 1707 }; 1708 LOG.info("...starting put thread while holding locks"); 1709 ctx.addThread(putter); 1710 ctx.startThreads(); 1711 LOG.info("...waiting for batch puts while holding locks"); 1712 try { 1713 finishedPuts.await(); 1714 } catch (InterruptedException e) { 1715 LOG.error("Interrupted!", e); 1716 } finally { 1717 if (lock != null) { 1718 lock.release(); 1719 } 1720 } 1721 assertNotNull(retFromThread.get()); 1722 metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source); 1723 1724 // 3. Exception thrown in validation 1725 LOG.info("Next a batch put with one invalid family"); 1726 puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value); 1727 batchOp = new MutationBatchOperation(region, puts, true, HConstants.NO_NONCE, 1728 HConstants.NO_NONCE); 1729 thrown.expect(NoSuchColumnFamilyException.class); 1730 this.region.batchMutate(batchOp); 1731 } 1732 1733 @Test 1734 public void testBatchPutWithTsSlop() throws Exception { 1735 // add data with a timestamp that is too recent for range. Ensure assert 1736 CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000); 1737 final Put[] puts = new Put[10]; 1738 MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class); 1739 1740 long syncs = prepareRegionForBachPut(puts, source, true); 1741 1742 OperationStatus[] codes = this.region.batchMutate(puts); 1743 assertEquals(10, codes.length); 1744 for (int i = 0; i < 10; i++) { 1745 assertEquals(OperationStatusCode.SANITY_CHECK_FAILURE, codes[i].getOperationStatusCode()); 1746 } 1747 metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source); 1748 } 1749 1750 /** 1751 * @return syncs initial syncTimeNumOps 1752 */ 1753 private long prepareRegionForBachPut(final Put[] puts, final MetricsWALSource source, 1754 boolean slop) throws IOException { 1755 this.region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES); 1756 1757 LOG.info("First a batch put with all valid puts"); 1758 for (int i = 0; i < puts.length; i++) { 1759 puts[i] = slop ? new Put(Bytes.toBytes("row_" + i), Long.MAX_VALUE - 100) : 1760 new Put(Bytes.toBytes("row_" + i)); 1761 puts[i].addColumn(COLUMN_FAMILY_BYTES, qual, value); 1762 } 1763 1764 long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source); 1765 metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source); 1766 return syncs; 1767 } 1768 1769 // //////////////////////////////////////////////////////////////////////////// 1770 // checkAndMutate tests 1771 // //////////////////////////////////////////////////////////////////////////// 1772 @Test 1773 public void testCheckAndMutate_WithEmptyRowValue() throws IOException { 1774 byte[] row1 = Bytes.toBytes("row1"); 1775 byte[] fam1 = Bytes.toBytes("fam1"); 1776 byte[] qf1 = Bytes.toBytes("qualifier"); 1777 byte[] emptyVal = new byte[] {}; 1778 byte[] val1 = Bytes.toBytes("value1"); 1779 byte[] val2 = Bytes.toBytes("value2"); 1780 1781 // Setting up region 1782 this.region = initHRegion(tableName, method, CONF, fam1); 1783 // Putting empty data in key 1784 Put put = new Put(row1); 1785 put.addColumn(fam1, qf1, emptyVal); 1786 1787 // checkAndPut with empty value 1788 boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, 1789 new BinaryComparator(emptyVal), put); 1790 assertTrue(res); 1791 1792 // Putting data in key 1793 put = new Put(row1); 1794 put.addColumn(fam1, qf1, val1); 1795 1796 // checkAndPut with correct value 1797 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, 1798 new BinaryComparator(emptyVal), put); 1799 assertTrue(res); 1800 1801 // not empty anymore 1802 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, 1803 new BinaryComparator(emptyVal), put); 1804 assertFalse(res); 1805 1806 Delete delete = new Delete(row1); 1807 delete.addColumn(fam1, qf1); 1808 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, 1809 new BinaryComparator(emptyVal), delete); 1810 assertFalse(res); 1811 1812 put = new Put(row1); 1813 put.addColumn(fam1, qf1, val2); 1814 // checkAndPut with correct value 1815 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, 1816 new BinaryComparator(val1), put); 1817 assertTrue(res); 1818 1819 // checkAndDelete with correct value 1820 delete = new Delete(row1); 1821 delete.addColumn(fam1, qf1); 1822 delete.addColumn(fam1, qf1); 1823 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, 1824 new BinaryComparator(val2), delete); 1825 assertTrue(res); 1826 1827 delete = new Delete(row1); 1828 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, 1829 new BinaryComparator(emptyVal), delete); 1830 assertTrue(res); 1831 1832 // checkAndPut looking for a null value 1833 put = new Put(row1); 1834 put.addColumn(fam1, qf1, val1); 1835 1836 res = region 1837 .checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new NullComparator(), put); 1838 assertTrue(res); 1839 } 1840 1841 @Test 1842 public void testCheckAndMutate_WithWrongValue() throws IOException { 1843 byte[] row1 = Bytes.toBytes("row1"); 1844 byte[] fam1 = Bytes.toBytes("fam1"); 1845 byte[] qf1 = Bytes.toBytes("qualifier"); 1846 byte[] val1 = Bytes.toBytes("value1"); 1847 byte[] val2 = Bytes.toBytes("value2"); 1848 BigDecimal bd1 = new BigDecimal(Double.MAX_VALUE); 1849 BigDecimal bd2 = new BigDecimal(Double.MIN_VALUE); 1850 1851 // Setting up region 1852 this.region = initHRegion(tableName, method, CONF, fam1); 1853 // Putting data in key 1854 Put put = new Put(row1); 1855 put.addColumn(fam1, qf1, val1); 1856 region.put(put); 1857 1858 // checkAndPut with wrong value 1859 boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, 1860 new BinaryComparator(val2), put); 1861 assertEquals(false, res); 1862 1863 // checkAndDelete with wrong value 1864 Delete delete = new Delete(row1); 1865 delete.addFamily(fam1); 1866 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, 1867 new BinaryComparator(val2), put); 1868 assertEquals(false, res); 1869 1870 // Putting data in key 1871 put = new Put(row1); 1872 put.addColumn(fam1, qf1, Bytes.toBytes(bd1)); 1873 region.put(put); 1874 1875 // checkAndPut with wrong value 1876 res = 1877 region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, 1878 new BigDecimalComparator(bd2), put); 1879 assertEquals(false, res); 1880 1881 // checkAndDelete with wrong value 1882 delete = new Delete(row1); 1883 delete.addFamily(fam1); 1884 res = 1885 region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, 1886 new BigDecimalComparator(bd2), put); 1887 assertEquals(false, res); 1888 } 1889 1890 @Test 1891 public void testCheckAndMutate_WithCorrectValue() throws IOException { 1892 byte[] row1 = Bytes.toBytes("row1"); 1893 byte[] fam1 = Bytes.toBytes("fam1"); 1894 byte[] qf1 = Bytes.toBytes("qualifier"); 1895 byte[] val1 = Bytes.toBytes("value1"); 1896 BigDecimal bd1 = new BigDecimal(Double.MIN_VALUE); 1897 1898 // Setting up region 1899 this.region = initHRegion(tableName, method, CONF, fam1); 1900 // Putting data in key 1901 long now = System.currentTimeMillis(); 1902 Put put = new Put(row1); 1903 put.addColumn(fam1, qf1, now, val1); 1904 region.put(put); 1905 1906 // checkAndPut with correct value 1907 boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, 1908 new BinaryComparator(val1), put); 1909 assertEquals("First", true, res); 1910 1911 // checkAndDelete with correct value 1912 Delete delete = new Delete(row1, now + 1); 1913 delete.addColumn(fam1, qf1); 1914 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val1), 1915 delete); 1916 assertEquals("Delete", true, res); 1917 1918 // Putting data in key 1919 put = new Put(row1); 1920 put.addColumn(fam1, qf1, now + 2, Bytes.toBytes(bd1)); 1921 region.put(put); 1922 1923 // checkAndPut with correct value 1924 res = 1925 region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator( 1926 bd1), put); 1927 assertEquals("Second put", true, res); 1928 1929 // checkAndDelete with correct value 1930 delete = new Delete(row1, now + 3); 1931 delete.addColumn(fam1, qf1); 1932 res = 1933 region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator( 1934 bd1), delete); 1935 assertEquals("Second delete", true, res); 1936 } 1937 1938 @Test 1939 public void testCheckAndMutate_WithNonEqualCompareOp() throws IOException { 1940 byte[] row1 = Bytes.toBytes("row1"); 1941 byte[] fam1 = Bytes.toBytes("fam1"); 1942 byte[] qf1 = Bytes.toBytes("qualifier"); 1943 byte[] val1 = Bytes.toBytes("value1"); 1944 byte[] val2 = Bytes.toBytes("value2"); 1945 byte[] val3 = Bytes.toBytes("value3"); 1946 byte[] val4 = Bytes.toBytes("value4"); 1947 1948 // Setting up region 1949 this.region = initHRegion(tableName, method, CONF, fam1); 1950 // Putting val3 in key 1951 Put put = new Put(row1); 1952 put.addColumn(fam1, qf1, val3); 1953 region.put(put); 1954 1955 // Test CompareOp.LESS: original = val3, compare with val3, fail 1956 boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS, 1957 new BinaryComparator(val3), put); 1958 assertEquals(false, res); 1959 1960 // Test CompareOp.LESS: original = val3, compare with val4, fail 1961 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS, 1962 new BinaryComparator(val4), put); 1963 assertEquals(false, res); 1964 1965 // Test CompareOp.LESS: original = val3, compare with val2, 1966 // succeed (now value = val2) 1967 put = new Put(row1); 1968 put.addColumn(fam1, qf1, val2); 1969 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS, 1970 new BinaryComparator(val2), put); 1971 assertEquals(true, res); 1972 1973 // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val3, fail 1974 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL, 1975 new BinaryComparator(val3), put); 1976 assertEquals(false, res); 1977 1978 // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val2, 1979 // succeed (value still = val2) 1980 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL, 1981 new BinaryComparator(val2), put); 1982 assertEquals(true, res); 1983 1984 // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val1, 1985 // succeed (now value = val3) 1986 put = new Put(row1); 1987 put.addColumn(fam1, qf1, val3); 1988 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL, 1989 new BinaryComparator(val1), put); 1990 assertEquals(true, res); 1991 1992 // Test CompareOp.GREATER: original = val3, compare with val3, fail 1993 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER, 1994 new BinaryComparator(val3), put); 1995 assertEquals(false, res); 1996 1997 // Test CompareOp.GREATER: original = val3, compare with val2, fail 1998 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER, 1999 new BinaryComparator(val2), put); 2000 assertEquals(false, res); 2001 2002 // Test CompareOp.GREATER: original = val3, compare with val4, 2003 // succeed (now value = val2) 2004 put = new Put(row1); 2005 put.addColumn(fam1, qf1, val2); 2006 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER, 2007 new BinaryComparator(val4), put); 2008 assertEquals(true, res); 2009 2010 // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val1, fail 2011 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL, 2012 new BinaryComparator(val1), put); 2013 assertEquals(false, res); 2014 2015 // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val2, 2016 // succeed (value still = val2) 2017 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL, 2018 new BinaryComparator(val2), put); 2019 assertEquals(true, res); 2020 2021 // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val3, succeed 2022 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL, 2023 new BinaryComparator(val3), put); 2024 assertEquals(true, res); 2025 } 2026 2027 @Test 2028 public void testCheckAndPut_ThatPutWasWritten() throws IOException { 2029 byte[] row1 = Bytes.toBytes("row1"); 2030 byte[] fam1 = Bytes.toBytes("fam1"); 2031 byte[] fam2 = Bytes.toBytes("fam2"); 2032 byte[] qf1 = Bytes.toBytes("qualifier"); 2033 byte[] val1 = Bytes.toBytes("value1"); 2034 byte[] val2 = Bytes.toBytes("value2"); 2035 2036 byte[][] families = { fam1, fam2 }; 2037 2038 // Setting up region 2039 this.region = initHRegion(tableName, method, CONF, families); 2040 // Putting data in the key to check 2041 Put put = new Put(row1); 2042 put.addColumn(fam1, qf1, val1); 2043 region.put(put); 2044 2045 // Creating put to add 2046 long ts = System.currentTimeMillis(); 2047 KeyValue kv = new KeyValue(row1, fam2, qf1, ts, KeyValue.Type.Put, val2); 2048 put = new Put(row1); 2049 put.add(kv); 2050 2051 // checkAndPut with wrong value 2052 boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, 2053 new BinaryComparator(val1), put); 2054 assertEquals(true, res); 2055 2056 Get get = new Get(row1); 2057 get.addColumn(fam2, qf1); 2058 Cell[] actual = region.get(get).rawCells(); 2059 2060 Cell[] expected = { kv }; 2061 2062 assertEquals(expected.length, actual.length); 2063 for (int i = 0; i < actual.length; i++) { 2064 assertEquals(expected[i], actual[i]); 2065 } 2066 } 2067 2068 @Test 2069 public void testCheckAndPut_wrongRowInPut() throws IOException { 2070 this.region = initHRegion(tableName, method, CONF, COLUMNS); 2071 Put put = new Put(row2); 2072 put.addColumn(fam1, qual1, value1); 2073 try { 2074 region.checkAndMutate(row, fam1, qual1, CompareOperator.EQUAL, 2075 new BinaryComparator(value2), put); 2076 fail(); 2077 } catch (org.apache.hadoop.hbase.DoNotRetryIOException expected) { 2078 // expected exception. 2079 } 2080 } 2081 2082 @Test 2083 public void testCheckAndDelete_ThatDeleteWasWritten() throws IOException { 2084 byte[] row1 = Bytes.toBytes("row1"); 2085 byte[] fam1 = Bytes.toBytes("fam1"); 2086 byte[] fam2 = Bytes.toBytes("fam2"); 2087 byte[] qf1 = Bytes.toBytes("qualifier1"); 2088 byte[] qf2 = Bytes.toBytes("qualifier2"); 2089 byte[] qf3 = Bytes.toBytes("qualifier3"); 2090 byte[] val1 = Bytes.toBytes("value1"); 2091 byte[] val2 = Bytes.toBytes("value2"); 2092 byte[] val3 = Bytes.toBytes("value3"); 2093 byte[] emptyVal = new byte[] {}; 2094 2095 byte[][] families = { fam1, fam2 }; 2096 2097 // Setting up region 2098 this.region = initHRegion(tableName, method, CONF, families); 2099 // Put content 2100 Put put = new Put(row1); 2101 put.addColumn(fam1, qf1, val1); 2102 region.put(put); 2103 Threads.sleep(2); 2104 2105 put = new Put(row1); 2106 put.addColumn(fam1, qf1, val2); 2107 put.addColumn(fam2, qf1, val3); 2108 put.addColumn(fam2, qf2, val2); 2109 put.addColumn(fam2, qf3, val1); 2110 put.addColumn(fam1, qf3, val1); 2111 region.put(put); 2112 2113 LOG.info("get={}", region.get(new Get(row1).addColumn(fam1, qf1)).toString()); 2114 2115 // Multi-column delete 2116 Delete delete = new Delete(row1); 2117 delete.addColumn(fam1, qf1); 2118 delete.addColumn(fam2, qf1); 2119 delete.addColumn(fam1, qf3); 2120 boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, 2121 new BinaryComparator(val2), delete); 2122 assertEquals(true, res); 2123 2124 Get get = new Get(row1); 2125 get.addColumn(fam1, qf1); 2126 get.addColumn(fam1, qf3); 2127 get.addColumn(fam2, qf2); 2128 Result r = region.get(get); 2129 assertEquals(2, r.size()); 2130 assertArrayEquals(val1, r.getValue(fam1, qf1)); 2131 assertArrayEquals(val2, r.getValue(fam2, qf2)); 2132 2133 // Family delete 2134 delete = new Delete(row1); 2135 delete.addFamily(fam2); 2136 res = region.checkAndMutate(row1, fam2, qf1, CompareOperator.EQUAL, 2137 new BinaryComparator(emptyVal), delete); 2138 assertEquals(true, res); 2139 2140 get = new Get(row1); 2141 r = region.get(get); 2142 assertEquals(1, r.size()); 2143 assertArrayEquals(val1, r.getValue(fam1, qf1)); 2144 2145 // Row delete 2146 delete = new Delete(row1); 2147 res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val1), 2148 delete); 2149 assertEquals(true, res); 2150 get = new Get(row1); 2151 r = region.get(get); 2152 assertEquals(0, r.size()); 2153 } 2154 2155 @Test 2156 public void testCheckAndMutate_WithFilters() throws Throwable { 2157 final byte[] FAMILY = Bytes.toBytes("fam"); 2158 2159 // Setting up region 2160 this.region = initHRegion(tableName, method, CONF, FAMILY); 2161 2162 // Put one row 2163 Put put = new Put(row); 2164 put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")); 2165 put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")); 2166 put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")); 2167 region.put(put); 2168 2169 // Put with success 2170 boolean ok = region.checkAndMutate(row, 2171 new FilterList( 2172 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 2173 Bytes.toBytes("a")), 2174 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 2175 Bytes.toBytes("b")) 2176 ), 2177 new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))); 2178 assertTrue(ok); 2179 2180 Result result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))); 2181 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 2182 2183 // Put with failure 2184 ok = region.checkAndMutate(row, 2185 new FilterList( 2186 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 2187 Bytes.toBytes("a")), 2188 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 2189 Bytes.toBytes("c")) 2190 ), 2191 new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))); 2192 assertFalse(ok); 2193 2194 assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).isEmpty()); 2195 2196 // Delete with success 2197 ok = region.checkAndMutate(row, 2198 new FilterList( 2199 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 2200 Bytes.toBytes("a")), 2201 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 2202 Bytes.toBytes("b")) 2203 ), 2204 new Delete(row).addColumns(FAMILY, Bytes.toBytes("D"))); 2205 assertTrue(ok); 2206 2207 assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).isEmpty()); 2208 2209 // Mutate with success 2210 ok = region.checkAndRowMutate(row, 2211 new FilterList( 2212 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 2213 Bytes.toBytes("a")), 2214 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 2215 Bytes.toBytes("b")) 2216 ), 2217 new RowMutations(row) 2218 .add((Mutation) new Put(row) 2219 .addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))) 2220 .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A")))); 2221 assertTrue(ok); 2222 2223 result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))); 2224 assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E")))); 2225 2226 assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).isEmpty()); 2227 } 2228 2229 @Test 2230 public void testCheckAndMutate_WithFiltersAndTimeRange() throws Throwable { 2231 final byte[] FAMILY = Bytes.toBytes("fam"); 2232 2233 // Setting up region 2234 this.region = initHRegion(tableName, method, CONF, FAMILY); 2235 2236 // Put with specifying the timestamp 2237 region.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))); 2238 2239 // Put with success 2240 boolean ok = region.checkAndMutate(row, 2241 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 2242 Bytes.toBytes("a")), 2243 TimeRange.between(0, 101), 2244 new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))); 2245 assertTrue(ok); 2246 2247 Result result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))); 2248 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 2249 2250 // Put with failure 2251 ok = region.checkAndMutate(row, 2252 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 2253 Bytes.toBytes("a")), 2254 TimeRange.between(0, 100), 2255 new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))); 2256 assertFalse(ok); 2257 2258 assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).isEmpty()); 2259 2260 // Mutate with success 2261 ok = region.checkAndRowMutate(row, 2262 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 2263 Bytes.toBytes("a")), 2264 TimeRange.between(0, 101), 2265 new RowMutations(row) 2266 .add((Mutation) new Put(row) 2267 .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))) 2268 .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A")))); 2269 assertTrue(ok); 2270 2271 result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))); 2272 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 2273 2274 assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).isEmpty()); 2275 } 2276 2277 @Test 2278 public void testCheckAndMutate_wrongMutationType() throws Throwable { 2279 // Setting up region 2280 this.region = initHRegion(tableName, method, CONF, fam1); 2281 2282 try { 2283 region.checkAndMutate(row, fam1, qual1, CompareOperator.EQUAL, new BinaryComparator(value1), 2284 new Increment(row).addColumn(fam1, qual1, 1)); 2285 fail("should throw DoNotRetryIOException"); 2286 } catch (DoNotRetryIOException e) { 2287 assertEquals("Action must be Put or Delete", e.getMessage()); 2288 } 2289 2290 try { 2291 region.checkAndMutate(row, 2292 new SingleColumnValueFilter(fam1, qual1, CompareOperator.EQUAL, value1), 2293 new Increment(row).addColumn(fam1, qual1, 1)); 2294 fail("should throw DoNotRetryIOException"); 2295 } catch (DoNotRetryIOException e) { 2296 assertEquals("Action must be Put or Delete", e.getMessage()); 2297 } 2298 } 2299 2300 @Test 2301 public void testCheckAndMutate_wrongRow() throws Throwable { 2302 final byte[] wrongRow = Bytes.toBytes("wrongRow"); 2303 2304 // Setting up region 2305 this.region = initHRegion(tableName, method, CONF, fam1); 2306 2307 try { 2308 region.checkAndMutate(row, fam1, qual1, CompareOperator.EQUAL, new BinaryComparator(value1), 2309 new Put(wrongRow).addColumn(fam1, qual1, value1)); 2310 fail("should throw DoNotRetryIOException"); 2311 } catch (DoNotRetryIOException e) { 2312 assertEquals("Action's getRow must match", e.getMessage()); 2313 } 2314 2315 try { 2316 region.checkAndMutate(row, 2317 new SingleColumnValueFilter(fam1, qual1, CompareOperator.EQUAL, value1), 2318 new Put(wrongRow).addColumn(fam1, qual1, value1)); 2319 fail("should throw DoNotRetryIOException"); 2320 } catch (DoNotRetryIOException e) { 2321 assertEquals("Action's getRow must match", e.getMessage()); 2322 } 2323 2324 try { 2325 region.checkAndRowMutate(row, fam1, qual1, CompareOperator.EQUAL, 2326 new BinaryComparator(value1), 2327 new RowMutations(wrongRow) 2328 .add((Mutation) new Put(wrongRow) 2329 .addColumn(fam1, qual1, value1)) 2330 .add((Mutation) new Delete(wrongRow).addColumns(fam1, qual2))); 2331 fail("should throw DoNotRetryIOException"); 2332 } catch (DoNotRetryIOException e) { 2333 assertEquals("Action's getRow must match", e.getMessage()); 2334 } 2335 2336 try { 2337 region.checkAndRowMutate(row, 2338 new SingleColumnValueFilter(fam1, qual1, CompareOperator.EQUAL, value1), 2339 new RowMutations(wrongRow) 2340 .add((Mutation) new Put(wrongRow) 2341 .addColumn(fam1, qual1, value1)) 2342 .add((Mutation) new Delete(wrongRow).addColumns(fam1, qual2))); 2343 fail("should throw DoNotRetryIOException"); 2344 } catch (DoNotRetryIOException e) { 2345 assertEquals("Action's getRow must match", e.getMessage()); 2346 } 2347 } 2348 2349 // //////////////////////////////////////////////////////////////////////////// 2350 // Delete tests 2351 // //////////////////////////////////////////////////////////////////////////// 2352 @Test 2353 public void testDelete_multiDeleteColumn() throws IOException { 2354 byte[] row1 = Bytes.toBytes("row1"); 2355 byte[] fam1 = Bytes.toBytes("fam1"); 2356 byte[] qual = Bytes.toBytes("qualifier"); 2357 byte[] value = Bytes.toBytes("value"); 2358 2359 Put put = new Put(row1); 2360 put.addColumn(fam1, qual, 1, value); 2361 put.addColumn(fam1, qual, 2, value); 2362 2363 this.region = initHRegion(tableName, method, CONF, fam1); 2364 region.put(put); 2365 2366 // We do support deleting more than 1 'latest' version 2367 Delete delete = new Delete(row1); 2368 delete.addColumn(fam1, qual); 2369 delete.addColumn(fam1, qual); 2370 region.delete(delete); 2371 2372 Get get = new Get(row1); 2373 get.addFamily(fam1); 2374 Result r = region.get(get); 2375 assertEquals(0, r.size()); 2376 } 2377 2378 @Test 2379 public void testDelete_CheckFamily() throws IOException { 2380 byte[] row1 = Bytes.toBytes("row1"); 2381 byte[] fam1 = Bytes.toBytes("fam1"); 2382 byte[] fam2 = Bytes.toBytes("fam2"); 2383 byte[] fam3 = Bytes.toBytes("fam3"); 2384 byte[] fam4 = Bytes.toBytes("fam4"); 2385 2386 // Setting up region 2387 this.region = initHRegion(tableName, method, CONF, fam1, fam2, fam3); 2388 List<Cell> kvs = new ArrayList<>(); 2389 kvs.add(new KeyValue(row1, fam4, null, null)); 2390 2391 // testing existing family 2392 byte[] family = fam2; 2393 NavigableMap<byte[], List<Cell>> deleteMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); 2394 deleteMap.put(family, kvs); 2395 region.delete(deleteMap, Durability.SYNC_WAL); 2396 2397 // testing non existing family 2398 boolean ok = false; 2399 family = fam4; 2400 try { 2401 deleteMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); 2402 deleteMap.put(family, kvs); 2403 region.delete(deleteMap, Durability.SYNC_WAL); 2404 } catch (Exception e) { 2405 ok = true; 2406 } 2407 assertTrue("Family " + new String(family, StandardCharsets.UTF_8) + " does exist", ok); 2408 } 2409 2410 @Test 2411 public void testDelete_mixed() throws IOException, InterruptedException { 2412 byte[] fam = Bytes.toBytes("info"); 2413 byte[][] families = { fam }; 2414 this.region = initHRegion(tableName, method, CONF, families); 2415 EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge()); 2416 2417 byte[] row = Bytes.toBytes("table_name"); 2418 // column names 2419 byte[] serverinfo = Bytes.toBytes("serverinfo"); 2420 byte[] splitA = Bytes.toBytes("splitA"); 2421 byte[] splitB = Bytes.toBytes("splitB"); 2422 2423 // add some data: 2424 Put put = new Put(row); 2425 put.addColumn(fam, splitA, Bytes.toBytes("reference_A")); 2426 region.put(put); 2427 2428 put = new Put(row); 2429 put.addColumn(fam, splitB, Bytes.toBytes("reference_B")); 2430 region.put(put); 2431 2432 put = new Put(row); 2433 put.addColumn(fam, serverinfo, Bytes.toBytes("ip_address")); 2434 region.put(put); 2435 2436 // ok now delete a split: 2437 Delete delete = new Delete(row); 2438 delete.addColumns(fam, splitA); 2439 region.delete(delete); 2440 2441 // assert some things: 2442 Get get = new Get(row).addColumn(fam, serverinfo); 2443 Result result = region.get(get); 2444 assertEquals(1, result.size()); 2445 2446 get = new Get(row).addColumn(fam, splitA); 2447 result = region.get(get); 2448 assertEquals(0, result.size()); 2449 2450 get = new Get(row).addColumn(fam, splitB); 2451 result = region.get(get); 2452 assertEquals(1, result.size()); 2453 2454 // Assert that after a delete, I can put. 2455 put = new Put(row); 2456 put.addColumn(fam, splitA, Bytes.toBytes("reference_A")); 2457 region.put(put); 2458 get = new Get(row); 2459 result = region.get(get); 2460 assertEquals(3, result.size()); 2461 2462 // Now delete all... then test I can add stuff back 2463 delete = new Delete(row); 2464 region.delete(delete); 2465 assertEquals(0, region.get(get).size()); 2466 2467 region.put(new Put(row).addColumn(fam, splitA, Bytes.toBytes("reference_A"))); 2468 result = region.get(get); 2469 assertEquals(1, result.size()); 2470 } 2471 2472 @Test 2473 public void testDeleteRowWithFutureTs() throws IOException { 2474 byte[] fam = Bytes.toBytes("info"); 2475 byte[][] families = { fam }; 2476 this.region = initHRegion(tableName, method, CONF, families); 2477 byte[] row = Bytes.toBytes("table_name"); 2478 // column names 2479 byte[] serverinfo = Bytes.toBytes("serverinfo"); 2480 2481 // add data in the far future 2482 Put put = new Put(row); 2483 put.addColumn(fam, serverinfo, HConstants.LATEST_TIMESTAMP - 5, Bytes.toBytes("value")); 2484 region.put(put); 2485 2486 // now delete something in the present 2487 Delete delete = new Delete(row); 2488 region.delete(delete); 2489 2490 // make sure we still see our data 2491 Get get = new Get(row).addColumn(fam, serverinfo); 2492 Result result = region.get(get); 2493 assertEquals(1, result.size()); 2494 2495 // delete the future row 2496 delete = new Delete(row, HConstants.LATEST_TIMESTAMP - 3); 2497 region.delete(delete); 2498 2499 // make sure it is gone 2500 get = new Get(row).addColumn(fam, serverinfo); 2501 result = region.get(get); 2502 assertEquals(0, result.size()); 2503 } 2504 2505 /** 2506 * Tests that the special LATEST_TIMESTAMP option for puts gets replaced by 2507 * the actual timestamp 2508 */ 2509 @Test 2510 public void testPutWithLatestTS() throws IOException { 2511 byte[] fam = Bytes.toBytes("info"); 2512 byte[][] families = { fam }; 2513 this.region = initHRegion(tableName, method, CONF, families); 2514 byte[] row = Bytes.toBytes("row1"); 2515 // column names 2516 byte[] qual = Bytes.toBytes("qual"); 2517 2518 // add data with LATEST_TIMESTAMP, put without WAL 2519 Put put = new Put(row); 2520 put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, Bytes.toBytes("value")); 2521 region.put(put); 2522 2523 // Make sure it shows up with an actual timestamp 2524 Get get = new Get(row).addColumn(fam, qual); 2525 Result result = region.get(get); 2526 assertEquals(1, result.size()); 2527 Cell kv = result.rawCells()[0]; 2528 LOG.info("Got: " + kv); 2529 assertTrue("LATEST_TIMESTAMP was not replaced with real timestamp", 2530 kv.getTimestamp() != HConstants.LATEST_TIMESTAMP); 2531 2532 // Check same with WAL enabled (historically these took different 2533 // code paths, so check both) 2534 row = Bytes.toBytes("row2"); 2535 put = new Put(row); 2536 put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, Bytes.toBytes("value")); 2537 region.put(put); 2538 2539 // Make sure it shows up with an actual timestamp 2540 get = new Get(row).addColumn(fam, qual); 2541 result = region.get(get); 2542 assertEquals(1, result.size()); 2543 kv = result.rawCells()[0]; 2544 LOG.info("Got: " + kv); 2545 assertTrue("LATEST_TIMESTAMP was not replaced with real timestamp", 2546 kv.getTimestamp() != HConstants.LATEST_TIMESTAMP); 2547 } 2548 2549 /** 2550 * Tests that there is server-side filtering for invalid timestamp upper 2551 * bound. Note that the timestamp lower bound is automatically handled for us 2552 * by the TTL field. 2553 */ 2554 @Test 2555 public void testPutWithTsSlop() throws IOException { 2556 byte[] fam = Bytes.toBytes("info"); 2557 byte[][] families = { fam }; 2558 2559 // add data with a timestamp that is too recent for range. Ensure assert 2560 CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000); 2561 this.region = initHRegion(tableName, method, CONF, families); 2562 boolean caughtExcep = false; 2563 try { 2564 // no TS specified == use latest. should not error 2565 region.put(new Put(row).addColumn(fam, Bytes.toBytes("qual"), Bytes.toBytes("value"))); 2566 // TS out of range. should error 2567 region.put(new Put(row).addColumn(fam, Bytes.toBytes("qual"), 2568 System.currentTimeMillis() + 2000, Bytes.toBytes("value"))); 2569 fail("Expected IOE for TS out of configured timerange"); 2570 } catch (FailedSanityCheckException ioe) { 2571 LOG.debug("Received expected exception", ioe); 2572 caughtExcep = true; 2573 } 2574 assertTrue("Should catch FailedSanityCheckException", caughtExcep); 2575 } 2576 2577 @Test 2578 public void testScanner_DeleteOneFamilyNotAnother() throws IOException { 2579 byte[] fam1 = Bytes.toBytes("columnA"); 2580 byte[] fam2 = Bytes.toBytes("columnB"); 2581 this.region = initHRegion(tableName, method, CONF, fam1, fam2); 2582 byte[] rowA = Bytes.toBytes("rowA"); 2583 byte[] rowB = Bytes.toBytes("rowB"); 2584 2585 byte[] value = Bytes.toBytes("value"); 2586 2587 Delete delete = new Delete(rowA); 2588 delete.addFamily(fam1); 2589 2590 region.delete(delete); 2591 2592 // now create data. 2593 Put put = new Put(rowA); 2594 put.addColumn(fam2, null, value); 2595 region.put(put); 2596 2597 put = new Put(rowB); 2598 put.addColumn(fam1, null, value); 2599 put.addColumn(fam2, null, value); 2600 region.put(put); 2601 2602 Scan scan = new Scan(); 2603 scan.addFamily(fam1).addFamily(fam2); 2604 InternalScanner s = region.getScanner(scan); 2605 List<Cell> results = new ArrayList<>(); 2606 s.next(results); 2607 assertTrue(CellUtil.matchingRows(results.get(0), rowA)); 2608 2609 results.clear(); 2610 s.next(results); 2611 assertTrue(CellUtil.matchingRows(results.get(0), rowB)); 2612 } 2613 2614 @Test 2615 public void testDataInMemoryWithoutWAL() throws IOException { 2616 FileSystem fs = FileSystem.get(CONF); 2617 Path rootDir = new Path(dir + "testDataInMemoryWithoutWAL"); 2618 FSHLog hLog = new FSHLog(fs, rootDir, "testDataInMemoryWithoutWAL", CONF); 2619 hLog.init(); 2620 // This chunk creation is done throughout the code base. Do we want to move it into core? 2621 // It is missing from this test. W/o it we NPE. 2622 region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog, 2623 COLUMN_FAMILY_BYTES); 2624 2625 Cell originalCell = CellUtil.createCell(row, COLUMN_FAMILY_BYTES, qual1, 2626 System.currentTimeMillis(), KeyValue.Type.Put.getCode(), value1); 2627 final long originalSize = originalCell.getSerializedSize(); 2628 2629 Cell addCell = CellUtil.createCell(row, COLUMN_FAMILY_BYTES, qual1, 2630 System.currentTimeMillis(), KeyValue.Type.Put.getCode(), Bytes.toBytes("xxxxxxxxxx")); 2631 final long addSize = addCell.getSerializedSize(); 2632 2633 LOG.info("originalSize:" + originalSize 2634 + ", addSize:" + addSize); 2635 // start test. We expect that the addPut's durability will be replaced 2636 // by originalPut's durability. 2637 2638 // case 1: 2639 testDataInMemoryWithoutWAL(region, 2640 new Put(row).add(originalCell).setDurability(Durability.SKIP_WAL), 2641 new Put(row).add(addCell).setDurability(Durability.SKIP_WAL), 2642 originalSize + addSize); 2643 2644 // case 2: 2645 testDataInMemoryWithoutWAL(region, 2646 new Put(row).add(originalCell).setDurability(Durability.SKIP_WAL), 2647 new Put(row).add(addCell).setDurability(Durability.SYNC_WAL), 2648 originalSize + addSize); 2649 2650 // case 3: 2651 testDataInMemoryWithoutWAL(region, 2652 new Put(row).add(originalCell).setDurability(Durability.SYNC_WAL), 2653 new Put(row).add(addCell).setDurability(Durability.SKIP_WAL), 2654 0); 2655 2656 // case 4: 2657 testDataInMemoryWithoutWAL(region, 2658 new Put(row).add(originalCell).setDurability(Durability.SYNC_WAL), 2659 new Put(row).add(addCell).setDurability(Durability.SYNC_WAL), 2660 0); 2661 } 2662 2663 private static void testDataInMemoryWithoutWAL(HRegion region, Put originalPut, 2664 final Put addPut, long delta) throws IOException { 2665 final long initSize = region.getDataInMemoryWithoutWAL(); 2666 // save normalCPHost and replaced by mockedCPHost 2667 RegionCoprocessorHost normalCPHost = region.getCoprocessorHost(); 2668 RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class); 2669 // Because the preBatchMutate returns void, we can't do usual Mockito when...then form. Must 2670 // do below format (from Mockito doc). 2671 Mockito.doAnswer(new Answer() { 2672 @Override 2673 public Object answer(InvocationOnMock invocation) throws Throwable { 2674 MiniBatchOperationInProgress<Mutation> mb = invocation.getArgument(0); 2675 mb.addOperationsFromCP(0, new Mutation[]{addPut}); 2676 return null; 2677 } 2678 }).when(mockedCPHost).preBatchMutate(Mockito.isA(MiniBatchOperationInProgress.class)); 2679 ColumnFamilyDescriptorBuilder builder = ColumnFamilyDescriptorBuilder. 2680 newBuilder(COLUMN_FAMILY_BYTES); 2681 ScanInfo info = new ScanInfo(CONF, builder.build(), Long.MAX_VALUE, 2682 Long.MAX_VALUE, region.getCellComparator()); 2683 Mockito.when(mockedCPHost.preFlushScannerOpen(Mockito.any(HStore.class), 2684 Mockito.any())).thenReturn(info); 2685 Mockito.when(mockedCPHost.preFlush(Mockito.any(), Mockito.any(StoreScanner.class), 2686 Mockito.any())).thenAnswer(i -> i.getArgument(1)); 2687 region.setCoprocessorHost(mockedCPHost); 2688 2689 region.put(originalPut); 2690 region.setCoprocessorHost(normalCPHost); 2691 final long finalSize = region.getDataInMemoryWithoutWAL(); 2692 assertEquals("finalSize:" + finalSize + ", initSize:" 2693 + initSize + ", delta:" + delta,finalSize, initSize + delta); 2694 } 2695 2696 @Test 2697 public void testDeleteColumns_PostInsert() throws IOException, InterruptedException { 2698 Delete delete = new Delete(row); 2699 delete.addColumns(fam1, qual1); 2700 doTestDelete_AndPostInsert(delete); 2701 } 2702 2703 @Test 2704 public void testaddFamily_PostInsert() throws IOException, InterruptedException { 2705 Delete delete = new Delete(row); 2706 delete.addFamily(fam1); 2707 doTestDelete_AndPostInsert(delete); 2708 } 2709 2710 public void doTestDelete_AndPostInsert(Delete delete) throws IOException, InterruptedException { 2711 this.region = initHRegion(tableName, method, CONF, fam1); 2712 EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge()); 2713 Put put = new Put(row); 2714 put.addColumn(fam1, qual1, value1); 2715 region.put(put); 2716 2717 // now delete the value: 2718 region.delete(delete); 2719 2720 // ok put data: 2721 put = new Put(row); 2722 put.addColumn(fam1, qual1, value2); 2723 region.put(put); 2724 2725 // ok get: 2726 Get get = new Get(row); 2727 get.addColumn(fam1, qual1); 2728 2729 Result r = region.get(get); 2730 assertEquals(1, r.size()); 2731 assertArrayEquals(value2, r.getValue(fam1, qual1)); 2732 2733 // next: 2734 Scan scan = new Scan(row); 2735 scan.addColumn(fam1, qual1); 2736 InternalScanner s = region.getScanner(scan); 2737 2738 List<Cell> results = new ArrayList<>(); 2739 assertEquals(false, s.next(results)); 2740 assertEquals(1, results.size()); 2741 Cell kv = results.get(0); 2742 2743 assertArrayEquals(value2, CellUtil.cloneValue(kv)); 2744 assertArrayEquals(fam1, CellUtil.cloneFamily(kv)); 2745 assertArrayEquals(qual1, CellUtil.cloneQualifier(kv)); 2746 assertArrayEquals(row, CellUtil.cloneRow(kv)); 2747 } 2748 2749 @Test 2750 public void testDelete_CheckTimestampUpdated() throws IOException { 2751 byte[] row1 = Bytes.toBytes("row1"); 2752 byte[] col1 = Bytes.toBytes("col1"); 2753 byte[] col2 = Bytes.toBytes("col2"); 2754 byte[] col3 = Bytes.toBytes("col3"); 2755 2756 // Setting up region 2757 this.region = initHRegion(tableName, method, CONF, fam1); 2758 // Building checkerList 2759 List<Cell> kvs = new ArrayList<>(); 2760 kvs.add(new KeyValue(row1, fam1, col1, null)); 2761 kvs.add(new KeyValue(row1, fam1, col2, null)); 2762 kvs.add(new KeyValue(row1, fam1, col3, null)); 2763 2764 NavigableMap<byte[], List<Cell>> deleteMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); 2765 deleteMap.put(fam1, kvs); 2766 region.delete(deleteMap, Durability.SYNC_WAL); 2767 2768 // extract the key values out the memstore: 2769 // This is kinda hacky, but better than nothing... 2770 long now = System.currentTimeMillis(); 2771 AbstractMemStore memstore = (AbstractMemStore)region.getStore(fam1).memstore; 2772 Cell firstCell = memstore.getActive().first(); 2773 assertTrue(firstCell.getTimestamp() <= now); 2774 now = firstCell.getTimestamp(); 2775 for (Cell cell : memstore.getActive().getCellSet()) { 2776 assertTrue(cell.getTimestamp() <= now); 2777 now = cell.getTimestamp(); 2778 } 2779 } 2780 2781 // //////////////////////////////////////////////////////////////////////////// 2782 // Get tests 2783 // //////////////////////////////////////////////////////////////////////////// 2784 @Test 2785 public void testGet_FamilyChecker() throws IOException { 2786 byte[] row1 = Bytes.toBytes("row1"); 2787 byte[] fam1 = Bytes.toBytes("fam1"); 2788 byte[] fam2 = Bytes.toBytes("False"); 2789 byte[] col1 = Bytes.toBytes("col1"); 2790 2791 // Setting up region 2792 this.region = initHRegion(tableName, method, CONF, fam1); 2793 Get get = new Get(row1); 2794 get.addColumn(fam2, col1); 2795 2796 // Test 2797 try { 2798 region.get(get); 2799 fail("Expecting DoNotRetryIOException in get but did not get any"); 2800 } catch (org.apache.hadoop.hbase.DoNotRetryIOException e) { 2801 LOG.info("Got expected DoNotRetryIOException successfully"); 2802 } 2803 } 2804 2805 @Test 2806 public void testGet_Basic() throws IOException { 2807 byte[] row1 = Bytes.toBytes("row1"); 2808 byte[] fam1 = Bytes.toBytes("fam1"); 2809 byte[] col1 = Bytes.toBytes("col1"); 2810 byte[] col2 = Bytes.toBytes("col2"); 2811 byte[] col3 = Bytes.toBytes("col3"); 2812 byte[] col4 = Bytes.toBytes("col4"); 2813 byte[] col5 = Bytes.toBytes("col5"); 2814 2815 // Setting up region 2816 this.region = initHRegion(tableName, method, CONF, fam1); 2817 // Add to memstore 2818 Put put = new Put(row1); 2819 put.addColumn(fam1, col1, null); 2820 put.addColumn(fam1, col2, null); 2821 put.addColumn(fam1, col3, null); 2822 put.addColumn(fam1, col4, null); 2823 put.addColumn(fam1, col5, null); 2824 region.put(put); 2825 2826 Get get = new Get(row1); 2827 get.addColumn(fam1, col2); 2828 get.addColumn(fam1, col4); 2829 // Expected result 2830 KeyValue kv1 = new KeyValue(row1, fam1, col2); 2831 KeyValue kv2 = new KeyValue(row1, fam1, col4); 2832 KeyValue[] expected = { kv1, kv2 }; 2833 2834 // Test 2835 Result res = region.get(get); 2836 assertEquals(expected.length, res.size()); 2837 for (int i = 0; i < res.size(); i++) { 2838 assertTrue(CellUtil.matchingRows(expected[i], res.rawCells()[i])); 2839 assertTrue(CellUtil.matchingFamily(expected[i], res.rawCells()[i])); 2840 assertTrue(CellUtil.matchingQualifier(expected[i], res.rawCells()[i])); 2841 } 2842 2843 // Test using a filter on a Get 2844 Get g = new Get(row1); 2845 final int count = 2; 2846 g.setFilter(new ColumnCountGetFilter(count)); 2847 res = region.get(g); 2848 assertEquals(count, res.size()); 2849 } 2850 2851 @Test 2852 public void testGet_Empty() throws IOException { 2853 byte[] row = Bytes.toBytes("row"); 2854 byte[] fam = Bytes.toBytes("fam"); 2855 2856 this.region = initHRegion(tableName, method, CONF, fam); 2857 Get get = new Get(row); 2858 get.addFamily(fam); 2859 Result r = region.get(get); 2860 2861 assertTrue(r.isEmpty()); 2862 } 2863 2864 @Test 2865 public void testGetWithFilter() throws IOException, InterruptedException { 2866 byte[] row1 = Bytes.toBytes("row1"); 2867 byte[] fam1 = Bytes.toBytes("fam1"); 2868 byte[] col1 = Bytes.toBytes("col1"); 2869 byte[] value1 = Bytes.toBytes("value1"); 2870 byte[] value2 = Bytes.toBytes("value2"); 2871 2872 final int maxVersions = 3; 2873 HColumnDescriptor hcd = new HColumnDescriptor(fam1); 2874 hcd.setMaxVersions(maxVersions); 2875 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testFilterAndColumnTracker")); 2876 htd.addFamily(hcd); 2877 ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); 2878 HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); 2879 Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + ".log"); 2880 final WAL wal = HBaseTestingUtility.createWal(TEST_UTIL.getConfiguration(), logDir, info); 2881 this.region = TEST_UTIL.createLocalHRegion(info, htd, wal); 2882 2883 // Put 4 version to memstore 2884 long ts = 0; 2885 Put put = new Put(row1, ts); 2886 put.addColumn(fam1, col1, value1); 2887 region.put(put); 2888 put = new Put(row1, ts + 1); 2889 put.addColumn(fam1, col1, Bytes.toBytes("filter1")); 2890 region.put(put); 2891 put = new Put(row1, ts + 2); 2892 put.addColumn(fam1, col1, Bytes.toBytes("filter2")); 2893 region.put(put); 2894 put = new Put(row1, ts + 3); 2895 put.addColumn(fam1, col1, value2); 2896 region.put(put); 2897 2898 Get get = new Get(row1); 2899 get.setMaxVersions(); 2900 Result res = region.get(get); 2901 // Get 3 versions, the oldest version has gone from user view 2902 assertEquals(maxVersions, res.size()); 2903 2904 get.setFilter(new ValueFilter(CompareOp.EQUAL, new SubstringComparator("value"))); 2905 res = region.get(get); 2906 // When use value filter, the oldest version should still gone from user view and it 2907 // should only return one key vaule 2908 assertEquals(1, res.size()); 2909 assertTrue(CellUtil.matchingValue(new KeyValue(row1, fam1, col1, value2), res.rawCells()[0])); 2910 assertEquals(ts + 3, res.rawCells()[0].getTimestamp()); 2911 2912 region.flush(true); 2913 region.compact(true); 2914 Thread.sleep(1000); 2915 res = region.get(get); 2916 // After flush and compact, the result should be consistent with previous result 2917 assertEquals(1, res.size()); 2918 assertTrue(CellUtil.matchingValue(new KeyValue(row1, fam1, col1, value2), res.rawCells()[0])); 2919 } 2920 2921 // //////////////////////////////////////////////////////////////////////////// 2922 // Scanner tests 2923 // //////////////////////////////////////////////////////////////////////////// 2924 @Test 2925 public void testGetScanner_WithOkFamilies() throws IOException { 2926 byte[] fam1 = Bytes.toBytes("fam1"); 2927 byte[] fam2 = Bytes.toBytes("fam2"); 2928 2929 byte[][] families = { fam1, fam2 }; 2930 2931 // Setting up region 2932 this.region = initHRegion(tableName, method, CONF, families); 2933 Scan scan = new Scan(); 2934 scan.addFamily(fam1); 2935 scan.addFamily(fam2); 2936 try { 2937 region.getScanner(scan); 2938 } catch (Exception e) { 2939 assertTrue("Families could not be found in Region", false); 2940 } 2941 } 2942 2943 @Test 2944 public void testGetScanner_WithNotOkFamilies() throws IOException { 2945 byte[] fam1 = Bytes.toBytes("fam1"); 2946 byte[] fam2 = Bytes.toBytes("fam2"); 2947 2948 byte[][] families = { fam1 }; 2949 2950 // Setting up region 2951 this.region = initHRegion(tableName, method, CONF, families); 2952 Scan scan = new Scan(); 2953 scan.addFamily(fam2); 2954 boolean ok = false; 2955 try { 2956 region.getScanner(scan); 2957 } catch (Exception e) { 2958 ok = true; 2959 } 2960 assertTrue("Families could not be found in Region", ok); 2961 } 2962 2963 @Test 2964 public void testGetScanner_WithNoFamilies() throws IOException { 2965 byte[] row1 = Bytes.toBytes("row1"); 2966 byte[] fam1 = Bytes.toBytes("fam1"); 2967 byte[] fam2 = Bytes.toBytes("fam2"); 2968 byte[] fam3 = Bytes.toBytes("fam3"); 2969 byte[] fam4 = Bytes.toBytes("fam4"); 2970 2971 byte[][] families = { fam1, fam2, fam3, fam4 }; 2972 2973 // Setting up region 2974 this.region = initHRegion(tableName, method, CONF, families); 2975 // Putting data in Region 2976 Put put = new Put(row1); 2977 put.addColumn(fam1, null, null); 2978 put.addColumn(fam2, null, null); 2979 put.addColumn(fam3, null, null); 2980 put.addColumn(fam4, null, null); 2981 region.put(put); 2982 2983 Scan scan = null; 2984 HRegion.RegionScannerImpl is = null; 2985 2986 // Testing to see how many scanners that is produced by getScanner, 2987 // starting 2988 // with known number, 2 - current = 1 2989 scan = new Scan(); 2990 scan.addFamily(fam2); 2991 scan.addFamily(fam4); 2992 is = region.getScanner(scan); 2993 assertEquals(1, is.storeHeap.getHeap().size()); 2994 2995 scan = new Scan(); 2996 is = region.getScanner(scan); 2997 assertEquals(families.length - 1, is.storeHeap.getHeap().size()); 2998 } 2999 3000 /** 3001 * This method tests https://issues.apache.org/jira/browse/HBASE-2516. 3002 * 3003 * @throws IOException 3004 */ 3005 @Test 3006 public void testGetScanner_WithRegionClosed() throws IOException { 3007 byte[] fam1 = Bytes.toBytes("fam1"); 3008 byte[] fam2 = Bytes.toBytes("fam2"); 3009 3010 byte[][] families = { fam1, fam2 }; 3011 3012 // Setting up region 3013 try { 3014 this.region = initHRegion(tableName, method, CONF, families); 3015 } catch (IOException e) { 3016 e.printStackTrace(); 3017 fail("Got IOException during initHRegion, " + e.getMessage()); 3018 } 3019 region.closed.set(true); 3020 try { 3021 region.getScanner(null); 3022 fail("Expected to get an exception during getScanner on a region that is closed"); 3023 } catch (NotServingRegionException e) { 3024 // this is the correct exception that is expected 3025 } catch (IOException e) { 3026 fail("Got wrong type of exception - should be a NotServingRegionException, " + 3027 "but was an IOException: " 3028 + e.getMessage()); 3029 } 3030 } 3031 3032 @Test 3033 public void testRegionScanner_Next() throws IOException { 3034 byte[] row1 = Bytes.toBytes("row1"); 3035 byte[] row2 = Bytes.toBytes("row2"); 3036 byte[] fam1 = Bytes.toBytes("fam1"); 3037 byte[] fam2 = Bytes.toBytes("fam2"); 3038 byte[] fam3 = Bytes.toBytes("fam3"); 3039 byte[] fam4 = Bytes.toBytes("fam4"); 3040 3041 byte[][] families = { fam1, fam2, fam3, fam4 }; 3042 long ts = System.currentTimeMillis(); 3043 3044 // Setting up region 3045 this.region = initHRegion(tableName, method, CONF, families); 3046 // Putting data in Region 3047 Put put = null; 3048 put = new Put(row1); 3049 put.addColumn(fam1, (byte[]) null, ts, null); 3050 put.addColumn(fam2, (byte[]) null, ts, null); 3051 put.addColumn(fam3, (byte[]) null, ts, null); 3052 put.addColumn(fam4, (byte[]) null, ts, null); 3053 region.put(put); 3054 3055 put = new Put(row2); 3056 put.addColumn(fam1, (byte[]) null, ts, null); 3057 put.addColumn(fam2, (byte[]) null, ts, null); 3058 put.addColumn(fam3, (byte[]) null, ts, null); 3059 put.addColumn(fam4, (byte[]) null, ts, null); 3060 region.put(put); 3061 3062 Scan scan = new Scan(); 3063 scan.addFamily(fam2); 3064 scan.addFamily(fam4); 3065 InternalScanner is = region.getScanner(scan); 3066 3067 List<Cell> res = null; 3068 3069 // Result 1 3070 List<Cell> expected1 = new ArrayList<>(); 3071 expected1.add(new KeyValue(row1, fam2, null, ts, KeyValue.Type.Put, null)); 3072 expected1.add(new KeyValue(row1, fam4, null, ts, KeyValue.Type.Put, null)); 3073 3074 res = new ArrayList<>(); 3075 is.next(res); 3076 for (int i = 0; i < res.size(); i++) { 3077 assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected1.get(i), res.get(i))); 3078 } 3079 3080 // Result 2 3081 List<Cell> expected2 = new ArrayList<>(); 3082 expected2.add(new KeyValue(row2, fam2, null, ts, KeyValue.Type.Put, null)); 3083 expected2.add(new KeyValue(row2, fam4, null, ts, KeyValue.Type.Put, null)); 3084 3085 res = new ArrayList<>(); 3086 is.next(res); 3087 for (int i = 0; i < res.size(); i++) { 3088 assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected2.get(i), res.get(i))); 3089 } 3090 } 3091 3092 @Test 3093 public void testScanner_ExplicitColumns_FromMemStore_EnforceVersions() throws IOException { 3094 byte[] row1 = Bytes.toBytes("row1"); 3095 byte[] qf1 = Bytes.toBytes("qualifier1"); 3096 byte[] qf2 = Bytes.toBytes("qualifier2"); 3097 byte[] fam1 = Bytes.toBytes("fam1"); 3098 byte[][] families = { fam1 }; 3099 3100 long ts1 = System.currentTimeMillis(); 3101 long ts2 = ts1 + 1; 3102 long ts3 = ts1 + 2; 3103 3104 // Setting up region 3105 this.region = initHRegion(tableName, method, CONF, families); 3106 // Putting data in Region 3107 Put put = null; 3108 KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null); 3109 KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null); 3110 KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null); 3111 3112 KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null); 3113 KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null); 3114 KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null); 3115 3116 put = new Put(row1); 3117 put.add(kv13); 3118 put.add(kv12); 3119 put.add(kv11); 3120 put.add(kv23); 3121 put.add(kv22); 3122 put.add(kv21); 3123 region.put(put); 3124 3125 // Expected 3126 List<Cell> expected = new ArrayList<>(); 3127 expected.add(kv13); 3128 expected.add(kv12); 3129 3130 Scan scan = new Scan(row1); 3131 scan.addColumn(fam1, qf1); 3132 scan.setMaxVersions(MAX_VERSIONS); 3133 List<Cell> actual = new ArrayList<>(); 3134 InternalScanner scanner = region.getScanner(scan); 3135 3136 boolean hasNext = scanner.next(actual); 3137 assertEquals(false, hasNext); 3138 3139 // Verify result 3140 for (int i = 0; i < expected.size(); i++) { 3141 assertEquals(expected.get(i), actual.get(i)); 3142 } 3143 } 3144 3145 @Test 3146 public void testScanner_ExplicitColumns_FromFilesOnly_EnforceVersions() throws IOException { 3147 byte[] row1 = Bytes.toBytes("row1"); 3148 byte[] qf1 = Bytes.toBytes("qualifier1"); 3149 byte[] qf2 = Bytes.toBytes("qualifier2"); 3150 byte[] fam1 = Bytes.toBytes("fam1"); 3151 byte[][] families = { fam1 }; 3152 3153 long ts1 = 1; // System.currentTimeMillis(); 3154 long ts2 = ts1 + 1; 3155 long ts3 = ts1 + 2; 3156 3157 // Setting up region 3158 this.region = initHRegion(tableName, method, CONF, families); 3159 // Putting data in Region 3160 Put put = null; 3161 KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null); 3162 KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null); 3163 KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null); 3164 3165 KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null); 3166 KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null); 3167 KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null); 3168 3169 put = new Put(row1); 3170 put.add(kv13); 3171 put.add(kv12); 3172 put.add(kv11); 3173 put.add(kv23); 3174 put.add(kv22); 3175 put.add(kv21); 3176 region.put(put); 3177 region.flush(true); 3178 3179 // Expected 3180 List<Cell> expected = new ArrayList<>(); 3181 expected.add(kv13); 3182 expected.add(kv12); 3183 expected.add(kv23); 3184 expected.add(kv22); 3185 3186 Scan scan = new Scan(row1); 3187 scan.addColumn(fam1, qf1); 3188 scan.addColumn(fam1, qf2); 3189 scan.setMaxVersions(MAX_VERSIONS); 3190 List<Cell> actual = new ArrayList<>(); 3191 InternalScanner scanner = region.getScanner(scan); 3192 3193 boolean hasNext = scanner.next(actual); 3194 assertEquals(false, hasNext); 3195 3196 // Verify result 3197 for (int i = 0; i < expected.size(); i++) { 3198 assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i))); 3199 } 3200 } 3201 3202 @Test 3203 public void testScanner_ExplicitColumns_FromMemStoreAndFiles_EnforceVersions() throws 3204 IOException { 3205 byte[] row1 = Bytes.toBytes("row1"); 3206 byte[] fam1 = Bytes.toBytes("fam1"); 3207 byte[][] families = { fam1 }; 3208 byte[] qf1 = Bytes.toBytes("qualifier1"); 3209 byte[] qf2 = Bytes.toBytes("qualifier2"); 3210 3211 long ts1 = 1; 3212 long ts2 = ts1 + 1; 3213 long ts3 = ts1 + 2; 3214 long ts4 = ts1 + 3; 3215 3216 // Setting up region 3217 this.region = initHRegion(tableName, method, CONF, families); 3218 // Putting data in Region 3219 KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null); 3220 KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null); 3221 KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null); 3222 KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null); 3223 3224 KeyValue kv24 = new KeyValue(row1, fam1, qf2, ts4, KeyValue.Type.Put, null); 3225 KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null); 3226 KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null); 3227 KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null); 3228 3229 Put put = null; 3230 put = new Put(row1); 3231 put.add(kv14); 3232 put.add(kv24); 3233 region.put(put); 3234 region.flush(true); 3235 3236 put = new Put(row1); 3237 put.add(kv23); 3238 put.add(kv13); 3239 region.put(put); 3240 region.flush(true); 3241 3242 put = new Put(row1); 3243 put.add(kv22); 3244 put.add(kv12); 3245 region.put(put); 3246 region.flush(true); 3247 3248 put = new Put(row1); 3249 put.add(kv21); 3250 put.add(kv11); 3251 region.put(put); 3252 3253 // Expected 3254 List<Cell> expected = new ArrayList<>(); 3255 expected.add(kv14); 3256 expected.add(kv13); 3257 expected.add(kv12); 3258 expected.add(kv24); 3259 expected.add(kv23); 3260 expected.add(kv22); 3261 3262 Scan scan = new Scan(row1); 3263 scan.addColumn(fam1, qf1); 3264 scan.addColumn(fam1, qf2); 3265 int versions = 3; 3266 scan.setMaxVersions(versions); 3267 List<Cell> actual = new ArrayList<>(); 3268 InternalScanner scanner = region.getScanner(scan); 3269 3270 boolean hasNext = scanner.next(actual); 3271 assertEquals(false, hasNext); 3272 3273 // Verify result 3274 for (int i = 0; i < expected.size(); i++) { 3275 assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i))); 3276 } 3277 } 3278 3279 @Test 3280 public void testScanner_Wildcard_FromMemStore_EnforceVersions() throws IOException { 3281 byte[] row1 = Bytes.toBytes("row1"); 3282 byte[] qf1 = Bytes.toBytes("qualifier1"); 3283 byte[] qf2 = Bytes.toBytes("qualifier2"); 3284 byte[] fam1 = Bytes.toBytes("fam1"); 3285 byte[][] families = { fam1 }; 3286 3287 long ts1 = System.currentTimeMillis(); 3288 long ts2 = ts1 + 1; 3289 long ts3 = ts1 + 2; 3290 3291 // Setting up region 3292 this.region = initHRegion(tableName, method, CONF, families); 3293 // Putting data in Region 3294 Put put = null; 3295 KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null); 3296 KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null); 3297 KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null); 3298 3299 KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null); 3300 KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null); 3301 KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null); 3302 3303 put = new Put(row1); 3304 put.add(kv13); 3305 put.add(kv12); 3306 put.add(kv11); 3307 put.add(kv23); 3308 put.add(kv22); 3309 put.add(kv21); 3310 region.put(put); 3311 3312 // Expected 3313 List<Cell> expected = new ArrayList<>(); 3314 expected.add(kv13); 3315 expected.add(kv12); 3316 expected.add(kv23); 3317 expected.add(kv22); 3318 3319 Scan scan = new Scan(row1); 3320 scan.addFamily(fam1); 3321 scan.setMaxVersions(MAX_VERSIONS); 3322 List<Cell> actual = new ArrayList<>(); 3323 InternalScanner scanner = region.getScanner(scan); 3324 3325 boolean hasNext = scanner.next(actual); 3326 assertEquals(false, hasNext); 3327 3328 // Verify result 3329 for (int i = 0; i < expected.size(); i++) { 3330 assertEquals(expected.get(i), actual.get(i)); 3331 } 3332 } 3333 3334 @Test 3335 public void testScanner_Wildcard_FromFilesOnly_EnforceVersions() throws IOException { 3336 byte[] row1 = Bytes.toBytes("row1"); 3337 byte[] qf1 = Bytes.toBytes("qualifier1"); 3338 byte[] qf2 = Bytes.toBytes("qualifier2"); 3339 byte[] fam1 = Bytes.toBytes("fam1"); 3340 3341 long ts1 = 1; // System.currentTimeMillis(); 3342 long ts2 = ts1 + 1; 3343 long ts3 = ts1 + 2; 3344 3345 // Setting up region 3346 this.region = initHRegion(tableName, method, CONF, fam1); 3347 // Putting data in Region 3348 Put put = null; 3349 KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null); 3350 KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null); 3351 KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null); 3352 3353 KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null); 3354 KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null); 3355 KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null); 3356 3357 put = new Put(row1); 3358 put.add(kv13); 3359 put.add(kv12); 3360 put.add(kv11); 3361 put.add(kv23); 3362 put.add(kv22); 3363 put.add(kv21); 3364 region.put(put); 3365 region.flush(true); 3366 3367 // Expected 3368 List<Cell> expected = new ArrayList<>(); 3369 expected.add(kv13); 3370 expected.add(kv12); 3371 expected.add(kv23); 3372 expected.add(kv22); 3373 3374 Scan scan = new Scan(row1); 3375 scan.addFamily(fam1); 3376 scan.setMaxVersions(MAX_VERSIONS); 3377 List<Cell> actual = new ArrayList<>(); 3378 InternalScanner scanner = region.getScanner(scan); 3379 3380 boolean hasNext = scanner.next(actual); 3381 assertEquals(false, hasNext); 3382 3383 // Verify result 3384 for (int i = 0; i < expected.size(); i++) { 3385 assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i))); 3386 } 3387 } 3388 3389 @Test 3390 public void testScanner_StopRow1542() throws IOException { 3391 byte[] family = Bytes.toBytes("testFamily"); 3392 this.region = initHRegion(tableName, method, CONF, family); 3393 byte[] row1 = Bytes.toBytes("row111"); 3394 byte[] row2 = Bytes.toBytes("row222"); 3395 byte[] row3 = Bytes.toBytes("row333"); 3396 byte[] row4 = Bytes.toBytes("row444"); 3397 byte[] row5 = Bytes.toBytes("row555"); 3398 3399 byte[] col1 = Bytes.toBytes("Pub111"); 3400 byte[] col2 = Bytes.toBytes("Pub222"); 3401 3402 Put put = new Put(row1); 3403 put.addColumn(family, col1, Bytes.toBytes(10L)); 3404 region.put(put); 3405 3406 put = new Put(row2); 3407 put.addColumn(family, col1, Bytes.toBytes(15L)); 3408 region.put(put); 3409 3410 put = new Put(row3); 3411 put.addColumn(family, col2, Bytes.toBytes(20L)); 3412 region.put(put); 3413 3414 put = new Put(row4); 3415 put.addColumn(family, col2, Bytes.toBytes(30L)); 3416 region.put(put); 3417 3418 put = new Put(row5); 3419 put.addColumn(family, col1, Bytes.toBytes(40L)); 3420 region.put(put); 3421 3422 Scan scan = new Scan(row3, row4); 3423 scan.setMaxVersions(); 3424 scan.addColumn(family, col1); 3425 InternalScanner s = region.getScanner(scan); 3426 3427 List<Cell> results = new ArrayList<>(); 3428 assertEquals(false, s.next(results)); 3429 assertEquals(0, results.size()); 3430 } 3431 3432 @Test 3433 public void testScanner_Wildcard_FromMemStoreAndFiles_EnforceVersions() throws IOException { 3434 byte[] row1 = Bytes.toBytes("row1"); 3435 byte[] fam1 = Bytes.toBytes("fam1"); 3436 byte[] qf1 = Bytes.toBytes("qualifier1"); 3437 byte[] qf2 = Bytes.toBytes("quateslifier2"); 3438 3439 long ts1 = 1; 3440 long ts2 = ts1 + 1; 3441 long ts3 = ts1 + 2; 3442 long ts4 = ts1 + 3; 3443 3444 // Setting up region 3445 this.region = initHRegion(tableName, method, CONF, fam1); 3446 // Putting data in Region 3447 KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null); 3448 KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null); 3449 KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null); 3450 KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null); 3451 3452 KeyValue kv24 = new KeyValue(row1, fam1, qf2, ts4, KeyValue.Type.Put, null); 3453 KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null); 3454 KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null); 3455 KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null); 3456 3457 Put put = null; 3458 put = new Put(row1); 3459 put.add(kv14); 3460 put.add(kv24); 3461 region.put(put); 3462 region.flush(true); 3463 3464 put = new Put(row1); 3465 put.add(kv23); 3466 put.add(kv13); 3467 region.put(put); 3468 region.flush(true); 3469 3470 put = new Put(row1); 3471 put.add(kv22); 3472 put.add(kv12); 3473 region.put(put); 3474 region.flush(true); 3475 3476 put = new Put(row1); 3477 put.add(kv21); 3478 put.add(kv11); 3479 region.put(put); 3480 3481 // Expected 3482 List<KeyValue> expected = new ArrayList<>(); 3483 expected.add(kv14); 3484 expected.add(kv13); 3485 expected.add(kv12); 3486 expected.add(kv24); 3487 expected.add(kv23); 3488 expected.add(kv22); 3489 3490 Scan scan = new Scan(row1); 3491 int versions = 3; 3492 scan.setMaxVersions(versions); 3493 List<Cell> actual = new ArrayList<>(); 3494 InternalScanner scanner = region.getScanner(scan); 3495 3496 boolean hasNext = scanner.next(actual); 3497 assertEquals(false, hasNext); 3498 3499 // Verify result 3500 for (int i = 0; i < expected.size(); i++) { 3501 assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i))); 3502 } 3503 } 3504 3505 /** 3506 * Added for HBASE-5416 3507 * 3508 * Here we test scan optimization when only subset of CFs are used in filter 3509 * conditions. 3510 */ 3511 @Test 3512 public void testScanner_JoinedScanners() throws IOException { 3513 byte[] cf_essential = Bytes.toBytes("essential"); 3514 byte[] cf_joined = Bytes.toBytes("joined"); 3515 byte[] cf_alpha = Bytes.toBytes("alpha"); 3516 this.region = initHRegion(tableName, method, CONF, cf_essential, cf_joined, cf_alpha); 3517 byte[] row1 = Bytes.toBytes("row1"); 3518 byte[] row2 = Bytes.toBytes("row2"); 3519 byte[] row3 = Bytes.toBytes("row3"); 3520 3521 byte[] col_normal = Bytes.toBytes("d"); 3522 byte[] col_alpha = Bytes.toBytes("a"); 3523 3524 byte[] filtered_val = Bytes.toBytes(3); 3525 3526 Put put = new Put(row1); 3527 put.addColumn(cf_essential, col_normal, Bytes.toBytes(1)); 3528 put.addColumn(cf_joined, col_alpha, Bytes.toBytes(1)); 3529 region.put(put); 3530 3531 put = new Put(row2); 3532 put.addColumn(cf_essential, col_alpha, Bytes.toBytes(2)); 3533 put.addColumn(cf_joined, col_normal, Bytes.toBytes(2)); 3534 put.addColumn(cf_alpha, col_alpha, Bytes.toBytes(2)); 3535 region.put(put); 3536 3537 put = new Put(row3); 3538 put.addColumn(cf_essential, col_normal, filtered_val); 3539 put.addColumn(cf_joined, col_normal, filtered_val); 3540 region.put(put); 3541 3542 // Check two things: 3543 // 1. result list contains expected values 3544 // 2. result list is sorted properly 3545 3546 Scan scan = new Scan(); 3547 Filter filter = new SingleColumnValueExcludeFilter(cf_essential, col_normal, 3548 CompareOp.NOT_EQUAL, filtered_val); 3549 scan.setFilter(filter); 3550 scan.setLoadColumnFamiliesOnDemand(true); 3551 InternalScanner s = region.getScanner(scan); 3552 3553 List<Cell> results = new ArrayList<>(); 3554 assertTrue(s.next(results)); 3555 assertEquals(1, results.size()); 3556 results.clear(); 3557 3558 assertTrue(s.next(results)); 3559 assertEquals(3, results.size()); 3560 assertTrue("orderCheck", CellUtil.matchingFamily(results.get(0), cf_alpha)); 3561 assertTrue("orderCheck", CellUtil.matchingFamily(results.get(1), cf_essential)); 3562 assertTrue("orderCheck", CellUtil.matchingFamily(results.get(2), cf_joined)); 3563 results.clear(); 3564 3565 assertFalse(s.next(results)); 3566 assertEquals(0, results.size()); 3567 } 3568 3569 /** 3570 * HBASE-5416 3571 * 3572 * Test case when scan limits amount of KVs returned on each next() call. 3573 */ 3574 @Test 3575 public void testScanner_JoinedScannersWithLimits() throws IOException { 3576 final byte[] cf_first = Bytes.toBytes("first"); 3577 final byte[] cf_second = Bytes.toBytes("second"); 3578 3579 this.region = initHRegion(tableName, method, CONF, cf_first, cf_second); 3580 final byte[] col_a = Bytes.toBytes("a"); 3581 final byte[] col_b = Bytes.toBytes("b"); 3582 3583 Put put; 3584 3585 for (int i = 0; i < 10; i++) { 3586 put = new Put(Bytes.toBytes("r" + Integer.toString(i))); 3587 put.addColumn(cf_first, col_a, Bytes.toBytes(i)); 3588 if (i < 5) { 3589 put.addColumn(cf_first, col_b, Bytes.toBytes(i)); 3590 put.addColumn(cf_second, col_a, Bytes.toBytes(i)); 3591 put.addColumn(cf_second, col_b, Bytes.toBytes(i)); 3592 } 3593 region.put(put); 3594 } 3595 3596 Scan scan = new Scan(); 3597 scan.setLoadColumnFamiliesOnDemand(true); 3598 Filter bogusFilter = new FilterBase() { 3599 @Override 3600 public ReturnCode filterCell(final Cell ignored) throws IOException { 3601 return ReturnCode.INCLUDE; 3602 } 3603 @Override 3604 public boolean isFamilyEssential(byte[] name) { 3605 return Bytes.equals(name, cf_first); 3606 } 3607 }; 3608 3609 scan.setFilter(bogusFilter); 3610 InternalScanner s = region.getScanner(scan); 3611 3612 // Our data looks like this: 3613 // r0: first:a, first:b, second:a, second:b 3614 // r1: first:a, first:b, second:a, second:b 3615 // r2: first:a, first:b, second:a, second:b 3616 // r3: first:a, first:b, second:a, second:b 3617 // r4: first:a, first:b, second:a, second:b 3618 // r5: first:a 3619 // r6: first:a 3620 // r7: first:a 3621 // r8: first:a 3622 // r9: first:a 3623 3624 // But due to next's limit set to 3, we should get this: 3625 // r0: first:a, first:b, second:a 3626 // r0: second:b 3627 // r1: first:a, first:b, second:a 3628 // r1: second:b 3629 // r2: first:a, first:b, second:a 3630 // r2: second:b 3631 // r3: first:a, first:b, second:a 3632 // r3: second:b 3633 // r4: first:a, first:b, second:a 3634 // r4: second:b 3635 // r5: first:a 3636 // r6: first:a 3637 // r7: first:a 3638 // r8: first:a 3639 // r9: first:a 3640 3641 List<Cell> results = new ArrayList<>(); 3642 int index = 0; 3643 ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(3).build(); 3644 while (true) { 3645 boolean more = s.next(results, scannerContext); 3646 if ((index >> 1) < 5) { 3647 if (index % 2 == 0) { 3648 assertEquals(3, results.size()); 3649 } else { 3650 assertEquals(1, results.size()); 3651 } 3652 } else { 3653 assertEquals(1, results.size()); 3654 } 3655 results.clear(); 3656 index++; 3657 if (!more) { 3658 break; 3659 } 3660 } 3661 } 3662 3663 /** 3664 * Write an HFile block full with Cells whose qualifier that are identical between 3665 * 0 and Short.MAX_VALUE. See HBASE-13329. 3666 * @throws Exception 3667 */ 3668 @Test 3669 public void testLongQualifier() throws Exception { 3670 byte[] family = Bytes.toBytes("family"); 3671 this.region = initHRegion(tableName, method, CONF, family); 3672 byte[] q = new byte[Short.MAX_VALUE+2]; 3673 Arrays.fill(q, 0, q.length-1, (byte)42); 3674 for (byte i=0; i<10; i++) { 3675 Put p = new Put(Bytes.toBytes("row")); 3676 // qualifiers that differ past Short.MAX_VALUE 3677 q[q.length-1]=i; 3678 p.addColumn(family, q, q); 3679 region.put(p); 3680 } 3681 region.flush(false); 3682 } 3683 3684 /** 3685 * Flushes the cache in a thread while scanning. The tests verify that the 3686 * scan is coherent - e.g. the returned results are always of the same or 3687 * later update as the previous results. 3688 * 3689 * @throws IOException 3690 * scan / compact 3691 * @throws InterruptedException 3692 * thread join 3693 */ 3694 @Test 3695 public void testFlushCacheWhileScanning() throws IOException, InterruptedException { 3696 byte[] family = Bytes.toBytes("family"); 3697 int numRows = 1000; 3698 int flushAndScanInterval = 10; 3699 int compactInterval = 10 * flushAndScanInterval; 3700 3701 this.region = initHRegion(tableName, method, CONF, family); 3702 FlushThread flushThread = new FlushThread(); 3703 try { 3704 flushThread.start(); 3705 3706 Scan scan = new Scan(); 3707 scan.addFamily(family); 3708 scan.setFilter(new SingleColumnValueFilter(family, qual1, CompareOp.EQUAL, 3709 new BinaryComparator(Bytes.toBytes(5L)))); 3710 3711 int expectedCount = 0; 3712 List<Cell> res = new ArrayList<>(); 3713 3714 boolean toggle = true; 3715 for (long i = 0; i < numRows; i++) { 3716 Put put = new Put(Bytes.toBytes(i)); 3717 put.setDurability(Durability.SKIP_WAL); 3718 put.addColumn(family, qual1, Bytes.toBytes(i % 10)); 3719 region.put(put); 3720 3721 if (i != 0 && i % compactInterval == 0) { 3722 LOG.debug("iteration = " + i+ " ts="+System.currentTimeMillis()); 3723 region.compact(true); 3724 } 3725 3726 if (i % 10 == 5L) { 3727 expectedCount++; 3728 } 3729 3730 if (i != 0 && i % flushAndScanInterval == 0) { 3731 res.clear(); 3732 InternalScanner scanner = region.getScanner(scan); 3733 if (toggle) { 3734 flushThread.flush(); 3735 } 3736 while (scanner.next(res)) 3737 ; 3738 if (!toggle) { 3739 flushThread.flush(); 3740 } 3741 assertEquals("toggle="+toggle+"i=" + i + " ts="+System.currentTimeMillis(), 3742 expectedCount, res.size()); 3743 toggle = !toggle; 3744 } 3745 } 3746 3747 } finally { 3748 try { 3749 flushThread.done(); 3750 flushThread.join(); 3751 flushThread.checkNoError(); 3752 } catch (InterruptedException ie) { 3753 LOG.warn("Caught exception when joining with flushThread", ie); 3754 } 3755 HBaseTestingUtility.closeRegionAndWAL(this.region); 3756 this.region = null; 3757 } 3758 } 3759 3760 protected class FlushThread extends Thread { 3761 private volatile boolean done; 3762 private Throwable error = null; 3763 3764 FlushThread() { 3765 super("FlushThread"); 3766 } 3767 3768 public void done() { 3769 done = true; 3770 synchronized (this) { 3771 interrupt(); 3772 } 3773 } 3774 3775 public void checkNoError() { 3776 if (error != null) { 3777 assertNull(error); 3778 } 3779 } 3780 3781 @Override 3782 public void run() { 3783 done = false; 3784 while (!done) { 3785 synchronized (this) { 3786 try { 3787 wait(); 3788 } catch (InterruptedException ignored) { 3789 if (done) { 3790 break; 3791 } 3792 } 3793 } 3794 try { 3795 region.flush(true); 3796 } catch (IOException e) { 3797 if (!done) { 3798 LOG.error("Error while flushing cache", e); 3799 error = e; 3800 } 3801 break; 3802 } catch (Throwable t) { 3803 LOG.error("Uncaught exception", t); 3804 throw t; 3805 } 3806 } 3807 } 3808 3809 public void flush() { 3810 synchronized (this) { 3811 notify(); 3812 } 3813 } 3814 } 3815 3816 /** 3817 * So can be overridden in subclasses. 3818 */ 3819 int getNumQualifiersForTestWritesWhileScanning() { 3820 return 100; 3821 } 3822 3823 /** 3824 * So can be overridden in subclasses. 3825 */ 3826 int getTestCountForTestWritesWhileScanning() { 3827 return 100; 3828 } 3829 3830 /** 3831 * Writes very wide records and scans for the latest every time.. Flushes and 3832 * compacts the region every now and then to keep things realistic. 3833 * 3834 * @throws IOException 3835 * by flush / scan / compaction 3836 * @throws InterruptedException 3837 * when joining threads 3838 */ 3839 @Test 3840 public void testWritesWhileScanning() throws IOException, InterruptedException { 3841 int testCount = getTestCountForTestWritesWhileScanning(); 3842 int numRows = 1; 3843 int numFamilies = 10; 3844 int numQualifiers = getNumQualifiersForTestWritesWhileScanning(); 3845 int flushInterval = 7; 3846 int compactInterval = 5 * flushInterval; 3847 byte[][] families = new byte[numFamilies][]; 3848 for (int i = 0; i < numFamilies; i++) { 3849 families[i] = Bytes.toBytes("family" + i); 3850 } 3851 byte[][] qualifiers = new byte[numQualifiers][]; 3852 for (int i = 0; i < numQualifiers; i++) { 3853 qualifiers[i] = Bytes.toBytes("qual" + i); 3854 } 3855 3856 this.region = initHRegion(tableName, method, CONF, families); 3857 FlushThread flushThread = new FlushThread(); 3858 PutThread putThread = new PutThread(numRows, families, qualifiers); 3859 try { 3860 putThread.start(); 3861 putThread.waitForFirstPut(); 3862 3863 flushThread.start(); 3864 3865 Scan scan = new Scan(Bytes.toBytes("row0"), Bytes.toBytes("row1")); 3866 3867 int expectedCount = numFamilies * numQualifiers; 3868 List<Cell> res = new ArrayList<>(); 3869 3870 long prevTimestamp = 0L; 3871 for (int i = 0; i < testCount; i++) { 3872 3873 if (i != 0 && i % compactInterval == 0) { 3874 region.compact(true); 3875 for (HStore store : region.getStores()) { 3876 store.closeAndArchiveCompactedFiles(); 3877 } 3878 } 3879 3880 if (i != 0 && i % flushInterval == 0) { 3881 flushThread.flush(); 3882 } 3883 3884 boolean previousEmpty = res.isEmpty(); 3885 res.clear(); 3886 InternalScanner scanner = region.getScanner(scan); 3887 while (scanner.next(res)) 3888 ; 3889 if (!res.isEmpty() || !previousEmpty || i > compactInterval) { 3890 assertEquals("i=" + i, expectedCount, res.size()); 3891 long timestamp = res.get(0).getTimestamp(); 3892 assertTrue("Timestamps were broke: " + timestamp + " prev: " + prevTimestamp, 3893 timestamp >= prevTimestamp); 3894 prevTimestamp = timestamp; 3895 } 3896 } 3897 3898 putThread.done(); 3899 3900 region.flush(true); 3901 3902 } finally { 3903 try { 3904 flushThread.done(); 3905 flushThread.join(); 3906 flushThread.checkNoError(); 3907 3908 putThread.join(); 3909 putThread.checkNoError(); 3910 } catch (InterruptedException ie) { 3911 LOG.warn("Caught exception when joining with flushThread", ie); 3912 } 3913 3914 try { 3915 HBaseTestingUtility.closeRegionAndWAL(this.region); 3916 } catch (DroppedSnapshotException dse) { 3917 // We could get this on way out because we interrupt the background flusher and it could 3918 // fail anywhere causing a DSE over in the background flusher... only it is not properly 3919 // dealt with so could still be memory hanging out when we get to here -- memory we can't 3920 // flush because the accounting is 'off' since original DSE. 3921 } 3922 this.region = null; 3923 } 3924 } 3925 3926 protected class PutThread extends Thread { 3927 private volatile boolean done; 3928 private volatile int numPutsFinished = 0; 3929 3930 private Throwable error = null; 3931 private int numRows; 3932 private byte[][] families; 3933 private byte[][] qualifiers; 3934 3935 private PutThread(int numRows, byte[][] families, byte[][] qualifiers) { 3936 super("PutThread"); 3937 this.numRows = numRows; 3938 this.families = families; 3939 this.qualifiers = qualifiers; 3940 } 3941 3942 /** 3943 * Block calling thread until this instance of PutThread has put at least one row. 3944 */ 3945 public void waitForFirstPut() throws InterruptedException { 3946 // wait until put thread actually puts some data 3947 while (isAlive() && numPutsFinished == 0) { 3948 checkNoError(); 3949 Thread.sleep(50); 3950 } 3951 } 3952 3953 public void done() { 3954 done = true; 3955 synchronized (this) { 3956 interrupt(); 3957 } 3958 } 3959 3960 public void checkNoError() { 3961 if (error != null) { 3962 assertNull(error); 3963 } 3964 } 3965 3966 @Override 3967 public void run() { 3968 done = false; 3969 while (!done) { 3970 try { 3971 for (int r = 0; r < numRows; r++) { 3972 byte[] row = Bytes.toBytes("row" + r); 3973 Put put = new Put(row); 3974 put.setDurability(Durability.SKIP_WAL); 3975 byte[] value = Bytes.toBytes(String.valueOf(numPutsFinished)); 3976 for (byte[] family : families) { 3977 for (byte[] qualifier : qualifiers) { 3978 put.addColumn(family, qualifier, numPutsFinished, value); 3979 } 3980 } 3981 region.put(put); 3982 numPutsFinished++; 3983 if (numPutsFinished > 0 && numPutsFinished % 47 == 0) { 3984 System.out.println("put iteration = " + numPutsFinished); 3985 Delete delete = new Delete(row, (long) numPutsFinished - 30); 3986 region.delete(delete); 3987 } 3988 numPutsFinished++; 3989 } 3990 } catch (InterruptedIOException e) { 3991 // This is fine. It means we are done, or didn't get the lock on time 3992 LOG.info("Interrupted", e); 3993 } catch (IOException e) { 3994 LOG.error("Error while putting records", e); 3995 error = e; 3996 break; 3997 } 3998 } 3999 4000 } 4001 4002 } 4003 4004 /** 4005 * Writes very wide records and gets the latest row every time.. Flushes and 4006 * compacts the region aggressivly to catch issues. 4007 * 4008 * @throws IOException 4009 * by flush / scan / compaction 4010 * @throws InterruptedException 4011 * when joining threads 4012 */ 4013 @Test 4014 public void testWritesWhileGetting() throws Exception { 4015 int testCount = 50; 4016 int numRows = 1; 4017 int numFamilies = 10; 4018 int numQualifiers = 100; 4019 int compactInterval = 100; 4020 byte[][] families = new byte[numFamilies][]; 4021 for (int i = 0; i < numFamilies; i++) { 4022 families[i] = Bytes.toBytes("family" + i); 4023 } 4024 byte[][] qualifiers = new byte[numQualifiers][]; 4025 for (int i = 0; i < numQualifiers; i++) { 4026 qualifiers[i] = Bytes.toBytes("qual" + i); 4027 } 4028 4029 4030 // This test flushes constantly and can cause many files to be created, 4031 // possibly 4032 // extending over the ulimit. Make sure compactions are aggressive in 4033 // reducing 4034 // the number of HFiles created. 4035 Configuration conf = HBaseConfiguration.create(CONF); 4036 conf.setInt("hbase.hstore.compaction.min", 1); 4037 conf.setInt("hbase.hstore.compaction.max", 1000); 4038 this.region = initHRegion(tableName, method, conf, families); 4039 PutThread putThread = null; 4040 MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf); 4041 try { 4042 putThread = new PutThread(numRows, families, qualifiers); 4043 putThread.start(); 4044 putThread.waitForFirstPut(); 4045 4046 // Add a thread that flushes as fast as possible 4047 ctx.addThread(new RepeatingTestThread(ctx) { 4048 4049 @Override 4050 public void doAnAction() throws Exception { 4051 region.flush(true); 4052 // Compact regularly to avoid creating too many files and exceeding 4053 // the ulimit. 4054 region.compact(false); 4055 for (HStore store : region.getStores()) { 4056 store.closeAndArchiveCompactedFiles(); 4057 } 4058 } 4059 }); 4060 ctx.startThreads(); 4061 4062 Get get = new Get(Bytes.toBytes("row0")); 4063 Result result = null; 4064 4065 int expectedCount = numFamilies * numQualifiers; 4066 4067 long prevTimestamp = 0L; 4068 for (int i = 0; i < testCount; i++) { 4069 LOG.info("testWritesWhileGetting verify turn " + i); 4070 boolean previousEmpty = result == null || result.isEmpty(); 4071 result = region.get(get); 4072 if (!result.isEmpty() || !previousEmpty || i > compactInterval) { 4073 assertEquals("i=" + i, expectedCount, result.size()); 4074 // TODO this was removed, now what dangit?! 4075 // search looking for the qualifier in question? 4076 long timestamp = 0; 4077 for (Cell kv : result.rawCells()) { 4078 if (CellUtil.matchingFamily(kv, families[0]) 4079 && CellUtil.matchingQualifier(kv, qualifiers[0])) { 4080 timestamp = kv.getTimestamp(); 4081 } 4082 } 4083 assertTrue(timestamp >= prevTimestamp); 4084 prevTimestamp = timestamp; 4085 Cell previousKV = null; 4086 4087 for (Cell kv : result.rawCells()) { 4088 byte[] thisValue = CellUtil.cloneValue(kv); 4089 if (previousKV != null) { 4090 if (Bytes.compareTo(CellUtil.cloneValue(previousKV), thisValue) != 0) { 4091 LOG.warn("These two KV should have the same value." + " Previous KV:" + previousKV 4092 + "(memStoreTS:" + previousKV.getSequenceId() + ")" + ", New KV: " + kv 4093 + "(memStoreTS:" + kv.getSequenceId() + ")"); 4094 assertEquals(0, Bytes.compareTo(CellUtil.cloneValue(previousKV), thisValue)); 4095 } 4096 } 4097 previousKV = kv; 4098 } 4099 } 4100 } 4101 } finally { 4102 if (putThread != null) 4103 putThread.done(); 4104 4105 region.flush(true); 4106 4107 if (putThread != null) { 4108 putThread.join(); 4109 putThread.checkNoError(); 4110 } 4111 4112 ctx.stop(); 4113 HBaseTestingUtility.closeRegionAndWAL(this.region); 4114 this.region = null; 4115 } 4116 } 4117 4118 @Test 4119 public void testHolesInMeta() throws Exception { 4120 byte[] family = Bytes.toBytes("family"); 4121 this.region = initHRegion(tableName, Bytes.toBytes("x"), Bytes.toBytes("z"), method, CONF, 4122 false, family); 4123 byte[] rowNotServed = Bytes.toBytes("a"); 4124 Get g = new Get(rowNotServed); 4125 try { 4126 region.get(g); 4127 fail(); 4128 } catch (WrongRegionException x) { 4129 // OK 4130 } 4131 byte[] row = Bytes.toBytes("y"); 4132 g = new Get(row); 4133 region.get(g); 4134 } 4135 4136 @Test 4137 public void testIndexesScanWithOneDeletedRow() throws IOException { 4138 byte[] family = Bytes.toBytes("family"); 4139 4140 // Setting up region 4141 this.region = initHRegion(tableName, method, CONF, family); 4142 Put put = new Put(Bytes.toBytes(1L)); 4143 put.addColumn(family, qual1, 1L, Bytes.toBytes(1L)); 4144 region.put(put); 4145 4146 region.flush(true); 4147 4148 Delete delete = new Delete(Bytes.toBytes(1L), 1L); 4149 region.delete(delete); 4150 4151 put = new Put(Bytes.toBytes(2L)); 4152 put.addColumn(family, qual1, 2L, Bytes.toBytes(2L)); 4153 region.put(put); 4154 4155 Scan idxScan = new Scan(); 4156 idxScan.addFamily(family); 4157 idxScan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL, Arrays.<Filter> asList( 4158 new SingleColumnValueFilter(family, qual1, CompareOp.GREATER_OR_EQUAL, 4159 new BinaryComparator(Bytes.toBytes(0L))), new SingleColumnValueFilter(family, qual1, 4160 CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes(3L)))))); 4161 InternalScanner scanner = region.getScanner(idxScan); 4162 List<Cell> res = new ArrayList<>(); 4163 4164 while (scanner.next(res)) { 4165 // Ignore res value. 4166 } 4167 assertEquals(1L, res.size()); 4168 } 4169 4170 // //////////////////////////////////////////////////////////////////////////// 4171 // Bloom filter test 4172 // //////////////////////////////////////////////////////////////////////////// 4173 @Test 4174 public void testBloomFilterSize() throws IOException { 4175 byte[] fam1 = Bytes.toBytes("fam1"); 4176 byte[] qf1 = Bytes.toBytes("col"); 4177 byte[] val1 = Bytes.toBytes("value1"); 4178 // Create Table 4179 HColumnDescriptor hcd = new HColumnDescriptor(fam1).setMaxVersions(Integer.MAX_VALUE) 4180 .setBloomFilterType(BloomType.ROWCOL); 4181 4182 HTableDescriptor htd = new HTableDescriptor(tableName); 4183 htd.addFamily(hcd); 4184 HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); 4185 this.region = TEST_UTIL.createLocalHRegion(info, htd); 4186 int num_unique_rows = 10; 4187 int duplicate_multiplier = 2; 4188 int num_storefiles = 4; 4189 4190 int version = 0; 4191 for (int f = 0; f < num_storefiles; f++) { 4192 for (int i = 0; i < duplicate_multiplier; i++) { 4193 for (int j = 0; j < num_unique_rows; j++) { 4194 Put put = new Put(Bytes.toBytes("row" + j)); 4195 put.setDurability(Durability.SKIP_WAL); 4196 long ts = version++; 4197 put.addColumn(fam1, qf1, ts, val1); 4198 region.put(put); 4199 } 4200 } 4201 region.flush(true); 4202 } 4203 // before compaction 4204 HStore store = region.getStore(fam1); 4205 Collection<HStoreFile> storeFiles = store.getStorefiles(); 4206 for (HStoreFile storefile : storeFiles) { 4207 StoreFileReader reader = storefile.getReader(); 4208 reader.loadFileInfo(); 4209 reader.loadBloomfilter(); 4210 assertEquals(num_unique_rows * duplicate_multiplier, reader.getEntries()); 4211 assertEquals(num_unique_rows, reader.getFilterEntries()); 4212 } 4213 4214 region.compact(true); 4215 4216 // after compaction 4217 storeFiles = store.getStorefiles(); 4218 for (HStoreFile storefile : storeFiles) { 4219 StoreFileReader reader = storefile.getReader(); 4220 reader.loadFileInfo(); 4221 reader.loadBloomfilter(); 4222 assertEquals(num_unique_rows * duplicate_multiplier * num_storefiles, reader.getEntries()); 4223 assertEquals(num_unique_rows, reader.getFilterEntries()); 4224 } 4225 } 4226 4227 @Test 4228 public void testAllColumnsWithBloomFilter() throws IOException { 4229 byte[] TABLE = Bytes.toBytes(name.getMethodName()); 4230 byte[] FAMILY = Bytes.toBytes("family"); 4231 4232 // Create table 4233 HColumnDescriptor hcd = new HColumnDescriptor(FAMILY).setMaxVersions(Integer.MAX_VALUE) 4234 .setBloomFilterType(BloomType.ROWCOL); 4235 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE)); 4236 htd.addFamily(hcd); 4237 HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); 4238 this.region = TEST_UTIL.createLocalHRegion(info, htd); 4239 // For row:0, col:0: insert versions 1 through 5. 4240 byte[] row = Bytes.toBytes("row:" + 0); 4241 byte[] column = Bytes.toBytes("column:" + 0); 4242 Put put = new Put(row); 4243 put.setDurability(Durability.SKIP_WAL); 4244 for (long idx = 1; idx <= 4; idx++) { 4245 put.addColumn(FAMILY, column, idx, Bytes.toBytes("value-version-" + idx)); 4246 } 4247 region.put(put); 4248 4249 // Flush 4250 region.flush(true); 4251 4252 // Get rows 4253 Get get = new Get(row); 4254 get.setMaxVersions(); 4255 Cell[] kvs = region.get(get).rawCells(); 4256 4257 // Check if rows are correct 4258 assertEquals(4, kvs.length); 4259 checkOneCell(kvs[0], FAMILY, 0, 0, 4); 4260 checkOneCell(kvs[1], FAMILY, 0, 0, 3); 4261 checkOneCell(kvs[2], FAMILY, 0, 0, 2); 4262 checkOneCell(kvs[3], FAMILY, 0, 0, 1); 4263 } 4264 4265 /** 4266 * Testcase to cover bug-fix for HBASE-2823 Ensures correct delete when 4267 * issuing delete row on columns with bloom filter set to row+col 4268 * (BloomType.ROWCOL) 4269 */ 4270 @Test 4271 public void testDeleteRowWithBloomFilter() throws IOException { 4272 byte[] familyName = Bytes.toBytes("familyName"); 4273 4274 // Create Table 4275 HColumnDescriptor hcd = new HColumnDescriptor(familyName).setMaxVersions(Integer.MAX_VALUE) 4276 .setBloomFilterType(BloomType.ROWCOL); 4277 4278 HTableDescriptor htd = new HTableDescriptor(tableName); 4279 htd.addFamily(hcd); 4280 HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); 4281 this.region = TEST_UTIL.createLocalHRegion(info, htd); 4282 // Insert some data 4283 byte[] row = Bytes.toBytes("row1"); 4284 byte[] col = Bytes.toBytes("col1"); 4285 4286 Put put = new Put(row); 4287 put.addColumn(familyName, col, 1, Bytes.toBytes("SomeRandomValue")); 4288 region.put(put); 4289 region.flush(true); 4290 4291 Delete del = new Delete(row); 4292 region.delete(del); 4293 region.flush(true); 4294 4295 // Get remaining rows (should have none) 4296 Get get = new Get(row); 4297 get.addColumn(familyName, col); 4298 4299 Cell[] keyValues = region.get(get).rawCells(); 4300 assertEquals(0, keyValues.length); 4301 } 4302 4303 @Test 4304 public void testgetHDFSBlocksDistribution() throws Exception { 4305 HBaseTestingUtility htu = new HBaseTestingUtility(); 4306 // Why do we set the block size in this test? If we set it smaller than the kvs, then we'll 4307 // break up the file in to more pieces that can be distributed across the three nodes and we 4308 // won't be able to have the condition this test asserts; that at least one node has 4309 // a copy of all replicas -- if small block size, then blocks are spread evenly across the 4310 // the three nodes. hfilev3 with tags seems to put us over the block size. St.Ack. 4311 // final int DEFAULT_BLOCK_SIZE = 1024; 4312 // htu.getConfiguration().setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE); 4313 htu.getConfiguration().setInt("dfs.replication", 2); 4314 4315 // set up a cluster with 3 nodes 4316 MiniHBaseCluster cluster = null; 4317 String dataNodeHosts[] = new String[] { "host1", "host2", "host3" }; 4318 int regionServersCount = 3; 4319 4320 try { 4321 StartMiniClusterOption option = StartMiniClusterOption.builder() 4322 .numRegionServers(regionServersCount).dataNodeHosts(dataNodeHosts).build(); 4323 cluster = htu.startMiniCluster(option); 4324 byte[][] families = { fam1, fam2 }; 4325 Table ht = htu.createTable(tableName, families); 4326 4327 // Setting up region 4328 byte row[] = Bytes.toBytes("row1"); 4329 byte col[] = Bytes.toBytes("col1"); 4330 4331 Put put = new Put(row); 4332 put.addColumn(fam1, col, 1, Bytes.toBytes("test1")); 4333 put.addColumn(fam2, col, 1, Bytes.toBytes("test2")); 4334 ht.put(put); 4335 4336 HRegion firstRegion = htu.getHBaseCluster().getRegions(tableName).get(0); 4337 firstRegion.flush(true); 4338 HDFSBlocksDistribution blocksDistribution1 = firstRegion.getHDFSBlocksDistribution(); 4339 4340 // Given the default replication factor is 2 and we have 2 HFiles, 4341 // we will have total of 4 replica of blocks on 3 datanodes; thus there 4342 // must be at least one host that have replica for 2 HFiles. That host's 4343 // weight will be equal to the unique block weight. 4344 long uniqueBlocksWeight1 = blocksDistribution1.getUniqueBlocksTotalWeight(); 4345 StringBuilder sb = new StringBuilder(); 4346 for (String host: blocksDistribution1.getTopHosts()) { 4347 if (sb.length() > 0) sb.append(", "); 4348 sb.append(host); 4349 sb.append("="); 4350 sb.append(blocksDistribution1.getWeight(host)); 4351 } 4352 4353 String topHost = blocksDistribution1.getTopHosts().get(0); 4354 long topHostWeight = blocksDistribution1.getWeight(topHost); 4355 String msg = "uniqueBlocksWeight=" + uniqueBlocksWeight1 + ", topHostWeight=" + 4356 topHostWeight + ", topHost=" + topHost + "; " + sb.toString(); 4357 LOG.info(msg); 4358 assertTrue(msg, uniqueBlocksWeight1 == topHostWeight); 4359 4360 // use the static method to compute the value, it should be the same. 4361 // static method is used by load balancer or other components 4362 HDFSBlocksDistribution blocksDistribution2 = HRegion.computeHDFSBlocksDistribution( 4363 htu.getConfiguration(), firstRegion.getTableDescriptor(), firstRegion.getRegionInfo()); 4364 long uniqueBlocksWeight2 = blocksDistribution2.getUniqueBlocksTotalWeight(); 4365 4366 assertTrue(uniqueBlocksWeight1 == uniqueBlocksWeight2); 4367 4368 ht.close(); 4369 } finally { 4370 if (cluster != null) { 4371 htu.shutdownMiniCluster(); 4372 } 4373 } 4374 } 4375 4376 /** 4377 * Testcase to check state of region initialization task set to ABORTED or not 4378 * if any exceptions during initialization 4379 * 4380 * @throws Exception 4381 */ 4382 @Test 4383 public void testStatusSettingToAbortIfAnyExceptionDuringRegionInitilization() throws Exception { 4384 HRegionInfo info; 4385 try { 4386 FileSystem fs = Mockito.mock(FileSystem.class); 4387 Mockito.when(fs.exists((Path) Mockito.anyObject())).thenThrow(new IOException()); 4388 HTableDescriptor htd = new HTableDescriptor(tableName); 4389 htd.addFamily(new HColumnDescriptor("cf")); 4390 info = new HRegionInfo(htd.getTableName(), HConstants.EMPTY_BYTE_ARRAY, 4391 HConstants.EMPTY_BYTE_ARRAY, false); 4392 Path path = new Path(dir + "testStatusSettingToAbortIfAnyExceptionDuringRegionInitilization"); 4393 region = HRegion.newHRegion(path, null, fs, CONF, info, htd, null); 4394 // region initialization throws IOException and set task state to ABORTED. 4395 region.initialize(); 4396 fail("Region initialization should fail due to IOException"); 4397 } catch (IOException io) { 4398 List<MonitoredTask> tasks = TaskMonitor.get().getTasks(); 4399 for (MonitoredTask monitoredTask : tasks) { 4400 if (!(monitoredTask instanceof MonitoredRPCHandler) 4401 && monitoredTask.getDescription().contains(region.toString())) { 4402 assertTrue("Region state should be ABORTED.", 4403 monitoredTask.getState().equals(MonitoredTask.State.ABORTED)); 4404 break; 4405 } 4406 } 4407 } 4408 } 4409 4410 /** 4411 * Verifies that the .regioninfo file is written on region creation and that 4412 * is recreated if missing during region opening. 4413 */ 4414 @Test 4415 public void testRegionInfoFileCreation() throws IOException { 4416 Path rootDir = new Path(dir + "testRegionInfoFileCreation"); 4417 4418 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName())); 4419 htd.addFamily(new HColumnDescriptor("cf")); 4420 4421 HRegionInfo hri = new HRegionInfo(htd.getTableName()); 4422 4423 // Create a region and skip the initialization (like CreateTableHandler) 4424 region = HBaseTestingUtility.createRegionAndWAL(hri, rootDir, CONF, htd, false); 4425 Path regionDir = region.getRegionFileSystem().getRegionDir(); 4426 FileSystem fs = region.getRegionFileSystem().getFileSystem(); 4427 HBaseTestingUtility.closeRegionAndWAL(region); 4428 4429 Path regionInfoFile = new Path(regionDir, HRegionFileSystem.REGION_INFO_FILE); 4430 4431 // Verify that the .regioninfo file is present 4432 assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir", 4433 fs.exists(regionInfoFile)); 4434 4435 // Try to open the region 4436 region = HRegion.openHRegion(rootDir, hri, htd, null, CONF); 4437 assertEquals(regionDir, region.getRegionFileSystem().getRegionDir()); 4438 HBaseTestingUtility.closeRegionAndWAL(region); 4439 4440 // Verify that the .regioninfo file is still there 4441 assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir", 4442 fs.exists(regionInfoFile)); 4443 4444 // Remove the .regioninfo file and verify is recreated on region open 4445 fs.delete(regionInfoFile, true); 4446 assertFalse(HRegionFileSystem.REGION_INFO_FILE + " should be removed from the region dir", 4447 fs.exists(regionInfoFile)); 4448 4449 region = HRegion.openHRegion(rootDir, hri, htd, null, CONF); 4450// region = TEST_UTIL.openHRegion(hri, htd); 4451 assertEquals(regionDir, region.getRegionFileSystem().getRegionDir()); 4452 HBaseTestingUtility.closeRegionAndWAL(region); 4453 4454 // Verify that the .regioninfo file is still there 4455 assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir", 4456 fs.exists(new Path(regionDir, HRegionFileSystem.REGION_INFO_FILE))); 4457 4458 region = null; 4459 } 4460 4461 /** 4462 * TestCase for increment 4463 */ 4464 private static class Incrementer implements Runnable { 4465 private HRegion region; 4466 private final static byte[] incRow = Bytes.toBytes("incRow"); 4467 private final static byte[] family = Bytes.toBytes("family"); 4468 private final static byte[] qualifier = Bytes.toBytes("qualifier"); 4469 private final static long ONE = 1L; 4470 private int incCounter; 4471 4472 public Incrementer(HRegion region, int incCounter) { 4473 this.region = region; 4474 this.incCounter = incCounter; 4475 } 4476 4477 @Override 4478 public void run() { 4479 int count = 0; 4480 while (count < incCounter) { 4481 Increment inc = new Increment(incRow); 4482 inc.addColumn(family, qualifier, ONE); 4483 count++; 4484 try { 4485 region.increment(inc); 4486 } catch (IOException e) { 4487 LOG.info("Count=" + count + ", " + e); 4488 break; 4489 } 4490 } 4491 } 4492 } 4493 4494 /** 4495 * Test case to check increment function with memstore flushing 4496 * @throws Exception 4497 */ 4498 @Test 4499 public void testParallelIncrementWithMemStoreFlush() throws Exception { 4500 byte[] family = Incrementer.family; 4501 this.region = initHRegion(tableName, method, CONF, family); 4502 final HRegion region = this.region; 4503 final AtomicBoolean incrementDone = new AtomicBoolean(false); 4504 Runnable flusher = new Runnable() { 4505 @Override 4506 public void run() { 4507 while (!incrementDone.get()) { 4508 try { 4509 region.flush(true); 4510 } catch (Exception e) { 4511 e.printStackTrace(); 4512 } 4513 } 4514 } 4515 }; 4516 4517 // after all increment finished, the row will increment to 20*100 = 2000 4518 int threadNum = 20; 4519 int incCounter = 100; 4520 long expected = (long) threadNum * incCounter; 4521 Thread[] incrementers = new Thread[threadNum]; 4522 Thread flushThread = new Thread(flusher); 4523 for (int i = 0; i < threadNum; i++) { 4524 incrementers[i] = new Thread(new Incrementer(this.region, incCounter)); 4525 incrementers[i].start(); 4526 } 4527 flushThread.start(); 4528 for (int i = 0; i < threadNum; i++) { 4529 incrementers[i].join(); 4530 } 4531 4532 incrementDone.set(true); 4533 flushThread.join(); 4534 4535 Get get = new Get(Incrementer.incRow); 4536 get.addColumn(Incrementer.family, Incrementer.qualifier); 4537 get.setMaxVersions(1); 4538 Result res = this.region.get(get); 4539 List<Cell> kvs = res.getColumnCells(Incrementer.family, Incrementer.qualifier); 4540 4541 // we just got the latest version 4542 assertEquals(1, kvs.size()); 4543 Cell kv = kvs.get(0); 4544 assertEquals(expected, Bytes.toLong(kv.getValueArray(), kv.getValueOffset())); 4545 } 4546 4547 /** 4548 * TestCase for append 4549 */ 4550 private static class Appender implements Runnable { 4551 private HRegion region; 4552 private final static byte[] appendRow = Bytes.toBytes("appendRow"); 4553 private final static byte[] family = Bytes.toBytes("family"); 4554 private final static byte[] qualifier = Bytes.toBytes("qualifier"); 4555 private final static byte[] CHAR = Bytes.toBytes("a"); 4556 private int appendCounter; 4557 4558 public Appender(HRegion region, int appendCounter) { 4559 this.region = region; 4560 this.appendCounter = appendCounter; 4561 } 4562 4563 @Override 4564 public void run() { 4565 int count = 0; 4566 while (count < appendCounter) { 4567 Append app = new Append(appendRow); 4568 app.addColumn(family, qualifier, CHAR); 4569 count++; 4570 try { 4571 region.append(app); 4572 } catch (IOException e) { 4573 LOG.info("Count=" + count + ", max=" + appendCounter + ", " + e); 4574 break; 4575 } 4576 } 4577 } 4578 } 4579 4580 /** 4581 * Test case to check append function with memstore flushing 4582 * @throws Exception 4583 */ 4584 @Test 4585 public void testParallelAppendWithMemStoreFlush() throws Exception { 4586 byte[] family = Appender.family; 4587 this.region = initHRegion(tableName, method, CONF, family); 4588 final HRegion region = this.region; 4589 final AtomicBoolean appendDone = new AtomicBoolean(false); 4590 Runnable flusher = new Runnable() { 4591 @Override 4592 public void run() { 4593 while (!appendDone.get()) { 4594 try { 4595 region.flush(true); 4596 } catch (Exception e) { 4597 e.printStackTrace(); 4598 } 4599 } 4600 } 4601 }; 4602 4603 // After all append finished, the value will append to threadNum * 4604 // appendCounter Appender.CHAR 4605 int threadNum = 20; 4606 int appendCounter = 100; 4607 byte[] expected = new byte[threadNum * appendCounter]; 4608 for (int i = 0; i < threadNum * appendCounter; i++) { 4609 System.arraycopy(Appender.CHAR, 0, expected, i, 1); 4610 } 4611 Thread[] appenders = new Thread[threadNum]; 4612 Thread flushThread = new Thread(flusher); 4613 for (int i = 0; i < threadNum; i++) { 4614 appenders[i] = new Thread(new Appender(this.region, appendCounter)); 4615 appenders[i].start(); 4616 } 4617 flushThread.start(); 4618 for (int i = 0; i < threadNum; i++) { 4619 appenders[i].join(); 4620 } 4621 4622 appendDone.set(true); 4623 flushThread.join(); 4624 4625 Get get = new Get(Appender.appendRow); 4626 get.addColumn(Appender.family, Appender.qualifier); 4627 get.setMaxVersions(1); 4628 Result res = this.region.get(get); 4629 List<Cell> kvs = res.getColumnCells(Appender.family, Appender.qualifier); 4630 4631 // we just got the latest version 4632 assertEquals(1, kvs.size()); 4633 Cell kv = kvs.get(0); 4634 byte[] appendResult = new byte[kv.getValueLength()]; 4635 System.arraycopy(kv.getValueArray(), kv.getValueOffset(), appendResult, 0, kv.getValueLength()); 4636 assertArrayEquals(expected, appendResult); 4637 } 4638 4639 /** 4640 * Test case to check put function with memstore flushing for same row, same ts 4641 * @throws Exception 4642 */ 4643 @Test 4644 public void testPutWithMemStoreFlush() throws Exception { 4645 byte[] family = Bytes.toBytes("family"); 4646 byte[] qualifier = Bytes.toBytes("qualifier"); 4647 byte[] row = Bytes.toBytes("putRow"); 4648 byte[] value = null; 4649 this.region = initHRegion(tableName, method, CONF, family); 4650 Put put = null; 4651 Get get = null; 4652 List<Cell> kvs = null; 4653 Result res = null; 4654 4655 put = new Put(row); 4656 value = Bytes.toBytes("value0"); 4657 put.addColumn(family, qualifier, 1234567L, value); 4658 region.put(put); 4659 get = new Get(row); 4660 get.addColumn(family, qualifier); 4661 get.setMaxVersions(); 4662 res = this.region.get(get); 4663 kvs = res.getColumnCells(family, qualifier); 4664 assertEquals(1, kvs.size()); 4665 assertArrayEquals(Bytes.toBytes("value0"), CellUtil.cloneValue(kvs.get(0))); 4666 4667 region.flush(true); 4668 get = new Get(row); 4669 get.addColumn(family, qualifier); 4670 get.setMaxVersions(); 4671 res = this.region.get(get); 4672 kvs = res.getColumnCells(family, qualifier); 4673 assertEquals(1, kvs.size()); 4674 assertArrayEquals(Bytes.toBytes("value0"), CellUtil.cloneValue(kvs.get(0))); 4675 4676 put = new Put(row); 4677 value = Bytes.toBytes("value1"); 4678 put.addColumn(family, qualifier, 1234567L, value); 4679 region.put(put); 4680 get = new Get(row); 4681 get.addColumn(family, qualifier); 4682 get.setMaxVersions(); 4683 res = this.region.get(get); 4684 kvs = res.getColumnCells(family, qualifier); 4685 assertEquals(1, kvs.size()); 4686 assertArrayEquals(Bytes.toBytes("value1"), CellUtil.cloneValue(kvs.get(0))); 4687 4688 region.flush(true); 4689 get = new Get(row); 4690 get.addColumn(family, qualifier); 4691 get.setMaxVersions(); 4692 res = this.region.get(get); 4693 kvs = res.getColumnCells(family, qualifier); 4694 assertEquals(1, kvs.size()); 4695 assertArrayEquals(Bytes.toBytes("value1"), CellUtil.cloneValue(kvs.get(0))); 4696 } 4697 4698 @Test 4699 public void testDurability() throws Exception { 4700 // there are 5 x 5 cases: 4701 // table durability(SYNC,FSYNC,ASYC,SKIP,USE_DEFAULT) x mutation 4702 // durability(SYNC,FSYNC,ASYC,SKIP,USE_DEFAULT) 4703 4704 // expected cases for append and sync wal 4705 durabilityTest(method, Durability.SYNC_WAL, Durability.SYNC_WAL, 0, true, true, false); 4706 durabilityTest(method, Durability.SYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false); 4707 durabilityTest(method, Durability.SYNC_WAL, Durability.USE_DEFAULT, 0, true, true, false); 4708 4709 durabilityTest(method, Durability.FSYNC_WAL, Durability.SYNC_WAL, 0, true, true, false); 4710 durabilityTest(method, Durability.FSYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false); 4711 durabilityTest(method, Durability.FSYNC_WAL, Durability.USE_DEFAULT, 0, true, true, false); 4712 4713 durabilityTest(method, Durability.ASYNC_WAL, Durability.SYNC_WAL, 0, true, true, false); 4714 durabilityTest(method, Durability.ASYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false); 4715 4716 durabilityTest(method, Durability.SKIP_WAL, Durability.SYNC_WAL, 0, true, true, false); 4717 durabilityTest(method, Durability.SKIP_WAL, Durability.FSYNC_WAL, 0, true, true, false); 4718 4719 durabilityTest(method, Durability.USE_DEFAULT, Durability.SYNC_WAL, 0, true, true, false); 4720 durabilityTest(method, Durability.USE_DEFAULT, Durability.FSYNC_WAL, 0, true, true, false); 4721 durabilityTest(method, Durability.USE_DEFAULT, Durability.USE_DEFAULT, 0, true, true, false); 4722 4723 // expected cases for async wal 4724 durabilityTest(method, Durability.SYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false); 4725 durabilityTest(method, Durability.FSYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false); 4726 durabilityTest(method, Durability.ASYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false); 4727 durabilityTest(method, Durability.SKIP_WAL, Durability.ASYNC_WAL, 0, true, false, false); 4728 durabilityTest(method, Durability.USE_DEFAULT, Durability.ASYNC_WAL, 0, true, false, false); 4729 durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 0, true, false, false); 4730 4731 durabilityTest(method, Durability.SYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true); 4732 durabilityTest(method, Durability.FSYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true); 4733 durabilityTest(method, Durability.ASYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true); 4734 durabilityTest(method, Durability.SKIP_WAL, Durability.ASYNC_WAL, 5000, true, false, true); 4735 durabilityTest(method, Durability.USE_DEFAULT, Durability.ASYNC_WAL, 5000, true, false, true); 4736 durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 5000, true, false, true); 4737 4738 // expect skip wal cases 4739 durabilityTest(method, Durability.SYNC_WAL, Durability.SKIP_WAL, 0, false, false, false); 4740 durabilityTest(method, Durability.FSYNC_WAL, Durability.SKIP_WAL, 0, false, false, false); 4741 durabilityTest(method, Durability.ASYNC_WAL, Durability.SKIP_WAL, 0, false, false, false); 4742 durabilityTest(method, Durability.SKIP_WAL, Durability.SKIP_WAL, 0, false, false, false); 4743 durabilityTest(method, Durability.USE_DEFAULT, Durability.SKIP_WAL, 0, false, false, false); 4744 durabilityTest(method, Durability.SKIP_WAL, Durability.USE_DEFAULT, 0, false, false, false); 4745 4746 } 4747 4748 private void durabilityTest(String method, Durability tableDurability, 4749 Durability mutationDurability, long timeout, boolean expectAppend, final boolean expectSync, 4750 final boolean expectSyncFromLogSyncer) throws Exception { 4751 Configuration conf = HBaseConfiguration.create(CONF); 4752 method = method + "_" + tableDurability.name() + "_" + mutationDurability.name(); 4753 byte[] family = Bytes.toBytes("family"); 4754 Path logDir = new Path(new Path(dir + method), "log"); 4755 final Configuration walConf = new Configuration(conf); 4756 CommonFSUtils.setRootDir(walConf, logDir); 4757 // XXX: The spied AsyncFSWAL can not work properly because of a Mockito defect that can not 4758 // deal with classes which have a field of an inner class. See discussions in HBASE-15536. 4759 walConf.set(WALFactory.WAL_PROVIDER, "filesystem"); 4760 final WALFactory wals = new WALFactory(walConf, TEST_UTIL.getRandomUUID().toString()); 4761 final WAL wal = spy(wals.getWAL(RegionInfoBuilder.newBuilder(tableName).build())); 4762 this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW, 4763 HConstants.EMPTY_END_ROW, false, tableDurability, wal, 4764 new byte[][] { family }); 4765 4766 Put put = new Put(Bytes.toBytes("r1")); 4767 put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1")); 4768 put.setDurability(mutationDurability); 4769 region.put(put); 4770 4771 // verify append called or not 4772 verify(wal, expectAppend ? times(1) : never()).appendData((HRegionInfo) any(), 4773 (WALKeyImpl) any(), (WALEdit) any()); 4774 4775 // verify sync called or not 4776 if (expectSync || expectSyncFromLogSyncer) { 4777 TEST_UTIL.waitFor(timeout, new Waiter.Predicate<Exception>() { 4778 @Override 4779 public boolean evaluate() throws Exception { 4780 try { 4781 if (expectSync) { 4782 verify(wal, times(1)).sync(anyLong()); // Hregion calls this one 4783 } else if (expectSyncFromLogSyncer) { 4784 verify(wal, times(1)).sync(); // wal syncer calls this one 4785 } 4786 } catch (Throwable ignore) { 4787 } 4788 return true; 4789 } 4790 }); 4791 } else { 4792 //verify(wal, never()).sync(anyLong()); 4793 verify(wal, never()).sync(); 4794 } 4795 4796 HBaseTestingUtility.closeRegionAndWAL(this.region); 4797 wals.close(); 4798 this.region = null; 4799 } 4800 4801 @Test 4802 public void testRegionReplicaSecondary() throws IOException { 4803 // create a primary region, load some data and flush 4804 // create a secondary region, and do a get against that 4805 Path rootDir = new Path(dir + name.getMethodName()); 4806 CommonFSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir); 4807 4808 byte[][] families = new byte[][] { 4809 Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3") 4810 }; 4811 byte[] cq = Bytes.toBytes("cq"); 4812 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName())); 4813 for (byte[] family : families) { 4814 htd.addFamily(new HColumnDescriptor(family)); 4815 } 4816 4817 long time = System.currentTimeMillis(); 4818 HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(), 4819 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, 4820 false, time, 0); 4821 HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(), 4822 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, 4823 false, time, 1); 4824 4825 HRegion primaryRegion = null, secondaryRegion = null; 4826 4827 try { 4828 primaryRegion = HBaseTestingUtility.createRegionAndWAL(primaryHri, 4829 rootDir, TEST_UTIL.getConfiguration(), htd); 4830 4831 // load some data 4832 putData(primaryRegion, 0, 1000, cq, families); 4833 4834 // flush region 4835 primaryRegion.flush(true); 4836 4837 // open secondary region 4838 secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF); 4839 4840 verifyData(secondaryRegion, 0, 1000, cq, families); 4841 } finally { 4842 if (primaryRegion != null) { 4843 HBaseTestingUtility.closeRegionAndWAL(primaryRegion); 4844 } 4845 if (secondaryRegion != null) { 4846 HBaseTestingUtility.closeRegionAndWAL(secondaryRegion); 4847 } 4848 } 4849 } 4850 4851 @Test 4852 public void testRegionReplicaSecondaryIsReadOnly() throws IOException { 4853 // create a primary region, load some data and flush 4854 // create a secondary region, and do a put against that 4855 Path rootDir = new Path(dir + name.getMethodName()); 4856 CommonFSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir); 4857 4858 byte[][] families = new byte[][] { 4859 Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3") 4860 }; 4861 byte[] cq = Bytes.toBytes("cq"); 4862 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName())); 4863 for (byte[] family : families) { 4864 htd.addFamily(new HColumnDescriptor(family)); 4865 } 4866 4867 long time = System.currentTimeMillis(); 4868 HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(), 4869 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, 4870 false, time, 0); 4871 HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(), 4872 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, 4873 false, time, 1); 4874 4875 HRegion primaryRegion = null, secondaryRegion = null; 4876 4877 try { 4878 primaryRegion = HBaseTestingUtility.createRegionAndWAL(primaryHri, 4879 rootDir, TEST_UTIL.getConfiguration(), htd); 4880 4881 // load some data 4882 putData(primaryRegion, 0, 1000, cq, families); 4883 4884 // flush region 4885 primaryRegion.flush(true); 4886 4887 // open secondary region 4888 secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF); 4889 4890 try { 4891 putData(secondaryRegion, 0, 1000, cq, families); 4892 fail("Should have thrown exception"); 4893 } catch (IOException ex) { 4894 // expected 4895 } 4896 } finally { 4897 if (primaryRegion != null) { 4898 HBaseTestingUtility.closeRegionAndWAL(primaryRegion); 4899 } 4900 if (secondaryRegion != null) { 4901 HBaseTestingUtility.closeRegionAndWAL(secondaryRegion); 4902 } 4903 } 4904 } 4905 4906 static WALFactory createWALFactory(Configuration conf, Path rootDir) throws IOException { 4907 Configuration confForWAL = new Configuration(conf); 4908 confForWAL.set(HConstants.HBASE_DIR, rootDir.toString()); 4909 return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8)); 4910 } 4911 4912 @Test 4913 public void testCompactionFromPrimary() throws IOException { 4914 Path rootDir = new Path(dir + name.getMethodName()); 4915 CommonFSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir); 4916 4917 byte[][] families = new byte[][] { 4918 Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3") 4919 }; 4920 byte[] cq = Bytes.toBytes("cq"); 4921 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName())); 4922 for (byte[] family : families) { 4923 htd.addFamily(new HColumnDescriptor(family)); 4924 } 4925 4926 long time = System.currentTimeMillis(); 4927 HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(), 4928 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, 4929 false, time, 0); 4930 HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(), 4931 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, 4932 false, time, 1); 4933 4934 HRegion primaryRegion = null, secondaryRegion = null; 4935 4936 try { 4937 primaryRegion = HBaseTestingUtility.createRegionAndWAL(primaryHri, 4938 rootDir, TEST_UTIL.getConfiguration(), htd); 4939 4940 // load some data 4941 putData(primaryRegion, 0, 1000, cq, families); 4942 4943 // flush region 4944 primaryRegion.flush(true); 4945 4946 // open secondary region 4947 secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF); 4948 4949 // move the file of the primary region to the archive, simulating a compaction 4950 Collection<HStoreFile> storeFiles = primaryRegion.getStore(families[0]).getStorefiles(); 4951 primaryRegion.getRegionFileSystem().removeStoreFiles(Bytes.toString(families[0]), storeFiles); 4952 Collection<StoreFileInfo> storeFileInfos = primaryRegion.getRegionFileSystem() 4953 .getStoreFiles(families[0]); 4954 Assert.assertTrue(storeFileInfos == null || storeFileInfos.isEmpty()); 4955 4956 verifyData(secondaryRegion, 0, 1000, cq, families); 4957 } finally { 4958 if (primaryRegion != null) { 4959 HBaseTestingUtility.closeRegionAndWAL(primaryRegion); 4960 } 4961 if (secondaryRegion != null) { 4962 HBaseTestingUtility.closeRegionAndWAL(secondaryRegion); 4963 } 4964 } 4965 } 4966 4967 private void putData(int startRow, int numRows, byte[] qf, byte[]... families) throws 4968 IOException { 4969 putData(this.region, startRow, numRows, qf, families); 4970 } 4971 4972 private void putData(HRegion region, 4973 int startRow, int numRows, byte[] qf, byte[]... families) throws IOException { 4974 putData(region, Durability.SKIP_WAL, startRow, numRows, qf, families); 4975 } 4976 4977 static void putData(HRegion region, Durability durability, 4978 int startRow, int numRows, byte[] qf, byte[]... families) throws IOException { 4979 for (int i = startRow; i < startRow + numRows; i++) { 4980 Put put = new Put(Bytes.toBytes("" + i)); 4981 put.setDurability(durability); 4982 for (byte[] family : families) { 4983 put.addColumn(family, qf, null); 4984 } 4985 region.put(put); 4986 LOG.info(put.toString()); 4987 } 4988 } 4989 4990 static void verifyData(HRegion newReg, int startRow, int numRows, byte[] qf, byte[]... families) 4991 throws IOException { 4992 for (int i = startRow; i < startRow + numRows; i++) { 4993 byte[] row = Bytes.toBytes("" + i); 4994 Get get = new Get(row); 4995 for (byte[] family : families) { 4996 get.addColumn(family, qf); 4997 } 4998 Result result = newReg.get(get); 4999 Cell[] raw = result.rawCells(); 5000 assertEquals(families.length, result.size()); 5001 for (int j = 0; j < families.length; j++) { 5002 assertTrue(CellUtil.matchingRows(raw[j], row)); 5003 assertTrue(CellUtil.matchingFamily(raw[j], families[j])); 5004 assertTrue(CellUtil.matchingQualifier(raw[j], qf)); 5005 } 5006 } 5007 } 5008 5009 static void assertGet(final HRegion r, final byte[] family, final byte[] k) throws IOException { 5010 // Now I have k, get values out and assert they are as expected. 5011 Get get = new Get(k).addFamily(family).setMaxVersions(); 5012 Cell[] results = r.get(get).rawCells(); 5013 for (int j = 0; j < results.length; j++) { 5014 byte[] tmp = CellUtil.cloneValue(results[j]); 5015 // Row should be equal to value every time. 5016 assertTrue(Bytes.equals(k, tmp)); 5017 } 5018 } 5019 5020 /* 5021 * Assert first value in the passed region is <code>firstValue</code>. 5022 * 5023 * @param r 5024 * 5025 * @param fs 5026 * 5027 * @param firstValue 5028 * 5029 * @throws IOException 5030 */ 5031 protected void assertScan(final HRegion r, final byte[] fs, final byte[] firstValue) 5032 throws IOException { 5033 byte[][] families = { fs }; 5034 Scan scan = new Scan(); 5035 for (int i = 0; i < families.length; i++) 5036 scan.addFamily(families[i]); 5037 InternalScanner s = r.getScanner(scan); 5038 try { 5039 List<Cell> curVals = new ArrayList<>(); 5040 boolean first = true; 5041 OUTER_LOOP: while (s.next(curVals)) { 5042 for (Cell kv : curVals) { 5043 byte[] val = CellUtil.cloneValue(kv); 5044 byte[] curval = val; 5045 if (first) { 5046 first = false; 5047 assertTrue(Bytes.compareTo(curval, firstValue) == 0); 5048 } else { 5049 // Not asserting anything. Might as well break. 5050 break OUTER_LOOP; 5051 } 5052 } 5053 } 5054 } finally { 5055 s.close(); 5056 } 5057 } 5058 5059 /** 5060 * Test that we get the expected flush results back 5061 */ 5062 @Test 5063 public void testFlushResult() throws IOException { 5064 byte[] family = Bytes.toBytes("family"); 5065 5066 this.region = initHRegion(tableName, method, family); 5067 5068 // empty memstore, flush doesn't run 5069 HRegion.FlushResult fr = region.flush(true); 5070 assertFalse(fr.isFlushSucceeded()); 5071 assertFalse(fr.isCompactionNeeded()); 5072 5073 // Flush enough files to get up to the threshold, doesn't need compactions 5074 for (int i = 0; i < 2; i++) { 5075 Put put = new Put(tableName.toBytes()).addColumn(family, family, tableName.toBytes()); 5076 region.put(put); 5077 fr = region.flush(true); 5078 assertTrue(fr.isFlushSucceeded()); 5079 assertFalse(fr.isCompactionNeeded()); 5080 } 5081 5082 // Two flushes after the threshold, compactions are needed 5083 for (int i = 0; i < 2; i++) { 5084 Put put = new Put(tableName.toBytes()).addColumn(family, family, tableName.toBytes()); 5085 region.put(put); 5086 fr = region.flush(true); 5087 assertTrue(fr.isFlushSucceeded()); 5088 assertTrue(fr.isCompactionNeeded()); 5089 } 5090 } 5091 5092 protected Configuration initSplit() { 5093 // Always compact if there is more than one store file. 5094 CONF.setInt("hbase.hstore.compactionThreshold", 2); 5095 5096 CONF.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 10 * 1000); 5097 5098 // Increase the amount of time between client retries 5099 CONF.setLong("hbase.client.pause", 15 * 1000); 5100 5101 // This size should make it so we always split using the addContent 5102 // below. After adding all data, the first region is 1.3M 5103 CONF.setLong(HConstants.HREGION_MAX_FILESIZE, 1024 * 128); 5104 return CONF; 5105 } 5106 5107 /** 5108 * @return A region on which you must call 5109 * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done. 5110 */ 5111 protected HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf, 5112 byte[]... families) throws IOException { 5113 return initHRegion(tableName, callingMethod, conf, false, families); 5114 } 5115 5116 /** 5117 * @return A region on which you must call 5118 * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done. 5119 */ 5120 protected HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf, 5121 boolean isReadOnly, byte[]... families) throws IOException { 5122 return initHRegion(tableName, null, null, callingMethod, conf, isReadOnly, families); 5123 } 5124 5125 protected HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, 5126 String callingMethod, Configuration conf, boolean isReadOnly, byte[]... families) 5127 throws IOException { 5128 Path logDir = TEST_UTIL.getDataTestDirOnTestFS(callingMethod + ".log"); 5129 HRegionInfo hri = new HRegionInfo(tableName, startKey, stopKey); 5130 final WAL wal = HBaseTestingUtility.createWal(conf, logDir, hri); 5131 return initHRegion(tableName, startKey, stopKey, isReadOnly, 5132 Durability.SYNC_WAL, wal, families); 5133 } 5134 5135 /** 5136 * @return A region on which you must call 5137 * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done. 5138 */ 5139 public HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, 5140 boolean isReadOnly, Durability durability, WAL wal, byte[]... families) throws IOException { 5141 ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); 5142 return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, 5143 isReadOnly, durability, wal, families); 5144 } 5145 5146 /** 5147 * Assert that the passed in Cell has expected contents for the specified row, 5148 * column & timestamp. 5149 */ 5150 private void checkOneCell(Cell kv, byte[] cf, int rowIdx, int colIdx, long ts) { 5151 String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts; 5152 assertEquals("Row mismatch which checking: " + ctx, "row:" + rowIdx, 5153 Bytes.toString(CellUtil.cloneRow(kv))); 5154 assertEquals("ColumnFamily mismatch while checking: " + ctx, Bytes.toString(cf), 5155 Bytes.toString(CellUtil.cloneFamily(kv))); 5156 assertEquals("Column qualifier mismatch while checking: " + ctx, "column:" + colIdx, 5157 Bytes.toString(CellUtil.cloneQualifier(kv))); 5158 assertEquals("Timestamp mismatch while checking: " + ctx, ts, kv.getTimestamp()); 5159 assertEquals("Value mismatch while checking: " + ctx, "value-version-" + ts, 5160 Bytes.toString(CellUtil.cloneValue(kv))); 5161 } 5162 5163 @Test 5164 public void testReverseScanner_FromMemStore_SingleCF_Normal() 5165 throws IOException { 5166 byte[] rowC = Bytes.toBytes("rowC"); 5167 byte[] rowA = Bytes.toBytes("rowA"); 5168 byte[] rowB = Bytes.toBytes("rowB"); 5169 byte[] cf = Bytes.toBytes("CF"); 5170 byte[][] families = { cf }; 5171 byte[] col = Bytes.toBytes("C"); 5172 long ts = 1; 5173 this.region = initHRegion(tableName, method, families); 5174 KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null); 5175 KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put, 5176 null); 5177 KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null); 5178 KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null); 5179 Put put = null; 5180 put = new Put(rowC); 5181 put.add(kv1); 5182 put.add(kv11); 5183 region.put(put); 5184 put = new Put(rowA); 5185 put.add(kv2); 5186 region.put(put); 5187 put = new Put(rowB); 5188 put.add(kv3); 5189 region.put(put); 5190 5191 Scan scan = new Scan(rowC); 5192 scan.setMaxVersions(5); 5193 scan.setReversed(true); 5194 InternalScanner scanner = region.getScanner(scan); 5195 List<Cell> currRow = new ArrayList<>(); 5196 boolean hasNext = scanner.next(currRow); 5197 assertEquals(2, currRow.size()); 5198 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5199 .get(0).getRowLength(), rowC, 0, rowC.length)); 5200 assertTrue(hasNext); 5201 currRow.clear(); 5202 hasNext = scanner.next(currRow); 5203 assertEquals(1, currRow.size()); 5204 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5205 .get(0).getRowLength(), rowB, 0, rowB.length)); 5206 assertTrue(hasNext); 5207 currRow.clear(); 5208 hasNext = scanner.next(currRow); 5209 assertEquals(1, currRow.size()); 5210 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5211 .get(0).getRowLength(), rowA, 0, rowA.length)); 5212 assertFalse(hasNext); 5213 scanner.close(); 5214 } 5215 5216 @Test 5217 public void testReverseScanner_FromMemStore_SingleCF_LargerKey() 5218 throws IOException { 5219 byte[] rowC = Bytes.toBytes("rowC"); 5220 byte[] rowA = Bytes.toBytes("rowA"); 5221 byte[] rowB = Bytes.toBytes("rowB"); 5222 byte[] rowD = Bytes.toBytes("rowD"); 5223 byte[] cf = Bytes.toBytes("CF"); 5224 byte[][] families = { cf }; 5225 byte[] col = Bytes.toBytes("C"); 5226 long ts = 1; 5227 this.region = initHRegion(tableName, method, families); 5228 KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null); 5229 KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put, 5230 null); 5231 KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null); 5232 KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null); 5233 Put put = null; 5234 put = new Put(rowC); 5235 put.add(kv1); 5236 put.add(kv11); 5237 region.put(put); 5238 put = new Put(rowA); 5239 put.add(kv2); 5240 region.put(put); 5241 put = new Put(rowB); 5242 put.add(kv3); 5243 region.put(put); 5244 5245 Scan scan = new Scan(rowD); 5246 List<Cell> currRow = new ArrayList<>(); 5247 scan.setReversed(true); 5248 scan.setMaxVersions(5); 5249 InternalScanner scanner = region.getScanner(scan); 5250 boolean hasNext = scanner.next(currRow); 5251 assertEquals(2, currRow.size()); 5252 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5253 .get(0).getRowLength(), rowC, 0, rowC.length)); 5254 assertTrue(hasNext); 5255 currRow.clear(); 5256 hasNext = scanner.next(currRow); 5257 assertEquals(1, currRow.size()); 5258 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5259 .get(0).getRowLength(), rowB, 0, rowB.length)); 5260 assertTrue(hasNext); 5261 currRow.clear(); 5262 hasNext = scanner.next(currRow); 5263 assertEquals(1, currRow.size()); 5264 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5265 .get(0).getRowLength(), rowA, 0, rowA.length)); 5266 assertFalse(hasNext); 5267 scanner.close(); 5268 } 5269 5270 @Test 5271 public void testReverseScanner_FromMemStore_SingleCF_FullScan() 5272 throws IOException { 5273 byte[] rowC = Bytes.toBytes("rowC"); 5274 byte[] rowA = Bytes.toBytes("rowA"); 5275 byte[] rowB = Bytes.toBytes("rowB"); 5276 byte[] cf = Bytes.toBytes("CF"); 5277 byte[][] families = { cf }; 5278 byte[] col = Bytes.toBytes("C"); 5279 long ts = 1; 5280 this.region = initHRegion(tableName, method, families); 5281 KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null); 5282 KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put, 5283 null); 5284 KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null); 5285 KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null); 5286 Put put = null; 5287 put = new Put(rowC); 5288 put.add(kv1); 5289 put.add(kv11); 5290 region.put(put); 5291 put = new Put(rowA); 5292 put.add(kv2); 5293 region.put(put); 5294 put = new Put(rowB); 5295 put.add(kv3); 5296 region.put(put); 5297 Scan scan = new Scan(); 5298 List<Cell> currRow = new ArrayList<>(); 5299 scan.setReversed(true); 5300 InternalScanner scanner = region.getScanner(scan); 5301 boolean hasNext = scanner.next(currRow); 5302 assertEquals(1, currRow.size()); 5303 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5304 .get(0).getRowLength(), rowC, 0, rowC.length)); 5305 assertTrue(hasNext); 5306 currRow.clear(); 5307 hasNext = scanner.next(currRow); 5308 assertEquals(1, currRow.size()); 5309 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5310 .get(0).getRowLength(), rowB, 0, rowB.length)); 5311 assertTrue(hasNext); 5312 currRow.clear(); 5313 hasNext = scanner.next(currRow); 5314 assertEquals(1, currRow.size()); 5315 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5316 .get(0).getRowLength(), rowA, 0, rowA.length)); 5317 assertFalse(hasNext); 5318 scanner.close(); 5319 } 5320 5321 @Test 5322 public void testReverseScanner_moreRowsMayExistAfter() throws IOException { 5323 // case for "INCLUDE_AND_SEEK_NEXT_ROW & SEEK_NEXT_ROW" endless loop 5324 byte[] rowA = Bytes.toBytes("rowA"); 5325 byte[] rowB = Bytes.toBytes("rowB"); 5326 byte[] rowC = Bytes.toBytes("rowC"); 5327 byte[] rowD = Bytes.toBytes("rowD"); 5328 byte[] rowE = Bytes.toBytes("rowE"); 5329 byte[] cf = Bytes.toBytes("CF"); 5330 byte[][] families = { cf }; 5331 byte[] col1 = Bytes.toBytes("col1"); 5332 byte[] col2 = Bytes.toBytes("col2"); 5333 long ts = 1; 5334 this.region = initHRegion(tableName, method, families); 5335 KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null); 5336 KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null); 5337 KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null); 5338 KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null); 5339 KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null); 5340 KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null); 5341 Put put = null; 5342 put = new Put(rowA); 5343 put.add(kv1); 5344 region.put(put); 5345 put = new Put(rowB); 5346 put.add(kv2); 5347 region.put(put); 5348 put = new Put(rowC); 5349 put.add(kv3); 5350 region.put(put); 5351 put = new Put(rowD); 5352 put.add(kv4_1); 5353 region.put(put); 5354 put = new Put(rowD); 5355 put.add(kv4_2); 5356 region.put(put); 5357 put = new Put(rowE); 5358 put.add(kv5); 5359 region.put(put); 5360 region.flush(true); 5361 Scan scan = new Scan(rowD, rowA); 5362 scan.addColumn(families[0], col1); 5363 scan.setReversed(true); 5364 List<Cell> currRow = new ArrayList<>(); 5365 InternalScanner scanner = region.getScanner(scan); 5366 boolean hasNext = scanner.next(currRow); 5367 assertEquals(1, currRow.size()); 5368 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5369 .get(0).getRowLength(), rowD, 0, rowD.length)); 5370 assertTrue(hasNext); 5371 currRow.clear(); 5372 hasNext = scanner.next(currRow); 5373 assertEquals(1, currRow.size()); 5374 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5375 .get(0).getRowLength(), rowC, 0, rowC.length)); 5376 assertTrue(hasNext); 5377 currRow.clear(); 5378 hasNext = scanner.next(currRow); 5379 assertEquals(1, currRow.size()); 5380 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5381 .get(0).getRowLength(), rowB, 0, rowB.length)); 5382 assertFalse(hasNext); 5383 scanner.close(); 5384 5385 scan = new Scan(rowD, rowA); 5386 scan.addColumn(families[0], col2); 5387 scan.setReversed(true); 5388 currRow.clear(); 5389 scanner = region.getScanner(scan); 5390 hasNext = scanner.next(currRow); 5391 assertEquals(1, currRow.size()); 5392 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5393 .get(0).getRowLength(), rowD, 0, rowD.length)); 5394 scanner.close(); 5395 } 5396 5397 @Test 5398 public void testReverseScanner_smaller_blocksize() throws IOException { 5399 // case to ensure no conflict with HFile index optimization 5400 byte[] rowA = Bytes.toBytes("rowA"); 5401 byte[] rowB = Bytes.toBytes("rowB"); 5402 byte[] rowC = Bytes.toBytes("rowC"); 5403 byte[] rowD = Bytes.toBytes("rowD"); 5404 byte[] rowE = Bytes.toBytes("rowE"); 5405 byte[] cf = Bytes.toBytes("CF"); 5406 byte[][] families = { cf }; 5407 byte[] col1 = Bytes.toBytes("col1"); 5408 byte[] col2 = Bytes.toBytes("col2"); 5409 long ts = 1; 5410 HBaseConfiguration config = new HBaseConfiguration(); 5411 config.setInt("test.block.size", 1); 5412 this.region = initHRegion(tableName, method, config, families); 5413 KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null); 5414 KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null); 5415 KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null); 5416 KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null); 5417 KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null); 5418 KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null); 5419 Put put = null; 5420 put = new Put(rowA); 5421 put.add(kv1); 5422 region.put(put); 5423 put = new Put(rowB); 5424 put.add(kv2); 5425 region.put(put); 5426 put = new Put(rowC); 5427 put.add(kv3); 5428 region.put(put); 5429 put = new Put(rowD); 5430 put.add(kv4_1); 5431 region.put(put); 5432 put = new Put(rowD); 5433 put.add(kv4_2); 5434 region.put(put); 5435 put = new Put(rowE); 5436 put.add(kv5); 5437 region.put(put); 5438 region.flush(true); 5439 Scan scan = new Scan(rowD, rowA); 5440 scan.addColumn(families[0], col1); 5441 scan.setReversed(true); 5442 List<Cell> currRow = new ArrayList<>(); 5443 InternalScanner scanner = region.getScanner(scan); 5444 boolean hasNext = scanner.next(currRow); 5445 assertEquals(1, currRow.size()); 5446 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5447 .get(0).getRowLength(), rowD, 0, rowD.length)); 5448 assertTrue(hasNext); 5449 currRow.clear(); 5450 hasNext = scanner.next(currRow); 5451 assertEquals(1, currRow.size()); 5452 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5453 .get(0).getRowLength(), rowC, 0, rowC.length)); 5454 assertTrue(hasNext); 5455 currRow.clear(); 5456 hasNext = scanner.next(currRow); 5457 assertEquals(1, currRow.size()); 5458 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5459 .get(0).getRowLength(), rowB, 0, rowB.length)); 5460 assertFalse(hasNext); 5461 scanner.close(); 5462 5463 scan = new Scan(rowD, rowA); 5464 scan.addColumn(families[0], col2); 5465 scan.setReversed(true); 5466 currRow.clear(); 5467 scanner = region.getScanner(scan); 5468 hasNext = scanner.next(currRow); 5469 assertEquals(1, currRow.size()); 5470 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5471 .get(0).getRowLength(), rowD, 0, rowD.length)); 5472 scanner.close(); 5473 } 5474 5475 @Test 5476 public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs1() 5477 throws IOException { 5478 byte[] row0 = Bytes.toBytes("row0"); // 1 kv 5479 byte[] row1 = Bytes.toBytes("row1"); // 2 kv 5480 byte[] row2 = Bytes.toBytes("row2"); // 4 kv 5481 byte[] row3 = Bytes.toBytes("row3"); // 2 kv 5482 byte[] row4 = Bytes.toBytes("row4"); // 5 kv 5483 byte[] row5 = Bytes.toBytes("row5"); // 2 kv 5484 byte[] cf1 = Bytes.toBytes("CF1"); 5485 byte[] cf2 = Bytes.toBytes("CF2"); 5486 byte[] cf3 = Bytes.toBytes("CF3"); 5487 byte[][] families = { cf1, cf2, cf3 }; 5488 byte[] col = Bytes.toBytes("C"); 5489 long ts = 1; 5490 HBaseConfiguration conf = new HBaseConfiguration(); 5491 // disable compactions in this test. 5492 conf.setInt("hbase.hstore.compactionThreshold", 10000); 5493 this.region = initHRegion(tableName, method, conf, families); 5494 // kv naming style: kv(row number) totalKvCountInThisRow seq no 5495 KeyValue kv0_1_1 = new KeyValue(row0, cf1, col, ts, KeyValue.Type.Put, 5496 null); 5497 KeyValue kv1_2_1 = new KeyValue(row1, cf2, col, ts, KeyValue.Type.Put, 5498 null); 5499 KeyValue kv1_2_2 = new KeyValue(row1, cf1, col, ts + 1, 5500 KeyValue.Type.Put, null); 5501 KeyValue kv2_4_1 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put, 5502 null); 5503 KeyValue kv2_4_2 = new KeyValue(row2, cf1, col, ts, KeyValue.Type.Put, 5504 null); 5505 KeyValue kv2_4_3 = new KeyValue(row2, cf3, col, ts, KeyValue.Type.Put, 5506 null); 5507 KeyValue kv2_4_4 = new KeyValue(row2, cf1, col, ts + 4, 5508 KeyValue.Type.Put, null); 5509 KeyValue kv3_2_1 = new KeyValue(row3, cf2, col, ts, KeyValue.Type.Put, 5510 null); 5511 KeyValue kv3_2_2 = new KeyValue(row3, cf1, col, ts + 4, 5512 KeyValue.Type.Put, null); 5513 KeyValue kv4_5_1 = new KeyValue(row4, cf1, col, ts, KeyValue.Type.Put, 5514 null); 5515 KeyValue kv4_5_2 = new KeyValue(row4, cf3, col, ts, KeyValue.Type.Put, 5516 null); 5517 KeyValue kv4_5_3 = new KeyValue(row4, cf3, col, ts + 5, 5518 KeyValue.Type.Put, null); 5519 KeyValue kv4_5_4 = new KeyValue(row4, cf2, col, ts, KeyValue.Type.Put, 5520 null); 5521 KeyValue kv4_5_5 = new KeyValue(row4, cf1, col, ts + 3, 5522 KeyValue.Type.Put, null); 5523 KeyValue kv5_2_1 = new KeyValue(row5, cf2, col, ts, KeyValue.Type.Put, 5524 null); 5525 KeyValue kv5_2_2 = new KeyValue(row5, cf3, col, ts, KeyValue.Type.Put, 5526 null); 5527 // hfiles(cf1/cf2) :"row1"(1 kv) / "row2"(1 kv) / "row4"(2 kv) 5528 Put put = null; 5529 put = new Put(row1); 5530 put.add(kv1_2_1); 5531 region.put(put); 5532 put = new Put(row2); 5533 put.add(kv2_4_1); 5534 region.put(put); 5535 put = new Put(row4); 5536 put.add(kv4_5_4); 5537 put.add(kv4_5_5); 5538 region.put(put); 5539 region.flush(true); 5540 // hfiles(cf1/cf3) : "row1" (1 kvs) / "row2" (1 kv) / "row4" (2 kv) 5541 put = new Put(row4); 5542 put.add(kv4_5_1); 5543 put.add(kv4_5_3); 5544 region.put(put); 5545 put = new Put(row1); 5546 put.add(kv1_2_2); 5547 region.put(put); 5548 put = new Put(row2); 5549 put.add(kv2_4_4); 5550 region.put(put); 5551 region.flush(true); 5552 // hfiles(cf1/cf3) : "row2"(2 kv) / "row3"(1 kvs) / "row4" (1 kv) 5553 put = new Put(row4); 5554 put.add(kv4_5_2); 5555 region.put(put); 5556 put = new Put(row2); 5557 put.add(kv2_4_2); 5558 put.add(kv2_4_3); 5559 region.put(put); 5560 put = new Put(row3); 5561 put.add(kv3_2_2); 5562 region.put(put); 5563 region.flush(true); 5564 // memstore(cf1/cf2/cf3) : "row0" (1 kvs) / "row3" ( 1 kv) / "row5" (max) 5565 // ( 2 kv) 5566 put = new Put(row0); 5567 put.add(kv0_1_1); 5568 region.put(put); 5569 put = new Put(row3); 5570 put.add(kv3_2_1); 5571 region.put(put); 5572 put = new Put(row5); 5573 put.add(kv5_2_1); 5574 put.add(kv5_2_2); 5575 region.put(put); 5576 // scan range = ["row4", min), skip the max "row5" 5577 Scan scan = new Scan(row4); 5578 scan.setMaxVersions(5); 5579 scan.setBatch(3); 5580 scan.setReversed(true); 5581 InternalScanner scanner = region.getScanner(scan); 5582 List<Cell> currRow = new ArrayList<>(); 5583 boolean hasNext = false; 5584 // 1. scan out "row4" (5 kvs), "row5" can't be scanned out since not 5585 // included in scan range 5586 // "row4" takes 2 next() calls since batch=3 5587 hasNext = scanner.next(currRow); 5588 assertEquals(3, currRow.size()); 5589 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5590 .get(0).getRowLength(), row4, 0, row4.length)); 5591 assertTrue(hasNext); 5592 currRow.clear(); 5593 hasNext = scanner.next(currRow); 5594 assertEquals(2, currRow.size()); 5595 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), 5596 currRow.get(0).getRowLength(), row4, 0, 5597 row4.length)); 5598 assertTrue(hasNext); 5599 // 2. scan out "row3" (2 kv) 5600 currRow.clear(); 5601 hasNext = scanner.next(currRow); 5602 assertEquals(2, currRow.size()); 5603 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5604 .get(0).getRowLength(), row3, 0, row3.length)); 5605 assertTrue(hasNext); 5606 // 3. scan out "row2" (4 kvs) 5607 // "row2" takes 2 next() calls since batch=3 5608 currRow.clear(); 5609 hasNext = scanner.next(currRow); 5610 assertEquals(3, currRow.size()); 5611 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5612 .get(0).getRowLength(), row2, 0, row2.length)); 5613 assertTrue(hasNext); 5614 currRow.clear(); 5615 hasNext = scanner.next(currRow); 5616 assertEquals(1, currRow.size()); 5617 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5618 .get(0).getRowLength(), row2, 0, row2.length)); 5619 assertTrue(hasNext); 5620 // 4. scan out "row1" (2 kv) 5621 currRow.clear(); 5622 hasNext = scanner.next(currRow); 5623 assertEquals(2, currRow.size()); 5624 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5625 .get(0).getRowLength(), row1, 0, row1.length)); 5626 assertTrue(hasNext); 5627 // 5. scan out "row0" (1 kv) 5628 currRow.clear(); 5629 hasNext = scanner.next(currRow); 5630 assertEquals(1, currRow.size()); 5631 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5632 .get(0).getRowLength(), row0, 0, row0.length)); 5633 assertFalse(hasNext); 5634 5635 scanner.close(); 5636 } 5637 5638 @Test 5639 public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs2() 5640 throws IOException { 5641 byte[] row1 = Bytes.toBytes("row1"); 5642 byte[] row2 = Bytes.toBytes("row2"); 5643 byte[] row3 = Bytes.toBytes("row3"); 5644 byte[] row4 = Bytes.toBytes("row4"); 5645 byte[] cf1 = Bytes.toBytes("CF1"); 5646 byte[] cf2 = Bytes.toBytes("CF2"); 5647 byte[] cf3 = Bytes.toBytes("CF3"); 5648 byte[] cf4 = Bytes.toBytes("CF4"); 5649 byte[][] families = { cf1, cf2, cf3, cf4 }; 5650 byte[] col = Bytes.toBytes("C"); 5651 long ts = 1; 5652 HBaseConfiguration conf = new HBaseConfiguration(); 5653 // disable compactions in this test. 5654 conf.setInt("hbase.hstore.compactionThreshold", 10000); 5655 this.region = initHRegion(tableName, method, conf, families); 5656 KeyValue kv1 = new KeyValue(row1, cf1, col, ts, KeyValue.Type.Put, null); 5657 KeyValue kv2 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put, null); 5658 KeyValue kv3 = new KeyValue(row3, cf3, col, ts, KeyValue.Type.Put, null); 5659 KeyValue kv4 = new KeyValue(row4, cf4, col, ts, KeyValue.Type.Put, null); 5660 // storefile1 5661 Put put = new Put(row1); 5662 put.add(kv1); 5663 region.put(put); 5664 region.flush(true); 5665 // storefile2 5666 put = new Put(row2); 5667 put.add(kv2); 5668 region.put(put); 5669 region.flush(true); 5670 // storefile3 5671 put = new Put(row3); 5672 put.add(kv3); 5673 region.put(put); 5674 region.flush(true); 5675 // memstore 5676 put = new Put(row4); 5677 put.add(kv4); 5678 region.put(put); 5679 // scan range = ["row4", min) 5680 Scan scan = new Scan(row4); 5681 scan.setReversed(true); 5682 scan.setBatch(10); 5683 InternalScanner scanner = region.getScanner(scan); 5684 List<Cell> currRow = new ArrayList<>(); 5685 boolean hasNext = scanner.next(currRow); 5686 assertEquals(1, currRow.size()); 5687 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5688 .get(0).getRowLength(), row4, 0, row4.length)); 5689 assertTrue(hasNext); 5690 currRow.clear(); 5691 hasNext = scanner.next(currRow); 5692 assertEquals(1, currRow.size()); 5693 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5694 .get(0).getRowLength(), row3, 0, row3.length)); 5695 assertTrue(hasNext); 5696 currRow.clear(); 5697 hasNext = scanner.next(currRow); 5698 assertEquals(1, currRow.size()); 5699 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5700 .get(0).getRowLength(), row2, 0, row2.length)); 5701 assertTrue(hasNext); 5702 currRow.clear(); 5703 hasNext = scanner.next(currRow); 5704 assertEquals(1, currRow.size()); 5705 assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow 5706 .get(0).getRowLength(), row1, 0, row1.length)); 5707 assertFalse(hasNext); 5708 } 5709 5710 /** 5711 * Test for HBASE-14497: Reverse Scan threw StackOverflow caused by readPt checking 5712 */ 5713 @Test 5714 public void testReverseScanner_StackOverflow() throws IOException { 5715 byte[] cf1 = Bytes.toBytes("CF1"); 5716 byte[][] families = {cf1}; 5717 byte[] col = Bytes.toBytes("C"); 5718 HBaseConfiguration conf = new HBaseConfiguration(); 5719 this.region = initHRegion(tableName, method, conf, families); 5720 // setup with one storefile and one memstore, to create scanner and get an earlier readPt 5721 Put put = new Put(Bytes.toBytes("19998")); 5722 put.addColumn(cf1, col, Bytes.toBytes("val")); 5723 region.put(put); 5724 region.flushcache(true, true, FlushLifeCycleTracker.DUMMY); 5725 Put put2 = new Put(Bytes.toBytes("19997")); 5726 put2.addColumn(cf1, col, Bytes.toBytes("val")); 5727 region.put(put2); 5728 5729 Scan scan = new Scan(Bytes.toBytes("19998")); 5730 scan.setReversed(true); 5731 InternalScanner scanner = region.getScanner(scan); 5732 5733 // create one storefile contains many rows will be skipped 5734 // to check StoreFileScanner.seekToPreviousRow 5735 for (int i = 10000; i < 20000; i++) { 5736 Put p = new Put(Bytes.toBytes(""+i)); 5737 p.addColumn(cf1, col, Bytes.toBytes("" + i)); 5738 region.put(p); 5739 } 5740 region.flushcache(true, true, FlushLifeCycleTracker.DUMMY); 5741 5742 // create one memstore contains many rows will be skipped 5743 // to check MemStoreScanner.seekToPreviousRow 5744 for (int i = 10000; i < 20000; i++) { 5745 Put p = new Put(Bytes.toBytes(""+i)); 5746 p.addColumn(cf1, col, Bytes.toBytes("" + i)); 5747 region.put(p); 5748 } 5749 5750 List<Cell> currRow = new ArrayList<>(); 5751 boolean hasNext; 5752 do { 5753 hasNext = scanner.next(currRow); 5754 } while (hasNext); 5755 assertEquals(2, currRow.size()); 5756 assertEquals("19998", Bytes.toString(currRow.get(0).getRowArray(), 5757 currRow.get(0).getRowOffset(), currRow.get(0).getRowLength())); 5758 assertEquals("19997", Bytes.toString(currRow.get(1).getRowArray(), 5759 currRow.get(1).getRowOffset(), currRow.get(1).getRowLength())); 5760 } 5761 5762 @Test 5763 public void testReverseScanShouldNotScanMemstoreIfReadPtLesser() throws Exception { 5764 byte[] cf1 = Bytes.toBytes("CF1"); 5765 byte[][] families = { cf1 }; 5766 byte[] col = Bytes.toBytes("C"); 5767 HBaseConfiguration conf = new HBaseConfiguration(); 5768 this.region = initHRegion(tableName, method, conf, families); 5769 // setup with one storefile and one memstore, to create scanner and get an earlier readPt 5770 Put put = new Put(Bytes.toBytes("19996")); 5771 put.addColumn(cf1, col, Bytes.toBytes("val")); 5772 region.put(put); 5773 Put put2 = new Put(Bytes.toBytes("19995")); 5774 put2.addColumn(cf1, col, Bytes.toBytes("val")); 5775 region.put(put2); 5776 // create a reverse scan 5777 Scan scan = new Scan(Bytes.toBytes("19996")); 5778 scan.setReversed(true); 5779 RegionScannerImpl scanner = region.getScanner(scan); 5780 5781 // flush the cache. This will reset the store scanner 5782 region.flushcache(true, true, FlushLifeCycleTracker.DUMMY); 5783 5784 // create one memstore contains many rows will be skipped 5785 // to check MemStoreScanner.seekToPreviousRow 5786 for (int i = 10000; i < 20000; i++) { 5787 Put p = new Put(Bytes.toBytes("" + i)); 5788 p.addColumn(cf1, col, Bytes.toBytes("" + i)); 5789 region.put(p); 5790 } 5791 List<Cell> currRow = new ArrayList<>(); 5792 boolean hasNext; 5793 boolean assertDone = false; 5794 do { 5795 hasNext = scanner.next(currRow); 5796 // With HBASE-15871, after the scanner is reset the memstore scanner should not be 5797 // added here 5798 if (!assertDone) { 5799 StoreScanner current = 5800 (StoreScanner) (scanner.storeHeap).getCurrentForTesting(); 5801 List<KeyValueScanner> scanners = current.getAllScannersForTesting(); 5802 assertEquals("There should be only one scanner the store file scanner", 1, 5803 scanners.size()); 5804 assertDone = true; 5805 } 5806 } while (hasNext); 5807 assertEquals(2, currRow.size()); 5808 assertEquals("19996", Bytes.toString(currRow.get(0).getRowArray(), 5809 currRow.get(0).getRowOffset(), currRow.get(0).getRowLength())); 5810 assertEquals("19995", Bytes.toString(currRow.get(1).getRowArray(), 5811 currRow.get(1).getRowOffset(), currRow.get(1).getRowLength())); 5812 } 5813 5814 @Test 5815 public void testReverseScanWhenPutCellsAfterOpenReverseScan() throws Exception { 5816 byte[] cf1 = Bytes.toBytes("CF1"); 5817 byte[][] families = { cf1 }; 5818 byte[] col = Bytes.toBytes("C"); 5819 5820 HBaseConfiguration conf = new HBaseConfiguration(); 5821 this.region = initHRegion(tableName, method, conf, families); 5822 5823 Put put = new Put(Bytes.toBytes("199996")); 5824 put.addColumn(cf1, col, Bytes.toBytes("val")); 5825 region.put(put); 5826 Put put2 = new Put(Bytes.toBytes("199995")); 5827 put2.addColumn(cf1, col, Bytes.toBytes("val")); 5828 region.put(put2); 5829 5830 // Create a reverse scan 5831 Scan scan = new Scan(Bytes.toBytes("199996")); 5832 scan.setReversed(true); 5833 RegionScannerImpl scanner = region.getScanner(scan); 5834 5835 // Put a lot of cells that have sequenceIDs grater than the readPt of the reverse scan 5836 for (int i = 100000; i < 200000; i++) { 5837 Put p = new Put(Bytes.toBytes("" + i)); 5838 p.addColumn(cf1, col, Bytes.toBytes("" + i)); 5839 region.put(p); 5840 } 5841 List<Cell> currRow = new ArrayList<>(); 5842 boolean hasNext; 5843 do { 5844 hasNext = scanner.next(currRow); 5845 } while (hasNext); 5846 5847 assertEquals(2, currRow.size()); 5848 assertEquals("199996", Bytes.toString(currRow.get(0).getRowArray(), 5849 currRow.get(0).getRowOffset(), currRow.get(0).getRowLength())); 5850 assertEquals("199995", Bytes.toString(currRow.get(1).getRowArray(), 5851 currRow.get(1).getRowOffset(), currRow.get(1).getRowLength())); 5852 } 5853 5854 @Test 5855 public void testWriteRequestsCounter() throws IOException { 5856 byte[] fam = Bytes.toBytes("info"); 5857 byte[][] families = { fam }; 5858 this.region = initHRegion(tableName, method, CONF, families); 5859 5860 Assert.assertEquals(0L, region.getWriteRequestsCount()); 5861 5862 Put put = new Put(row); 5863 put.addColumn(fam, fam, fam); 5864 5865 Assert.assertEquals(0L, region.getWriteRequestsCount()); 5866 region.put(put); 5867 Assert.assertEquals(1L, region.getWriteRequestsCount()); 5868 region.put(put); 5869 Assert.assertEquals(2L, region.getWriteRequestsCount()); 5870 region.put(put); 5871 Assert.assertEquals(3L, region.getWriteRequestsCount()); 5872 5873 region.delete(new Delete(row)); 5874 Assert.assertEquals(4L, region.getWriteRequestsCount()); 5875 } 5876 5877 @Test 5878 public void testOpenRegionWrittenToWAL() throws Exception { 5879 final ServerName serverName = ServerName.valueOf(name.getMethodName(), 100, 42); 5880 final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName)); 5881 5882 TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName())) 5883 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam1)) 5884 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam2)).build(); 5885 RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); 5886 5887 // open the region w/o rss and wal and flush some files 5888 region = 5889 HBaseTestingUtility.createRegionAndWAL(hri, TEST_UTIL.getDataTestDir(), TEST_UTIL 5890 .getConfiguration(), htd); 5891 assertNotNull(region); 5892 5893 // create a file in fam1 for the region before opening in OpenRegionHandler 5894 region.put(new Put(Bytes.toBytes("a")).addColumn(fam1, fam1, fam1)); 5895 region.flush(true); 5896 HBaseTestingUtility.closeRegionAndWAL(region); 5897 5898 ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class); 5899 5900 // capture append() calls 5901 WAL wal = mockWAL(); 5902 when(rss.getWAL(any(RegionInfo.class))).thenReturn(wal); 5903 5904 region = HRegion.openHRegion(hri, htd, rss.getWAL(hri), 5905 TEST_UTIL.getConfiguration(), rss, null); 5906 5907 verify(wal, times(1)).appendMarker(any(RegionInfo.class), any(WALKeyImpl.class), 5908 editCaptor.capture()); 5909 5910 WALEdit edit = editCaptor.getValue(); 5911 assertNotNull(edit); 5912 assertNotNull(edit.getCells()); 5913 assertEquals(1, edit.getCells().size()); 5914 RegionEventDescriptor desc = WALEdit.getRegionEventDescriptor(edit.getCells().get(0)); 5915 assertNotNull(desc); 5916 5917 LOG.info("RegionEventDescriptor from WAL: " + desc); 5918 5919 assertEquals(RegionEventDescriptor.EventType.REGION_OPEN, desc.getEventType()); 5920 assertTrue(Bytes.equals(desc.getTableName().toByteArray(), htd.getTableName().toBytes())); 5921 assertTrue(Bytes.equals(desc.getEncodedRegionName().toByteArray(), 5922 hri.getEncodedNameAsBytes())); 5923 assertTrue(desc.getLogSequenceNumber() > 0); 5924 assertEquals(serverName, ProtobufUtil.toServerName(desc.getServer())); 5925 assertEquals(2, desc.getStoresCount()); 5926 5927 StoreDescriptor store = desc.getStores(0); 5928 assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam1)); 5929 assertEquals(store.getStoreHomeDir(), Bytes.toString(fam1)); 5930 assertEquals(1, store.getStoreFileCount()); // 1store file 5931 assertFalse(store.getStoreFile(0).contains("/")); // ensure path is relative 5932 5933 store = desc.getStores(1); 5934 assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam2)); 5935 assertEquals(store.getStoreHomeDir(), Bytes.toString(fam2)); 5936 assertEquals(0, store.getStoreFileCount()); // no store files 5937 } 5938 5939 // Helper for test testOpenRegionWrittenToWALForLogReplay 5940 static class HRegionWithSeqId extends HRegion { 5941 public HRegionWithSeqId(final Path tableDir, final WAL wal, final FileSystem fs, 5942 final Configuration confParam, final RegionInfo regionInfo, 5943 final TableDescriptor htd, final RegionServerServices rsServices) { 5944 super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices); 5945 } 5946 @Override 5947 protected long getNextSequenceId(WAL wal) throws IOException { 5948 return 42; 5949 } 5950 } 5951 5952 @Test 5953 public void testFlushedFileWithNoTags() throws Exception { 5954 final TableName tableName = TableName.valueOf(name.getMethodName()); 5955 HTableDescriptor htd = new HTableDescriptor(tableName); 5956 htd.addFamily(new HColumnDescriptor(fam1)); 5957 HRegionInfo info = new HRegionInfo(tableName, null, null, false); 5958 Path path = TEST_UTIL.getDataTestDir(getClass().getSimpleName()); 5959 region = HBaseTestingUtility.createRegionAndWAL(info, path, TEST_UTIL.getConfiguration(), htd); 5960 Put put = new Put(Bytes.toBytes("a-b-0-0")); 5961 put.addColumn(fam1, qual1, Bytes.toBytes("c1-value")); 5962 region.put(put); 5963 region.flush(true); 5964 HStore store = region.getStore(fam1); 5965 Collection<HStoreFile> storefiles = store.getStorefiles(); 5966 for (HStoreFile sf : storefiles) { 5967 assertFalse("Tags should not be present " 5968 ,sf.getReader().getHFileReader().getFileContext().isIncludesTags()); 5969 } 5970 } 5971 5972 /** 5973 * Utility method to setup a WAL mock. 5974 * <p/> 5975 * Needs to do the bit where we close latch on the WALKeyImpl on append else test hangs. 5976 * @return a mock WAL 5977 */ 5978 private WAL mockWAL() throws IOException { 5979 WAL wal = mock(WAL.class); 5980 when(wal.appendData(any(RegionInfo.class), any(WALKeyImpl.class), any(WALEdit.class))) 5981 .thenAnswer(new Answer<Long>() { 5982 @Override 5983 public Long answer(InvocationOnMock invocation) throws Throwable { 5984 WALKeyImpl key = invocation.getArgument(1); 5985 MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(); 5986 key.setWriteEntry(we); 5987 return 1L; 5988 } 5989 }); 5990 when(wal.appendMarker(any(RegionInfo.class), any(WALKeyImpl.class), any(WALEdit.class))). 5991 thenAnswer(new Answer<Long>() { 5992 @Override 5993 public Long answer(InvocationOnMock invocation) throws Throwable { 5994 WALKeyImpl key = invocation.getArgument(1); 5995 MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(); 5996 key.setWriteEntry(we); 5997 return 1L; 5998 } 5999 }); 6000 return wal; 6001 } 6002 6003 @Test 6004 public void testCloseRegionWrittenToWAL() throws Exception { 6005 Path rootDir = new Path(dir + name.getMethodName()); 6006 CommonFSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir); 6007 6008 final ServerName serverName = ServerName.valueOf("testCloseRegionWrittenToWAL", 100, 42); 6009 final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName)); 6010 6011 TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName())) 6012 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam1)) 6013 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam2)).build(); 6014 RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); 6015 6016 ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class); 6017 6018 // capture append() calls 6019 WAL wal = mockWAL(); 6020 when(rss.getWAL(any(RegionInfo.class))).thenReturn(wal); 6021 6022 6023 // create and then open a region first so that it can be closed later 6024 region = HRegion.createHRegion(hri, rootDir, TEST_UTIL.getConfiguration(), htd, rss.getWAL(hri)); 6025 region = HRegion.openHRegion(hri, htd, rss.getWAL(hri), 6026 TEST_UTIL.getConfiguration(), rss, null); 6027 6028 // close the region 6029 region.close(false); 6030 6031 // 2 times, one for region open, the other close region 6032 verify(wal, times(2)).appendMarker(any(RegionInfo.class), 6033 (WALKeyImpl) any(WALKeyImpl.class), editCaptor.capture()); 6034 6035 WALEdit edit = editCaptor.getAllValues().get(1); 6036 assertNotNull(edit); 6037 assertNotNull(edit.getCells()); 6038 assertEquals(1, edit.getCells().size()); 6039 RegionEventDescriptor desc = WALEdit.getRegionEventDescriptor(edit.getCells().get(0)); 6040 assertNotNull(desc); 6041 6042 LOG.info("RegionEventDescriptor from WAL: " + desc); 6043 6044 assertEquals(RegionEventDescriptor.EventType.REGION_CLOSE, desc.getEventType()); 6045 assertTrue(Bytes.equals(desc.getTableName().toByteArray(), htd.getTableName().toBytes())); 6046 assertTrue(Bytes.equals(desc.getEncodedRegionName().toByteArray(), 6047 hri.getEncodedNameAsBytes())); 6048 assertTrue(desc.getLogSequenceNumber() > 0); 6049 assertEquals(serverName, ProtobufUtil.toServerName(desc.getServer())); 6050 assertEquals(2, desc.getStoresCount()); 6051 6052 StoreDescriptor store = desc.getStores(0); 6053 assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam1)); 6054 assertEquals(store.getStoreHomeDir(), Bytes.toString(fam1)); 6055 assertEquals(0, store.getStoreFileCount()); // no store files 6056 6057 store = desc.getStores(1); 6058 assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam2)); 6059 assertEquals(store.getStoreHomeDir(), Bytes.toString(fam2)); 6060 assertEquals(0, store.getStoreFileCount()); // no store files 6061 } 6062 6063 /** 6064 * Test RegionTooBusyException thrown when region is busy 6065 */ 6066 @Test 6067 public void testRegionTooBusy() throws IOException { 6068 byte[] family = Bytes.toBytes("family"); 6069 long defaultBusyWaitDuration = CONF.getLong("hbase.busy.wait.duration", 6070 HRegion.DEFAULT_BUSY_WAIT_DURATION); 6071 CONF.setLong("hbase.busy.wait.duration", 1000); 6072 region = initHRegion(tableName, method, CONF, family); 6073 final AtomicBoolean stopped = new AtomicBoolean(true); 6074 Thread t = new Thread(new Runnable() { 6075 @Override 6076 public void run() { 6077 try { 6078 region.lock.writeLock().lock(); 6079 stopped.set(false); 6080 while (!stopped.get()) { 6081 Thread.sleep(100); 6082 } 6083 } catch (InterruptedException ie) { 6084 } finally { 6085 region.lock.writeLock().unlock(); 6086 } 6087 } 6088 }); 6089 t.start(); 6090 Get get = new Get(row); 6091 try { 6092 while (stopped.get()) { 6093 Thread.sleep(100); 6094 } 6095 region.get(get); 6096 fail("Should throw RegionTooBusyException"); 6097 } catch (InterruptedException ie) { 6098 fail("test interrupted"); 6099 } catch (RegionTooBusyException e) { 6100 // Good, expected 6101 } finally { 6102 stopped.set(true); 6103 try { 6104 t.join(); 6105 } catch (Throwable e) { 6106 } 6107 6108 HBaseTestingUtility.closeRegionAndWAL(region); 6109 region = null; 6110 CONF.setLong("hbase.busy.wait.duration", defaultBusyWaitDuration); 6111 } 6112 } 6113 6114 @Test 6115 public void testCellTTLs() throws IOException { 6116 IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge(); 6117 EnvironmentEdgeManager.injectEdge(edge); 6118 6119 final byte[] row = Bytes.toBytes("testRow"); 6120 final byte[] q1 = Bytes.toBytes("q1"); 6121 final byte[] q2 = Bytes.toBytes("q2"); 6122 final byte[] q3 = Bytes.toBytes("q3"); 6123 final byte[] q4 = Bytes.toBytes("q4"); 6124 6125 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName())); 6126 HColumnDescriptor hcd = new HColumnDescriptor(fam1); 6127 hcd.setTimeToLive(10); // 10 seconds 6128 htd.addFamily(hcd); 6129 6130 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 6131 conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS); 6132 6133 region = HBaseTestingUtility.createRegionAndWAL(new HRegionInfo(htd.getTableName(), 6134 HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY), 6135 TEST_UTIL.getDataTestDir(), conf, htd); 6136 assertNotNull(region); 6137 long now = EnvironmentEdgeManager.currentTime(); 6138 // Add a cell that will expire in 5 seconds via cell TTL 6139 region.put(new Put(row).add(new KeyValue(row, fam1, q1, now, 6140 HConstants.EMPTY_BYTE_ARRAY, new ArrayBackedTag[] { 6141 // TTL tags specify ts in milliseconds 6142 new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) }))); 6143 // Add a cell that will expire after 10 seconds via family setting 6144 region.put(new Put(row).addColumn(fam1, q2, now, HConstants.EMPTY_BYTE_ARRAY)); 6145 // Add a cell that will expire in 15 seconds via cell TTL 6146 region.put(new Put(row).add(new KeyValue(row, fam1, q3, now + 10000 - 1, 6147 HConstants.EMPTY_BYTE_ARRAY, new ArrayBackedTag[] { 6148 // TTL tags specify ts in milliseconds 6149 new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) }))); 6150 // Add a cell that will expire in 20 seconds via family setting 6151 region.put(new Put(row).addColumn(fam1, q4, now + 10000 - 1, HConstants.EMPTY_BYTE_ARRAY)); 6152 6153 // Flush so we are sure store scanning gets this right 6154 region.flush(true); 6155 6156 // A query at time T+0 should return all cells 6157 Result r = region.get(new Get(row)); 6158 assertNotNull(r.getValue(fam1, q1)); 6159 assertNotNull(r.getValue(fam1, q2)); 6160 assertNotNull(r.getValue(fam1, q3)); 6161 assertNotNull(r.getValue(fam1, q4)); 6162 6163 // Increment time to T+5 seconds 6164 edge.incrementTime(5000); 6165 6166 r = region.get(new Get(row)); 6167 assertNull(r.getValue(fam1, q1)); 6168 assertNotNull(r.getValue(fam1, q2)); 6169 assertNotNull(r.getValue(fam1, q3)); 6170 assertNotNull(r.getValue(fam1, q4)); 6171 6172 // Increment time to T+10 seconds 6173 edge.incrementTime(5000); 6174 6175 r = region.get(new Get(row)); 6176 assertNull(r.getValue(fam1, q1)); 6177 assertNull(r.getValue(fam1, q2)); 6178 assertNotNull(r.getValue(fam1, q3)); 6179 assertNotNull(r.getValue(fam1, q4)); 6180 6181 // Increment time to T+15 seconds 6182 edge.incrementTime(5000); 6183 6184 r = region.get(new Get(row)); 6185 assertNull(r.getValue(fam1, q1)); 6186 assertNull(r.getValue(fam1, q2)); 6187 assertNull(r.getValue(fam1, q3)); 6188 assertNotNull(r.getValue(fam1, q4)); 6189 6190 // Increment time to T+20 seconds 6191 edge.incrementTime(10000); 6192 6193 r = region.get(new Get(row)); 6194 assertNull(r.getValue(fam1, q1)); 6195 assertNull(r.getValue(fam1, q2)); 6196 assertNull(r.getValue(fam1, q3)); 6197 assertNull(r.getValue(fam1, q4)); 6198 6199 // Fun with disappearing increments 6200 6201 // Start at 1 6202 region.put(new Put(row).addColumn(fam1, q1, Bytes.toBytes(1L))); 6203 r = region.get(new Get(row)); 6204 byte[] val = r.getValue(fam1, q1); 6205 assertNotNull(val); 6206 assertEquals(1L, Bytes.toLong(val)); 6207 6208 // Increment with a TTL of 5 seconds 6209 Increment incr = new Increment(row).addColumn(fam1, q1, 1L); 6210 incr.setTTL(5000); 6211 region.increment(incr); // 2 6212 6213 // New value should be 2 6214 r = region.get(new Get(row)); 6215 val = r.getValue(fam1, q1); 6216 assertNotNull(val); 6217 assertEquals(2L, Bytes.toLong(val)); 6218 6219 // Increment time to T+25 seconds 6220 edge.incrementTime(5000); 6221 6222 // Value should be back to 1 6223 r = region.get(new Get(row)); 6224 val = r.getValue(fam1, q1); 6225 assertNotNull(val); 6226 assertEquals(1L, Bytes.toLong(val)); 6227 6228 // Increment time to T+30 seconds 6229 edge.incrementTime(5000); 6230 6231 // Original value written at T+20 should be gone now via family TTL 6232 r = region.get(new Get(row)); 6233 assertNull(r.getValue(fam1, q1)); 6234 } 6235 6236 @Test 6237 public void testIncrementTimestampsAreMonotonic() throws IOException { 6238 region = initHRegion(tableName, method, CONF, fam1); 6239 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); 6240 EnvironmentEdgeManager.injectEdge(edge); 6241 6242 edge.setValue(10); 6243 Increment inc = new Increment(row); 6244 inc.setDurability(Durability.SKIP_WAL); 6245 inc.addColumn(fam1, qual1, 1L); 6246 region.increment(inc); 6247 6248 Result result = region.get(new Get(row)); 6249 Cell c = result.getColumnLatestCell(fam1, qual1); 6250 assertNotNull(c); 6251 assertEquals(10L, c.getTimestamp()); 6252 6253 edge.setValue(1); // clock goes back 6254 region.increment(inc); 6255 result = region.get(new Get(row)); 6256 c = result.getColumnLatestCell(fam1, qual1); 6257 assertEquals(11L, c.getTimestamp()); 6258 assertEquals(2L, Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength())); 6259 } 6260 6261 @Test 6262 public void testAppendTimestampsAreMonotonic() throws IOException { 6263 region = initHRegion(tableName, method, CONF, fam1); 6264 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); 6265 EnvironmentEdgeManager.injectEdge(edge); 6266 6267 edge.setValue(10); 6268 Append a = new Append(row); 6269 a.setDurability(Durability.SKIP_WAL); 6270 a.addColumn(fam1, qual1, qual1); 6271 region.append(a); 6272 6273 Result result = region.get(new Get(row)); 6274 Cell c = result.getColumnLatestCell(fam1, qual1); 6275 assertNotNull(c); 6276 assertEquals(10L, c.getTimestamp()); 6277 6278 edge.setValue(1); // clock goes back 6279 region.append(a); 6280 result = region.get(new Get(row)); 6281 c = result.getColumnLatestCell(fam1, qual1); 6282 assertEquals(11L, c.getTimestamp()); 6283 6284 byte[] expected = new byte[qual1.length*2]; 6285 System.arraycopy(qual1, 0, expected, 0, qual1.length); 6286 System.arraycopy(qual1, 0, expected, qual1.length, qual1.length); 6287 6288 assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(), 6289 expected, 0, expected.length)); 6290 } 6291 6292 @Test 6293 public void testCheckAndMutateTimestampsAreMonotonic() throws IOException { 6294 region = initHRegion(tableName, method, CONF, fam1); 6295 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); 6296 EnvironmentEdgeManager.injectEdge(edge); 6297 6298 edge.setValue(10); 6299 Put p = new Put(row); 6300 p.setDurability(Durability.SKIP_WAL); 6301 p.addColumn(fam1, qual1, qual1); 6302 region.put(p); 6303 6304 Result result = region.get(new Get(row)); 6305 Cell c = result.getColumnLatestCell(fam1, qual1); 6306 assertNotNull(c); 6307 assertEquals(10L, c.getTimestamp()); 6308 6309 edge.setValue(1); // clock goes back 6310 p = new Put(row); 6311 p.setDurability(Durability.SKIP_WAL); 6312 p.addColumn(fam1, qual1, qual2); 6313 region.checkAndMutate(row, fam1, qual1, CompareOperator.EQUAL, new BinaryComparator(qual1), p); 6314 result = region.get(new Get(row)); 6315 c = result.getColumnLatestCell(fam1, qual1); 6316 assertEquals(10L, c.getTimestamp()); 6317 6318 assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(), 6319 qual2, 0, qual2.length)); 6320 } 6321 6322 @Test 6323 public void testBatchMutateWithWrongRegionException() throws Exception { 6324 final byte[] a = Bytes.toBytes("a"); 6325 final byte[] b = Bytes.toBytes("b"); 6326 final byte[] c = Bytes.toBytes("c"); // exclusive 6327 6328 int prevLockTimeout = CONF.getInt("hbase.rowlock.wait.duration", 30000); 6329 CONF.setInt("hbase.rowlock.wait.duration", 1000); 6330 region = initHRegion(tableName, a, c, method, CONF, false, fam1); 6331 6332 Mutation[] mutations = new Mutation[] { 6333 new Put(a) 6334 .add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY) 6335 .setRow(a) 6336 .setFamily(fam1) 6337 .setTimestamp(HConstants.LATEST_TIMESTAMP) 6338 .setType(Cell.Type.Put) 6339 .build()), 6340 // this is outside the region boundary 6341 new Put(c).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY) 6342 .setRow(c) 6343 .setFamily(fam1) 6344 .setTimestamp(HConstants.LATEST_TIMESTAMP) 6345 .setType(Type.Put) 6346 .build()), 6347 new Put(b).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY) 6348 .setRow(b) 6349 .setFamily(fam1) 6350 .setTimestamp(HConstants.LATEST_TIMESTAMP) 6351 .setType(Cell.Type.Put) 6352 .build()) 6353 }; 6354 6355 OperationStatus[] status = region.batchMutate(mutations); 6356 assertEquals(OperationStatusCode.SUCCESS, status[0].getOperationStatusCode()); 6357 assertEquals(OperationStatusCode.SANITY_CHECK_FAILURE, status[1].getOperationStatusCode()); 6358 assertEquals(OperationStatusCode.SUCCESS, status[2].getOperationStatusCode()); 6359 6360 6361 // test with a row lock held for a long time 6362 final CountDownLatch obtainedRowLock = new CountDownLatch(1); 6363 ExecutorService exec = Executors.newFixedThreadPool(2); 6364 Future<Void> f1 = exec.submit(new Callable<Void>() { 6365 @Override 6366 public Void call() throws Exception { 6367 LOG.info("Acquiring row lock"); 6368 RowLock rl = region.getRowLock(b); 6369 obtainedRowLock.countDown(); 6370 LOG.info("Waiting for 5 seconds before releasing lock"); 6371 Threads.sleep(5000); 6372 LOG.info("Releasing row lock"); 6373 rl.release(); 6374 return null; 6375 } 6376 }); 6377 obtainedRowLock.await(30, TimeUnit.SECONDS); 6378 6379 Future<Void> f2 = exec.submit(new Callable<Void>() { 6380 @Override 6381 public Void call() throws Exception { 6382 Mutation[] mutations = new Mutation[] { 6383 new Put(a).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY) 6384 .setRow(a) 6385 .setFamily(fam1) 6386 .setTimestamp(HConstants.LATEST_TIMESTAMP) 6387 .setType(Cell.Type.Put) 6388 .build()), 6389 new Put(b).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY) 6390 .setRow(b) 6391 .setFamily(fam1) 6392 .setTimestamp(HConstants.LATEST_TIMESTAMP) 6393 .setType(Cell.Type.Put) 6394 .build()), 6395 }; 6396 6397 // this will wait for the row lock, and it will eventually succeed 6398 OperationStatus[] status = region.batchMutate(mutations); 6399 assertEquals(OperationStatusCode.SUCCESS, status[0].getOperationStatusCode()); 6400 assertEquals(OperationStatusCode.SUCCESS, status[1].getOperationStatusCode()); 6401 return null; 6402 } 6403 }); 6404 6405 f1.get(); 6406 f2.get(); 6407 6408 CONF.setInt("hbase.rowlock.wait.duration", prevLockTimeout); 6409 } 6410 6411 @Test 6412 public void testCheckAndRowMutateTimestampsAreMonotonic() throws IOException { 6413 region = initHRegion(tableName, method, CONF, fam1); 6414 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); 6415 EnvironmentEdgeManager.injectEdge(edge); 6416 6417 edge.setValue(10); 6418 Put p = new Put(row); 6419 p.setDurability(Durability.SKIP_WAL); 6420 p.addColumn(fam1, qual1, qual1); 6421 region.put(p); 6422 6423 Result result = region.get(new Get(row)); 6424 Cell c = result.getColumnLatestCell(fam1, qual1); 6425 assertNotNull(c); 6426 assertEquals(10L, c.getTimestamp()); 6427 6428 edge.setValue(1); // clock goes back 6429 p = new Put(row); 6430 p.setDurability(Durability.SKIP_WAL); 6431 p.addColumn(fam1, qual1, qual2); 6432 RowMutations rm = new RowMutations(row); 6433 rm.add(p); 6434 assertTrue(region.checkAndRowMutate(row, fam1, qual1, CompareOperator.EQUAL, 6435 new BinaryComparator(qual1), rm)); 6436 result = region.get(new Get(row)); 6437 c = result.getColumnLatestCell(fam1, qual1); 6438 assertEquals(10L, c.getTimestamp()); 6439 LOG.info("c value " + 6440 Bytes.toStringBinary(c.getValueArray(), c.getValueOffset(), c.getValueLength())); 6441 6442 assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(), 6443 qual2, 0, qual2.length)); 6444 } 6445 6446 HRegion initHRegion(TableName tableName, String callingMethod, 6447 byte[]... families) throws IOException { 6448 return initHRegion(tableName, callingMethod, HBaseConfiguration.create(), 6449 families); 6450 } 6451 6452 /** 6453 * HBASE-16429 Make sure no stuck if roll writer when ring buffer is filled with appends 6454 * @throws IOException if IO error occurred during test 6455 */ 6456 @Test 6457 public void testWritesWhileRollWriter() throws IOException { 6458 int testCount = 10; 6459 int numRows = 1024; 6460 int numFamilies = 2; 6461 int numQualifiers = 2; 6462 final byte[][] families = new byte[numFamilies][]; 6463 for (int i = 0; i < numFamilies; i++) { 6464 families[i] = Bytes.toBytes("family" + i); 6465 } 6466 final byte[][] qualifiers = new byte[numQualifiers][]; 6467 for (int i = 0; i < numQualifiers; i++) { 6468 qualifiers[i] = Bytes.toBytes("qual" + i); 6469 } 6470 6471 CONF.setInt("hbase.regionserver.wal.disruptor.event.count", 2); 6472 this.region = initHRegion(tableName, method, CONF, families); 6473 try { 6474 List<Thread> threads = new ArrayList<>(); 6475 for (int i = 0; i < numRows; i++) { 6476 final int count = i; 6477 Thread t = new Thread(new Runnable() { 6478 6479 @Override 6480 public void run() { 6481 byte[] row = Bytes.toBytes("row" + count); 6482 Put put = new Put(row); 6483 put.setDurability(Durability.SYNC_WAL); 6484 byte[] value = Bytes.toBytes(String.valueOf(count)); 6485 for (byte[] family : families) { 6486 for (byte[] qualifier : qualifiers) { 6487 put.addColumn(family, qualifier, count, value); 6488 } 6489 } 6490 try { 6491 region.put(put); 6492 } catch (IOException e) { 6493 throw new RuntimeException(e); 6494 } 6495 } 6496 }); 6497 threads.add(t); 6498 } 6499 for (Thread t : threads) { 6500 t.start(); 6501 } 6502 6503 for (int i = 0; i < testCount; i++) { 6504 region.getWAL().rollWriter(); 6505 Thread.yield(); 6506 } 6507 } finally { 6508 try { 6509 HBaseTestingUtility.closeRegionAndWAL(this.region); 6510 CONF.setInt("hbase.regionserver.wal.disruptor.event.count", 16 * 1024); 6511 } catch (DroppedSnapshotException dse) { 6512 // We could get this on way out because we interrupt the background flusher and it could 6513 // fail anywhere causing a DSE over in the background flusher... only it is not properly 6514 // dealt with so could still be memory hanging out when we get to here -- memory we can't 6515 // flush because the accounting is 'off' since original DSE. 6516 } 6517 this.region = null; 6518 } 6519 } 6520 6521 @Test 6522 public void testMutateRow_WriteRequestCount() throws Exception { 6523 byte[] row1 = Bytes.toBytes("row1"); 6524 byte[] fam1 = Bytes.toBytes("fam1"); 6525 byte[] qf1 = Bytes.toBytes("qualifier"); 6526 byte[] val1 = Bytes.toBytes("value1"); 6527 6528 RowMutations rm = new RowMutations(row1); 6529 Put put = new Put(row1); 6530 put.addColumn(fam1, qf1, val1); 6531 rm.add(put); 6532 6533 this.region = initHRegion(tableName, method, CONF, fam1); 6534 long wrcBeforeMutate = this.region.writeRequestsCount.longValue(); 6535 this.region.mutateRow(rm); 6536 long wrcAfterMutate = this.region.writeRequestsCount.longValue(); 6537 Assert.assertEquals(wrcBeforeMutate + rm.getMutations().size(), wrcAfterMutate); 6538 } 6539 6540 @Test 6541 public void testBulkLoadReplicationEnabled() throws IOException { 6542 TEST_UTIL.getConfiguration().setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); 6543 final ServerName serverName = ServerName.valueOf(name.getMethodName(), 100, 42); 6544 final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName)); 6545 6546 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName())); 6547 htd.addFamily(new HColumnDescriptor(fam1)); 6548 HRegionInfo hri = new HRegionInfo(htd.getTableName(), 6549 HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY); 6550 region = HRegion.openHRegion(hri, htd, rss.getWAL(hri), TEST_UTIL.getConfiguration(), 6551 rss, null); 6552 6553 assertTrue(region.conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, false)); 6554 String plugins = region.conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ""); 6555 String replicationCoprocessorClass = ReplicationObserver.class.getCanonicalName(); 6556 assertTrue(plugins.contains(replicationCoprocessorClass)); 6557 assertTrue(region.getCoprocessorHost(). 6558 getCoprocessors().contains(ReplicationObserver.class.getSimpleName())); 6559 } 6560 6561 /** 6562 * The same as HRegion class, the only difference is that instantiateHStore will 6563 * create a different HStore - HStoreForTesting. [HBASE-8518] 6564 */ 6565 public static class HRegionForTesting extends HRegion { 6566 6567 public HRegionForTesting(final Path tableDir, final WAL wal, final FileSystem fs, 6568 final Configuration confParam, final RegionInfo regionInfo, 6569 final TableDescriptor htd, final RegionServerServices rsServices) { 6570 this(new HRegionFileSystem(confParam, fs, tableDir, regionInfo), 6571 wal, confParam, htd, rsServices); 6572 } 6573 6574 public HRegionForTesting(HRegionFileSystem fs, WAL wal, 6575 Configuration confParam, TableDescriptor htd, 6576 RegionServerServices rsServices) { 6577 super(fs, wal, confParam, htd, rsServices); 6578 } 6579 6580 /** 6581 * Create HStore instance. 6582 * @return If Mob is enabled, return HMobStore, otherwise return HStoreForTesting. 6583 */ 6584 @Override 6585 protected HStore instantiateHStore(final ColumnFamilyDescriptor family, boolean warmup) 6586 throws IOException { 6587 if (family.isMobEnabled()) { 6588 if (HFile.getFormatVersion(this.conf) < HFile.MIN_FORMAT_VERSION_WITH_TAGS) { 6589 throw new IOException("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS + 6590 " is required for MOB feature. Consider setting " + HFile.FORMAT_VERSION_KEY + 6591 " accordingly."); 6592 } 6593 return new HMobStore(this, family, this.conf, warmup); 6594 } 6595 return new HStoreForTesting(this, family, this.conf, warmup); 6596 } 6597 } 6598 6599 /** 6600 * HStoreForTesting is merely the same as HStore, the difference is in the doCompaction method 6601 * of HStoreForTesting there is a checkpoint "hbase.hstore.compaction.complete" which 6602 * doesn't let hstore compaction complete. In the former edition, this config is set in 6603 * HStore class inside compact method, though this is just for testing, otherwise it 6604 * doesn't do any help. In HBASE-8518, we try to get rid of all "hbase.hstore.compaction.complete" 6605 * config (except for testing code). 6606 */ 6607 public static class HStoreForTesting extends HStore { 6608 6609 protected HStoreForTesting(final HRegion region, 6610 final ColumnFamilyDescriptor family, 6611 final Configuration confParam, boolean warmup) throws IOException { 6612 super(region, family, confParam, warmup); 6613 } 6614 6615 @Override 6616 protected List<HStoreFile> doCompaction(CompactionRequestImpl cr, 6617 Collection<HStoreFile> filesToCompact, User user, long compactionStartTime, 6618 List<Path> newFiles) throws IOException { 6619 // let compaction incomplete. 6620 if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) { 6621 LOG.warn("hbase.hstore.compaction.complete is set to false"); 6622 List<HStoreFile> sfs = new ArrayList<>(newFiles.size()); 6623 final boolean evictOnClose = 6624 cacheConf != null? cacheConf.shouldEvictOnClose(): true; 6625 for (Path newFile : newFiles) { 6626 // Create storefile around what we wrote with a reader on it. 6627 HStoreFile sf = createStoreFileAndReader(newFile); 6628 sf.closeStoreFile(evictOnClose); 6629 sfs.add(sf); 6630 } 6631 return sfs; 6632 } 6633 return super.doCompaction(cr, filesToCompact, user, compactionStartTime, newFiles); 6634 } 6635 } 6636}