001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HBaseTestingUtility.COLUMNS;
021import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
022import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
023import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3;
024import static org.junit.Assert.assertArrayEquals;
025import static org.junit.Assert.assertEquals;
026import static org.junit.Assert.assertFalse;
027import static org.junit.Assert.assertNotNull;
028import static org.junit.Assert.assertNull;
029import static org.junit.Assert.assertTrue;
030import static org.junit.Assert.fail;
031import static org.mockito.ArgumentMatchers.any;
032import static org.mockito.ArgumentMatchers.anyBoolean;
033import static org.mockito.ArgumentMatchers.anyLong;
034import static org.mockito.Mockito.doThrow;
035import static org.mockito.Mockito.mock;
036import static org.mockito.Mockito.never;
037import static org.mockito.Mockito.spy;
038import static org.mockito.Mockito.times;
039import static org.mockito.Mockito.verify;
040import static org.mockito.Mockito.when;
041
042import java.io.IOException;
043import java.io.InterruptedIOException;
044import java.math.BigDecimal;
045import java.nio.charset.StandardCharsets;
046import java.security.PrivilegedExceptionAction;
047import java.util.ArrayList;
048import java.util.Arrays;
049import java.util.Collection;
050import java.util.List;
051import java.util.Map;
052import java.util.NavigableMap;
053import java.util.Objects;
054import java.util.TreeMap;
055import java.util.concurrent.Callable;
056import java.util.concurrent.CountDownLatch;
057import java.util.concurrent.ExecutorService;
058import java.util.concurrent.Executors;
059import java.util.concurrent.Future;
060import java.util.concurrent.TimeUnit;
061import java.util.concurrent.atomic.AtomicBoolean;
062import java.util.concurrent.atomic.AtomicInteger;
063import java.util.concurrent.atomic.AtomicReference;
064import org.apache.commons.lang3.RandomStringUtils;
065import org.apache.hadoop.conf.Configuration;
066import org.apache.hadoop.fs.FSDataOutputStream;
067import org.apache.hadoop.fs.FileStatus;
068import org.apache.hadoop.fs.FileSystem;
069import org.apache.hadoop.fs.Path;
070import org.apache.hadoop.hbase.ArrayBackedTag;
071import org.apache.hadoop.hbase.Cell;
072import org.apache.hadoop.hbase.Cell.Type;
073import org.apache.hadoop.hbase.CellBuilderFactory;
074import org.apache.hadoop.hbase.CellBuilderType;
075import org.apache.hadoop.hbase.CellUtil;
076import org.apache.hadoop.hbase.CompareOperator;
077import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
078import org.apache.hadoop.hbase.DroppedSnapshotException;
079import org.apache.hadoop.hbase.HBaseClassTestRule;
080import org.apache.hadoop.hbase.HBaseConfiguration;
081import org.apache.hadoop.hbase.HBaseTestingUtility;
082import org.apache.hadoop.hbase.HColumnDescriptor;
083import org.apache.hadoop.hbase.HConstants;
084import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
085import org.apache.hadoop.hbase.HDFSBlocksDistribution;
086import org.apache.hadoop.hbase.HRegionInfo;
087import org.apache.hadoop.hbase.HTableDescriptor;
088import org.apache.hadoop.hbase.KeyValue;
089import org.apache.hadoop.hbase.KeyValueUtil;
090import org.apache.hadoop.hbase.MiniHBaseCluster;
091import org.apache.hadoop.hbase.MultithreadedTestUtil;
092import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
093import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
094import org.apache.hadoop.hbase.NotServingRegionException;
095import org.apache.hadoop.hbase.PrivateCellUtil;
096import org.apache.hadoop.hbase.RegionTooBusyException;
097import org.apache.hadoop.hbase.ServerName;
098import org.apache.hadoop.hbase.TableName;
099import org.apache.hadoop.hbase.TagType;
100import org.apache.hadoop.hbase.Waiter;
101import org.apache.hadoop.hbase.client.Append;
102import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
103import org.apache.hadoop.hbase.client.Delete;
104import org.apache.hadoop.hbase.client.Durability;
105import org.apache.hadoop.hbase.client.Get;
106import org.apache.hadoop.hbase.client.Increment;
107import org.apache.hadoop.hbase.client.Mutation;
108import org.apache.hadoop.hbase.client.Put;
109import org.apache.hadoop.hbase.client.RegionInfo;
110import org.apache.hadoop.hbase.client.RegionInfoBuilder;
111import org.apache.hadoop.hbase.client.Result;
112import org.apache.hadoop.hbase.client.RowMutations;
113import org.apache.hadoop.hbase.client.Scan;
114import org.apache.hadoop.hbase.client.Table;
115import org.apache.hadoop.hbase.client.TableDescriptor;
116import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
117import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
118import org.apache.hadoop.hbase.filter.BigDecimalComparator;
119import org.apache.hadoop.hbase.filter.BinaryComparator;
120import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
121import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
122import org.apache.hadoop.hbase.filter.Filter;
123import org.apache.hadoop.hbase.filter.FilterBase;
124import org.apache.hadoop.hbase.filter.FilterList;
125import org.apache.hadoop.hbase.filter.NullComparator;
126import org.apache.hadoop.hbase.filter.PrefixFilter;
127import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
128import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
129import org.apache.hadoop.hbase.filter.SubstringComparator;
130import org.apache.hadoop.hbase.filter.ValueFilter;
131import org.apache.hadoop.hbase.io.hfile.HFile;
132import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
133import org.apache.hadoop.hbase.monitoring.MonitoredTask;
134import org.apache.hadoop.hbase.monitoring.TaskMonitor;
135import org.apache.hadoop.hbase.regionserver.HRegion.MutationBatchOperation;
136import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
137import org.apache.hadoop.hbase.regionserver.Region.RowLock;
138import org.apache.hadoop.hbase.regionserver.TestHStore.FaultyFileSystem;
139import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
140import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
141import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource;
142import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
143import org.apache.hadoop.hbase.replication.regionserver.ReplicationObserver;
144import org.apache.hadoop.hbase.security.User;
145import org.apache.hadoop.hbase.test.MetricsAssertHelper;
146import org.apache.hadoop.hbase.testclassification.LargeTests;
147import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
148import org.apache.hadoop.hbase.util.Bytes;
149import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
150import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
151import org.apache.hadoop.hbase.util.FSUtils;
152import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
153import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
154import org.apache.hadoop.hbase.util.Threads;
155import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
156import org.apache.hadoop.hbase.wal.FaultyFSLog;
157import org.apache.hadoop.hbase.wal.NettyAsyncFSWALConfigHelper;
158import org.apache.hadoop.hbase.wal.WAL;
159import org.apache.hadoop.hbase.wal.WALEdit;
160import org.apache.hadoop.hbase.wal.WALFactory;
161import org.apache.hadoop.hbase.wal.WALKeyImpl;
162import org.apache.hadoop.hbase.wal.WALProvider;
163import org.apache.hadoop.hbase.wal.WALProvider.Writer;
164import org.apache.hadoop.hbase.wal.WALSplitter;
165import org.junit.After;
166import org.junit.Assert;
167import org.junit.Before;
168import org.junit.ClassRule;
169import org.junit.Rule;
170import org.junit.Test;
171import org.junit.experimental.categories.Category;
172import org.junit.rules.ExpectedException;
173import org.junit.rules.TestName;
174import org.mockito.ArgumentCaptor;
175import org.mockito.ArgumentMatcher;
176import org.mockito.Mockito;
177import org.mockito.invocation.InvocationOnMock;
178import org.mockito.stubbing.Answer;
179import org.slf4j.Logger;
180import org.slf4j.LoggerFactory;
181
182import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
183import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
184import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
185import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
186import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
187
188import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
195
196/**
197 * Basic stand-alone testing of HRegion.  No clusters!
198 *
199 * A lot of the meta information for an HRegion now lives inside other HRegions
200 * or in the HBaseMaster, so only basic testing is possible.
201 */
202@Category({VerySlowRegionServerTests.class, LargeTests.class})
203@SuppressWarnings("deprecation")
204public class TestHRegion {
205
206  @ClassRule
207  public static final HBaseClassTestRule CLASS_RULE =
208      HBaseClassTestRule.forClass(TestHRegion.class);
209
210  // Do not spin up clusters in here. If you need to spin up a cluster, do it
211  // over in TestHRegionOnCluster.
212  private static final Logger LOG = LoggerFactory.getLogger(TestHRegion.class);
213  @Rule
214  public TestName name = new TestName();
215  @Rule public final ExpectedException thrown = ExpectedException.none();
216
217  private static final String COLUMN_FAMILY = "MyCF";
218  private static final byte [] COLUMN_FAMILY_BYTES = Bytes.toBytes(COLUMN_FAMILY);
219  private static final EventLoopGroup GROUP = new NioEventLoopGroup();
220
221  HRegion region = null;
222  // Do not run unit tests in parallel (? Why not?  It don't work?  Why not?  St.Ack)
223  protected static HBaseTestingUtility TEST_UTIL;
224  public static Configuration CONF ;
225  private String dir;
226  private static FileSystem FILESYSTEM;
227  private final int MAX_VERSIONS = 2;
228
229  // Test names
230  protected TableName tableName;
231  protected String method;
232  protected final byte[] qual = Bytes.toBytes("qual");
233  protected final byte[] qual1 = Bytes.toBytes("qual1");
234  protected final byte[] qual2 = Bytes.toBytes("qual2");
235  protected final byte[] qual3 = Bytes.toBytes("qual3");
236  protected final byte[] value = Bytes.toBytes("value");
237  protected final byte[] value1 = Bytes.toBytes("value1");
238  protected final byte[] value2 = Bytes.toBytes("value2");
239  protected final byte[] row = Bytes.toBytes("rowA");
240  protected final byte[] row2 = Bytes.toBytes("rowB");
241
242  protected final MetricsAssertHelper metricsAssertHelper = CompatibilitySingletonFactory
243      .getInstance(MetricsAssertHelper.class);
244
245  @Before
246  public void setup() throws IOException {
247    TEST_UTIL = HBaseTestingUtility.createLocalHTU();
248    FILESYSTEM = TEST_UTIL.getTestFileSystem();
249    CONF = TEST_UTIL.getConfiguration();
250    NettyAsyncFSWALConfigHelper.setEventLoopConfig(CONF, GROUP, NioSocketChannel.class);
251    dir = TEST_UTIL.getDataTestDir("TestHRegion").toString();
252    method = name.getMethodName();
253    tableName = TableName.valueOf(method);
254    CONF.set(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, String.valueOf(0.09));
255  }
256
257  @After
258  public void tearDown() throws Exception {
259    EnvironmentEdgeManagerTestHelper.reset();
260    LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
261    TEST_UTIL.cleanupTestDir();
262  }
263
264  /**
265   * Test that I can use the max flushed sequence id after the close.
266   * @throws IOException
267   */
268  @Test
269  public void testSequenceId() throws IOException {
270    HRegion region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
271    assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId());
272    // Weird. This returns 0 if no store files or no edits. Afraid to change it.
273    assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES));
274    region.close();
275    assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId());
276    assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES));
277    // Open region again.
278    region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
279    byte [] value = Bytes.toBytes(method);
280    // Make a random put against our cf.
281    Put put = new Put(value);
282    put.addColumn(COLUMN_FAMILY_BYTES, null, value);
283    region.put(put);
284    // No flush yet so init numbers should still be in place.
285    assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId());
286    assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES));
287    region.flush(true);
288    long max = region.getMaxFlushedSeqId();
289    region.close();
290    assertEquals(max, region.getMaxFlushedSeqId());
291  }
292
293  /**
294   * Test for Bug 2 of HBASE-10466.
295   * "Bug 2: Conditions for the first flush of region close (so-called pre-flush) If memstoreSize
296   * is smaller than a certain value, or when region close starts a flush is ongoing, the first
297   * flush is skipped and only the second flush takes place. However, two flushes are required in
298   * case previous flush fails and leaves some data in snapshot. The bug could cause loss of data
299   * in current memstore. The fix is removing all conditions except abort check so we ensure 2
300   * flushes for region close."
301   * @throws IOException
302   */
303  @Test
304  public void testCloseCarryingSnapshot() throws IOException {
305    HRegion region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
306    HStore store = region.getStore(COLUMN_FAMILY_BYTES);
307    // Get some random bytes.
308    byte [] value = Bytes.toBytes(method);
309    // Make a random put against our cf.
310    Put put = new Put(value);
311    put.addColumn(COLUMN_FAMILY_BYTES, null, value);
312    // First put something in current memstore, which will be in snapshot after flusher.prepare()
313    region.put(put);
314    StoreFlushContext storeFlushCtx = store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY);
315    storeFlushCtx.prepare();
316    // Second put something in current memstore
317    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
318    region.put(put);
319    // Close with something in memstore and something in the snapshot.  Make sure all is cleared.
320    region.close();
321    assertEquals(0, region.getMemStoreDataSize());
322    HBaseTestingUtility.closeRegionAndWAL(region);
323  }
324
325  /*
326   * This test is for verifying memstore snapshot size is correctly updated in case of rollback
327   * See HBASE-10845
328   */
329  @Test
330  public void testMemstoreSnapshotSize() throws IOException {
331    class MyFaultyFSLog extends FaultyFSLog {
332      StoreFlushContext storeFlushCtx;
333      public MyFaultyFSLog(FileSystem fs, Path rootDir, String logName, Configuration conf)
334          throws IOException {
335        super(fs, rootDir, logName, conf);
336      }
337
338      void setStoreFlushCtx(StoreFlushContext storeFlushCtx) {
339        this.storeFlushCtx = storeFlushCtx;
340      }
341
342      @Override
343      public void sync(long txid) throws IOException {
344        storeFlushCtx.prepare();
345        super.sync(txid);
346      }
347    }
348
349    FileSystem fs = FileSystem.get(CONF);
350    Path rootDir = new Path(dir + "testMemstoreSnapshotSize");
351    MyFaultyFSLog faultyLog = new MyFaultyFSLog(fs, rootDir, "testMemstoreSnapshotSize", CONF);
352    HRegion region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, faultyLog,
353        COLUMN_FAMILY_BYTES);
354
355    HStore store = region.getStore(COLUMN_FAMILY_BYTES);
356    // Get some random bytes.
357    byte [] value = Bytes.toBytes(method);
358    faultyLog.setStoreFlushCtx(store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY));
359
360    Put put = new Put(value);
361    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
362    faultyLog.setFailureType(FaultyFSLog.FailureType.SYNC);
363
364    boolean threwIOE = false;
365    try {
366      region.put(put);
367    } catch (IOException ioe) {
368      threwIOE = true;
369    } finally {
370      assertTrue("The regionserver should have thrown an exception", threwIOE);
371    }
372    MemStoreSize mss = store.getFlushableSize();
373    assertTrue("flushable size should be zero, but it is " + mss,
374        mss.getDataSize() == 0);
375    HBaseTestingUtility.closeRegionAndWAL(region);
376  }
377
378  /**
379   * Create a WAL outside of the usual helper in
380   * {@link HBaseTestingUtility#createWal(Configuration, Path, RegionInfo)} because that method
381   * doesn't play nicely with FaultyFileSystem. Call this method before overriding
382   * {@code fs.file.impl}.
383   * @param callingMethod a unique component for the path, probably the name of the test method.
384   */
385  private static WAL createWALCompatibleWithFaultyFileSystem(String callingMethod,
386      Configuration conf, TableName tableName) throws IOException {
387    final Path logDir = TEST_UTIL.getDataTestDirOnTestFS(callingMethod + ".log");
388    final Configuration walConf = new Configuration(conf);
389    FSUtils.setRootDir(walConf, logDir);
390    return new WALFactory(walConf, callingMethod)
391        .getWAL(RegionInfoBuilder.newBuilder(tableName).build());
392  }
393
394  @Test
395  public void testMemstoreSizeAccountingWithFailedPostBatchMutate() throws IOException {
396    String testName = "testMemstoreSizeAccountingWithFailedPostBatchMutate";
397    FileSystem fs = FileSystem.get(CONF);
398    Path rootDir = new Path(dir + testName);
399    FSHLog hLog = new FSHLog(fs, rootDir, testName, CONF);
400    HRegion region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog,
401        COLUMN_FAMILY_BYTES);
402    HStore store = region.getStore(COLUMN_FAMILY_BYTES);
403    assertEquals(0, region.getMemStoreDataSize());
404
405    // Put one value
406    byte [] value = Bytes.toBytes(method);
407    Put put = new Put(value);
408    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
409    region.put(put);
410    long onePutSize = region.getMemStoreDataSize();
411    assertTrue(onePutSize > 0);
412
413    RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
414    doThrow(new IOException())
415       .when(mockedCPHost).postBatchMutate(Mockito.<MiniBatchOperationInProgress<Mutation>>any());
416    region.setCoprocessorHost(mockedCPHost);
417
418    put = new Put(value);
419    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("dfg"), value);
420    try {
421      region.put(put);
422      fail("Should have failed with IOException");
423    } catch (IOException expected) {
424    }
425    long expectedSize = onePutSize * 2;
426    assertEquals("memstoreSize should be incremented",
427        expectedSize, region.getMemStoreDataSize());
428    assertEquals("flushable size should be incremented",
429        expectedSize, store.getFlushableSize().getDataSize());
430
431    region.setCoprocessorHost(null);
432    HBaseTestingUtility.closeRegionAndWAL(region);
433  }
434
435  /**
436   * A test case of HBASE-21041
437   * @throws Exception Exception
438   */
439  @Test
440  public void testFlushAndMemstoreSizeCounting() throws Exception {
441    byte[] family = Bytes.toBytes("family");
442    this.region = initHRegion(tableName, method, CONF, family);
443    final WALFactory wals = new WALFactory(CONF, method);
444    try {
445      for (byte[] row : HBaseTestingUtility.ROWS) {
446        Put put = new Put(row);
447        put.addColumn(family, family, row);
448        region.put(put);
449      }
450      region.flush(true);
451      // After flush, data size should be zero
452      assertEquals(0, region.getMemStoreDataSize());
453      // After flush, a new active mutable segment is created, so the heap size
454      // should equal to MutableSegment.DEEP_OVERHEAD
455      assertEquals(MutableSegment.DEEP_OVERHEAD, region.getMemStoreHeapSize());
456      // After flush, offheap should be zero
457      assertEquals(0, region.getMemStoreOffHeapSize());
458    } finally {
459      HBaseTestingUtility.closeRegionAndWAL(this.region);
460      this.region = null;
461      wals.close();
462    }
463  }
464
465  /**
466   * Test we do not lose data if we fail a flush and then close.
467   * Part of HBase-10466.  Tests the following from the issue description:
468   * "Bug 1: Wrong calculation of HRegion.memstoreSize: When a flush fails, data to be flushed is
469   * kept in each MemStore's snapshot and wait for next flush attempt to continue on it. But when
470   * the next flush succeeds, the counter of total memstore size in HRegion is always deduced by
471   * the sum of current memstore sizes instead of snapshots left from previous failed flush. This
472   * calculation is problematic that almost every time there is failed flush, HRegion.memstoreSize
473   * gets reduced by a wrong value. If region flush could not proceed for a couple cycles, the size
474   * in current memstore could be much larger than the snapshot. It's likely to drift memstoreSize
475   * much smaller than expected. In extreme case, if the error accumulates to even bigger than
476   * HRegion's memstore size limit, any further flush is skipped because flush does not do anything
477   * if memstoreSize is not larger than 0."
478   * @throws Exception
479   */
480  @Test
481  public void testFlushSizeAccounting() throws Exception {
482    final Configuration conf = HBaseConfiguration.create(CONF);
483    final WAL wal = createWALCompatibleWithFaultyFileSystem(method, conf, tableName);
484    // Only retry once.
485    conf.setInt("hbase.hstore.flush.retries.number", 1);
486    final User user =
487      User.createUserForTesting(conf, method, new String[]{"foo"});
488    // Inject our faulty LocalFileSystem
489    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
490    user.runAs(new PrivilegedExceptionAction<Object>() {
491      @Override
492      public Object run() throws Exception {
493        // Make sure it worked (above is sensitive to caching details in hadoop core)
494        FileSystem fs = FileSystem.get(conf);
495        Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
496        FaultyFileSystem ffs = (FaultyFileSystem)fs;
497        HRegion region = null;
498        try {
499          // Initialize region
500          region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, wal,
501              COLUMN_FAMILY_BYTES);
502          long size = region.getMemStoreDataSize();
503          Assert.assertEquals(0, size);
504          // Put one item into memstore.  Measure the size of one item in memstore.
505          Put p1 = new Put(row);
506          p1.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual1, 1, (byte[]) null));
507          region.put(p1);
508          final long sizeOfOnePut = region.getMemStoreDataSize();
509          // Fail a flush which means the current memstore will hang out as memstore 'snapshot'.
510          try {
511            LOG.info("Flushing");
512            region.flush(true);
513            Assert.fail("Didn't bubble up IOE!");
514          } catch (DroppedSnapshotException dse) {
515            // What we are expecting
516            region.closing.set(false); // this is needed for the rest of the test to work
517          }
518          // Make it so all writes succeed from here on out
519          ffs.fault.set(false);
520          // Check sizes.  Should still be the one entry.
521          Assert.assertEquals(sizeOfOnePut, region.getMemStoreDataSize());
522          // Now add two entries so that on this next flush that fails, we can see if we
523          // subtract the right amount, the snapshot size only.
524          Put p2 = new Put(row);
525          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual2, 2, (byte[])null));
526          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual3, 3, (byte[])null));
527          region.put(p2);
528          long expectedSize = sizeOfOnePut * 3;
529          Assert.assertEquals(expectedSize, region.getMemStoreDataSize());
530          // Do a successful flush.  It will clear the snapshot only.  Thats how flushes work.
531          // If already a snapshot, we clear it else we move the memstore to be snapshot and flush
532          // it
533          region.flush(true);
534          // Make sure our memory accounting is right.
535          Assert.assertEquals(sizeOfOnePut * 2, region.getMemStoreDataSize());
536        } finally {
537          HBaseTestingUtility.closeRegionAndWAL(region);
538        }
539        return null;
540      }
541    });
542    FileSystem.closeAllForUGI(user.getUGI());
543  }
544
545  @Test
546  public void testCloseWithFailingFlush() throws Exception {
547    final Configuration conf = HBaseConfiguration.create(CONF);
548    final WAL wal = createWALCompatibleWithFaultyFileSystem(method, conf, tableName);
549    // Only retry once.
550    conf.setInt("hbase.hstore.flush.retries.number", 1);
551    final User user =
552      User.createUserForTesting(conf, this.method, new String[]{"foo"});
553    // Inject our faulty LocalFileSystem
554    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
555    user.runAs(new PrivilegedExceptionAction<Object>() {
556      @Override
557      public Object run() throws Exception {
558        // Make sure it worked (above is sensitive to caching details in hadoop core)
559        FileSystem fs = FileSystem.get(conf);
560        Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
561        FaultyFileSystem ffs = (FaultyFileSystem)fs;
562        HRegion region = null;
563        try {
564          // Initialize region
565          region = initHRegion(tableName, null, null, false,
566              Durability.SYNC_WAL, wal, COLUMN_FAMILY_BYTES);
567          long size = region.getMemStoreDataSize();
568          Assert.assertEquals(0, size);
569          // Put one item into memstore.  Measure the size of one item in memstore.
570          Put p1 = new Put(row);
571          p1.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual1, 1, (byte[])null));
572          region.put(p1);
573          // Manufacture an outstanding snapshot -- fake a failed flush by doing prepare step only.
574          HStore store = region.getStore(COLUMN_FAMILY_BYTES);
575          StoreFlushContext storeFlushCtx =
576              store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY);
577          storeFlushCtx.prepare();
578          // Now add two entries to the foreground memstore.
579          Put p2 = new Put(row);
580          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual2, 2, (byte[])null));
581          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual3, 3, (byte[])null));
582          region.put(p2);
583          // Now try close on top of a failing flush.
584          region.close();
585          fail();
586        } catch (DroppedSnapshotException dse) {
587          // Expected
588          LOG.info("Expected DroppedSnapshotException");
589        } finally {
590          // Make it so all writes succeed from here on out so can close clean
591          ffs.fault.set(false);
592          HBaseTestingUtility.closeRegionAndWAL(region);
593        }
594        return null;
595      }
596    });
597    FileSystem.closeAllForUGI(user.getUGI());
598  }
599
600  @Test
601  public void testCompactionAffectedByScanners() throws Exception {
602    byte[] family = Bytes.toBytes("family");
603    this.region = initHRegion(tableName, method, CONF, family);
604
605    Put put = new Put(Bytes.toBytes("r1"));
606    put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
607    region.put(put);
608    region.flush(true);
609
610    Scan scan = new Scan();
611    scan.setMaxVersions(3);
612    // open the first scanner
613    RegionScanner scanner1 = region.getScanner(scan);
614
615    Delete delete = new Delete(Bytes.toBytes("r1"));
616    region.delete(delete);
617    region.flush(true);
618
619    // open the second scanner
620    RegionScanner scanner2 = region.getScanner(scan);
621
622    List<Cell> results = new ArrayList<>();
623
624    System.out.println("Smallest read point:" + region.getSmallestReadPoint());
625
626    // make a major compaction
627    region.compact(true);
628
629    // open the third scanner
630    RegionScanner scanner3 = region.getScanner(scan);
631
632    // get data from scanner 1, 2, 3 after major compaction
633    scanner1.next(results);
634    System.out.println(results);
635    assertEquals(1, results.size());
636
637    results.clear();
638    scanner2.next(results);
639    System.out.println(results);
640    assertEquals(0, results.size());
641
642    results.clear();
643    scanner3.next(results);
644    System.out.println(results);
645    assertEquals(0, results.size());
646  }
647
648  @Test
649  public void testToShowNPEOnRegionScannerReseek() throws Exception {
650    byte[] family = Bytes.toBytes("family");
651    this.region = initHRegion(tableName, method, CONF, family);
652
653    Put put = new Put(Bytes.toBytes("r1"));
654    put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
655    region.put(put);
656    put = new Put(Bytes.toBytes("r2"));
657    put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
658    region.put(put);
659    region.flush(true);
660
661    Scan scan = new Scan();
662    scan.setMaxVersions(3);
663    // open the first scanner
664    RegionScanner scanner1 = region.getScanner(scan);
665
666    System.out.println("Smallest read point:" + region.getSmallestReadPoint());
667
668    region.compact(true);
669
670    scanner1.reseek(Bytes.toBytes("r2"));
671    List<Cell> results = new ArrayList<>();
672    scanner1.next(results);
673    Cell keyValue = results.get(0);
674    Assert.assertTrue(Bytes.compareTo(CellUtil.cloneRow(keyValue), Bytes.toBytes("r2")) == 0);
675    scanner1.close();
676  }
677
678  @Test
679  public void testSkipRecoveredEditsReplay() throws Exception {
680    byte[] family = Bytes.toBytes("family");
681    this.region = initHRegion(tableName, method, CONF, family);
682    final WALFactory wals = new WALFactory(CONF, method);
683    try {
684      Path regiondir = region.getRegionFileSystem().getRegionDir();
685      FileSystem fs = region.getRegionFileSystem().getFileSystem();
686      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
687
688      Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
689
690      long maxSeqId = 1050;
691      long minSeqId = 1000;
692
693      for (long i = minSeqId; i <= maxSeqId; i += 10) {
694        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
695        fs.create(recoveredEdits);
696        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
697
698        long time = System.nanoTime();
699        WALEdit edit = new WALEdit();
700        edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
701            .toBytes(i)));
702        writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time,
703            HConstants.DEFAULT_CLUSTER_ID), edit));
704
705        writer.close();
706      }
707      MonitoredTask status = TaskMonitor.get().createStatus(method);
708      Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
709      for (HStore store : region.getStores()) {
710        maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId - 1);
711      }
712      long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
713      assertEquals(maxSeqId, seqId);
714      region.getMVCC().advanceTo(seqId);
715      Get get = new Get(row);
716      Result result = region.get(get);
717      for (long i = minSeqId; i <= maxSeqId; i += 10) {
718        List<Cell> kvs = result.getColumnCells(family, Bytes.toBytes(i));
719        assertEquals(1, kvs.size());
720        assertArrayEquals(Bytes.toBytes(i), CellUtil.cloneValue(kvs.get(0)));
721      }
722    } finally {
723      HBaseTestingUtility.closeRegionAndWAL(this.region);
724      this.region = null;
725      wals.close();
726    }
727  }
728
729  @Test
730  public void testSkipRecoveredEditsReplaySomeIgnored() throws Exception {
731    byte[] family = Bytes.toBytes("family");
732    this.region = initHRegion(tableName, method, CONF, family);
733    final WALFactory wals = new WALFactory(CONF, method);
734    try {
735      Path regiondir = region.getRegionFileSystem().getRegionDir();
736      FileSystem fs = region.getRegionFileSystem().getFileSystem();
737      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
738
739      Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
740
741      long maxSeqId = 1050;
742      long minSeqId = 1000;
743
744      for (long i = minSeqId; i <= maxSeqId; i += 10) {
745        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
746        fs.create(recoveredEdits);
747        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
748
749        long time = System.nanoTime();
750        WALEdit edit = new WALEdit();
751        edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
752            .toBytes(i)));
753        writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time,
754            HConstants.DEFAULT_CLUSTER_ID), edit));
755
756        writer.close();
757      }
758      long recoverSeqId = 1030;
759      MonitoredTask status = TaskMonitor.get().createStatus(method);
760      Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
761      for (HStore store : region.getStores()) {
762        maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), recoverSeqId - 1);
763      }
764      long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
765      assertEquals(maxSeqId, seqId);
766      region.getMVCC().advanceTo(seqId);
767      Get get = new Get(row);
768      Result result = region.get(get);
769      for (long i = minSeqId; i <= maxSeqId; i += 10) {
770        List<Cell> kvs = result.getColumnCells(family, Bytes.toBytes(i));
771        if (i < recoverSeqId) {
772          assertEquals(0, kvs.size());
773        } else {
774          assertEquals(1, kvs.size());
775          assertArrayEquals(Bytes.toBytes(i), CellUtil.cloneValue(kvs.get(0)));
776        }
777      }
778    } finally {
779      HBaseTestingUtility.closeRegionAndWAL(this.region);
780      this.region = null;
781      wals.close();
782    }
783  }
784
785  @Test
786  public void testSkipRecoveredEditsReplayAllIgnored() throws Exception {
787    byte[] family = Bytes.toBytes("family");
788    this.region = initHRegion(tableName, method, CONF, family);
789    try {
790      Path regiondir = region.getRegionFileSystem().getRegionDir();
791      FileSystem fs = region.getRegionFileSystem().getFileSystem();
792
793      Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
794      for (int i = 1000; i < 1050; i += 10) {
795        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
796        FSDataOutputStream dos = fs.create(recoveredEdits);
797        dos.writeInt(i);
798        dos.close();
799      }
800      long minSeqId = 2000;
801      Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", minSeqId - 1));
802      FSDataOutputStream dos = fs.create(recoveredEdits);
803      dos.close();
804
805      Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
806      for (HStore store : region.getStores()) {
807        maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId);
808      }
809      long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, null);
810      assertEquals(minSeqId, seqId);
811    } finally {
812      HBaseTestingUtility.closeRegionAndWAL(this.region);
813      this.region = null;
814    }
815  }
816
817  @Test
818  public void testSkipRecoveredEditsReplayTheLastFileIgnored() throws Exception {
819    byte[] family = Bytes.toBytes("family");
820    this.region = initHRegion(tableName, method, CONF, family);
821    final WALFactory wals = new WALFactory(CONF, method);
822    try {
823      Path regiondir = region.getRegionFileSystem().getRegionDir();
824      FileSystem fs = region.getRegionFileSystem().getFileSystem();
825      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
826      byte[][] columns = region.getTableDescriptor().getColumnFamilyNames().toArray(new byte[0][]);
827
828      assertEquals(0, region.getStoreFileList(columns).size());
829
830      Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
831
832      long maxSeqId = 1050;
833      long minSeqId = 1000;
834
835      for (long i = minSeqId; i <= maxSeqId; i += 10) {
836        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
837        fs.create(recoveredEdits);
838        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
839
840        long time = System.nanoTime();
841        WALEdit edit = null;
842        if (i == maxSeqId) {
843          edit = WALEdit.createCompaction(region.getRegionInfo(),
844          CompactionDescriptor.newBuilder()
845          .setTableName(ByteString.copyFrom(tableName.getName()))
846          .setFamilyName(ByteString.copyFrom(regionName))
847          .setEncodedRegionName(ByteString.copyFrom(regionName))
848          .setStoreHomeDirBytes(ByteString.copyFrom(Bytes.toBytes(regiondir.toString())))
849          .setRegionName(ByteString.copyFrom(region.getRegionInfo().getRegionName()))
850          .build());
851        } else {
852          edit = new WALEdit();
853          edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
854            .toBytes(i)));
855        }
856        writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time,
857            HConstants.DEFAULT_CLUSTER_ID), edit));
858        writer.close();
859      }
860
861      long recoverSeqId = 1030;
862      Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
863      MonitoredTask status = TaskMonitor.get().createStatus(method);
864      for (HStore store : region.getStores()) {
865        maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), recoverSeqId - 1);
866      }
867      long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
868      assertEquals(maxSeqId, seqId);
869
870      // assert that the files are flushed
871      assertEquals(1, region.getStoreFileList(columns).size());
872
873    } finally {
874      HBaseTestingUtility.closeRegionAndWAL(this.region);
875      this.region = null;
876      wals.close();
877    }
878  }
879
880  @Test
881  public void testRecoveredEditsReplayCompaction() throws Exception {
882    testRecoveredEditsReplayCompaction(false);
883    testRecoveredEditsReplayCompaction(true);
884  }
885
886  public void testRecoveredEditsReplayCompaction(boolean mismatchedRegionName) throws Exception {
887    CONF.setClass(HConstants.REGION_IMPL, HRegionForTesting.class, Region.class);
888    byte[] family = Bytes.toBytes("family");
889    this.region = initHRegion(tableName, method, CONF, family);
890    final WALFactory wals = new WALFactory(CONF, method);
891    try {
892      Path regiondir = region.getRegionFileSystem().getRegionDir();
893      FileSystem fs = region.getRegionFileSystem().getFileSystem();
894      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
895
896      long maxSeqId = 3;
897      long minSeqId = 0;
898
899      for (long i = minSeqId; i < maxSeqId; i++) {
900        Put put = new Put(Bytes.toBytes(i));
901        put.addColumn(family, Bytes.toBytes(i), Bytes.toBytes(i));
902        region.put(put);
903        region.flush(true);
904      }
905
906      // this will create a region with 3 files
907      assertEquals(3, region.getStore(family).getStorefilesCount());
908      List<Path> storeFiles = new ArrayList<>(3);
909      for (HStoreFile sf : region.getStore(family).getStorefiles()) {
910        storeFiles.add(sf.getPath());
911      }
912
913      // disable compaction completion
914      CONF.setBoolean("hbase.hstore.compaction.complete", false);
915      region.compactStores();
916
917      // ensure that nothing changed
918      assertEquals(3, region.getStore(family).getStorefilesCount());
919
920      // now find the compacted file, and manually add it to the recovered edits
921      Path tmpDir = new Path(region.getRegionFileSystem().getTempDir(), Bytes.toString(family));
922      FileStatus[] files = FSUtils.listStatus(fs, tmpDir);
923      String errorMsg = "Expected to find 1 file in the region temp directory "
924          + "from the compaction, could not find any";
925      assertNotNull(errorMsg, files);
926      assertEquals(errorMsg, 1, files.length);
927      // move the file inside region dir
928      Path newFile = region.getRegionFileSystem().commitStoreFile(Bytes.toString(family),
929          files[0].getPath());
930
931      byte[] encodedNameAsBytes = this.region.getRegionInfo().getEncodedNameAsBytes();
932      byte[] fakeEncodedNameAsBytes = new byte [encodedNameAsBytes.length];
933      for (int i=0; i < encodedNameAsBytes.length; i++) {
934        // Mix the byte array to have a new encodedName
935        fakeEncodedNameAsBytes[i] = (byte) (encodedNameAsBytes[i] + 1);
936      }
937
938      CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(this.region
939        .getRegionInfo(), mismatchedRegionName ? fakeEncodedNameAsBytes : null, family,
940            storeFiles, Lists.newArrayList(newFile),
941            region.getRegionFileSystem().getStoreDir(Bytes.toString(family)));
942
943      WALUtil.writeCompactionMarker(region.getWAL(), this.region.getReplicationScope(),
944          this.region.getRegionInfo(), compactionDescriptor, region.getMVCC());
945
946      Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
947
948      Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
949      fs.create(recoveredEdits);
950      WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
951
952      long time = System.nanoTime();
953
954      writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, 10, time,
955          HConstants.DEFAULT_CLUSTER_ID), WALEdit.createCompaction(region.getRegionInfo(),
956          compactionDescriptor)));
957      writer.close();
958
959      // close the region now, and reopen again
960      region.getTableDescriptor();
961      region.getRegionInfo();
962      region.close();
963      try {
964        region = HRegion.openHRegion(region, null);
965      } catch (WrongRegionException wre) {
966        fail("Matching encoded region name should not have produced WrongRegionException");
967      }
968
969      // now check whether we have only one store file, the compacted one
970      Collection<HStoreFile> sfs = region.getStore(family).getStorefiles();
971      for (HStoreFile sf : sfs) {
972        LOG.info(Objects.toString(sf.getPath()));
973      }
974      if (!mismatchedRegionName) {
975        assertEquals(1, region.getStore(family).getStorefilesCount());
976      }
977      files = FSUtils.listStatus(fs, tmpDir);
978      assertTrue("Expected to find 0 files inside " + tmpDir, files == null || files.length == 0);
979
980      for (long i = minSeqId; i < maxSeqId; i++) {
981        Get get = new Get(Bytes.toBytes(i));
982        Result result = region.get(get);
983        byte[] value = result.getValue(family, Bytes.toBytes(i));
984        assertArrayEquals(Bytes.toBytes(i), value);
985      }
986    } finally {
987      HBaseTestingUtility.closeRegionAndWAL(this.region);
988      this.region = null;
989      wals.close();
990      CONF.setClass(HConstants.REGION_IMPL, HRegion.class, Region.class);
991    }
992  }
993
994  @Test
995  public void testFlushMarkers() throws Exception {
996    // tests that flush markers are written to WAL and handled at recovered edits
997    byte[] family = Bytes.toBytes("family");
998    Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + ".log");
999    final Configuration walConf = new Configuration(TEST_UTIL.getConfiguration());
1000    FSUtils.setRootDir(walConf, logDir);
1001    final WALFactory wals = new WALFactory(walConf, method);
1002    final WAL wal = wals.getWAL(RegionInfoBuilder.newBuilder(tableName).build());
1003
1004    this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
1005      HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family);
1006    try {
1007      Path regiondir = region.getRegionFileSystem().getRegionDir();
1008      FileSystem fs = region.getRegionFileSystem().getFileSystem();
1009      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
1010
1011      long maxSeqId = 3;
1012      long minSeqId = 0;
1013
1014      for (long i = minSeqId; i < maxSeqId; i++) {
1015        Put put = new Put(Bytes.toBytes(i));
1016        put.addColumn(family, Bytes.toBytes(i), Bytes.toBytes(i));
1017        region.put(put);
1018        region.flush(true);
1019      }
1020
1021      // this will create a region with 3 files from flush
1022      assertEquals(3, region.getStore(family).getStorefilesCount());
1023      List<String> storeFiles = new ArrayList<>(3);
1024      for (HStoreFile sf : region.getStore(family).getStorefiles()) {
1025        storeFiles.add(sf.getPath().getName());
1026      }
1027
1028      // now verify that the flush markers are written
1029      wal.shutdown();
1030      WAL.Reader reader = WALFactory.createReader(fs, AbstractFSWALProvider.getCurrentFileName(wal),
1031        TEST_UTIL.getConfiguration());
1032      try {
1033        List<WAL.Entry> flushDescriptors = new ArrayList<>();
1034        long lastFlushSeqId = -1;
1035        while (true) {
1036          WAL.Entry entry = reader.next();
1037          if (entry == null) {
1038            break;
1039          }
1040          Cell cell = entry.getEdit().getCells().get(0);
1041          if (WALEdit.isMetaEditFamily(cell)) {
1042            FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(cell);
1043            assertNotNull(flushDesc);
1044            assertArrayEquals(tableName.getName(), flushDesc.getTableName().toByteArray());
1045            if (flushDesc.getAction() == FlushAction.START_FLUSH) {
1046              assertTrue(flushDesc.getFlushSequenceNumber() > lastFlushSeqId);
1047            } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
1048              assertTrue(flushDesc.getFlushSequenceNumber() == lastFlushSeqId);
1049            }
1050            lastFlushSeqId = flushDesc.getFlushSequenceNumber();
1051            assertArrayEquals(regionName, flushDesc.getEncodedRegionName().toByteArray());
1052            assertEquals(1, flushDesc.getStoreFlushesCount()); //only one store
1053            StoreFlushDescriptor storeFlushDesc = flushDesc.getStoreFlushes(0);
1054            assertArrayEquals(family, storeFlushDesc.getFamilyName().toByteArray());
1055            assertEquals("family", storeFlushDesc.getStoreHomeDir());
1056            if (flushDesc.getAction() == FlushAction.START_FLUSH) {
1057              assertEquals(0, storeFlushDesc.getFlushOutputCount());
1058            } else {
1059              assertEquals(1, storeFlushDesc.getFlushOutputCount()); //only one file from flush
1060              assertTrue(storeFiles.contains(storeFlushDesc.getFlushOutput(0)));
1061            }
1062
1063            flushDescriptors.add(entry);
1064          }
1065        }
1066
1067        assertEquals(3 * 2, flushDescriptors.size()); // START_FLUSH and COMMIT_FLUSH per flush
1068
1069        // now write those markers to the recovered edits again.
1070
1071        Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
1072
1073        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
1074        fs.create(recoveredEdits);
1075        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
1076
1077        for (WAL.Entry entry : flushDescriptors) {
1078          writer.append(entry);
1079        }
1080        writer.close();
1081      } finally {
1082        if (null != reader) {
1083          try {
1084            reader.close();
1085          } catch (IOException exception) {
1086            LOG.warn("Problem closing wal: " + exception.getMessage());
1087            LOG.debug("exception details", exception);
1088          }
1089        }
1090      }
1091
1092
1093      // close the region now, and reopen again
1094      region.close();
1095      region = HRegion.openHRegion(region, null);
1096
1097      // now check whether we have can read back the data from region
1098      for (long i = minSeqId; i < maxSeqId; i++) {
1099        Get get = new Get(Bytes.toBytes(i));
1100        Result result = region.get(get);
1101        byte[] value = result.getValue(family, Bytes.toBytes(i));
1102        assertArrayEquals(Bytes.toBytes(i), value);
1103      }
1104    } finally {
1105      HBaseTestingUtility.closeRegionAndWAL(this.region);
1106      this.region = null;
1107      wals.close();
1108    }
1109  }
1110
1111  static class IsFlushWALMarker implements ArgumentMatcher<WALEdit> {
1112    volatile FlushAction[] actions;
1113    public IsFlushWALMarker(FlushAction... actions) {
1114      this.actions = actions;
1115    }
1116    @Override
1117    public boolean matches(WALEdit edit) {
1118      List<Cell> cells = edit.getCells();
1119      if (cells.isEmpty()) {
1120        return false;
1121      }
1122      if (WALEdit.isMetaEditFamily(cells.get(0))) {
1123        FlushDescriptor desc;
1124        try {
1125          desc = WALEdit.getFlushDescriptor(cells.get(0));
1126        } catch (IOException e) {
1127          LOG.warn(e.toString(), e);
1128          return false;
1129        }
1130        if (desc != null) {
1131          for (FlushAction action : actions) {
1132            if (desc.getAction() == action) {
1133              return true;
1134            }
1135          }
1136        }
1137      }
1138      return false;
1139    }
1140    public IsFlushWALMarker set(FlushAction... actions) {
1141      this.actions = actions;
1142      return this;
1143    }
1144  }
1145
1146  @Test
1147  public void testFlushMarkersWALFail() throws Exception {
1148    // test the cases where the WAL append for flush markers fail.
1149    byte[] family = Bytes.toBytes("family");
1150
1151    // spy an actual WAL implementation to throw exception (was not able to mock)
1152    Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + "log");
1153
1154    final Configuration walConf = new Configuration(TEST_UTIL.getConfiguration());
1155    FSUtils.setRootDir(walConf, logDir);
1156    // Make up a WAL that we can manipulate at append time.
1157    class FailAppendFlushMarkerWAL extends FSHLog {
1158      volatile FlushAction [] flushActions = null;
1159
1160      public FailAppendFlushMarkerWAL(FileSystem fs, Path root, String logDir, Configuration conf)
1161      throws IOException {
1162        super(fs, root, logDir, conf);
1163      }
1164
1165      @Override
1166      protected Writer createWriterInstance(Path path) throws IOException {
1167        final Writer w = super.createWriterInstance(path);
1168        return new Writer() {
1169          @Override
1170          public void close() throws IOException {
1171            w.close();
1172          }
1173
1174          @Override
1175          public void sync(boolean forceSync) throws IOException {
1176            w.sync(forceSync);
1177          }
1178
1179          @Override
1180          public void append(Entry entry) throws IOException {
1181            List<Cell> cells = entry.getEdit().getCells();
1182            if (WALEdit.isMetaEditFamily(cells.get(0))) {
1183              FlushDescriptor desc = WALEdit.getFlushDescriptor(cells.get(0));
1184              if (desc != null) {
1185                for (FlushAction flushAction: flushActions) {
1186                  if (desc.getAction().equals(flushAction)) {
1187                    throw new IOException("Failed to append flush marker! " + flushAction);
1188                  }
1189                }
1190              }
1191            }
1192            w.append(entry);
1193          }
1194
1195          @Override
1196          public long getLength() {
1197            return w.getLength();
1198          }
1199        };
1200      }
1201    }
1202    FailAppendFlushMarkerWAL wal =
1203      new FailAppendFlushMarkerWAL(FileSystem.get(walConf), FSUtils.getRootDir(walConf),
1204        method, walConf);
1205    this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
1206      HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family);
1207    try {
1208      int i = 0;
1209      Put put = new Put(Bytes.toBytes(i));
1210      put.setDurability(Durability.SKIP_WAL); // have to skip mocked wal
1211      put.addColumn(family, Bytes.toBytes(i), Bytes.toBytes(i));
1212      region.put(put);
1213
1214      // 1. Test case where START_FLUSH throws exception
1215      wal.flushActions = new FlushAction [] {FlushAction.START_FLUSH};
1216
1217      // start cache flush will throw exception
1218      try {
1219        region.flush(true);
1220        fail("This should have thrown exception");
1221      } catch (DroppedSnapshotException unexpected) {
1222        // this should not be a dropped snapshot exception. Meaning that RS will not abort
1223        throw unexpected;
1224      } catch (IOException expected) {
1225        // expected
1226      }
1227      // The WAL is hosed now. It has two edits appended. We cannot roll the log without it
1228      // throwing a DroppedSnapshotException to force an abort. Just clean up the mess.
1229      region.close(true);
1230      wal.close();
1231
1232      // 2. Test case where START_FLUSH succeeds but COMMIT_FLUSH will throw exception
1233      wal.flushActions = new FlushAction [] {FlushAction.COMMIT_FLUSH};
1234      wal = new FailAppendFlushMarkerWAL(FileSystem.get(walConf), FSUtils.getRootDir(walConf),
1235            method, walConf);
1236
1237      this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
1238        HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family);
1239      region.put(put);
1240
1241      // 3. Test case where ABORT_FLUSH will throw exception.
1242      // Even if ABORT_FLUSH throws exception, we should not fail with IOE, but continue with
1243      // DroppedSnapshotException. Below COMMMIT_FLUSH will cause flush to abort
1244      wal.flushActions = new FlushAction [] {FlushAction.COMMIT_FLUSH, FlushAction.ABORT_FLUSH};
1245
1246      try {
1247        region.flush(true);
1248        fail("This should have thrown exception");
1249      } catch (DroppedSnapshotException expected) {
1250        // we expect this exception, since we were able to write the snapshot, but failed to
1251        // write the flush marker to WAL
1252      } catch (IOException unexpected) {
1253        throw unexpected;
1254      }
1255
1256    } finally {
1257      HBaseTestingUtility.closeRegionAndWAL(this.region);
1258      this.region = null;
1259    }
1260  }
1261
1262  @Test
1263  public void testGetWhileRegionClose() throws IOException {
1264    Configuration hc = initSplit();
1265    int numRows = 100;
1266    byte[][] families = { fam1, fam2, fam3 };
1267
1268    // Setting up region
1269    this.region = initHRegion(tableName, method, hc, families);
1270    try {
1271      // Put data in region
1272      final int startRow = 100;
1273      putData(startRow, numRows, qual1, families);
1274      putData(startRow, numRows, qual2, families);
1275      putData(startRow, numRows, qual3, families);
1276      final AtomicBoolean done = new AtomicBoolean(false);
1277      final AtomicInteger gets = new AtomicInteger(0);
1278      GetTillDoneOrException[] threads = new GetTillDoneOrException[10];
1279      try {
1280        // Set ten threads running concurrently getting from the region.
1281        for (int i = 0; i < threads.length / 2; i++) {
1282          threads[i] = new GetTillDoneOrException(i, Bytes.toBytes("" + startRow), done, gets);
1283          threads[i].setDaemon(true);
1284          threads[i].start();
1285        }
1286        // Artificially make the condition by setting closing flag explicitly.
1287        // I can't make the issue happen with a call to region.close().
1288        this.region.closing.set(true);
1289        for (int i = threads.length / 2; i < threads.length; i++) {
1290          threads[i] = new GetTillDoneOrException(i, Bytes.toBytes("" + startRow), done, gets);
1291          threads[i].setDaemon(true);
1292          threads[i].start();
1293        }
1294      } finally {
1295        if (this.region != null) {
1296          HBaseTestingUtility.closeRegionAndWAL(this.region);
1297        }
1298      }
1299      done.set(true);
1300      for (GetTillDoneOrException t : threads) {
1301        try {
1302          t.join();
1303        } catch (InterruptedException e) {
1304          e.printStackTrace();
1305        }
1306        if (t.e != null) {
1307          LOG.info("Exception=" + t.e);
1308          assertFalse("Found a NPE in " + t.getName(), t.e instanceof NullPointerException);
1309        }
1310      }
1311    } finally {
1312      HBaseTestingUtility.closeRegionAndWAL(this.region);
1313      this.region = null;
1314    }
1315  }
1316
1317  /*
1318   * Thread that does get on single row until 'done' flag is flipped. If an
1319   * exception causes us to fail, it records it.
1320   */
1321  class GetTillDoneOrException extends Thread {
1322    private final Get g;
1323    private final AtomicBoolean done;
1324    private final AtomicInteger count;
1325    private Exception e;
1326
1327    GetTillDoneOrException(final int i, final byte[] r, final AtomicBoolean d,
1328        final AtomicInteger c) {
1329      super("getter." + i);
1330      this.g = new Get(r);
1331      this.done = d;
1332      this.count = c;
1333    }
1334
1335    @Override
1336    public void run() {
1337      while (!this.done.get()) {
1338        try {
1339          assertTrue(region.get(g).size() > 0);
1340          this.count.incrementAndGet();
1341        } catch (Exception e) {
1342          this.e = e;
1343          break;
1344        }
1345      }
1346    }
1347  }
1348
1349  /*
1350   * An involved filter test. Has multiple column families and deletes in mix.
1351   */
1352  @Test
1353  public void testWeirdCacheBehaviour() throws Exception {
1354    final TableName tableName = TableName.valueOf(name.getMethodName());
1355    byte[][] FAMILIES = new byte[][] { Bytes.toBytes("trans-blob"), Bytes.toBytes("trans-type"),
1356        Bytes.toBytes("trans-date"), Bytes.toBytes("trans-tags"), Bytes.toBytes("trans-group") };
1357    this.region = initHRegion(tableName, method, CONF, FAMILIES);
1358    try {
1359      String value = "this is the value";
1360      String value2 = "this is some other value";
1361      String keyPrefix1 = "prefix1";
1362      String keyPrefix2 = "prefix2";
1363      String keyPrefix3 = "prefix3";
1364      putRows(this.region, 3, value, keyPrefix1);
1365      putRows(this.region, 3, value, keyPrefix2);
1366      putRows(this.region, 3, value, keyPrefix3);
1367      putRows(this.region, 3, value2, keyPrefix1);
1368      putRows(this.region, 3, value2, keyPrefix2);
1369      putRows(this.region, 3, value2, keyPrefix3);
1370      System.out.println("Checking values for key: " + keyPrefix1);
1371      assertEquals("Got back incorrect number of rows from scan", 3,
1372          getNumberOfRows(keyPrefix1, value2, this.region));
1373      System.out.println("Checking values for key: " + keyPrefix2);
1374      assertEquals("Got back incorrect number of rows from scan", 3,
1375          getNumberOfRows(keyPrefix2, value2, this.region));
1376      System.out.println("Checking values for key: " + keyPrefix3);
1377      assertEquals("Got back incorrect number of rows from scan", 3,
1378          getNumberOfRows(keyPrefix3, value2, this.region));
1379      deleteColumns(this.region, value2, keyPrefix1);
1380      deleteColumns(this.region, value2, keyPrefix2);
1381      deleteColumns(this.region, value2, keyPrefix3);
1382      System.out.println("Starting important checks.....");
1383      assertEquals("Got back incorrect number of rows from scan: " + keyPrefix1, 0,
1384          getNumberOfRows(keyPrefix1, value2, this.region));
1385      assertEquals("Got back incorrect number of rows from scan: " + keyPrefix2, 0,
1386          getNumberOfRows(keyPrefix2, value2, this.region));
1387      assertEquals("Got back incorrect number of rows from scan: " + keyPrefix3, 0,
1388          getNumberOfRows(keyPrefix3, value2, this.region));
1389    } finally {
1390      HBaseTestingUtility.closeRegionAndWAL(this.region);
1391      this.region = null;
1392    }
1393  }
1394
1395  @Test
1396  public void testAppendWithReadOnlyTable() throws Exception {
1397    final TableName tableName = TableName.valueOf(name.getMethodName());
1398    this.region = initHRegion(tableName, method, CONF, true, Bytes.toBytes("somefamily"));
1399    boolean exceptionCaught = false;
1400    Append append = new Append(Bytes.toBytes("somerow"));
1401    append.setDurability(Durability.SKIP_WAL);
1402    append.addColumn(Bytes.toBytes("somefamily"), Bytes.toBytes("somequalifier"),
1403        Bytes.toBytes("somevalue"));
1404    try {
1405      region.append(append);
1406    } catch (IOException e) {
1407      exceptionCaught = true;
1408    } finally {
1409      HBaseTestingUtility.closeRegionAndWAL(this.region);
1410      this.region = null;
1411    }
1412    assertTrue(exceptionCaught == true);
1413  }
1414
1415  @Test
1416  public void testIncrWithReadOnlyTable() throws Exception {
1417    final TableName tableName = TableName.valueOf(name.getMethodName());
1418    this.region = initHRegion(tableName, method, CONF, true, Bytes.toBytes("somefamily"));
1419    boolean exceptionCaught = false;
1420    Increment inc = new Increment(Bytes.toBytes("somerow"));
1421    inc.setDurability(Durability.SKIP_WAL);
1422    inc.addColumn(Bytes.toBytes("somefamily"), Bytes.toBytes("somequalifier"), 1L);
1423    try {
1424      region.increment(inc);
1425    } catch (IOException e) {
1426      exceptionCaught = true;
1427    } finally {
1428      HBaseTestingUtility.closeRegionAndWAL(this.region);
1429      this.region = null;
1430    }
1431    assertTrue(exceptionCaught == true);
1432  }
1433
1434  private void deleteColumns(HRegion r, String value, String keyPrefix) throws IOException {
1435    InternalScanner scanner = buildScanner(keyPrefix, value, r);
1436    int count = 0;
1437    boolean more = false;
1438    List<Cell> results = new ArrayList<>();
1439    do {
1440      more = scanner.next(results);
1441      if (results != null && !results.isEmpty())
1442        count++;
1443      else
1444        break;
1445      Delete delete = new Delete(CellUtil.cloneRow(results.get(0)));
1446      delete.addColumn(Bytes.toBytes("trans-tags"), Bytes.toBytes("qual2"));
1447      r.delete(delete);
1448      results.clear();
1449    } while (more);
1450    assertEquals("Did not perform correct number of deletes", 3, count);
1451  }
1452
1453  private int getNumberOfRows(String keyPrefix, String value, HRegion r) throws Exception {
1454    InternalScanner resultScanner = buildScanner(keyPrefix, value, r);
1455    int numberOfResults = 0;
1456    List<Cell> results = new ArrayList<>();
1457    boolean more = false;
1458    do {
1459      more = resultScanner.next(results);
1460      if (results != null && !results.isEmpty())
1461        numberOfResults++;
1462      else
1463        break;
1464      for (Cell kv : results) {
1465        System.out.println("kv=" + kv.toString() + ", " + Bytes.toString(CellUtil.cloneValue(kv)));
1466      }
1467      results.clear();
1468    } while (more);
1469    return numberOfResults;
1470  }
1471
1472  private InternalScanner buildScanner(String keyPrefix, String value, HRegion r)
1473      throws IOException {
1474    // Defaults FilterList.Operator.MUST_PASS_ALL.
1475    FilterList allFilters = new FilterList();
1476    allFilters.addFilter(new PrefixFilter(Bytes.toBytes(keyPrefix)));
1477    // Only return rows where this column value exists in the row.
1478    SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("trans-tags"),
1479        Bytes.toBytes("qual2"), CompareOp.EQUAL, Bytes.toBytes(value));
1480    filter.setFilterIfMissing(true);
1481    allFilters.addFilter(filter);
1482    Scan scan = new Scan();
1483    scan.addFamily(Bytes.toBytes("trans-blob"));
1484    scan.addFamily(Bytes.toBytes("trans-type"));
1485    scan.addFamily(Bytes.toBytes("trans-date"));
1486    scan.addFamily(Bytes.toBytes("trans-tags"));
1487    scan.addFamily(Bytes.toBytes("trans-group"));
1488    scan.setFilter(allFilters);
1489    return r.getScanner(scan);
1490  }
1491
1492  private void putRows(HRegion r, int numRows, String value, String key) throws IOException {
1493    for (int i = 0; i < numRows; i++) {
1494      String row = key + "_" + i/* UUID.randomUUID().toString() */;
1495      System.out.println(String.format("Saving row: %s, with value %s", row, value));
1496      Put put = new Put(Bytes.toBytes(row));
1497      put.setDurability(Durability.SKIP_WAL);
1498      put.addColumn(Bytes.toBytes("trans-blob"), null, Bytes.toBytes("value for blob"));
1499      put.addColumn(Bytes.toBytes("trans-type"), null, Bytes.toBytes("statement"));
1500      put.addColumn(Bytes.toBytes("trans-date"), null, Bytes.toBytes("20090921010101999"));
1501      put.addColumn(Bytes.toBytes("trans-tags"), Bytes.toBytes("qual2"), Bytes.toBytes(value));
1502      put.addColumn(Bytes.toBytes("trans-group"), null, Bytes.toBytes("adhocTransactionGroupId"));
1503      r.put(put);
1504    }
1505  }
1506
1507  @Test
1508  public void testFamilyWithAndWithoutColon() throws Exception {
1509    byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
1510    this.region = initHRegion(tableName, method, CONF, cf);
1511    try {
1512      Put p = new Put(tableName.toBytes());
1513      byte[] cfwithcolon = Bytes.toBytes(COLUMN_FAMILY + ":");
1514      p.addColumn(cfwithcolon, cfwithcolon, cfwithcolon);
1515      boolean exception = false;
1516      try {
1517        this.region.put(p);
1518      } catch (NoSuchColumnFamilyException e) {
1519        exception = true;
1520      }
1521      assertTrue(exception);
1522    } finally {
1523      HBaseTestingUtility.closeRegionAndWAL(this.region);
1524      this.region = null;
1525    }
1526  }
1527
1528  @Test
1529  public void testBatchPut_whileNoRowLocksHeld() throws IOException {
1530    final Put[] puts = new Put[10];
1531    MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
1532    try {
1533      long syncs = prepareRegionForBachPut(puts, source, false);
1534
1535      OperationStatus[] codes = this.region.batchMutate(puts);
1536      assertEquals(10, codes.length);
1537      for (int i = 0; i < 10; i++) {
1538        assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
1539      }
1540      metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);
1541
1542      LOG.info("Next a batch put with one invalid family");
1543      puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value);
1544      codes = this.region.batchMutate(puts);
1545      assertEquals(10, codes.length);
1546      for (int i = 0; i < 10; i++) {
1547        assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS,
1548            codes[i].getOperationStatusCode());
1549      }
1550
1551      metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 2, source);
1552    } finally {
1553      HBaseTestingUtility.closeRegionAndWAL(this.region);
1554      this.region = null;
1555    }
1556  }
1557
  /**
   * Starts a batch mutation on a second thread while this thread holds row locks on some of the
   * batch's rows, and starts a region close on a third thread while that batch is in flight.
   * After the locks are released the batch must come back with a full set of statuses: the put
   * at index 5 (which carries an unknown family) reports BAD_FAMILY and every other put reports
   * SUCCESS.
   */
  @Test
  public void testBatchPut_whileMultipleRowLocksHeld() throws Exception {
    final Put[] puts = new Put[10];
    MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
    try {
      long syncs = prepareRegionForBachPut(puts, source, false);

      // Make the put at index 5 invalid so that the batch returns a mix of statuses.
      puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value);

      LOG.info("batchPut will have to break into four batches to avoid row locks");
      // Locks are taken on rows 2, 1 and 3 (in that order) before the put thread is started.
      RowLock rowLock1 = region.getRowLock(Bytes.toBytes("row_2"));
      RowLock rowLock2 = region.getRowLock(Bytes.toBytes("row_1"));
      RowLock rowLock3 = region.getRowLock(Bytes.toBytes("row_3"));
      // NOTE(review): the boolean argument presumably requests a read (shared) lock on row_3 -
      // confirm against HRegion#getRowLock. This lock is held until after the assertions below.
      RowLock rowLock4 = region.getRowLock(Bytes.toBytes("row_3"), true);

      MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF);
      // Carries the batch result from the put thread back to this thread.
      final AtomicReference<OperationStatus[]> retFromThread = new AtomicReference<>();
      // Counted down by the put thread just before it calls batchMutate.
      final CountDownLatch startingPuts = new CountDownLatch(1);
      // Counted down by the close thread just before it closes the region.
      final CountDownLatch startingClose = new CountDownLatch(1);
      TestThread putter = new TestThread(ctx) {
        @Override
        public void doWork() throws IOException {
          startingPuts.countDown();
          retFromThread.set(region.batchMutate(puts));
        }
      };
      LOG.info("...starting put thread while holding locks");
      ctx.addThread(putter);
      ctx.startThreads();

      // Now attempt to close the region from another thread. Prior to HBASE-12565
      // this would cause the in-progress batchMutate operation to fail with an
      // exception because it used to release and re-acquire the close-guard lock
      // between batches. The caller then didn't get a status indicating which writes succeeded.
      // We now expect this thread to block until the batchMutate call finishes.
      Thread regionCloseThread = new TestThread(ctx) {
        @Override
        public void doWork() {
          try {
            startingPuts.await();
            // Give some time for the batch mutate to get in.
            // We don't want to race with the mutate
            Thread.sleep(10);
            startingClose.countDown();
            HBaseTestingUtility.closeRegionAndWAL(region);
          } catch (IOException e) {
            throw new RuntimeException(e);
          } catch (InterruptedException e) {
            throw new RuntimeException(e);
          }
        }
      };
      // Started directly rather than through ctx.startThreads(); it is joined explicitly below.
      regionCloseThread.start();

      // Wait until both the put and the close have been kicked off, then give them a moment
      // before releasing the locks.
      startingClose.await();
      startingPuts.await();
      Thread.sleep(100);
      LOG.info("...releasing row lock 1, which should let put thread continue");
      rowLock1.release();
      rowLock2.release();
      rowLock3.release();
      // Block until the sync counter shows one more sync than before the batch.
      waitForCounter(source, "syncTimeNumOps", syncs + 1);

      LOG.info("...joining on put thread");
      ctx.stop();
      regionCloseThread.join();

      // Only the put with the bad family may fail; all others must have succeeded.
      OperationStatus[] codes = retFromThread.get();
      for (int i = 0; i < codes.length; i++) {
        assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS,
            codes[i].getOperationStatusCode());
      }
      rowLock4.release();
    } finally {
      // The region is also closed by regionCloseThread above; this close runs regardless.
      HBaseTestingUtility.closeRegionAndWAL(this.region);
      this.region = null;
    }
  }
1636
1637  private void waitForCounter(MetricsWALSource source, String metricName, long expectedCount)
1638      throws InterruptedException {
1639    long startWait = System.currentTimeMillis();
1640    long currentCount;
1641    while ((currentCount = metricsAssertHelper.getCounter(metricName, source)) < expectedCount) {
1642      Thread.sleep(100);
1643      if (System.currentTimeMillis() - startWait > 10000) {
1644        fail(String.format("Timed out waiting for '%s' >= '%s', currentCount=%s", metricName,
1645            expectedCount, currentCount));
1646      }
1647    }
1648  }
1649
  /**
   * Exercises an atomic {@code MutationBatchOperation} (third constructor argument
   * {@code true}) in three situations: all puts valid, a row of the batch locked by another
   * thread, and one put naming an unknown column family.
   */
  @Test
  public void testAtomicBatchPut() throws IOException {
    final Put[] puts = new Put[10];
    MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
    try {
      long syncs = prepareRegionForBachPut(puts, source, false);

      // 1. Straight forward case, should succeed
      MutationBatchOperation batchOp = new MutationBatchOperation(region, puts, true,
          HConstants.NO_NONCE, HConstants.NO_NONCE);
      OperationStatus[] codes = this.region.batchMutate(batchOp);
      assertEquals(10, codes.length);
      for (int i = 0; i < 10; i++) {
        assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
      }
      // Exactly one sync is expected for the successful batch.
      metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);

      // 2. Failed to get lock
      RowLock lock = region.getRowLock(Bytes.toBytes("row_" + 3));
      // Method {@link HRegion#getRowLock(byte[])} is reentrant. As 'row_3' is locked in this
      // thread, need to run {@link HRegion#batchMutate(HRegion.BatchOperation)} in different thread
      MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF);
      // Receives the IOException the put thread is expected to hit.
      final AtomicReference<IOException> retFromThread = new AtomicReference<>();
      // Counted down by the put thread once batchMutate has returned or thrown.
      final CountDownLatch finishedPuts = new CountDownLatch(1);
      final MutationBatchOperation finalBatchOp = new MutationBatchOperation(region, puts, true,
          HConstants
          .NO_NONCE,
          HConstants.NO_NONCE);
      TestThread putter = new TestThread(ctx) {
        @Override
        public void doWork() throws IOException {
          try {
            region.batchMutate(finalBatchOp);
          } catch (IOException ioe) {
            // This is the outcome the test wants; it is handed to the main thread for checking.
            LOG.error("test failed!", ioe);
            retFromThread.set(ioe);
          }
          finishedPuts.countDown();
        }
      };
      LOG.info("...starting put thread while holding locks");
      ctx.addThread(putter);
      ctx.startThreads();
      LOG.info("...waiting for batch puts while holding locks");
      try {
        finishedPuts.await();
      } catch (InterruptedException e) {
        LOG.error("Interrupted!", e);
      } finally {
        // The row lock is released only after the put thread has finished its attempt.
        if (lock != null) {
          lock.release();
        }
      }
      // The batch must have failed, and no additional sync may have happened.
      assertNotNull(retFromThread.get());
      metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);

      // 3. Exception thrown in validation
      LOG.info("Next a batch put with one invalid family");
      puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value);
      batchOp = new MutationBatchOperation(region, puts, true, HConstants.NO_NONCE,
          HConstants.NO_NONCE);
      // NOTE(review): 'thrown' is presumably a JUnit ExpectedException rule declared on the
      // class - confirm. The expectation is registered right before the final call, so that
      // call is the last statement of the try block to execute.
      thrown.expect(NoSuchColumnFamilyException.class);
      this.region.batchMutate(batchOp);
    } finally {
      HBaseTestingUtility.closeRegionAndWAL(this.region);
      this.region = null;
    }
  }
1718
1719  @Test
1720  public void testBatchPutWithTsSlop() throws Exception {
1721    // add data with a timestamp that is too recent for range. Ensure assert
1722    CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000);
1723    final Put[] puts = new Put[10];
1724    MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
1725
1726    try {
1727      long syncs = prepareRegionForBachPut(puts, source, true);
1728
1729      OperationStatus[] codes = this.region.batchMutate(puts);
1730      assertEquals(10, codes.length);
1731      for (int i = 0; i < 10; i++) {
1732        assertEquals(OperationStatusCode.SANITY_CHECK_FAILURE, codes[i].getOperationStatusCode());
1733      }
1734      metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
1735    } finally {
1736      HBaseTestingUtility.closeRegionAndWAL(this.region);
1737      this.region = null;
1738    }
1739  }
1740
1741  /**
1742   * @return syncs initial syncTimeNumOps
1743   */
1744  private long prepareRegionForBachPut(final Put[] puts, final MetricsWALSource source,
1745      boolean slop) throws IOException {
1746    this.region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
1747
1748    LOG.info("First a batch put with all valid puts");
1749    for (int i = 0; i < puts.length; i++) {
1750      puts[i] = slop ? new Put(Bytes.toBytes("row_" + i), Long.MAX_VALUE - 100) :
1751          new Put(Bytes.toBytes("row_" + i));
1752      puts[i].addColumn(COLUMN_FAMILY_BYTES, qual, value);
1753    }
1754
1755    long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
1756    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
1757    return syncs;
1758  }
1759
1760  // ////////////////////////////////////////////////////////////////////////////
1761  // checkAndMutate tests
1762  // ////////////////////////////////////////////////////////////////////////////
1763  @Test
1764  public void testCheckAndMutate_WithEmptyRowValue() throws IOException {
1765    byte[] row1 = Bytes.toBytes("row1");
1766    byte[] fam1 = Bytes.toBytes("fam1");
1767    byte[] qf1 = Bytes.toBytes("qualifier");
1768    byte[] emptyVal = new byte[] {};
1769    byte[] val1 = Bytes.toBytes("value1");
1770    byte[] val2 = Bytes.toBytes("value2");
1771
1772    // Setting up region
1773    this.region = initHRegion(tableName, method, CONF, fam1);
1774    try {
1775      // Putting empty data in key
1776      Put put = new Put(row1);
1777      put.addColumn(fam1, qf1, emptyVal);
1778
1779      // checkAndPut with empty value
1780      boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(
1781          emptyVal), put);
1782      assertTrue(res);
1783
1784      // Putting data in key
1785      put = new Put(row1);
1786      put.addColumn(fam1, qf1, val1);
1787
1788      // checkAndPut with correct value
1789      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(emptyVal),
1790          put);
1791      assertTrue(res);
1792
1793      // not empty anymore
1794      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(emptyVal),
1795          put);
1796      assertFalse(res);
1797
1798      Delete delete = new Delete(row1);
1799      delete.addColumn(fam1, qf1);
1800      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(emptyVal),
1801          delete);
1802      assertFalse(res);
1803
1804      put = new Put(row1);
1805      put.addColumn(fam1, qf1, val2);
1806      // checkAndPut with correct value
1807      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val1),
1808          put);
1809      assertTrue(res);
1810
1811      // checkAndDelete with correct value
1812      delete = new Delete(row1);
1813      delete.addColumn(fam1, qf1);
1814      delete.addColumn(fam1, qf1);
1815      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val2),
1816          delete);
1817      assertTrue(res);
1818
1819      delete = new Delete(row1);
1820      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(emptyVal),
1821          delete);
1822      assertTrue(res);
1823
1824      // checkAndPut looking for a null value
1825      put = new Put(row1);
1826      put.addColumn(fam1, qf1, val1);
1827
1828      res = region
1829          .checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new NullComparator(), put);
1830      assertTrue(res);
1831    } finally {
1832      HBaseTestingUtility.closeRegionAndWAL(this.region);
1833      this.region = null;
1834    }
1835  }
1836
1837  @Test
1838  public void testCheckAndMutate_WithWrongValue() throws IOException {
1839    byte[] row1 = Bytes.toBytes("row1");
1840    byte[] fam1 = Bytes.toBytes("fam1");
1841    byte[] qf1 = Bytes.toBytes("qualifier");
1842    byte[] val1 = Bytes.toBytes("value1");
1843    byte[] val2 = Bytes.toBytes("value2");
1844    BigDecimal bd1 = new BigDecimal(Double.MAX_VALUE);
1845    BigDecimal bd2 = new BigDecimal(Double.MIN_VALUE);
1846
1847    // Setting up region
1848    this.region = initHRegion(tableName, method, CONF, fam1);
1849    try {
1850      // Putting data in key
1851      Put put = new Put(row1);
1852      put.addColumn(fam1, qf1, val1);
1853      region.put(put);
1854
1855      // checkAndPut with wrong value
1856      boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(
1857          val2), put);
1858      assertEquals(false, res);
1859
1860      // checkAndDelete with wrong value
1861      Delete delete = new Delete(row1);
1862      delete.addFamily(fam1);
1863      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val2),
1864          put);
1865      assertEquals(false, res);
1866
1867      // Putting data in key
1868      put = new Put(row1);
1869      put.addColumn(fam1, qf1, Bytes.toBytes(bd1));
1870      region.put(put);
1871
1872      // checkAndPut with wrong value
1873      res =
1874          region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator(
1875              bd2), put);
1876      assertEquals(false, res);
1877
1878      // checkAndDelete with wrong value
1879      delete = new Delete(row1);
1880      delete.addFamily(fam1);
1881      res =
1882          region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator(
1883              bd2), put);
1884      assertEquals(false, res);
1885    } finally {
1886      HBaseTestingUtility.closeRegionAndWAL(this.region);
1887      this.region = null;
1888    }
1889  }
1890
1891  @Test
1892  public void testCheckAndMutate_WithCorrectValue() throws IOException {
1893    byte[] row1 = Bytes.toBytes("row1");
1894    byte[] fam1 = Bytes.toBytes("fam1");
1895    byte[] qf1 = Bytes.toBytes("qualifier");
1896    byte[] val1 = Bytes.toBytes("value1");
1897    BigDecimal bd1 = new BigDecimal(Double.MIN_VALUE);
1898
1899    // Setting up region
1900    this.region = initHRegion(tableName, method, CONF, fam1);
1901    try {
1902      // Putting data in key
1903      Put put = new Put(row1);
1904      put.addColumn(fam1, qf1, val1);
1905      region.put(put);
1906
1907      // checkAndPut with correct value
1908      boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(
1909          val1), put);
1910      assertEquals(true, res);
1911
1912      // checkAndDelete with correct value
1913      Delete delete = new Delete(row1);
1914      delete.addColumn(fam1, qf1);
1915      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val1),
1916          delete);
1917      assertEquals(true, res);
1918
1919      // Putting data in key
1920      put = new Put(row1);
1921      put.addColumn(fam1, qf1, Bytes.toBytes(bd1));
1922      region.put(put);
1923
1924      // checkAndPut with correct value
1925      res =
1926          region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator(
1927              bd1), put);
1928      assertEquals(true, res);
1929
1930      // checkAndDelete with correct value
1931      delete = new Delete(row1);
1932      delete.addColumn(fam1, qf1);
1933      res =
1934          region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator(
1935              bd1), delete);
1936      assertEquals(true, res);
1937    } finally {
1938      HBaseTestingUtility.closeRegionAndWAL(this.region);
1939      this.region = null;
1940    }
1941  }
1942
1943  @Test
1944  public void testCheckAndMutate_WithNonEqualCompareOp() throws IOException {
1945    byte[] row1 = Bytes.toBytes("row1");
1946    byte[] fam1 = Bytes.toBytes("fam1");
1947    byte[] qf1 = Bytes.toBytes("qualifier");
1948    byte[] val1 = Bytes.toBytes("value1");
1949    byte[] val2 = Bytes.toBytes("value2");
1950    byte[] val3 = Bytes.toBytes("value3");
1951    byte[] val4 = Bytes.toBytes("value4");
1952
1953    // Setting up region
1954    this.region = initHRegion(tableName, method, CONF, fam1);
1955    try {
1956      // Putting val3 in key
1957      Put put = new Put(row1);
1958      put.addColumn(fam1, qf1, val3);
1959      region.put(put);
1960
1961      // Test CompareOp.LESS: original = val3, compare with val3, fail
1962      boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS,
1963          new BinaryComparator(val3), put);
1964      assertEquals(false, res);
1965
1966      // Test CompareOp.LESS: original = val3, compare with val4, fail
1967      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS,
1968          new BinaryComparator(val4), put);
1969      assertEquals(false, res);
1970
1971      // Test CompareOp.LESS: original = val3, compare with val2,
1972      // succeed (now value = val2)
1973      put = new Put(row1);
1974      put.addColumn(fam1, qf1, val2);
1975      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS,
1976          new BinaryComparator(val2), put);
1977      assertEquals(true, res);
1978
1979      // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val3, fail
1980      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL,
1981          new BinaryComparator(val3), put);
1982      assertEquals(false, res);
1983
1984      // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val2,
1985      // succeed (value still = val2)
1986      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL,
1987          new BinaryComparator(val2), put);
1988      assertEquals(true, res);
1989
1990      // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val1,
1991      // succeed (now value = val3)
1992      put = new Put(row1);
1993      put.addColumn(fam1, qf1, val3);
1994      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL,
1995          new BinaryComparator(val1), put);
1996      assertEquals(true, res);
1997
1998      // Test CompareOp.GREATER: original = val3, compare with val3, fail
1999      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER,
2000          new BinaryComparator(val3), put);
2001      assertEquals(false, res);
2002
2003      // Test CompareOp.GREATER: original = val3, compare with val2, fail
2004      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER,
2005          new BinaryComparator(val2), put);
2006      assertEquals(false, res);
2007
2008      // Test CompareOp.GREATER: original = val3, compare with val4,
2009      // succeed (now value = val2)
2010      put = new Put(row1);
2011      put.addColumn(fam1, qf1, val2);
2012      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER,
2013          new BinaryComparator(val4), put);
2014      assertEquals(true, res);
2015
2016      // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val1, fail
2017      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL,
2018          new BinaryComparator(val1), put);
2019      assertEquals(false, res);
2020
2021      // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val2,
2022      // succeed (value still = val2)
2023      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL,
2024          new BinaryComparator(val2), put);
2025      assertEquals(true, res);
2026
2027      // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val3, succeed
2028      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL,
2029          new BinaryComparator(val3), put);
2030      assertEquals(true, res);
2031    } finally {
2032      HBaseTestingUtility.closeRegionAndWAL(this.region);
2033      this.region = null;
2034    }
2035  }
2036
2037  @Test
2038  public void testCheckAndPut_ThatPutWasWritten() throws IOException {
2039    byte[] row1 = Bytes.toBytes("row1");
2040    byte[] fam1 = Bytes.toBytes("fam1");
2041    byte[] fam2 = Bytes.toBytes("fam2");
2042    byte[] qf1 = Bytes.toBytes("qualifier");
2043    byte[] val1 = Bytes.toBytes("value1");
2044    byte[] val2 = Bytes.toBytes("value2");
2045
2046    byte[][] families = { fam1, fam2 };
2047
2048    // Setting up region
2049    this.region = initHRegion(tableName, method, CONF, families);
2050    try {
2051      // Putting data in the key to check
2052      Put put = new Put(row1);
2053      put.addColumn(fam1, qf1, val1);
2054      region.put(put);
2055
2056      // Creating put to add
2057      long ts = System.currentTimeMillis();
2058      KeyValue kv = new KeyValue(row1, fam2, qf1, ts, KeyValue.Type.Put, val2);
2059      put = new Put(row1);
2060      put.add(kv);
2061
2062      // checkAndPut with wrong value
2063      boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(
2064          val1), put);
2065      assertEquals(true, res);
2066
2067      Get get = new Get(row1);
2068      get.addColumn(fam2, qf1);
2069      Cell[] actual = region.get(get).rawCells();
2070
2071      Cell[] expected = { kv };
2072
2073      assertEquals(expected.length, actual.length);
2074      for (int i = 0; i < actual.length; i++) {
2075        assertEquals(expected[i], actual[i]);
2076      }
2077    } finally {
2078      HBaseTestingUtility.closeRegionAndWAL(this.region);
2079      this.region = null;
2080    }
2081  }
2082
2083  @Test
2084  public void testCheckAndPut_wrongRowInPut() throws IOException {
2085    this.region = initHRegion(tableName, method, CONF, COLUMNS);
2086    try {
2087      Put put = new Put(row2);
2088      put.addColumn(fam1, qual1, value1);
2089      try {
2090        region.checkAndMutate(row, fam1, qual1, CompareOperator.EQUAL,
2091            new BinaryComparator(value2), put);
2092        fail();
2093      } catch (org.apache.hadoop.hbase.DoNotRetryIOException expected) {
2094        // expected exception.
2095      }
2096    } finally {
2097      HBaseTestingUtility.closeRegionAndWAL(this.region);
2098      this.region = null;
2099    }
2100  }
2101
2102  @Test
2103  public void testCheckAndDelete_ThatDeleteWasWritten() throws IOException {
2104    byte[] row1 = Bytes.toBytes("row1");
2105    byte[] fam1 = Bytes.toBytes("fam1");
2106    byte[] fam2 = Bytes.toBytes("fam2");
2107    byte[] qf1 = Bytes.toBytes("qualifier1");
2108    byte[] qf2 = Bytes.toBytes("qualifier2");
2109    byte[] qf3 = Bytes.toBytes("qualifier3");
2110    byte[] val1 = Bytes.toBytes("value1");
2111    byte[] val2 = Bytes.toBytes("value2");
2112    byte[] val3 = Bytes.toBytes("value3");
2113    byte[] emptyVal = new byte[] {};
2114
2115    byte[][] families = { fam1, fam2 };
2116
2117    // Setting up region
2118    this.region = initHRegion(tableName, method, CONF, families);
2119    try {
2120      // Put content
2121      Put put = new Put(row1);
2122      put.addColumn(fam1, qf1, val1);
2123      region.put(put);
2124      Threads.sleep(2);
2125
2126      put = new Put(row1);
2127      put.addColumn(fam1, qf1, val2);
2128      put.addColumn(fam2, qf1, val3);
2129      put.addColumn(fam2, qf2, val2);
2130      put.addColumn(fam2, qf3, val1);
2131      put.addColumn(fam1, qf3, val1);
2132      region.put(put);
2133
2134      // Multi-column delete
2135      Delete delete = new Delete(row1);
2136      delete.addColumn(fam1, qf1);
2137      delete.addColumn(fam2, qf1);
2138      delete.addColumn(fam1, qf3);
2139      boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(
2140          val2), delete);
2141      assertEquals(true, res);
2142
2143      Get get = new Get(row1);
2144      get.addColumn(fam1, qf1);
2145      get.addColumn(fam1, qf3);
2146      get.addColumn(fam2, qf2);
2147      Result r = region.get(get);
2148      assertEquals(2, r.size());
2149      assertArrayEquals(val1, r.getValue(fam1, qf1));
2150      assertArrayEquals(val2, r.getValue(fam2, qf2));
2151
2152      // Family delete
2153      delete = new Delete(row1);
2154      delete.addFamily(fam2);
2155      res = region.checkAndMutate(row1, fam2, qf1, CompareOperator.EQUAL, new BinaryComparator(emptyVal),
2156          delete);
2157      assertEquals(true, res);
2158
2159      get = new Get(row1);
2160      r = region.get(get);
2161      assertEquals(1, r.size());
2162      assertArrayEquals(val1, r.getValue(fam1, qf1));
2163
2164      // Row delete
2165      delete = new Delete(row1);
2166      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val1),
2167          delete);
2168      assertEquals(true, res);
2169      get = new Get(row1);
2170      r = region.get(get);
2171      assertEquals(0, r.size());
2172    } finally {
2173      HBaseTestingUtility.closeRegionAndWAL(this.region);
2174      this.region = null;
2175    }
2176  }
2177
2178  // ////////////////////////////////////////////////////////////////////////////
2179  // Delete tests
2180  // ////////////////////////////////////////////////////////////////////////////
2181  @Test
2182  public void testDelete_multiDeleteColumn() throws IOException {
2183    byte[] row1 = Bytes.toBytes("row1");
2184    byte[] fam1 = Bytes.toBytes("fam1");
2185    byte[] qual = Bytes.toBytes("qualifier");
2186    byte[] value = Bytes.toBytes("value");
2187
2188    Put put = new Put(row1);
2189    put.addColumn(fam1, qual, 1, value);
2190    put.addColumn(fam1, qual, 2, value);
2191
2192    this.region = initHRegion(tableName, method, CONF, fam1);
2193    try {
2194      region.put(put);
2195
2196      // We do support deleting more than 1 'latest' version
2197      Delete delete = new Delete(row1);
2198      delete.addColumn(fam1, qual);
2199      delete.addColumn(fam1, qual);
2200      region.delete(delete);
2201
2202      Get get = new Get(row1);
2203      get.addFamily(fam1);
2204      Result r = region.get(get);
2205      assertEquals(0, r.size());
2206    } finally {
2207      HBaseTestingUtility.closeRegionAndWAL(this.region);
2208      this.region = null;
2209    }
2210  }
2211
2212  @Test
2213  public void testDelete_CheckFamily() throws IOException {
2214    byte[] row1 = Bytes.toBytes("row1");
2215    byte[] fam1 = Bytes.toBytes("fam1");
2216    byte[] fam2 = Bytes.toBytes("fam2");
2217    byte[] fam3 = Bytes.toBytes("fam3");
2218    byte[] fam4 = Bytes.toBytes("fam4");
2219
2220    // Setting up region
2221    this.region = initHRegion(tableName, method, CONF, fam1, fam2, fam3);
2222    try {
2223      List<Cell> kvs = new ArrayList<>();
2224      kvs.add(new KeyValue(row1, fam4, null, null));
2225
2226      // testing existing family
2227      byte[] family = fam2;
2228      try {
2229        NavigableMap<byte[], List<Cell>> deleteMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
2230        deleteMap.put(family, kvs);
2231        region.delete(deleteMap, Durability.SYNC_WAL);
2232      } catch (Exception e) {
2233        fail("Family " + new String(family, StandardCharsets.UTF_8) + " does not exist");
2234      }
2235
2236      // testing non existing family
2237      boolean ok = false;
2238      family = fam4;
2239      try {
2240        NavigableMap<byte[], List<Cell>> deleteMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
2241        deleteMap.put(family, kvs);
2242        region.delete(deleteMap, Durability.SYNC_WAL);
2243      } catch (Exception e) {
2244        ok = true;
2245      }
2246      assertTrue("Family " + new String(family, StandardCharsets.UTF_8) + " does exist", ok);
2247    } finally {
2248      HBaseTestingUtility.closeRegionAndWAL(this.region);
2249      this.region = null;
2250    }
2251  }
2252
2253  @Test
2254  public void testDelete_mixed() throws IOException, InterruptedException {
2255    byte[] fam = Bytes.toBytes("info");
2256    byte[][] families = { fam };
2257    this.region = initHRegion(tableName, method, CONF, families);
2258    try {
2259      EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
2260
2261      byte[] row = Bytes.toBytes("table_name");
2262      // column names
2263      byte[] serverinfo = Bytes.toBytes("serverinfo");
2264      byte[] splitA = Bytes.toBytes("splitA");
2265      byte[] splitB = Bytes.toBytes("splitB");
2266
2267      // add some data:
2268      Put put = new Put(row);
2269      put.addColumn(fam, splitA, Bytes.toBytes("reference_A"));
2270      region.put(put);
2271
2272      put = new Put(row);
2273      put.addColumn(fam, splitB, Bytes.toBytes("reference_B"));
2274      region.put(put);
2275
2276      put = new Put(row);
2277      put.addColumn(fam, serverinfo, Bytes.toBytes("ip_address"));
2278      region.put(put);
2279
2280      // ok now delete a split:
2281      Delete delete = new Delete(row);
2282      delete.addColumns(fam, splitA);
2283      region.delete(delete);
2284
2285      // assert some things:
2286      Get get = new Get(row).addColumn(fam, serverinfo);
2287      Result result = region.get(get);
2288      assertEquals(1, result.size());
2289
2290      get = new Get(row).addColumn(fam, splitA);
2291      result = region.get(get);
2292      assertEquals(0, result.size());
2293
2294      get = new Get(row).addColumn(fam, splitB);
2295      result = region.get(get);
2296      assertEquals(1, result.size());
2297
2298      // Assert that after a delete, I can put.
2299      put = new Put(row);
2300      put.addColumn(fam, splitA, Bytes.toBytes("reference_A"));
2301      region.put(put);
2302      get = new Get(row);
2303      result = region.get(get);
2304      assertEquals(3, result.size());
2305
2306      // Now delete all... then test I can add stuff back
2307      delete = new Delete(row);
2308      region.delete(delete);
2309      assertEquals(0, region.get(get).size());
2310
2311      region.put(new Put(row).addColumn(fam, splitA, Bytes.toBytes("reference_A")));
2312      result = region.get(get);
2313      assertEquals(1, result.size());
2314    } finally {
2315      HBaseTestingUtility.closeRegionAndWAL(this.region);
2316      this.region = null;
2317    }
2318  }
2319
2320  @Test
2321  public void testDeleteRowWithFutureTs() throws IOException {
2322    byte[] fam = Bytes.toBytes("info");
2323    byte[][] families = { fam };
2324    this.region = initHRegion(tableName, method, CONF, families);
2325    try {
2326      byte[] row = Bytes.toBytes("table_name");
2327      // column names
2328      byte[] serverinfo = Bytes.toBytes("serverinfo");
2329
2330      // add data in the far future
2331      Put put = new Put(row);
2332      put.addColumn(fam, serverinfo, HConstants.LATEST_TIMESTAMP - 5, Bytes.toBytes("value"));
2333      region.put(put);
2334
2335      // now delete something in the present
2336      Delete delete = new Delete(row);
2337      region.delete(delete);
2338
2339      // make sure we still see our data
2340      Get get = new Get(row).addColumn(fam, serverinfo);
2341      Result result = region.get(get);
2342      assertEquals(1, result.size());
2343
2344      // delete the future row
2345      delete = new Delete(row, HConstants.LATEST_TIMESTAMP - 3);
2346      region.delete(delete);
2347
2348      // make sure it is gone
2349      get = new Get(row).addColumn(fam, serverinfo);
2350      result = region.get(get);
2351      assertEquals(0, result.size());
2352    } finally {
2353      HBaseTestingUtility.closeRegionAndWAL(this.region);
2354      this.region = null;
2355    }
2356  }
2357
2358  /**
2359   * Tests that the special LATEST_TIMESTAMP option for puts gets replaced by
2360   * the actual timestamp
2361   */
2362  @Test
2363  public void testPutWithLatestTS() throws IOException {
2364    byte[] fam = Bytes.toBytes("info");
2365    byte[][] families = { fam };
2366    this.region = initHRegion(tableName, method, CONF, families);
2367    try {
2368      byte[] row = Bytes.toBytes("row1");
2369      // column names
2370      byte[] qual = Bytes.toBytes("qual");
2371
2372      // add data with LATEST_TIMESTAMP, put without WAL
2373      Put put = new Put(row);
2374      put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, Bytes.toBytes("value"));
2375      region.put(put);
2376
2377      // Make sure it shows up with an actual timestamp
2378      Get get = new Get(row).addColumn(fam, qual);
2379      Result result = region.get(get);
2380      assertEquals(1, result.size());
2381      Cell kv = result.rawCells()[0];
2382      LOG.info("Got: " + kv);
2383      assertTrue("LATEST_TIMESTAMP was not replaced with real timestamp",
2384          kv.getTimestamp() != HConstants.LATEST_TIMESTAMP);
2385
2386      // Check same with WAL enabled (historically these took different
2387      // code paths, so check both)
2388      row = Bytes.toBytes("row2");
2389      put = new Put(row);
2390      put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, Bytes.toBytes("value"));
2391      region.put(put);
2392
2393      // Make sure it shows up with an actual timestamp
2394      get = new Get(row).addColumn(fam, qual);
2395      result = region.get(get);
2396      assertEquals(1, result.size());
2397      kv = result.rawCells()[0];
2398      LOG.info("Got: " + kv);
2399      assertTrue("LATEST_TIMESTAMP was not replaced with real timestamp",
2400          kv.getTimestamp() != HConstants.LATEST_TIMESTAMP);
2401    } finally {
2402      HBaseTestingUtility.closeRegionAndWAL(this.region);
2403      this.region = null;
2404    }
2405
2406  }
2407
2408  /**
2409   * Tests that there is server-side filtering for invalid timestamp upper
2410   * bound. Note that the timestamp lower bound is automatically handled for us
2411   * by the TTL field.
2412   */
2413  @Test
2414  public void testPutWithTsSlop() throws IOException {
2415    byte[] fam = Bytes.toBytes("info");
2416    byte[][] families = { fam };
2417
2418    // add data with a timestamp that is too recent for range. Ensure assert
2419    CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000);
2420    this.region = initHRegion(tableName, method, CONF, families);
2421    boolean caughtExcep = false;
2422    try {
2423      try {
2424        // no TS specified == use latest. should not error
2425        region.put(new Put(row).addColumn(fam, Bytes.toBytes("qual"), Bytes.toBytes("value")));
2426        // TS out of range. should error
2427        region.put(new Put(row).addColumn(fam, Bytes.toBytes("qual"),
2428            System.currentTimeMillis() + 2000, Bytes.toBytes("value")));
2429        fail("Expected IOE for TS out of configured timerange");
2430      } catch (FailedSanityCheckException ioe) {
2431        LOG.debug("Received expected exception", ioe);
2432        caughtExcep = true;
2433      }
2434      assertTrue("Should catch FailedSanityCheckException", caughtExcep);
2435    } finally {
2436      HBaseTestingUtility.closeRegionAndWAL(this.region);
2437      this.region = null;
2438    }
2439  }
2440
2441  @Test
2442  public void testScanner_DeleteOneFamilyNotAnother() throws IOException {
2443    byte[] fam1 = Bytes.toBytes("columnA");
2444    byte[] fam2 = Bytes.toBytes("columnB");
2445    this.region = initHRegion(tableName, method, CONF, fam1, fam2);
2446    try {
2447      byte[] rowA = Bytes.toBytes("rowA");
2448      byte[] rowB = Bytes.toBytes("rowB");
2449
2450      byte[] value = Bytes.toBytes("value");
2451
2452      Delete delete = new Delete(rowA);
2453      delete.addFamily(fam1);
2454
2455      region.delete(delete);
2456
2457      // now create data.
2458      Put put = new Put(rowA);
2459      put.addColumn(fam2, null, value);
2460      region.put(put);
2461
2462      put = new Put(rowB);
2463      put.addColumn(fam1, null, value);
2464      put.addColumn(fam2, null, value);
2465      region.put(put);
2466
2467      Scan scan = new Scan();
2468      scan.addFamily(fam1).addFamily(fam2);
2469      InternalScanner s = region.getScanner(scan);
2470      List<Cell> results = new ArrayList<>();
2471      s.next(results);
2472      assertTrue(CellUtil.matchingRows(results.get(0), rowA));
2473
2474      results.clear();
2475      s.next(results);
2476      assertTrue(CellUtil.matchingRows(results.get(0), rowB));
2477    } finally {
2478      HBaseTestingUtility.closeRegionAndWAL(this.region);
2479      this.region = null;
2480    }
2481  }
2482
2483  @Test
2484  public void testDataInMemoryWithoutWAL() throws IOException {
2485    FileSystem fs = FileSystem.get(CONF);
2486    Path rootDir = new Path(dir + "testDataInMemoryWithoutWAL");
2487    FSHLog hLog = new FSHLog(fs, rootDir, "testDataInMemoryWithoutWAL", CONF);
2488    // This chunk creation is done throughout the code base. Do we want to move it into core?
2489    // It is missing from this test. W/o it we NPE.
2490    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
2491    HRegion region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog,
2492        COLUMN_FAMILY_BYTES);
2493
2494    Cell originalCell = CellUtil.createCell(row, COLUMN_FAMILY_BYTES, qual1,
2495      System.currentTimeMillis(), KeyValue.Type.Put.getCode(), value1);
2496    final long originalSize = KeyValueUtil.length(originalCell);
2497
2498    Cell addCell = CellUtil.createCell(row, COLUMN_FAMILY_BYTES, qual1,
2499      System.currentTimeMillis(), KeyValue.Type.Put.getCode(), Bytes.toBytes("xxxxxxxxxx"));
2500    final long addSize = KeyValueUtil.length(addCell);
2501
2502    LOG.info("originalSize:" + originalSize
2503      + ", addSize:" + addSize);
2504    // start test. We expect that the addPut's durability will be replaced
2505    // by originalPut's durability.
2506
2507    // case 1:
2508    testDataInMemoryWithoutWAL(region,
2509            new Put(row).add(originalCell).setDurability(Durability.SKIP_WAL),
2510            new Put(row).add(addCell).setDurability(Durability.SKIP_WAL),
2511            originalSize + addSize);
2512
2513    // case 2:
2514    testDataInMemoryWithoutWAL(region,
2515            new Put(row).add(originalCell).setDurability(Durability.SKIP_WAL),
2516            new Put(row).add(addCell).setDurability(Durability.SYNC_WAL),
2517            originalSize + addSize);
2518
2519    // case 3:
2520    testDataInMemoryWithoutWAL(region,
2521            new Put(row).add(originalCell).setDurability(Durability.SYNC_WAL),
2522            new Put(row).add(addCell).setDurability(Durability.SKIP_WAL),
2523            0);
2524
2525    // case 4:
2526    testDataInMemoryWithoutWAL(region,
2527            new Put(row).add(originalCell).setDurability(Durability.SYNC_WAL),
2528            new Put(row).add(addCell).setDurability(Durability.SYNC_WAL),
2529            0);
2530  }
2531
2532  private static void testDataInMemoryWithoutWAL(HRegion region, Put originalPut,
2533          final Put addPut, long delta) throws IOException {
2534    final long initSize = region.getDataInMemoryWithoutWAL();
2535    // save normalCPHost and replaced by mockedCPHost
2536    RegionCoprocessorHost normalCPHost = region.getCoprocessorHost();
2537    RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
2538    // Because the preBatchMutate returns void, we can't do usual Mockito when...then form. Must
2539    // do below format (from Mockito doc).
2540    Mockito.doAnswer(new Answer() {
2541      @Override
2542      public Object answer(InvocationOnMock invocation) throws Throwable {
2543        MiniBatchOperationInProgress<Mutation> mb = invocation.getArgument(0);
2544        mb.addOperationsFromCP(0, new Mutation[]{addPut});
2545        return null;
2546      }
2547    }).when(mockedCPHost).preBatchMutate(Mockito.isA(MiniBatchOperationInProgress.class));
2548    region.setCoprocessorHost(mockedCPHost);
2549    region.put(originalPut);
2550    region.setCoprocessorHost(normalCPHost);
2551    final long finalSize = region.getDataInMemoryWithoutWAL();
2552    assertEquals("finalSize:" + finalSize + ", initSize:"
2553      + initSize + ", delta:" + delta,finalSize, initSize + delta);
2554  }
2555
2556  @Test
2557  public void testDeleteColumns_PostInsert() throws IOException, InterruptedException {
2558    Delete delete = new Delete(row);
2559    delete.addColumns(fam1, qual1);
2560    doTestDelete_AndPostInsert(delete);
2561  }
2562
2563  @Test
2564  public void testaddFamily_PostInsert() throws IOException, InterruptedException {
2565    Delete delete = new Delete(row);
2566    delete.addFamily(fam1);
2567    doTestDelete_AndPostInsert(delete);
2568  }
2569
2570  public void doTestDelete_AndPostInsert(Delete delete) throws IOException, InterruptedException {
2571    this.region = initHRegion(tableName, method, CONF, fam1);
2572    try {
2573      EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
2574      Put put = new Put(row);
2575      put.addColumn(fam1, qual1, value1);
2576      region.put(put);
2577
2578      // now delete the value:
2579      region.delete(delete);
2580
2581      // ok put data:
2582      put = new Put(row);
2583      put.addColumn(fam1, qual1, value2);
2584      region.put(put);
2585
2586      // ok get:
2587      Get get = new Get(row);
2588      get.addColumn(fam1, qual1);
2589
2590      Result r = region.get(get);
2591      assertEquals(1, r.size());
2592      assertArrayEquals(value2, r.getValue(fam1, qual1));
2593
2594      // next:
2595      Scan scan = new Scan(row);
2596      scan.addColumn(fam1, qual1);
2597      InternalScanner s = region.getScanner(scan);
2598
2599      List<Cell> results = new ArrayList<>();
2600      assertEquals(false, s.next(results));
2601      assertEquals(1, results.size());
2602      Cell kv = results.get(0);
2603
2604      assertArrayEquals(value2, CellUtil.cloneValue(kv));
2605      assertArrayEquals(fam1, CellUtil.cloneFamily(kv));
2606      assertArrayEquals(qual1, CellUtil.cloneQualifier(kv));
2607      assertArrayEquals(row, CellUtil.cloneRow(kv));
2608    } finally {
2609      HBaseTestingUtility.closeRegionAndWAL(this.region);
2610      this.region = null;
2611    }
2612  }
2613
2614  @Test
2615  public void testDelete_CheckTimestampUpdated() throws IOException {
2616    byte[] row1 = Bytes.toBytes("row1");
2617    byte[] col1 = Bytes.toBytes("col1");
2618    byte[] col2 = Bytes.toBytes("col2");
2619    byte[] col3 = Bytes.toBytes("col3");
2620
2621    // Setting up region
2622    this.region = initHRegion(tableName, method, CONF, fam1);
2623    try {
2624      // Building checkerList
2625      List<Cell> kvs = new ArrayList<>();
2626      kvs.add(new KeyValue(row1, fam1, col1, null));
2627      kvs.add(new KeyValue(row1, fam1, col2, null));
2628      kvs.add(new KeyValue(row1, fam1, col3, null));
2629
2630      NavigableMap<byte[], List<Cell>> deleteMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
2631      deleteMap.put(fam1, kvs);
2632      region.delete(deleteMap, Durability.SYNC_WAL);
2633
2634      // extract the key values out the memstore:
2635      // This is kinda hacky, but better than nothing...
2636      long now = System.currentTimeMillis();
2637      AbstractMemStore memstore = (AbstractMemStore)region.getStore(fam1).memstore;
2638      Cell firstCell = memstore.getActive().first();
2639      assertTrue(firstCell.getTimestamp() <= now);
2640      now = firstCell.getTimestamp();
2641      for (Cell cell : memstore.getActive().getCellSet()) {
2642        assertTrue(cell.getTimestamp() <= now);
2643        now = cell.getTimestamp();
2644      }
2645    } finally {
2646      HBaseTestingUtility.closeRegionAndWAL(this.region);
2647      this.region = null;
2648    }
2649  }
2650
2651  // ////////////////////////////////////////////////////////////////////////////
2652  // Get tests
2653  // ////////////////////////////////////////////////////////////////////////////
2654  @Test
2655  public void testGet_FamilyChecker() throws IOException {
2656    byte[] row1 = Bytes.toBytes("row1");
2657    byte[] fam1 = Bytes.toBytes("fam1");
2658    byte[] fam2 = Bytes.toBytes("False");
2659    byte[] col1 = Bytes.toBytes("col1");
2660
2661    // Setting up region
2662    this.region = initHRegion(tableName, method, CONF, fam1);
2663    try {
2664      Get get = new Get(row1);
2665      get.addColumn(fam2, col1);
2666
2667      // Test
2668      try {
2669        region.get(get);
2670      } catch (org.apache.hadoop.hbase.DoNotRetryIOException e) {
2671        assertFalse(false);
2672        return;
2673      }
2674      assertFalse(true);
2675    } finally {
2676      HBaseTestingUtility.closeRegionAndWAL(this.region);
2677      this.region = null;
2678    }
2679  }
2680
2681  @Test
2682  public void testGet_Basic() throws IOException {
2683    byte[] row1 = Bytes.toBytes("row1");
2684    byte[] fam1 = Bytes.toBytes("fam1");
2685    byte[] col1 = Bytes.toBytes("col1");
2686    byte[] col2 = Bytes.toBytes("col2");
2687    byte[] col3 = Bytes.toBytes("col3");
2688    byte[] col4 = Bytes.toBytes("col4");
2689    byte[] col5 = Bytes.toBytes("col5");
2690
2691    // Setting up region
2692    this.region = initHRegion(tableName, method, CONF, fam1);
2693    try {
2694      // Add to memstore
2695      Put put = new Put(row1);
2696      put.addColumn(fam1, col1, null);
2697      put.addColumn(fam1, col2, null);
2698      put.addColumn(fam1, col3, null);
2699      put.addColumn(fam1, col4, null);
2700      put.addColumn(fam1, col5, null);
2701      region.put(put);
2702
2703      Get get = new Get(row1);
2704      get.addColumn(fam1, col2);
2705      get.addColumn(fam1, col4);
2706      // Expected result
2707      KeyValue kv1 = new KeyValue(row1, fam1, col2);
2708      KeyValue kv2 = new KeyValue(row1, fam1, col4);
2709      KeyValue[] expected = { kv1, kv2 };
2710
2711      // Test
2712      Result res = region.get(get);
2713      assertEquals(expected.length, res.size());
2714      for (int i = 0; i < res.size(); i++) {
2715        assertTrue(CellUtil.matchingRows(expected[i], res.rawCells()[i]));
2716        assertTrue(CellUtil.matchingFamily(expected[i], res.rawCells()[i]));
2717        assertTrue(CellUtil.matchingQualifier(expected[i], res.rawCells()[i]));
2718      }
2719
2720      // Test using a filter on a Get
2721      Get g = new Get(row1);
2722      final int count = 2;
2723      g.setFilter(new ColumnCountGetFilter(count));
2724      res = region.get(g);
2725      assertEquals(count, res.size());
2726    } finally {
2727      HBaseTestingUtility.closeRegionAndWAL(this.region);
2728      this.region = null;
2729    }
2730  }
2731
2732  @Test
2733  public void testGet_Empty() throws IOException {
2734    byte[] row = Bytes.toBytes("row");
2735    byte[] fam = Bytes.toBytes("fam");
2736
2737    this.region = initHRegion(tableName, method, CONF, fam);
2738    try {
2739      Get get = new Get(row);
2740      get.addFamily(fam);
2741      Result r = region.get(get);
2742
2743      assertTrue(r.isEmpty());
2744    } finally {
2745      HBaseTestingUtility.closeRegionAndWAL(this.region);
2746      this.region = null;
2747    }
2748  }
2749
2750  @Test
2751  public void testGetWithFilter() throws IOException, InterruptedException {
2752    byte[] row1 = Bytes.toBytes("row1");
2753    byte[] fam1 = Bytes.toBytes("fam1");
2754    byte[] col1 = Bytes.toBytes("col1");
2755    byte[] value1 = Bytes.toBytes("value1");
2756    byte[] value2 = Bytes.toBytes("value2");
2757
2758    final int maxVersions = 3;
2759    HColumnDescriptor hcd = new HColumnDescriptor(fam1);
2760    hcd.setMaxVersions(maxVersions);
2761    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testFilterAndColumnTracker"));
2762    htd.addFamily(hcd);
2763    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
2764    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
2765    Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + ".log");
2766    final WAL wal = HBaseTestingUtility.createWal(TEST_UTIL.getConfiguration(), logDir, info);
2767    this.region = TEST_UTIL.createLocalHRegion(info, htd, wal);
2768
2769    try {
2770      // Put 4 version to memstore
2771      long ts = 0;
2772      Put put = new Put(row1, ts);
2773      put.addColumn(fam1, col1, value1);
2774      region.put(put);
2775      put = new Put(row1, ts + 1);
2776      put.addColumn(fam1, col1, Bytes.toBytes("filter1"));
2777      region.put(put);
2778      put = new Put(row1, ts + 2);
2779      put.addColumn(fam1, col1, Bytes.toBytes("filter2"));
2780      region.put(put);
2781      put = new Put(row1, ts + 3);
2782      put.addColumn(fam1, col1, value2);
2783      region.put(put);
2784
2785      Get get = new Get(row1);
2786      get.setMaxVersions();
2787      Result res = region.get(get);
2788      // Get 3 versions, the oldest version has gone from user view
2789      assertEquals(maxVersions, res.size());
2790
2791      get.setFilter(new ValueFilter(CompareOp.EQUAL, new SubstringComparator("value")));
2792      res = region.get(get);
2793      // When use value filter, the oldest version should still gone from user view and it
2794      // should only return one key vaule
2795      assertEquals(1, res.size());
2796      assertTrue(CellUtil.matchingValue(new KeyValue(row1, fam1, col1, value2), res.rawCells()[0]));
2797      assertEquals(ts + 3, res.rawCells()[0].getTimestamp());
2798
2799      region.flush(true);
2800      region.compact(true);
2801      Thread.sleep(1000);
2802      res = region.get(get);
2803      // After flush and compact, the result should be consistent with previous result
2804      assertEquals(1, res.size());
2805      assertTrue(CellUtil.matchingValue(new KeyValue(row1, fam1, col1, value2), res.rawCells()[0]));
2806    } finally {
2807      HBaseTestingUtility.closeRegionAndWAL(this.region);
2808      this.region = null;
2809    }
2810  }
2811
2812  // ////////////////////////////////////////////////////////////////////////////
2813  // Scanner tests
2814  // ////////////////////////////////////////////////////////////////////////////
2815  @Test
2816  public void testGetScanner_WithOkFamilies() throws IOException {
2817    byte[] fam1 = Bytes.toBytes("fam1");
2818    byte[] fam2 = Bytes.toBytes("fam2");
2819
2820    byte[][] families = { fam1, fam2 };
2821
2822    // Setting up region
2823    this.region = initHRegion(tableName, method, CONF, families);
2824    try {
2825      Scan scan = new Scan();
2826      scan.addFamily(fam1);
2827      scan.addFamily(fam2);
2828      try {
2829        region.getScanner(scan);
2830      } catch (Exception e) {
2831        assertTrue("Families could not be found in Region", false);
2832      }
2833    } finally {
2834      HBaseTestingUtility.closeRegionAndWAL(this.region);
2835      this.region = null;
2836    }
2837  }
2838
2839  @Test
2840  public void testGetScanner_WithNotOkFamilies() throws IOException {
2841    byte[] fam1 = Bytes.toBytes("fam1");
2842    byte[] fam2 = Bytes.toBytes("fam2");
2843
2844    byte[][] families = { fam1 };
2845
2846    // Setting up region
2847    this.region = initHRegion(tableName, method, CONF, families);
2848    try {
2849      Scan scan = new Scan();
2850      scan.addFamily(fam2);
2851      boolean ok = false;
2852      try {
2853        region.getScanner(scan);
2854      } catch (Exception e) {
2855        ok = true;
2856      }
2857      assertTrue("Families could not be found in Region", ok);
2858    } finally {
2859      HBaseTestingUtility.closeRegionAndWAL(this.region);
2860      this.region = null;
2861    }
2862  }
2863
2864  @Test
2865  public void testGetScanner_WithNoFamilies() throws IOException {
2866    byte[] row1 = Bytes.toBytes("row1");
2867    byte[] fam1 = Bytes.toBytes("fam1");
2868    byte[] fam2 = Bytes.toBytes("fam2");
2869    byte[] fam3 = Bytes.toBytes("fam3");
2870    byte[] fam4 = Bytes.toBytes("fam4");
2871
2872    byte[][] families = { fam1, fam2, fam3, fam4 };
2873
2874    // Setting up region
2875    this.region = initHRegion(tableName, method, CONF, families);
2876    try {
2877
2878      // Putting data in Region
2879      Put put = new Put(row1);
2880      put.addColumn(fam1, null, null);
2881      put.addColumn(fam2, null, null);
2882      put.addColumn(fam3, null, null);
2883      put.addColumn(fam4, null, null);
2884      region.put(put);
2885
2886      Scan scan = null;
2887      HRegion.RegionScannerImpl is = null;
2888
2889      // Testing to see how many scanners that is produced by getScanner,
2890      // starting
2891      // with known number, 2 - current = 1
2892      scan = new Scan();
2893      scan.addFamily(fam2);
2894      scan.addFamily(fam4);
2895      is = region.getScanner(scan);
2896      assertEquals(1, is.storeHeap.getHeap().size());
2897
2898      scan = new Scan();
2899      is = region.getScanner(scan);
2900      assertEquals(families.length - 1, is.storeHeap.getHeap().size());
2901    } finally {
2902      HBaseTestingUtility.closeRegionAndWAL(this.region);
2903      this.region = null;
2904    }
2905  }
2906
2907  /**
2908   * This method tests https://issues.apache.org/jira/browse/HBASE-2516.
2909   *
2910   * @throws IOException
2911   */
2912  @Test
2913  public void testGetScanner_WithRegionClosed() throws IOException {
2914    byte[] fam1 = Bytes.toBytes("fam1");
2915    byte[] fam2 = Bytes.toBytes("fam2");
2916
2917    byte[][] families = { fam1, fam2 };
2918
2919    // Setting up region
2920    try {
2921      this.region = initHRegion(tableName, method, CONF, families);
2922    } catch (IOException e) {
2923      e.printStackTrace();
2924      fail("Got IOException during initHRegion, " + e.getMessage());
2925    }
2926    try {
2927      region.closed.set(true);
2928      try {
2929        region.getScanner(null);
2930        fail("Expected to get an exception during getScanner on a region that is closed");
2931      } catch (NotServingRegionException e) {
2932        // this is the correct exception that is expected
2933      } catch (IOException e) {
2934        fail("Got wrong type of exception - should be a NotServingRegionException, " +
2935            "but was an IOException: "
2936            + e.getMessage());
2937      }
2938    } finally {
2939      HBaseTestingUtility.closeRegionAndWAL(this.region);
2940      this.region = null;
2941    }
2942  }
2943
2944  @Test
2945  public void testRegionScanner_Next() throws IOException {
2946    byte[] row1 = Bytes.toBytes("row1");
2947    byte[] row2 = Bytes.toBytes("row2");
2948    byte[] fam1 = Bytes.toBytes("fam1");
2949    byte[] fam2 = Bytes.toBytes("fam2");
2950    byte[] fam3 = Bytes.toBytes("fam3");
2951    byte[] fam4 = Bytes.toBytes("fam4");
2952
2953    byte[][] families = { fam1, fam2, fam3, fam4 };
2954    long ts = System.currentTimeMillis();
2955
2956    // Setting up region
2957    this.region = initHRegion(tableName, method, CONF, families);
2958    try {
2959      // Putting data in Region
2960      Put put = null;
2961      put = new Put(row1);
2962      put.addColumn(fam1, (byte[]) null, ts, null);
2963      put.addColumn(fam2, (byte[]) null, ts, null);
2964      put.addColumn(fam3, (byte[]) null, ts, null);
2965      put.addColumn(fam4, (byte[]) null, ts, null);
2966      region.put(put);
2967
2968      put = new Put(row2);
2969      put.addColumn(fam1, (byte[]) null, ts, null);
2970      put.addColumn(fam2, (byte[]) null, ts, null);
2971      put.addColumn(fam3, (byte[]) null, ts, null);
2972      put.addColumn(fam4, (byte[]) null, ts, null);
2973      region.put(put);
2974
2975      Scan scan = new Scan();
2976      scan.addFamily(fam2);
2977      scan.addFamily(fam4);
2978      InternalScanner is = region.getScanner(scan);
2979
2980      List<Cell> res = null;
2981
2982      // Result 1
2983      List<Cell> expected1 = new ArrayList<>();
2984      expected1.add(new KeyValue(row1, fam2, null, ts, KeyValue.Type.Put, null));
2985      expected1.add(new KeyValue(row1, fam4, null, ts, KeyValue.Type.Put, null));
2986
2987      res = new ArrayList<>();
2988      is.next(res);
2989      for (int i = 0; i < res.size(); i++) {
2990        assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected1.get(i), res.get(i)));
2991      }
2992
2993      // Result 2
2994      List<Cell> expected2 = new ArrayList<>();
2995      expected2.add(new KeyValue(row2, fam2, null, ts, KeyValue.Type.Put, null));
2996      expected2.add(new KeyValue(row2, fam4, null, ts, KeyValue.Type.Put, null));
2997
2998      res = new ArrayList<>();
2999      is.next(res);
3000      for (int i = 0; i < res.size(); i++) {
3001        assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected2.get(i), res.get(i)));
3002      }
3003    } finally {
3004      HBaseTestingUtility.closeRegionAndWAL(this.region);
3005      this.region = null;
3006    }
3007  }
3008
3009  @Test
3010  public void testScanner_ExplicitColumns_FromMemStore_EnforceVersions() throws IOException {
3011    byte[] row1 = Bytes.toBytes("row1");
3012    byte[] qf1 = Bytes.toBytes("qualifier1");
3013    byte[] qf2 = Bytes.toBytes("qualifier2");
3014    byte[] fam1 = Bytes.toBytes("fam1");
3015    byte[][] families = { fam1 };
3016
3017    long ts1 = System.currentTimeMillis();
3018    long ts2 = ts1 + 1;
3019    long ts3 = ts1 + 2;
3020
3021    // Setting up region
3022    this.region = initHRegion(tableName, method, CONF, families);
3023    try {
3024      // Putting data in Region
3025      Put put = null;
3026      KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3027      KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3028      KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3029
3030      KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3031      KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3032      KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3033
3034      put = new Put(row1);
3035      put.add(kv13);
3036      put.add(kv12);
3037      put.add(kv11);
3038      put.add(kv23);
3039      put.add(kv22);
3040      put.add(kv21);
3041      region.put(put);
3042
3043      // Expected
3044      List<Cell> expected = new ArrayList<>();
3045      expected.add(kv13);
3046      expected.add(kv12);
3047
3048      Scan scan = new Scan(row1);
3049      scan.addColumn(fam1, qf1);
3050      scan.setMaxVersions(MAX_VERSIONS);
3051      List<Cell> actual = new ArrayList<>();
3052      InternalScanner scanner = region.getScanner(scan);
3053
3054      boolean hasNext = scanner.next(actual);
3055      assertEquals(false, hasNext);
3056
3057      // Verify result
3058      for (int i = 0; i < expected.size(); i++) {
3059        assertEquals(expected.get(i), actual.get(i));
3060      }
3061    } finally {
3062      HBaseTestingUtility.closeRegionAndWAL(this.region);
3063      this.region = null;
3064    }
3065  }
3066
3067  @Test
3068  public void testScanner_ExplicitColumns_FromFilesOnly_EnforceVersions() throws IOException {
3069    byte[] row1 = Bytes.toBytes("row1");
3070    byte[] qf1 = Bytes.toBytes("qualifier1");
3071    byte[] qf2 = Bytes.toBytes("qualifier2");
3072    byte[] fam1 = Bytes.toBytes("fam1");
3073    byte[][] families = { fam1 };
3074
3075    long ts1 = 1; // System.currentTimeMillis();
3076    long ts2 = ts1 + 1;
3077    long ts3 = ts1 + 2;
3078
3079    // Setting up region
3080    this.region = initHRegion(tableName, method, CONF, families);
3081    try {
3082      // Putting data in Region
3083      Put put = null;
3084      KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3085      KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3086      KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3087
3088      KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3089      KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3090      KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3091
3092      put = new Put(row1);
3093      put.add(kv13);
3094      put.add(kv12);
3095      put.add(kv11);
3096      put.add(kv23);
3097      put.add(kv22);
3098      put.add(kv21);
3099      region.put(put);
3100      region.flush(true);
3101
3102      // Expected
3103      List<Cell> expected = new ArrayList<>();
3104      expected.add(kv13);
3105      expected.add(kv12);
3106      expected.add(kv23);
3107      expected.add(kv22);
3108
3109      Scan scan = new Scan(row1);
3110      scan.addColumn(fam1, qf1);
3111      scan.addColumn(fam1, qf2);
3112      scan.setMaxVersions(MAX_VERSIONS);
3113      List<Cell> actual = new ArrayList<>();
3114      InternalScanner scanner = region.getScanner(scan);
3115
3116      boolean hasNext = scanner.next(actual);
3117      assertEquals(false, hasNext);
3118
3119      // Verify result
3120      for (int i = 0; i < expected.size(); i++) {
3121        assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
3122      }
3123    } finally {
3124      HBaseTestingUtility.closeRegionAndWAL(this.region);
3125      this.region = null;
3126    }
3127  }
3128
3129  @Test
3130  public void testScanner_ExplicitColumns_FromMemStoreAndFiles_EnforceVersions() throws
3131      IOException {
3132    byte[] row1 = Bytes.toBytes("row1");
3133    byte[] fam1 = Bytes.toBytes("fam1");
3134    byte[][] families = { fam1 };
3135    byte[] qf1 = Bytes.toBytes("qualifier1");
3136    byte[] qf2 = Bytes.toBytes("qualifier2");
3137
3138    long ts1 = 1;
3139    long ts2 = ts1 + 1;
3140    long ts3 = ts1 + 2;
3141    long ts4 = ts1 + 3;
3142
3143    // Setting up region
3144    this.region = initHRegion(tableName, method, CONF, families);
3145    try {
3146      // Putting data in Region
3147      KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null);
3148      KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3149      KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3150      KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3151
3152      KeyValue kv24 = new KeyValue(row1, fam1, qf2, ts4, KeyValue.Type.Put, null);
3153      KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3154      KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3155      KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3156
3157      Put put = null;
3158      put = new Put(row1);
3159      put.add(kv14);
3160      put.add(kv24);
3161      region.put(put);
3162      region.flush(true);
3163
3164      put = new Put(row1);
3165      put.add(kv23);
3166      put.add(kv13);
3167      region.put(put);
3168      region.flush(true);
3169
3170      put = new Put(row1);
3171      put.add(kv22);
3172      put.add(kv12);
3173      region.put(put);
3174      region.flush(true);
3175
3176      put = new Put(row1);
3177      put.add(kv21);
3178      put.add(kv11);
3179      region.put(put);
3180
3181      // Expected
3182      List<Cell> expected = new ArrayList<>();
3183      expected.add(kv14);
3184      expected.add(kv13);
3185      expected.add(kv12);
3186      expected.add(kv24);
3187      expected.add(kv23);
3188      expected.add(kv22);
3189
3190      Scan scan = new Scan(row1);
3191      scan.addColumn(fam1, qf1);
3192      scan.addColumn(fam1, qf2);
3193      int versions = 3;
3194      scan.setMaxVersions(versions);
3195      List<Cell> actual = new ArrayList<>();
3196      InternalScanner scanner = region.getScanner(scan);
3197
3198      boolean hasNext = scanner.next(actual);
3199      assertEquals(false, hasNext);
3200
3201      // Verify result
3202      for (int i = 0; i < expected.size(); i++) {
3203        assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
3204      }
3205    } finally {
3206      HBaseTestingUtility.closeRegionAndWAL(this.region);
3207      this.region = null;
3208    }
3209  }
3210
3211  @Test
3212  public void testScanner_Wildcard_FromMemStore_EnforceVersions() throws IOException {
3213    byte[] row1 = Bytes.toBytes("row1");
3214    byte[] qf1 = Bytes.toBytes("qualifier1");
3215    byte[] qf2 = Bytes.toBytes("qualifier2");
3216    byte[] fam1 = Bytes.toBytes("fam1");
3217    byte[][] families = { fam1 };
3218
3219    long ts1 = System.currentTimeMillis();
3220    long ts2 = ts1 + 1;
3221    long ts3 = ts1 + 2;
3222
3223    // Setting up region
3224    this.region = initHRegion(tableName, method, CONF, families);
3225    try {
3226      // Putting data in Region
3227      Put put = null;
3228      KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3229      KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3230      KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3231
3232      KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3233      KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3234      KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3235
3236      put = new Put(row1);
3237      put.add(kv13);
3238      put.add(kv12);
3239      put.add(kv11);
3240      put.add(kv23);
3241      put.add(kv22);
3242      put.add(kv21);
3243      region.put(put);
3244
3245      // Expected
3246      List<Cell> expected = new ArrayList<>();
3247      expected.add(kv13);
3248      expected.add(kv12);
3249      expected.add(kv23);
3250      expected.add(kv22);
3251
3252      Scan scan = new Scan(row1);
3253      scan.addFamily(fam1);
3254      scan.setMaxVersions(MAX_VERSIONS);
3255      List<Cell> actual = new ArrayList<>();
3256      InternalScanner scanner = region.getScanner(scan);
3257
3258      boolean hasNext = scanner.next(actual);
3259      assertEquals(false, hasNext);
3260
3261      // Verify result
3262      for (int i = 0; i < expected.size(); i++) {
3263        assertEquals(expected.get(i), actual.get(i));
3264      }
3265    } finally {
3266      HBaseTestingUtility.closeRegionAndWAL(this.region);
3267      this.region = null;
3268    }
3269  }
3270
3271  @Test
3272  public void testScanner_Wildcard_FromFilesOnly_EnforceVersions() throws IOException {
3273    byte[] row1 = Bytes.toBytes("row1");
3274    byte[] qf1 = Bytes.toBytes("qualifier1");
3275    byte[] qf2 = Bytes.toBytes("qualifier2");
3276    byte[] fam1 = Bytes.toBytes("fam1");
3277
3278    long ts1 = 1; // System.currentTimeMillis();
3279    long ts2 = ts1 + 1;
3280    long ts3 = ts1 + 2;
3281
3282    // Setting up region
3283    this.region = initHRegion(tableName, method, CONF, fam1);
3284    try {
3285      // Putting data in Region
3286      Put put = null;
3287      KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3288      KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3289      KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3290
3291      KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3292      KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3293      KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3294
3295      put = new Put(row1);
3296      put.add(kv13);
3297      put.add(kv12);
3298      put.add(kv11);
3299      put.add(kv23);
3300      put.add(kv22);
3301      put.add(kv21);
3302      region.put(put);
3303      region.flush(true);
3304
3305      // Expected
3306      List<Cell> expected = new ArrayList<>();
3307      expected.add(kv13);
3308      expected.add(kv12);
3309      expected.add(kv23);
3310      expected.add(kv22);
3311
3312      Scan scan = new Scan(row1);
3313      scan.addFamily(fam1);
3314      scan.setMaxVersions(MAX_VERSIONS);
3315      List<Cell> actual = new ArrayList<>();
3316      InternalScanner scanner = region.getScanner(scan);
3317
3318      boolean hasNext = scanner.next(actual);
3319      assertEquals(false, hasNext);
3320
3321      // Verify result
3322      for (int i = 0; i < expected.size(); i++) {
3323        assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
3324      }
3325    } finally {
3326      HBaseTestingUtility.closeRegionAndWAL(this.region);
3327      this.region = null;
3328    }
3329  }
3330
3331  @Test
3332  public void testScanner_StopRow1542() throws IOException {
3333    byte[] family = Bytes.toBytes("testFamily");
3334    this.region = initHRegion(tableName, method, CONF, family);
3335    try {
3336      byte[] row1 = Bytes.toBytes("row111");
3337      byte[] row2 = Bytes.toBytes("row222");
3338      byte[] row3 = Bytes.toBytes("row333");
3339      byte[] row4 = Bytes.toBytes("row444");
3340      byte[] row5 = Bytes.toBytes("row555");
3341
3342      byte[] col1 = Bytes.toBytes("Pub111");
3343      byte[] col2 = Bytes.toBytes("Pub222");
3344
3345      Put put = new Put(row1);
3346      put.addColumn(family, col1, Bytes.toBytes(10L));
3347      region.put(put);
3348
3349      put = new Put(row2);
3350      put.addColumn(family, col1, Bytes.toBytes(15L));
3351      region.put(put);
3352
3353      put = new Put(row3);
3354      put.addColumn(family, col2, Bytes.toBytes(20L));
3355      region.put(put);
3356
3357      put = new Put(row4);
3358      put.addColumn(family, col2, Bytes.toBytes(30L));
3359      region.put(put);
3360
3361      put = new Put(row5);
3362      put.addColumn(family, col1, Bytes.toBytes(40L));
3363      region.put(put);
3364
3365      Scan scan = new Scan(row3, row4);
3366      scan.setMaxVersions();
3367      scan.addColumn(family, col1);
3368      InternalScanner s = region.getScanner(scan);
3369
3370      List<Cell> results = new ArrayList<>();
3371      assertEquals(false, s.next(results));
3372      assertEquals(0, results.size());
3373    } finally {
3374      HBaseTestingUtility.closeRegionAndWAL(this.region);
3375      this.region = null;
3376    }
3377  }
3378
3379  @Test
3380  public void testScanner_Wildcard_FromMemStoreAndFiles_EnforceVersions() throws IOException {
3381    byte[] row1 = Bytes.toBytes("row1");
3382    byte[] fam1 = Bytes.toBytes("fam1");
3383    byte[] qf1 = Bytes.toBytes("qualifier1");
3384    byte[] qf2 = Bytes.toBytes("quateslifier2");
3385
3386    long ts1 = 1;
3387    long ts2 = ts1 + 1;
3388    long ts3 = ts1 + 2;
3389    long ts4 = ts1 + 3;
3390
3391    // Setting up region
3392    this.region = initHRegion(tableName, method, CONF, fam1);
3393    try {
3394      // Putting data in Region
3395      KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null);
3396      KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3397      KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3398      KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3399
3400      KeyValue kv24 = new KeyValue(row1, fam1, qf2, ts4, KeyValue.Type.Put, null);
3401      KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3402      KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3403      KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3404
3405      Put put = null;
3406      put = new Put(row1);
3407      put.add(kv14);
3408      put.add(kv24);
3409      region.put(put);
3410      region.flush(true);
3411
3412      put = new Put(row1);
3413      put.add(kv23);
3414      put.add(kv13);
3415      region.put(put);
3416      region.flush(true);
3417
3418      put = new Put(row1);
3419      put.add(kv22);
3420      put.add(kv12);
3421      region.put(put);
3422      region.flush(true);
3423
3424      put = new Put(row1);
3425      put.add(kv21);
3426      put.add(kv11);
3427      region.put(put);
3428
3429      // Expected
3430      List<KeyValue> expected = new ArrayList<>();
3431      expected.add(kv14);
3432      expected.add(kv13);
3433      expected.add(kv12);
3434      expected.add(kv24);
3435      expected.add(kv23);
3436      expected.add(kv22);
3437
3438      Scan scan = new Scan(row1);
3439      int versions = 3;
3440      scan.setMaxVersions(versions);
3441      List<Cell> actual = new ArrayList<>();
3442      InternalScanner scanner = region.getScanner(scan);
3443
3444      boolean hasNext = scanner.next(actual);
3445      assertEquals(false, hasNext);
3446
3447      // Verify result
3448      for (int i = 0; i < expected.size(); i++) {
3449        assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
3450      }
3451    } finally {
3452      HBaseTestingUtility.closeRegionAndWAL(this.region);
3453      this.region = null;
3454    }
3455  }
3456
3457  /**
3458   * Added for HBASE-5416
3459   *
3460   * Here we test scan optimization when only subset of CFs are used in filter
3461   * conditions.
3462   */
3463  @Test
3464  public void testScanner_JoinedScanners() throws IOException {
3465    byte[] cf_essential = Bytes.toBytes("essential");
3466    byte[] cf_joined = Bytes.toBytes("joined");
3467    byte[] cf_alpha = Bytes.toBytes("alpha");
3468    this.region = initHRegion(tableName, method, CONF, cf_essential, cf_joined, cf_alpha);
3469    try {
3470      byte[] row1 = Bytes.toBytes("row1");
3471      byte[] row2 = Bytes.toBytes("row2");
3472      byte[] row3 = Bytes.toBytes("row3");
3473
3474      byte[] col_normal = Bytes.toBytes("d");
3475      byte[] col_alpha = Bytes.toBytes("a");
3476
3477      byte[] filtered_val = Bytes.toBytes(3);
3478
3479      Put put = new Put(row1);
3480      put.addColumn(cf_essential, col_normal, Bytes.toBytes(1));
3481      put.addColumn(cf_joined, col_alpha, Bytes.toBytes(1));
3482      region.put(put);
3483
3484      put = new Put(row2);
3485      put.addColumn(cf_essential, col_alpha, Bytes.toBytes(2));
3486      put.addColumn(cf_joined, col_normal, Bytes.toBytes(2));
3487      put.addColumn(cf_alpha, col_alpha, Bytes.toBytes(2));
3488      region.put(put);
3489
3490      put = new Put(row3);
3491      put.addColumn(cf_essential, col_normal, filtered_val);
3492      put.addColumn(cf_joined, col_normal, filtered_val);
3493      region.put(put);
3494
3495      // Check two things:
3496      // 1. result list contains expected values
3497      // 2. result list is sorted properly
3498
3499      Scan scan = new Scan();
3500      Filter filter = new SingleColumnValueExcludeFilter(cf_essential, col_normal,
3501          CompareOp.NOT_EQUAL, filtered_val);
3502      scan.setFilter(filter);
3503      scan.setLoadColumnFamiliesOnDemand(true);
3504      InternalScanner s = region.getScanner(scan);
3505
3506      List<Cell> results = new ArrayList<>();
3507      assertTrue(s.next(results));
3508      assertEquals(1, results.size());
3509      results.clear();
3510
3511      assertTrue(s.next(results));
3512      assertEquals(3, results.size());
3513      assertTrue("orderCheck", CellUtil.matchingFamily(results.get(0), cf_alpha));
3514      assertTrue("orderCheck", CellUtil.matchingFamily(results.get(1), cf_essential));
3515      assertTrue("orderCheck", CellUtil.matchingFamily(results.get(2), cf_joined));
3516      results.clear();
3517
3518      assertFalse(s.next(results));
3519      assertEquals(0, results.size());
3520    } finally {
3521      HBaseTestingUtility.closeRegionAndWAL(this.region);
3522      this.region = null;
3523    }
3524  }
3525
3526  /**
3527   * HBASE-5416
3528   *
3529   * Test case when scan limits amount of KVs returned on each next() call.
3530   */
3531  @Test
3532  public void testScanner_JoinedScannersWithLimits() throws IOException {
3533    final byte[] cf_first = Bytes.toBytes("first");
3534    final byte[] cf_second = Bytes.toBytes("second");
3535
3536    this.region = initHRegion(tableName, method, CONF, cf_first, cf_second);
3537    try {
3538      final byte[] col_a = Bytes.toBytes("a");
3539      final byte[] col_b = Bytes.toBytes("b");
3540
3541      Put put;
3542
3543      for (int i = 0; i < 10; i++) {
3544        put = new Put(Bytes.toBytes("r" + Integer.toString(i)));
3545        put.addColumn(cf_first, col_a, Bytes.toBytes(i));
3546        if (i < 5) {
3547          put.addColumn(cf_first, col_b, Bytes.toBytes(i));
3548          put.addColumn(cf_second, col_a, Bytes.toBytes(i));
3549          put.addColumn(cf_second, col_b, Bytes.toBytes(i));
3550        }
3551        region.put(put);
3552      }
3553
3554      Scan scan = new Scan();
3555      scan.setLoadColumnFamiliesOnDemand(true);
3556      Filter bogusFilter = new FilterBase() {
3557        @Override
3558        public ReturnCode filterCell(final Cell ignored) throws IOException {
3559          return ReturnCode.INCLUDE;
3560        }
3561        @Override
3562        public boolean isFamilyEssential(byte[] name) {
3563          return Bytes.equals(name, cf_first);
3564        }
3565      };
3566
3567      scan.setFilter(bogusFilter);
3568      InternalScanner s = region.getScanner(scan);
3569
3570      // Our data looks like this:
3571      // r0: first:a, first:b, second:a, second:b
3572      // r1: first:a, first:b, second:a, second:b
3573      // r2: first:a, first:b, second:a, second:b
3574      // r3: first:a, first:b, second:a, second:b
3575      // r4: first:a, first:b, second:a, second:b
3576      // r5: first:a
3577      // r6: first:a
3578      // r7: first:a
3579      // r8: first:a
3580      // r9: first:a
3581
3582      // But due to next's limit set to 3, we should get this:
3583      // r0: first:a, first:b, second:a
3584      // r0: second:b
3585      // r1: first:a, first:b, second:a
3586      // r1: second:b
3587      // r2: first:a, first:b, second:a
3588      // r2: second:b
3589      // r3: first:a, first:b, second:a
3590      // r3: second:b
3591      // r4: first:a, first:b, second:a
3592      // r4: second:b
3593      // r5: first:a
3594      // r6: first:a
3595      // r7: first:a
3596      // r8: first:a
3597      // r9: first:a
3598
3599      List<Cell> results = new ArrayList<>();
3600      int index = 0;
3601      ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(3).build();
3602      while (true) {
3603        boolean more = s.next(results, scannerContext);
3604        if ((index >> 1) < 5) {
3605          if (index % 2 == 0) {
3606            assertEquals(3, results.size());
3607          } else {
3608            assertEquals(1, results.size());
3609          }
3610        } else {
3611          assertEquals(1, results.size());
3612        }
3613        results.clear();
3614        index++;
3615        if (!more) {
3616          break;
3617        }
3618      }
3619    } finally {
3620      HBaseTestingUtility.closeRegionAndWAL(this.region);
3621      this.region = null;
3622    }
3623  }
3624
3625  /**
3626   * Write an HFile block full with Cells whose qualifier that are identical between
3627   * 0 and Short.MAX_VALUE. See HBASE-13329.
3628   * @throws Exception
3629   */
3630  @Test
3631  public void testLongQualifier() throws Exception {
3632    byte[] family = Bytes.toBytes("family");
3633    this.region = initHRegion(tableName, method, CONF, family);
3634    byte[] q = new byte[Short.MAX_VALUE+2];
3635    Arrays.fill(q, 0, q.length-1, (byte)42);
3636    for (byte i=0; i<10; i++) {
3637      Put p = new Put(Bytes.toBytes("row"));
3638      // qualifiers that differ past Short.MAX_VALUE
3639      q[q.length-1]=i;
3640      p.addColumn(family, q, q);
3641      region.put(p);
3642    }
3643    region.flush(false);
3644    HBaseTestingUtility.closeRegionAndWAL(this.region);
3645    this.region = null;
3646  }
3647
3648  /**
3649   * Flushes the cache in a thread while scanning. The tests verify that the
3650   * scan is coherent - e.g. the returned results are always of the same or
3651   * later update as the previous results.
3652   *
3653   * @throws IOException
3654   *           scan / compact
3655   * @throws InterruptedException
3656   *           thread join
3657   */
3658  @Test
3659  public void testFlushCacheWhileScanning() throws IOException, InterruptedException {
3660    byte[] family = Bytes.toBytes("family");
3661    int numRows = 1000;
3662    int flushAndScanInterval = 10;
3663    int compactInterval = 10 * flushAndScanInterval;
3664
3665    this.region = initHRegion(tableName, method, CONF, family);
3666    FlushThread flushThread = new FlushThread();
3667    try {
3668      flushThread.start();
3669
3670      Scan scan = new Scan();
3671      scan.addFamily(family);
3672      scan.setFilter(new SingleColumnValueFilter(family, qual1, CompareOp.EQUAL,
3673          new BinaryComparator(Bytes.toBytes(5L))));
3674
3675      int expectedCount = 0;
3676      List<Cell> res = new ArrayList<>();
3677
3678      boolean toggle = true;
3679      for (long i = 0; i < numRows; i++) {
3680        Put put = new Put(Bytes.toBytes(i));
3681        put.setDurability(Durability.SKIP_WAL);
3682        put.addColumn(family, qual1, Bytes.toBytes(i % 10));
3683        region.put(put);
3684
3685        if (i != 0 && i % compactInterval == 0) {
3686          LOG.debug("iteration = " + i+ " ts="+System.currentTimeMillis());
3687          region.compact(true);
3688        }
3689
3690        if (i % 10 == 5L) {
3691          expectedCount++;
3692        }
3693
3694        if (i != 0 && i % flushAndScanInterval == 0) {
3695          res.clear();
3696          InternalScanner scanner = region.getScanner(scan);
3697          if (toggle) {
3698            flushThread.flush();
3699          }
3700          while (scanner.next(res))
3701            ;
3702          if (!toggle) {
3703            flushThread.flush();
3704          }
3705          assertEquals("toggle="+toggle+"i=" + i + " ts="+System.currentTimeMillis(),
3706              expectedCount, res.size());
3707          toggle = !toggle;
3708        }
3709      }
3710
3711    } finally {
3712      try {
3713        flushThread.done();
3714        flushThread.join();
3715        flushThread.checkNoError();
3716      } catch (InterruptedException ie) {
3717        LOG.warn("Caught exception when joining with flushThread", ie);
3718      }
3719      HBaseTestingUtility.closeRegionAndWAL(this.region);
3720      this.region = null;
3721    }
3722  }
3723
3724  protected class FlushThread extends Thread {
3725    private volatile boolean done;
3726    private Throwable error = null;
3727
3728    FlushThread() {
3729      super("FlushThread");
3730    }
3731
3732    public void done() {
3733      done = true;
3734      synchronized (this) {
3735        interrupt();
3736      }
3737    }
3738
3739    public void checkNoError() {
3740      if (error != null) {
3741        assertNull(error);
3742      }
3743    }
3744
3745    @Override
3746    public void run() {
3747      done = false;
3748      while (!done) {
3749        synchronized (this) {
3750          try {
3751            wait();
3752          } catch (InterruptedException ignored) {
3753            if (done) {
3754              break;
3755            }
3756          }
3757        }
3758        try {
3759          region.flush(true);
3760        } catch (IOException e) {
3761          if (!done) {
3762            LOG.error("Error while flushing cache", e);
3763            error = e;
3764          }
3765          break;
3766        } catch (Throwable t) {
3767          LOG.error("Uncaught exception", t);
3768          throw t;
3769        }
3770      }
3771    }
3772
3773    public void flush() {
3774      synchronized (this) {
3775        notify();
3776      }
3777    }
3778  }
3779
3780  /**
3781   * Writes very wide records and scans for the latest every time.. Flushes and
3782   * compacts the region every now and then to keep things realistic.
3783   *
3784   * @throws IOException
3785   *           by flush / scan / compaction
3786   * @throws InterruptedException
3787   *           when joining threads
3788   */
3789  @Test
3790  public void testWritesWhileScanning() throws IOException, InterruptedException {
3791    int testCount = 100;
3792    int numRows = 1;
3793    int numFamilies = 10;
3794    int numQualifiers = 100;
3795    int flushInterval = 7;
3796    int compactInterval = 5 * flushInterval;
3797    byte[][] families = new byte[numFamilies][];
3798    for (int i = 0; i < numFamilies; i++) {
3799      families[i] = Bytes.toBytes("family" + i);
3800    }
3801    byte[][] qualifiers = new byte[numQualifiers][];
3802    for (int i = 0; i < numQualifiers; i++) {
3803      qualifiers[i] = Bytes.toBytes("qual" + i);
3804    }
3805
3806    this.region = initHRegion(tableName, method, CONF, families);
3807    FlushThread flushThread = new FlushThread();
3808    PutThread putThread = new PutThread(numRows, families, qualifiers);
3809    try {
3810      putThread.start();
3811      putThread.waitForFirstPut();
3812
3813      flushThread.start();
3814
3815      Scan scan = new Scan(Bytes.toBytes("row0"), Bytes.toBytes("row1"));
3816
3817      int expectedCount = numFamilies * numQualifiers;
3818      List<Cell> res = new ArrayList<>();
3819
3820      long prevTimestamp = 0L;
3821      for (int i = 0; i < testCount; i++) {
3822
3823        if (i != 0 && i % compactInterval == 0) {
3824          region.compact(true);
3825          for (HStore store : region.getStores()) {
3826            store.closeAndArchiveCompactedFiles();
3827          }
3828        }
3829
3830        if (i != 0 && i % flushInterval == 0) {
3831          flushThread.flush();
3832        }
3833
3834        boolean previousEmpty = res.isEmpty();
3835        res.clear();
3836        InternalScanner scanner = region.getScanner(scan);
3837        while (scanner.next(res))
3838          ;
3839        if (!res.isEmpty() || !previousEmpty || i > compactInterval) {
3840          assertEquals("i=" + i, expectedCount, res.size());
3841          long timestamp = res.get(0).getTimestamp();
3842          assertTrue("Timestamps were broke: " + timestamp + " prev: " + prevTimestamp,
3843              timestamp >= prevTimestamp);
3844          prevTimestamp = timestamp;
3845        }
3846      }
3847
3848      putThread.done();
3849
3850      region.flush(true);
3851
3852    } finally {
3853      try {
3854        flushThread.done();
3855        flushThread.join();
3856        flushThread.checkNoError();
3857
3858        putThread.join();
3859        putThread.checkNoError();
3860      } catch (InterruptedException ie) {
3861        LOG.warn("Caught exception when joining with flushThread", ie);
3862      }
3863
3864      try {
3865          HBaseTestingUtility.closeRegionAndWAL(this.region);
3866      } catch (DroppedSnapshotException dse) {
3867        // We could get this on way out because we interrupt the background flusher and it could
3868        // fail anywhere causing a DSE over in the background flusher... only it is not properly
3869        // dealt with so could still be memory hanging out when we get to here -- memory we can't
3870        // flush because the accounting is 'off' since original DSE.
3871      }
3872      this.region = null;
3873    }
3874  }
3875
3876  protected class PutThread extends Thread {
3877    private volatile boolean done;
3878    private volatile int numPutsFinished = 0;
3879
3880    private Throwable error = null;
3881    private int numRows;
3882    private byte[][] families;
3883    private byte[][] qualifiers;
3884
3885    private PutThread(int numRows, byte[][] families, byte[][] qualifiers) {
3886      super("PutThread");
3887      this.numRows = numRows;
3888      this.families = families;
3889      this.qualifiers = qualifiers;
3890    }
3891
3892    /**
3893     * Block calling thread until this instance of PutThread has put at least one row.
3894     */
3895    public void waitForFirstPut() throws InterruptedException {
3896      // wait until put thread actually puts some data
3897      while (isAlive() && numPutsFinished == 0) {
3898        checkNoError();
3899        Thread.sleep(50);
3900      }
3901    }
3902
3903    public void done() {
3904      done = true;
3905      synchronized (this) {
3906        interrupt();
3907      }
3908    }
3909
3910    public void checkNoError() {
3911      if (error != null) {
3912        assertNull(error);
3913      }
3914    }
3915
3916    @Override
3917    public void run() {
3918      done = false;
3919      while (!done) {
3920        try {
3921          for (int r = 0; r < numRows; r++) {
3922            byte[] row = Bytes.toBytes("row" + r);
3923            Put put = new Put(row);
3924            put.setDurability(Durability.SKIP_WAL);
3925            byte[] value = Bytes.toBytes(String.valueOf(numPutsFinished));
3926            for (byte[] family : families) {
3927              for (byte[] qualifier : qualifiers) {
3928                put.addColumn(family, qualifier, numPutsFinished, value);
3929              }
3930            }
3931            region.put(put);
3932            numPutsFinished++;
3933            if (numPutsFinished > 0 && numPutsFinished % 47 == 0) {
3934              System.out.println("put iteration = " + numPutsFinished);
3935              Delete delete = new Delete(row, (long) numPutsFinished - 30);
3936              region.delete(delete);
3937            }
3938            numPutsFinished++;
3939          }
3940        } catch (InterruptedIOException e) {
3941          // This is fine. It means we are done, or didn't get the lock on time
3942          LOG.info("Interrupted", e);
3943        } catch (IOException e) {
3944          LOG.error("Error while putting records", e);
3945          error = e;
3946          break;
3947        }
3948      }
3949
3950    }
3951
3952  }
3953
3954  /**
3955   * Writes very wide records and gets the latest row every time.. Flushes and
3956   * compacts the region aggressivly to catch issues.
3957   *
3958   * @throws IOException
3959   *           by flush / scan / compaction
3960   * @throws InterruptedException
3961   *           when joining threads
3962   */
3963  @Test
3964  public void testWritesWhileGetting() throws Exception {
3965    int testCount = 50;
3966    int numRows = 1;
3967    int numFamilies = 10;
3968    int numQualifiers = 100;
3969    int compactInterval = 100;
3970    byte[][] families = new byte[numFamilies][];
3971    for (int i = 0; i < numFamilies; i++) {
3972      families[i] = Bytes.toBytes("family" + i);
3973    }
3974    byte[][] qualifiers = new byte[numQualifiers][];
3975    for (int i = 0; i < numQualifiers; i++) {
3976      qualifiers[i] = Bytes.toBytes("qual" + i);
3977    }
3978
3979
3980    // This test flushes constantly and can cause many files to be created,
3981    // possibly
3982    // extending over the ulimit. Make sure compactions are aggressive in
3983    // reducing
3984    // the number of HFiles created.
3985    Configuration conf = HBaseConfiguration.create(CONF);
3986    conf.setInt("hbase.hstore.compaction.min", 1);
3987    conf.setInt("hbase.hstore.compaction.max", 1000);
3988    this.region = initHRegion(tableName, method, conf, families);
3989    PutThread putThread = null;
3990    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);
3991    try {
3992      putThread = new PutThread(numRows, families, qualifiers);
3993      putThread.start();
3994      putThread.waitForFirstPut();
3995
3996      // Add a thread that flushes as fast as possible
3997      ctx.addThread(new RepeatingTestThread(ctx) {
3998
3999        @Override
4000        public void doAnAction() throws Exception {
4001          region.flush(true);
4002          // Compact regularly to avoid creating too many files and exceeding
4003          // the ulimit.
4004          region.compact(false);
4005          for (HStore store : region.getStores()) {
4006            store.closeAndArchiveCompactedFiles();
4007          }
4008        }
4009      });
4010      ctx.startThreads();
4011
4012      Get get = new Get(Bytes.toBytes("row0"));
4013      Result result = null;
4014
4015      int expectedCount = numFamilies * numQualifiers;
4016
4017      long prevTimestamp = 0L;
4018      for (int i = 0; i < testCount; i++) {
4019        LOG.info("testWritesWhileGetting verify turn " + i);
4020        boolean previousEmpty = result == null || result.isEmpty();
4021        result = region.get(get);
4022        if (!result.isEmpty() || !previousEmpty || i > compactInterval) {
4023          assertEquals("i=" + i, expectedCount, result.size());
4024          // TODO this was removed, now what dangit?!
4025          // search looking for the qualifier in question?
4026          long timestamp = 0;
4027          for (Cell kv : result.rawCells()) {
4028            if (CellUtil.matchingFamily(kv, families[0])
4029                && CellUtil.matchingQualifier(kv, qualifiers[0])) {
4030              timestamp = kv.getTimestamp();
4031            }
4032          }
4033          assertTrue(timestamp >= prevTimestamp);
4034          prevTimestamp = timestamp;
4035          Cell previousKV = null;
4036
4037          for (Cell kv : result.rawCells()) {
4038            byte[] thisValue = CellUtil.cloneValue(kv);
4039            if (previousKV != null) {
4040              if (Bytes.compareTo(CellUtil.cloneValue(previousKV), thisValue) != 0) {
4041                LOG.warn("These two KV should have the same value." + " Previous KV:" + previousKV
4042                    + "(memStoreTS:" + previousKV.getSequenceId() + ")" + ", New KV: " + kv
4043                    + "(memStoreTS:" + kv.getSequenceId() + ")");
4044                assertEquals(0, Bytes.compareTo(CellUtil.cloneValue(previousKV), thisValue));
4045              }
4046            }
4047            previousKV = kv;
4048          }
4049        }
4050      }
4051    } finally {
4052      if (putThread != null)
4053        putThread.done();
4054
4055      region.flush(true);
4056
4057      if (putThread != null) {
4058        putThread.join();
4059        putThread.checkNoError();
4060      }
4061
4062      ctx.stop();
4063      HBaseTestingUtility.closeRegionAndWAL(this.region);
4064      this.region = null;
4065    }
4066  }
4067
4068  @Test
4069  public void testHolesInMeta() throws Exception {
4070    byte[] family = Bytes.toBytes("family");
4071    this.region = initHRegion(tableName, Bytes.toBytes("x"), Bytes.toBytes("z"), method, CONF,
4072        false, family);
4073    try {
4074      byte[] rowNotServed = Bytes.toBytes("a");
4075      Get g = new Get(rowNotServed);
4076      try {
4077        region.get(g);
4078        fail();
4079      } catch (WrongRegionException x) {
4080        // OK
4081      }
4082      byte[] row = Bytes.toBytes("y");
4083      g = new Get(row);
4084      region.get(g);
4085    } finally {
4086      HBaseTestingUtility.closeRegionAndWAL(this.region);
4087      this.region = null;
4088    }
4089  }
4090
4091  @Test
4092  public void testIndexesScanWithOneDeletedRow() throws IOException {
4093    byte[] family = Bytes.toBytes("family");
4094
4095    // Setting up region
4096    this.region = initHRegion(tableName, method, CONF, family);
4097    try {
4098      Put put = new Put(Bytes.toBytes(1L));
4099      put.addColumn(family, qual1, 1L, Bytes.toBytes(1L));
4100      region.put(put);
4101
4102      region.flush(true);
4103
4104      Delete delete = new Delete(Bytes.toBytes(1L), 1L);
4105      region.delete(delete);
4106
4107      put = new Put(Bytes.toBytes(2L));
4108      put.addColumn(family, qual1, 2L, Bytes.toBytes(2L));
4109      region.put(put);
4110
4111      Scan idxScan = new Scan();
4112      idxScan.addFamily(family);
4113      idxScan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL, Arrays.<Filter> asList(
4114          new SingleColumnValueFilter(family, qual1, CompareOp.GREATER_OR_EQUAL,
4115              new BinaryComparator(Bytes.toBytes(0L))), new SingleColumnValueFilter(family, qual1,
4116              CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes(3L))))));
4117      InternalScanner scanner = region.getScanner(idxScan);
4118      List<Cell> res = new ArrayList<>();
4119
4120      while (scanner.next(res))
4121        ;
4122      assertEquals(1L, res.size());
4123    } finally {
4124      HBaseTestingUtility.closeRegionAndWAL(this.region);
4125      this.region = null;
4126    }
4127  }
4128
4129  // ////////////////////////////////////////////////////////////////////////////
4130  // Bloom filter test
4131  // ////////////////////////////////////////////////////////////////////////////
4132  @Test
4133  public void testBloomFilterSize() throws IOException {
4134    byte[] fam1 = Bytes.toBytes("fam1");
4135    byte[] qf1 = Bytes.toBytes("col");
4136    byte[] val1 = Bytes.toBytes("value1");
4137    // Create Table
4138    HColumnDescriptor hcd = new HColumnDescriptor(fam1).setMaxVersions(Integer.MAX_VALUE)
4139        .setBloomFilterType(BloomType.ROWCOL);
4140
4141    HTableDescriptor htd = new HTableDescriptor(tableName);
4142    htd.addFamily(hcd);
4143    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
4144    this.region = TEST_UTIL.createLocalHRegion(info, htd);
4145    try {
4146      int num_unique_rows = 10;
4147      int duplicate_multiplier = 2;
4148      int num_storefiles = 4;
4149
4150      int version = 0;
4151      for (int f = 0; f < num_storefiles; f++) {
4152        for (int i = 0; i < duplicate_multiplier; i++) {
4153          for (int j = 0; j < num_unique_rows; j++) {
4154            Put put = new Put(Bytes.toBytes("row" + j));
4155            put.setDurability(Durability.SKIP_WAL);
4156            long ts = version++;
4157            put.addColumn(fam1, qf1, ts, val1);
4158            region.put(put);
4159          }
4160        }
4161        region.flush(true);
4162      }
4163      // before compaction
4164      HStore store = region.getStore(fam1);
4165      Collection<HStoreFile> storeFiles = store.getStorefiles();
4166      for (HStoreFile storefile : storeFiles) {
4167        StoreFileReader reader = storefile.getReader();
4168        reader.loadFileInfo();
4169        reader.loadBloomfilter();
4170        assertEquals(num_unique_rows * duplicate_multiplier, reader.getEntries());
4171        assertEquals(num_unique_rows, reader.getFilterEntries());
4172      }
4173
4174      region.compact(true);
4175
4176      // after compaction
4177      storeFiles = store.getStorefiles();
4178      for (HStoreFile storefile : storeFiles) {
4179        StoreFileReader reader = storefile.getReader();
4180        reader.loadFileInfo();
4181        reader.loadBloomfilter();
4182        assertEquals(num_unique_rows * duplicate_multiplier * num_storefiles, reader.getEntries());
4183        assertEquals(num_unique_rows, reader.getFilterEntries());
4184      }
4185    } finally {
4186      HBaseTestingUtility.closeRegionAndWAL(this.region);
4187      this.region = null;
4188    }
4189  }
4190
4191  @Test
4192  public void testAllColumnsWithBloomFilter() throws IOException {
4193    byte[] TABLE = Bytes.toBytes(name.getMethodName());
4194    byte[] FAMILY = Bytes.toBytes("family");
4195
4196    // Create table
4197    HColumnDescriptor hcd = new HColumnDescriptor(FAMILY).setMaxVersions(Integer.MAX_VALUE)
4198        .setBloomFilterType(BloomType.ROWCOL);
4199    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE));
4200    htd.addFamily(hcd);
4201    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
4202    this.region = TEST_UTIL.createLocalHRegion(info, htd);
4203    try {
4204      // For row:0, col:0: insert versions 1 through 5.
4205      byte row[] = Bytes.toBytes("row:" + 0);
4206      byte column[] = Bytes.toBytes("column:" + 0);
4207      Put put = new Put(row);
4208      put.setDurability(Durability.SKIP_WAL);
4209      for (long idx = 1; idx <= 4; idx++) {
4210        put.addColumn(FAMILY, column, idx, Bytes.toBytes("value-version-" + idx));
4211      }
4212      region.put(put);
4213
4214      // Flush
4215      region.flush(true);
4216
4217      // Get rows
4218      Get get = new Get(row);
4219      get.setMaxVersions();
4220      Cell[] kvs = region.get(get).rawCells();
4221
4222      // Check if rows are correct
4223      assertEquals(4, kvs.length);
4224      checkOneCell(kvs[0], FAMILY, 0, 0, 4);
4225      checkOneCell(kvs[1], FAMILY, 0, 0, 3);
4226      checkOneCell(kvs[2], FAMILY, 0, 0, 2);
4227      checkOneCell(kvs[3], FAMILY, 0, 0, 1);
4228    } finally {
4229      HBaseTestingUtility.closeRegionAndWAL(this.region);
4230      this.region = null;
4231    }
4232  }
4233
4234  /**
4235   * Testcase to cover bug-fix for HBASE-2823 Ensures correct delete when
4236   * issuing delete row on columns with bloom filter set to row+col
4237   * (BloomType.ROWCOL)
4238   */
4239  @Test
4240  public void testDeleteRowWithBloomFilter() throws IOException {
4241    byte[] familyName = Bytes.toBytes("familyName");
4242
4243    // Create Table
4244    HColumnDescriptor hcd = new HColumnDescriptor(familyName).setMaxVersions(Integer.MAX_VALUE)
4245        .setBloomFilterType(BloomType.ROWCOL);
4246
4247    HTableDescriptor htd = new HTableDescriptor(tableName);
4248    htd.addFamily(hcd);
4249    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
4250    this.region = TEST_UTIL.createLocalHRegion(info, htd);
4251    try {
4252      // Insert some data
4253      byte row[] = Bytes.toBytes("row1");
4254      byte col[] = Bytes.toBytes("col1");
4255
4256      Put put = new Put(row);
4257      put.addColumn(familyName, col, 1, Bytes.toBytes("SomeRandomValue"));
4258      region.put(put);
4259      region.flush(true);
4260
4261      Delete del = new Delete(row);
4262      region.delete(del);
4263      region.flush(true);
4264
4265      // Get remaining rows (should have none)
4266      Get get = new Get(row);
4267      get.addColumn(familyName, col);
4268
4269      Cell[] keyValues = region.get(get).rawCells();
4270      assertTrue(keyValues.length == 0);
4271    } finally {
4272      HBaseTestingUtility.closeRegionAndWAL(this.region);
4273      this.region = null;
4274    }
4275  }
4276
4277  @Test
4278  public void testgetHDFSBlocksDistribution() throws Exception {
4279    HBaseTestingUtility htu = new HBaseTestingUtility();
4280    // Why do we set the block size in this test?  If we set it smaller than the kvs, then we'll
4281    // break up the file in to more pieces that can be distributed across the three nodes and we
4282    // won't be able to have the condition this test asserts; that at least one node has
4283    // a copy of all replicas -- if small block size, then blocks are spread evenly across the
4284    // the three nodes.  hfilev3 with tags seems to put us over the block size.  St.Ack.
4285    // final int DEFAULT_BLOCK_SIZE = 1024;
4286    // htu.getConfiguration().setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE);
4287    htu.getConfiguration().setInt("dfs.replication", 2);
4288
4289    // set up a cluster with 3 nodes
4290    MiniHBaseCluster cluster = null;
4291    String dataNodeHosts[] = new String[] { "host1", "host2", "host3" };
4292    int regionServersCount = 3;
4293
4294    try {
4295      cluster = htu.startMiniCluster(1, regionServersCount, dataNodeHosts);
4296      byte[][] families = { fam1, fam2 };
4297      Table ht = htu.createTable(tableName, families);
4298
4299      // Setting up region
4300      byte row[] = Bytes.toBytes("row1");
4301      byte col[] = Bytes.toBytes("col1");
4302
4303      Put put = new Put(row);
4304      put.addColumn(fam1, col, 1, Bytes.toBytes("test1"));
4305      put.addColumn(fam2, col, 1, Bytes.toBytes("test2"));
4306      ht.put(put);
4307
4308      HRegion firstRegion = htu.getHBaseCluster().getRegions(tableName).get(0);
4309      firstRegion.flush(true);
4310      HDFSBlocksDistribution blocksDistribution1 = firstRegion.getHDFSBlocksDistribution();
4311
4312      // Given the default replication factor is 2 and we have 2 HFiles,
4313      // we will have total of 4 replica of blocks on 3 datanodes; thus there
4314      // must be at least one host that have replica for 2 HFiles. That host's
4315      // weight will be equal to the unique block weight.
4316      long uniqueBlocksWeight1 = blocksDistribution1.getUniqueBlocksTotalWeight();
4317      StringBuilder sb = new StringBuilder();
4318      for (String host: blocksDistribution1.getTopHosts()) {
4319        if (sb.length() > 0) sb.append(", ");
4320        sb.append(host);
4321        sb.append("=");
4322        sb.append(blocksDistribution1.getWeight(host));
4323      }
4324
4325      String topHost = blocksDistribution1.getTopHosts().get(0);
4326      long topHostWeight = blocksDistribution1.getWeight(topHost);
4327      String msg = "uniqueBlocksWeight=" + uniqueBlocksWeight1 + ", topHostWeight=" +
4328        topHostWeight + ", topHost=" + topHost + "; " + sb.toString();
4329      LOG.info(msg);
4330      assertTrue(msg, uniqueBlocksWeight1 == topHostWeight);
4331
4332      // use the static method to compute the value, it should be the same.
4333      // static method is used by load balancer or other components
4334      HDFSBlocksDistribution blocksDistribution2 = HRegion.computeHDFSBlocksDistribution(
4335          htu.getConfiguration(), firstRegion.getTableDescriptor(), firstRegion.getRegionInfo());
4336      long uniqueBlocksWeight2 = blocksDistribution2.getUniqueBlocksTotalWeight();
4337
4338      assertTrue(uniqueBlocksWeight1 == uniqueBlocksWeight2);
4339
4340      ht.close();
4341    } finally {
4342      if (cluster != null) {
4343        htu.shutdownMiniCluster();
4344      }
4345    }
4346  }
4347
4348  /**
4349   * Testcase to check state of region initialization task set to ABORTED or not
4350   * if any exceptions during initialization
4351   *
4352   * @throws Exception
4353   */
4354  @Test
4355  public void testStatusSettingToAbortIfAnyExceptionDuringRegionInitilization() throws Exception {
4356    HRegionInfo info;
4357    try {
4358      FileSystem fs = Mockito.mock(FileSystem.class);
4359      Mockito.when(fs.exists((Path) Mockito.anyObject())).thenThrow(new IOException());
4360      HTableDescriptor htd = new HTableDescriptor(tableName);
4361      htd.addFamily(new HColumnDescriptor("cf"));
4362      info = new HRegionInfo(htd.getTableName(), HConstants.EMPTY_BYTE_ARRAY,
4363          HConstants.EMPTY_BYTE_ARRAY, false);
4364      Path path = new Path(dir + "testStatusSettingToAbortIfAnyExceptionDuringRegionInitilization");
4365      region = HRegion.newHRegion(path, null, fs, CONF, info, htd, null);
4366      // region initialization throws IOException and set task state to ABORTED.
4367      region.initialize();
4368      fail("Region initialization should fail due to IOException");
4369    } catch (IOException io) {
4370      List<MonitoredTask> tasks = TaskMonitor.get().getTasks();
4371      for (MonitoredTask monitoredTask : tasks) {
4372        if (!(monitoredTask instanceof MonitoredRPCHandler)
4373            && monitoredTask.getDescription().contains(region.toString())) {
4374          assertTrue("Region state should be ABORTED.",
4375              monitoredTask.getState().equals(MonitoredTask.State.ABORTED));
4376          break;
4377        }
4378      }
4379    } finally {
4380      HBaseTestingUtility.closeRegionAndWAL(region);
4381    }
4382  }
4383
4384  /**
4385   * Verifies that the .regioninfo file is written on region creation and that
4386   * is recreated if missing during region opening.
4387   */
4388  @Test
4389  public void testRegionInfoFileCreation() throws IOException {
4390    Path rootDir = new Path(dir + "testRegionInfoFileCreation");
4391
4392    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
4393    htd.addFamily(new HColumnDescriptor("cf"));
4394
4395    HRegionInfo hri = new HRegionInfo(htd.getTableName());
4396
4397    // Create a region and skip the initialization (like CreateTableHandler)
4398    HRegion region = HBaseTestingUtility.createRegionAndWAL(hri, rootDir, CONF, htd, false);
4399    Path regionDir = region.getRegionFileSystem().getRegionDir();
4400    FileSystem fs = region.getRegionFileSystem().getFileSystem();
4401    HBaseTestingUtility.closeRegionAndWAL(region);
4402
4403    Path regionInfoFile = new Path(regionDir, HRegionFileSystem.REGION_INFO_FILE);
4404
4405    // Verify that the .regioninfo file is present
4406    assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir",
4407        fs.exists(regionInfoFile));
4408
4409    // Try to open the region
4410    region = HRegion.openHRegion(rootDir, hri, htd, null, CONF);
4411    assertEquals(regionDir, region.getRegionFileSystem().getRegionDir());
4412    HBaseTestingUtility.closeRegionAndWAL(region);
4413
4414    // Verify that the .regioninfo file is still there
4415    assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir",
4416        fs.exists(regionInfoFile));
4417
4418    // Remove the .regioninfo file and verify is recreated on region open
4419    fs.delete(regionInfoFile, true);
4420    assertFalse(HRegionFileSystem.REGION_INFO_FILE + " should be removed from the region dir",
4421        fs.exists(regionInfoFile));
4422
4423    region = HRegion.openHRegion(rootDir, hri, htd, null, CONF);
4424//    region = TEST_UTIL.openHRegion(hri, htd);
4425    assertEquals(regionDir, region.getRegionFileSystem().getRegionDir());
4426    HBaseTestingUtility.closeRegionAndWAL(region);
4427
4428    // Verify that the .regioninfo file is still there
4429    assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir",
4430        fs.exists(new Path(regionDir, HRegionFileSystem.REGION_INFO_FILE)));
4431  }
4432
4433  /**
4434   * TestCase for increment
4435   */
4436  private static class Incrementer implements Runnable {
4437    private HRegion region;
4438    private final static byte[] incRow = Bytes.toBytes("incRow");
4439    private final static byte[] family = Bytes.toBytes("family");
4440    private final static byte[] qualifier = Bytes.toBytes("qualifier");
4441    private final static long ONE = 1L;
4442    private int incCounter;
4443
4444    public Incrementer(HRegion region, int incCounter) {
4445      this.region = region;
4446      this.incCounter = incCounter;
4447    }
4448
4449    @Override
4450    public void run() {
4451      int count = 0;
4452      while (count < incCounter) {
4453        Increment inc = new Increment(incRow);
4454        inc.addColumn(family, qualifier, ONE);
4455        count++;
4456        try {
4457          region.increment(inc);
4458        } catch (IOException e) {
4459          LOG.info("Count=" + count + ", " + e);
4460          break;
4461        }
4462      }
4463    }
4464  }
4465
4466  /**
4467   * Test case to check increment function with memstore flushing
4468   * @throws Exception
4469   */
4470  @Test
4471  public void testParallelIncrementWithMemStoreFlush() throws Exception {
4472    byte[] family = Incrementer.family;
4473    this.region = initHRegion(tableName, method, CONF, family);
4474    final HRegion region = this.region;
4475    final AtomicBoolean incrementDone = new AtomicBoolean(false);
4476    Runnable flusher = new Runnable() {
4477      @Override
4478      public void run() {
4479        while (!incrementDone.get()) {
4480          try {
4481            region.flush(true);
4482          } catch (Exception e) {
4483            e.printStackTrace();
4484          }
4485        }
4486      }
4487    };
4488
4489    // after all increment finished, the row will increment to 20*100 = 2000
4490    int threadNum = 20;
4491    int incCounter = 100;
4492    long expected = (long) threadNum * incCounter;
4493    Thread[] incrementers = new Thread[threadNum];
4494    Thread flushThread = new Thread(flusher);
4495    for (int i = 0; i < threadNum; i++) {
4496      incrementers[i] = new Thread(new Incrementer(this.region, incCounter));
4497      incrementers[i].start();
4498    }
4499    flushThread.start();
4500    for (int i = 0; i < threadNum; i++) {
4501      incrementers[i].join();
4502    }
4503
4504    incrementDone.set(true);
4505    flushThread.join();
4506
4507    Get get = new Get(Incrementer.incRow);
4508    get.addColumn(Incrementer.family, Incrementer.qualifier);
4509    get.setMaxVersions(1);
4510    Result res = this.region.get(get);
4511    List<Cell> kvs = res.getColumnCells(Incrementer.family, Incrementer.qualifier);
4512
4513    // we just got the latest version
4514    assertEquals(1, kvs.size());
4515    Cell kv = kvs.get(0);
4516    assertEquals(expected, Bytes.toLong(kv.getValueArray(), kv.getValueOffset()));
4517    this.region = null;
4518  }
4519
4520  /**
4521   * TestCase for append
4522   */
4523  private static class Appender implements Runnable {
4524    private HRegion region;
4525    private final static byte[] appendRow = Bytes.toBytes("appendRow");
4526    private final static byte[] family = Bytes.toBytes("family");
4527    private final static byte[] qualifier = Bytes.toBytes("qualifier");
4528    private final static byte[] CHAR = Bytes.toBytes("a");
4529    private int appendCounter;
4530
4531    public Appender(HRegion region, int appendCounter) {
4532      this.region = region;
4533      this.appendCounter = appendCounter;
4534    }
4535
4536    @Override
4537    public void run() {
4538      int count = 0;
4539      while (count < appendCounter) {
4540        Append app = new Append(appendRow);
4541        app.addColumn(family, qualifier, CHAR);
4542        count++;
4543        try {
4544          region.append(app);
4545        } catch (IOException e) {
4546          LOG.info("Count=" + count + ", max=" + appendCounter + ", " + e);
4547          break;
4548        }
4549      }
4550    }
4551  }
4552
4553  /**
4554   * Test case to check append function with memstore flushing
4555   * @throws Exception
4556   */
4557  @Test
4558  public void testParallelAppendWithMemStoreFlush() throws Exception {
4559    byte[] family = Appender.family;
4560    this.region = initHRegion(tableName, method, CONF, family);
4561    final HRegion region = this.region;
4562    final AtomicBoolean appendDone = new AtomicBoolean(false);
4563    Runnable flusher = new Runnable() {
4564      @Override
4565      public void run() {
4566        while (!appendDone.get()) {
4567          try {
4568            region.flush(true);
4569          } catch (Exception e) {
4570            e.printStackTrace();
4571          }
4572        }
4573      }
4574    };
4575
4576    // After all append finished, the value will append to threadNum *
4577    // appendCounter Appender.CHAR
4578    int threadNum = 20;
4579    int appendCounter = 100;
4580    byte[] expected = new byte[threadNum * appendCounter];
4581    for (int i = 0; i < threadNum * appendCounter; i++) {
4582      System.arraycopy(Appender.CHAR, 0, expected, i, 1);
4583    }
4584    Thread[] appenders = new Thread[threadNum];
4585    Thread flushThread = new Thread(flusher);
4586    for (int i = 0; i < threadNum; i++) {
4587      appenders[i] = new Thread(new Appender(this.region, appendCounter));
4588      appenders[i].start();
4589    }
4590    flushThread.start();
4591    for (int i = 0; i < threadNum; i++) {
4592      appenders[i].join();
4593    }
4594
4595    appendDone.set(true);
4596    flushThread.join();
4597
4598    Get get = new Get(Appender.appendRow);
4599    get.addColumn(Appender.family, Appender.qualifier);
4600    get.setMaxVersions(1);
4601    Result res = this.region.get(get);
4602    List<Cell> kvs = res.getColumnCells(Appender.family, Appender.qualifier);
4603
4604    // we just got the latest version
4605    assertEquals(1, kvs.size());
4606    Cell kv = kvs.get(0);
4607    byte[] appendResult = new byte[kv.getValueLength()];
4608    System.arraycopy(kv.getValueArray(), kv.getValueOffset(), appendResult, 0, kv.getValueLength());
4609    assertArrayEquals(expected, appendResult);
4610    this.region = null;
4611  }
4612
4613  /**
4614   * Test case to check put function with memstore flushing for same row, same ts
4615   * @throws Exception
4616   */
4617  @Test
4618  public void testPutWithMemStoreFlush() throws Exception {
4619    byte[] family = Bytes.toBytes("family");
4620    byte[] qualifier = Bytes.toBytes("qualifier");
4621    byte[] row = Bytes.toBytes("putRow");
4622    byte[] value = null;
4623    this.region = initHRegion(tableName, method, CONF, family);
4624    Put put = null;
4625    Get get = null;
4626    List<Cell> kvs = null;
4627    Result res = null;
4628
4629    put = new Put(row);
4630    value = Bytes.toBytes("value0");
4631    put.addColumn(family, qualifier, 1234567L, value);
4632    region.put(put);
4633    get = new Get(row);
4634    get.addColumn(family, qualifier);
4635    get.setMaxVersions();
4636    res = this.region.get(get);
4637    kvs = res.getColumnCells(family, qualifier);
4638    assertEquals(1, kvs.size());
4639    assertArrayEquals(Bytes.toBytes("value0"), CellUtil.cloneValue(kvs.get(0)));
4640
4641    region.flush(true);
4642    get = new Get(row);
4643    get.addColumn(family, qualifier);
4644    get.setMaxVersions();
4645    res = this.region.get(get);
4646    kvs = res.getColumnCells(family, qualifier);
4647    assertEquals(1, kvs.size());
4648    assertArrayEquals(Bytes.toBytes("value0"), CellUtil.cloneValue(kvs.get(0)));
4649
4650    put = new Put(row);
4651    value = Bytes.toBytes("value1");
4652    put.addColumn(family, qualifier, 1234567L, value);
4653    region.put(put);
4654    get = new Get(row);
4655    get.addColumn(family, qualifier);
4656    get.setMaxVersions();
4657    res = this.region.get(get);
4658    kvs = res.getColumnCells(family, qualifier);
4659    assertEquals(1, kvs.size());
4660    assertArrayEquals(Bytes.toBytes("value1"), CellUtil.cloneValue(kvs.get(0)));
4661
4662    region.flush(true);
4663    get = new Get(row);
4664    get.addColumn(family, qualifier);
4665    get.setMaxVersions();
4666    res = this.region.get(get);
4667    kvs = res.getColumnCells(family, qualifier);
4668    assertEquals(1, kvs.size());
4669    assertArrayEquals(Bytes.toBytes("value1"), CellUtil.cloneValue(kvs.get(0)));
4670  }
4671
4672  @Test
4673  public void testDurability() throws Exception {
4674    // there are 5 x 5 cases:
4675    // table durability(SYNC,FSYNC,ASYC,SKIP,USE_DEFAULT) x mutation
4676    // durability(SYNC,FSYNC,ASYC,SKIP,USE_DEFAULT)
4677
4678    // expected cases for append and sync wal
4679    durabilityTest(method, Durability.SYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
4680    durabilityTest(method, Durability.SYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4681    durabilityTest(method, Durability.SYNC_WAL, Durability.USE_DEFAULT, 0, true, true, false);
4682
4683    durabilityTest(method, Durability.FSYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
4684    durabilityTest(method, Durability.FSYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4685    durabilityTest(method, Durability.FSYNC_WAL, Durability.USE_DEFAULT, 0, true, true, false);
4686
4687    durabilityTest(method, Durability.ASYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
4688    durabilityTest(method, Durability.ASYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4689
4690    durabilityTest(method, Durability.SKIP_WAL, Durability.SYNC_WAL, 0, true, true, false);
4691    durabilityTest(method, Durability.SKIP_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4692
4693    durabilityTest(method, Durability.USE_DEFAULT, Durability.SYNC_WAL, 0, true, true, false);
4694    durabilityTest(method, Durability.USE_DEFAULT, Durability.FSYNC_WAL, 0, true, true, false);
4695    durabilityTest(method, Durability.USE_DEFAULT, Durability.USE_DEFAULT, 0, true, true, false);
4696
4697    // expected cases for async wal
4698    durabilityTest(method, Durability.SYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4699    durabilityTest(method, Durability.FSYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4700    durabilityTest(method, Durability.ASYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4701    durabilityTest(method, Durability.SKIP_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4702    durabilityTest(method, Durability.USE_DEFAULT, Durability.ASYNC_WAL, 0, true, false, false);
4703    durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 0, true, false, false);
4704
4705    durabilityTest(method, Durability.SYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4706    durabilityTest(method, Durability.FSYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4707    durabilityTest(method, Durability.ASYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4708    durabilityTest(method, Durability.SKIP_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4709    durabilityTest(method, Durability.USE_DEFAULT, Durability.ASYNC_WAL, 5000, true, false, true);
4710    durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 5000, true, false, true);
4711
4712    // expect skip wal cases
4713    durabilityTest(method, Durability.SYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
4714    durabilityTest(method, Durability.FSYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
4715    durabilityTest(method, Durability.ASYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
4716    durabilityTest(method, Durability.SKIP_WAL, Durability.SKIP_WAL, 0, false, false, false);
4717    durabilityTest(method, Durability.USE_DEFAULT, Durability.SKIP_WAL, 0, false, false, false);
4718    durabilityTest(method, Durability.SKIP_WAL, Durability.USE_DEFAULT, 0, false, false, false);
4719
4720  }
4721
4722  private void durabilityTest(String method, Durability tableDurability,
4723      Durability mutationDurability, long timeout, boolean expectAppend, final boolean expectSync,
4724      final boolean expectSyncFromLogSyncer) throws Exception {
4725    Configuration conf = HBaseConfiguration.create(CONF);
4726    method = method + "_" + tableDurability.name() + "_" + mutationDurability.name();
4727    byte[] family = Bytes.toBytes("family");
4728    Path logDir = new Path(new Path(dir + method), "log");
4729    final Configuration walConf = new Configuration(conf);
4730    FSUtils.setRootDir(walConf, logDir);
4731    // XXX: The spied AsyncFSWAL can not work properly because of a Mockito defect that can not
4732    // deal with classes which have a field of an inner class. See discussions in HBASE-15536.
4733    walConf.set(WALFactory.WAL_PROVIDER, "filesystem");
4734    final WALFactory wals = new WALFactory(walConf, TEST_UTIL.getRandomUUID().toString());
4735    final WAL wal = spy(wals.getWAL(RegionInfoBuilder.newBuilder(tableName).build()));
4736    this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
4737        HConstants.EMPTY_END_ROW, false, tableDurability, wal,
4738        new byte[][] { family });
4739
4740    Put put = new Put(Bytes.toBytes("r1"));
4741    put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
4742    put.setDurability(mutationDurability);
4743    region.put(put);
4744
4745    //verify append called or not
4746    verify(wal, expectAppend ? times(1) : never())
4747      .append((HRegionInfo)any(), (WALKeyImpl)any(),
4748          (WALEdit)any(), Mockito.anyBoolean());
4749
4750    // verify sync called or not
4751    if (expectSync || expectSyncFromLogSyncer) {
4752      TEST_UTIL.waitFor(timeout, new Waiter.Predicate<Exception>() {
4753        @Override
4754        public boolean evaluate() throws Exception {
4755          try {
4756            if (expectSync) {
4757              verify(wal, times(1)).sync(anyLong()); // Hregion calls this one
4758            } else if (expectSyncFromLogSyncer) {
4759              verify(wal, times(1)).sync(); // wal syncer calls this one
4760            }
4761          } catch (Throwable ignore) {
4762          }
4763          return true;
4764        }
4765      });
4766    } else {
4767      //verify(wal, never()).sync(anyLong());
4768      verify(wal, never()).sync();
4769    }
4770
4771    HBaseTestingUtility.closeRegionAndWAL(this.region);
4772    wals.close();
4773    this.region = null;
4774  }
4775
4776  @Test
4777  public void testRegionReplicaSecondary() throws IOException {
4778    // create a primary region, load some data and flush
4779    // create a secondary region, and do a get against that
4780    Path rootDir = new Path(dir + name.getMethodName());
4781    FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
4782
4783    byte[][] families = new byte[][] {
4784        Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
4785    };
4786    byte[] cq = Bytes.toBytes("cq");
4787    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
4788    for (byte[] family : families) {
4789      htd.addFamily(new HColumnDescriptor(family));
4790    }
4791
4792    long time = System.currentTimeMillis();
4793    HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(),
4794      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4795      false, time, 0);
4796    HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(),
4797      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4798      false, time, 1);
4799
4800    HRegion primaryRegion = null, secondaryRegion = null;
4801
4802    try {
4803      primaryRegion = HBaseTestingUtility.createRegionAndWAL(primaryHri,
4804          rootDir, TEST_UTIL.getConfiguration(), htd);
4805
4806      // load some data
4807      putData(primaryRegion, 0, 1000, cq, families);
4808
4809      // flush region
4810      primaryRegion.flush(true);
4811
4812      // open secondary region
4813      secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF);
4814
4815      verifyData(secondaryRegion, 0, 1000, cq, families);
4816    } finally {
4817      if (primaryRegion != null) {
4818        HBaseTestingUtility.closeRegionAndWAL(primaryRegion);
4819      }
4820      if (secondaryRegion != null) {
4821        HBaseTestingUtility.closeRegionAndWAL(secondaryRegion);
4822      }
4823    }
4824  }
4825
4826  @Test
4827  public void testRegionReplicaSecondaryIsReadOnly() throws IOException {
4828    // create a primary region, load some data and flush
4829    // create a secondary region, and do a put against that
4830    Path rootDir = new Path(dir + name.getMethodName());
4831    FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
4832
4833    byte[][] families = new byte[][] {
4834        Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
4835    };
4836    byte[] cq = Bytes.toBytes("cq");
4837    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
4838    for (byte[] family : families) {
4839      htd.addFamily(new HColumnDescriptor(family));
4840    }
4841
4842    long time = System.currentTimeMillis();
4843    HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(),
4844      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4845      false, time, 0);
4846    HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(),
4847      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4848      false, time, 1);
4849
4850    HRegion primaryRegion = null, secondaryRegion = null;
4851
4852    try {
4853      primaryRegion = HBaseTestingUtility.createRegionAndWAL(primaryHri,
4854          rootDir, TEST_UTIL.getConfiguration(), htd);
4855
4856      // load some data
4857      putData(primaryRegion, 0, 1000, cq, families);
4858
4859      // flush region
4860      primaryRegion.flush(true);
4861
4862      // open secondary region
4863      secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF);
4864
4865      try {
4866        putData(secondaryRegion, 0, 1000, cq, families);
4867        fail("Should have thrown exception");
4868      } catch (IOException ex) {
4869        // expected
4870      }
4871    } finally {
4872      if (primaryRegion != null) {
4873        HBaseTestingUtility.closeRegionAndWAL(primaryRegion);
4874      }
4875      if (secondaryRegion != null) {
4876        HBaseTestingUtility.closeRegionAndWAL(secondaryRegion);
4877      }
4878    }
4879  }
4880
4881  static WALFactory createWALFactory(Configuration conf, Path rootDir) throws IOException {
4882    Configuration confForWAL = new Configuration(conf);
4883    confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
4884    return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8));
4885  }
4886
4887  @Test
4888  public void testCompactionFromPrimary() throws IOException {
4889    Path rootDir = new Path(dir + name.getMethodName());
4890    FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
4891
4892    byte[][] families = new byte[][] {
4893        Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
4894    };
4895    byte[] cq = Bytes.toBytes("cq");
4896    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
4897    for (byte[] family : families) {
4898      htd.addFamily(new HColumnDescriptor(family));
4899    }
4900
4901    long time = System.currentTimeMillis();
4902    HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(),
4903      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4904      false, time, 0);
4905    HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(),
4906      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4907      false, time, 1);
4908
4909    HRegion primaryRegion = null, secondaryRegion = null;
4910
4911    try {
4912      primaryRegion = HBaseTestingUtility.createRegionAndWAL(primaryHri,
4913          rootDir, TEST_UTIL.getConfiguration(), htd);
4914
4915      // load some data
4916      putData(primaryRegion, 0, 1000, cq, families);
4917
4918      // flush region
4919      primaryRegion.flush(true);
4920
4921      // open secondary region
4922      secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF);
4923
4924      // move the file of the primary region to the archive, simulating a compaction
4925      Collection<HStoreFile> storeFiles = primaryRegion.getStore(families[0]).getStorefiles();
4926      primaryRegion.getRegionFileSystem().removeStoreFiles(Bytes.toString(families[0]), storeFiles);
4927      Collection<StoreFileInfo> storeFileInfos = primaryRegion.getRegionFileSystem()
4928          .getStoreFiles(families[0]);
4929      Assert.assertTrue(storeFileInfos == null || storeFileInfos.isEmpty());
4930
4931      verifyData(secondaryRegion, 0, 1000, cq, families);
4932    } finally {
4933      if (primaryRegion != null) {
4934        HBaseTestingUtility.closeRegionAndWAL(primaryRegion);
4935      }
4936      if (secondaryRegion != null) {
4937        HBaseTestingUtility.closeRegionAndWAL(secondaryRegion);
4938      }
4939    }
4940  }
4941
4942  private void putData(int startRow, int numRows, byte[] qf, byte[]... families) throws
4943      IOException {
4944    putData(this.region, startRow, numRows, qf, families);
4945  }
4946
4947  private void putData(HRegion region,
4948      int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
4949    putData(region, Durability.SKIP_WAL, startRow, numRows, qf, families);
4950  }
4951
4952  static void putData(HRegion region, Durability durability,
4953      int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
4954    for (int i = startRow; i < startRow + numRows; i++) {
4955      Put put = new Put(Bytes.toBytes("" + i));
4956      put.setDurability(durability);
4957      for (byte[] family : families) {
4958        put.addColumn(family, qf, null);
4959      }
4960      region.put(put);
4961      LOG.info(put.toString());
4962    }
4963  }
4964
4965  static void verifyData(HRegion newReg, int startRow, int numRows, byte[] qf, byte[]... families)
4966      throws IOException {
4967    for (int i = startRow; i < startRow + numRows; i++) {
4968      byte[] row = Bytes.toBytes("" + i);
4969      Get get = new Get(row);
4970      for (byte[] family : families) {
4971        get.addColumn(family, qf);
4972      }
4973      Result result = newReg.get(get);
4974      Cell[] raw = result.rawCells();
4975      assertEquals(families.length, result.size());
4976      for (int j = 0; j < families.length; j++) {
4977        assertTrue(CellUtil.matchingRows(raw[j], row));
4978        assertTrue(CellUtil.matchingFamily(raw[j], families[j]));
4979        assertTrue(CellUtil.matchingQualifier(raw[j], qf));
4980      }
4981    }
4982  }
4983
4984  static void assertGet(final HRegion r, final byte[] family, final byte[] k) throws IOException {
4985    // Now I have k, get values out and assert they are as expected.
4986    Get get = new Get(k).addFamily(family).setMaxVersions();
4987    Cell[] results = r.get(get).rawCells();
4988    for (int j = 0; j < results.length; j++) {
4989      byte[] tmp = CellUtil.cloneValue(results[j]);
4990      // Row should be equal to value every time.
4991      assertTrue(Bytes.equals(k, tmp));
4992    }
4993  }
4994
4995  /*
4996   * Assert first value in the passed region is <code>firstValue</code>.
4997   *
4998   * @param r
4999   *
5000   * @param fs
5001   *
5002   * @param firstValue
5003   *
5004   * @throws IOException
5005   */
5006  protected void assertScan(final HRegion r, final byte[] fs, final byte[] firstValue)
5007      throws IOException {
5008    byte[][] families = { fs };
5009    Scan scan = new Scan();
5010    for (int i = 0; i < families.length; i++)
5011      scan.addFamily(families[i]);
5012    InternalScanner s = r.getScanner(scan);
5013    try {
5014      List<Cell> curVals = new ArrayList<>();
5015      boolean first = true;
5016      OUTER_LOOP: while (s.next(curVals)) {
5017        for (Cell kv : curVals) {
5018          byte[] val = CellUtil.cloneValue(kv);
5019          byte[] curval = val;
5020          if (first) {
5021            first = false;
5022            assertTrue(Bytes.compareTo(curval, firstValue) == 0);
5023          } else {
5024            // Not asserting anything. Might as well break.
5025            break OUTER_LOOP;
5026          }
5027        }
5028      }
5029    } finally {
5030      s.close();
5031    }
5032  }
5033
5034  /**
5035   * Test that we get the expected flush results back
5036   */
5037  @Test
5038  public void testFlushResult() throws IOException {
5039    byte[] family = Bytes.toBytes("family");
5040
5041    this.region = initHRegion(tableName, method, family);
5042
5043    // empty memstore, flush doesn't run
5044    HRegion.FlushResult fr = region.flush(true);
5045    assertFalse(fr.isFlushSucceeded());
5046    assertFalse(fr.isCompactionNeeded());
5047
5048    // Flush enough files to get up to the threshold, doesn't need compactions
5049    for (int i = 0; i < 2; i++) {
5050      Put put = new Put(tableName.toBytes()).addColumn(family, family, tableName.toBytes());
5051      region.put(put);
5052      fr = region.flush(true);
5053      assertTrue(fr.isFlushSucceeded());
5054      assertFalse(fr.isCompactionNeeded());
5055    }
5056
5057    // Two flushes after the threshold, compactions are needed
5058    for (int i = 0; i < 2; i++) {
5059      Put put = new Put(tableName.toBytes()).addColumn(family, family, tableName.toBytes());
5060      region.put(put);
5061      fr = region.flush(true);
5062      assertTrue(fr.isFlushSucceeded());
5063      assertTrue(fr.isCompactionNeeded());
5064    }
5065  }
5066
5067  protected Configuration initSplit() {
5068    // Always compact if there is more than one store file.
5069    CONF.setInt("hbase.hstore.compactionThreshold", 2);
5070
5071    CONF.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 10 * 1000);
5072
5073    // Increase the amount of time between client retries
5074    CONF.setLong("hbase.client.pause", 15 * 1000);
5075
5076    // This size should make it so we always split using the addContent
5077    // below. After adding all data, the first region is 1.3M
5078    CONF.setLong(HConstants.HREGION_MAX_FILESIZE, 1024 * 128);
5079    return CONF;
5080  }
5081
5082  /**
5083   * @return A region on which you must call
5084   *         {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
5085   */
5086  protected HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf,
5087      byte[]... families) throws IOException {
5088    return initHRegion(tableName, callingMethod, conf, false, families);
5089  }
5090
5091  /**
5092   * @return A region on which you must call
5093   *         {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
5094   */
5095  protected HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf,
5096      boolean isReadOnly, byte[]... families) throws IOException {
5097    return initHRegion(tableName, null, null, callingMethod, conf, isReadOnly, families);
5098  }
5099
5100  protected HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
5101      String callingMethod, Configuration conf, boolean isReadOnly, byte[]... families)
5102      throws IOException {
5103    Path logDir = TEST_UTIL.getDataTestDirOnTestFS(callingMethod + ".log");
5104    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
5105    HRegionInfo hri = new HRegionInfo(tableName, startKey, stopKey);
5106    final WAL wal = HBaseTestingUtility.createWal(conf, logDir, hri);
5107    return initHRegion(tableName, startKey, stopKey, isReadOnly,
5108        Durability.SYNC_WAL, wal, families);
5109  }
5110
5111  /**
5112   * @return A region on which you must call
5113   *         {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
5114   */
5115  public HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
5116      boolean isReadOnly, Durability durability, WAL wal, byte[]... families) throws IOException {
5117    return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey,
5118        isReadOnly, durability, wal, families);
5119  }
5120
5121  /**
5122   * Assert that the passed in Cell has expected contents for the specified row,
5123   * column & timestamp.
5124   */
5125  private void checkOneCell(Cell kv, byte[] cf, int rowIdx, int colIdx, long ts) {
5126    String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts;
5127    assertEquals("Row mismatch which checking: " + ctx, "row:" + rowIdx,
5128        Bytes.toString(CellUtil.cloneRow(kv)));
5129    assertEquals("ColumnFamily mismatch while checking: " + ctx, Bytes.toString(cf),
5130        Bytes.toString(CellUtil.cloneFamily(kv)));
5131    assertEquals("Column qualifier mismatch while checking: " + ctx, "column:" + colIdx,
5132        Bytes.toString(CellUtil.cloneQualifier(kv)));
5133    assertEquals("Timestamp mismatch while checking: " + ctx, ts, kv.getTimestamp());
5134    assertEquals("Value mismatch while checking: " + ctx, "value-version-" + ts,
5135        Bytes.toString(CellUtil.cloneValue(kv)));
5136  }
5137
5138  @Test
5139  public void testReverseScanner_FromMemStore_SingleCF_Normal()
5140      throws IOException {
5141    byte[] rowC = Bytes.toBytes("rowC");
5142    byte[] rowA = Bytes.toBytes("rowA");
5143    byte[] rowB = Bytes.toBytes("rowB");
5144    byte[] cf = Bytes.toBytes("CF");
5145    byte[][] families = { cf };
5146    byte[] col = Bytes.toBytes("C");
5147    long ts = 1;
5148    this.region = initHRegion(tableName, method, families);
5149    try {
5150      KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null);
5151      KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put,
5152          null);
5153      KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null);
5154      KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null);
5155      Put put = null;
5156      put = new Put(rowC);
5157      put.add(kv1);
5158      put.add(kv11);
5159      region.put(put);
5160      put = new Put(rowA);
5161      put.add(kv2);
5162      region.put(put);
5163      put = new Put(rowB);
5164      put.add(kv3);
5165      region.put(put);
5166
5167      Scan scan = new Scan(rowC);
5168      scan.setMaxVersions(5);
5169      scan.setReversed(true);
5170      InternalScanner scanner = region.getScanner(scan);
5171      List<Cell> currRow = new ArrayList<>();
5172      boolean hasNext = scanner.next(currRow);
5173      assertEquals(2, currRow.size());
5174      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5175          .get(0).getRowLength(), rowC, 0, rowC.length));
5176      assertTrue(hasNext);
5177      currRow.clear();
5178      hasNext = scanner.next(currRow);
5179      assertEquals(1, currRow.size());
5180      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5181          .get(0).getRowLength(), rowB, 0, rowB.length));
5182      assertTrue(hasNext);
5183      currRow.clear();
5184      hasNext = scanner.next(currRow);
5185      assertEquals(1, currRow.size());
5186      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5187          .get(0).getRowLength(), rowA, 0, rowA.length));
5188      assertFalse(hasNext);
5189      scanner.close();
5190    } finally {
5191      HBaseTestingUtility.closeRegionAndWAL(this.region);
5192      this.region = null;
5193    }
5194  }
5195
5196  @Test
5197  public void testReverseScanner_FromMemStore_SingleCF_LargerKey()
5198      throws IOException {
5199    byte[] rowC = Bytes.toBytes("rowC");
5200    byte[] rowA = Bytes.toBytes("rowA");
5201    byte[] rowB = Bytes.toBytes("rowB");
5202    byte[] rowD = Bytes.toBytes("rowD");
5203    byte[] cf = Bytes.toBytes("CF");
5204    byte[][] families = { cf };
5205    byte[] col = Bytes.toBytes("C");
5206    long ts = 1;
5207    this.region = initHRegion(tableName, method, families);
5208    try {
5209      KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null);
5210      KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put,
5211          null);
5212      KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null);
5213      KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null);
5214      Put put = null;
5215      put = new Put(rowC);
5216      put.add(kv1);
5217      put.add(kv11);
5218      region.put(put);
5219      put = new Put(rowA);
5220      put.add(kv2);
5221      region.put(put);
5222      put = new Put(rowB);
5223      put.add(kv3);
5224      region.put(put);
5225
5226      Scan scan = new Scan(rowD);
5227      List<Cell> currRow = new ArrayList<>();
5228      scan.setReversed(true);
5229      scan.setMaxVersions(5);
5230      InternalScanner scanner = region.getScanner(scan);
5231      boolean hasNext = scanner.next(currRow);
5232      assertEquals(2, currRow.size());
5233      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5234          .get(0).getRowLength(), rowC, 0, rowC.length));
5235      assertTrue(hasNext);
5236      currRow.clear();
5237      hasNext = scanner.next(currRow);
5238      assertEquals(1, currRow.size());
5239      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5240          .get(0).getRowLength(), rowB, 0, rowB.length));
5241      assertTrue(hasNext);
5242      currRow.clear();
5243      hasNext = scanner.next(currRow);
5244      assertEquals(1, currRow.size());
5245      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5246          .get(0).getRowLength(), rowA, 0, rowA.length));
5247      assertFalse(hasNext);
5248      scanner.close();
5249    } finally {
5250      HBaseTestingUtility.closeRegionAndWAL(this.region);
5251      this.region = null;
5252    }
5253  }
5254
5255  @Test
5256  public void testReverseScanner_FromMemStore_SingleCF_FullScan()
5257      throws IOException {
5258    byte[] rowC = Bytes.toBytes("rowC");
5259    byte[] rowA = Bytes.toBytes("rowA");
5260    byte[] rowB = Bytes.toBytes("rowB");
5261    byte[] cf = Bytes.toBytes("CF");
5262    byte[][] families = { cf };
5263    byte[] col = Bytes.toBytes("C");
5264    long ts = 1;
5265    this.region = initHRegion(tableName, method, families);
5266    try {
5267      KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null);
5268      KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put,
5269          null);
5270      KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null);
5271      KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null);
5272      Put put = null;
5273      put = new Put(rowC);
5274      put.add(kv1);
5275      put.add(kv11);
5276      region.put(put);
5277      put = new Put(rowA);
5278      put.add(kv2);
5279      region.put(put);
5280      put = new Put(rowB);
5281      put.add(kv3);
5282      region.put(put);
5283      Scan scan = new Scan();
5284      List<Cell> currRow = new ArrayList<>();
5285      scan.setReversed(true);
5286      InternalScanner scanner = region.getScanner(scan);
5287      boolean hasNext = scanner.next(currRow);
5288      assertEquals(1, currRow.size());
5289      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5290          .get(0).getRowLength(), rowC, 0, rowC.length));
5291      assertTrue(hasNext);
5292      currRow.clear();
5293      hasNext = scanner.next(currRow);
5294      assertEquals(1, currRow.size());
5295      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5296          .get(0).getRowLength(), rowB, 0, rowB.length));
5297      assertTrue(hasNext);
5298      currRow.clear();
5299      hasNext = scanner.next(currRow);
5300      assertEquals(1, currRow.size());
5301      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5302          .get(0).getRowLength(), rowA, 0, rowA.length));
5303      assertFalse(hasNext);
5304      scanner.close();
5305    } finally {
5306      HBaseTestingUtility.closeRegionAndWAL(this.region);
5307      this.region = null;
5308    }
5309  }
5310
5311  @Test
5312  public void testReverseScanner_moreRowsMayExistAfter() throws IOException {
5313    // case for "INCLUDE_AND_SEEK_NEXT_ROW & SEEK_NEXT_ROW" endless loop
5314    byte[] rowA = Bytes.toBytes("rowA");
5315    byte[] rowB = Bytes.toBytes("rowB");
5316    byte[] rowC = Bytes.toBytes("rowC");
5317    byte[] rowD = Bytes.toBytes("rowD");
5318    byte[] rowE = Bytes.toBytes("rowE");
5319    byte[] cf = Bytes.toBytes("CF");
5320    byte[][] families = { cf };
5321    byte[] col1 = Bytes.toBytes("col1");
5322    byte[] col2 = Bytes.toBytes("col2");
5323    long ts = 1;
5324    this.region = initHRegion(tableName, method, families);
5325    try {
5326      KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null);
5327      KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null);
5328      KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null);
5329      KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null);
5330      KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null);
5331      KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null);
5332      Put put = null;
5333      put = new Put(rowA);
5334      put.add(kv1);
5335      region.put(put);
5336      put = new Put(rowB);
5337      put.add(kv2);
5338      region.put(put);
5339      put = new Put(rowC);
5340      put.add(kv3);
5341      region.put(put);
5342      put = new Put(rowD);
5343      put.add(kv4_1);
5344      region.put(put);
5345      put = new Put(rowD);
5346      put.add(kv4_2);
5347      region.put(put);
5348      put = new Put(rowE);
5349      put.add(kv5);
5350      region.put(put);
5351      region.flush(true);
5352      Scan scan = new Scan(rowD, rowA);
5353      scan.addColumn(families[0], col1);
5354      scan.setReversed(true);
5355      List<Cell> currRow = new ArrayList<>();
5356      InternalScanner scanner = region.getScanner(scan);
5357      boolean hasNext = scanner.next(currRow);
5358      assertEquals(1, currRow.size());
5359      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5360          .get(0).getRowLength(), rowD, 0, rowD.length));
5361      assertTrue(hasNext);
5362      currRow.clear();
5363      hasNext = scanner.next(currRow);
5364      assertEquals(1, currRow.size());
5365      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5366          .get(0).getRowLength(), rowC, 0, rowC.length));
5367      assertTrue(hasNext);
5368      currRow.clear();
5369      hasNext = scanner.next(currRow);
5370      assertEquals(1, currRow.size());
5371      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5372          .get(0).getRowLength(), rowB, 0, rowB.length));
5373      assertFalse(hasNext);
5374      scanner.close();
5375
5376      scan = new Scan(rowD, rowA);
5377      scan.addColumn(families[0], col2);
5378      scan.setReversed(true);
5379      currRow.clear();
5380      scanner = region.getScanner(scan);
5381      hasNext = scanner.next(currRow);
5382      assertEquals(1, currRow.size());
5383      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5384          .get(0).getRowLength(), rowD, 0, rowD.length));
5385      scanner.close();
5386    } finally {
5387      HBaseTestingUtility.closeRegionAndWAL(this.region);
5388      this.region = null;
5389    }
5390  }
5391
5392  @Test
5393  public void testReverseScanner_smaller_blocksize() throws IOException {
5394    // case to ensure no conflict with HFile index optimization
5395    byte[] rowA = Bytes.toBytes("rowA");
5396    byte[] rowB = Bytes.toBytes("rowB");
5397    byte[] rowC = Bytes.toBytes("rowC");
5398    byte[] rowD = Bytes.toBytes("rowD");
5399    byte[] rowE = Bytes.toBytes("rowE");
5400    byte[] cf = Bytes.toBytes("CF");
5401    byte[][] families = { cf };
5402    byte[] col1 = Bytes.toBytes("col1");
5403    byte[] col2 = Bytes.toBytes("col2");
5404    long ts = 1;
5405    HBaseConfiguration config = new HBaseConfiguration();
5406    config.setInt("test.block.size", 1);
5407    this.region = initHRegion(tableName, method, config, families);
5408    try {
5409      KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null);
5410      KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null);
5411      KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null);
5412      KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null);
5413      KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null);
5414      KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null);
5415      Put put = null;
5416      put = new Put(rowA);
5417      put.add(kv1);
5418      region.put(put);
5419      put = new Put(rowB);
5420      put.add(kv2);
5421      region.put(put);
5422      put = new Put(rowC);
5423      put.add(kv3);
5424      region.put(put);
5425      put = new Put(rowD);
5426      put.add(kv4_1);
5427      region.put(put);
5428      put = new Put(rowD);
5429      put.add(kv4_2);
5430      region.put(put);
5431      put = new Put(rowE);
5432      put.add(kv5);
5433      region.put(put);
5434      region.flush(true);
5435      Scan scan = new Scan(rowD, rowA);
5436      scan.addColumn(families[0], col1);
5437      scan.setReversed(true);
5438      List<Cell> currRow = new ArrayList<>();
5439      InternalScanner scanner = region.getScanner(scan);
5440      boolean hasNext = scanner.next(currRow);
5441      assertEquals(1, currRow.size());
5442      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5443          .get(0).getRowLength(), rowD, 0, rowD.length));
5444      assertTrue(hasNext);
5445      currRow.clear();
5446      hasNext = scanner.next(currRow);
5447      assertEquals(1, currRow.size());
5448      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5449          .get(0).getRowLength(), rowC, 0, rowC.length));
5450      assertTrue(hasNext);
5451      currRow.clear();
5452      hasNext = scanner.next(currRow);
5453      assertEquals(1, currRow.size());
5454      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5455          .get(0).getRowLength(), rowB, 0, rowB.length));
5456      assertFalse(hasNext);
5457      scanner.close();
5458
5459      scan = new Scan(rowD, rowA);
5460      scan.addColumn(families[0], col2);
5461      scan.setReversed(true);
5462      currRow.clear();
5463      scanner = region.getScanner(scan);
5464      hasNext = scanner.next(currRow);
5465      assertEquals(1, currRow.size());
5466      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5467          .get(0).getRowLength(), rowD, 0, rowD.length));
5468      scanner.close();
5469    } finally {
5470      HBaseTestingUtility.closeRegionAndWAL(this.region);
5471      this.region = null;
5472    }
5473  }
5474
5475  @Test
5476  public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs1()
5477      throws IOException {
5478    byte[] row0 = Bytes.toBytes("row0"); // 1 kv
5479    byte[] row1 = Bytes.toBytes("row1"); // 2 kv
5480    byte[] row2 = Bytes.toBytes("row2"); // 4 kv
5481    byte[] row3 = Bytes.toBytes("row3"); // 2 kv
5482    byte[] row4 = Bytes.toBytes("row4"); // 5 kv
5483    byte[] row5 = Bytes.toBytes("row5"); // 2 kv
5484    byte[] cf1 = Bytes.toBytes("CF1");
5485    byte[] cf2 = Bytes.toBytes("CF2");
5486    byte[] cf3 = Bytes.toBytes("CF3");
5487    byte[][] families = { cf1, cf2, cf3 };
5488    byte[] col = Bytes.toBytes("C");
5489    long ts = 1;
5490    HBaseConfiguration conf = new HBaseConfiguration();
5491    // disable compactions in this test.
5492    conf.setInt("hbase.hstore.compactionThreshold", 10000);
5493    this.region = initHRegion(tableName, method, conf, families);
5494    try {
5495      // kv naming style: kv(row number) totalKvCountInThisRow seq no
5496      KeyValue kv0_1_1 = new KeyValue(row0, cf1, col, ts, KeyValue.Type.Put,
5497          null);
5498      KeyValue kv1_2_1 = new KeyValue(row1, cf2, col, ts, KeyValue.Type.Put,
5499          null);
5500      KeyValue kv1_2_2 = new KeyValue(row1, cf1, col, ts + 1,
5501          KeyValue.Type.Put, null);
5502      KeyValue kv2_4_1 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put,
5503          null);
5504      KeyValue kv2_4_2 = new KeyValue(row2, cf1, col, ts, KeyValue.Type.Put,
5505          null);
5506      KeyValue kv2_4_3 = new KeyValue(row2, cf3, col, ts, KeyValue.Type.Put,
5507          null);
5508      KeyValue kv2_4_4 = new KeyValue(row2, cf1, col, ts + 4,
5509          KeyValue.Type.Put, null);
5510      KeyValue kv3_2_1 = new KeyValue(row3, cf2, col, ts, KeyValue.Type.Put,
5511          null);
5512      KeyValue kv3_2_2 = new KeyValue(row3, cf1, col, ts + 4,
5513          KeyValue.Type.Put, null);
5514      KeyValue kv4_5_1 = new KeyValue(row4, cf1, col, ts, KeyValue.Type.Put,
5515          null);
5516      KeyValue kv4_5_2 = new KeyValue(row4, cf3, col, ts, KeyValue.Type.Put,
5517          null);
5518      KeyValue kv4_5_3 = new KeyValue(row4, cf3, col, ts + 5,
5519          KeyValue.Type.Put, null);
5520      KeyValue kv4_5_4 = new KeyValue(row4, cf2, col, ts, KeyValue.Type.Put,
5521          null);
5522      KeyValue kv4_5_5 = new KeyValue(row4, cf1, col, ts + 3,
5523          KeyValue.Type.Put, null);
5524      KeyValue kv5_2_1 = new KeyValue(row5, cf2, col, ts, KeyValue.Type.Put,
5525          null);
5526      KeyValue kv5_2_2 = new KeyValue(row5, cf3, col, ts, KeyValue.Type.Put,
5527          null);
5528      // hfiles(cf1/cf2) :"row1"(1 kv) / "row2"(1 kv) / "row4"(2 kv)
5529      Put put = null;
5530      put = new Put(row1);
5531      put.add(kv1_2_1);
5532      region.put(put);
5533      put = new Put(row2);
5534      put.add(kv2_4_1);
5535      region.put(put);
5536      put = new Put(row4);
5537      put.add(kv4_5_4);
5538      put.add(kv4_5_5);
5539      region.put(put);
5540      region.flush(true);
5541      // hfiles(cf1/cf3) : "row1" (1 kvs) / "row2" (1 kv) / "row4" (2 kv)
5542      put = new Put(row4);
5543      put.add(kv4_5_1);
5544      put.add(kv4_5_3);
5545      region.put(put);
5546      put = new Put(row1);
5547      put.add(kv1_2_2);
5548      region.put(put);
5549      put = new Put(row2);
5550      put.add(kv2_4_4);
5551      region.put(put);
5552      region.flush(true);
5553      // hfiles(cf1/cf3) : "row2"(2 kv) / "row3"(1 kvs) / "row4" (1 kv)
5554      put = new Put(row4);
5555      put.add(kv4_5_2);
5556      region.put(put);
5557      put = new Put(row2);
5558      put.add(kv2_4_2);
5559      put.add(kv2_4_3);
5560      region.put(put);
5561      put = new Put(row3);
5562      put.add(kv3_2_2);
5563      region.put(put);
5564      region.flush(true);
5565      // memstore(cf1/cf2/cf3) : "row0" (1 kvs) / "row3" ( 1 kv) / "row5" (max)
5566      // ( 2 kv)
5567      put = new Put(row0);
5568      put.add(kv0_1_1);
5569      region.put(put);
5570      put = new Put(row3);
5571      put.add(kv3_2_1);
5572      region.put(put);
5573      put = new Put(row5);
5574      put.add(kv5_2_1);
5575      put.add(kv5_2_2);
5576      region.put(put);
5577      // scan range = ["row4", min), skip the max "row5"
5578      Scan scan = new Scan(row4);
5579      scan.setMaxVersions(5);
5580      scan.setBatch(3);
5581      scan.setReversed(true);
5582      InternalScanner scanner = region.getScanner(scan);
5583      List<Cell> currRow = new ArrayList<>();
5584      boolean hasNext = false;
5585      // 1. scan out "row4" (5 kvs), "row5" can't be scanned out since not
5586      // included in scan range
5587      // "row4" takes 2 next() calls since batch=3
5588      hasNext = scanner.next(currRow);
5589      assertEquals(3, currRow.size());
5590      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5591          .get(0).getRowLength(), row4, 0, row4.length));
5592      assertTrue(hasNext);
5593      currRow.clear();
5594      hasNext = scanner.next(currRow);
5595      assertEquals(2, currRow.size());
5596      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(),
5597          currRow.get(0).getRowLength(), row4, 0,
5598        row4.length));
5599      assertTrue(hasNext);
5600      // 2. scan out "row3" (2 kv)
5601      currRow.clear();
5602      hasNext = scanner.next(currRow);
5603      assertEquals(2, currRow.size());
5604      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5605          .get(0).getRowLength(), row3, 0, row3.length));
5606      assertTrue(hasNext);
5607      // 3. scan out "row2" (4 kvs)
5608      // "row2" takes 2 next() calls since batch=3
5609      currRow.clear();
5610      hasNext = scanner.next(currRow);
5611      assertEquals(3, currRow.size());
5612      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5613          .get(0).getRowLength(), row2, 0, row2.length));
5614      assertTrue(hasNext);
5615      currRow.clear();
5616      hasNext = scanner.next(currRow);
5617      assertEquals(1, currRow.size());
5618      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5619          .get(0).getRowLength(), row2, 0, row2.length));
5620      assertTrue(hasNext);
5621      // 4. scan out "row1" (2 kv)
5622      currRow.clear();
5623      hasNext = scanner.next(currRow);
5624      assertEquals(2, currRow.size());
5625      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5626          .get(0).getRowLength(), row1, 0, row1.length));
5627      assertTrue(hasNext);
5628      // 5. scan out "row0" (1 kv)
5629      currRow.clear();
5630      hasNext = scanner.next(currRow);
5631      assertEquals(1, currRow.size());
5632      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5633          .get(0).getRowLength(), row0, 0, row0.length));
5634      assertFalse(hasNext);
5635
5636      scanner.close();
5637    } finally {
5638      HBaseTestingUtility.closeRegionAndWAL(this.region);
5639      this.region = null;
5640    }
5641  }
5642
5643  @Test
5644  public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs2()
5645      throws IOException {
5646    byte[] row1 = Bytes.toBytes("row1");
5647    byte[] row2 = Bytes.toBytes("row2");
5648    byte[] row3 = Bytes.toBytes("row3");
5649    byte[] row4 = Bytes.toBytes("row4");
5650    byte[] cf1 = Bytes.toBytes("CF1");
5651    byte[] cf2 = Bytes.toBytes("CF2");
5652    byte[] cf3 = Bytes.toBytes("CF3");
5653    byte[] cf4 = Bytes.toBytes("CF4");
5654    byte[][] families = { cf1, cf2, cf3, cf4 };
5655    byte[] col = Bytes.toBytes("C");
5656    long ts = 1;
5657    HBaseConfiguration conf = new HBaseConfiguration();
5658    // disable compactions in this test.
5659    conf.setInt("hbase.hstore.compactionThreshold", 10000);
5660    this.region = initHRegion(tableName, method, conf, families);
5661    try {
5662      KeyValue kv1 = new KeyValue(row1, cf1, col, ts, KeyValue.Type.Put, null);
5663      KeyValue kv2 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put, null);
5664      KeyValue kv3 = new KeyValue(row3, cf3, col, ts, KeyValue.Type.Put, null);
5665      KeyValue kv4 = new KeyValue(row4, cf4, col, ts, KeyValue.Type.Put, null);
5666      // storefile1
5667      Put put = new Put(row1);
5668      put.add(kv1);
5669      region.put(put);
5670      region.flush(true);
5671      // storefile2
5672      put = new Put(row2);
5673      put.add(kv2);
5674      region.put(put);
5675      region.flush(true);
5676      // storefile3
5677      put = new Put(row3);
5678      put.add(kv3);
5679      region.put(put);
5680      region.flush(true);
5681      // memstore
5682      put = new Put(row4);
5683      put.add(kv4);
5684      region.put(put);
5685      // scan range = ["row4", min)
5686      Scan scan = new Scan(row4);
5687      scan.setReversed(true);
5688      scan.setBatch(10);
5689      InternalScanner scanner = region.getScanner(scan);
5690      List<Cell> currRow = new ArrayList<>();
5691      boolean hasNext = scanner.next(currRow);
5692      assertEquals(1, currRow.size());
5693      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5694          .get(0).getRowLength(), row4, 0, row4.length));
5695      assertTrue(hasNext);
5696      currRow.clear();
5697      hasNext = scanner.next(currRow);
5698      assertEquals(1, currRow.size());
5699      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5700          .get(0).getRowLength(), row3, 0, row3.length));
5701      assertTrue(hasNext);
5702      currRow.clear();
5703      hasNext = scanner.next(currRow);
5704      assertEquals(1, currRow.size());
5705      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5706          .get(0).getRowLength(), row2, 0, row2.length));
5707      assertTrue(hasNext);
5708      currRow.clear();
5709      hasNext = scanner.next(currRow);
5710      assertEquals(1, currRow.size());
5711      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5712          .get(0).getRowLength(), row1, 0, row1.length));
5713      assertFalse(hasNext);
5714    } finally {
5715      HBaseTestingUtility.closeRegionAndWAL(this.region);
5716      this.region = null;
5717    }
5718  }
5719
5720  /**
5721   * Test for HBASE-14497: Reverse Scan threw StackOverflow caused by readPt checking
5722   */
5723  @Test
5724  public void testReverseScanner_StackOverflow() throws IOException {
5725    byte[] cf1 = Bytes.toBytes("CF1");
5726    byte[][] families = {cf1};
5727    byte[] col = Bytes.toBytes("C");
5728    HBaseConfiguration conf = new HBaseConfiguration();
5729    this.region = initHRegion(tableName, method, conf, families);
5730    try {
5731      // setup with one storefile and one memstore, to create scanner and get an earlier readPt
5732      Put put = new Put(Bytes.toBytes("19998"));
5733      put.addColumn(cf1, col, Bytes.toBytes("val"));
5734      region.put(put);
5735      region.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
5736      Put put2 = new Put(Bytes.toBytes("19997"));
5737      put2.addColumn(cf1, col, Bytes.toBytes("val"));
5738      region.put(put2);
5739
5740      Scan scan = new Scan(Bytes.toBytes("19998"));
5741      scan.setReversed(true);
5742      InternalScanner scanner = region.getScanner(scan);
5743
5744      // create one storefile contains many rows will be skipped
5745      // to check StoreFileScanner.seekToPreviousRow
5746      for (int i = 10000; i < 20000; i++) {
5747        Put p = new Put(Bytes.toBytes(""+i));
5748        p.addColumn(cf1, col, Bytes.toBytes("" + i));
5749        region.put(p);
5750      }
5751      region.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
5752
5753      // create one memstore contains many rows will be skipped
5754      // to check MemStoreScanner.seekToPreviousRow
5755      for (int i = 10000; i < 20000; i++) {
5756        Put p = new Put(Bytes.toBytes(""+i));
5757        p.addColumn(cf1, col, Bytes.toBytes("" + i));
5758        region.put(p);
5759      }
5760
5761      List<Cell> currRow = new ArrayList<>();
5762      boolean hasNext;
5763      do {
5764        hasNext = scanner.next(currRow);
5765      } while (hasNext);
5766      assertEquals(2, currRow.size());
5767      assertEquals("19998", Bytes.toString(currRow.get(0).getRowArray(),
5768        currRow.get(0).getRowOffset(), currRow.get(0).getRowLength()));
5769      assertEquals("19997", Bytes.toString(currRow.get(1).getRowArray(),
5770        currRow.get(1).getRowOffset(), currRow.get(1).getRowLength()));
5771    } finally {
5772      HBaseTestingUtility.closeRegionAndWAL(this.region);
5773      this.region = null;
5774    }
5775  }
5776
5777  @Test
5778  public void testReverseScanShouldNotScanMemstoreIfReadPtLesser() throws Exception {
5779    byte[] cf1 = Bytes.toBytes("CF1");
5780    byte[][] families = { cf1 };
5781    byte[] col = Bytes.toBytes("C");
5782    HBaseConfiguration conf = new HBaseConfiguration();
5783    this.region = initHRegion(tableName, method, conf, families);
5784    try {
5785      // setup with one storefile and one memstore, to create scanner and get an earlier readPt
5786      Put put = new Put(Bytes.toBytes("19996"));
5787      put.addColumn(cf1, col, Bytes.toBytes("val"));
5788      region.put(put);
5789      Put put2 = new Put(Bytes.toBytes("19995"));
5790      put2.addColumn(cf1, col, Bytes.toBytes("val"));
5791      region.put(put2);
5792      // create a reverse scan
5793      Scan scan = new Scan(Bytes.toBytes("19996"));
5794      scan.setReversed(true);
5795      RegionScannerImpl scanner = region.getScanner(scan);
5796
5797      // flush the cache. This will reset the store scanner
5798      region.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
5799
5800      // create one memstore contains many rows will be skipped
5801      // to check MemStoreScanner.seekToPreviousRow
5802      for (int i = 10000; i < 20000; i++) {
5803        Put p = new Put(Bytes.toBytes("" + i));
5804        p.addColumn(cf1, col, Bytes.toBytes("" + i));
5805        region.put(p);
5806      }
5807      List<Cell> currRow = new ArrayList<>();
5808      boolean hasNext;
5809      boolean assertDone = false;
5810      do {
5811        hasNext = scanner.next(currRow);
5812        // With HBASE-15871, after the scanner is reset the memstore scanner should not be
5813        // added here
5814        if (!assertDone) {
5815          StoreScanner current =
5816              (StoreScanner) (scanner.storeHeap).getCurrentForTesting();
5817          List<KeyValueScanner> scanners = current.getAllScannersForTesting();
5818          assertEquals("There should be only one scanner the store file scanner", 1,
5819            scanners.size());
5820          assertDone = true;
5821        }
5822      } while (hasNext);
5823      assertEquals(2, currRow.size());
5824      assertEquals("19996", Bytes.toString(currRow.get(0).getRowArray(),
5825        currRow.get(0).getRowOffset(), currRow.get(0).getRowLength()));
5826      assertEquals("19995", Bytes.toString(currRow.get(1).getRowArray(),
5827        currRow.get(1).getRowOffset(), currRow.get(1).getRowLength()));
5828    } finally {
5829      HBaseTestingUtility.closeRegionAndWAL(this.region);
5830      this.region = null;
5831    }
5832  }
5833
5834  @Test
5835  public void testReverseScanWhenPutCellsAfterOpenReverseScan() throws Exception {
5836    byte[] cf1 = Bytes.toBytes("CF1");
5837    byte[][] families = { cf1 };
5838    byte[] col = Bytes.toBytes("C");
5839
5840    HBaseConfiguration conf = new HBaseConfiguration();
5841    this.region = initHRegion(tableName, method, conf, families);
5842
5843    Put put = new Put(Bytes.toBytes("199996"));
5844    put.addColumn(cf1, col, Bytes.toBytes("val"));
5845    region.put(put);
5846    Put put2 = new Put(Bytes.toBytes("199995"));
5847    put2.addColumn(cf1, col, Bytes.toBytes("val"));
5848    region.put(put2);
5849
5850    // Create a reverse scan
5851    Scan scan = new Scan(Bytes.toBytes("199996"));
5852    scan.setReversed(true);
5853    RegionScannerImpl scanner = region.getScanner(scan);
5854
5855    // Put a lot of cells that have sequenceIDs grater than the readPt of the reverse scan
5856    for (int i = 100000; i < 200000; i++) {
5857      Put p = new Put(Bytes.toBytes("" + i));
5858      p.addColumn(cf1, col, Bytes.toBytes("" + i));
5859      region.put(p);
5860    }
5861    List<Cell> currRow = new ArrayList<>();
5862    boolean hasNext;
5863    do {
5864      hasNext = scanner.next(currRow);
5865    } while (hasNext);
5866
5867    assertEquals(2, currRow.size());
5868    assertEquals("199996", Bytes.toString(currRow.get(0).getRowArray(),
5869      currRow.get(0).getRowOffset(), currRow.get(0).getRowLength()));
5870    assertEquals("199995", Bytes.toString(currRow.get(1).getRowArray(),
5871      currRow.get(1).getRowOffset(), currRow.get(1).getRowLength()));
5872  }
5873
5874  @Test
5875  public void testWriteRequestsCounter() throws IOException {
5876    byte[] fam = Bytes.toBytes("info");
5877    byte[][] families = { fam };
5878    this.region = initHRegion(tableName, method, CONF, families);
5879
5880    Assert.assertEquals(0L, region.getWriteRequestsCount());
5881
5882    Put put = new Put(row);
5883    put.addColumn(fam, fam, fam);
5884
5885    Assert.assertEquals(0L, region.getWriteRequestsCount());
5886    region.put(put);
5887    Assert.assertEquals(1L, region.getWriteRequestsCount());
5888    region.put(put);
5889    Assert.assertEquals(2L, region.getWriteRequestsCount());
5890    region.put(put);
5891    Assert.assertEquals(3L, region.getWriteRequestsCount());
5892
5893    region.delete(new Delete(row));
5894    Assert.assertEquals(4L, region.getWriteRequestsCount());
5895
5896    HBaseTestingUtility.closeRegionAndWAL(this.region);
5897    this.region = null;
5898  }
5899
5900  @Test
5901  public void testOpenRegionWrittenToWAL() throws Exception {
5902    final ServerName serverName = ServerName.valueOf(name.getMethodName(), 100, 42);
5903    final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
5904
5905    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
5906    htd.addFamily(new HColumnDescriptor(fam1));
5907    htd.addFamily(new HColumnDescriptor(fam2));
5908
5909    HRegionInfo hri = new HRegionInfo(htd.getTableName(),
5910      HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
5911
5912    // open the region w/o rss and wal and flush some files
5913    HRegion region =
5914         HBaseTestingUtility.createRegionAndWAL(hri, TEST_UTIL.getDataTestDir(), TEST_UTIL
5915             .getConfiguration(), htd);
5916    assertNotNull(region);
5917
5918    // create a file in fam1 for the region before opening in OpenRegionHandler
5919    region.put(new Put(Bytes.toBytes("a")).addColumn(fam1, fam1, fam1));
5920    region.flush(true);
5921    HBaseTestingUtility.closeRegionAndWAL(region);
5922
5923    ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class);
5924
5925    // capture append() calls
5926    WAL wal = mockWAL();
5927    when(rss.getWAL((HRegionInfo) any())).thenReturn(wal);
5928
5929    try {
5930      region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
5931        TEST_UTIL.getConfiguration(), rss, null);
5932
5933      verify(wal, times(1)).append((HRegionInfo)any(), (WALKeyImpl)any()
5934        , editCaptor.capture(), anyBoolean());
5935
5936      WALEdit edit = editCaptor.getValue();
5937      assertNotNull(edit);
5938      assertNotNull(edit.getCells());
5939      assertEquals(1, edit.getCells().size());
5940      RegionEventDescriptor desc = WALEdit.getRegionEventDescriptor(edit.getCells().get(0));
5941      assertNotNull(desc);
5942
5943      LOG.info("RegionEventDescriptor from WAL: " + desc);
5944
5945      assertEquals(RegionEventDescriptor.EventType.REGION_OPEN, desc.getEventType());
5946      assertTrue(Bytes.equals(desc.getTableName().toByteArray(), htd.getTableName().toBytes()));
5947      assertTrue(Bytes.equals(desc.getEncodedRegionName().toByteArray(),
5948        hri.getEncodedNameAsBytes()));
5949      assertTrue(desc.getLogSequenceNumber() > 0);
5950      assertEquals(serverName, ProtobufUtil.toServerName(desc.getServer()));
5951      assertEquals(2, desc.getStoresCount());
5952
5953      StoreDescriptor store = desc.getStores(0);
5954      assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam1));
5955      assertEquals(store.getStoreHomeDir(), Bytes.toString(fam1));
5956      assertEquals(1, store.getStoreFileCount()); // 1store file
5957      assertFalse(store.getStoreFile(0).contains("/")); // ensure path is relative
5958
5959      store = desc.getStores(1);
5960      assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam2));
5961      assertEquals(store.getStoreHomeDir(), Bytes.toString(fam2));
5962      assertEquals(0, store.getStoreFileCount()); // no store files
5963
5964    } finally {
5965      HBaseTestingUtility.closeRegionAndWAL(region);
5966    }
5967  }
5968
5969  // Helper for test testOpenRegionWrittenToWALForLogReplay
5970  static class HRegionWithSeqId extends HRegion {
5971    public HRegionWithSeqId(final Path tableDir, final WAL wal, final FileSystem fs,
5972        final Configuration confParam, final RegionInfo regionInfo,
5973        final TableDescriptor htd, final RegionServerServices rsServices) {
5974      super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
5975    }
5976    @Override
5977    protected long getNextSequenceId(WAL wal) throws IOException {
5978      return 42;
5979    }
5980  }
5981
5982  @Test
5983  public void testFlushedFileWithNoTags() throws Exception {
5984    final TableName tableName = TableName.valueOf(name.getMethodName());
5985    HTableDescriptor htd = new HTableDescriptor(tableName);
5986    htd.addFamily(new HColumnDescriptor(fam1));
5987    HRegionInfo info = new HRegionInfo(tableName, null, null, false);
5988    Path path = TEST_UTIL.getDataTestDir(getClass().getSimpleName());
5989    region = HBaseTestingUtility.createRegionAndWAL(info, path, TEST_UTIL.getConfiguration(), htd);
5990    Put put = new Put(Bytes.toBytes("a-b-0-0"));
5991    put.addColumn(fam1, qual1, Bytes.toBytes("c1-value"));
5992    region.put(put);
5993    region.flush(true);
5994    HStore store = region.getStore(fam1);
5995    Collection<HStoreFile> storefiles = store.getStorefiles();
5996    for (HStoreFile sf : storefiles) {
5997      assertFalse("Tags should not be present "
5998          ,sf.getReader().getHFileReader().getFileContext().isIncludesTags());
5999    }
6000  }
6001
6002  /**
6003   * Utility method to setup a WAL mock.
6004   * Needs to do the bit where we close latch on the WALKeyImpl on append else test hangs.
6005   * @return a mock WAL
6006   * @throws IOException
6007   */
6008  private WAL mockWAL() throws IOException {
6009    WAL wal = mock(WAL.class);
6010    Mockito.when(wal.append((HRegionInfo)Mockito.any(),
6011        (WALKeyImpl)Mockito.any(), (WALEdit)Mockito.any(), Mockito.anyBoolean())).
6012      thenAnswer(new Answer<Long>() {
6013        @Override
6014        public Long answer(InvocationOnMock invocation) throws Throwable {
6015          WALKeyImpl key = invocation.getArgument(1);
6016          MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin();
6017          key.setWriteEntry(we);
6018          return 1L;
6019        }
6020
6021    });
6022    return wal;
6023  }
6024
6025  @Test
6026  public void testCloseRegionWrittenToWAL() throws Exception {
6027
6028    Path rootDir = new Path(dir + name.getMethodName());
6029    FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
6030
6031    final ServerName serverName = ServerName.valueOf("testCloseRegionWrittenToWAL", 100, 42);
6032    final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
6033
6034    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
6035    htd.addFamily(new HColumnDescriptor(fam1));
6036    htd.addFamily(new HColumnDescriptor(fam2));
6037
6038    final HRegionInfo hri = new HRegionInfo(htd.getTableName(),
6039      HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
6040
6041    ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class);
6042
6043    // capture append() calls
6044    WAL wal = mockWAL();
6045    when(rss.getWAL((HRegionInfo) any())).thenReturn(wal);
6046
6047
6048    // create and then open a region first so that it can be closed later
6049    region = HRegion.createHRegion(hri, rootDir, TEST_UTIL.getConfiguration(), htd, rss.getWAL(hri));
6050    region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
6051      TEST_UTIL.getConfiguration(), rss, null);
6052
6053    // close the region
6054    region.close(false);
6055
6056    // 2 times, one for region open, the other close region
6057    verify(wal, times(2)).append((HRegionInfo)any(), (WALKeyImpl)any(),
6058      editCaptor.capture(), anyBoolean());
6059
6060    WALEdit edit = editCaptor.getAllValues().get(1);
6061    assertNotNull(edit);
6062    assertNotNull(edit.getCells());
6063    assertEquals(1, edit.getCells().size());
6064    RegionEventDescriptor desc = WALEdit.getRegionEventDescriptor(edit.getCells().get(0));
6065    assertNotNull(desc);
6066
6067    LOG.info("RegionEventDescriptor from WAL: " + desc);
6068
6069    assertEquals(RegionEventDescriptor.EventType.REGION_CLOSE, desc.getEventType());
6070    assertTrue(Bytes.equals(desc.getTableName().toByteArray(), htd.getTableName().toBytes()));
6071    assertTrue(Bytes.equals(desc.getEncodedRegionName().toByteArray(),
6072      hri.getEncodedNameAsBytes()));
6073    assertTrue(desc.getLogSequenceNumber() > 0);
6074    assertEquals(serverName, ProtobufUtil.toServerName(desc.getServer()));
6075    assertEquals(2, desc.getStoresCount());
6076
6077    StoreDescriptor store = desc.getStores(0);
6078    assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam1));
6079    assertEquals(store.getStoreHomeDir(), Bytes.toString(fam1));
6080    assertEquals(0, store.getStoreFileCount()); // no store files
6081
6082    store = desc.getStores(1);
6083    assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam2));
6084    assertEquals(store.getStoreHomeDir(), Bytes.toString(fam2));
6085    assertEquals(0, store.getStoreFileCount()); // no store files
6086  }
6087
6088  /**
6089   * Test RegionTooBusyException thrown when region is busy
6090   */
6091  @Test
6092  public void testRegionTooBusy() throws IOException {
6093    byte[] family = Bytes.toBytes("family");
6094    long defaultBusyWaitDuration = CONF.getLong("hbase.busy.wait.duration",
6095      HRegion.DEFAULT_BUSY_WAIT_DURATION);
6096    CONF.setLong("hbase.busy.wait.duration", 1000);
6097    region = initHRegion(tableName, method, CONF, family);
6098    final AtomicBoolean stopped = new AtomicBoolean(true);
6099    Thread t = new Thread(new Runnable() {
6100      @Override
6101      public void run() {
6102        try {
6103          region.lock.writeLock().lock();
6104          stopped.set(false);
6105          while (!stopped.get()) {
6106            Thread.sleep(100);
6107          }
6108        } catch (InterruptedException ie) {
6109        } finally {
6110          region.lock.writeLock().unlock();
6111        }
6112      }
6113    });
6114    t.start();
6115    Get get = new Get(row);
6116    try {
6117      while (stopped.get()) {
6118        Thread.sleep(100);
6119      }
6120      region.get(get);
6121      fail("Should throw RegionTooBusyException");
6122    } catch (InterruptedException ie) {
6123      fail("test interrupted");
6124    } catch (RegionTooBusyException e) {
6125      // Good, expected
6126    } finally {
6127      stopped.set(true);
6128      try {
6129        t.join();
6130      } catch (Throwable e) {
6131      }
6132
6133      HBaseTestingUtility.closeRegionAndWAL(region);
6134      region = null;
6135      CONF.setLong("hbase.busy.wait.duration", defaultBusyWaitDuration);
6136    }
6137  }
6138
  /**
   * Exercises cell expiry under an injected clock that the test advances by hand. Four cells are
   * written, two with a per-cell TTL tag and two relying on the column family TTL, and the row is
   * re-read after every clock advance to check which cells are still returned. The second half
   * checks that an increment written with its own TTL expires and exposes the older value again.
   * <p>
   * NOTE(review): the injected edge is not reset inside this method; presumably test teardown
   * does that -- confirm.
   */
  @Test
  public void testCellTTLs() throws IOException {
    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
    EnvironmentEdgeManager.injectEdge(edge);

    final byte[] row = Bytes.toBytes("testRow");
    final byte[] q1 = Bytes.toBytes("q1");
    final byte[] q2 = Bytes.toBytes("q2");
    final byte[] q3 = Bytes.toBytes("q3");
    final byte[] q4 = Bytes.toBytes("q4");

    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
    HColumnDescriptor hcd = new HColumnDescriptor(fam1);
    hcd.setTimeToLive(10); // 10 seconds
    htd.addFamily(hcd);

    // Private copy of the shared test configuration, so the HFile format override stays local
    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
    conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);

    HRegion region = HBaseTestingUtility.createRegionAndWAL(new HRegionInfo(htd.getTableName(),
            HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY),
        TEST_UTIL.getDataTestDir(), conf, htd);
    assertNotNull(region);
    try {
      // "now" is T in the time markers used by the comments below
      long now = EnvironmentEdgeManager.currentTime();
      // Add a cell that will expire in 5 seconds via cell TTL
      region.put(new Put(row).add(new KeyValue(row, fam1, q1, now,
        HConstants.EMPTY_BYTE_ARRAY, new ArrayBackedTag[] {
          // TTL tags specify ts in milliseconds
          new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) } )));
      // Add a cell that will expire after 10 seconds via family setting
      region.put(new Put(row).addColumn(fam1, q2, now, HConstants.EMPTY_BYTE_ARRAY));
      // Add a cell that will expire in 15 seconds via cell TTL
      // (timestamp is just under T+10s, plus a 5 second TTL tag)
      region.put(new Put(row).add(new KeyValue(row, fam1, q3, now + 10000 - 1,
        HConstants.EMPTY_BYTE_ARRAY, new ArrayBackedTag[] {
          // TTL tags specify ts in milliseconds
          new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) } )));
      // Add a cell that will expire in 20 seconds via family setting
      // (timestamp is just under T+10s, plus the 10 second family TTL)
      region.put(new Put(row).addColumn(fam1, q4, now + 10000 - 1, HConstants.EMPTY_BYTE_ARRAY));

      // Flush so we are sure store scanning gets this right
      region.flush(true);

      // A query at time T+0 should return all cells
      Result r = region.get(new Get(row));
      assertNotNull(r.getValue(fam1, q1));
      assertNotNull(r.getValue(fam1, q2));
      assertNotNull(r.getValue(fam1, q3));
      assertNotNull(r.getValue(fam1, q4));

      // Increment time to T+5 seconds
      edge.incrementTime(5000);

      // q1 (5 second cell TTL) is gone
      r = region.get(new Get(row));
      assertNull(r.getValue(fam1, q1));
      assertNotNull(r.getValue(fam1, q2));
      assertNotNull(r.getValue(fam1, q3));
      assertNotNull(r.getValue(fam1, q4));

      // Increment time to T+10 seconds
      edge.incrementTime(5000);

      // q2 (10 second family TTL) is gone as well
      r = region.get(new Get(row));
      assertNull(r.getValue(fam1, q1));
      assertNull(r.getValue(fam1, q2));
      assertNotNull(r.getValue(fam1, q3));
      assertNotNull(r.getValue(fam1, q4));

      // Increment time to T+15 seconds
      edge.incrementTime(5000);

      // q3 (written near T+10 with a 5 second cell TTL) is gone as well
      r = region.get(new Get(row));
      assertNull(r.getValue(fam1, q1));
      assertNull(r.getValue(fam1, q2));
      assertNull(r.getValue(fam1, q3));
      assertNotNull(r.getValue(fam1, q4));

      // Increment time to T+25 seconds (a 10 second step, past the ~T+20 expiry of q4)
      edge.incrementTime(10000);

      r = region.get(new Get(row));
      assertNull(r.getValue(fam1, q1));
      assertNull(r.getValue(fam1, q2));
      assertNull(r.getValue(fam1, q3));
      assertNull(r.getValue(fam1, q4));

      // Fun with disappearing increments

      // Start at 1
      region.put(new Put(row).addColumn(fam1, q1, Bytes.toBytes(1L)));
      r = region.get(new Get(row));
      byte[] val = r.getValue(fam1, q1);
      assertNotNull(val);
      assertEquals(1L, Bytes.toLong(val));

      // Increment with a TTL of 5 seconds
      Increment incr = new Increment(row).addColumn(fam1, q1, 1L);
      incr.setTTL(5000);
      region.increment(incr); // 2

      // New value should be 2
      r = region.get(new Get(row));
      val = r.getValue(fam1, q1);
      assertNotNull(val);
      assertEquals(2L, Bytes.toLong(val));

      // Increment time to T+30 seconds
      edge.incrementTime(5000);

      // Value should be back to 1
      r = region.get(new Get(row));
      val = r.getValue(fam1, q1);
      assertNotNull(val);
      assertEquals(1L, Bytes.toLong(val));

      // Increment time to T+35 seconds
      edge.incrementTime(5000);

      // Original value written at about T+25 should be gone now via family TTL
      r = region.get(new Get(row));
      assertNull(r.getValue(fam1, q1));

    } finally {
      HBaseTestingUtility.closeRegionAndWAL(region);
    }
  }
6265
6266  @Test
6267  public void testIncrementTimestampsAreMonotonic() throws IOException {
6268    HRegion region = initHRegion(tableName, method, CONF, fam1);
6269    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
6270    EnvironmentEdgeManager.injectEdge(edge);
6271
6272    edge.setValue(10);
6273    Increment inc = new Increment(row);
6274    inc.setDurability(Durability.SKIP_WAL);
6275    inc.addColumn(fam1, qual1, 1L);
6276    region.increment(inc);
6277
6278    Result result = region.get(new Get(row));
6279    Cell c = result.getColumnLatestCell(fam1, qual1);
6280    assertNotNull(c);
6281    assertEquals(10L, c.getTimestamp());
6282
6283    edge.setValue(1); // clock goes back
6284    region.increment(inc);
6285    result = region.get(new Get(row));
6286    c = result.getColumnLatestCell(fam1, qual1);
6287    assertEquals(11L, c.getTimestamp());
6288    assertEquals(2L, Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength()));
6289  }
6290
6291  @Test
6292  public void testAppendTimestampsAreMonotonic() throws IOException {
6293    HRegion region = initHRegion(tableName, method, CONF, fam1);
6294    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
6295    EnvironmentEdgeManager.injectEdge(edge);
6296
6297    edge.setValue(10);
6298    Append a = new Append(row);
6299    a.setDurability(Durability.SKIP_WAL);
6300    a.addColumn(fam1, qual1, qual1);
6301    region.append(a);
6302
6303    Result result = region.get(new Get(row));
6304    Cell c = result.getColumnLatestCell(fam1, qual1);
6305    assertNotNull(c);
6306    assertEquals(10L, c.getTimestamp());
6307
6308    edge.setValue(1); // clock goes back
6309    region.append(a);
6310    result = region.get(new Get(row));
6311    c = result.getColumnLatestCell(fam1, qual1);
6312    assertEquals(11L, c.getTimestamp());
6313
6314    byte[] expected = new byte[qual1.length*2];
6315    System.arraycopy(qual1, 0, expected, 0, qual1.length);
6316    System.arraycopy(qual1, 0, expected, qual1.length, qual1.length);
6317
6318    assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
6319      expected, 0, expected.length));
6320  }
6321
6322  @Test
6323  public void testCheckAndMutateTimestampsAreMonotonic() throws IOException {
6324    HRegion region = initHRegion(tableName, method, CONF, fam1);
6325    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
6326    EnvironmentEdgeManager.injectEdge(edge);
6327
6328    edge.setValue(10);
6329    Put p = new Put(row);
6330    p.setDurability(Durability.SKIP_WAL);
6331    p.addColumn(fam1, qual1, qual1);
6332    region.put(p);
6333
6334    Result result = region.get(new Get(row));
6335    Cell c = result.getColumnLatestCell(fam1, qual1);
6336    assertNotNull(c);
6337    assertEquals(10L, c.getTimestamp());
6338
6339    edge.setValue(1); // clock goes back
6340    p = new Put(row);
6341    p.setDurability(Durability.SKIP_WAL);
6342    p.addColumn(fam1, qual1, qual2);
6343    region.checkAndMutate(row, fam1, qual1, CompareOperator.EQUAL, new BinaryComparator(qual1), p);
6344    result = region.get(new Get(row));
6345    c = result.getColumnLatestCell(fam1, qual1);
6346    assertEquals(10L, c.getTimestamp());
6347
6348    assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
6349      qual2, 0, qual2.length));
6350  }
6351
6352  @Test
6353  public void testBatchMutateWithWrongRegionException() throws Exception {
6354    final byte[] a = Bytes.toBytes("a");
6355    final byte[] b = Bytes.toBytes("b");
6356    final byte[] c = Bytes.toBytes("c"); // exclusive
6357
6358    int prevLockTimeout = CONF.getInt("hbase.rowlock.wait.duration", 30000);
6359    CONF.setInt("hbase.rowlock.wait.duration", 1000);
6360    final HRegion region = initHRegion(tableName, a, c, method, CONF, false, fam1);
6361
6362    Mutation[] mutations = new Mutation[] {
6363        new Put(a)
6364            .add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6365              .setRow(a)
6366              .setFamily(fam1)
6367              .setTimestamp(HConstants.LATEST_TIMESTAMP)
6368              .setType(Cell.Type.Put)
6369              .build()),
6370        // this is outside the region boundary
6371        new Put(c).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6372              .setRow(c)
6373              .setFamily(fam1)
6374              .setTimestamp(HConstants.LATEST_TIMESTAMP)
6375              .setType(Type.Put)
6376              .build()),
6377        new Put(b).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6378              .setRow(b)
6379              .setFamily(fam1)
6380              .setTimestamp(HConstants.LATEST_TIMESTAMP)
6381              .setType(Cell.Type.Put)
6382              .build())
6383    };
6384
6385    OperationStatus[] status = region.batchMutate(mutations);
6386    assertEquals(OperationStatusCode.SUCCESS, status[0].getOperationStatusCode());
6387    assertEquals(OperationStatusCode.SANITY_CHECK_FAILURE, status[1].getOperationStatusCode());
6388    assertEquals(OperationStatusCode.SUCCESS, status[2].getOperationStatusCode());
6389
6390
6391    // test with a row lock held for a long time
6392    final CountDownLatch obtainedRowLock = new CountDownLatch(1);
6393    ExecutorService exec = Executors.newFixedThreadPool(2);
6394    Future<Void> f1 = exec.submit(new Callable<Void>() {
6395      @Override
6396      public Void call() throws Exception {
6397        LOG.info("Acquiring row lock");
6398        RowLock rl = region.getRowLock(b);
6399        obtainedRowLock.countDown();
6400        LOG.info("Waiting for 5 seconds before releasing lock");
6401        Threads.sleep(5000);
6402        LOG.info("Releasing row lock");
6403        rl.release();
6404        return null;
6405      }
6406    });
6407    obtainedRowLock.await(30, TimeUnit.SECONDS);
6408
6409    Future<Void> f2 = exec.submit(new Callable<Void>() {
6410      @Override
6411      public Void call() throws Exception {
6412        Mutation[] mutations = new Mutation[] {
6413            new Put(a).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6414                .setRow(a)
6415                .setFamily(fam1)
6416                .setTimestamp(HConstants.LATEST_TIMESTAMP)
6417                .setType(Cell.Type.Put)
6418                .build()),
6419            new Put(b).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6420                .setRow(b)
6421                .setFamily(fam1)
6422                .setTimestamp(HConstants.LATEST_TIMESTAMP)
6423                .setType(Cell.Type.Put)
6424                .build()),
6425        };
6426
6427        // this will wait for the row lock, and it will eventually succeed
6428        OperationStatus[] status = region.batchMutate(mutations);
6429        assertEquals(OperationStatusCode.SUCCESS, status[0].getOperationStatusCode());
6430        assertEquals(OperationStatusCode.SUCCESS, status[1].getOperationStatusCode());
6431        return null;
6432      }
6433    });
6434
6435    f1.get();
6436    f2.get();
6437
6438    CONF.setInt("hbase.rowlock.wait.duration", prevLockTimeout);
6439  }
6440
6441  @Test
6442  public void testCheckAndRowMutateTimestampsAreMonotonic() throws IOException {
6443    HRegion region = initHRegion(tableName, method, CONF, fam1);
6444    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
6445    EnvironmentEdgeManager.injectEdge(edge);
6446
6447    edge.setValue(10);
6448    Put p = new Put(row);
6449    p.setDurability(Durability.SKIP_WAL);
6450    p.addColumn(fam1, qual1, qual1);
6451    region.put(p);
6452
6453    Result result = region.get(new Get(row));
6454    Cell c = result.getColumnLatestCell(fam1, qual1);
6455    assertNotNull(c);
6456    assertEquals(10L, c.getTimestamp());
6457
6458    edge.setValue(1); // clock goes back
6459    p = new Put(row);
6460    p.setDurability(Durability.SKIP_WAL);
6461    p.addColumn(fam1, qual1, qual2);
6462    RowMutations rm = new RowMutations(row);
6463    rm.add(p);
6464    assertTrue(region.checkAndRowMutate(row, fam1, qual1, CompareOperator.EQUAL,
6465        new BinaryComparator(qual1), rm));
6466    result = region.get(new Get(row));
6467    c = result.getColumnLatestCell(fam1, qual1);
6468    assertEquals(10L, c.getTimestamp());
6469    LOG.info("c value " +
6470      Bytes.toStringBinary(c.getValueArray(), c.getValueOffset(), c.getValueLength()));
6471
6472    assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
6473      qual2, 0, qual2.length));
6474  }
6475
6476  HRegion initHRegion(TableName tableName, String callingMethod,
6477      byte[]... families) throws IOException {
6478    return initHRegion(tableName, callingMethod, HBaseConfiguration.create(),
6479        families);
6480  }
6481
6482  /**
6483   * HBASE-16429 Make sure no stuck if roll writer when ring buffer is filled with appends
6484   * @throws IOException if IO error occurred during test
6485   */
6486  @Test
6487  public void testWritesWhileRollWriter() throws IOException {
6488    int testCount = 10;
6489    int numRows = 1024;
6490    int numFamilies = 2;
6491    int numQualifiers = 2;
6492    final byte[][] families = new byte[numFamilies][];
6493    for (int i = 0; i < numFamilies; i++) {
6494      families[i] = Bytes.toBytes("family" + i);
6495    }
6496    final byte[][] qualifiers = new byte[numQualifiers][];
6497    for (int i = 0; i < numQualifiers; i++) {
6498      qualifiers[i] = Bytes.toBytes("qual" + i);
6499    }
6500
6501    CONF.setInt("hbase.regionserver.wal.disruptor.event.count", 2);
6502    this.region = initHRegion(tableName, method, CONF, families);
6503    try {
6504      List<Thread> threads = new ArrayList<>();
6505      for (int i = 0; i < numRows; i++) {
6506        final int count = i;
6507        Thread t = new Thread(new Runnable() {
6508
6509          @Override
6510          public void run() {
6511            byte[] row = Bytes.toBytes("row" + count);
6512            Put put = new Put(row);
6513            put.setDurability(Durability.SYNC_WAL);
6514            byte[] value = Bytes.toBytes(String.valueOf(count));
6515            for (byte[] family : families) {
6516              for (byte[] qualifier : qualifiers) {
6517                put.addColumn(family, qualifier, count, value);
6518              }
6519            }
6520            try {
6521              region.put(put);
6522            } catch (IOException e) {
6523              throw new RuntimeException(e);
6524            }
6525          }
6526        });
6527        threads.add(t);
6528      }
6529      for (Thread t : threads) {
6530        t.start();
6531      }
6532
6533      for (int i = 0; i < testCount; i++) {
6534        region.getWAL().rollWriter();
6535        Thread.yield();
6536      }
6537    } finally {
6538      try {
6539        HBaseTestingUtility.closeRegionAndWAL(this.region);
6540        CONF.setInt("hbase.regionserver.wal.disruptor.event.count", 16 * 1024);
6541      } catch (DroppedSnapshotException dse) {
6542        // We could get this on way out because we interrupt the background flusher and it could
6543        // fail anywhere causing a DSE over in the background flusher... only it is not properly
6544        // dealt with so could still be memory hanging out when we get to here -- memory we can't
6545        // flush because the accounting is 'off' since original DSE.
6546      }
6547      this.region = null;
6548    }
6549  }
6550
6551  @Test
6552  public void testMutateRow_WriteRequestCount() throws Exception {
6553    byte[] row1 = Bytes.toBytes("row1");
6554    byte[] fam1 = Bytes.toBytes("fam1");
6555    byte[] qf1 = Bytes.toBytes("qualifier");
6556    byte[] val1 = Bytes.toBytes("value1");
6557
6558    RowMutations rm = new RowMutations(row1);
6559    Put put = new Put(row1);
6560    put.addColumn(fam1, qf1, val1);
6561    rm.add(put);
6562
6563    this.region = initHRegion(tableName, method, CONF, fam1);
6564    try {
6565      long wrcBeforeMutate = this.region.writeRequestsCount.longValue();
6566      this.region.mutateRow(rm);
6567      long wrcAfterMutate = this.region.writeRequestsCount.longValue();
6568      Assert.assertEquals(wrcBeforeMutate + rm.getMutations().size(), wrcAfterMutate);
6569    } finally {
6570      HBaseTestingUtility.closeRegionAndWAL(this.region);
6571      this.region = null;
6572    }
6573  }
6574
6575  @Test
6576  public void testBulkLoadReplicationEnabled() throws IOException {
6577    TEST_UTIL.getConfiguration().setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
6578    final ServerName serverName = ServerName.valueOf(name.getMethodName(), 100, 42);
6579    final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
6580
6581    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
6582    htd.addFamily(new HColumnDescriptor(fam1));
6583    HRegionInfo hri = new HRegionInfo(htd.getTableName(),
6584        HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
6585    region = HRegion.openHRegion(hri, htd, rss.getWAL(hri), TEST_UTIL.getConfiguration(),
6586        rss, null);
6587
6588    assertTrue(region.conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, false));
6589    String plugins = region.conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
6590    String replicationCoprocessorClass = ReplicationObserver.class.getCanonicalName();
6591    assertTrue(plugins.contains(replicationCoprocessorClass));
6592    assertTrue(region.getCoprocessorHost().
6593        getCoprocessors().contains(ReplicationObserver.class.getSimpleName()));
6594
6595    region.close();
6596  }
6597
6598  /**
6599   * The same as HRegion class, the only difference is that instantiateHStore will
6600   * create a different HStore - HStoreForTesting. [HBASE-8518]
6601   */
6602  public static class HRegionForTesting extends HRegion {
6603
6604    public HRegionForTesting(final Path tableDir, final WAL wal, final FileSystem fs,
6605                             final Configuration confParam, final RegionInfo regionInfo,
6606                             final TableDescriptor htd, final RegionServerServices rsServices) {
6607      this(new HRegionFileSystem(confParam, fs, tableDir, regionInfo),
6608          wal, confParam, htd, rsServices);
6609    }
6610
6611    public HRegionForTesting(HRegionFileSystem fs, WAL wal,
6612                             Configuration confParam, TableDescriptor htd,
6613                             RegionServerServices rsServices) {
6614      super(fs, wal, confParam, htd, rsServices);
6615    }
6616
6617    /**
6618     * Create HStore instance.
6619     * @return If Mob is enabled, return HMobStore, otherwise return HStoreForTesting.
6620     */
6621    @Override
6622    protected HStore instantiateHStore(final ColumnFamilyDescriptor family) throws IOException {
6623      if (family.isMobEnabled()) {
6624        if (HFile.getFormatVersion(this.conf) < HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
6625          throw new IOException("A minimum HFile version of "
6626              + HFile.MIN_FORMAT_VERSION_WITH_TAGS
6627              + " is required for MOB feature. Consider setting " + HFile.FORMAT_VERSION_KEY
6628              + " accordingly.");
6629        }
6630        return new HMobStore(this, family, this.conf);
6631      }
6632      return new HStoreForTesting(this, family, this.conf);
6633    }
6634  }
6635
6636  /**
6637   * HStoreForTesting is merely the same as HStore, the difference is in the doCompaction method
6638   * of HStoreForTesting there is a checkpoint "hbase.hstore.compaction.complete" which
6639   * doesn't let hstore compaction complete. In the former edition, this config is set in
6640   * HStore class inside compact method, though this is just for testing, otherwise it
6641   * doesn't do any help. In HBASE-8518, we try to get rid of all "hbase.hstore.compaction.complete"
6642   * config (except for testing code).
6643   */
6644  public static class HStoreForTesting extends HStore {
6645
6646    protected HStoreForTesting(final HRegion region,
6647        final ColumnFamilyDescriptor family,
6648        final Configuration confParam) throws IOException {
6649      super(region, family, confParam);
6650    }
6651
6652    @Override
6653    protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
6654        Collection<HStoreFile> filesToCompact, User user, long compactionStartTime,
6655        List<Path> newFiles) throws IOException {
6656      // let compaction incomplete.
6657      if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
6658        LOG.warn("hbase.hstore.compaction.complete is set to false");
6659        List<HStoreFile> sfs = new ArrayList<>(newFiles.size());
6660        final boolean evictOnClose =
6661            cacheConf != null? cacheConf.shouldEvictOnClose(): true;
6662        for (Path newFile : newFiles) {
6663          // Create storefile around what we wrote with a reader on it.
6664          HStoreFile sf = createStoreFileAndReader(newFile);
6665          sf.closeStoreFile(evictOnClose);
6666          sfs.add(sf);
6667        }
6668        return sfs;
6669      }
6670      return super.doCompaction(cr, filesToCompact, user, compactionStartTime, newFiles);
6671    }
6672  }
6673}