001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HBaseTestingUtility.COLUMNS;
021import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
022import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
023import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3;
024import static org.junit.Assert.assertArrayEquals;
025import static org.junit.Assert.assertEquals;
026import static org.junit.Assert.assertFalse;
027import static org.junit.Assert.assertNotNull;
028import static org.junit.Assert.assertNull;
029import static org.junit.Assert.assertTrue;
030import static org.junit.Assert.fail;
031import static org.mockito.ArgumentMatchers.any;
032import static org.mockito.ArgumentMatchers.anyLong;
033import static org.mockito.Mockito.doThrow;
034import static org.mockito.Mockito.mock;
035import static org.mockito.Mockito.never;
036import static org.mockito.Mockito.spy;
037import static org.mockito.Mockito.times;
038import static org.mockito.Mockito.verify;
039import static org.mockito.Mockito.when;
040
041import java.io.IOException;
042import java.io.InterruptedIOException;
043import java.math.BigDecimal;
044import java.nio.charset.StandardCharsets;
045import java.security.PrivilegedExceptionAction;
046import java.util.ArrayList;
047import java.util.Arrays;
048import java.util.Collection;
049import java.util.List;
050import java.util.Map;
051import java.util.NavigableMap;
052import java.util.Objects;
053import java.util.TreeMap;
054import java.util.concurrent.Callable;
055import java.util.concurrent.CountDownLatch;
056import java.util.concurrent.ExecutorService;
057import java.util.concurrent.Executors;
058import java.util.concurrent.Future;
059import java.util.concurrent.TimeUnit;
060import java.util.concurrent.atomic.AtomicBoolean;
061import java.util.concurrent.atomic.AtomicInteger;
062import java.util.concurrent.atomic.AtomicReference;
063import org.apache.commons.lang3.RandomStringUtils;
064import org.apache.hadoop.conf.Configuration;
065import org.apache.hadoop.fs.FSDataOutputStream;
066import org.apache.hadoop.fs.FileStatus;
067import org.apache.hadoop.fs.FileSystem;
068import org.apache.hadoop.fs.Path;
069import org.apache.hadoop.hbase.ArrayBackedTag;
070import org.apache.hadoop.hbase.Cell;
071import org.apache.hadoop.hbase.Cell.Type;
072import org.apache.hadoop.hbase.CellBuilderFactory;
073import org.apache.hadoop.hbase.CellBuilderType;
074import org.apache.hadoop.hbase.CellUtil;
075import org.apache.hadoop.hbase.CompareOperator;
076import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
077import org.apache.hadoop.hbase.DroppedSnapshotException;
078import org.apache.hadoop.hbase.HBaseClassTestRule;
079import org.apache.hadoop.hbase.HBaseConfiguration;
080import org.apache.hadoop.hbase.HBaseTestingUtility;
081import org.apache.hadoop.hbase.HColumnDescriptor;
082import org.apache.hadoop.hbase.HConstants;
083import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
084import org.apache.hadoop.hbase.HDFSBlocksDistribution;
085import org.apache.hadoop.hbase.HRegionInfo;
086import org.apache.hadoop.hbase.HTableDescriptor;
087import org.apache.hadoop.hbase.KeyValue;
088import org.apache.hadoop.hbase.MiniHBaseCluster;
089import org.apache.hadoop.hbase.MultithreadedTestUtil;
090import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
091import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
092import org.apache.hadoop.hbase.NotServingRegionException;
093import org.apache.hadoop.hbase.PrivateCellUtil;
094import org.apache.hadoop.hbase.RegionTooBusyException;
095import org.apache.hadoop.hbase.ServerName;
096import org.apache.hadoop.hbase.StartMiniClusterOption;
097import org.apache.hadoop.hbase.TableName;
098import org.apache.hadoop.hbase.TagType;
099import org.apache.hadoop.hbase.Waiter;
100import org.apache.hadoop.hbase.client.Append;
101import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
102import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
103import org.apache.hadoop.hbase.client.Delete;
104import org.apache.hadoop.hbase.client.Durability;
105import org.apache.hadoop.hbase.client.Get;
106import org.apache.hadoop.hbase.client.Increment;
107import org.apache.hadoop.hbase.client.Mutation;
108import org.apache.hadoop.hbase.client.Put;
109import org.apache.hadoop.hbase.client.RegionInfo;
110import org.apache.hadoop.hbase.client.RegionInfoBuilder;
111import org.apache.hadoop.hbase.client.Result;
112import org.apache.hadoop.hbase.client.RowMutations;
113import org.apache.hadoop.hbase.client.Scan;
114import org.apache.hadoop.hbase.client.Table;
115import org.apache.hadoop.hbase.client.TableDescriptor;
116import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
117import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
118import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
119import org.apache.hadoop.hbase.filter.BigDecimalComparator;
120import org.apache.hadoop.hbase.filter.BinaryComparator;
121import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
122import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
123import org.apache.hadoop.hbase.filter.Filter;
124import org.apache.hadoop.hbase.filter.FilterBase;
125import org.apache.hadoop.hbase.filter.FilterList;
126import org.apache.hadoop.hbase.filter.NullComparator;
127import org.apache.hadoop.hbase.filter.PrefixFilter;
128import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
129import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
130import org.apache.hadoop.hbase.filter.SubstringComparator;
131import org.apache.hadoop.hbase.filter.ValueFilter;
132import org.apache.hadoop.hbase.io.hfile.HFile;
133import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
134import org.apache.hadoop.hbase.monitoring.MonitoredTask;
135import org.apache.hadoop.hbase.monitoring.TaskMonitor;
136import org.apache.hadoop.hbase.regionserver.HRegion.MutationBatchOperation;
137import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
138import org.apache.hadoop.hbase.regionserver.Region.RowLock;
139import org.apache.hadoop.hbase.regionserver.TestHStore.FaultyFileSystem;
140import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
141import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
142import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource;
143import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
144import org.apache.hadoop.hbase.replication.regionserver.ReplicationObserver;
145import org.apache.hadoop.hbase.security.User;
146import org.apache.hadoop.hbase.test.MetricsAssertHelper;
147import org.apache.hadoop.hbase.testclassification.LargeTests;
148import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
149import org.apache.hadoop.hbase.util.Bytes;
150import org.apache.hadoop.hbase.util.CommonFSUtils;
151import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
152import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
153import org.apache.hadoop.hbase.util.FSUtils;
154import org.apache.hadoop.hbase.util.HFileArchiveUtil;
155import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
156import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
157import org.apache.hadoop.hbase.util.Threads;
158import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
159import org.apache.hadoop.hbase.wal.FaultyFSLog;
160import org.apache.hadoop.hbase.wal.NettyAsyncFSWALConfigHelper;
161import org.apache.hadoop.hbase.wal.WAL;
162import org.apache.hadoop.hbase.wal.WALEdit;
163import org.apache.hadoop.hbase.wal.WALFactory;
164import org.apache.hadoop.hbase.wal.WALKeyImpl;
165import org.apache.hadoop.hbase.wal.WALProvider;
166import org.apache.hadoop.hbase.wal.WALProvider.Writer;
167import org.apache.hadoop.hbase.wal.WALSplitUtil;
168import org.junit.After;
169import org.junit.Assert;
170import org.junit.Before;
171import org.junit.ClassRule;
172import org.junit.Rule;
173import org.junit.Test;
174import org.junit.experimental.categories.Category;
175import org.junit.rules.ExpectedException;
176import org.junit.rules.TestName;
177import org.mockito.ArgumentCaptor;
178import org.mockito.ArgumentMatcher;
179import org.mockito.Mockito;
180import org.mockito.invocation.InvocationOnMock;
181import org.mockito.stubbing.Answer;
182import org.slf4j.Logger;
183import org.slf4j.LoggerFactory;
184
185import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
186import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
187import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
188import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
189import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
190
191import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
198
199/**
200 * Basic stand-alone testing of HRegion.  No clusters!
201 *
202 * A lot of the meta information for an HRegion now lives inside other HRegions
203 * or in the HBaseMaster, so only basic testing is possible.
204 */
205@Category({VerySlowRegionServerTests.class, LargeTests.class})
206@SuppressWarnings("deprecation")
207public class TestHRegion {
208
209  @ClassRule
210  public static final HBaseClassTestRule CLASS_RULE =
211      HBaseClassTestRule.forClass(TestHRegion.class);
212
213  // Do not spin up clusters in here. If you need to spin up a cluster, do it
214  // over in TestHRegionOnCluster.
215  private static final Logger LOG = LoggerFactory.getLogger(TestHRegion.class);
216  @Rule
217  public TestName name = new TestName();
218  @Rule public final ExpectedException thrown = ExpectedException.none();
219
220  private static final String COLUMN_FAMILY = "MyCF";
221  private static final byte [] COLUMN_FAMILY_BYTES = Bytes.toBytes(COLUMN_FAMILY);
222  private static final EventLoopGroup GROUP = new NioEventLoopGroup();
223
224  HRegion region = null;
  // Do not run these unit tests in parallel; they share static state (TEST_UTIL, CONF,
  // FILESYSTEM) that is re-created in setup().
226  protected static HBaseTestingUtility TEST_UTIL;
  public static Configuration CONF;
228  private String dir;
229  private static FileSystem FILESYSTEM;
230  private final int MAX_VERSIONS = 2;
231
232  // Test names
233  protected TableName tableName;
234  protected String method;
235  protected final byte[] qual = Bytes.toBytes("qual");
236  protected final byte[] qual1 = Bytes.toBytes("qual1");
237  protected final byte[] qual2 = Bytes.toBytes("qual2");
238  protected final byte[] qual3 = Bytes.toBytes("qual3");
239  protected final byte[] value = Bytes.toBytes("value");
240  protected final byte[] value1 = Bytes.toBytes("value1");
241  protected final byte[] value2 = Bytes.toBytes("value2");
242  protected final byte[] row = Bytes.toBytes("rowA");
243  protected final byte[] row2 = Bytes.toBytes("rowB");
244
245  protected final MetricsAssertHelper metricsAssertHelper = CompatibilitySingletonFactory
246      .getInstance(MetricsAssertHelper.class);
247
248  @Before
249  public void setup() throws IOException {
250    TEST_UTIL = HBaseTestingUtility.createLocalHTU();
251    FILESYSTEM = TEST_UTIL.getTestFileSystem();
252    CONF = TEST_UTIL.getConfiguration();
253    NettyAsyncFSWALConfigHelper.setEventLoopConfig(CONF, GROUP, NioSocketChannel.class);
254    dir = TEST_UTIL.getDataTestDir("TestHRegion").toString();
255    method = name.getMethodName();
256    tableName = TableName.valueOf(method);
257    CONF.set(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, String.valueOf(0.09));
258  }
259
260  @After
261  public void tearDown() throws IOException {
    // The region may already have been closed; closing it again here via HTU does no harm.
263    HBaseTestingUtility.closeRegionAndWAL(region);
264    EnvironmentEdgeManagerTestHelper.reset();
265    LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
266    TEST_UTIL.cleanupTestDir();
267  }
268
  /**
   * Test that the max flushed sequence id is still readable after the region is closed, and
   * that it does not advance until a flush has actually taken place.
   */
273  @Test
274  public void testSequenceId() throws IOException {
275    region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
276    assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId());
277    // Weird. This returns 0 if no store files or no edits. Afraid to change it.
278    assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES));
279    HBaseTestingUtility.closeRegionAndWAL(this.region);
280    assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId());
281    assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES));
282    // Open region again.
283    region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
284    byte [] value = Bytes.toBytes(method);
285    // Make a random put against our cf.
286    Put put = new Put(value);
287    put.addColumn(COLUMN_FAMILY_BYTES, null, value);
288    region.put(put);
289    // No flush yet so init numbers should still be in place.
290    assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId());
291    assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES));
292    region.flush(true);
293    long max = region.getMaxFlushedSeqId();
294    HBaseTestingUtility.closeRegionAndWAL(this.region);
295    assertEquals(max, region.getMaxFlushedSeqId());
296    this.region = null;
297  }
298
299  /**
300   * Test for Bug 2 of HBASE-10466.
301   * "Bug 2: Conditions for the first flush of region close (so-called pre-flush) If memstoreSize
302   * is smaller than a certain value, or when region close starts a flush is ongoing, the first
303   * flush is skipped and only the second flush takes place. However, two flushes are required in
304   * case previous flush fails and leaves some data in snapshot. The bug could cause loss of data
305   * in current memstore. The fix is removing all conditions except abort check so we ensure 2
306   * flushes for region close."
307   * @throws IOException
308   */
309  @Test
310  public void testCloseCarryingSnapshot() throws IOException {
311    region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
312    HStore store = region.getStore(COLUMN_FAMILY_BYTES);
313    // Get some random bytes.
314    byte [] value = Bytes.toBytes(method);
315    // Make a random put against our cf.
316    Put put = new Put(value);
317    put.addColumn(COLUMN_FAMILY_BYTES, null, value);
318    // First put something in current memstore, which will be in snapshot after flusher.prepare()
319    region.put(put);
320    StoreFlushContext storeFlushCtx = store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY);
321    storeFlushCtx.prepare();
322    // Second put something in current memstore
323    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
324    region.put(put);
325    // Close with something in memstore and something in the snapshot.  Make sure all is cleared.
326    HBaseTestingUtility.closeRegionAndWAL(region);
327    assertEquals(0, region.getMemStoreDataSize());
328    region = null;
329  }
330
  /**
   * Verify that the memstore snapshot size is correctly updated when a mutation is rolled back
   * after a WAL sync failure. See HBASE-10845.
   */
335  @Test
336  public void testMemstoreSnapshotSize() throws IOException {
337    class MyFaultyFSLog extends FaultyFSLog {
338      StoreFlushContext storeFlushCtx;
339      public MyFaultyFSLog(FileSystem fs, Path rootDir, String logName, Configuration conf)
340          throws IOException {
341        super(fs, rootDir, logName, conf);
342      }
343
344      void setStoreFlushCtx(StoreFlushContext storeFlushCtx) {
345        this.storeFlushCtx = storeFlushCtx;
346      }
347
348      @Override
349      public void sync(long txid) throws IOException {
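        // Take the store flush snapshot while this sync is still in flight, then let the
        // injected SYNC failure happen so the in-flight put has to be rolled back.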
350        storeFlushCtx.prepare();
351        super.sync(txid);
352      }
353    }
354
355    FileSystem fs = FileSystem.get(CONF);
356    Path rootDir = new Path(dir + "testMemstoreSnapshotSize");
357    MyFaultyFSLog faultyLog = new MyFaultyFSLog(fs, rootDir, "testMemstoreSnapshotSize", CONF);
358    region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, faultyLog,
359        COLUMN_FAMILY_BYTES);
360
361    HStore store = region.getStore(COLUMN_FAMILY_BYTES);
362    // Get some random bytes.
363    byte [] value = Bytes.toBytes(method);
364    faultyLog.setStoreFlushCtx(store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY));
365
366    Put put = new Put(value);
367    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
368    faultyLog.setFailureType(FaultyFSLog.FailureType.SYNC);
369
    try {
      region.put(put);
      fail("The regionserver should have thrown an exception");
    } catch (IOException expected) {
      // Expected: the WAL sync was set up to fail, so the put must be rolled back.
    }
    MemStoreSize mss = store.getFlushableSize();
    assertEquals("flushable size should be zero, but it is " + mss, 0, mss.getDataSize());
381  }
382
383  /**
384   * Create a WAL outside of the usual helper in
385   * {@link HBaseTestingUtility#createWal(Configuration, Path, RegionInfo)} because that method
386   * doesn't play nicely with FaultyFileSystem. Call this method before overriding
387   * {@code fs.file.impl}.
388   * @param callingMethod a unique component for the path, probably the name of the test method.
389   */
390  private static WAL createWALCompatibleWithFaultyFileSystem(String callingMethod,
391      Configuration conf, TableName tableName) throws IOException {
392    final Path logDir = TEST_UTIL.getDataTestDirOnTestFS(callingMethod + ".log");
393    final Configuration walConf = new Configuration(conf);
394    FSUtils.setRootDir(walConf, logDir);
395    return new WALFactory(walConf, callingMethod)
396        .getWAL(RegionInfoBuilder.newBuilder(tableName).build());
397  }
398
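  /**
   * Verify that memstore and flushable size accounting still reflects a batch mutation whose
   * postBatchMutate coprocessor hook throws an IOException.
   */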
399  @Test
400  public void testMemstoreSizeAccountingWithFailedPostBatchMutate() throws IOException {
401    String testName = "testMemstoreSizeAccountingWithFailedPostBatchMutate";
402    FileSystem fs = FileSystem.get(CONF);
403    Path rootDir = new Path(dir + testName);
404    FSHLog hLog = new FSHLog(fs, rootDir, testName, CONF);
405    hLog.init();
406    HRegion region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog,
407        COLUMN_FAMILY_BYTES);
408    HStore store = region.getStore(COLUMN_FAMILY_BYTES);
409    assertEquals(0, region.getMemStoreDataSize());
410
411    // Put one value
412    byte [] value = Bytes.toBytes(method);
413    Put put = new Put(value);
414    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
415    region.put(put);
416    long onePutSize = region.getMemStoreDataSize();
417    assertTrue(onePutSize > 0);
418
419    RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
420    doThrow(new IOException())
421       .when(mockedCPHost).postBatchMutate(Mockito.<MiniBatchOperationInProgress<Mutation>>any());
422    region.setCoprocessorHost(mockedCPHost);
423
424    put = new Put(value);
425    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("dfg"), value);
426    try {
427      region.put(put);
428      fail("Should have failed with IOException");
429    } catch (IOException expected) {
430    }
431    long expectedSize = onePutSize * 2;
432    assertEquals("memstoreSize should be incremented",
433        expectedSize, region.getMemStoreDataSize());
434    assertEquals("flushable size should be incremented",
435        expectedSize, store.getFlushableSize().getDataSize());
436
437    region.setCoprocessorHost(null);
438  }
439
  /**
   * A test case for HBASE-21041: verify that memstore data size, heap size, and off-heap size
   * are accounted back down to their empty-segment values after a flush.
   */
444  @Test
445  public void testFlushAndMemstoreSizeCounting() throws Exception {
446    byte[] family = Bytes.toBytes("family");
447    this.region = initHRegion(tableName, method, CONF, family);
448    final WALFactory wals = new WALFactory(CONF, method);
449    try {
450      for (byte[] row : HBaseTestingUtility.ROWS) {
451        Put put = new Put(row);
452        put.addColumn(family, family, row);
453        region.put(put);
454      }
455      region.flush(true);
456      // After flush, data size should be zero
457      assertEquals(0, region.getMemStoreDataSize());
458      // After flush, a new active mutable segment is created, so the heap size
459      // should equal to MutableSegment.DEEP_OVERHEAD
460      assertEquals(MutableSegment.DEEP_OVERHEAD, region.getMemStoreHeapSize());
461      // After flush, offheap should be zero
462      assertEquals(0, region.getMemStoreOffHeapSize());
463    } finally {
464      HBaseTestingUtility.closeRegionAndWAL(this.region);
465      this.region = null;
466      wals.close();
467    }
468  }
469
470  /**
471   * Test we do not lose data if we fail a flush and then close.
   * Part of HBASE-10466.  Tests the following from the issue description:
473   * "Bug 1: Wrong calculation of HRegion.memstoreSize: When a flush fails, data to be flushed is
474   * kept in each MemStore's snapshot and wait for next flush attempt to continue on it. But when
475   * the next flush succeeds, the counter of total memstore size in HRegion is always deduced by
476   * the sum of current memstore sizes instead of snapshots left from previous failed flush. This
477   * calculation is problematic that almost every time there is failed flush, HRegion.memstoreSize
478   * gets reduced by a wrong value. If region flush could not proceed for a couple cycles, the size
479   * in current memstore could be much larger than the snapshot. It's likely to drift memstoreSize
480   * much smaller than expected. In extreme case, if the error accumulates to even bigger than
481   * HRegion's memstore size limit, any further flush is skipped because flush does not do anything
482   * if memstoreSize is not larger than 0."
483   * @throws Exception
484   */
485  @Test
486  public void testFlushSizeAccounting() throws Exception {
487    final Configuration conf = HBaseConfiguration.create(CONF);
488    final WAL wal = createWALCompatibleWithFaultyFileSystem(method, conf, tableName);
489    // Only retry once.
490    conf.setInt("hbase.hstore.flush.retries.number", 1);
491    final User user =
492      User.createUserForTesting(conf, method, new String[]{"foo"});
493    // Inject our faulty LocalFileSystem
494    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
495    user.runAs(new PrivilegedExceptionAction<Object>() {
496      @Override
497      public Object run() throws Exception {
498        // Make sure it worked (above is sensitive to caching details in hadoop core)
499        FileSystem fs = FileSystem.get(conf);
500        Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
501        FaultyFileSystem ffs = (FaultyFileSystem)fs;
502        HRegion region = null;
503        try {
504          // Initialize region
505          region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, wal,
506              COLUMN_FAMILY_BYTES);
507          long size = region.getMemStoreDataSize();
508          Assert.assertEquals(0, size);
509          // Put one item into memstore.  Measure the size of one item in memstore.
510          Put p1 = new Put(row);
511          p1.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual1, 1, (byte[]) null));
512          region.put(p1);
513          final long sizeOfOnePut = region.getMemStoreDataSize();
514          // Fail a flush which means the current memstore will hang out as memstore 'snapshot'.
515          try {
516            LOG.info("Flushing");
517            region.flush(true);
518            Assert.fail("Didn't bubble up IOE!");
519          } catch (DroppedSnapshotException dse) {
520            // What we are expecting
521            region.closing.set(false); // this is needed for the rest of the test to work
522          }
523          // Make it so all writes succeed from here on out
524          ffs.fault.set(false);
525          // Check sizes.  Should still be the one entry.
526          Assert.assertEquals(sizeOfOnePut, region.getMemStoreDataSize());
527          // Now add two entries so that on this next flush that fails, we can see if we
528          // subtract the right amount, the snapshot size only.
529          Put p2 = new Put(row);
530          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual2, 2, (byte[])null));
531          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual3, 3, (byte[])null));
532          region.put(p2);
533          long expectedSize = sizeOfOnePut * 3;
534          Assert.assertEquals(expectedSize, region.getMemStoreDataSize());
          // Do a successful flush.  It will clear the snapshot only; that's how flushes work.
          // If there is already a snapshot we clear it, else we move the memstore to be the
          // snapshot and flush it.
538          region.flush(true);
539          // Make sure our memory accounting is right.
540          Assert.assertEquals(sizeOfOnePut * 2, region.getMemStoreDataSize());
541        } finally {
542          HBaseTestingUtility.closeRegionAndWAL(region);
543        }
544        return null;
545      }
546    });
547    FileSystem.closeAllForUGI(user.getUGI());
548  }
549
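  /**
   * Verify that closing a region which carries an outstanding flush snapshot while the
   * filesystem is failing surfaces a DroppedSnapshotException, and that the region can still
   * be closed cleanly once the fault is cleared.
   */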
550  @Test
551  public void testCloseWithFailingFlush() throws Exception {
552    final Configuration conf = HBaseConfiguration.create(CONF);
553    final WAL wal = createWALCompatibleWithFaultyFileSystem(method, conf, tableName);
554    // Only retry once.
555    conf.setInt("hbase.hstore.flush.retries.number", 1);
556    final User user =
557      User.createUserForTesting(conf, this.method, new String[]{"foo"});
558    // Inject our faulty LocalFileSystem
559    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
560    user.runAs(new PrivilegedExceptionAction<Object>() {
561      @Override
562      public Object run() throws Exception {
563        // Make sure it worked (above is sensitive to caching details in hadoop core)
564        FileSystem fs = FileSystem.get(conf);
565        Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
566        FaultyFileSystem ffs = (FaultyFileSystem)fs;
567        HRegion region = null;
568        try {
569          // Initialize region
570          region = initHRegion(tableName, null, null, false,
571              Durability.SYNC_WAL, wal, COLUMN_FAMILY_BYTES);
572          long size = region.getMemStoreDataSize();
573          Assert.assertEquals(0, size);
574          // Put one item into memstore.  Measure the size of one item in memstore.
575          Put p1 = new Put(row);
576          p1.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual1, 1, (byte[])null));
577          region.put(p1);
578          // Manufacture an outstanding snapshot -- fake a failed flush by doing prepare step only.
579          HStore store = region.getStore(COLUMN_FAMILY_BYTES);
580          StoreFlushContext storeFlushCtx =
581              store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY);
582          storeFlushCtx.prepare();
583          // Now add two entries to the foreground memstore.
584          Put p2 = new Put(row);
585          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual2, 2, (byte[])null));
586          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual3, 3, (byte[])null));
587          region.put(p2);
588          // Now try close on top of a failing flush.
589          HBaseTestingUtility.closeRegionAndWAL(region);
590          region = null;
591          fail();
592        } catch (DroppedSnapshotException dse) {
593          // Expected
594          LOG.info("Expected DroppedSnapshotException");
595        } finally {
          // Make it so all writes succeed from here on out so we can close cleanly.
597          ffs.fault.set(false);
598          HBaseTestingUtility.closeRegionAndWAL(region);
599        }
600        return null;
601      }
602    });
603    FileSystem.closeAllForUGI(user.getUGI());
604  }
605
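  /**
   * Verify that open scanners keep their read point across a major compaction: a scanner opened
   * before a delete still sees the row, while scanners opened after the delete do not.
   */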
606  @Test
607  public void testCompactionAffectedByScanners() throws Exception {
608    byte[] family = Bytes.toBytes("family");
609    this.region = initHRegion(tableName, method, CONF, family);
610
611    Put put = new Put(Bytes.toBytes("r1"));
612    put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
613    region.put(put);
614    region.flush(true);
615
616    Scan scan = new Scan();
617    scan.setMaxVersions(3);
618    // open the first scanner
619    RegionScanner scanner1 = region.getScanner(scan);
620
621    Delete delete = new Delete(Bytes.toBytes("r1"));
622    region.delete(delete);
623    region.flush(true);
624
625    // open the second scanner
626    RegionScanner scanner2 = region.getScanner(scan);
627
628    List<Cell> results = new ArrayList<>();
629
    LOG.info("Smallest read point: {}", region.getSmallestReadPoint());
631
632    // make a major compaction
633    region.compact(true);
634
635    // open the third scanner
636    RegionScanner scanner3 = region.getScanner(scan);
637
638    // get data from scanner 1, 2, 3 after major compaction
639    scanner1.next(results);
    LOG.debug("scanner1 results after compaction: {}", results);
641    assertEquals(1, results.size());
642
643    results.clear();
644    scanner2.next(results);
    LOG.debug("scanner2 results after compaction: {}", results);
646    assertEquals(0, results.size());
647
648    results.clear();
649    scanner3.next(results);
    LOG.debug("scanner3 results after compaction: {}", results);
651    assertEquals(0, results.size());
652  }
653
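  /**
   * Regression test: reseeking an open region scanner after a major compaction has replaced the
   * underlying store files must not throw an NPE and must return the expected row.
   */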
654  @Test
655  public void testToShowNPEOnRegionScannerReseek() throws Exception {
656    byte[] family = Bytes.toBytes("family");
657    this.region = initHRegion(tableName, method, CONF, family);
658
659    Put put = new Put(Bytes.toBytes("r1"));
660    put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
661    region.put(put);
662    put = new Put(Bytes.toBytes("r2"));
663    put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
664    region.put(put);
665    region.flush(true);
666
667    Scan scan = new Scan();
668    scan.setMaxVersions(3);
669    // open the first scanner
670    RegionScanner scanner1 = region.getScanner(scan);
671
    LOG.info("Smallest read point: {}", region.getSmallestReadPoint());
673
674    region.compact(true);
675
676    scanner1.reseek(Bytes.toBytes("r2"));
677    List<Cell> results = new ArrayList<>();
678    scanner1.next(results);
679    Cell keyValue = results.get(0);
    assertArrayEquals(Bytes.toBytes("r2"), CellUtil.cloneRow(keyValue));
681    scanner1.close();
682  }
683
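  /**
   * Verify that replayed recovered-edits files are archived rather than deleted when
   * hbase.region.archive.recovered.edits is enabled and a separate WAL directory is configured.
   */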
684  @Test
685  public void testArchiveRecoveredEditsReplay() throws Exception {
686    byte[] family = Bytes.toBytes("family");
687    this.region = initHRegion(tableName, method, CONF, family);
688    final WALFactory wals = new WALFactory(CONF, method);
689    try {
690      Path regiondir = region.getRegionFileSystem().getRegionDir();
691      FileSystem fs = region.getRegionFileSystem().getFileSystem();
692      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
693
694      Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
695
696      long maxSeqId = 1050;
697      long minSeqId = 1000;
698
699      for (long i = minSeqId; i <= maxSeqId; i += 10) {
700        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
701        fs.create(recoveredEdits);
702        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
703
704        long time = System.nanoTime();
705        WALEdit edit = new WALEdit();
706        edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
707          .toBytes(i)));
708        writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time,
709          HConstants.DEFAULT_CLUSTER_ID), edit));
710
711        writer.close();
712      }
713      MonitoredTask status = TaskMonitor.get().createStatus(method);
714      Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
715      for (HStore store : region.getStores()) {
716        maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId - 1);
717      }
718      CONF.set("hbase.region.archive.recovered.edits", "true");
719      CONF.set(CommonFSUtils.HBASE_WAL_DIR, "/custom_wal_dir");
720      long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
721      assertEquals(maxSeqId, seqId);
722      region.getMVCC().advanceTo(seqId);
723      String fakeFamilyName = recoveredEditsDir.getName();
724      Path rootDir = new Path(CONF.get(HConstants.HBASE_DIR));
725      Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePathForRootDir(rootDir,
726        region.getRegionInfo(), Bytes.toBytes(fakeFamilyName));
727      FileStatus[] list = TEST_UTIL.getTestFileSystem().listStatus(storeArchiveDir);
728      assertEquals(6, list.length);
729    } finally {
730      CONF.set("hbase.region.archive.recovered.edits", "false");
731      CONF.set(CommonFSUtils.HBASE_WAL_DIR, "");
732      HBaseTestingUtility.closeRegionAndWAL(this.region);
733      this.region = null;
734      wals.close();
735    }
736  }
737
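  /**
   * Baseline for the recovered-edits replay tests: with every store's max sequence id below the
   * edits, all recovered edits are replayed and their values are readable afterwards.
   */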
738  @Test
739  public void testSkipRecoveredEditsReplay() throws Exception {
740    byte[] family = Bytes.toBytes("family");
741    this.region = initHRegion(tableName, method, CONF, family);
742    final WALFactory wals = new WALFactory(CONF, method);
743    try {
744      Path regiondir = region.getRegionFileSystem().getRegionDir();
745      FileSystem fs = region.getRegionFileSystem().getFileSystem();
746      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
747
748      Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
749
750      long maxSeqId = 1050;
751      long minSeqId = 1000;
752
753      for (long i = minSeqId; i <= maxSeqId; i += 10) {
754        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
755        fs.create(recoveredEdits);
756        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
757
758        long time = System.nanoTime();
759        WALEdit edit = new WALEdit();
760        edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
761            .toBytes(i)));
762        writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time,
763            HConstants.DEFAULT_CLUSTER_ID), edit));
764
765        writer.close();
766      }
767      MonitoredTask status = TaskMonitor.get().createStatus(method);
768      Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
769      for (HStore store : region.getStores()) {
770        maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId - 1);
771      }
772      long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
773      assertEquals(maxSeqId, seqId);
774      region.getMVCC().advanceTo(seqId);
775      Get get = new Get(row);
776      Result result = region.get(get);
777      for (long i = minSeqId; i <= maxSeqId; i += 10) {
778        List<Cell> kvs = result.getColumnCells(family, Bytes.toBytes(i));
779        assertEquals(1, kvs.size());
780        assertArrayEquals(Bytes.toBytes(i), CellUtil.cloneValue(kvs.get(0)));
781      }
782    } finally {
783      HBaseTestingUtility.closeRegionAndWAL(this.region);
784      this.region = null;
785      wals.close();
786    }
787  }
788
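  /**
   * Verify that recovered edits with sequence ids at or below a store's max sequence id are
   * skipped during replay, while newer edits are applied and readable.
   */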
789  @Test
790  public void testSkipRecoveredEditsReplaySomeIgnored() throws Exception {
791    byte[] family = Bytes.toBytes("family");
792    this.region = initHRegion(tableName, method, CONF, family);
793    final WALFactory wals = new WALFactory(CONF, method);
794    try {
795      Path regiondir = region.getRegionFileSystem().getRegionDir();
796      FileSystem fs = region.getRegionFileSystem().getFileSystem();
797      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
798
799      Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
800
801      long maxSeqId = 1050;
802      long minSeqId = 1000;
803
804      for (long i = minSeqId; i <= maxSeqId; i += 10) {
805        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
806        fs.create(recoveredEdits);
807        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
808
809        long time = System.nanoTime();
810        WALEdit edit = new WALEdit();
811        edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
812            .toBytes(i)));
813        writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time,
814            HConstants.DEFAULT_CLUSTER_ID), edit));
815
816        writer.close();
817      }
818      long recoverSeqId = 1030;
819      MonitoredTask status = TaskMonitor.get().createStatus(method);
820      Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
821      for (HStore store : region.getStores()) {
822        maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), recoverSeqId - 1);
823      }
824      long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
825      assertEquals(maxSeqId, seqId);
826      region.getMVCC().advanceTo(seqId);
827      Get get = new Get(row);
828      Result result = region.get(get);
829      for (long i = minSeqId; i <= maxSeqId; i += 10) {
830        List<Cell> kvs = result.getColumnCells(family, Bytes.toBytes(i));
831        if (i < recoverSeqId) {
832          assertEquals(0, kvs.size());
833        } else {
834          assertEquals(1, kvs.size());
835          assertArrayEquals(Bytes.toBytes(i), CellUtil.cloneValue(kvs.get(0)));
836        }
837      }
838    } finally {
839      HBaseTestingUtility.closeRegionAndWAL(this.region);
840      this.region = null;
841      wals.close();
842    }
843  }
844
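  /**
   * Verify that recovered-edits files named with sequence ids at or below the stores' max are
   * skipped entirely (including empty and garbage files) and replay reports the stores' max.
   */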
845  @Test
846  public void testSkipRecoveredEditsReplayAllIgnored() throws Exception {
847    byte[] family = Bytes.toBytes("family");
848    this.region = initHRegion(tableName, method, CONF, family);
849    Path regiondir = region.getRegionFileSystem().getRegionDir();
850    FileSystem fs = region.getRegionFileSystem().getFileSystem();
851
852    Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
853    for (int i = 1000; i < 1050; i += 10) {
854      Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
855      FSDataOutputStream dos = fs.create(recoveredEdits);
856      dos.writeInt(i);
857      dos.close();
858    }
859    long minSeqId = 2000;
860    Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", minSeqId - 1));
861    FSDataOutputStream dos = fs.create(recoveredEdits);
862    dos.close();
863
864    Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
865    for (HStore store : region.getStores()) {
866      maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId);
867    }
868    long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, null);
869    assertEquals(minSeqId, seqId);
870  }
871
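  /**
   * Verify that a recovered-edits entry carrying a compaction marker (a meta edit) does not break
   * replay and is not flushed as data; the replayed data edits end up in a single store file.
   */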
872  @Test
873  public void testSkipRecoveredEditsReplayTheLastFileIgnored() throws Exception {
874    byte[] family = Bytes.toBytes("family");
875    this.region = initHRegion(tableName, method, CONF, family);
876    final WALFactory wals = new WALFactory(CONF, method);
877    try {
878      Path regiondir = region.getRegionFileSystem().getRegionDir();
879      FileSystem fs = region.getRegionFileSystem().getFileSystem();
880      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
881      byte[][] columns = region.getTableDescriptor().getColumnFamilyNames().toArray(new byte[0][]);
882
883      assertEquals(0, region.getStoreFileList(columns).size());
884
885      Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
886
887      long maxSeqId = 1050;
888      long minSeqId = 1000;
889
890      for (long i = minSeqId; i <= maxSeqId; i += 10) {
891        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
892        fs.create(recoveredEdits);
893        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
894
895        long time = System.nanoTime();
896        WALEdit edit = null;
897        if (i == maxSeqId) {
          edit = WALEdit.createCompaction(region.getRegionInfo(),
            CompactionDescriptor.newBuilder()
              .setTableName(ByteString.copyFrom(tableName.getName()))
              .setFamilyName(ByteString.copyFrom(regionName))
              .setEncodedRegionName(ByteString.copyFrom(regionName))
              .setStoreHomeDirBytes(ByteString.copyFrom(Bytes.toBytes(regiondir.toString())))
              .setRegionName(ByteString.copyFrom(region.getRegionInfo().getRegionName()))
              .build());
906        } else {
907          edit = new WALEdit();
908          edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
909            .toBytes(i)));
910        }
911        writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time,
912            HConstants.DEFAULT_CLUSTER_ID), edit));
913        writer.close();
914      }
915
916      long recoverSeqId = 1030;
917      Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
918      MonitoredTask status = TaskMonitor.get().createStatus(method);
919      for (HStore store : region.getStores()) {
920        maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), recoverSeqId - 1);
921      }
922      long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
923      assertEquals(maxSeqId, seqId);
924
      // Assert that the replayed recovered edits were flushed out into a single store file.
926      assertEquals(1, region.getStoreFileList(columns).size());
927
928    } finally {
929      HBaseTestingUtility.closeRegionAndWAL(this.region);
930      this.region = null;
931      wals.close();
932    }
933  }
934
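  /**
   * Replay a compaction marker from recovered edits, both with a matching encoded region name in
   * the marker and with a deliberately mismatched one.
   */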
935  @Test
936  public void testRecoveredEditsReplayCompaction() throws Exception {
937    testRecoveredEditsReplayCompaction(false);
938    testRecoveredEditsReplayCompaction(true);
939  }
940
941  public void testRecoveredEditsReplayCompaction(boolean mismatchedRegionName) throws Exception {
942    CONF.setClass(HConstants.REGION_IMPL, HRegionForTesting.class, Region.class);
943    byte[] family = Bytes.toBytes("family");
944    this.region = initHRegion(tableName, method, CONF, family);
945    final WALFactory wals = new WALFactory(CONF, method);
946    try {
947      Path regiondir = region.getRegionFileSystem().getRegionDir();
948      FileSystem fs = region.getRegionFileSystem().getFileSystem();
949      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
950
951      long maxSeqId = 3;
952      long minSeqId = 0;
953
954      for (long i = minSeqId; i < maxSeqId; i++) {
955        Put put = new Put(Bytes.toBytes(i));
956        put.addColumn(family, Bytes.toBytes(i), Bytes.toBytes(i));
957        region.put(put);
958        region.flush(true);
959      }
960
961      // this will create a region with 3 files
962      assertEquals(3, region.getStore(family).getStorefilesCount());
963      List<Path> storeFiles = new ArrayList<>(3);
964      for (HStoreFile sf : region.getStore(family).getStorefiles()) {
965        storeFiles.add(sf.getPath());
966      }
967
968      // disable compaction completion
969      CONF.setBoolean("hbase.hstore.compaction.complete", false);
970      region.compactStores();
971
972      // ensure that nothing changed
973      assertEquals(3, region.getStore(family).getStorefilesCount());
974
975      // now find the compacted file, and manually add it to the recovered edits
976      Path tmpDir = new Path(region.getRegionFileSystem().getTempDir(), Bytes.toString(family));
977      FileStatus[] files = FSUtils.listStatus(fs, tmpDir);
978      String errorMsg = "Expected to find 1 file in the region temp directory "
979          + "from the compaction, could not find any";
980      assertNotNull(errorMsg, files);
981      assertEquals(errorMsg, 1, files.length);
982      // move the file inside region dir
983      Path newFile = region.getRegionFileSystem().commitStoreFile(Bytes.toString(family),
984          files[0].getPath());
985
986      byte[] encodedNameAsBytes = this.region.getRegionInfo().getEncodedNameAsBytes();
987      byte[] fakeEncodedNameAsBytes = new byte [encodedNameAsBytes.length];
988      for (int i=0; i < encodedNameAsBytes.length; i++) {
989        // Mix the byte array to have a new encodedName
990        fakeEncodedNameAsBytes[i] = (byte) (encodedNameAsBytes[i] + 1);
991      }
992
993      CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(this.region
994        .getRegionInfo(), mismatchedRegionName ? fakeEncodedNameAsBytes : null, family,
995            storeFiles, Lists.newArrayList(newFile),
996            region.getRegionFileSystem().getStoreDir(Bytes.toString(family)));
997
998      WALUtil.writeCompactionMarker(region.getWAL(), this.region.getReplicationScope(),
999          this.region.getRegionInfo(), compactionDescriptor, region.getMVCC());
1000
1001      Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
1002
1003      Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
1004      fs.create(recoveredEdits);
1005      WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
1006
1007      long time = System.nanoTime();
1008
1009      writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, 10, time,
1010          HConstants.DEFAULT_CLUSTER_ID), WALEdit.createCompaction(region.getRegionInfo(),
1011          compactionDescriptor)));
1012      writer.close();
1013
1014      // close the region now, and reopen again
1015      region.getTableDescriptor();
1016      region.getRegionInfo();
1017      HBaseTestingUtility.closeRegionAndWAL(this.region);
1018      try {
1019        region = HRegion.openHRegion(region, null);
1020      } catch (WrongRegionException wre) {
1021        fail("Matching encoded region name should not have produced WrongRegionException");
1022      }
1023
1024      // now check whether we have only one store file, the compacted one
1025      Collection<HStoreFile> sfs = region.getStore(family).getStorefiles();
1026      for (HStoreFile sf : sfs) {
1027        LOG.info(Objects.toString(sf.getPath()));
1028      }
1029      if (!mismatchedRegionName) {
1030        assertEquals(1, region.getStore(family).getStorefilesCount());
1031      }
1032      files = FSUtils.listStatus(fs, tmpDir);
1033      assertTrue("Expected to find 0 files inside " + tmpDir, files == null || files.length == 0);
1034
1035      for (long i = minSeqId; i < maxSeqId; i++) {
1036        Get get = new Get(Bytes.toBytes(i));
1037        Result result = region.get(get);
1038        byte[] value = result.getValue(family, Bytes.toBytes(i));
1039        assertArrayEquals(Bytes.toBytes(i), value);
1040      }
1041    } finally {
1042      HBaseTestingUtility.closeRegionAndWAL(this.region);
1043      this.region = null;
1044      wals.close();
1045      CONF.setClass(HConstants.REGION_IMPL, HRegion.class, Region.class);
1046    }
1047  }
1048
1049  @Test
1050  public void testFlushMarkers() throws Exception {
    // Tests that flush markers are written to the WAL and handled when replaying recovered edits.
1052    byte[] family = Bytes.toBytes("family");
1053    Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + ".log");
1054    final Configuration walConf = new Configuration(TEST_UTIL.getConfiguration());
1055    FSUtils.setRootDir(walConf, logDir);
1056    final WALFactory wals = new WALFactory(walConf, method);
1057    final WAL wal = wals.getWAL(RegionInfoBuilder.newBuilder(tableName).build());
1058
1059    this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
1060      HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family);
1061    try {
1062      Path regiondir = region.getRegionFileSystem().getRegionDir();
1063      FileSystem fs = region.getRegionFileSystem().getFileSystem();
1064      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
1065
1066      long maxSeqId = 3;
1067      long minSeqId = 0;
1068
1069      for (long i = minSeqId; i < maxSeqId; i++) {
1070        Put put = new Put(Bytes.toBytes(i));
1071        put.addColumn(family, Bytes.toBytes(i), Bytes.toBytes(i));
1072        region.put(put);
1073        region.flush(true);
1074      }
1075
1076      // this will create a region with 3 files from flush
1077      assertEquals(3, region.getStore(family).getStorefilesCount());
1078      List<String> storeFiles = new ArrayList<>(3);
1079      for (HStoreFile sf : region.getStore(family).getStorefiles()) {
1080        storeFiles.add(sf.getPath().getName());
1081      }
1082
1083      // now verify that the flush markers are written
1084      wal.shutdown();
1085      WAL.Reader reader = WALFactory.createReader(fs, AbstractFSWALProvider.getCurrentFileName(wal),
1086        TEST_UTIL.getConfiguration());
1087      try {
1088        List<WAL.Entry> flushDescriptors = new ArrayList<>();
1089        long lastFlushSeqId = -1;
1090        while (true) {
1091          WAL.Entry entry = reader.next();
1092          if (entry == null) {
1093            break;
1094          }
1095          Cell cell = entry.getEdit().getCells().get(0);
1096          if (WALEdit.isMetaEditFamily(cell)) {
1097            FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(cell);
1098            assertNotNull(flushDesc);
1099            assertArrayEquals(tableName.getName(), flushDesc.getTableName().toByteArray());
1100            if (flushDesc.getAction() == FlushAction.START_FLUSH) {
1101              assertTrue(flushDesc.getFlushSequenceNumber() > lastFlushSeqId);
1102            } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
              assertEquals(lastFlushSeqId, flushDesc.getFlushSequenceNumber());
1104            }
1105            lastFlushSeqId = flushDesc.getFlushSequenceNumber();
1106            assertArrayEquals(regionName, flushDesc.getEncodedRegionName().toByteArray());
1107            assertEquals(1, flushDesc.getStoreFlushesCount()); //only one store
1108            StoreFlushDescriptor storeFlushDesc = flushDesc.getStoreFlushes(0);
1109            assertArrayEquals(family, storeFlushDesc.getFamilyName().toByteArray());
1110            assertEquals("family", storeFlushDesc.getStoreHomeDir());
1111            if (flushDesc.getAction() == FlushAction.START_FLUSH) {
1112              assertEquals(0, storeFlushDesc.getFlushOutputCount());
1113            } else {
1114              assertEquals(1, storeFlushDesc.getFlushOutputCount()); //only one file from flush
1115              assertTrue(storeFiles.contains(storeFlushDesc.getFlushOutput(0)));
1116            }
1117
1118            flushDescriptors.add(entry);
1119          }
1120        }
1121
1122        assertEquals(3 * 2, flushDescriptors.size()); // START_FLUSH and COMMIT_FLUSH per flush
1123
1124        // now write those markers to the recovered edits again.
1125
1126        Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
1127
1128        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
1129        fs.create(recoveredEdits);
1130        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
1131
1132        for (WAL.Entry entry : flushDescriptors) {
1133          writer.append(entry);
1134        }
1135        writer.close();
1136      } finally {
1137        if (null != reader) {
1138          try {
1139            reader.close();
1140          } catch (IOException exception) {
1141            LOG.warn("Problem closing wal: " + exception.getMessage());
1142            LOG.debug("exception details", exception);
1143          }
1144        }
1145      }
1146
1147      // close the region now, and reopen again
1148      HBaseTestingUtility.closeRegionAndWAL(this.region);
1149      region = HRegion.openHRegion(region, null);
1150
      // now check whether we can read the data back from the region
1152      for (long i = minSeqId; i < maxSeqId; i++) {
1153        Get get = new Get(Bytes.toBytes(i));
1154        Result result = region.get(get);
1155        byte[] value = result.getValue(family, Bytes.toBytes(i));
1156        assertArrayEquals(Bytes.toBytes(i), value);
1157      }
1158    } finally {
1159      HBaseTestingUtility.closeRegionAndWAL(this.region);
1160      this.region = null;
1161      wals.close();
1162    }
1163  }
1164
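  /**
   * Mockito {@link ArgumentMatcher} that matches WALEdits carrying a flush descriptor with one of
   * the configured {@link FlushAction}s.
   */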
1165  static class IsFlushWALMarker implements ArgumentMatcher<WALEdit> {
1166    volatile FlushAction[] actions;
1167    public IsFlushWALMarker(FlushAction... actions) {
1168      this.actions = actions;
1169    }
1170    @Override
1171    public boolean matches(WALEdit edit) {
1172      List<Cell> cells = edit.getCells();
1173      if (cells.isEmpty()) {
1174        return false;
1175      }
1176      if (WALEdit.isMetaEditFamily(cells.get(0))) {
1177        FlushDescriptor desc;
1178        try {
1179          desc = WALEdit.getFlushDescriptor(cells.get(0));
1180        } catch (IOException e) {
1181          LOG.warn(e.toString(), e);
1182          return false;
1183        }
1184        if (desc != null) {
1185          for (FlushAction action : actions) {
1186            if (desc.getAction() == action) {
1187              return true;
1188            }
1189          }
1190        }
1191      }
1192      return false;
1193    }
1194    public IsFlushWALMarker set(FlushAction... actions) {
1195      this.actions = actions;
1196      return this;
1197    }
1198  }
1199
1200  @Test
1201  public void testFlushMarkersWALFail() throws Exception {
    // Test the cases where the WAL append for flush markers fails.
1203    byte[] family = Bytes.toBytes("family");
1204
1205    // spy an actual WAL implementation to throw exception (was not able to mock)
1206    Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + "log");
1207
1208    final Configuration walConf = new Configuration(TEST_UTIL.getConfiguration());
1209    FSUtils.setRootDir(walConf, logDir);
1210    // Make up a WAL that we can manipulate at append time.
1211    class FailAppendFlushMarkerWAL extends FSHLog {
1212      volatile FlushAction [] flushActions = null;
1213
1214      public FailAppendFlushMarkerWAL(FileSystem fs, Path root, String logDir, Configuration conf)
1215      throws IOException {
1216        super(fs, root, logDir, conf);
1217      }
1218
1219      @Override
1220      protected Writer createWriterInstance(Path path) throws IOException {
1221        final Writer w = super.createWriterInstance(path);
1222        return new Writer() {
1223          @Override
1224          public void close() throws IOException {
1225            w.close();
1226          }
1227
1228          @Override
1229          public void sync(boolean forceSync) throws IOException {
1230            w.sync(forceSync);
1231          }
1232
1233          @Override
1234          public void append(Entry entry) throws IOException {
1235            List<Cell> cells = entry.getEdit().getCells();
1236            if (WALEdit.isMetaEditFamily(cells.get(0))) {
1237              FlushDescriptor desc = WALEdit.getFlushDescriptor(cells.get(0));
1238              if (desc != null) {
1239                for (FlushAction flushAction: flushActions) {
1240                  if (desc.getAction().equals(flushAction)) {
1241                    throw new IOException("Failed to append flush marker! " + flushAction);
1242                  }
1243                }
1244              }
1245            }
1246            w.append(entry);
1247          }
1248
1249          @Override
1250          public long getLength() {
1251            return w.getLength();
1252          }
1253        };
1254      }
1255    }
1256    FailAppendFlushMarkerWAL wal =
1257      new FailAppendFlushMarkerWAL(FileSystem.get(walConf), FSUtils.getRootDir(walConf),
1258        method, walConf);
1259    wal.init();
1260    this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
1261      HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family);
1262    int i = 0;
1263    Put put = new Put(Bytes.toBytes(i));
1264    put.setDurability(Durability.SKIP_WAL); // have to skip mocked wal
1265    put.addColumn(family, Bytes.toBytes(i), Bytes.toBytes(i));
1266    region.put(put);
1267
1268    // 1. Test case where START_FLUSH throws exception
1269    wal.flushActions = new FlushAction [] {FlushAction.START_FLUSH};
1270
    // starting the cache flush will throw an exception
1272    try {
1273      region.flush(true);
1274      fail("This should have thrown exception");
1275    } catch (DroppedSnapshotException unexpected) {
      // This should not be a DroppedSnapshotException; that would make the region server abort.
1277      throw unexpected;
1278    } catch (IOException expected) {
1279      // expected
1280    }
1281    // The WAL is hosed now. It has two edits appended. We cannot roll the log without it
1282    // throwing a DroppedSnapshotException to force an abort. Just clean up the mess.
1283    region.close(true);
1284    wal.close();
1285
1286    // 2. Test case where START_FLUSH succeeds but COMMIT_FLUSH will throw exception
1287    wal.flushActions = new FlushAction [] {FlushAction.COMMIT_FLUSH};
1288    wal = new FailAppendFlushMarkerWAL(FileSystem.get(walConf), FSUtils.getRootDir(walConf),
1289          method, walConf);
1290    wal.init();
1291    this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
1292      HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family);
1293    region.put(put);
    // 3. Test case where ABORT_FLUSH will throw an exception.
    // Even if ABORT_FLUSH throws, we should not fail with a plain IOException but with a
    // DroppedSnapshotException. The COMMIT_FLUSH failure below causes the flush to abort.
1297    wal.flushActions = new FlushAction [] {FlushAction.COMMIT_FLUSH, FlushAction.ABORT_FLUSH};
1298
1299    try {
1300      region.flush(true);
1301      fail("This should have thrown exception");
1302    } catch (DroppedSnapshotException expected) {
1303      // we expect this exception, since we were able to write the snapshot, but failed to
1304      // write the flush marker to WAL
1305    } catch (IOException unexpected) {
1306      throw unexpected;
1307    }
1308  }
1309
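  /**
   * Verifies that concurrent gets racing with a region close do not fail with a
   * NullPointerException.
   */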
1310  @Test
1311  public void testGetWhileRegionClose() throws IOException {
1312    Configuration hc = initSplit();
1313    int numRows = 100;
1314    byte[][] families = { fam1, fam2, fam3 };
1315
1316    // Setting up region
1317    this.region = initHRegion(tableName, method, hc, families);
1318    // Put data in region
1319    final int startRow = 100;
1320    putData(startRow, numRows, qual1, families);
1321    putData(startRow, numRows, qual2, families);
1322    putData(startRow, numRows, qual3, families);
1323    final AtomicBoolean done = new AtomicBoolean(false);
1324    final AtomicInteger gets = new AtomicInteger(0);
1325    GetTillDoneOrException[] threads = new GetTillDoneOrException[10];
1326    try {
1327      // Set ten threads running concurrently getting from the region.
1328      for (int i = 0; i < threads.length / 2; i++) {
1329        threads[i] = new GetTillDoneOrException(i, Bytes.toBytes("" + startRow), done, gets);
1330        threads[i].setDaemon(true);
1331        threads[i].start();
1332      }
1333      // Artificially make the condition by setting closing flag explicitly.
1334      // I can't make the issue happen with a call to region.close().
1335      this.region.closing.set(true);
1336      for (int i = threads.length / 2; i < threads.length; i++) {
1337        threads[i] = new GetTillDoneOrException(i, Bytes.toBytes("" + startRow), done, gets);
1338        threads[i].setDaemon(true);
1339        threads[i].start();
1340      }
1341    } finally {
1342      if (this.region != null) {
1343        HBaseTestingUtility.closeRegionAndWAL(this.region);
1344        this.region = null;
1345      }
1346    }
1347    done.set(true);
1348    for (GetTillDoneOrException t : threads) {
1349      try {
1350        t.join();
1351      } catch (InterruptedException e) {
1352        e.printStackTrace();
1353      }
1354      if (t.e != null) {
1355        LOG.info("Exception=" + t.e);
1356        assertFalse("Found a NPE in " + t.getName(), t.e instanceof NullPointerException);
1357      }
1358    }
1359  }
1360
  /*
   * Thread that does a get on a single row until the 'done' flag is flipped. If a get fails with
   * an exception, the exception is recorded.
   */
1365  class GetTillDoneOrException extends Thread {
1366    private final Get g;
1367    private final AtomicBoolean done;
1368    private final AtomicInteger count;
1369    private Exception e;
1370
1371    GetTillDoneOrException(final int i, final byte[] r, final AtomicBoolean d,
1372        final AtomicInteger c) {
1373      super("getter." + i);
1374      this.g = new Get(r);
1375      this.done = d;
1376      this.count = c;
1377    }
1378
1379    @Override
1380    public void run() {
1381      while (!this.done.get()) {
1382        try {
1383          assertTrue(region.get(g).size() > 0);
1384          this.count.incrementAndGet();
1385        } catch (Exception e) {
1386          this.e = e;
1387          break;
1388        }
1389      }
1390    }
1391  }
1392
  /*
   * An involved filter test. Uses multiple column families with deletes mixed in.
   */
1396  @Test
1397  public void testWeirdCacheBehaviour() throws Exception {
1398    final TableName tableName = TableName.valueOf(name.getMethodName());
1399    byte[][] FAMILIES = new byte[][] { Bytes.toBytes("trans-blob"), Bytes.toBytes("trans-type"),
1400        Bytes.toBytes("trans-date"), Bytes.toBytes("trans-tags"), Bytes.toBytes("trans-group") };
1401    this.region = initHRegion(tableName, method, CONF, FAMILIES);
1402    String value = "this is the value";
1403    String value2 = "this is some other value";
1404    String keyPrefix1 = "prefix1";
1405    String keyPrefix2 = "prefix2";
1406    String keyPrefix3 = "prefix3";
1407    putRows(this.region, 3, value, keyPrefix1);
1408    putRows(this.region, 3, value, keyPrefix2);
1409    putRows(this.region, 3, value, keyPrefix3);
1410    putRows(this.region, 3, value2, keyPrefix1);
1411    putRows(this.region, 3, value2, keyPrefix2);
1412    putRows(this.region, 3, value2, keyPrefix3);
1413    System.out.println("Checking values for key: " + keyPrefix1);
1414    assertEquals("Got back incorrect number of rows from scan", 3,
1415        getNumberOfRows(keyPrefix1, value2, this.region));
1416    System.out.println("Checking values for key: " + keyPrefix2);
1417    assertEquals("Got back incorrect number of rows from scan", 3,
1418        getNumberOfRows(keyPrefix2, value2, this.region));
1419    System.out.println("Checking values for key: " + keyPrefix3);
1420    assertEquals("Got back incorrect number of rows from scan", 3,
1421        getNumberOfRows(keyPrefix3, value2, this.region));
1422    deleteColumns(this.region, value2, keyPrefix1);
1423    deleteColumns(this.region, value2, keyPrefix2);
1424    deleteColumns(this.region, value2, keyPrefix3);
1425    System.out.println("Starting important checks.....");
1426    assertEquals("Got back incorrect number of rows from scan: " + keyPrefix1, 0,
1427        getNumberOfRows(keyPrefix1, value2, this.region));
1428    assertEquals("Got back incorrect number of rows from scan: " + keyPrefix2, 0,
1429        getNumberOfRows(keyPrefix2, value2, this.region));
1430    assertEquals("Got back incorrect number of rows from scan: " + keyPrefix3, 0,
1431        getNumberOfRows(keyPrefix3, value2, this.region));
1432  }
1433
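  /** Verifies that an append against a read-only table is rejected with an IOException. */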
1434  @Test
1435  public void testAppendWithReadOnlyTable() throws Exception {
1436    final TableName tableName = TableName.valueOf(name.getMethodName());
1437    this.region = initHRegion(tableName, method, CONF, true, Bytes.toBytes("somefamily"));
1438    boolean exceptionCaught = false;
1439    Append append = new Append(Bytes.toBytes("somerow"));
1440    append.setDurability(Durability.SKIP_WAL);
1441    append.addColumn(Bytes.toBytes("somefamily"), Bytes.toBytes("somequalifier"),
1442        Bytes.toBytes("somevalue"));
1443    try {
1444      region.append(append);
1445    } catch (IOException e) {
1446      exceptionCaught = true;
1447    }
    assertTrue(exceptionCaught);
1449  }
1450
1451  @Test
1452  public void testIncrWithReadOnlyTable() throws Exception {
1453    final TableName tableName = TableName.valueOf(name.getMethodName());
1454    this.region = initHRegion(tableName, method, CONF, true, Bytes.toBytes("somefamily"));
1455    boolean exceptionCaught = false;
1456    Increment inc = new Increment(Bytes.toBytes("somerow"));
1457    inc.setDurability(Durability.SKIP_WAL);
1458    inc.addColumn(Bytes.toBytes("somefamily"), Bytes.toBytes("somequalifier"), 1L);
1459    try {
1460      region.increment(inc);
1461    } catch (IOException e) {
1462      exceptionCaught = true;
1463    }
    assertTrue(exceptionCaught);
1465  }
1466
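  /**
   * Scans with {@link #buildScanner(String, String, HRegion)} and deletes the trans-tags:qual2
   * column from each matching row, asserting that exactly three deletes were issued.
   */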
1467  private void deleteColumns(HRegion r, String value, String keyPrefix) throws IOException {
1468    InternalScanner scanner = buildScanner(keyPrefix, value, r);
1469    int count = 0;
1470    boolean more = false;
1471    List<Cell> results = new ArrayList<>();
1472    do {
1473      more = scanner.next(results);
      if (results != null && !results.isEmpty()) {
        count++;
      } else {
        break;
      }
1478      Delete delete = new Delete(CellUtil.cloneRow(results.get(0)));
1479      delete.addColumn(Bytes.toBytes("trans-tags"), Bytes.toBytes("qual2"));
1480      r.delete(delete);
1481      results.clear();
1482    } while (more);
1483    assertEquals("Did not perform correct number of deletes", 3, count);
1484  }
1485
1486  private int getNumberOfRows(String keyPrefix, String value, HRegion r) throws Exception {
1487    InternalScanner resultScanner = buildScanner(keyPrefix, value, r);
1488    int numberOfResults = 0;
1489    List<Cell> results = new ArrayList<>();
1490    boolean more = false;
1491    do {
1492      more = resultScanner.next(results);
      if (results != null && !results.isEmpty()) {
        numberOfResults++;
      } else {
        break;
      }
1497      for (Cell kv : results) {
1498        System.out.println("kv=" + kv.toString() + ", " + Bytes.toString(CellUtil.cloneValue(kv)));
1499      }
1500      results.clear();
1501    } while (more);
1502    return numberOfResults;
1503  }
1504
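  /**
   * Builds a scanner over all five families using a MUST_PASS_ALL filter list: a PrefixFilter on
   * the row key plus a SingleColumnValueFilter requiring trans-tags:qual2 to equal the given
   * value.
   */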
1505  private InternalScanner buildScanner(String keyPrefix, String value, HRegion r)
1506      throws IOException {
1507    // Defaults FilterList.Operator.MUST_PASS_ALL.
1508    FilterList allFilters = new FilterList();
1509    allFilters.addFilter(new PrefixFilter(Bytes.toBytes(keyPrefix)));
1510    // Only return rows where this column value exists in the row.
1511    SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("trans-tags"),
1512        Bytes.toBytes("qual2"), CompareOp.EQUAL, Bytes.toBytes(value));
1513    filter.setFilterIfMissing(true);
1514    allFilters.addFilter(filter);
1515    Scan scan = new Scan();
1516    scan.addFamily(Bytes.toBytes("trans-blob"));
1517    scan.addFamily(Bytes.toBytes("trans-type"));
1518    scan.addFamily(Bytes.toBytes("trans-date"));
1519    scan.addFamily(Bytes.toBytes("trans-tags"));
1520    scan.addFamily(Bytes.toBytes("trans-group"));
1521    scan.setFilter(allFilters);
1522    return r.getScanner(scan);
1523  }
1524
1525  private void putRows(HRegion r, int numRows, String value, String key) throws IOException {
1526    for (int i = 0; i < numRows; i++) {
1527      String row = key + "_" + i/* UUID.randomUUID().toString() */;
1528      System.out.println(String.format("Saving row: %s, with value %s", row, value));
1529      Put put = new Put(Bytes.toBytes(row));
1530      put.setDurability(Durability.SKIP_WAL);
1531      put.addColumn(Bytes.toBytes("trans-blob"), null, Bytes.toBytes("value for blob"));
1532      put.addColumn(Bytes.toBytes("trans-type"), null, Bytes.toBytes("statement"));
1533      put.addColumn(Bytes.toBytes("trans-date"), null, Bytes.toBytes("20090921010101999"));
1534      put.addColumn(Bytes.toBytes("trans-tags"), Bytes.toBytes("qual2"), Bytes.toBytes(value));
1535      put.addColumn(Bytes.toBytes("trans-group"), null, Bytes.toBytes("adhocTransactionGroupId"));
1536      r.put(put);
1537    }
1538  }
1539
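  /**
   * Verifies that a put addressed to "family:" (with a trailing colon) is rejected with a
   * NoSuchColumnFamilyException even though the family without the colon exists.
   */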
1540  @Test
1541  public void testFamilyWithAndWithoutColon() throws Exception {
1542    byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
1543    this.region = initHRegion(tableName, method, CONF, cf);
1544    Put p = new Put(tableName.toBytes());
1545    byte[] cfwithcolon = Bytes.toBytes(COLUMN_FAMILY + ":");
1546    p.addColumn(cfwithcolon, cfwithcolon, cfwithcolon);
1547    boolean exception = false;
1548    try {
1549      this.region.put(p);
1550    } catch (NoSuchColumnFamilyException e) {
1551      exception = true;
1552    }
1553    assertTrue(exception);
1554  }
1555
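  /**
   * Batch put with no row locks held: all puts succeed with a single WAL sync, and a put with an
   * unknown family is flagged BAD_FAMILY while the remaining puts still succeed.
   */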
1556  @Test
1557  public void testBatchPut_whileNoRowLocksHeld() throws IOException {
1558    final Put[] puts = new Put[10];
1559    MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
1560    long syncs = prepareRegionForBachPut(puts, source, false);
1561
1562    OperationStatus[] codes = this.region.batchMutate(puts);
1563    assertEquals(10, codes.length);
1564    for (int i = 0; i < 10; i++) {
1565      assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
1566    }
1567    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);
1568
1569    LOG.info("Next a batch put with one invalid family");
1570    puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value);
1571    codes = this.region.batchMutate(puts);
1572    assertEquals(10, codes.length);
1573    for (int i = 0; i < 10; i++) {
1574      assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS,
1575          codes[i].getOperationStatusCode());
1576    }
1577
1578    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 2, source);
1579  }
1580
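  /**
   * Batch put while other threads hold row locks: the batch blocks until the locks are released,
   * a concurrent region close waits for the batch to finish, and per-row statuses are returned.
   */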
1581  @Test
1582  public void testBatchPut_whileMultipleRowLocksHeld() throws Exception {
1583    final Put[] puts = new Put[10];
1584    MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
1585    long syncs = prepareRegionForBachPut(puts, source, false);
1586
1587    puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value);
1588
1589    LOG.info("batchPut will have to break into four batches to avoid row locks");
1590    RowLock rowLock1 = region.getRowLock(Bytes.toBytes("row_2"));
1591    RowLock rowLock2 = region.getRowLock(Bytes.toBytes("row_1"));
1592    RowLock rowLock3 = region.getRowLock(Bytes.toBytes("row_3"));
1593    RowLock rowLock4 = region.getRowLock(Bytes.toBytes("row_3"), true);
1594
1595    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF);
1596    final AtomicReference<OperationStatus[]> retFromThread = new AtomicReference<>();
1597    final CountDownLatch startingPuts = new CountDownLatch(1);
1598    final CountDownLatch startingClose = new CountDownLatch(1);
1599    TestThread putter = new TestThread(ctx) {
1600      @Override
1601      public void doWork() throws IOException {
1602        startingPuts.countDown();
1603        retFromThread.set(region.batchMutate(puts));
1604      }
1605    };
1606    LOG.info("...starting put thread while holding locks");
1607    ctx.addThread(putter);
1608    ctx.startThreads();
1609
    // Now attempt to close the region from another thread.  Prior to HBASE-12565
    // this would cause the in-progress batchMutate operation to fail with an
    // exception because it used to release and re-acquire the close-guard lock
    // between batches.  The caller then didn't get status indicating which writes succeeded.
    // We now expect this thread to block until the batchMutate call finishes.
1615    Thread regionCloseThread = new TestThread(ctx) {
1616      @Override
1617      public void doWork() {
1618        try {
1619          startingPuts.await();
1620          // Give some time for the batch mutate to get in.
1621          // We don't want to race with the mutate
1622          Thread.sleep(10);
1623          startingClose.countDown();
1624          HBaseTestingUtility.closeRegionAndWAL(region);
1625          region = null;
        } catch (IOException | InterruptedException e) {
          throw new RuntimeException(e);
        }
1631      }
1632    };
1633    regionCloseThread.start();
1634
1635    startingClose.await();
1636    startingPuts.await();
1637    Thread.sleep(100);
1638    LOG.info("...releasing row lock 1, which should let put thread continue");
1639    rowLock1.release();
1640    rowLock2.release();
1641    rowLock3.release();
1642    waitForCounter(source, "syncTimeNumOps", syncs + 1);
1643
1644    LOG.info("...joining on put thread");
1645    ctx.stop();
1646    regionCloseThread.join();
1647
1648    OperationStatus[] codes = retFromThread.get();
1649    for (int i = 0; i < codes.length; i++) {
1650      assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS,
1651          codes[i].getOperationStatusCode());
1652    }
1653    rowLock4.release();
1654  }
1655
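  /** Polls the given WAL metric until it reaches expectedCount, failing after ten seconds. */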
1656  private void waitForCounter(MetricsWALSource source, String metricName, long expectedCount)
1657      throws InterruptedException {
1658    long startWait = System.currentTimeMillis();
1659    long currentCount;
1660    while ((currentCount = metricsAssertHelper.getCounter(metricName, source)) < expectedCount) {
1661      Thread.sleep(100);
1662      if (System.currentTimeMillis() - startWait > 10000) {
1663        fail(String.format("Timed out waiting for '%s' >= '%s', currentCount=%s", metricName,
1664            expectedCount, currentCount));
1665      }
1666    }
1667  }
1668
1669  @Test
1670  public void testAtomicBatchPut() throws IOException {
1671    final Put[] puts = new Put[10];
1672    MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
1673    long syncs = prepareRegionForBachPut(puts, source, false);
1674
1675    // 1. Straight forward case, should succeed
1676    MutationBatchOperation batchOp = new MutationBatchOperation(region, puts, true,
1677        HConstants.NO_NONCE, HConstants.NO_NONCE);
1678    OperationStatus[] codes = this.region.batchMutate(batchOp);
1679    assertEquals(10, codes.length);
1680    for (int i = 0; i < 10; i++) {
1681      assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
1682    }
1683    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);
1684
1685    // 2. Failed to get lock
1686    RowLock lock = region.getRowLock(Bytes.toBytes("row_" + 3));
    // Method {@link HRegion#getRowLock(byte[])} is reentrant. As 'row_3' is locked in this
    // thread, we need to run {@link HRegion#batchMutate(HRegion.BatchOperation)} in a
    // different thread.
1689    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF);
1690    final AtomicReference<IOException> retFromThread = new AtomicReference<>();
1691    final CountDownLatch finishedPuts = new CountDownLatch(1);
    final MutationBatchOperation finalBatchOp = new MutationBatchOperation(region, puts, true,
        HConstants.NO_NONCE, HConstants.NO_NONCE);
1696    TestThread putter = new TestThread(ctx) {
1697      @Override
1698      public void doWork() throws IOException {
1699        try {
1700          region.batchMutate(finalBatchOp);
1701        } catch (IOException ioe) {
1702          LOG.error("test failed!", ioe);
1703          retFromThread.set(ioe);
1704        }
1705        finishedPuts.countDown();
1706      }
1707    };
1708    LOG.info("...starting put thread while holding locks");
1709    ctx.addThread(putter);
1710    ctx.startThreads();
1711    LOG.info("...waiting for batch puts while holding locks");
1712    try {
1713      finishedPuts.await();
1714    } catch (InterruptedException e) {
1715      LOG.error("Interrupted!", e);
1716    } finally {
1717      if (lock != null) {
1718        lock.release();
1719      }
1720    }
1721    assertNotNull(retFromThread.get());
1722    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);
1723
1724    // 3. Exception thrown in validation
1725    LOG.info("Next a batch put with one invalid family");
1726    puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value);
1727    batchOp = new MutationBatchOperation(region, puts, true, HConstants.NO_NONCE,
1728        HConstants.NO_NONCE);
1729    thrown.expect(NoSuchColumnFamilyException.class);
1730    this.region.batchMutate(batchOp);
1731  }
1732
1733  @Test
1734  public void testBatchPutWithTsSlop() throws Exception {
    // Add data with a timestamp that is too far in the future for the configured slop and ensure
    // the sanity check rejects it.
1736    CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000);
1737    final Put[] puts = new Put[10];
1738    MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
1739
1740    long syncs = prepareRegionForBachPut(puts, source, true);
1741
1742    OperationStatus[] codes = this.region.batchMutate(puts);
1743    assertEquals(10, codes.length);
1744    for (int i = 0; i < 10; i++) {
1745      assertEquals(OperationStatusCode.SANITY_CHECK_FAILURE, codes[i].getOperationStatusCode());
1746    }
1747    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
1748  }
1749
  /**
   * @return the initial value of the syncTimeNumOps counter
   */
1753  private long prepareRegionForBachPut(final Put[] puts, final MetricsWALSource source,
1754      boolean slop) throws IOException {
1755    this.region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
1756
1757    LOG.info("First a batch put with all valid puts");
1758    for (int i = 0; i < puts.length; i++) {
1759      puts[i] = slop ? new Put(Bytes.toBytes("row_" + i), Long.MAX_VALUE - 100) :
1760          new Put(Bytes.toBytes("row_" + i));
1761      puts[i].addColumn(COLUMN_FAMILY_BYTES, qual, value);
1762    }
1763
1764    long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
1765    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
1766    return syncs;
1767  }
1768
1769  // ////////////////////////////////////////////////////////////////////////////
1770  // checkAndMutate tests
1771  // ////////////////////////////////////////////////////////////////////////////
1772  @Test
1773  public void testCheckAndMutate_WithEmptyRowValue() throws IOException {
1774    byte[] row1 = Bytes.toBytes("row1");
1775    byte[] fam1 = Bytes.toBytes("fam1");
1776    byte[] qf1 = Bytes.toBytes("qualifier");
1777    byte[] emptyVal = new byte[] {};
1778    byte[] val1 = Bytes.toBytes("value1");
1779    byte[] val2 = Bytes.toBytes("value2");
1780
1781    // Setting up region
1782    this.region = initHRegion(tableName, method, CONF, fam1);
1783    // Putting empty data in key
1784    Put put = new Put(row1);
1785    put.addColumn(fam1, qf1, emptyVal);
1786
1787    // checkAndPut with empty value
1788    boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1789        new BinaryComparator(emptyVal), put);
1790    assertTrue(res);
1791
1792    // Putting data in key
1793    put = new Put(row1);
1794    put.addColumn(fam1, qf1, val1);
1795
1796    // checkAndPut with correct value
1797    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1798        new BinaryComparator(emptyVal), put);
1799    assertTrue(res);
1800
1801    // not empty anymore
1802    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1803        new BinaryComparator(emptyVal), put);
1804    assertFalse(res);
1805
1806    Delete delete = new Delete(row1);
1807    delete.addColumn(fam1, qf1);
1808    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1809        new BinaryComparator(emptyVal), delete);
1810    assertFalse(res);
1811
1812    put = new Put(row1);
1813    put.addColumn(fam1, qf1, val2);
1814    // checkAndPut with correct value
1815    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1816        new BinaryComparator(val1), put);
1817    assertTrue(res);
1818
1819    // checkAndDelete with correct value
1820    delete = new Delete(row1);
1821    delete.addColumn(fam1, qf1);
1822    delete.addColumn(fam1, qf1);
1823    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1824        new BinaryComparator(val2), delete);
1825    assertTrue(res);
1826
1827    delete = new Delete(row1);
1828    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1829        new BinaryComparator(emptyVal), delete);
1830    assertTrue(res);
1831
1832    // checkAndPut looking for a null value
1833    put = new Put(row1);
1834    put.addColumn(fam1, qf1, val1);
1835
1836    res = region
1837        .checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new NullComparator(), put);
1838    assertTrue(res);
1839  }
1840
1841  @Test
1842  public void testCheckAndMutate_WithWrongValue() throws IOException {
1843    byte[] row1 = Bytes.toBytes("row1");
1844    byte[] fam1 = Bytes.toBytes("fam1");
1845    byte[] qf1 = Bytes.toBytes("qualifier");
1846    byte[] val1 = Bytes.toBytes("value1");
1847    byte[] val2 = Bytes.toBytes("value2");
1848    BigDecimal bd1 = new BigDecimal(Double.MAX_VALUE);
1849    BigDecimal bd2 = new BigDecimal(Double.MIN_VALUE);
1850
1851    // Setting up region
1852    this.region = initHRegion(tableName, method, CONF, fam1);
1853    // Putting data in key
1854    Put put = new Put(row1);
1855    put.addColumn(fam1, qf1, val1);
1856    region.put(put);
1857
1858    // checkAndPut with wrong value
1859    boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1860        new BinaryComparator(val2), put);
1861    assertEquals(false, res);
1862
1863    // checkAndDelete with wrong value
1864    Delete delete = new Delete(row1);
1865    delete.addFamily(fam1);
    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
        new BinaryComparator(val2), delete);
1868    assertEquals(false, res);
1869
1870    // Putting data in key
1871    put = new Put(row1);
1872    put.addColumn(fam1, qf1, Bytes.toBytes(bd1));
1873    region.put(put);
1874
1875    // checkAndPut with wrong value
1876    res =
1877        region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1878            new BigDecimalComparator(bd2), put);
1879    assertEquals(false, res);
1880
1881    // checkAndDelete with wrong value
1882    delete = new Delete(row1);
1883    delete.addFamily(fam1);
    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
        new BigDecimalComparator(bd2), delete);
1887    assertEquals(false, res);
1888  }
1889
1890  @Test
1891  public void testCheckAndMutate_WithCorrectValue() throws IOException {
1892    byte[] row1 = Bytes.toBytes("row1");
1893    byte[] fam1 = Bytes.toBytes("fam1");
1894    byte[] qf1 = Bytes.toBytes("qualifier");
1895    byte[] val1 = Bytes.toBytes("value1");
1896    BigDecimal bd1 = new BigDecimal(Double.MIN_VALUE);
1897
1898    // Setting up region
1899    this.region = initHRegion(tableName, method, CONF, fam1);
1900    // Putting data in key
1901    Put put = new Put(row1);
1902    put.addColumn(fam1, qf1, val1);
1903    region.put(put);
1904
1905    // checkAndPut with correct value
1906    boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1907        new BinaryComparator(val1), put);
1908    assertEquals(true, res);
1909
1910    // checkAndDelete with correct value
1911    Delete delete = new Delete(row1);
1912    delete.addColumn(fam1, qf1);
1913    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val1),
1914        delete);
1915    assertEquals(true, res);
1916
1917    // Putting data in key
1918    put = new Put(row1);
1919    put.addColumn(fam1, qf1, Bytes.toBytes(bd1));
1920    region.put(put);
1921
1922    // checkAndPut with correct value
1923    res =
1924        region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator(
1925            bd1), put);
1926    assertEquals(true, res);
1927
1928    // checkAndDelete with correct value
1929    delete = new Delete(row1);
1930    delete.addColumn(fam1, qf1);
1931    res =
1932        region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator(
1933            bd1), delete);
1934    assertEquals(true, res);
1935  }
1936
1937  @Test
1938  public void testCheckAndMutate_WithNonEqualCompareOp() throws IOException {
1939    byte[] row1 = Bytes.toBytes("row1");
1940    byte[] fam1 = Bytes.toBytes("fam1");
1941    byte[] qf1 = Bytes.toBytes("qualifier");
1942    byte[] val1 = Bytes.toBytes("value1");
1943    byte[] val2 = Bytes.toBytes("value2");
1944    byte[] val3 = Bytes.toBytes("value3");
1945    byte[] val4 = Bytes.toBytes("value4");
1946
1947    // Setting up region
1948    this.region = initHRegion(tableName, method, CONF, fam1);
1949    // Putting val3 in key
1950    Put put = new Put(row1);
1951    put.addColumn(fam1, qf1, val3);
1952    region.put(put);
1953
1954    // Test CompareOp.LESS: original = val3, compare with val3, fail
1955    boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS,
1956        new BinaryComparator(val3), put);
1957    assertEquals(false, res);
1958
1959    // Test CompareOp.LESS: original = val3, compare with val4, fail
1960    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS,
1961        new BinaryComparator(val4), put);
1962    assertEquals(false, res);
1963
1964    // Test CompareOp.LESS: original = val3, compare with val2,
1965    // succeed (now value = val2)
1966    put = new Put(row1);
1967    put.addColumn(fam1, qf1, val2);
1968    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS,
1969        new BinaryComparator(val2), put);
1970    assertEquals(true, res);
1971
1972    // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val3, fail
1973    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL,
1974        new BinaryComparator(val3), put);
1975    assertEquals(false, res);
1976
1977    // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val2,
1978    // succeed (value still = val2)
1979    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL,
1980        new BinaryComparator(val2), put);
1981    assertEquals(true, res);
1982
1983    // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val1,
1984    // succeed (now value = val3)
1985    put = new Put(row1);
1986    put.addColumn(fam1, qf1, val3);
1987    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL,
1988        new BinaryComparator(val1), put);
1989    assertEquals(true, res);
1990
1991    // Test CompareOp.GREATER: original = val3, compare with val3, fail
1992    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER,
1993        new BinaryComparator(val3), put);
1994    assertEquals(false, res);
1995
1996    // Test CompareOp.GREATER: original = val3, compare with val2, fail
1997    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER,
1998        new BinaryComparator(val2), put);
1999    assertEquals(false, res);
2000
2001    // Test CompareOp.GREATER: original = val3, compare with val4,
2002    // succeed (now value = val2)
2003    put = new Put(row1);
2004    put.addColumn(fam1, qf1, val2);
2005    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER,
2006        new BinaryComparator(val4), put);
2007    assertEquals(true, res);
2008
2009    // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val1, fail
2010    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL,
2011        new BinaryComparator(val1), put);
2012    assertEquals(false, res);
2013
2014    // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val2,
2015    // succeed (value still = val2)
2016    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL,
2017        new BinaryComparator(val2), put);
2018    assertEquals(true, res);
2019
2020    // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val3, succeed
2021    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL,
2022        new BinaryComparator(val3), put);
2023    assertEquals(true, res);
2024  }
2025
2026  @Test
2027  public void testCheckAndPut_ThatPutWasWritten() throws IOException {
2028    byte[] row1 = Bytes.toBytes("row1");
2029    byte[] fam1 = Bytes.toBytes("fam1");
2030    byte[] fam2 = Bytes.toBytes("fam2");
2031    byte[] qf1 = Bytes.toBytes("qualifier");
2032    byte[] val1 = Bytes.toBytes("value1");
2033    byte[] val2 = Bytes.toBytes("value2");
2034
2035    byte[][] families = { fam1, fam2 };
2036
2037    // Setting up region
2038    this.region = initHRegion(tableName, method, CONF, families);
2039    // Putting data in the key to check
2040    Put put = new Put(row1);
2041    put.addColumn(fam1, qf1, val1);
2042    region.put(put);
2043
2044    // Creating put to add
2045    long ts = System.currentTimeMillis();
2046    KeyValue kv = new KeyValue(row1, fam2, qf1, ts, KeyValue.Type.Put, val2);
2047    put = new Put(row1);
2048    put.add(kv);
2049
    // checkAndPut with correct value
2051    boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
2052        new BinaryComparator(val1), put);
2053    assertEquals(true, res);
2054
2055    Get get = new Get(row1);
2056    get.addColumn(fam2, qf1);
2057    Cell[] actual = region.get(get).rawCells();
2058
2059    Cell[] expected = { kv };
2060
2061    assertEquals(expected.length, actual.length);
2062    for (int i = 0; i < actual.length; i++) {
2063      assertEquals(expected[i], actual[i]);
2064    }
2065  }
2066
2067  @Test
2068  public void testCheckAndPut_wrongRowInPut() throws IOException {
2069    this.region = initHRegion(tableName, method, CONF, COLUMNS);
2070    Put put = new Put(row2);
2071    put.addColumn(fam1, qual1, value1);
2072    try {
2073      region.checkAndMutate(row, fam1, qual1, CompareOperator.EQUAL,
2074          new BinaryComparator(value2), put);
2075      fail();
2076    } catch (org.apache.hadoop.hbase.DoNotRetryIOException expected) {
2077      // expected exception.
2078    }
2079  }
2080
2081  @Test
2082  public void testCheckAndDelete_ThatDeleteWasWritten() throws IOException {
2083    byte[] row1 = Bytes.toBytes("row1");
2084    byte[] fam1 = Bytes.toBytes("fam1");
2085    byte[] fam2 = Bytes.toBytes("fam2");
2086    byte[] qf1 = Bytes.toBytes("qualifier1");
2087    byte[] qf2 = Bytes.toBytes("qualifier2");
2088    byte[] qf3 = Bytes.toBytes("qualifier3");
2089    byte[] val1 = Bytes.toBytes("value1");
2090    byte[] val2 = Bytes.toBytes("value2");
2091    byte[] val3 = Bytes.toBytes("value3");
2092    byte[] emptyVal = new byte[] {};
2093
2094    byte[][] families = { fam1, fam2 };
2095
2096    // Setting up region
2097    this.region = initHRegion(tableName, method, CONF, families);
2098    // Put content
2099    Put put = new Put(row1);
2100    put.addColumn(fam1, qf1, val1);
2101    region.put(put);
2102    Threads.sleep(2);
2103
2104    put = new Put(row1);
2105    put.addColumn(fam1, qf1, val2);
2106    put.addColumn(fam2, qf1, val3);
2107    put.addColumn(fam2, qf2, val2);
2108    put.addColumn(fam2, qf3, val1);
2109    put.addColumn(fam1, qf3, val1);
2110    region.put(put);
2111
2112    // Multi-column delete
2113    Delete delete = new Delete(row1);
2114    delete.addColumn(fam1, qf1);
2115    delete.addColumn(fam2, qf1);
2116    delete.addColumn(fam1, qf3);
2117    boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
2118        new BinaryComparator(val2), delete);
2119    assertEquals(true, res);
2120
2121    Get get = new Get(row1);
2122    get.addColumn(fam1, qf1);
2123    get.addColumn(fam1, qf3);
2124    get.addColumn(fam2, qf2);
2125    Result r = region.get(get);
2126    assertEquals(2, r.size());
2127    assertArrayEquals(val1, r.getValue(fam1, qf1));
2128    assertArrayEquals(val2, r.getValue(fam2, qf2));
2129
2130    // Family delete
2131    delete = new Delete(row1);
2132    delete.addFamily(fam2);
2133    res = region.checkAndMutate(row1, fam2, qf1, CompareOperator.EQUAL,
2134        new BinaryComparator(emptyVal), delete);
2135    assertEquals(true, res);
2136
2137    get = new Get(row1);
2138    r = region.get(get);
2139    assertEquals(1, r.size());
2140    assertArrayEquals(val1, r.getValue(fam1, qf1));
2141
2142    // Row delete
2143    delete = new Delete(row1);
2144    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val1),
2145        delete);
2146    assertEquals(true, res);
2147    get = new Get(row1);
2148    r = region.get(get);
2149    assertEquals(0, r.size());
2150  }
2151
2152  // ////////////////////////////////////////////////////////////////////////////
2153  // Delete tests
2154  // ////////////////////////////////////////////////////////////////////////////
2155  @Test
2156  public void testDelete_multiDeleteColumn() throws IOException {
2157    byte[] row1 = Bytes.toBytes("row1");
2158    byte[] fam1 = Bytes.toBytes("fam1");
2159    byte[] qual = Bytes.toBytes("qualifier");
2160    byte[] value = Bytes.toBytes("value");
2161
2162    Put put = new Put(row1);
2163    put.addColumn(fam1, qual, 1, value);
2164    put.addColumn(fam1, qual, 2, value);
2165
2166    this.region = initHRegion(tableName, method, CONF, fam1);
2167    region.put(put);
2168
2169    // We do support deleting more than 1 'latest' version
2170    Delete delete = new Delete(row1);
2171    delete.addColumn(fam1, qual);
2172    delete.addColumn(fam1, qual);
2173    region.delete(delete);
2174
2175    Get get = new Get(row1);
2176    get.addFamily(fam1);
2177    Result r = region.get(get);
2178    assertEquals(0, r.size());
2179  }
2180
2181  @Test
2182  public void testDelete_CheckFamily() throws IOException {
2183    byte[] row1 = Bytes.toBytes("row1");
2184    byte[] fam1 = Bytes.toBytes("fam1");
2185    byte[] fam2 = Bytes.toBytes("fam2");
2186    byte[] fam3 = Bytes.toBytes("fam3");
2187    byte[] fam4 = Bytes.toBytes("fam4");
2188
2189    // Setting up region
2190    this.region = initHRegion(tableName, method, CONF, fam1, fam2, fam3);
2191    List<Cell> kvs = new ArrayList<>();
2192    kvs.add(new KeyValue(row1, fam4, null, null));
2193
2194    // testing existing family
2195    byte[] family = fam2;
2196    NavigableMap<byte[], List<Cell>> deleteMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
2197    deleteMap.put(family, kvs);
2198    region.delete(deleteMap, Durability.SYNC_WAL);
2199
    // testing non-existent family
2201    boolean ok = false;
2202    family = fam4;
2203    try {
2204      deleteMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
2205      deleteMap.put(family, kvs);
2206      region.delete(deleteMap, Durability.SYNC_WAL);
2207    } catch (Exception e) {
2208      ok = true;
2209    }
    assertTrue("Expected an exception for non-existent family "
        + new String(family, StandardCharsets.UTF_8), ok);
2211  }
2212
2213  @Test
2214  public void testDelete_mixed() throws IOException, InterruptedException {
2215    byte[] fam = Bytes.toBytes("info");
2216    byte[][] families = { fam };
2217    this.region = initHRegion(tableName, method, CONF, families);
2218    EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
2219
2220    byte[] row = Bytes.toBytes("table_name");
2221    // column names
2222    byte[] serverinfo = Bytes.toBytes("serverinfo");
2223    byte[] splitA = Bytes.toBytes("splitA");
2224    byte[] splitB = Bytes.toBytes("splitB");
2225
2226    // add some data:
2227    Put put = new Put(row);
2228    put.addColumn(fam, splitA, Bytes.toBytes("reference_A"));
2229    region.put(put);
2230
2231    put = new Put(row);
2232    put.addColumn(fam, splitB, Bytes.toBytes("reference_B"));
2233    region.put(put);
2234
2235    put = new Put(row);
2236    put.addColumn(fam, serverinfo, Bytes.toBytes("ip_address"));
2237    region.put(put);
2238
2239    // ok now delete a split:
2240    Delete delete = new Delete(row);
2241    delete.addColumns(fam, splitA);
2242    region.delete(delete);
2243
2244    // assert some things:
2245    Get get = new Get(row).addColumn(fam, serverinfo);
2246    Result result = region.get(get);
2247    assertEquals(1, result.size());
2248
2249    get = new Get(row).addColumn(fam, splitA);
2250    result = region.get(get);
2251    assertEquals(0, result.size());
2252
2253    get = new Get(row).addColumn(fam, splitB);
2254    result = region.get(get);
2255    assertEquals(1, result.size());
2256
2257    // Assert that after a delete, I can put.
2258    put = new Put(row);
2259    put.addColumn(fam, splitA, Bytes.toBytes("reference_A"));
2260    region.put(put);
2261    get = new Get(row);
2262    result = region.get(get);
2263    assertEquals(3, result.size());
2264
2265    // Now delete all... then test I can add stuff back
2266    delete = new Delete(row);
2267    region.delete(delete);
2268    assertEquals(0, region.get(get).size());
2269
2270    region.put(new Put(row).addColumn(fam, splitA, Bytes.toBytes("reference_A")));
2271    result = region.get(get);
2272    assertEquals(1, result.size());
2273  }
2274
2275  @Test
2276  public void testDeleteRowWithFutureTs() throws IOException {
2277    byte[] fam = Bytes.toBytes("info");
2278    byte[][] families = { fam };
2279    this.region = initHRegion(tableName, method, CONF, families);
2280    byte[] row = Bytes.toBytes("table_name");
2281    // column names
2282    byte[] serverinfo = Bytes.toBytes("serverinfo");
2283
2284    // add data in the far future
2285    Put put = new Put(row);
2286    put.addColumn(fam, serverinfo, HConstants.LATEST_TIMESTAMP - 5, Bytes.toBytes("value"));
2287    region.put(put);
2288
2289    // now delete something in the present
2290    Delete delete = new Delete(row);
2291    region.delete(delete);
2292
2293    // make sure we still see our data
2294    Get get = new Get(row).addColumn(fam, serverinfo);
2295    Result result = region.get(get);
2296    assertEquals(1, result.size());
2297
2298    // delete the future row
2299    delete = new Delete(row, HConstants.LATEST_TIMESTAMP - 3);
2300    region.delete(delete);
2301
2302    // make sure it is gone
2303    get = new Get(row).addColumn(fam, serverinfo);
2304    result = region.get(get);
2305    assertEquals(0, result.size());
2306  }
2307
2308  /**
2309   * Tests that the special LATEST_TIMESTAMP option for puts gets replaced by
2310   * the actual timestamp
2311   */
2312  @Test
2313  public void testPutWithLatestTS() throws IOException {
2314    byte[] fam = Bytes.toBytes("info");
2315    byte[][] families = { fam };
2316    this.region = initHRegion(tableName, method, CONF, families);
2317    byte[] row = Bytes.toBytes("row1");
2318    // column names
2319    byte[] qual = Bytes.toBytes("qual");
2320
2321    // add data with LATEST_TIMESTAMP, put without WAL
2322    Put put = new Put(row);
2323    put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, Bytes.toBytes("value"));
2324    region.put(put);
2325
2326    // Make sure it shows up with an actual timestamp
2327    Get get = new Get(row).addColumn(fam, qual);
2328    Result result = region.get(get);
2329    assertEquals(1, result.size());
2330    Cell kv = result.rawCells()[0];
2331    LOG.info("Got: " + kv);
2332    assertTrue("LATEST_TIMESTAMP was not replaced with real timestamp",
2333        kv.getTimestamp() != HConstants.LATEST_TIMESTAMP);
2334
2335    // Check same with WAL enabled (historically these took different
2336    // code paths, so check both)
2337    row = Bytes.toBytes("row2");
2338    put = new Put(row);
2339    put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, Bytes.toBytes("value"));
2340    region.put(put);
2341
2342    // Make sure it shows up with an actual timestamp
2343    get = new Get(row).addColumn(fam, qual);
2344    result = region.get(get);
2345    assertEquals(1, result.size());
2346    kv = result.rawCells()[0];
2347    LOG.info("Got: " + kv);
2348    assertTrue("LATEST_TIMESTAMP was not replaced with real timestamp",
2349        kv.getTimestamp() != HConstants.LATEST_TIMESTAMP);
2350  }
2351
2352  /**
2353   * Tests that there is server-side filtering for invalid timestamp upper
2354   * bound. Note that the timestamp lower bound is automatically handled for us
2355   * by the TTL field.
2356   */
2357  @Test
2358  public void testPutWithTsSlop() throws IOException {
2359    byte[] fam = Bytes.toBytes("info");
2360    byte[][] families = { fam };
2361
    // Add data with a timestamp that is too far in the future for the configured slop and ensure
    // the sanity check rejects it.
2363    CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000);
2364    this.region = initHRegion(tableName, method, CONF, families);
2365    boolean caughtExcep = false;
2366    try {
2367      // no TS specified == use latest. should not error
2368      region.put(new Put(row).addColumn(fam, Bytes.toBytes("qual"), Bytes.toBytes("value")));
2369      // TS out of range. should error
2370      region.put(new Put(row).addColumn(fam, Bytes.toBytes("qual"),
2371          System.currentTimeMillis() + 2000, Bytes.toBytes("value")));
2372      fail("Expected IOE for TS out of configured timerange");
2373    } catch (FailedSanityCheckException ioe) {
2374      LOG.debug("Received expected exception", ioe);
2375      caughtExcep = true;
2376    }
2377    assertTrue("Should catch FailedSanityCheckException", caughtExcep);
2378  }
2379
2380  @Test
2381  public void testScanner_DeleteOneFamilyNotAnother() throws IOException {
2382    byte[] fam1 = Bytes.toBytes("columnA");
2383    byte[] fam2 = Bytes.toBytes("columnB");
2384    this.region = initHRegion(tableName, method, CONF, fam1, fam2);
2385    byte[] rowA = Bytes.toBytes("rowA");
2386    byte[] rowB = Bytes.toBytes("rowB");
2387
2388    byte[] value = Bytes.toBytes("value");
2389
2390    Delete delete = new Delete(rowA);
2391    delete.addFamily(fam1);
2392
2393    region.delete(delete);
2394
2395    // now create data.
2396    Put put = new Put(rowA);
2397    put.addColumn(fam2, null, value);
2398    region.put(put);
2399
2400    put = new Put(rowB);
2401    put.addColumn(fam1, null, value);
2402    put.addColumn(fam2, null, value);
2403    region.put(put);
2404
2405    Scan scan = new Scan();
2406    scan.addFamily(fam1).addFamily(fam2);
2407    InternalScanner s = region.getScanner(scan);
2408    List<Cell> results = new ArrayList<>();
2409    s.next(results);
2410    assertTrue(CellUtil.matchingRows(results.get(0), rowA));
2411
2412    results.clear();
2413    s.next(results);
2414    assertTrue(CellUtil.matchingRows(results.get(0), rowB));
2415  }
2416
2417  @Test
2418  public void testDataInMemoryWithoutWAL() throws IOException {
2419    FileSystem fs = FileSystem.get(CONF);
2420    Path rootDir = new Path(dir + "testDataInMemoryWithoutWAL");
2421    FSHLog hLog = new FSHLog(fs, rootDir, "testDataInMemoryWithoutWAL", CONF);
2422    hLog.init();
2423    // This chunk creation is done throughout the code base. Do we want to move it into core?
2424    // It is missing from this test. W/o it we NPE.
2425    region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog,
2426        COLUMN_FAMILY_BYTES);
2427
2428    Cell originalCell = CellUtil.createCell(row, COLUMN_FAMILY_BYTES, qual1,
2429      System.currentTimeMillis(), KeyValue.Type.Put.getCode(), value1);
2430    final long originalSize = originalCell.getSerializedSize();
2431
2432    Cell addCell = CellUtil.createCell(row, COLUMN_FAMILY_BYTES, qual1,
2433      System.currentTimeMillis(), KeyValue.Type.Put.getCode(), Bytes.toBytes("xxxxxxxxxx"));
2434    final long addSize = addCell.getSerializedSize();
2435
2436    LOG.info("originalSize:" + originalSize
2437      + ", addSize:" + addSize);
2438    // start test. We expect that the addPut's durability will be replaced
2439    // by originalPut's durability.
2440
2441    // case 1:
2442    testDataInMemoryWithoutWAL(region,
2443            new Put(row).add(originalCell).setDurability(Durability.SKIP_WAL),
2444            new Put(row).add(addCell).setDurability(Durability.SKIP_WAL),
2445            originalSize + addSize);
2446
2447    // case 2:
2448    testDataInMemoryWithoutWAL(region,
2449            new Put(row).add(originalCell).setDurability(Durability.SKIP_WAL),
2450            new Put(row).add(addCell).setDurability(Durability.SYNC_WAL),
2451            originalSize + addSize);
2452
2453    // case 3:
2454    testDataInMemoryWithoutWAL(region,
2455            new Put(row).add(originalCell).setDurability(Durability.SYNC_WAL),
2456            new Put(row).add(addCell).setDurability(Durability.SKIP_WAL),
2457            0);
2458
2459    // case 4:
2460    testDataInMemoryWithoutWAL(region,
2461            new Put(row).add(originalCell).setDurability(Durability.SYNC_WAL),
2462            new Put(row).add(addCell).setDurability(Durability.SYNC_WAL),
2463            0);
2464  }
2465
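  /**
   * Applies originalPut through a mocked RegionCoprocessorHost whose preBatchMutate injects
   * addPut into the same mini-batch, then asserts that dataInMemoryWithoutWAL grew by delta.
   */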
2466  private static void testDataInMemoryWithoutWAL(HRegion region, Put originalPut,
2467          final Put addPut, long delta) throws IOException {
2468    final long initSize = region.getDataInMemoryWithoutWAL();
2469    // save normalCPHost and replaced by mockedCPHost
2470    RegionCoprocessorHost normalCPHost = region.getCoprocessorHost();
2471    RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
    // Because preBatchMutate returns void, we can't use the usual Mockito when...then form;
    // we must use the doAnswer form below (from the Mockito docs).
2474    Mockito.doAnswer(new Answer() {
2475      @Override
2476      public Object answer(InvocationOnMock invocation) throws Throwable {
2477        MiniBatchOperationInProgress<Mutation> mb = invocation.getArgument(0);
2478        mb.addOperationsFromCP(0, new Mutation[]{addPut});
2479        return null;
2480      }
2481    }).when(mockedCPHost).preBatchMutate(Mockito.isA(MiniBatchOperationInProgress.class));
2482    region.setCoprocessorHost(mockedCPHost);
2483    region.put(originalPut);
2484    region.setCoprocessorHost(normalCPHost);
2485    final long finalSize = region.getDataInMemoryWithoutWAL();
    assertEquals("finalSize:" + finalSize + ", initSize:" + initSize + ", delta:" + delta,
      finalSize, initSize + delta);
2488  }
2489
2490  @Test
2491  public void testDeleteColumns_PostInsert() throws IOException, InterruptedException {
2492    Delete delete = new Delete(row);
2493    delete.addColumns(fam1, qual1);
2494    doTestDelete_AndPostInsert(delete);
2495  }
2496
2497  @Test
2498  public void testaddFamily_PostInsert() throws IOException, InterruptedException {
2499    Delete delete = new Delete(row);
2500    delete.addFamily(fam1);
2501    doTestDelete_AndPostInsert(delete);
2502  }
2503
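  /**
   * Shared body for the *_PostInsert tests above: after applying the given delete, a later put on
   * the same cell must be visible through both get and scan.
   */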
2504  public void doTestDelete_AndPostInsert(Delete delete) throws IOException, InterruptedException {
2505    this.region = initHRegion(tableName, method, CONF, fam1);
2506    EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
2507    Put put = new Put(row);
2508    put.addColumn(fam1, qual1, value1);
2509    region.put(put);
2510
2511    // now delete the value:
2512    region.delete(delete);
2513
2514    // ok put data:
2515    put = new Put(row);
2516    put.addColumn(fam1, qual1, value2);
2517    region.put(put);
2518
2519    // ok get:
2520    Get get = new Get(row);
2521    get.addColumn(fam1, qual1);
2522
2523    Result r = region.get(get);
2524    assertEquals(1, r.size());
2525    assertArrayEquals(value2, r.getValue(fam1, qual1));
2526
2527    // next:
2528    Scan scan = new Scan(row);
2529    scan.addColumn(fam1, qual1);
2530    InternalScanner s = region.getScanner(scan);
2531
2532    List<Cell> results = new ArrayList<>();
2533    assertEquals(false, s.next(results));
2534    assertEquals(1, results.size());
2535    Cell kv = results.get(0);
2536
2537    assertArrayEquals(value2, CellUtil.cloneValue(kv));
2538    assertArrayEquals(fam1, CellUtil.cloneFamily(kv));
2539    assertArrayEquals(qual1, CellUtil.cloneQualifier(kv));
2540    assertArrayEquals(row, CellUtil.cloneRow(kv));
2541  }
2542
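  /**
   * Verifies that cells handed to a delete without an explicit timestamp are assigned a server
   * timestamp no later than "now" by the time they land in the memstore.
   */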
2543  @Test
2544  public void testDelete_CheckTimestampUpdated() throws IOException {
2545    byte[] row1 = Bytes.toBytes("row1");
2546    byte[] col1 = Bytes.toBytes("col1");
2547    byte[] col2 = Bytes.toBytes("col2");
2548    byte[] col3 = Bytes.toBytes("col3");
2549
2550    // Setting up region
2551    this.region = initHRegion(tableName, method, CONF, fam1);
2552    // Building checkerList
2553    List<Cell> kvs = new ArrayList<>();
2554    kvs.add(new KeyValue(row1, fam1, col1, null));
2555    kvs.add(new KeyValue(row1, fam1, col2, null));
2556    kvs.add(new KeyValue(row1, fam1, col3, null));
2557
2558    NavigableMap<byte[], List<Cell>> deleteMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
2559    deleteMap.put(fam1, kvs);
2560    region.delete(deleteMap, Durability.SYNC_WAL);
2561
2562    // extract the key values out the memstore:
2563    // This is kinda hacky, but better than nothing...
2564    long now = System.currentTimeMillis();
2565    AbstractMemStore memstore = (AbstractMemStore)region.getStore(fam1).memstore;
2566    Cell firstCell = memstore.getActive().first();
2567    assertTrue(firstCell.getTimestamp() <= now);
2568    now = firstCell.getTimestamp();
2569    for (Cell cell : memstore.getActive().getCellSet()) {
2570      assertTrue(cell.getTimestamp() <= now);
2571      now = cell.getTimestamp();
2572    }
2573  }
2574
2575  // ////////////////////////////////////////////////////////////////////////////
2576  // Get tests
2577  // ////////////////////////////////////////////////////////////////////////////
2578  @Test
2579  public void testGet_FamilyChecker() throws IOException {
2580    byte[] row1 = Bytes.toBytes("row1");
2581    byte[] fam1 = Bytes.toBytes("fam1");
2582    byte[] fam2 = Bytes.toBytes("False");
2583    byte[] col1 = Bytes.toBytes("col1");
2584
2585    // Setting up region
2586    this.region = initHRegion(tableName, method, CONF, fam1);
2587    Get get = new Get(row1);
2588    get.addColumn(fam2, col1);
2589
2590    // Test
2591    try {
2592      region.get(get);
2593      fail("Expecting DoNotRetryIOException in get but did not get any");
2594    } catch (org.apache.hadoop.hbase.DoNotRetryIOException e) {
2595      LOG.info("Got expected DoNotRetryIOException successfully");
2596    }
2597  }
2598
2599  @Test
2600  public void testGet_Basic() throws IOException {
2601    byte[] row1 = Bytes.toBytes("row1");
2602    byte[] fam1 = Bytes.toBytes("fam1");
2603    byte[] col1 = Bytes.toBytes("col1");
2604    byte[] col2 = Bytes.toBytes("col2");
2605    byte[] col3 = Bytes.toBytes("col3");
2606    byte[] col4 = Bytes.toBytes("col4");
2607    byte[] col5 = Bytes.toBytes("col5");
2608
2609    // Setting up region
2610    this.region = initHRegion(tableName, method, CONF, fam1);
2611    // Add to memstore
2612    Put put = new Put(row1);
2613    put.addColumn(fam1, col1, null);
2614    put.addColumn(fam1, col2, null);
2615    put.addColumn(fam1, col3, null);
2616    put.addColumn(fam1, col4, null);
2617    put.addColumn(fam1, col5, null);
2618    region.put(put);
2619
2620    Get get = new Get(row1);
2621    get.addColumn(fam1, col2);
2622    get.addColumn(fam1, col4);
2623    // Expected result
2624    KeyValue kv1 = new KeyValue(row1, fam1, col2);
2625    KeyValue kv2 = new KeyValue(row1, fam1, col4);
2626    KeyValue[] expected = { kv1, kv2 };
2627
2628    // Test
2629    Result res = region.get(get);
2630    assertEquals(expected.length, res.size());
2631    for (int i = 0; i < res.size(); i++) {
2632      assertTrue(CellUtil.matchingRows(expected[i], res.rawCells()[i]));
2633      assertTrue(CellUtil.matchingFamily(expected[i], res.rawCells()[i]));
2634      assertTrue(CellUtil.matchingQualifier(expected[i], res.rawCells()[i]));
2635    }
2636
2637    // Test using a filter on a Get
2638    Get g = new Get(row1);
2639    final int count = 2;
2640    g.setFilter(new ColumnCountGetFilter(count));
2641    res = region.get(g);
2642    assertEquals(count, res.size());
2643  }
2644
2645  @Test
2646  public void testGet_Empty() throws IOException {
2647    byte[] row = Bytes.toBytes("row");
2648    byte[] fam = Bytes.toBytes("fam");
2649
2650    this.region = initHRegion(tableName, method, CONF, fam);
2651    Get get = new Get(row);
2652    get.addFamily(fam);
2653    Result r = region.get(get);
2654
2655    assertTrue(r.isEmpty());
2656  }
2657
2658  @Test
2659  public void testGetWithFilter() throws IOException, InterruptedException {
2660    byte[] row1 = Bytes.toBytes("row1");
2661    byte[] fam1 = Bytes.toBytes("fam1");
2662    byte[] col1 = Bytes.toBytes("col1");
2663    byte[] value1 = Bytes.toBytes("value1");
2664    byte[] value2 = Bytes.toBytes("value2");
2665
2666    final int maxVersions = 3;
2667    HColumnDescriptor hcd = new HColumnDescriptor(fam1);
2668    hcd.setMaxVersions(maxVersions);
2669    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testFilterAndColumnTracker"));
2670    htd.addFamily(hcd);
2671    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
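    // Initialize the ChunkCreator singleton so that MemStoreLAB allocation works for this
    // locally created region.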
2672    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
2673    Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + ".log");
2674    final WAL wal = HBaseTestingUtility.createWal(TEST_UTIL.getConfiguration(), logDir, info);
2675    this.region = TEST_UTIL.createLocalHRegion(info, htd, wal);
2676
    // Put 4 versions into the memstore
2678    long ts = 0;
2679    Put put = new Put(row1, ts);
2680    put.addColumn(fam1, col1, value1);
2681    region.put(put);
2682    put = new Put(row1, ts + 1);
2683    put.addColumn(fam1, col1, Bytes.toBytes("filter1"));
2684    region.put(put);
2685    put = new Put(row1, ts + 2);
2686    put.addColumn(fam1, col1, Bytes.toBytes("filter2"));
2687    region.put(put);
2688    put = new Put(row1, ts + 3);
2689    put.addColumn(fam1, col1, value2);
2690    region.put(put);
2691
2692    Get get = new Get(row1);
2693    get.setMaxVersions();
2694    Result res = region.get(get);
    // Get returns 3 versions; the oldest version is gone from the user's view
2696    assertEquals(maxVersions, res.size());
2697
2698    get.setFilter(new ValueFilter(CompareOp.EQUAL, new SubstringComparator("value")));
2699    res = region.get(get);
    // When using a value filter, the oldest version should still be gone from the user's view
    // and only one KeyValue should be returned
2702    assertEquals(1, res.size());
2703    assertTrue(CellUtil.matchingValue(new KeyValue(row1, fam1, col1, value2), res.rawCells()[0]));
2704    assertEquals(ts + 3, res.rawCells()[0].getTimestamp());
2705
2706    region.flush(true);
2707    region.compact(true);
2708    Thread.sleep(1000);
2709    res = region.get(get);
2710    // After flush and compact, the result should be consistent with previous result
2711    assertEquals(1, res.size());
2712    assertTrue(CellUtil.matchingValue(new KeyValue(row1, fam1, col1, value2), res.rawCells()[0]));
2713  }
2714
2715  // ////////////////////////////////////////////////////////////////////////////
2716  // Scanner tests
2717  // ////////////////////////////////////////////////////////////////////////////
2718  @Test
2719  public void testGetScanner_WithOkFamilies() throws IOException {
2720    byte[] fam1 = Bytes.toBytes("fam1");
2721    byte[] fam2 = Bytes.toBytes("fam2");
2722
2723    byte[][] families = { fam1, fam2 };
2724
2725    // Setting up region
2726    this.region = initHRegion(tableName, method, CONF, families);
2727    Scan scan = new Scan();
2728    scan.addFamily(fam1);
2729    scan.addFamily(fam2);
    try {
      region.getScanner(scan);
    } catch (Exception e) {
      fail("Families could not be found in Region: " + e);
    }
2735  }
2736
2737  @Test
2738  public void testGetScanner_WithNotOkFamilies() throws IOException {
2739    byte[] fam1 = Bytes.toBytes("fam1");
2740    byte[] fam2 = Bytes.toBytes("fam2");
2741
2742    byte[][] families = { fam1 };
2743
2744    // Setting up region
2745    this.region = initHRegion(tableName, method, CONF, families);
2746    Scan scan = new Scan();
2747    scan.addFamily(fam2);
2748    boolean ok = false;
2749    try {
2750      region.getScanner(scan);
2751    } catch (Exception e) {
2752      ok = true;
2753    }
    assertTrue("Expected an exception because fam2 is not a family of this region", ok);
2755  }
2756
2757  @Test
2758  public void testGetScanner_WithNoFamilies() throws IOException {
2759    byte[] row1 = Bytes.toBytes("row1");
2760    byte[] fam1 = Bytes.toBytes("fam1");
2761    byte[] fam2 = Bytes.toBytes("fam2");
2762    byte[] fam3 = Bytes.toBytes("fam3");
2763    byte[] fam4 = Bytes.toBytes("fam4");
2764
2765    byte[][] families = { fam1, fam2, fam3, fam4 };
2766
2767    // Setting up region
2768    this.region = initHRegion(tableName, method, CONF, families);
2769    // Putting data in Region
2770    Put put = new Put(row1);
2771    put.addColumn(fam1, null, null);
2772    put.addColumn(fam2, null, null);
2773    put.addColumn(fam3, null, null);
2774    put.addColumn(fam4, null, null);
2775    region.put(put);
2776
2777    Scan scan = null;
2778    HRegion.RegionScannerImpl is = null;
2779
    // Test how many store scanners getScanner produces. With two of the four families
    // requested, one scanner becomes the current scanner and the other stays in the
    // store heap (2 - current = 1).
2783    scan = new Scan();
2784    scan.addFamily(fam2);
2785    scan.addFamily(fam4);
2786    is = region.getScanner(scan);
2787    assertEquals(1, is.storeHeap.getHeap().size());
2788
2789    scan = new Scan();
2790    is = region.getScanner(scan);
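    // With no families specified, all four families are scanned: one scanner becomes the
    // current scanner and the remaining three stay in the store heap.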
2791    assertEquals(families.length - 1, is.storeHeap.getHeap().size());
2792  }
2793
2794  /**
2795   * This method tests https://issues.apache.org/jira/browse/HBASE-2516.
2796   *
2797   * @throws IOException
2798   */
2799  @Test
2800  public void testGetScanner_WithRegionClosed() throws IOException {
2801    byte[] fam1 = Bytes.toBytes("fam1");
2802    byte[] fam2 = Bytes.toBytes("fam2");
2803
2804    byte[][] families = { fam1, fam2 };
2805
2806    // Setting up region
2807    try {
2808      this.region = initHRegion(tableName, method, CONF, families);
2809    } catch (IOException e) {
2810      e.printStackTrace();
2811      fail("Got IOException during initHRegion, " + e.getMessage());
2812    }
2813    region.closed.set(true);
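    // Mark the region as closed so that getScanner() should throw NotServingRegionException.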
2814    try {
2815      region.getScanner(null);
2816      fail("Expected to get an exception during getScanner on a region that is closed");
2817    } catch (NotServingRegionException e) {
2818      // this is the correct exception that is expected
2819    } catch (IOException e) {
2820      fail("Got wrong type of exception - should be a NotServingRegionException, " +
2821          "but was an IOException: "
2822          + e.getMessage());
2823    }
2824  }
2825
2826  @Test
2827  public void testRegionScanner_Next() throws IOException {
2828    byte[] row1 = Bytes.toBytes("row1");
2829    byte[] row2 = Bytes.toBytes("row2");
2830    byte[] fam1 = Bytes.toBytes("fam1");
2831    byte[] fam2 = Bytes.toBytes("fam2");
2832    byte[] fam3 = Bytes.toBytes("fam3");
2833    byte[] fam4 = Bytes.toBytes("fam4");
2834
2835    byte[][] families = { fam1, fam2, fam3, fam4 };
2836    long ts = System.currentTimeMillis();
2837
2838    // Setting up region
2839    this.region = initHRegion(tableName, method, CONF, families);
2840    // Putting data in Region
2841    Put put = null;
2842    put = new Put(row1);
2843    put.addColumn(fam1, (byte[]) null, ts, null);
2844    put.addColumn(fam2, (byte[]) null, ts, null);
2845    put.addColumn(fam3, (byte[]) null, ts, null);
2846    put.addColumn(fam4, (byte[]) null, ts, null);
2847    region.put(put);
2848
2849    put = new Put(row2);
2850    put.addColumn(fam1, (byte[]) null, ts, null);
2851    put.addColumn(fam2, (byte[]) null, ts, null);
2852    put.addColumn(fam3, (byte[]) null, ts, null);
2853    put.addColumn(fam4, (byte[]) null, ts, null);
2854    region.put(put);
2855
2856    Scan scan = new Scan();
2857    scan.addFamily(fam2);
2858    scan.addFamily(fam4);
2859    InternalScanner is = region.getScanner(scan);
2860
2861    List<Cell> res = null;
2862
2863    // Result 1
2864    List<Cell> expected1 = new ArrayList<>();
2865    expected1.add(new KeyValue(row1, fam2, null, ts, KeyValue.Type.Put, null));
2866    expected1.add(new KeyValue(row1, fam4, null, ts, KeyValue.Type.Put, null));
2867
2868    res = new ArrayList<>();
2869    is.next(res);
2870    for (int i = 0; i < res.size(); i++) {
2871      assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected1.get(i), res.get(i)));
2872    }
2873
2874    // Result 2
2875    List<Cell> expected2 = new ArrayList<>();
2876    expected2.add(new KeyValue(row2, fam2, null, ts, KeyValue.Type.Put, null));
2877    expected2.add(new KeyValue(row2, fam4, null, ts, KeyValue.Type.Put, null));
2878
2879    res = new ArrayList<>();
2880    is.next(res);
2881    for (int i = 0; i < res.size(); i++) {
2882      assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected2.get(i), res.get(i)));
2883    }
2884  }
2885
2886  @Test
2887  public void testScanner_ExplicitColumns_FromMemStore_EnforceVersions() throws IOException {
2888    byte[] row1 = Bytes.toBytes("row1");
2889    byte[] qf1 = Bytes.toBytes("qualifier1");
2890    byte[] qf2 = Bytes.toBytes("qualifier2");
2891    byte[] fam1 = Bytes.toBytes("fam1");
2892    byte[][] families = { fam1 };
2893
2894    long ts1 = System.currentTimeMillis();
2895    long ts2 = ts1 + 1;
2896    long ts3 = ts1 + 2;
2897
2898    // Setting up region
2899    this.region = initHRegion(tableName, method, CONF, families);
2900    // Putting data in Region
2901    Put put = null;
2902    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
2903    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
2904    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
2905
2906    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
2907    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
2908    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
2909
2910    put = new Put(row1);
2911    put.add(kv13);
2912    put.add(kv12);
2913    put.add(kv11);
2914    put.add(kv23);
2915    put.add(kv22);
2916    put.add(kv21);
2917    region.put(put);
2918
2919    // Expected
2920    List<Cell> expected = new ArrayList<>();
2921    expected.add(kv13);
2922    expected.add(kv12);
2923
2924    Scan scan = new Scan(row1);
2925    scan.addColumn(fam1, qf1);
2926    scan.setMaxVersions(MAX_VERSIONS);
2927    List<Cell> actual = new ArrayList<>();
2928    InternalScanner scanner = region.getScanner(scan);
2929
2930    boolean hasNext = scanner.next(actual);
    assertFalse(hasNext);
2932
2933    // Verify result
2934    for (int i = 0; i < expected.size(); i++) {
2935      assertEquals(expected.get(i), actual.get(i));
2936    }
2937  }
2938
2939  @Test
2940  public void testScanner_ExplicitColumns_FromFilesOnly_EnforceVersions() throws IOException {
2941    byte[] row1 = Bytes.toBytes("row1");
2942    byte[] qf1 = Bytes.toBytes("qualifier1");
2943    byte[] qf2 = Bytes.toBytes("qualifier2");
2944    byte[] fam1 = Bytes.toBytes("fam1");
2945    byte[][] families = { fam1 };
2946
2947    long ts1 = 1; // System.currentTimeMillis();
2948    long ts2 = ts1 + 1;
2949    long ts3 = ts1 + 2;
2950
2951    // Setting up region
2952    this.region = initHRegion(tableName, method, CONF, families);
2953    // Putting data in Region
2954    Put put = null;
2955    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
2956    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
2957    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
2958
2959    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
2960    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
2961    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
2962
2963    put = new Put(row1);
2964    put.add(kv13);
2965    put.add(kv12);
2966    put.add(kv11);
2967    put.add(kv23);
2968    put.add(kv22);
2969    put.add(kv21);
2970    region.put(put);
2971    region.flush(true);
2972
2973    // Expected
2974    List<Cell> expected = new ArrayList<>();
2975    expected.add(kv13);
2976    expected.add(kv12);
2977    expected.add(kv23);
2978    expected.add(kv22);
2979
2980    Scan scan = new Scan(row1);
2981    scan.addColumn(fam1, qf1);
2982    scan.addColumn(fam1, qf2);
2983    scan.setMaxVersions(MAX_VERSIONS);
2984    List<Cell> actual = new ArrayList<>();
2985    InternalScanner scanner = region.getScanner(scan);
2986
2987    boolean hasNext = scanner.next(actual);
    assertFalse(hasNext);
2989
2990    // Verify result
2991    for (int i = 0; i < expected.size(); i++) {
2992      assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
2993    }
2994  }
2995
2996  @Test
2997  public void testScanner_ExplicitColumns_FromMemStoreAndFiles_EnforceVersions() throws
2998      IOException {
2999    byte[] row1 = Bytes.toBytes("row1");
3000    byte[] fam1 = Bytes.toBytes("fam1");
3001    byte[][] families = { fam1 };
3002    byte[] qf1 = Bytes.toBytes("qualifier1");
3003    byte[] qf2 = Bytes.toBytes("qualifier2");
3004
3005    long ts1 = 1;
3006    long ts2 = ts1 + 1;
3007    long ts3 = ts1 + 2;
3008    long ts4 = ts1 + 3;
3009
3010    // Setting up region
3011    this.region = initHRegion(tableName, method, CONF, families);
3012    // Putting data in Region
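    // Four versions per qualifier: the three newest are flushed into separate store files below,
    // while the oldest stays in the memstore.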
3013    KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null);
3014    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3015    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3016    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3017
3018    KeyValue kv24 = new KeyValue(row1, fam1, qf2, ts4, KeyValue.Type.Put, null);
3019    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3020    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3021    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3022
3023    Put put = null;
3024    put = new Put(row1);
3025    put.add(kv14);
3026    put.add(kv24);
3027    region.put(put);
3028    region.flush(true);
3029
3030    put = new Put(row1);
3031    put.add(kv23);
3032    put.add(kv13);
3033    region.put(put);
3034    region.flush(true);
3035
3036    put = new Put(row1);
3037    put.add(kv22);
3038    put.add(kv12);
3039    region.put(put);
3040    region.flush(true);
3041
3042    put = new Put(row1);
3043    put.add(kv21);
3044    put.add(kv11);
3045    region.put(put);
3046
3047    // Expected
3048    List<Cell> expected = new ArrayList<>();
3049    expected.add(kv14);
3050    expected.add(kv13);
3051    expected.add(kv12);
3052    expected.add(kv24);
3053    expected.add(kv23);
3054    expected.add(kv22);
3055
3056    Scan scan = new Scan(row1);
3057    scan.addColumn(fam1, qf1);
3058    scan.addColumn(fam1, qf2);
3059    int versions = 3;
3060    scan.setMaxVersions(versions);
3061    List<Cell> actual = new ArrayList<>();
3062    InternalScanner scanner = region.getScanner(scan);
3063
3064    boolean hasNext = scanner.next(actual);
    assertFalse(hasNext);
3066
3067    // Verify result
3068    for (int i = 0; i < expected.size(); i++) {
3069      assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
3070    }
3071  }
3072
3073  @Test
3074  public void testScanner_Wildcard_FromMemStore_EnforceVersions() throws IOException {
3075    byte[] row1 = Bytes.toBytes("row1");
3076    byte[] qf1 = Bytes.toBytes("qualifier1");
3077    byte[] qf2 = Bytes.toBytes("qualifier2");
3078    byte[] fam1 = Bytes.toBytes("fam1");
3079    byte[][] families = { fam1 };
3080
3081    long ts1 = System.currentTimeMillis();
3082    long ts2 = ts1 + 1;
3083    long ts3 = ts1 + 2;
3084
3085    // Setting up region
3086    this.region = initHRegion(tableName, method, CONF, families);
3087    // Putting data in Region
3088    Put put = null;
3089    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3090    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3091    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3092
3093    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3094    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3095    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3096
3097    put = new Put(row1);
3098    put.add(kv13);
3099    put.add(kv12);
3100    put.add(kv11);
3101    put.add(kv23);
3102    put.add(kv22);
3103    put.add(kv21);
3104    region.put(put);
3105
3106    // Expected
3107    List<Cell> expected = new ArrayList<>();
3108    expected.add(kv13);
3109    expected.add(kv12);
3110    expected.add(kv23);
3111    expected.add(kv22);
3112
3113    Scan scan = new Scan(row1);
3114    scan.addFamily(fam1);
3115    scan.setMaxVersions(MAX_VERSIONS);
3116    List<Cell> actual = new ArrayList<>();
3117    InternalScanner scanner = region.getScanner(scan);
3118
3119    boolean hasNext = scanner.next(actual);
    assertFalse(hasNext);
3121
3122    // Verify result
3123    for (int i = 0; i < expected.size(); i++) {
3124      assertEquals(expected.get(i), actual.get(i));
3125    }
3126  }
3127
3128  @Test
3129  public void testScanner_Wildcard_FromFilesOnly_EnforceVersions() throws IOException {
3130    byte[] row1 = Bytes.toBytes("row1");
3131    byte[] qf1 = Bytes.toBytes("qualifier1");
3132    byte[] qf2 = Bytes.toBytes("qualifier2");
3133    byte[] fam1 = Bytes.toBytes("fam1");
3134
3135    long ts1 = 1; // System.currentTimeMillis();
3136    long ts2 = ts1 + 1;
3137    long ts3 = ts1 + 2;
3138
3139    // Setting up region
3140    this.region = initHRegion(tableName, method, CONF, fam1);
3141    // Putting data in Region
3142    Put put = null;
3143    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3144    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3145    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3146
3147    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3148    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3149    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3150
3151    put = new Put(row1);
3152    put.add(kv13);
3153    put.add(kv12);
3154    put.add(kv11);
3155    put.add(kv23);
3156    put.add(kv22);
3157    put.add(kv21);
3158    region.put(put);
3159    region.flush(true);
3160
3161    // Expected
3162    List<Cell> expected = new ArrayList<>();
3163    expected.add(kv13);
3164    expected.add(kv12);
3165    expected.add(kv23);
3166    expected.add(kv22);
3167
3168    Scan scan = new Scan(row1);
3169    scan.addFamily(fam1);
3170    scan.setMaxVersions(MAX_VERSIONS);
3171    List<Cell> actual = new ArrayList<>();
3172    InternalScanner scanner = region.getScanner(scan);
3173
3174    boolean hasNext = scanner.next(actual);
    assertFalse(hasNext);
3176
3177    // Verify result
3178    for (int i = 0; i < expected.size(); i++) {
3179      assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
3180    }
3181  }
3182
3183  @Test
3184  public void testScanner_StopRow1542() throws IOException {
3185    byte[] family = Bytes.toBytes("testFamily");
3186    this.region = initHRegion(tableName, method, CONF, family);
3187    byte[] row1 = Bytes.toBytes("row111");
3188    byte[] row2 = Bytes.toBytes("row222");
3189    byte[] row3 = Bytes.toBytes("row333");
3190    byte[] row4 = Bytes.toBytes("row444");
3191    byte[] row5 = Bytes.toBytes("row555");
3192
3193    byte[] col1 = Bytes.toBytes("Pub111");
3194    byte[] col2 = Bytes.toBytes("Pub222");
3195
3196    Put put = new Put(row1);
3197    put.addColumn(family, col1, Bytes.toBytes(10L));
3198    region.put(put);
3199
3200    put = new Put(row2);
3201    put.addColumn(family, col1, Bytes.toBytes(15L));
3202    region.put(put);
3203
3204    put = new Put(row3);
3205    put.addColumn(family, col2, Bytes.toBytes(20L));
3206    region.put(put);
3207
3208    put = new Put(row4);
3209    put.addColumn(family, col2, Bytes.toBytes(30L));
3210    region.put(put);
3211
3212    put = new Put(row5);
3213    put.addColumn(family, col1, Bytes.toBytes(40L));
3214    region.put(put);
3215
3216    Scan scan = new Scan(row3, row4);
3217    scan.setMaxVersions();
3218    scan.addColumn(family, col1);
3219    InternalScanner s = region.getScanner(scan);
3220
3221    List<Cell> results = new ArrayList<>();
    assertFalse(s.next(results));
3223    assertEquals(0, results.size());
3224  }
3225
3226  @Test
3227  public void testScanner_Wildcard_FromMemStoreAndFiles_EnforceVersions() throws IOException {
3228    byte[] row1 = Bytes.toBytes("row1");
3229    byte[] fam1 = Bytes.toBytes("fam1");
3230    byte[] qf1 = Bytes.toBytes("qualifier1");
    byte[] qf2 = Bytes.toBytes("qualifier2");
3232
3233    long ts1 = 1;
3234    long ts2 = ts1 + 1;
3235    long ts3 = ts1 + 2;
3236    long ts4 = ts1 + 3;
3237
3238    // Setting up region
3239    this.region = initHRegion(tableName, method, CONF, fam1);
3240    // Putting data in Region
3241    KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null);
3242    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3243    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3244    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3245
3246    KeyValue kv24 = new KeyValue(row1, fam1, qf2, ts4, KeyValue.Type.Put, null);
3247    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3248    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3249    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3250
3251    Put put = null;
3252    put = new Put(row1);
3253    put.add(kv14);
3254    put.add(kv24);
3255    region.put(put);
3256    region.flush(true);
3257
3258    put = new Put(row1);
3259    put.add(kv23);
3260    put.add(kv13);
3261    region.put(put);
3262    region.flush(true);
3263
3264    put = new Put(row1);
3265    put.add(kv22);
3266    put.add(kv12);
3267    region.put(put);
3268    region.flush(true);
3269
3270    put = new Put(row1);
3271    put.add(kv21);
3272    put.add(kv11);
3273    region.put(put);
3274
3275    // Expected
3276    List<KeyValue> expected = new ArrayList<>();
3277    expected.add(kv14);
3278    expected.add(kv13);
3279    expected.add(kv12);
3280    expected.add(kv24);
3281    expected.add(kv23);
3282    expected.add(kv22);
3283
3284    Scan scan = new Scan(row1);
3285    int versions = 3;
3286    scan.setMaxVersions(versions);
3287    List<Cell> actual = new ArrayList<>();
3288    InternalScanner scanner = region.getScanner(scan);
3289
3290    boolean hasNext = scanner.next(actual);
    assertFalse(hasNext);
3292
3293    // Verify result
3294    for (int i = 0; i < expected.size(); i++) {
3295      assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
3296    }
3297  }
3298
3299  /**
3300   * Added for HBASE-5416
3301   *
   * Here we test the scan optimization that applies when only a subset of the CFs is used in
   * filter conditions.
3304   */
3305  @Test
3306  public void testScanner_JoinedScanners() throws IOException {
3307    byte[] cf_essential = Bytes.toBytes("essential");
3308    byte[] cf_joined = Bytes.toBytes("joined");
3309    byte[] cf_alpha = Bytes.toBytes("alpha");
3310    this.region = initHRegion(tableName, method, CONF, cf_essential, cf_joined, cf_alpha);
3311    byte[] row1 = Bytes.toBytes("row1");
3312    byte[] row2 = Bytes.toBytes("row2");
3313    byte[] row3 = Bytes.toBytes("row3");
3314
3315    byte[] col_normal = Bytes.toBytes("d");
3316    byte[] col_alpha = Bytes.toBytes("a");
3317
3318    byte[] filtered_val = Bytes.toBytes(3);
3319
3320    Put put = new Put(row1);
3321    put.addColumn(cf_essential, col_normal, Bytes.toBytes(1));
3322    put.addColumn(cf_joined, col_alpha, Bytes.toBytes(1));
3323    region.put(put);
3324
3325    put = new Put(row2);
3326    put.addColumn(cf_essential, col_alpha, Bytes.toBytes(2));
3327    put.addColumn(cf_joined, col_normal, Bytes.toBytes(2));
3328    put.addColumn(cf_alpha, col_alpha, Bytes.toBytes(2));
3329    region.put(put);
3330
3331    put = new Put(row3);
3332    put.addColumn(cf_essential, col_normal, filtered_val);
3333    put.addColumn(cf_joined, col_normal, filtered_val);
3334    region.put(put);
3335
3336    // Check two things:
3337    // 1. result list contains expected values
3338    // 2. result list is sorted properly
3339
3340    Scan scan = new Scan();
3341    Filter filter = new SingleColumnValueExcludeFilter(cf_essential, col_normal,
3342        CompareOp.NOT_EQUAL, filtered_val);
3343    scan.setFilter(filter);
3344    scan.setLoadColumnFamiliesOnDemand(true);
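    // With lazy column family loading, only the essential family is scanned for every row; the
    // joined families are loaded only for rows that pass the filter.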
3345    InternalScanner s = region.getScanner(scan);
3346
3347    List<Cell> results = new ArrayList<>();
3348    assertTrue(s.next(results));
3349    assertEquals(1, results.size());
3350    results.clear();
3351
3352    assertTrue(s.next(results));
3353    assertEquals(3, results.size());
3354    assertTrue("orderCheck", CellUtil.matchingFamily(results.get(0), cf_alpha));
3355    assertTrue("orderCheck", CellUtil.matchingFamily(results.get(1), cf_essential));
3356    assertTrue("orderCheck", CellUtil.matchingFamily(results.get(2), cf_joined));
3357    results.clear();
3358
3359    assertFalse(s.next(results));
3360    assertEquals(0, results.size());
3361  }
3362
3363  /**
3364   * HBASE-5416
3365   *
   * Test case where the scan limits the number of KVs returned on each next() call.
3367   */
3368  @Test
3369  public void testScanner_JoinedScannersWithLimits() throws IOException {
3370    final byte[] cf_first = Bytes.toBytes("first");
3371    final byte[] cf_second = Bytes.toBytes("second");
3372
3373    this.region = initHRegion(tableName, method, CONF, cf_first, cf_second);
3374    final byte[] col_a = Bytes.toBytes("a");
3375    final byte[] col_b = Bytes.toBytes("b");
3376
3377    Put put;
3378
3379    for (int i = 0; i < 10; i++) {
3380      put = new Put(Bytes.toBytes("r" + Integer.toString(i)));
3381      put.addColumn(cf_first, col_a, Bytes.toBytes(i));
3382      if (i < 5) {
3383        put.addColumn(cf_first, col_b, Bytes.toBytes(i));
3384        put.addColumn(cf_second, col_a, Bytes.toBytes(i));
3385        put.addColumn(cf_second, col_b, Bytes.toBytes(i));
3386      }
3387      region.put(put);
3388    }
3389
3390    Scan scan = new Scan();
3391    scan.setLoadColumnFamiliesOnDemand(true);
3392    Filter bogusFilter = new FilterBase() {
3393      @Override
3394      public ReturnCode filterCell(final Cell ignored) throws IOException {
3395        return ReturnCode.INCLUDE;
3396      }
3397      @Override
3398      public boolean isFamilyEssential(byte[] name) {
3399        return Bytes.equals(name, cf_first);
3400      }
3401    };
3402
3403    scan.setFilter(bogusFilter);
3404    InternalScanner s = region.getScanner(scan);
3405
3406    // Our data looks like this:
3407    // r0: first:a, first:b, second:a, second:b
3408    // r1: first:a, first:b, second:a, second:b
3409    // r2: first:a, first:b, second:a, second:b
3410    // r3: first:a, first:b, second:a, second:b
3411    // r4: first:a, first:b, second:a, second:b
3412    // r5: first:a
3413    // r6: first:a
3414    // r7: first:a
3415    // r8: first:a
3416    // r9: first:a
3417
3418    // But due to next's limit set to 3, we should get this:
3419    // r0: first:a, first:b, second:a
3420    // r0: second:b
3421    // r1: first:a, first:b, second:a
3422    // r1: second:b
3423    // r2: first:a, first:b, second:a
3424    // r2: second:b
3425    // r3: first:a, first:b, second:a
3426    // r3: second:b
3427    // r4: first:a, first:b, second:a
3428    // r4: second:b
3429    // r5: first:a
3430    // r6: first:a
3431    // r7: first:a
3432    // r8: first:a
3433    // r9: first:a
3434
3435    List<Cell> results = new ArrayList<>();
3436    int index = 0;
3437    ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(3).build();
3438    while (true) {
3439      boolean more = s.next(results, scannerContext);
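      // The first five rows take two next() calls each (3 cells, then 1 cell); the remaining
      // rows yield a single cell per call.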
3440      if ((index >> 1) < 5) {
3441        if (index % 2 == 0) {
3442          assertEquals(3, results.size());
3443        } else {
3444          assertEquals(1, results.size());
3445        }
3446      } else {
3447        assertEquals(1, results.size());
3448      }
3449      results.clear();
3450      index++;
3451      if (!more) {
3452        break;
3453      }
3454    }
3455  }
3456
3457  /**
   * Write an HFile block full of Cells whose qualifiers are longer than Short.MAX_VALUE and
   * identical except for their final byte. See HBASE-13329.
3460   * @throws Exception
3461   */
3462  @Test
3463  public void testLongQualifier() throws Exception {
3464    byte[] family = Bytes.toBytes("family");
3465    this.region = initHRegion(tableName, method, CONF, family);
3466    byte[] q = new byte[Short.MAX_VALUE+2];
3467    Arrays.fill(q, 0, q.length-1, (byte)42);
3468    for (byte i=0; i<10; i++) {
3469      Put p = new Put(Bytes.toBytes("row"));
3470      // qualifiers that differ past Short.MAX_VALUE
3471      q[q.length-1]=i;
3472      p.addColumn(family, q, q);
3473      region.put(p);
3474    }
3475    region.flush(false);
3476  }
3477
3478  /**
3479   * Flushes the cache in a thread while scanning. The tests verify that the
   * scan is coherent - i.e. the returned results are always from the same or a
   * later update than the previous results.
3482   *
3483   * @throws IOException
3484   *           scan / compact
3485   * @throws InterruptedException
3486   *           thread join
3487   */
3488  @Test
3489  public void testFlushCacheWhileScanning() throws IOException, InterruptedException {
3490    byte[] family = Bytes.toBytes("family");
3491    int numRows = 1000;
3492    int flushAndScanInterval = 10;
3493    int compactInterval = 10 * flushAndScanInterval;
3494
3495    this.region = initHRegion(tableName, method, CONF, family);
3496    FlushThread flushThread = new FlushThread();
3497    try {
3498      flushThread.start();
3499
3500      Scan scan = new Scan();
3501      scan.addFamily(family);
3502      scan.setFilter(new SingleColumnValueFilter(family, qual1, CompareOp.EQUAL,
3503          new BinaryComparator(Bytes.toBytes(5L))));
3504
3505      int expectedCount = 0;
3506      List<Cell> res = new ArrayList<>();
3507
3508      boolean toggle = true;
3509      for (long i = 0; i < numRows; i++) {
3510        Put put = new Put(Bytes.toBytes(i));
3511        put.setDurability(Durability.SKIP_WAL);
3512        put.addColumn(family, qual1, Bytes.toBytes(i % 10));
3513        region.put(put);
3514
3515        if (i != 0 && i % compactInterval == 0) {
          LOG.debug("iteration = " + i + " ts=" + System.currentTimeMillis());
3517          region.compact(true);
3518        }
3519
3520        if (i % 10 == 5L) {
3521          expectedCount++;
3522        }
3523
3524        if (i != 0 && i % flushAndScanInterval == 0) {
3525          res.clear();
3526          InternalScanner scanner = region.getScanner(scan);
3527          if (toggle) {
3528            flushThread.flush();
3529          }
3530          while (scanner.next(res))
3531            ;
3532          if (!toggle) {
3533            flushThread.flush();
3534          }
          assertEquals("toggle=" + toggle + " i=" + i + " ts=" + System.currentTimeMillis(),
              expectedCount, res.size());
3537          toggle = !toggle;
3538        }
3539      }
3540
3541    } finally {
3542      try {
3543        flushThread.done();
3544        flushThread.join();
3545        flushThread.checkNoError();
3546      } catch (InterruptedException ie) {
3547        LOG.warn("Caught exception when joining with flushThread", ie);
3548      }
3549      HBaseTestingUtility.closeRegionAndWAL(this.region);
3550      this.region = null;
3551    }
3552  }
3553
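  /**
   * Background thread that flushes the region each time {@link #flush()} is called, until
   * {@link #done()} is invoked.
   */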
3554  protected class FlushThread extends Thread {
3555    private volatile boolean done;
3556    private Throwable error = null;
3557
3558    FlushThread() {
3559      super("FlushThread");
3560    }
3561
3562    public void done() {
3563      done = true;
3564      synchronized (this) {
3565        interrupt();
3566      }
3567    }
3568
3569    public void checkNoError() {
3570      if (error != null) {
3571        assertNull(error);
3572      }
3573    }
3574
3575    @Override
3576    public void run() {
3577      done = false;
3578      while (!done) {
3579        synchronized (this) {
3580          try {
3581            wait();
3582          } catch (InterruptedException ignored) {
3583            if (done) {
3584              break;
3585            }
3586          }
3587        }
3588        try {
3589          region.flush(true);
3590        } catch (IOException e) {
3591          if (!done) {
3592            LOG.error("Error while flushing cache", e);
3593            error = e;
3594          }
3595          break;
3596        } catch (Throwable t) {
3597          LOG.error("Uncaught exception", t);
3598          throw t;
3599        }
3600      }
3601    }
3602
3603    public void flush() {
3604      synchronized (this) {
3605        notify();
3606      }
3607    }
3608  }
3609
3610  /**
   * Writes very wide records and scans for the latest every time. Flushes and
3612   * compacts the region every now and then to keep things realistic.
3613   *
3614   * @throws IOException
3615   *           by flush / scan / compaction
3616   * @throws InterruptedException
3617   *           when joining threads
3618   */
3619  @Test
3620  public void testWritesWhileScanning() throws IOException, InterruptedException {
3621    int testCount = 100;
3622    int numRows = 1;
3623    int numFamilies = 10;
3624    int numQualifiers = 100;
3625    int flushInterval = 7;
3626    int compactInterval = 5 * flushInterval;
3627    byte[][] families = new byte[numFamilies][];
3628    for (int i = 0; i < numFamilies; i++) {
3629      families[i] = Bytes.toBytes("family" + i);
3630    }
3631    byte[][] qualifiers = new byte[numQualifiers][];
3632    for (int i = 0; i < numQualifiers; i++) {
3633      qualifiers[i] = Bytes.toBytes("qual" + i);
3634    }
3635
3636    this.region = initHRegion(tableName, method, CONF, families);
3637    FlushThread flushThread = new FlushThread();
3638    PutThread putThread = new PutThread(numRows, families, qualifiers);
3639    try {
3640      putThread.start();
3641      putThread.waitForFirstPut();
3642
3643      flushThread.start();
3644
3645      Scan scan = new Scan(Bytes.toBytes("row0"), Bytes.toBytes("row1"));
3646
3647      int expectedCount = numFamilies * numQualifiers;
3648      List<Cell> res = new ArrayList<>();
3649
3650      long prevTimestamp = 0L;
3651      for (int i = 0; i < testCount; i++) {
3652
3653        if (i != 0 && i % compactInterval == 0) {
3654          region.compact(true);
3655          for (HStore store : region.getStores()) {
3656            store.closeAndArchiveCompactedFiles();
3657          }
3658        }
3659
3660        if (i != 0 && i % flushInterval == 0) {
3661          flushThread.flush();
3662        }
3663
3664        boolean previousEmpty = res.isEmpty();
3665        res.clear();
3666        InternalScanner scanner = region.getScanner(scan);
3667        while (scanner.next(res))
3668          ;
3669        if (!res.isEmpty() || !previousEmpty || i > compactInterval) {
3670          assertEquals("i=" + i, expectedCount, res.size());
3671          long timestamp = res.get(0).getTimestamp();
          assertTrue("Timestamps were broken: " + timestamp + " prev: " + prevTimestamp,
              timestamp >= prevTimestamp);
3674          prevTimestamp = timestamp;
3675        }
3676      }
3677
3678      putThread.done();
3679
3680      region.flush(true);
3681
3682    } finally {
3683      try {
3684        flushThread.done();
3685        flushThread.join();
3686        flushThread.checkNoError();
3687
3688        putThread.join();
3689        putThread.checkNoError();
3690      } catch (InterruptedException ie) {
3691        LOG.warn("Caught exception when joining with flushThread", ie);
3692      }
3693
3694      try {
3695          HBaseTestingUtility.closeRegionAndWAL(this.region);
3696      } catch (DroppedSnapshotException dse) {
3697        // We could get this on way out because we interrupt the background flusher and it could
3698        // fail anywhere causing a DSE over in the background flusher... only it is not properly
3699        // dealt with so could still be memory hanging out when we get to here -- memory we can't
3700        // flush because the accounting is 'off' since original DSE.
3701      }
3702      this.region = null;
3703    }
3704  }
3705
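  /**
   * Background thread that repeatedly writes wide rows (every family/qualifier combination) and
   * periodically deletes older versions of the row, until {@link #done()} is called.
   */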
3706  protected class PutThread extends Thread {
3707    private volatile boolean done;
3708    private volatile int numPutsFinished = 0;
3709
3710    private Throwable error = null;
3711    private int numRows;
3712    private byte[][] families;
3713    private byte[][] qualifiers;
3714
3715    private PutThread(int numRows, byte[][] families, byte[][] qualifiers) {
3716      super("PutThread");
3717      this.numRows = numRows;
3718      this.families = families;
3719      this.qualifiers = qualifiers;
3720    }
3721
3722    /**
3723     * Block calling thread until this instance of PutThread has put at least one row.
3724     */
3725    public void waitForFirstPut() throws InterruptedException {
3726      // wait until put thread actually puts some data
3727      while (isAlive() && numPutsFinished == 0) {
3728        checkNoError();
3729        Thread.sleep(50);
3730      }
3731    }
3732
3733    public void done() {
3734      done = true;
3735      synchronized (this) {
3736        interrupt();
3737      }
3738    }
3739
3740    public void checkNoError() {
3741      if (error != null) {
3742        assertNull(error);
3743      }
3744    }
3745
3746    @Override
3747    public void run() {
3748      done = false;
3749      while (!done) {
3750        try {
3751          for (int r = 0; r < numRows; r++) {
3752            byte[] row = Bytes.toBytes("row" + r);
3753            Put put = new Put(row);
3754            put.setDurability(Durability.SKIP_WAL);
3755            byte[] value = Bytes.toBytes(String.valueOf(numPutsFinished));
3756            for (byte[] family : families) {
3757              for (byte[] qualifier : qualifiers) {
3758                put.addColumn(family, qualifier, numPutsFinished, value);
3759              }
3760            }
3761            region.put(put);
3762            numPutsFinished++;
3763            if (numPutsFinished > 0 && numPutsFinished % 47 == 0) {
3764              System.out.println("put iteration = " + numPutsFinished);
3765              Delete delete = new Delete(row, (long) numPutsFinished - 30);
3766              region.delete(delete);
3767            }
3768            numPutsFinished++;
3769          }
3770        } catch (InterruptedIOException e) {
3771          // This is fine. It means we are done, or didn't get the lock on time
3772          LOG.info("Interrupted", e);
3773        } catch (IOException e) {
3774          LOG.error("Error while putting records", e);
3775          error = e;
3776          break;
3777        }
3778      }
3779
3780    }
3781
3782  }
3783
3784  /**
   * Writes very wide records and gets the latest row every time. Flushes and
   * compacts the region aggressively to catch issues.
3787   *
3788   * @throws IOException
3789   *           by flush / scan / compaction
3790   * @throws InterruptedException
3791   *           when joining threads
3792   */
3793  @Test
3794  public void testWritesWhileGetting() throws Exception {
3795    int testCount = 50;
3796    int numRows = 1;
3797    int numFamilies = 10;
3798    int numQualifiers = 100;
3799    int compactInterval = 100;
3800    byte[][] families = new byte[numFamilies][];
3801    for (int i = 0; i < numFamilies; i++) {
3802      families[i] = Bytes.toBytes("family" + i);
3803    }
3804    byte[][] qualifiers = new byte[numQualifiers][];
3805    for (int i = 0; i < numQualifiers; i++) {
3806      qualifiers[i] = Bytes.toBytes("qual" + i);
3807    }
3808
3809
    // This test flushes constantly and can cause many files to be created, possibly
    // extending over the ulimit. Make sure compactions are aggressive in reducing
    // the number of HFiles created.
3815    Configuration conf = HBaseConfiguration.create(CONF);
3816    conf.setInt("hbase.hstore.compaction.min", 1);
3817    conf.setInt("hbase.hstore.compaction.max", 1000);
3818    this.region = initHRegion(tableName, method, conf, families);
3819    PutThread putThread = null;
3820    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);
3821    try {
3822      putThread = new PutThread(numRows, families, qualifiers);
3823      putThread.start();
3824      putThread.waitForFirstPut();
3825
3826      // Add a thread that flushes as fast as possible
3827      ctx.addThread(new RepeatingTestThread(ctx) {
3828
3829        @Override
3830        public void doAnAction() throws Exception {
3831          region.flush(true);
3832          // Compact regularly to avoid creating too many files and exceeding
3833          // the ulimit.
3834          region.compact(false);
3835          for (HStore store : region.getStores()) {
3836            store.closeAndArchiveCompactedFiles();
3837          }
3838        }
3839      });
3840      ctx.startThreads();
3841
3842      Get get = new Get(Bytes.toBytes("row0"));
3843      Result result = null;
3844
3845      int expectedCount = numFamilies * numQualifiers;
3846
3847      long prevTimestamp = 0L;
3848      for (int i = 0; i < testCount; i++) {
3849        LOG.info("testWritesWhileGetting verify turn " + i);
3850        boolean previousEmpty = result == null || result.isEmpty();
3851        result = region.get(get);
3852        if (!result.isEmpty() || !previousEmpty || i > compactInterval) {
3853          assertEquals("i=" + i, expectedCount, result.size());
3854          // TODO this was removed, now what dangit?!
3855          // search looking for the qualifier in question?
3856          long timestamp = 0;
3857          for (Cell kv : result.rawCells()) {
3858            if (CellUtil.matchingFamily(kv, families[0])
3859                && CellUtil.matchingQualifier(kv, qualifiers[0])) {
3860              timestamp = kv.getTimestamp();
3861            }
3862          }
3863          assertTrue(timestamp >= prevTimestamp);
3864          prevTimestamp = timestamp;
3865          Cell previousKV = null;
3866
3867          for (Cell kv : result.rawCells()) {
3868            byte[] thisValue = CellUtil.cloneValue(kv);
3869            if (previousKV != null) {
3870              if (Bytes.compareTo(CellUtil.cloneValue(previousKV), thisValue) != 0) {
3871                LOG.warn("These two KV should have the same value." + " Previous KV:" + previousKV
3872                    + "(memStoreTS:" + previousKV.getSequenceId() + ")" + ", New KV: " + kv
3873                    + "(memStoreTS:" + kv.getSequenceId() + ")");
3874                assertEquals(0, Bytes.compareTo(CellUtil.cloneValue(previousKV), thisValue));
3875              }
3876            }
3877            previousKV = kv;
3878          }
3879        }
3880      }
3881    } finally {
3882      if (putThread != null)
3883        putThread.done();
3884
3885      region.flush(true);
3886
3887      if (putThread != null) {
3888        putThread.join();
3889        putThread.checkNoError();
3890      }
3891
3892      ctx.stop();
3893      HBaseTestingUtility.closeRegionAndWAL(this.region);
3894      this.region = null;
3895    }
3896  }
3897
3898  @Test
3899  public void testHolesInMeta() throws Exception {
3900    byte[] family = Bytes.toBytes("family");
3901    this.region = initHRegion(tableName, Bytes.toBytes("x"), Bytes.toBytes("z"), method, CONF,
3902        false, family);
3903    byte[] rowNotServed = Bytes.toBytes("a");
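    // "a" falls outside this region's key range [x, z), so the Get must be rejected.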
3904    Get g = new Get(rowNotServed);
3905    try {
3906      region.get(g);
3907      fail();
3908    } catch (WrongRegionException x) {
3909      // OK
3910    }
3911    byte[] row = Bytes.toBytes("y");
3912    g = new Get(row);
3913    region.get(g);
3914  }
3915
3916  @Test
3917  public void testIndexesScanWithOneDeletedRow() throws IOException {
3918    byte[] family = Bytes.toBytes("family");
3919
3920    // Setting up region
3921    this.region = initHRegion(tableName, method, CONF, family);
3922    Put put = new Put(Bytes.toBytes(1L));
3923    put.addColumn(family, qual1, 1L, Bytes.toBytes(1L));
3924    region.put(put);
3925
3926    region.flush(true);
3927
3928    Delete delete = new Delete(Bytes.toBytes(1L), 1L);
3929    region.delete(delete);
3930
3931    put = new Put(Bytes.toBytes(2L));
3932    put.addColumn(family, qual1, 2L, Bytes.toBytes(2L));
3933    region.put(put);
3934
3935    Scan idxScan = new Scan();
3936    idxScan.addFamily(family);
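    // Both filters must pass: keep rows whose qual1 value lies between 0 and 3, inclusive.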
3937    idxScan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL, Arrays.<Filter> asList(
3938        new SingleColumnValueFilter(family, qual1, CompareOp.GREATER_OR_EQUAL,
3939            new BinaryComparator(Bytes.toBytes(0L))), new SingleColumnValueFilter(family, qual1,
3940            CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes(3L))))));
3941    InternalScanner scanner = region.getScanner(idxScan);
3942    List<Cell> res = new ArrayList<>();
3943
3944    while (scanner.next(res)) {
3945      // Ignore res value.
3946    }
3947    assertEquals(1L, res.size());
3948  }
3949
3950  // ////////////////////////////////////////////////////////////////////////////
3951  // Bloom filter test
3952  // ////////////////////////////////////////////////////////////////////////////
3953  @Test
3954  public void testBloomFilterSize() throws IOException {
3955    byte[] fam1 = Bytes.toBytes("fam1");
3956    byte[] qf1 = Bytes.toBytes("col");
3957    byte[] val1 = Bytes.toBytes("value1");
3958    // Create Table
3959    HColumnDescriptor hcd = new HColumnDescriptor(fam1).setMaxVersions(Integer.MAX_VALUE)
3960        .setBloomFilterType(BloomType.ROWCOL);
3961
3962    HTableDescriptor htd = new HTableDescriptor(tableName);
3963    htd.addFamily(hcd);
3964    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
3965    this.region = TEST_UTIL.createLocalHRegion(info, htd);
3966    int num_unique_rows = 10;
3967    int duplicate_multiplier = 2;
3968    int num_storefiles = 4;
3969
3970    int version = 0;
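    // Build num_storefiles store files, each holding duplicate_multiplier versions of every
    // unique row.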
3971    for (int f = 0; f < num_storefiles; f++) {
3972      for (int i = 0; i < duplicate_multiplier; i++) {
3973        for (int j = 0; j < num_unique_rows; j++) {
3974          Put put = new Put(Bytes.toBytes("row" + j));
3975          put.setDurability(Durability.SKIP_WAL);
3976          long ts = version++;
3977          put.addColumn(fam1, qf1, ts, val1);
3978          region.put(put);
3979        }
3980      }
3981      region.flush(true);
3982    }
3983    // before compaction
3984    HStore store = region.getStore(fam1);
3985    Collection<HStoreFile> storeFiles = store.getStorefiles();
3986    for (HStoreFile storefile : storeFiles) {
3987      StoreFileReader reader = storefile.getReader();
3988      reader.loadFileInfo();
3989      reader.loadBloomfilter();
3990      assertEquals(num_unique_rows * duplicate_multiplier, reader.getEntries());
3991      assertEquals(num_unique_rows, reader.getFilterEntries());
3992    }
3993
3994    region.compact(true);
3995
3996    // after compaction
3997    storeFiles = store.getStorefiles();
3998    for (HStoreFile storefile : storeFiles) {
3999      StoreFileReader reader = storefile.getReader();
4000      reader.loadFileInfo();
4001      reader.loadBloomfilter();
4002      assertEquals(num_unique_rows * duplicate_multiplier * num_storefiles, reader.getEntries());
4003      assertEquals(num_unique_rows, reader.getFilterEntries());
4004    }
4005  }
4006
4007  @Test
4008  public void testAllColumnsWithBloomFilter() throws IOException {
4009    byte[] TABLE = Bytes.toBytes(name.getMethodName());
4010    byte[] FAMILY = Bytes.toBytes("family");
4011
4012    // Create table
4013    HColumnDescriptor hcd = new HColumnDescriptor(FAMILY).setMaxVersions(Integer.MAX_VALUE)
4014        .setBloomFilterType(BloomType.ROWCOL);
4015    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE));
4016    htd.addFamily(hcd);
4017    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
4018    this.region = TEST_UTIL.createLocalHRegion(info, htd);
    // For row:0, col:0: insert versions 1 through 4.
4020    byte[] row = Bytes.toBytes("row:" + 0);
4021    byte[] column = Bytes.toBytes("column:" + 0);
4022    Put put = new Put(row);
4023    put.setDurability(Durability.SKIP_WAL);
4024    for (long idx = 1; idx <= 4; idx++) {
4025      put.addColumn(FAMILY, column, idx, Bytes.toBytes("value-version-" + idx));
4026    }
4027    region.put(put);
4028
4029    // Flush
4030    region.flush(true);
4031
4032    // Get rows
4033    Get get = new Get(row);
4034    get.setMaxVersions();
4035    Cell[] kvs = region.get(get).rawCells();
4036
4037    // Check if rows are correct
4038    assertEquals(4, kvs.length);
4039    checkOneCell(kvs[0], FAMILY, 0, 0, 4);
4040    checkOneCell(kvs[1], FAMILY, 0, 0, 3);
4041    checkOneCell(kvs[2], FAMILY, 0, 0, 2);
4042    checkOneCell(kvs[3], FAMILY, 0, 0, 1);
4043  }
4044
4045  /**
   * Testcase to cover the bug-fix for HBASE-2823: ensures a correct delete when
   * issuing a row delete on columns with the bloom filter set to row+col
   * (BloomType.ROWCOL).
4049   */
4050  @Test
4051  public void testDeleteRowWithBloomFilter() throws IOException {
4052    byte[] familyName = Bytes.toBytes("familyName");
4053
4054    // Create Table
4055    HColumnDescriptor hcd = new HColumnDescriptor(familyName).setMaxVersions(Integer.MAX_VALUE)
4056        .setBloomFilterType(BloomType.ROWCOL);
4057
4058    HTableDescriptor htd = new HTableDescriptor(tableName);
4059    htd.addFamily(hcd);
4060    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
4061    this.region = TEST_UTIL.createLocalHRegion(info, htd);
4062    // Insert some data
4063    byte[] row = Bytes.toBytes("row1");
4064    byte[] col = Bytes.toBytes("col1");
4065
4066    Put put = new Put(row);
4067    put.addColumn(familyName, col, 1, Bytes.toBytes("SomeRandomValue"));
4068    region.put(put);
4069    region.flush(true);
4070
4071    Delete del = new Delete(row);
4072    region.delete(del);
4073    region.flush(true);
4074
4075    // Get remaining rows (should have none)
4076    Get get = new Get(row);
4077    get.addColumn(familyName, col);
4078
4079    Cell[] keyValues = region.get(get).rawCells();
4080    assertEquals(0, keyValues.length);
4081  }
4082
4083  @Test
4084  public void testgetHDFSBlocksDistribution() throws Exception {
4085    HBaseTestingUtility htu = new HBaseTestingUtility();
    // Why do we set the block size in this test?  If we set it smaller than the kvs, then we'll
    // break up the file into more pieces that can be distributed across the three nodes and we
    // won't be able to have the condition this test asserts: that at least one node has
    // a copy of all replicas -- with a small block size, blocks are spread evenly across
    // the three nodes.  hfilev3 with tags seems to put us over the block size.  St.Ack.
4091    // final int DEFAULT_BLOCK_SIZE = 1024;
4092    // htu.getConfiguration().setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE);
4093    htu.getConfiguration().setInt("dfs.replication", 2);
4094
4095    // set up a cluster with 3 nodes
4096    MiniHBaseCluster cluster = null;
    String[] dataNodeHosts = new String[] { "host1", "host2", "host3" };
4098    int regionServersCount = 3;
4099
4100    try {
4101      StartMiniClusterOption option = StartMiniClusterOption.builder()
4102          .numRegionServers(regionServersCount).dataNodeHosts(dataNodeHosts).build();
4103      cluster = htu.startMiniCluster(option);
4104      byte[][] families = { fam1, fam2 };
4105      Table ht = htu.createTable(tableName, families);
4106
4107      // Setting up region
      byte[] row = Bytes.toBytes("row1");
      byte[] col = Bytes.toBytes("col1");
4110
4111      Put put = new Put(row);
4112      put.addColumn(fam1, col, 1, Bytes.toBytes("test1"));
4113      put.addColumn(fam2, col, 1, Bytes.toBytes("test2"));
4114      ht.put(put);
4115
4116      HRegion firstRegion = htu.getHBaseCluster().getRegions(tableName).get(0);
4117      firstRegion.flush(true);
4118      HDFSBlocksDistribution blocksDistribution1 = firstRegion.getHDFSBlocksDistribution();
4119
      // Given the replication factor is 2 and we have 2 HFiles,
      // we will have a total of 4 block replicas on 3 datanodes; thus there
      // must be at least one host that has replicas of both HFiles. That host's
      // weight will be equal to the unique block weight.
4124      long uniqueBlocksWeight1 = blocksDistribution1.getUniqueBlocksTotalWeight();
4125      StringBuilder sb = new StringBuilder();
4126      for (String host: blocksDistribution1.getTopHosts()) {
4127        if (sb.length() > 0) sb.append(", ");
4128        sb.append(host);
4129        sb.append("=");
4130        sb.append(blocksDistribution1.getWeight(host));
4131      }
4132
4133      String topHost = blocksDistribution1.getTopHosts().get(0);
4134      long topHostWeight = blocksDistribution1.getWeight(topHost);
4135      String msg = "uniqueBlocksWeight=" + uniqueBlocksWeight1 + ", topHostWeight=" +
4136        topHostWeight + ", topHost=" + topHost + "; " + sb.toString();
4137      LOG.info(msg);
      assertEquals(msg, uniqueBlocksWeight1, topHostWeight);
4139
4140      // use the static method to compute the value, it should be the same.
4141      // static method is used by load balancer or other components
4142      HDFSBlocksDistribution blocksDistribution2 = HRegion.computeHDFSBlocksDistribution(
4143          htu.getConfiguration(), firstRegion.getTableDescriptor(), firstRegion.getRegionInfo());
4144      long uniqueBlocksWeight2 = blocksDistribution2.getUniqueBlocksTotalWeight();
4145
      assertEquals(uniqueBlocksWeight1, uniqueBlocksWeight2);
4147
4148      ht.close();
4149    } finally {
4150      if (cluster != null) {
4151        htu.shutdownMiniCluster();
4152      }
4153    }
4154  }
4155
4156  /**
   * Testcase to check that the state of the region initialization monitored task is set to
   * ABORTED if any exception occurs during initialization.
4159   *
4160   * @throws Exception
4161   */
4162  @Test
4163  public void testStatusSettingToAbortIfAnyExceptionDuringRegionInitilization() throws Exception {
4164    HRegionInfo info;
4165    try {
4166      FileSystem fs = Mockito.mock(FileSystem.class);
4167      Mockito.when(fs.exists((Path) Mockito.anyObject())).thenThrow(new IOException());
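      // Every exists() check on the mocked filesystem now throws, forcing region initialization
      // to fail.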
4168      HTableDescriptor htd = new HTableDescriptor(tableName);
4169      htd.addFamily(new HColumnDescriptor("cf"));
4170      info = new HRegionInfo(htd.getTableName(), HConstants.EMPTY_BYTE_ARRAY,
4171          HConstants.EMPTY_BYTE_ARRAY, false);
4172      Path path = new Path(dir + "testStatusSettingToAbortIfAnyExceptionDuringRegionInitilization");
4173      region = HRegion.newHRegion(path, null, fs, CONF, info, htd, null);
4174      // region initialization throws IOException and set task state to ABORTED.
4175      region.initialize();
4176      fail("Region initialization should fail due to IOException");
4177    } catch (IOException io) {
4178      List<MonitoredTask> tasks = TaskMonitor.get().getTasks();
4179      for (MonitoredTask monitoredTask : tasks) {
4180        if (!(monitoredTask instanceof MonitoredRPCHandler)
4181            && monitoredTask.getDescription().contains(region.toString())) {
4182          assertTrue("Region state should be ABORTED.",
4183              monitoredTask.getState().equals(MonitoredTask.State.ABORTED));
4184          break;
4185        }
4186      }
4187    }
4188  }
4189
4190  /**
4191   * Verifies that the .regioninfo file is written on region creation and that
4192   * it is recreated if missing during region opening.
4193   */
4194  @Test
4195  public void testRegionInfoFileCreation() throws IOException {
4196    Path rootDir = new Path(dir + "testRegionInfoFileCreation");
4197
4198    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
4199    htd.addFamily(new HColumnDescriptor("cf"));
4200
4201    HRegionInfo hri = new HRegionInfo(htd.getTableName());
4202
4203    // Create a region and skip the initialization (like CreateTableHandler)
4204    region = HBaseTestingUtility.createRegionAndWAL(hri, rootDir, CONF, htd, false);
4205    Path regionDir = region.getRegionFileSystem().getRegionDir();
4206    FileSystem fs = region.getRegionFileSystem().getFileSystem();
4207    HBaseTestingUtility.closeRegionAndWAL(region);
4208
4209    Path regionInfoFile = new Path(regionDir, HRegionFileSystem.REGION_INFO_FILE);
4210
4211    // Verify that the .regioninfo file is present
4212    assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir",
4213        fs.exists(regionInfoFile));
4214
4215    // Try to open the region
4216    region = HRegion.openHRegion(rootDir, hri, htd, null, CONF);
4217    assertEquals(regionDir, region.getRegionFileSystem().getRegionDir());
4218    HBaseTestingUtility.closeRegionAndWAL(region);
4219
4220    // Verify that the .regioninfo file is still there
4221    assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir",
4222        fs.exists(regionInfoFile));
4223
4224    // Remove the .regioninfo file and verify it is recreated on region open
4225    fs.delete(regionInfoFile, true);
4226    assertFalse(HRegionFileSystem.REGION_INFO_FILE + " should be removed from the region dir",
4227        fs.exists(regionInfoFile));
4228
4229    region = HRegion.openHRegion(rootDir, hri, htd, null, CONF);
4231    assertEquals(regionDir, region.getRegionFileSystem().getRegionDir());
4232    HBaseTestingUtility.closeRegionAndWAL(region);
4233
4234    // Verify that the .regioninfo file is still there
4235    assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir",
4236        fs.exists(new Path(regionDir, HRegionFileSystem.REGION_INFO_FILE)));
4237
4238    region = null;
4239  }
4240
4241  /**
4242   * Runnable that repeatedly increments a single counter cell; used by the parallel increment test.
4243   */
4244  private static class Incrementer implements Runnable {
4245    private HRegion region;
4246    private final static byte[] incRow = Bytes.toBytes("incRow");
4247    private final static byte[] family = Bytes.toBytes("family");
4248    private final static byte[] qualifier = Bytes.toBytes("qualifier");
4249    private final static long ONE = 1L;
4250    private int incCounter;
4251
4252    public Incrementer(HRegion region, int incCounter) {
4253      this.region = region;
4254      this.incCounter = incCounter;
4255    }
4256
4257    @Override
4258    public void run() {
4259      int count = 0;
4260      while (count < incCounter) {
4261        Increment inc = new Increment(incRow);
4262        inc.addColumn(family, qualifier, ONE);
4263        count++;
4264        try {
4265          region.increment(inc);
4266        } catch (IOException e) {
4267          LOG.info("Count=" + count + ", " + e);
4268          break;
4269        }
4270      }
4271    }
4272  }
4273
4274  /**
4275   * Test case to check the increment function with concurrent memstore flushing
4276   */
4278  @Test
4279  public void testParallelIncrementWithMemStoreFlush() throws Exception {
4280    byte[] family = Incrementer.family;
4281    this.region = initHRegion(tableName, method, CONF, family);
4282    final HRegion region = this.region;
4283    final AtomicBoolean incrementDone = new AtomicBoolean(false);
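    // Background flusher: keep flushing the memstore until all increment threads are done.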
4284    Runnable flusher = new Runnable() {
4285      @Override
4286      public void run() {
4287        while (!incrementDone.get()) {
4288          try {
4289            region.flush(true);
4290          } catch (Exception e) {
4291            e.printStackTrace();
4292          }
4293        }
4294      }
4295    };
4296
4297    // After all increments finish, the counter will have been incremented to 20 * 100 = 2000
4298    int threadNum = 20;
4299    int incCounter = 100;
4300    long expected = (long) threadNum * incCounter;
4301    Thread[] incrementers = new Thread[threadNum];
4302    Thread flushThread = new Thread(flusher);
4303    for (int i = 0; i < threadNum; i++) {
4304      incrementers[i] = new Thread(new Incrementer(this.region, incCounter));
4305      incrementers[i].start();
4306    }
4307    flushThread.start();
4308    for (int i = 0; i < threadNum; i++) {
4309      incrementers[i].join();
4310    }
4311
4312    incrementDone.set(true);
4313    flushThread.join();
4314
4315    Get get = new Get(Incrementer.incRow);
4316    get.addColumn(Incrementer.family, Incrementer.qualifier);
4317    get.setMaxVersions(1);
4318    Result res = this.region.get(get);
4319    List<Cell> kvs = res.getColumnCells(Incrementer.family, Incrementer.qualifier);
4320
4321    // we just got the latest version
4322    assertEquals(1, kvs.size());
4323    Cell kv = kvs.get(0);
4324    assertEquals(expected, Bytes.toLong(kv.getValueArray(), kv.getValueOffset()));
4325  }
4326
4327  /**
4328   * Runnable that repeatedly appends a single character to one cell; used by the parallel append test.
4329   */
4330  private static class Appender implements Runnable {
4331    private HRegion region;
4332    private final static byte[] appendRow = Bytes.toBytes("appendRow");
4333    private final static byte[] family = Bytes.toBytes("family");
4334    private final static byte[] qualifier = Bytes.toBytes("qualifier");
4335    private final static byte[] CHAR = Bytes.toBytes("a");
4336    private int appendCounter;
4337
4338    public Appender(HRegion region, int appendCounter) {
4339      this.region = region;
4340      this.appendCounter = appendCounter;
4341    }
4342
4343    @Override
4344    public void run() {
4345      int count = 0;
4346      while (count < appendCounter) {
4347        Append app = new Append(appendRow);
4348        app.addColumn(family, qualifier, CHAR);
4349        count++;
4350        try {
4351          region.append(app);
4352        } catch (IOException e) {
4353          LOG.info("Count=" + count + ", max=" + appendCounter + ", " + e);
4354          break;
4355        }
4356      }
4357    }
4358  }
4359
4360  /**
4361   * Test case to check the append function with concurrent memstore flushing
4362   */
4364  @Test
4365  public void testParallelAppendWithMemStoreFlush() throws Exception {
4366    byte[] family = Appender.family;
4367    this.region = initHRegion(tableName, method, CONF, family);
4368    final HRegion region = this.region;
4369    final AtomicBoolean appendDone = new AtomicBoolean(false);
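    // Background flusher: keep flushing the memstore until all append threads are done.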
4370    Runnable flusher = new Runnable() {
4371      @Override
4372      public void run() {
4373        while (!appendDone.get()) {
4374          try {
4375            region.flush(true);
4376          } catch (Exception e) {
4377            e.printStackTrace();
4378          }
4379        }
4380      }
4381    };
4382
4383    // After all appends finish, the value will be threadNum * appendCounter
4384    // copies of Appender.CHAR appended together
4385    int threadNum = 20;
4386    int appendCounter = 100;
4387    byte[] expected = new byte[threadNum * appendCounter];
4388    for (int i = 0; i < threadNum * appendCounter; i++) {
4389      System.arraycopy(Appender.CHAR, 0, expected, i, 1);
4390    }
4391    Thread[] appenders = new Thread[threadNum];
4392    Thread flushThread = new Thread(flusher);
4393    for (int i = 0; i < threadNum; i++) {
4394      appenders[i] = new Thread(new Appender(this.region, appendCounter));
4395      appenders[i].start();
4396    }
4397    flushThread.start();
4398    for (int i = 0; i < threadNum; i++) {
4399      appenders[i].join();
4400    }
4401
4402    appendDone.set(true);
4403    flushThread.join();
4404
4405    Get get = new Get(Appender.appendRow);
4406    get.addColumn(Appender.family, Appender.qualifier);
4407    get.setMaxVersions(1);
4408    Result res = this.region.get(get);
4409    List<Cell> kvs = res.getColumnCells(Appender.family, Appender.qualifier);
4410
4411    // we just got the latest version
4412    assertEquals(1, kvs.size());
4413    Cell kv = kvs.get(0);
4414    byte[] appendResult = new byte[kv.getValueLength()];
4415    System.arraycopy(kv.getValueArray(), kv.getValueOffset(), appendResult, 0, kv.getValueLength());
4416    assertArrayEquals(expected, appendResult);
4417  }
4418
4419  /**
4420   * Test case to check the put function with memstore flushing for the same row and same timestamp
4421   */
4423  @Test
4424  public void testPutWithMemStoreFlush() throws Exception {
4425    byte[] family = Bytes.toBytes("family");
4426    byte[] qualifier = Bytes.toBytes("qualifier");
4427    byte[] row = Bytes.toBytes("putRow");
4428    byte[] value = null;
4429    this.region = initHRegion(tableName, method, CONF, family);
4430    Put put = null;
4431    Get get = null;
4432    List<Cell> kvs = null;
4433    Result res = null;
4434
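    // Write the first value at a fixed timestamp and verify it is visible before and after a flush.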
4435    put = new Put(row);
4436    value = Bytes.toBytes("value0");
4437    put.addColumn(family, qualifier, 1234567L, value);
4438    region.put(put);
4439    get = new Get(row);
4440    get.addColumn(family, qualifier);
4441    get.setMaxVersions();
4442    res = this.region.get(get);
4443    kvs = res.getColumnCells(family, qualifier);
4444    assertEquals(1, kvs.size());
4445    assertArrayEquals(Bytes.toBytes("value0"), CellUtil.cloneValue(kvs.get(0)));
4446
4447    region.flush(true);
4448    get = new Get(row);
4449    get.addColumn(family, qualifier);
4450    get.setMaxVersions();
4451    res = this.region.get(get);
4452    kvs = res.getColumnCells(family, qualifier);
4453    assertEquals(1, kvs.size());
4454    assertArrayEquals(Bytes.toBytes("value0"), CellUtil.cloneValue(kvs.get(0)));
4455
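    // Overwrite the same row/column/timestamp with a new value; the latest put must win before and after a flush.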
4456    put = new Put(row);
4457    value = Bytes.toBytes("value1");
4458    put.addColumn(family, qualifier, 1234567L, value);
4459    region.put(put);
4460    get = new Get(row);
4461    get.addColumn(family, qualifier);
4462    get.setMaxVersions();
4463    res = this.region.get(get);
4464    kvs = res.getColumnCells(family, qualifier);
4465    assertEquals(1, kvs.size());
4466    assertArrayEquals(Bytes.toBytes("value1"), CellUtil.cloneValue(kvs.get(0)));
4467
4468    region.flush(true);
4469    get = new Get(row);
4470    get.addColumn(family, qualifier);
4471    get.setMaxVersions();
4472    res = this.region.get(get);
4473    kvs = res.getColumnCells(family, qualifier);
4474    assertEquals(1, kvs.size());
4475    assertArrayEquals(Bytes.toBytes("value1"), CellUtil.cloneValue(kvs.get(0)));
4476  }
4477
4478  @Test
4479  public void testDurability() throws Exception {
4480    // There are 5 x 5 cases:
4481    // table durability (SYNC, FSYNC, ASYNC, SKIP, USE_DEFAULT) x mutation
4482    // durability (SYNC, FSYNC, ASYNC, SKIP, USE_DEFAULT)
4483
4484    // expected cases for append and sync wal
4485    durabilityTest(method, Durability.SYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
4486    durabilityTest(method, Durability.SYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4487    durabilityTest(method, Durability.SYNC_WAL, Durability.USE_DEFAULT, 0, true, true, false);
4488
4489    durabilityTest(method, Durability.FSYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
4490    durabilityTest(method, Durability.FSYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4491    durabilityTest(method, Durability.FSYNC_WAL, Durability.USE_DEFAULT, 0, true, true, false);
4492
4493    durabilityTest(method, Durability.ASYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
4494    durabilityTest(method, Durability.ASYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4495
4496    durabilityTest(method, Durability.SKIP_WAL, Durability.SYNC_WAL, 0, true, true, false);
4497    durabilityTest(method, Durability.SKIP_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4498
4499    durabilityTest(method, Durability.USE_DEFAULT, Durability.SYNC_WAL, 0, true, true, false);
4500    durabilityTest(method, Durability.USE_DEFAULT, Durability.FSYNC_WAL, 0, true, true, false);
4501    durabilityTest(method, Durability.USE_DEFAULT, Durability.USE_DEFAULT, 0, true, true, false);
4502
4503    // expected cases for async wal
4504    durabilityTest(method, Durability.SYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4505    durabilityTest(method, Durability.FSYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4506    durabilityTest(method, Durability.ASYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4507    durabilityTest(method, Durability.SKIP_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4508    durabilityTest(method, Durability.USE_DEFAULT, Durability.ASYNC_WAL, 0, true, false, false);
4509    durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 0, true, false, false);
4510
4511    durabilityTest(method, Durability.SYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4512    durabilityTest(method, Durability.FSYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4513    durabilityTest(method, Durability.ASYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4514    durabilityTest(method, Durability.SKIP_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4515    durabilityTest(method, Durability.USE_DEFAULT, Durability.ASYNC_WAL, 5000, true, false, true);
4516    durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 5000, true, false, true);
4517
4518    // expect skip wal cases
4519    durabilityTest(method, Durability.SYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
4520    durabilityTest(method, Durability.FSYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
4521    durabilityTest(method, Durability.ASYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
4522    durabilityTest(method, Durability.SKIP_WAL, Durability.SKIP_WAL, 0, false, false, false);
4523    durabilityTest(method, Durability.USE_DEFAULT, Durability.SKIP_WAL, 0, false, false, false);
4524    durabilityTest(method, Durability.SKIP_WAL, Durability.USE_DEFAULT, 0, false, false, false);
4525
4526  }
4527
4528  private void durabilityTest(String method, Durability tableDurability,
4529      Durability mutationDurability, long timeout, boolean expectAppend, final boolean expectSync,
4530      final boolean expectSyncFromLogSyncer) throws Exception {
4531    Configuration conf = HBaseConfiguration.create(CONF);
4532    method = method + "_" + tableDurability.name() + "_" + mutationDurability.name();
4533    byte[] family = Bytes.toBytes("family");
4534    Path logDir = new Path(new Path(dir + method), "log");
4535    final Configuration walConf = new Configuration(conf);
4536    FSUtils.setRootDir(walConf, logDir);
4537    // XXX: The spied AsyncFSWAL cannot work properly because of a Mockito defect that cannot
4538    // deal with classes which have a field of an inner class. See discussions in HBASE-15536.
4539    walConf.set(WALFactory.WAL_PROVIDER, "filesystem");
4540    final WALFactory wals = new WALFactory(walConf, TEST_UTIL.getRandomUUID().toString());
4541    final WAL wal = spy(wals.getWAL(RegionInfoBuilder.newBuilder(tableName).build()));
4542    this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
4543        HConstants.EMPTY_END_ROW, false, tableDurability, wal,
4544        new byte[][] { family });
4545
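    // Write a single cell with the requested mutation durability; the spied WAL lets us verify append/sync calls.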
4546    Put put = new Put(Bytes.toBytes("r1"));
4547    put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
4548    put.setDurability(mutationDurability);
4549    region.put(put);
4550
4551    // verify whether append was called
4552    verify(wal, expectAppend ? times(1) : never()).appendData((HRegionInfo) any(),
4553      (WALKeyImpl) any(), (WALEdit) any());
4554
4555    // verify whether sync was called
4556    if (expectSync || expectSyncFromLogSyncer) {
4557      TEST_UTIL.waitFor(timeout, new Waiter.Predicate<Exception>() {
4558        @Override
4559        public boolean evaluate() throws Exception {
4560          try {
4561            if (expectSync) {
4562              verify(wal, times(1)).sync(anyLong()); // Hregion calls this one
4563            } else if (expectSyncFromLogSyncer) {
4564              verify(wal, times(1)).sync(); // wal syncer calls this one
4565            }
4566          } catch (Throwable ignore) {
4567          }
4568          return true;
4569        }
4570      });
4571    } else {
4572      //verify(wal, never()).sync(anyLong());
4573      verify(wal, never()).sync();
4574    }
4575
4576    HBaseTestingUtility.closeRegionAndWAL(this.region);
4577    wals.close();
4578    this.region = null;
4579  }
4580
4581  @Test
4582  public void testRegionReplicaSecondary() throws IOException {
4583    // create a primary region, load some data and flush
4584    // create a secondary region, and do a get against that
4585    Path rootDir = new Path(dir + name.getMethodName());
4586    FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
4587
4588    byte[][] families = new byte[][] {
4589        Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
4590    };
4591    byte[] cq = Bytes.toBytes("cq");
4592    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
4593    for (byte[] family : families) {
4594      htd.addFamily(new HColumnDescriptor(family));
4595    }
4596
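    // Primary (replica id 0) and secondary (replica id 1) region infos share the same table, key range and region id.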
4597    long time = System.currentTimeMillis();
4598    HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(),
4599      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4600      false, time, 0);
4601    HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(),
4602      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4603      false, time, 1);
4604
4605    HRegion primaryRegion = null, secondaryRegion = null;
4606
4607    try {
4608      primaryRegion = HBaseTestingUtility.createRegionAndWAL(primaryHri,
4609          rootDir, TEST_UTIL.getConfiguration(), htd);
4610
4611      // load some data
4612      putData(primaryRegion, 0, 1000, cq, families);
4613
4614      // flush region
4615      primaryRegion.flush(true);
4616
4617      // open secondary region
4618      secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF);
4619
4620      verifyData(secondaryRegion, 0, 1000, cq, families);
4621    } finally {
4622      if (primaryRegion != null) {
4623        HBaseTestingUtility.closeRegionAndWAL(primaryRegion);
4624      }
4625      if (secondaryRegion != null) {
4626        HBaseTestingUtility.closeRegionAndWAL(secondaryRegion);
4627      }
4628    }
4629  }
4630
4631  @Test
4632  public void testRegionReplicaSecondaryIsReadOnly() throws IOException {
4633    // create a primary region, load some data and flush
4634    // create a secondary region, and do a put against that
4635    Path rootDir = new Path(dir + name.getMethodName());
4636    FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
4637
4638    byte[][] families = new byte[][] {
4639        Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
4640    };
4641    byte[] cq = Bytes.toBytes("cq");
4642    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
4643    for (byte[] family : families) {
4644      htd.addFamily(new HColumnDescriptor(family));
4645    }
4646
4647    long time = System.currentTimeMillis();
4648    HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(),
4649      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4650      false, time, 0);
4651    HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(),
4652      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4653      false, time, 1);
4654
4655    HRegion primaryRegion = null, secondaryRegion = null;
4656
4657    try {
4658      primaryRegion = HBaseTestingUtility.createRegionAndWAL(primaryHri,
4659          rootDir, TEST_UTIL.getConfiguration(), htd);
4660
4661      // load some data
4662      putData(primaryRegion, 0, 1000, cq, families);
4663
4664      // flush region
4665      primaryRegion.flush(true);
4666
4667      // open secondary region
4668      secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF);
4669
4670      try {
4671        putData(secondaryRegion, 0, 1000, cq, families);
4672        fail("Should have thrown exception");
4673      } catch (IOException ex) {
4674        // expected
4675      }
4676    } finally {
4677      if (primaryRegion != null) {
4678        HBaseTestingUtility.closeRegionAndWAL(primaryRegion);
4679      }
4680      if (secondaryRegion != null) {
4681        HBaseTestingUtility.closeRegionAndWAL(secondaryRegion);
4682      }
4683    }
4684  }
4685
4686  static WALFactory createWALFactory(Configuration conf, Path rootDir) throws IOException {
4687    Configuration confForWAL = new Configuration(conf);
4688    confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
4689    return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8));
4690  }
4691
4692  @Test
4693  public void testCompactionFromPrimary() throws IOException {
4694    Path rootDir = new Path(dir + name.getMethodName());
4695    FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
4696
4697    byte[][] families = new byte[][] {
4698        Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
4699    };
4700    byte[] cq = Bytes.toBytes("cq");
4701    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
4702    for (byte[] family : families) {
4703      htd.addFamily(new HColumnDescriptor(family));
4704    }
4705
4706    long time = System.currentTimeMillis();
4707    HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(),
4708      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4709      false, time, 0);
4710    HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(),
4711      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4712      false, time, 1);
4713
4714    HRegion primaryRegion = null, secondaryRegion = null;
4715
4716    try {
4717      primaryRegion = HBaseTestingUtility.createRegionAndWAL(primaryHri,
4718          rootDir, TEST_UTIL.getConfiguration(), htd);
4719
4720      // load some data
4721      putData(primaryRegion, 0, 1000, cq, families);
4722
4723      // flush region
4724      primaryRegion.flush(true);
4725
4726      // open secondary region
4727      secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF);
4728
4729      // move the file of the primary region to the archive, simulating a compaction
4730      Collection<HStoreFile> storeFiles = primaryRegion.getStore(families[0]).getStorefiles();
4731      primaryRegion.getRegionFileSystem().removeStoreFiles(Bytes.toString(families[0]), storeFiles);
4732      Collection<StoreFileInfo> storeFileInfos = primaryRegion.getRegionFileSystem()
4733          .getStoreFiles(families[0]);
4734      Assert.assertTrue(storeFileInfos == null || storeFileInfos.isEmpty());
4735
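      // The secondary region should still serve the data even though the primary's store files were archived.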
4736      verifyData(secondaryRegion, 0, 1000, cq, families);
4737    } finally {
4738      if (primaryRegion != null) {
4739        HBaseTestingUtility.closeRegionAndWAL(primaryRegion);
4740      }
4741      if (secondaryRegion != null) {
4742        HBaseTestingUtility.closeRegionAndWAL(secondaryRegion);
4743      }
4744    }
4745  }
4746
4747  private void putData(int startRow, int numRows, byte[] qf, byte[]... families) throws
4748      IOException {
4749    putData(this.region, startRow, numRows, qf, families);
4750  }
4751
4752  private void putData(HRegion region,
4753      int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
4754    putData(region, Durability.SKIP_WAL, startRow, numRows, qf, families);
4755  }
4756
4757  static void putData(HRegion region, Durability durability,
4758      int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
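    // Write one cell (null value) per family for every row in [startRow, startRow + numRows).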
4759    for (int i = startRow; i < startRow + numRows; i++) {
4760      Put put = new Put(Bytes.toBytes("" + i));
4761      put.setDurability(durability);
4762      for (byte[] family : families) {
4763        put.addColumn(family, qf, null);
4764      }
4765      region.put(put);
4766      LOG.info(put.toString());
4767    }
4768  }
4769
4770  static void verifyData(HRegion newReg, int startRow, int numRows, byte[] qf, byte[]... families)
4771      throws IOException {
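    // Read each row back and assert that exactly one cell per requested family is returned with matching coordinates.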
4772    for (int i = startRow; i < startRow + numRows; i++) {
4773      byte[] row = Bytes.toBytes("" + i);
4774      Get get = new Get(row);
4775      for (byte[] family : families) {
4776        get.addColumn(family, qf);
4777      }
4778      Result result = newReg.get(get);
4779      Cell[] raw = result.rawCells();
4780      assertEquals(families.length, result.size());
4781      for (int j = 0; j < families.length; j++) {
4782        assertTrue(CellUtil.matchingRows(raw[j], row));
4783        assertTrue(CellUtil.matchingFamily(raw[j], families[j]));
4784        assertTrue(CellUtil.matchingQualifier(raw[j], qf));
4785      }
4786    }
4787  }
4788
4789  static void assertGet(final HRegion r, final byte[] family, final byte[] k) throws IOException {
4790    // Now I have k, get values out and assert they are as expected.
4791    Get get = new Get(k).addFamily(family).setMaxVersions();
4792    Cell[] results = r.get(get).rawCells();
4793    for (int j = 0; j < results.length; j++) {
4794      byte[] tmp = CellUtil.cloneValue(results[j]);
4795      // Row should be equal to value every time.
4796      assertTrue(Bytes.equals(k, tmp));
4797    }
4798  }
4799
4800  /**
4801   * Assert that the first value in the passed region is <code>firstValue</code>.
4802   */
4811  protected void assertScan(final HRegion r, final byte[] fs, final byte[] firstValue)
4812      throws IOException {
4813    byte[][] families = { fs };
4814    Scan scan = new Scan();
4815    for (int i = 0; i < families.length; i++)
4816      scan.addFamily(families[i]);
4817    InternalScanner s = r.getScanner(scan);
4818    try {
4819      List<Cell> curVals = new ArrayList<>();
4820      boolean first = true;
4821      OUTER_LOOP: while (s.next(curVals)) {
4822        for (Cell kv : curVals) {
4823          byte[] val = CellUtil.cloneValue(kv);
4824          byte[] curval = val;
4825          if (first) {
4826            first = false;
4827            assertTrue(Bytes.compareTo(curval, firstValue) == 0);
4828          } else {
4829            // Not asserting anything. Might as well break.
4830            break OUTER_LOOP;
4831          }
4832        }
4833      }
4834    } finally {
4835      s.close();
4836    }
4837  }
4838
4839  /**
4840   * Test that we get the expected flush results back
4841   */
4842  @Test
4843  public void testFlushResult() throws IOException {
4844    byte[] family = Bytes.toBytes("family");
4845
4846    this.region = initHRegion(tableName, method, family);
4847
4848    // empty memstore, flush doesn't run
4849    HRegion.FlushResult fr = region.flush(true);
4850    assertFalse(fr.isFlushSucceeded());
4851    assertFalse(fr.isCompactionNeeded());
4852
4853    // Flush enough times to reach the compaction threshold; no compaction is needed yet
4854    for (int i = 0; i < 2; i++) {
4855      Put put = new Put(tableName.toBytes()).addColumn(family, family, tableName.toBytes());
4856      region.put(put);
4857      fr = region.flush(true);
4858      assertTrue(fr.isFlushSucceeded());
4859      assertFalse(fr.isCompactionNeeded());
4860    }
4861
4862    // Two more flushes past the threshold; now compactions are needed
4863    for (int i = 0; i < 2; i++) {
4864      Put put = new Put(tableName.toBytes()).addColumn(family, family, tableName.toBytes());
4865      region.put(put);
4866      fr = region.flush(true);
4867      assertTrue(fr.isFlushSucceeded());
4868      assertTrue(fr.isCompactionNeeded());
4869    }
4870  }
4871
4872  protected Configuration initSplit() {
4873    // Always compact if there is more than one store file.
4874    CONF.setInt("hbase.hstore.compactionThreshold", 2);
4875
4876    CONF.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 10 * 1000);
4877
4878    // Increase the amount of time between client retries
4879    CONF.setLong("hbase.client.pause", 15 * 1000);
4880
4881    // This size should make it so we always split using the addContent
4882    // below. After adding all data, the first region is 1.3M
4883    CONF.setLong(HConstants.HREGION_MAX_FILESIZE, 1024 * 128);
4884    return CONF;
4885  }
4886
4887  /**
4888   * @return A region on which you must call
4889   *         {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
4890   */
4891  protected HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf,
4892      byte[]... families) throws IOException {
4893    return initHRegion(tableName, callingMethod, conf, false, families);
4894  }
4895
4896  /**
4897   * @return A region on which you must call
4898   *         {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
4899   */
4900  protected HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf,
4901      boolean isReadOnly, byte[]... families) throws IOException {
4902    return initHRegion(tableName, null, null, callingMethod, conf, isReadOnly, families);
4903  }
4904
4905  protected HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
4906      String callingMethod, Configuration conf, boolean isReadOnly, byte[]... families)
4907      throws IOException {
4908    Path logDir = TEST_UTIL.getDataTestDirOnTestFS(callingMethod + ".log");
4909    HRegionInfo hri = new HRegionInfo(tableName, startKey, stopKey);
4910    final WAL wal = HBaseTestingUtility.createWal(conf, logDir, hri);
4911    return initHRegion(tableName, startKey, stopKey, isReadOnly,
4912        Durability.SYNC_WAL, wal, families);
4913  }
4914
4915  /**
4916   * @return A region on which you must call
4917   *         {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
4918   */
4919  public HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
4920      boolean isReadOnly, Durability durability, WAL wal, byte[]... families) throws IOException {
4921    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
4922    return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey,
4923        isReadOnly, durability, wal, families);
4924  }
4925
4926  /**
4927   * Assert that the passed in Cell has expected contents for the specified row,
4928   * column & timestamp.
4929   */
4930  private void checkOneCell(Cell kv, byte[] cf, int rowIdx, int colIdx, long ts) {
4931    String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts;
4932    assertEquals("Row mismatch while checking: " + ctx, "row:" + rowIdx,
4933        Bytes.toString(CellUtil.cloneRow(kv)));
4934    assertEquals("ColumnFamily mismatch while checking: " + ctx, Bytes.toString(cf),
4935        Bytes.toString(CellUtil.cloneFamily(kv)));
4936    assertEquals("Column qualifier mismatch while checking: " + ctx, "column:" + colIdx,
4937        Bytes.toString(CellUtil.cloneQualifier(kv)));
4938    assertEquals("Timestamp mismatch while checking: " + ctx, ts, kv.getTimestamp());
4939    assertEquals("Value mismatch while checking: " + ctx, "value-version-" + ts,
4940        Bytes.toString(CellUtil.cloneValue(kv)));
4941  }
4942
4943  @Test
4944  public void testReverseScanner_FromMemStore_SingleCF_Normal()
4945      throws IOException {
4946    byte[] rowC = Bytes.toBytes("rowC");
4947    byte[] rowA = Bytes.toBytes("rowA");
4948    byte[] rowB = Bytes.toBytes("rowB");
4949    byte[] cf = Bytes.toBytes("CF");
4950    byte[][] families = { cf };
4951    byte[] col = Bytes.toBytes("C");
4952    long ts = 1;
4953    this.region = initHRegion(tableName, method, families);
4954    KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null);
4955    KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put,
4956        null);
4957    KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null);
4958    KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null);
4959    Put put = null;
4960    put = new Put(rowC);
4961    put.add(kv1);
4962    put.add(kv11);
4963    region.put(put);
4964    put = new Put(rowA);
4965    put.add(kv2);
4966    region.put(put);
4967    put = new Put(rowB);
4968    put.add(kv3);
4969    region.put(put);
4970
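    // Reverse scan starting at rowC: expect rowC (2 versions), then rowB, then rowA.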
4971    Scan scan = new Scan(rowC);
4972    scan.setMaxVersions(5);
4973    scan.setReversed(true);
4974    InternalScanner scanner = region.getScanner(scan);
4975    List<Cell> currRow = new ArrayList<>();
4976    boolean hasNext = scanner.next(currRow);
4977    assertEquals(2, currRow.size());
4978    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
4979        .get(0).getRowLength(), rowC, 0, rowC.length));
4980    assertTrue(hasNext);
4981    currRow.clear();
4982    hasNext = scanner.next(currRow);
4983    assertEquals(1, currRow.size());
4984    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
4985        .get(0).getRowLength(), rowB, 0, rowB.length));
4986    assertTrue(hasNext);
4987    currRow.clear();
4988    hasNext = scanner.next(currRow);
4989    assertEquals(1, currRow.size());
4990    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
4991        .get(0).getRowLength(), rowA, 0, rowA.length));
4992    assertFalse(hasNext);
4993    scanner.close();
4994  }
4995
4996  @Test
4997  public void testReverseScanner_FromMemStore_SingleCF_LargerKey()
4998      throws IOException {
4999    byte[] rowC = Bytes.toBytes("rowC");
5000    byte[] rowA = Bytes.toBytes("rowA");
5001    byte[] rowB = Bytes.toBytes("rowB");
5002    byte[] rowD = Bytes.toBytes("rowD");
5003    byte[] cf = Bytes.toBytes("CF");
5004    byte[][] families = { cf };
5005    byte[] col = Bytes.toBytes("C");
5006    long ts = 1;
5007    this.region = initHRegion(tableName, method, families);
5008    KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null);
5009    KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put,
5010        null);
5011    KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null);
5012    KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null);
5013    Put put = null;
5014    put = new Put(rowC);
5015    put.add(kv1);
5016    put.add(kv11);
5017    region.put(put);
5018    put = new Put(rowA);
5019    put.add(kv2);
5020    region.put(put);
5021    put = new Put(rowB);
5022    put.add(kv3);
5023    region.put(put);
5024
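    // Start the reverse scan at rowD, which is larger than any stored row; the scan still begins at rowC.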
5025    Scan scan = new Scan(rowD);
5026    List<Cell> currRow = new ArrayList<>();
5027    scan.setReversed(true);
5028    scan.setMaxVersions(5);
5029    InternalScanner scanner = region.getScanner(scan);
5030    boolean hasNext = scanner.next(currRow);
5031    assertEquals(2, currRow.size());
5032    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5033        .get(0).getRowLength(), rowC, 0, rowC.length));
5034    assertTrue(hasNext);
5035    currRow.clear();
5036    hasNext = scanner.next(currRow);
5037    assertEquals(1, currRow.size());
5038    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5039        .get(0).getRowLength(), rowB, 0, rowB.length));
5040    assertTrue(hasNext);
5041    currRow.clear();
5042    hasNext = scanner.next(currRow);
5043    assertEquals(1, currRow.size());
5044    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5045        .get(0).getRowLength(), rowA, 0, rowA.length));
5046    assertFalse(hasNext);
5047    scanner.close();
5048  }
5049
5050  @Test
5051  public void testReverseScanner_FromMemStore_SingleCF_FullScan()
5052      throws IOException {
5053    byte[] rowC = Bytes.toBytes("rowC");
5054    byte[] rowA = Bytes.toBytes("rowA");
5055    byte[] rowB = Bytes.toBytes("rowB");
5056    byte[] cf = Bytes.toBytes("CF");
5057    byte[][] families = { cf };
5058    byte[] col = Bytes.toBytes("C");
5059    long ts = 1;
5060    this.region = initHRegion(tableName, method, families);
5061    KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null);
5062    KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put,
5063        null);
5064    KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null);
5065    KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null);
5066    Put put = null;
5067    put = new Put(rowC);
5068    put.add(kv1);
5069    put.add(kv11);
5070    region.put(put);
5071    put = new Put(rowA);
5072    put.add(kv2);
5073    region.put(put);
5074    put = new Put(rowB);
5075    put.add(kv3);
5076    region.put(put);
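    // Full reverse scan with default max versions: expect one cell each for rowC, rowB, rowA.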
5077    Scan scan = new Scan();
5078    List<Cell> currRow = new ArrayList<>();
5079    scan.setReversed(true);
5080    InternalScanner scanner = region.getScanner(scan);
5081    boolean hasNext = scanner.next(currRow);
5082    assertEquals(1, currRow.size());
5083    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5084        .get(0).getRowLength(), rowC, 0, rowC.length));
5085    assertTrue(hasNext);
5086    currRow.clear();
5087    hasNext = scanner.next(currRow);
5088    assertEquals(1, currRow.size());
5089    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5090        .get(0).getRowLength(), rowB, 0, rowB.length));
5091    assertTrue(hasNext);
5092    currRow.clear();
5093    hasNext = scanner.next(currRow);
5094    assertEquals(1, currRow.size());
5095    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5096        .get(0).getRowLength(), rowA, 0, rowA.length));
5097    assertFalse(hasNext);
5098    scanner.close();
5099  }
5100
5101  @Test
5102  public void testReverseScanner_moreRowsMayExistAfter() throws IOException {
5103    // Regression case for the "INCLUDE_AND_SEEK_NEXT_ROW & SEEK_NEXT_ROW" endless loop
5104    byte[] rowA = Bytes.toBytes("rowA");
5105    byte[] rowB = Bytes.toBytes("rowB");
5106    byte[] rowC = Bytes.toBytes("rowC");
5107    byte[] rowD = Bytes.toBytes("rowD");
5108    byte[] rowE = Bytes.toBytes("rowE");
5109    byte[] cf = Bytes.toBytes("CF");
5110    byte[][] families = { cf };
5111    byte[] col1 = Bytes.toBytes("col1");
5112    byte[] col2 = Bytes.toBytes("col2");
5113    long ts = 1;
5114    this.region = initHRegion(tableName, method, families);
5115    KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null);
5116    KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null);
5117    KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null);
5118    KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null);
5119    KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null);
5120    KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null);
5121    Put put = null;
5122    put = new Put(rowA);
5123    put.add(kv1);
5124    region.put(put);
5125    put = new Put(rowB);
5126    put.add(kv2);
5127    region.put(put);
5128    put = new Put(rowC);
5129    put.add(kv3);
5130    region.put(put);
5131    put = new Put(rowD);
5132    put.add(kv4_1);
5133    region.put(put);
5134    put = new Put(rowD);
5135    put.add(kv4_2);
5136    region.put(put);
5137    put = new Put(rowE);
5138    put.add(kv5);
5139    region.put(put);
5140    region.flush(true);
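    // Reverse scan over ["rowD", "rowA") on col1: expect rowD, rowC, rowB; rowA is excluded by the stop row.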
5141    Scan scan = new Scan(rowD, rowA);
5142    scan.addColumn(families[0], col1);
5143    scan.setReversed(true);
5144    List<Cell> currRow = new ArrayList<>();
5145    InternalScanner scanner = region.getScanner(scan);
5146    boolean hasNext = scanner.next(currRow);
5147    assertEquals(1, currRow.size());
5148    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5149        .get(0).getRowLength(), rowD, 0, rowD.length));
5150    assertTrue(hasNext);
5151    currRow.clear();
5152    hasNext = scanner.next(currRow);
5153    assertEquals(1, currRow.size());
5154    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5155        .get(0).getRowLength(), rowC, 0, rowC.length));
5156    assertTrue(hasNext);
5157    currRow.clear();
5158    hasNext = scanner.next(currRow);
5159    assertEquals(1, currRow.size());
5160    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5161        .get(0).getRowLength(), rowB, 0, rowB.length));
5162    assertFalse(hasNext);
5163    scanner.close();
5164
5165    scan = new Scan(rowD, rowA);
5166    scan.addColumn(families[0], col2);
5167    scan.setReversed(true);
5168    currRow.clear();
5169    scanner = region.getScanner(scan);
5170    hasNext = scanner.next(currRow);
5171    assertEquals(1, currRow.size());
5172    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5173        .get(0).getRowLength(), rowD, 0, rowD.length));
5174    scanner.close();
5175  }
5176
5177  @Test
5178  public void testReverseScanner_smaller_blocksize() throws IOException {
5179    // case to ensure no conflict with HFile index optimization
5180    byte[] rowA = Bytes.toBytes("rowA");
5181    byte[] rowB = Bytes.toBytes("rowB");
5182    byte[] rowC = Bytes.toBytes("rowC");
5183    byte[] rowD = Bytes.toBytes("rowD");
5184    byte[] rowE = Bytes.toBytes("rowE");
5185    byte[] cf = Bytes.toBytes("CF");
5186    byte[][] families = { cf };
5187    byte[] col1 = Bytes.toBytes("col1");
5188    byte[] col2 = Bytes.toBytes("col2");
5189    long ts = 1;
5190    HBaseConfiguration config = new HBaseConfiguration();
5191    config.setInt("test.block.size", 1);
5192    this.region = initHRegion(tableName, method, config, families);
5193    KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null);
5194    KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null);
5195    KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null);
5196    KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null);
5197    KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null);
5198    KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null);
5199    Put put = null;
5200    put = new Put(rowA);
5201    put.add(kv1);
5202    region.put(put);
5203    put = new Put(rowB);
5204    put.add(kv2);
5205    region.put(put);
5206    put = new Put(rowC);
5207    put.add(kv3);
5208    region.put(put);
5209    put = new Put(rowD);
5210    put.add(kv4_1);
5211    region.put(put);
5212    put = new Put(rowD);
5213    put.add(kv4_2);
5214    region.put(put);
5215    put = new Put(rowE);
5216    put.add(kv5);
5217    region.put(put);
5218    region.flush(true);
5219    Scan scan = new Scan(rowD, rowA);
5220    scan.addColumn(families[0], col1);
5221    scan.setReversed(true);
5222    List<Cell> currRow = new ArrayList<>();
5223    InternalScanner scanner = region.getScanner(scan);
5224    boolean hasNext = scanner.next(currRow);
5225    assertEquals(1, currRow.size());
5226    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5227        .get(0).getRowLength(), rowD, 0, rowD.length));
5228    assertTrue(hasNext);
5229    currRow.clear();
5230    hasNext = scanner.next(currRow);
5231    assertEquals(1, currRow.size());
5232    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5233        .get(0).getRowLength(), rowC, 0, rowC.length));
5234    assertTrue(hasNext);
5235    currRow.clear();
5236    hasNext = scanner.next(currRow);
5237    assertEquals(1, currRow.size());
5238    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5239        .get(0).getRowLength(), rowB, 0, rowB.length));
5240    assertFalse(hasNext);
5241    scanner.close();
5242
5243    scan = new Scan(rowD, rowA);
5244    scan.addColumn(families[0], col2);
5245    scan.setReversed(true);
5246    currRow.clear();
5247    scanner = region.getScanner(scan);
5248    hasNext = scanner.next(currRow);
5249    assertEquals(1, currRow.size());
5250    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5251        .get(0).getRowLength(), rowD, 0, rowD.length));
5252    scanner.close();
5253  }
5254
5255  @Test
5256  public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs1()
5257      throws IOException {
5258    byte[] row0 = Bytes.toBytes("row0"); // 1 kv
5259    byte[] row1 = Bytes.toBytes("row1"); // 2 kv
5260    byte[] row2 = Bytes.toBytes("row2"); // 4 kv
5261    byte[] row3 = Bytes.toBytes("row3"); // 2 kv
5262    byte[] row4 = Bytes.toBytes("row4"); // 5 kv
5263    byte[] row5 = Bytes.toBytes("row5"); // 2 kv
5264    byte[] cf1 = Bytes.toBytes("CF1");
5265    byte[] cf2 = Bytes.toBytes("CF2");
5266    byte[] cf3 = Bytes.toBytes("CF3");
5267    byte[][] families = { cf1, cf2, cf3 };
5268    byte[] col = Bytes.toBytes("C");
5269    long ts = 1;
5270    HBaseConfiguration conf = new HBaseConfiguration();
5271    // disable compactions in this test.
5272    conf.setInt("hbase.hstore.compactionThreshold", 10000);
5273    this.region = initHRegion(tableName, method, conf, families);
5274    // kv naming convention: kv<rowNumber>_<totalKvCountInThisRow>_<seqNo>
5275    KeyValue kv0_1_1 = new KeyValue(row0, cf1, col, ts, KeyValue.Type.Put,
5276        null);
5277    KeyValue kv1_2_1 = new KeyValue(row1, cf2, col, ts, KeyValue.Type.Put,
5278        null);
5279    KeyValue kv1_2_2 = new KeyValue(row1, cf1, col, ts + 1,
5280        KeyValue.Type.Put, null);
5281    KeyValue kv2_4_1 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put,
5282        null);
5283    KeyValue kv2_4_2 = new KeyValue(row2, cf1, col, ts, KeyValue.Type.Put,
5284        null);
5285    KeyValue kv2_4_3 = new KeyValue(row2, cf3, col, ts, KeyValue.Type.Put,
5286        null);
5287    KeyValue kv2_4_4 = new KeyValue(row2, cf1, col, ts + 4,
5288        KeyValue.Type.Put, null);
5289    KeyValue kv3_2_1 = new KeyValue(row3, cf2, col, ts, KeyValue.Type.Put,
5290        null);
5291    KeyValue kv3_2_2 = new KeyValue(row3, cf1, col, ts + 4,
5292        KeyValue.Type.Put, null);
5293    KeyValue kv4_5_1 = new KeyValue(row4, cf1, col, ts, KeyValue.Type.Put,
5294        null);
5295    KeyValue kv4_5_2 = new KeyValue(row4, cf3, col, ts, KeyValue.Type.Put,
5296        null);
5297    KeyValue kv4_5_3 = new KeyValue(row4, cf3, col, ts + 5,
5298        KeyValue.Type.Put, null);
5299    KeyValue kv4_5_4 = new KeyValue(row4, cf2, col, ts, KeyValue.Type.Put,
5300        null);
5301    KeyValue kv4_5_5 = new KeyValue(row4, cf1, col, ts + 3,
5302        KeyValue.Type.Put, null);
5303    KeyValue kv5_2_1 = new KeyValue(row5, cf2, col, ts, KeyValue.Type.Put,
5304        null);
5305    KeyValue kv5_2_2 = new KeyValue(row5, cf3, col, ts, KeyValue.Type.Put,
5306        null);
5307    // hfiles(cf1/cf2) : "row1" (1 kv) / "row2" (1 kv) / "row4" (2 kv)
5308    Put put = null;
5309    put = new Put(row1);
5310    put.add(kv1_2_1);
5311    region.put(put);
5312    put = new Put(row2);
5313    put.add(kv2_4_1);
5314    region.put(put);
5315    put = new Put(row4);
5316    put.add(kv4_5_4);
5317    put.add(kv4_5_5);
5318    region.put(put);
5319    region.flush(true);
5320    // hfiles(cf1/cf3) : "row1" (1 kv) / "row2" (1 kv) / "row4" (2 kv)
5321    put = new Put(row4);
5322    put.add(kv4_5_1);
5323    put.add(kv4_5_3);
5324    region.put(put);
5325    put = new Put(row1);
5326    put.add(kv1_2_2);
5327    region.put(put);
5328    put = new Put(row2);
5329    put.add(kv2_4_4);
5330    region.put(put);
5331    region.flush(true);
5332    // hfiles(cf1/cf3) : "row2" (2 kv) / "row3" (1 kv) / "row4" (1 kv)
5333    put = new Put(row4);
5334    put.add(kv4_5_2);
5335    region.put(put);
5336    put = new Put(row2);
5337    put.add(kv2_4_2);
5338    put.add(kv2_4_3);
5339    region.put(put);
5340    put = new Put(row3);
5341    put.add(kv3_2_2);
5342    region.put(put);
5343    region.flush(true);
5344    // memstore(cf1/cf2/cf3) : "row0" (1 kv) / "row3" (1 kv) / "row5" (the largest row, 2 kv)
5346    put = new Put(row0);
5347    put.add(kv0_1_1);
5348    region.put(put);
5349    put = new Put(row3);
5350    put.add(kv3_2_1);
5351    region.put(put);
5352    put = new Put(row5);
5353    put.add(kv5_2_1);
5354    put.add(kv5_2_2);
5355    region.put(put);
5356    // scan range = ["row4", min); the largest row "row5" is excluded
5357    Scan scan = new Scan(row4);
5358    scan.setMaxVersions(5);
5359    scan.setBatch(3);
5360    scan.setReversed(true);
5361    InternalScanner scanner = region.getScanner(scan);
5362    List<Cell> currRow = new ArrayList<>();
5363    boolean hasNext = false;
5364    // 1. scan out "row4" (5 kvs); "row5" cannot be scanned out since it is not
5365    // included in the scan range.
5366    // "row4" takes 2 next() calls since batch=3
5367    hasNext = scanner.next(currRow);
5368    assertEquals(3, currRow.size());
5369    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5370        .get(0).getRowLength(), row4, 0, row4.length));
5371    assertTrue(hasNext);
5372    currRow.clear();
5373    hasNext = scanner.next(currRow);
5374    assertEquals(2, currRow.size());
5375    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(),
5376        currRow.get(0).getRowLength(), row4, 0,
5377      row4.length));
5378    assertTrue(hasNext);
5379    // 2. scan out "row3" (2 kv)
5380    currRow.clear();
5381    hasNext = scanner.next(currRow);
5382    assertEquals(2, currRow.size());
5383    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5384        .get(0).getRowLength(), row3, 0, row3.length));
5385    assertTrue(hasNext);
5386    // 3. scan out "row2" (4 kvs)
5387    // "row2" takes 2 next() calls since batch=3
5388    currRow.clear();
5389    hasNext = scanner.next(currRow);
5390    assertEquals(3, currRow.size());
5391    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5392        .get(0).getRowLength(), row2, 0, row2.length));
5393    assertTrue(hasNext);
5394    currRow.clear();
5395    hasNext = scanner.next(currRow);
5396    assertEquals(1, currRow.size());
5397    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5398        .get(0).getRowLength(), row2, 0, row2.length));
5399    assertTrue(hasNext);
5400    // 4. scan out "row1" (2 kv)
5401    currRow.clear();
5402    hasNext = scanner.next(currRow);
5403    assertEquals(2, currRow.size());
5404    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5405        .get(0).getRowLength(), row1, 0, row1.length));
5406    assertTrue(hasNext);
5407    // 5. scan out "row0" (1 kv)
5408    currRow.clear();
5409    hasNext = scanner.next(currRow);
5410    assertEquals(1, currRow.size());
5411    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5412        .get(0).getRowLength(), row0, 0, row0.length));
5413    assertFalse(hasNext);
5414
5415    scanner.close();
5416  }
5417
5418  @Test
5419  public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs2()
5420      throws IOException {
5421    byte[] row1 = Bytes.toBytes("row1");
5422    byte[] row2 = Bytes.toBytes("row2");
5423    byte[] row3 = Bytes.toBytes("row3");
5424    byte[] row4 = Bytes.toBytes("row4");
5425    byte[] cf1 = Bytes.toBytes("CF1");
5426    byte[] cf2 = Bytes.toBytes("CF2");
5427    byte[] cf3 = Bytes.toBytes("CF3");
5428    byte[] cf4 = Bytes.toBytes("CF4");
5429    byte[][] families = { cf1, cf2, cf3, cf4 };
5430    byte[] col = Bytes.toBytes("C");
5431    long ts = 1;
5432    HBaseConfiguration conf = new HBaseConfiguration();
5433    // disable compactions in this test.
5434    conf.setInt("hbase.hstore.compactionThreshold", 10000);
5435    this.region = initHRegion(tableName, method, conf, families);
5436    KeyValue kv1 = new KeyValue(row1, cf1, col, ts, KeyValue.Type.Put, null);
5437    KeyValue kv2 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put, null);
5438    KeyValue kv3 = new KeyValue(row3, cf3, col, ts, KeyValue.Type.Put, null);
5439    KeyValue kv4 = new KeyValue(row4, cf4, col, ts, KeyValue.Type.Put, null);
5440    // storefile1
5441    Put put = new Put(row1);
5442    put.add(kv1);
5443    region.put(put);
5444    region.flush(true);
5445    // storefile2
5446    put = new Put(row2);
5447    put.add(kv2);
5448    region.put(put);
5449    region.flush(true);
5450    // storefile3
5451    put = new Put(row3);
5452    put.add(kv3);
5453    region.put(put);
5454    region.flush(true);
5455    // memstore
5456    put = new Put(row4);
5457    put.add(kv4);
5458    region.put(put);
5459    // scan range = ["row4", min)
5460    Scan scan = new Scan(row4);
5461    scan.setReversed(true);
5462    scan.setBatch(10);
5463    InternalScanner scanner = region.getScanner(scan);
5464    List<Cell> currRow = new ArrayList<>();
5465    boolean hasNext = scanner.next(currRow);
5466    assertEquals(1, currRow.size());
5467    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5468        .get(0).getRowLength(), row4, 0, row4.length));
5469    assertTrue(hasNext);
5470    currRow.clear();
5471    hasNext = scanner.next(currRow);
5472    assertEquals(1, currRow.size());
5473    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5474        .get(0).getRowLength(), row3, 0, row3.length));
5475    assertTrue(hasNext);
5476    currRow.clear();
5477    hasNext = scanner.next(currRow);
5478    assertEquals(1, currRow.size());
5479    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5480        .get(0).getRowLength(), row2, 0, row2.length));
5481    assertTrue(hasNext);
5482    currRow.clear();
5483    hasNext = scanner.next(currRow);
5484    assertEquals(1, currRow.size());
5485    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5486        .get(0).getRowLength(), row1, 0, row1.length));
5487    assertFalse(hasNext);
5488  }
5489
5490  /**
5491   * Test for HBASE-14497: Reverse Scan threw StackOverflow caused by readPt checking
5492   */
5493  @Test
5494  public void testReverseScanner_StackOverflow() throws IOException {
5495    byte[] cf1 = Bytes.toBytes("CF1");
5496    byte[][] families = {cf1};
5497    byte[] col = Bytes.toBytes("C");
5498    HBaseConfiguration conf = new HBaseConfiguration();
5499    this.region = initHRegion(tableName, method, conf, families);
5500    // setup with one storefile and one memstore, to create scanner and get an earlier readPt
5501    Put put = new Put(Bytes.toBytes("19998"));
5502    put.addColumn(cf1, col, Bytes.toBytes("val"));
5503    region.put(put);
5504    region.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
5505    Put put2 = new Put(Bytes.toBytes("19997"));
5506    put2.addColumn(cf1, col, Bytes.toBytes("val"));
5507    region.put(put2);
5508
5509    Scan scan = new Scan(Bytes.toBytes("19998"));
5510    scan.setReversed(true);
5511    InternalScanner scanner = region.getScanner(scan);
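    // The scanner's read point is taken here, before the bulk writes below, so only the two rows above are visible to it.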
5512
5513    // create one storefile containing many rows that will be skipped,
5514    // to exercise StoreFileScanner.seekToPreviousRow
5515    for (int i = 10000; i < 20000; i++) {
5516      Put p = new Put(Bytes.toBytes(""+i));
5517      p.addColumn(cf1, col, Bytes.toBytes("" + i));
5518      region.put(p);
5519    }
5520    region.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
5521
5522    // create one memstore containing many rows that will be skipped,
5523    // to exercise MemStoreScanner.seekToPreviousRow
5524    for (int i = 10000; i < 20000; i++) {
5525      Put p = new Put(Bytes.toBytes(""+i));
5526      p.addColumn(cf1, col, Bytes.toBytes("" + i));
5527      region.put(p);
5528    }
5529
5530    List<Cell> currRow = new ArrayList<>();
5531    boolean hasNext;
5532    do {
5533      hasNext = scanner.next(currRow);
5534    } while (hasNext);
5535    assertEquals(2, currRow.size());
5536    assertEquals("19998", Bytes.toString(currRow.get(0).getRowArray(),
5537      currRow.get(0).getRowOffset(), currRow.get(0).getRowLength()));
5538    assertEquals("19997", Bytes.toString(currRow.get(1).getRowArray(),
5539      currRow.get(1).getRowOffset(), currRow.get(1).getRowLength()));
5540  }
5541
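  /**
   * Test for HBASE-15871: after a flush resets the region scanner, the reverse scan should not
   * add the memstore scanner back, because the cells in the memstore are beyond the scanner's
   * readPt.
   */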
5542  @Test
5543  public void testReverseScanShouldNotScanMemstoreIfReadPtLesser() throws Exception {
5544    byte[] cf1 = Bytes.toBytes("CF1");
5545    byte[][] families = { cf1 };
5546    byte[] col = Bytes.toBytes("C");
5547    Configuration conf = HBaseConfiguration.create();
5548    this.region = initHRegion(tableName, method, conf, families);
5549    // put two rows into the memstore, then create the scanner to get an earlier readPt
5550    Put put = new Put(Bytes.toBytes("19996"));
5551    put.addColumn(cf1, col, Bytes.toBytes("val"));
5552    region.put(put);
5553    Put put2 = new Put(Bytes.toBytes("19995"));
5554    put2.addColumn(cf1, col, Bytes.toBytes("val"));
5555    region.put(put2);
5556    // create a reverse scan
5557    Scan scan = new Scan(Bytes.toBytes("19996"));
5558    scan.setReversed(true);
5559    RegionScannerImpl scanner = region.getScanner(scan);
5560
5561    // flush the cache. This will reset the store scanner
5562    region.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
5563
5564    // fill the memstore with many rows that will be skipped
5565    // to check MemStoreScanner.seekToPreviousRow
5566    for (int i = 10000; i < 20000; i++) {
5567      Put p = new Put(Bytes.toBytes("" + i));
5568      p.addColumn(cf1, col, Bytes.toBytes("" + i));
5569      region.put(p);
5570    }
5571    List<Cell> currRow = new ArrayList<>();
5572    boolean hasNext;
5573    boolean assertDone = false;
5574    do {
5575      hasNext = scanner.next(currRow);
5576      // With HBASE-15871, after the scanner is reset the memstore scanner should not be
5577      // added here
5578      if (!assertDone) {
5579        StoreScanner current =
5580            (StoreScanner) (scanner.storeHeap).getCurrentForTesting();
5581        List<KeyValueScanner> scanners = current.getAllScannersForTesting();
5582        assertEquals("There should be only one scanner, the store file scanner", 1,
5583          scanners.size());
5584        assertDone = true;
5585      }
5586    } while (hasNext);
5587    assertEquals(2, currRow.size());
5588    assertEquals("19996", Bytes.toString(currRow.get(0).getRowArray(),
5589      currRow.get(0).getRowOffset(), currRow.get(0).getRowLength()));
5590    assertEquals("19995", Bytes.toString(currRow.get(1).getRowArray(),
5591      currRow.get(1).getRowOffset(), currRow.get(1).getRowLength()));
5592  }
5593
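  /**
   * Verify that a reverse scan only returns the cells that were present when it was opened, even
   * if many cells with higher sequence ids are put after the scanner was created.
   */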
5594  @Test
5595  public void testReverseScanWhenPutCellsAfterOpenReverseScan() throws Exception {
5596    byte[] cf1 = Bytes.toBytes("CF1");
5597    byte[][] families = { cf1 };
5598    byte[] col = Bytes.toBytes("C");
5599
5600    Configuration conf = HBaseConfiguration.create();
5601    this.region = initHRegion(tableName, method, conf, families);
5602
5603    Put put = new Put(Bytes.toBytes("199996"));
5604    put.addColumn(cf1, col, Bytes.toBytes("val"));
5605    region.put(put);
5606    Put put2 = new Put(Bytes.toBytes("199995"));
5607    put2.addColumn(cf1, col, Bytes.toBytes("val"));
5608    region.put(put2);
5609
5610    // Create a reverse scan
5611    Scan scan = new Scan(Bytes.toBytes("199996"));
5612    scan.setReversed(true);
5613    RegionScannerImpl scanner = region.getScanner(scan);
5614
5615    // Put a lot of cells that have sequence ids greater than the readPt of the reverse scan
5616    for (int i = 100000; i < 200000; i++) {
5617      Put p = new Put(Bytes.toBytes("" + i));
5618      p.addColumn(cf1, col, Bytes.toBytes("" + i));
5619      region.put(p);
5620    }
5621    List<Cell> currRow = new ArrayList<>();
5622    boolean hasNext;
5623    do {
5624      hasNext = scanner.next(currRow);
5625    } while (hasNext);
5626
5627    assertEquals(2, currRow.size());
5628    assertEquals("199996", Bytes.toString(currRow.get(0).getRowArray(),
5629      currRow.get(0).getRowOffset(), currRow.get(0).getRowLength()));
5630    assertEquals("199995", Bytes.toString(currRow.get(1).getRowArray(),
5631      currRow.get(1).getRowOffset(), currRow.get(1).getRowLength()));
5632  }
5633
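  /**
   * Verify that the region write requests counter is incremented once for every put and delete.
   */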
5634  @Test
5635  public void testWriteRequestsCounter() throws IOException {
5636    byte[] fam = Bytes.toBytes("info");
5637    byte[][] families = { fam };
5638    this.region = initHRegion(tableName, method, CONF, families);
5639
5640    Assert.assertEquals(0L, region.getWriteRequestsCount());
5641
5642    Put put = new Put(row);
5643    put.addColumn(fam, fam, fam);
5644
5645    Assert.assertEquals(0L, region.getWriteRequestsCount());
5646    region.put(put);
5647    Assert.assertEquals(1L, region.getWriteRequestsCount());
5648    region.put(put);
5649    Assert.assertEquals(2L, region.getWriteRequestsCount());
5650    region.put(put);
5651    Assert.assertEquals(3L, region.getWriteRequestsCount());
5652
5653    region.delete(new Delete(row));
5654    Assert.assertEquals(4L, region.getWriteRequestsCount());
5655  }
5656
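  /**
   * Verify that opening a region appends a REGION_OPEN RegionEventDescriptor marker to the WAL,
   * carrying the table name, encoded region name, server name and per-store file counts.
   */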
5657  @Test
5658  public void testOpenRegionWrittenToWAL() throws Exception {
5659    final ServerName serverName = ServerName.valueOf(name.getMethodName(), 100, 42);
5660    final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
5661
5662    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
5663      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam1))
5664      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam2)).build();
5665    RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
5666
5667    // open the region without rss and without the mocked wal, and flush some files
5668    region =
5669         HBaseTestingUtility.createRegionAndWAL(hri, TEST_UTIL.getDataTestDir(), TEST_UTIL
5670             .getConfiguration(), htd);
5671    assertNotNull(region);
5672
5673    // create a file in fam1 for the region before opening in OpenRegionHandler
5674    region.put(new Put(Bytes.toBytes("a")).addColumn(fam1, fam1, fam1));
5675    region.flush(true);
5676    HBaseTestingUtility.closeRegionAndWAL(region);
5677
5678    ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class);
5679
5680    // capture append() calls
5681    WAL wal = mockWAL();
5682    when(rss.getWAL(any(RegionInfo.class))).thenReturn(wal);
5683
5684    region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
5685      TEST_UTIL.getConfiguration(), rss, null);
5686
5687    verify(wal, times(1)).appendMarker(any(RegionInfo.class), any(WALKeyImpl.class),
5688      editCaptor.capture());
5689
5690    WALEdit edit = editCaptor.getValue();
5691    assertNotNull(edit);
5692    assertNotNull(edit.getCells());
5693    assertEquals(1, edit.getCells().size());
5694    RegionEventDescriptor desc = WALEdit.getRegionEventDescriptor(edit.getCells().get(0));
5695    assertNotNull(desc);
5696
5697    LOG.info("RegionEventDescriptor from WAL: " + desc);
5698
5699    assertEquals(RegionEventDescriptor.EventType.REGION_OPEN, desc.getEventType());
5700    assertTrue(Bytes.equals(desc.getTableName().toByteArray(), htd.getTableName().toBytes()));
5701    assertTrue(Bytes.equals(desc.getEncodedRegionName().toByteArray(),
5702      hri.getEncodedNameAsBytes()));
5703    assertTrue(desc.getLogSequenceNumber() > 0);
5704    assertEquals(serverName, ProtobufUtil.toServerName(desc.getServer()));
5705    assertEquals(2, desc.getStoresCount());
5706
5707    StoreDescriptor store = desc.getStores(0);
5708    assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam1));
5709    assertEquals(Bytes.toString(fam1), store.getStoreHomeDir());
5710    assertEquals(1, store.getStoreFileCount()); // 1 store file
5711    assertFalse(store.getStoreFile(0).contains("/")); // ensure path is relative
5712
5713    store = desc.getStores(1);
5714    assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam2));
5715    assertEquals(Bytes.toString(fam2), store.getStoreHomeDir());
5716    assertEquals(0, store.getStoreFileCount()); // no store files
5717  }
5718
5719  // Helper for test testOpenRegionWrittenToWALForLogReplay
5720  static class HRegionWithSeqId extends HRegion {
5721    public HRegionWithSeqId(final Path tableDir, final WAL wal, final FileSystem fs,
5722        final Configuration confParam, final RegionInfo regionInfo,
5723        final TableDescriptor htd, final RegionServerServices rsServices) {
5724      super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
5725    }
5726    @Override
5727    protected long getNextSequenceId(WAL wal) throws IOException {
5728      return 42;
5729    }
5730  }
5731
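  /**
   * Verify that flushing cells written without tags produces store files whose HFile context
   * does not include tags.
   */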
5732  @Test
5733  public void testFlushedFileWithNoTags() throws Exception {
5734    final TableName tableName = TableName.valueOf(name.getMethodName());
5735    HTableDescriptor htd = new HTableDescriptor(tableName);
5736    htd.addFamily(new HColumnDescriptor(fam1));
5737    HRegionInfo info = new HRegionInfo(tableName, null, null, false);
5738    Path path = TEST_UTIL.getDataTestDir(getClass().getSimpleName());
5739    region = HBaseTestingUtility.createRegionAndWAL(info, path, TEST_UTIL.getConfiguration(), htd);
5740    Put put = new Put(Bytes.toBytes("a-b-0-0"));
5741    put.addColumn(fam1, qual1, Bytes.toBytes("c1-value"));
5742    region.put(put);
5743    region.flush(true);
5744    HStore store = region.getStore(fam1);
5745    Collection<HStoreFile> storefiles = store.getStorefiles();
5746    for (HStoreFile sf : storefiles) {
5747      assertFalse("Tags should not be present",
5748          sf.getReader().getHFileReader().getFileContext().isIncludesTags());
5749    }
5750  }
5751
5752  /**
5753   * Utility method to set up a WAL mock.
5754   * <p/>
5755   * Needs to begin an mvcc transaction and set the write entry on the WALKeyImpl on each
5756   * append, else the test hangs.
5757   * @return a mock WAL
5758   */
5758  private WAL mockWAL() throws IOException {
5759    WAL wal = mock(WAL.class);
5760    when(wal.appendData(any(RegionInfo.class), any(WALKeyImpl.class), any(WALEdit.class)))
5761      .thenAnswer(new Answer<Long>() {
5762        @Override
5763        public Long answer(InvocationOnMock invocation) throws Throwable {
5764          WALKeyImpl key = invocation.getArgument(1);
5765          MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin();
5766          key.setWriteEntry(we);
5767          return 1L;
5768        }
5769      });
5770    when(wal.appendMarker(any(RegionInfo.class), any(WALKeyImpl.class), any(WALEdit.class))).
5771        thenAnswer(new Answer<Long>() {
5772          @Override
5773          public Long answer(InvocationOnMock invocation) throws Throwable {
5774            WALKeyImpl key = invocation.getArgument(1);
5775            MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin();
5776            key.setWriteEntry(we);
5777            return 1L;
5778          }
5779        });
5780    return wal;
5781  }
5782
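  /**
   * Verify that closing a region appends a REGION_CLOSE RegionEventDescriptor marker to the WAL,
   * in addition to the marker written when the region was opened.
   */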
5783  @Test
5784  public void testCloseRegionWrittenToWAL() throws Exception {
5785    Path rootDir = new Path(dir + name.getMethodName());
5786    FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
5787
5788    final ServerName serverName = ServerName.valueOf("testCloseRegionWrittenToWAL", 100, 42);
5789    final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
5790
5791    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
5792      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam1))
5793      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam2)).build();
5794    RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
5795
5796    ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class);
5797
5798    // capture append() calls
5799    WAL wal = mockWAL();
5800    when(rss.getWAL(any(RegionInfo.class))).thenReturn(wal);
5801
5802
5803    // create and then open a region first so that it can be closed later
5804    region = HRegion.createHRegion(hri, rootDir, TEST_UTIL.getConfiguration(), htd, rss.getWAL(hri));
5805    region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
5806      TEST_UTIL.getConfiguration(), rss, null);
5807
5808    // close the region
5809    region.close(false);
5810
5811    // 2 times, one for region open, the other for region close
5812    verify(wal, times(2)).appendMarker(any(RegionInfo.class),
5813        any(WALKeyImpl.class), editCaptor.capture());
5814
5815    WALEdit edit = editCaptor.getAllValues().get(1);
5816    assertNotNull(edit);
5817    assertNotNull(edit.getCells());
5818    assertEquals(1, edit.getCells().size());
5819    RegionEventDescriptor desc = WALEdit.getRegionEventDescriptor(edit.getCells().get(0));
5820    assertNotNull(desc);
5821
5822    LOG.info("RegionEventDescriptor from WAL: " + desc);
5823
5824    assertEquals(RegionEventDescriptor.EventType.REGION_CLOSE, desc.getEventType());
5825    assertTrue(Bytes.equals(desc.getTableName().toByteArray(), htd.getTableName().toBytes()));
5826    assertTrue(Bytes.equals(desc.getEncodedRegionName().toByteArray(),
5827      hri.getEncodedNameAsBytes()));
5828    assertTrue(desc.getLogSequenceNumber() > 0);
5829    assertEquals(serverName, ProtobufUtil.toServerName(desc.getServer()));
5830    assertEquals(2, desc.getStoresCount());
5831
5832    StoreDescriptor store = desc.getStores(0);
5833    assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam1));
5834    assertEquals(Bytes.toString(fam1), store.getStoreHomeDir());
5835    assertEquals(0, store.getStoreFileCount()); // no store files
5836
5837    store = desc.getStores(1);
5838    assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam2));
5839    assertEquals(Bytes.toString(fam2), store.getStoreHomeDir());
5840    assertEquals(0, store.getStoreFileCount()); // no store files
5841  }
5842
5843  /**
5844   * Test that a RegionTooBusyException is thrown when the region is busy.
5845   */
5846  @Test
5847  public void testRegionTooBusy() throws IOException {
5848    byte[] family = Bytes.toBytes("family");
5849    long defaultBusyWaitDuration = CONF.getLong("hbase.busy.wait.duration",
5850      HRegion.DEFAULT_BUSY_WAIT_DURATION);
5851    CONF.setLong("hbase.busy.wait.duration", 1000);
5852    region = initHRegion(tableName, method, CONF, family);
5853    final AtomicBoolean stopped = new AtomicBoolean(true);
5854    Thread t = new Thread(new Runnable() {
5855      @Override
5856      public void run() {
5857        try {
5858          region.lock.writeLock().lock();
5859          stopped.set(false);
5860          while (!stopped.get()) {
5861            Thread.sleep(100);
5862          }
5863        } catch (InterruptedException ie) {
5864        } finally {
5865          region.lock.writeLock().unlock();
5866        }
5867      }
5868    });
5869    t.start();
5870    Get get = new Get(row);
5871    try {
5872      while (stopped.get()) {
5873        Thread.sleep(100);
5874      }
5875      region.get(get);
5876      fail("Should throw RegionTooBusyException");
5877    } catch (InterruptedException ie) {
5878      fail("test interrupted");
5879    } catch (RegionTooBusyException e) {
5880      // Good, expected
5881    } finally {
5882      stopped.set(true);
5883      try {
5884        t.join();
5885      } catch (Throwable e) {
5886      }
5887
5888      HBaseTestingUtility.closeRegionAndWAL(region);
5889      region = null;
5890      CONF.setLong("hbase.busy.wait.duration", defaultBusyWaitDuration);
5891    }
5892  }
5893
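  /**
   * Verify cell-level TTL tags together with the column family TTL: cells expire as the injected
   * clock advances, including the value written by an Increment that carries its own TTL.
   */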
5894  @Test
5895  public void testCellTTLs() throws IOException {
5896    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
5897    EnvironmentEdgeManager.injectEdge(edge);
5898
5899    final byte[] row = Bytes.toBytes("testRow");
5900    final byte[] q1 = Bytes.toBytes("q1");
5901    final byte[] q2 = Bytes.toBytes("q2");
5902    final byte[] q3 = Bytes.toBytes("q3");
5903    final byte[] q4 = Bytes.toBytes("q4");
5904
5905    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
5906    HColumnDescriptor hcd = new HColumnDescriptor(fam1);
5907    hcd.setTimeToLive(10); // 10 seconds
5908    htd.addFamily(hcd);
5909
5910    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
5911    conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
5912
5913    region = HBaseTestingUtility.createRegionAndWAL(new HRegionInfo(htd.getTableName(),
5914            HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY),
5915        TEST_UTIL.getDataTestDir(), conf, htd);
5916    assertNotNull(region);
5917    long now = EnvironmentEdgeManager.currentTime();
5918    // Add a cell that will expire in 5 seconds via cell TTL
5919    region.put(new Put(row).add(new KeyValue(row, fam1, q1, now,
5920      HConstants.EMPTY_BYTE_ARRAY, new ArrayBackedTag[] {
5921        // TTL tags specify the TTL in milliseconds
5922        new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) })));
5923    // Add a cell that will expire after 10 seconds via family setting
5924    region.put(new Put(row).addColumn(fam1, q2, now, HConstants.EMPTY_BYTE_ARRAY));
5925    // Add a cell that will expire in 15 seconds via cell TTL
5926    region.put(new Put(row).add(new KeyValue(row, fam1, q3, now + 10000 - 1,
5927      HConstants.EMPTY_BYTE_ARRAY, new ArrayBackedTag[] {
5928        // TTL tags specify the TTL in milliseconds
5929        new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) })));
5930    // Add a cell that will expire in 20 seconds via family setting
5931    region.put(new Put(row).addColumn(fam1, q4, now + 10000 - 1, HConstants.EMPTY_BYTE_ARRAY));
5932
5933    // Flush so we are sure store scanning gets this right
5934    region.flush(true);
5935
5936    // A query at time T+0 should return all cells
5937    Result r = region.get(new Get(row));
5938    assertNotNull(r.getValue(fam1, q1));
5939    assertNotNull(r.getValue(fam1, q2));
5940    assertNotNull(r.getValue(fam1, q3));
5941    assertNotNull(r.getValue(fam1, q4));
5942
5943    // Increment time to T+5 seconds
5944    edge.incrementTime(5000);
5945
5946    r = region.get(new Get(row));
5947    assertNull(r.getValue(fam1, q1));
5948    assertNotNull(r.getValue(fam1, q2));
5949    assertNotNull(r.getValue(fam1, q3));
5950    assertNotNull(r.getValue(fam1, q4));
5951
5952    // Increment time to T+10 seconds
5953    edge.incrementTime(5000);
5954
5955    r = region.get(new Get(row));
5956    assertNull(r.getValue(fam1, q1));
5957    assertNull(r.getValue(fam1, q2));
5958    assertNotNull(r.getValue(fam1, q3));
5959    assertNotNull(r.getValue(fam1, q4));
5960
5961    // Increment time to T+15 seconds
5962    edge.incrementTime(5000);
5963
5964    r = region.get(new Get(row));
5965    assertNull(r.getValue(fam1, q1));
5966    assertNull(r.getValue(fam1, q2));
5967    assertNull(r.getValue(fam1, q3));
5968    assertNotNull(r.getValue(fam1, q4));
5969
5970    // Increment time to T+20 seconds
5971    edge.incrementTime(10000);
5972
5973    r = region.get(new Get(row));
5974    assertNull(r.getValue(fam1, q1));
5975    assertNull(r.getValue(fam1, q2));
5976    assertNull(r.getValue(fam1, q3));
5977    assertNull(r.getValue(fam1, q4));
5978
5979    // Fun with disappearing increments
5980
5981    // Start at 1
5982    region.put(new Put(row).addColumn(fam1, q1, Bytes.toBytes(1L)));
5983    r = region.get(new Get(row));
5984    byte[] val = r.getValue(fam1, q1);
5985    assertNotNull(val);
5986    assertEquals(1L, Bytes.toLong(val));
5987
5988    // Increment with a TTL of 5 seconds
5989    Increment incr = new Increment(row).addColumn(fam1, q1, 1L);
5990    incr.setTTL(5000);
5991    region.increment(incr); // 2
5992
5993    // New value should be 2
5994    r = region.get(new Get(row));
5995    val = r.getValue(fam1, q1);
5996    assertNotNull(val);
5997    assertEquals(2L, Bytes.toLong(val));
5998
5999    // Increment time to T+25 seconds
6000    edge.incrementTime(5000);
6001
6002    // Value should be back to 1
6003    r = region.get(new Get(row));
6004    val = r.getValue(fam1, q1);
6005    assertNotNull(val);
6006    assertEquals(1L, Bytes.toLong(val));
6007
6008    // Increment time to T+30 seconds
6009    edge.incrementTime(5000);
6010
6011    // Original value written at T+20 should be gone now via family TTL
6012    r = region.get(new Get(row));
6013    assertNull(r.getValue(fam1, q1));
6014  }
6015
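  /**
   * Verify that Increment never writes a timestamp lower than the previous one, even when the
   * clock goes backwards.
   */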
6016  @Test
6017  public void testIncrementTimestampsAreMonotonic() throws IOException {
6018    region = initHRegion(tableName, method, CONF, fam1);
6019    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
6020    EnvironmentEdgeManager.injectEdge(edge);
6021
6022    edge.setValue(10);
6023    Increment inc = new Increment(row);
6024    inc.setDurability(Durability.SKIP_WAL);
6025    inc.addColumn(fam1, qual1, 1L);
6026    region.increment(inc);
6027
6028    Result result = region.get(new Get(row));
6029    Cell c = result.getColumnLatestCell(fam1, qual1);
6030    assertNotNull(c);
6031    assertEquals(10L, c.getTimestamp());
6032
6033    edge.setValue(1); // clock goes back
6034    region.increment(inc);
6035    result = region.get(new Get(row));
6036    c = result.getColumnLatestCell(fam1, qual1);
6037    assertEquals(11L, c.getTimestamp());
6038    assertEquals(2L, Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength()));
6039  }
6040
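  /**
   * Verify that Append never writes a timestamp lower than the previous one, even when the clock
   * goes backwards.
   */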
6041  @Test
6042  public void testAppendTimestampsAreMonotonic() throws IOException {
6043    region = initHRegion(tableName, method, CONF, fam1);
6044    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
6045    EnvironmentEdgeManager.injectEdge(edge);
6046
6047    edge.setValue(10);
6048    Append a = new Append(row);
6049    a.setDurability(Durability.SKIP_WAL);
6050    a.addColumn(fam1, qual1, qual1);
6051    region.append(a);
6052
6053    Result result = region.get(new Get(row));
6054    Cell c = result.getColumnLatestCell(fam1, qual1);
6055    assertNotNull(c);
6056    assertEquals(10L, c.getTimestamp());
6057
6058    edge.setValue(1); // clock goes back
6059    region.append(a);
6060    result = region.get(new Get(row));
6061    c = result.getColumnLatestCell(fam1, qual1);
6062    assertEquals(11L, c.getTimestamp());
6063
6064    byte[] expected = new byte[qual1.length * 2];
6065    System.arraycopy(qual1, 0, expected, 0, qual1.length);
6066    System.arraycopy(qual1, 0, expected, qual1.length, qual1.length);
6067
6068    assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
6069      expected, 0, expected.length));
6070  }
6071
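  /**
   * Verify that checkAndMutate never writes a timestamp lower than the previous one, even when
   * the clock goes backwards.
   */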
6072  @Test
6073  public void testCheckAndMutateTimestampsAreMonotonic() throws IOException {
6074    region = initHRegion(tableName, method, CONF, fam1);
6075    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
6076    EnvironmentEdgeManager.injectEdge(edge);
6077
6078    edge.setValue(10);
6079    Put p = new Put(row);
6080    p.setDurability(Durability.SKIP_WAL);
6081    p.addColumn(fam1, qual1, qual1);
6082    region.put(p);
6083
6084    Result result = region.get(new Get(row));
6085    Cell c = result.getColumnLatestCell(fam1, qual1);
6086    assertNotNull(c);
6087    assertEquals(10L, c.getTimestamp());
6088
6089    edge.setValue(1); // clock goes back
6090    p = new Put(row);
6091    p.setDurability(Durability.SKIP_WAL);
6092    p.addColumn(fam1, qual1, qual2);
6093    region.checkAndMutate(row, fam1, qual1, CompareOperator.EQUAL, new BinaryComparator(qual1), p);
6094    result = region.get(new Get(row));
6095    c = result.getColumnLatestCell(fam1, qual1);
6096    assertEquals(10L, c.getTimestamp());
6097
6098    assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
6099      qual2, 0, qual2.length));
6100  }
6101
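  /**
   * Verify that batchMutate flags a mutation outside the region boundary as SANITY_CHECK_FAILURE
   * while the in-range mutations succeed, and that a batch blocked on a held row lock eventually
   * succeeds once the lock is released.
   */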
6102  @Test
6103  public void testBatchMutateWithWrongRegionException() throws Exception {
6104    final byte[] a = Bytes.toBytes("a");
6105    final byte[] b = Bytes.toBytes("b");
6106    final byte[] c = Bytes.toBytes("c"); // exclusive
6107
6108    int prevLockTimeout = CONF.getInt("hbase.rowlock.wait.duration", 30000);
6109    CONF.setInt("hbase.rowlock.wait.duration", 1000);
6110    region = initHRegion(tableName, a, c, method, CONF, false, fam1);
6111
6112    Mutation[] mutations = new Mutation[] {
6113        new Put(a)
6114            .add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6115              .setRow(a)
6116              .setFamily(fam1)
6117              .setTimestamp(HConstants.LATEST_TIMESTAMP)
6118              .setType(Cell.Type.Put)
6119              .build()),
6120        // this is outside the region boundary
6121        new Put(c).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6122              .setRow(c)
6123              .setFamily(fam1)
6124              .setTimestamp(HConstants.LATEST_TIMESTAMP)
6125              .setType(Type.Put)
6126              .build()),
6127        new Put(b).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6128              .setRow(b)
6129              .setFamily(fam1)
6130              .setTimestamp(HConstants.LATEST_TIMESTAMP)
6131              .setType(Cell.Type.Put)
6132              .build())
6133    };
6134
6135    OperationStatus[] status = region.batchMutate(mutations);
6136    assertEquals(OperationStatusCode.SUCCESS, status[0].getOperationStatusCode());
6137    assertEquals(OperationStatusCode.SANITY_CHECK_FAILURE, status[1].getOperationStatusCode());
6138    assertEquals(OperationStatusCode.SUCCESS, status[2].getOperationStatusCode());
6139
6140
6141    // test with a row lock held for a long time
6142    final CountDownLatch obtainedRowLock = new CountDownLatch(1);
6143    ExecutorService exec = Executors.newFixedThreadPool(2);
6144    Future<Void> f1 = exec.submit(new Callable<Void>() {
6145      @Override
6146      public Void call() throws Exception {
6147        LOG.info("Acquiring row lock");
6148        RowLock rl = region.getRowLock(b);
6149        obtainedRowLock.countDown();
6150        LOG.info("Waiting for 5 seconds before releasing lock");
6151        Threads.sleep(5000);
6152        LOG.info("Releasing row lock");
6153        rl.release();
6154        return null;
6155      }
6156    });
6157    obtainedRowLock.await(30, TimeUnit.SECONDS);
6158
6159    Future<Void> f2 = exec.submit(new Callable<Void>() {
6160      @Override
6161      public Void call() throws Exception {
6162        Mutation[] mutations = new Mutation[] {
6163            new Put(a).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6164                .setRow(a)
6165                .setFamily(fam1)
6166                .setTimestamp(HConstants.LATEST_TIMESTAMP)
6167                .setType(Cell.Type.Put)
6168                .build()),
6169            new Put(b).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6170                .setRow(b)
6171                .setFamily(fam1)
6172                .setTimestamp(HConstants.LATEST_TIMESTAMP)
6173                .setType(Cell.Type.Put)
6174                .build()),
6175        };
6176
6177        // this will wait for the row lock, and it will eventually succeed
6178        OperationStatus[] status = region.batchMutate(mutations);
6179        assertEquals(OperationStatusCode.SUCCESS, status[0].getOperationStatusCode());
6180        assertEquals(OperationStatusCode.SUCCESS, status[1].getOperationStatusCode());
6181        return null;
6182      }
6183    });
6184
6185    f1.get();
6186    f2.get();
6187
6188    CONF.setInt("hbase.rowlock.wait.duration", prevLockTimeout);
6189  }
6190
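  /**
   * Verify that checkAndRowMutate never writes a timestamp lower than the previous one, even when
   * the clock goes backwards.
   */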
6191  @Test
6192  public void testCheckAndRowMutateTimestampsAreMonotonic() throws IOException {
6193    region = initHRegion(tableName, method, CONF, fam1);
6194    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
6195    EnvironmentEdgeManager.injectEdge(edge);
6196
6197    edge.setValue(10);
6198    Put p = new Put(row);
6199    p.setDurability(Durability.SKIP_WAL);
6200    p.addColumn(fam1, qual1, qual1);
6201    region.put(p);
6202
6203    Result result = region.get(new Get(row));
6204    Cell c = result.getColumnLatestCell(fam1, qual1);
6205    assertNotNull(c);
6206    assertEquals(10L, c.getTimestamp());
6207
6208    edge.setValue(1); // clock goes back
6209    p = new Put(row);
6210    p.setDurability(Durability.SKIP_WAL);
6211    p.addColumn(fam1, qual1, qual2);
6212    RowMutations rm = new RowMutations(row);
6213    rm.add(p);
6214    assertTrue(region.checkAndRowMutate(row, fam1, qual1, CompareOperator.EQUAL,
6215        new BinaryComparator(qual1), rm));
6216    result = region.get(new Get(row));
6217    c = result.getColumnLatestCell(fam1, qual1);
6218    assertEquals(10L, c.getTimestamp());
6219    LOG.info("c value " +
6220      Bytes.toStringBinary(c.getValueArray(), c.getValueOffset(), c.getValueLength()));
6221
6222    assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
6223      qual2, 0, qual2.length));
6224  }
6225
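  /**
   * Convenience overload that initializes the region with a freshly created configuration.
   */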
6226  HRegion initHRegion(TableName tableName, String callingMethod,
6227      byte[]... families) throws IOException {
6228    return initHRegion(tableName, callingMethod, HBaseConfiguration.create(),
6229        families);
6230  }
6231
6232  /**
6233   * HBASE-16429 Make sure we do not get stuck rolling the writer when the ring buffer is
6234   * filled with appends.
6235   * @throws IOException if an IO error occurred during the test
6236   */
6236  @Test
6237  public void testWritesWhileRollWriter() throws IOException {
6238    int testCount = 10;
6239    int numRows = 1024;
6240    int numFamilies = 2;
6241    int numQualifiers = 2;
6242    final byte[][] families = new byte[numFamilies][];
6243    for (int i = 0; i < numFamilies; i++) {
6244      families[i] = Bytes.toBytes("family" + i);
6245    }
6246    final byte[][] qualifiers = new byte[numQualifiers][];
6247    for (int i = 0; i < numQualifiers; i++) {
6248      qualifiers[i] = Bytes.toBytes("qual" + i);
6249    }
6250
6251    CONF.setInt("hbase.regionserver.wal.disruptor.event.count", 2);
6252    this.region = initHRegion(tableName, method, CONF, families);
6253    try {
6254      List<Thread> threads = new ArrayList<>();
6255      for (int i = 0; i < numRows; i++) {
6256        final int count = i;
6257        Thread t = new Thread(new Runnable() {
6258
6259          @Override
6260          public void run() {
6261            byte[] row = Bytes.toBytes("row" + count);
6262            Put put = new Put(row);
6263            put.setDurability(Durability.SYNC_WAL);
6264            byte[] value = Bytes.toBytes(String.valueOf(count));
6265            for (byte[] family : families) {
6266              for (byte[] qualifier : qualifiers) {
6267                put.addColumn(family, qualifier, count, value);
6268              }
6269            }
6270            try {
6271              region.put(put);
6272            } catch (IOException e) {
6273              throw new RuntimeException(e);
6274            }
6275          }
6276        });
6277        threads.add(t);
6278      }
6279      for (Thread t : threads) {
6280        t.start();
6281      }
6282
6283      for (int i = 0; i < testCount; i++) {
6284        region.getWAL().rollWriter();
6285        Thread.yield();
6286      }
6287    } finally {
6288      try {
6289        HBaseTestingUtility.closeRegionAndWAL(this.region);
6290        CONF.setInt("hbase.regionserver.wal.disruptor.event.count", 16 * 1024);
6291      } catch (DroppedSnapshotException dse) {
6292        // We could get this on the way out because we interrupt the background flusher and it
6293        // could fail anywhere, causing a DSE over in the background flusher... only it is not
6294        // properly dealt with, so there could still be memory hanging out when we get here --
6295        // memory we can't flush because the accounting is 'off' since the original DSE.
6296      }
6297      this.region = null;
6298    }
6299  }
6300
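  /**
   * Verify that mutateRow increments the write requests counter by the number of mutations in
   * the RowMutations.
   */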
6301  @Test
6302  public void testMutateRow_WriteRequestCount() throws Exception {
6303    byte[] row1 = Bytes.toBytes("row1");
6304    byte[] fam1 = Bytes.toBytes("fam1");
6305    byte[] qf1 = Bytes.toBytes("qualifier");
6306    byte[] val1 = Bytes.toBytes("value1");
6307
6308    RowMutations rm = new RowMutations(row1);
6309    Put put = new Put(row1);
6310    put.addColumn(fam1, qf1, val1);
6311    rm.add(put);
6312
6313    this.region = initHRegion(tableName, method, CONF, fam1);
6314    long wrcBeforeMutate = this.region.writeRequestsCount.longValue();
6315    this.region.mutateRow(rm);
6316    long wrcAfterMutate = this.region.writeRequestsCount.longValue();
6317    Assert.assertEquals(wrcBeforeMutate + rm.getMutations().size(), wrcAfterMutate);
6318  }
6319
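  /**
   * Verify that enabling bulk load replication wires the ReplicationObserver coprocessor into
   * the region when it is opened.
   */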
6320  @Test
6321  public void testBulkLoadReplicationEnabled() throws IOException {
6322    TEST_UTIL.getConfiguration().setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
6323    final ServerName serverName = ServerName.valueOf(name.getMethodName(), 100, 42);
6324    final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
6325
6326    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
6327    htd.addFamily(new HColumnDescriptor(fam1));
6328    HRegionInfo hri = new HRegionInfo(htd.getTableName(),
6329        HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
6330    region = HRegion.openHRegion(hri, htd, rss.getWAL(hri), TEST_UTIL.getConfiguration(),
6331        rss, null);
6332
6333    assertTrue(region.conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, false));
6334    String plugins = region.conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
6335    String replicationCoprocessorClass = ReplicationObserver.class.getCanonicalName();
6336    assertTrue(plugins.contains(replicationCoprocessorClass));
6337    assertTrue(region.getCoprocessorHost().
6338        getCoprocessors().contains(ReplicationObserver.class.getSimpleName()));
6339  }
6340
6341  /**
6342   * The same as the HRegion class; the only difference is that instantiateHStore
6343   * creates a different HStore - HStoreForTesting. [HBASE-8518]
6344   */
6345  public static class HRegionForTesting extends HRegion {
6346
6347    public HRegionForTesting(final Path tableDir, final WAL wal, final FileSystem fs,
6348                             final Configuration confParam, final RegionInfo regionInfo,
6349                             final TableDescriptor htd, final RegionServerServices rsServices) {
6350      this(new HRegionFileSystem(confParam, fs, tableDir, regionInfo),
6351          wal, confParam, htd, rsServices);
6352    }
6353
6354    public HRegionForTesting(HRegionFileSystem fs, WAL wal,
6355                             Configuration confParam, TableDescriptor htd,
6356                             RegionServerServices rsServices) {
6357      super(fs, wal, confParam, htd, rsServices);
6358    }
6359
6360    /**
6361     * Create HStore instance.
6362     * @return If Mob is enabled, return HMobStore, otherwise return HStoreForTesting.
6363     */
6364    @Override
6365    protected HStore instantiateHStore(final ColumnFamilyDescriptor family, boolean warmup)
6366        throws IOException {
6367      if (family.isMobEnabled()) {
6368        if (HFile.getFormatVersion(this.conf) < HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
6369          throw new IOException("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS +
6370              " is required for MOB feature. Consider setting " + HFile.FORMAT_VERSION_KEY +
6371              " accordingly.");
6372        }
6373        return new HMobStore(this, family, this.conf, warmup);
6374      }
6375      return new HStoreForTesting(this, family, this.conf, warmup);
6376    }
6377  }
6378
6379  /**
6380   * HStoreForTesting is the same as HStore; the difference is that in the doCompaction method
6381   * of HStoreForTesting there is a checkpoint, "hbase.hstore.compaction.complete", which
6382   * does not let the hstore compaction complete. Previously, this config was checked in the
6383   * HStore class inside the compact method, even though it is only useful for testing and
6384   * otherwise does not help. In HBASE-8518, we try to get rid of the
6385   * "hbase.hstore.compaction.complete" config everywhere except in testing code.
6386   */
6387  public static class HStoreForTesting extends HStore {
6388
6389    protected HStoreForTesting(final HRegion region,
6390        final ColumnFamilyDescriptor family,
6391        final Configuration confParam, boolean warmup) throws IOException {
6392      super(region, family, confParam, warmup);
6393    }
6394
6395    @Override
6396    protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
6397        Collection<HStoreFile> filesToCompact, User user, long compactionStartTime,
6398        List<Path> newFiles) throws IOException {
6399      // leave the compaction incomplete.
6400      if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
6401        LOG.warn("hbase.hstore.compaction.complete is set to false");
6402        List<HStoreFile> sfs = new ArrayList<>(newFiles.size());
6403        final boolean evictOnClose =
6404            cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
6405        for (Path newFile : newFiles) {
6406          // Create storefile around what we wrote with a reader on it.
6407          HStoreFile sf = createStoreFileAndReader(newFile);
6408          sf.closeStoreFile(evictOnClose);
6409          sfs.add(sf);
6410        }
6411        return sfs;
6412      }
6413      return super.doCompaction(cr, filesToCompact, user, compactionStartTime, newFiles);
6414    }
6415  }
6416}